Testing a Flink Streamlet
Integration with the Lyft Flink Operator has been deprecated since 2.2.0 and will be removed in version 3.x. Flink integration has moved to the Cloudflow-contrib project. Please see the Cloudflow-contrib getting started guide for instructions on how to use the Flink Native Kubernetes integration. The documentation that follows describes the deprecated feature. The FlinkStreamlet API has not changed in cloudflow-contrib, though you do need to use a different dependency and add the CloudflowNativeFlinkPlugin, which is described in the Cloudflow-contrib documentation for building Flink native streamlets.
A testkit is provided to make it easier to write unit tests for Flink streamlets, so that you can test them locally. FlinkTestkit offers APIs for writing these unit tests in both Scala and Java.
Basic flow of testkit APIs in Scala
Here’s the basic flow that you need to follow when writing tests using the testkit:
- Extend the test class with the FlinkTestkit abstract class. This abstract class provides basic initialization and cleanup, along with the core APIs of the testkit
- Create a Flink streamlet
- Set up inlet taps that tap the inlet ports of the streamlet
- Set up outlet taps for outlet ports
- Push data into inlet ports
- Run the streamlet using the run method that the testkit offers
- Write assertions to ensure that the expected results match the actual ones
Details of the workflow in Scala
Let’s consider an example where we would like to write unit tests for a FlinkStreamlet that reads data from an inlet, does some processing, and writes the processed data to an outlet. We will follow the steps outlined in the previous section and use ScalaTest as the testing library.
Setting up a sample FlinkStreamlet
FlinkStreamlet is an abstract class. Let’s set up a concrete instance that we would like to test. For more details on how to implement a Flink streamlet, please refer to Building a Flink streamlet. We will now write unit tests for the following FlinkStreamlet class.
[Code listing unavailable: build-flink-streamlets-scala/step3/src/main/scala/com/example/FlinkProcessor.scala, from docsnippets 2.3.0]
The unit test
Here is a list of imports needed for writing the test suite.
[Code listing unavailable: build-flink-streamlets-scala/step3/src/test/scala/com/example/FlinkProcessorSpec.scala, from docsnippets 2.3.0]
Here’s how we would write a unit test using ScalaTest. The various logical steps of the test are annotated with inline comments explaining the rationale behind the step.
[Code listing unavailable: build-flink-streamlets-scala/step3/src/test/scala/com/example/FlinkProcessorSpec.scala, from docsnippets 2.3.0]
Basic flow of testkit APIs in Java
Here’s the basic flow that you need to follow in Java when writing tests using the testkit:
- Extend the test class with the JUnitSuite trait from ScalaTest
- Instantiate the testkit class
- Create the Flink streamlet that needs to be tested
- Prepare data to be pushed into inlet ports
- Set up inlet taps that tap the inlet ports of the streamlet
- Set up outlet taps for outlet ports
- Run the streamlet using the run method that the testkit offers
- Write assertions to ensure that the expected results match the actual ones
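To make the order of these steps concrete, here is a minimal sketch that models the flow in plain Java. It deliberately does not use the Cloudflow testkit: InletTap, OutletTap, run, and keepEven are hypothetical stand-ins invented for this illustration, and the real FlinkTestkit types and method signatures may differ. The point is only the shape of a test: prepare data, tap the inlet, tap the outlet, run, then assert on what the outlet tap collected.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

// Plain-Java model of the tap, run, assert flow. Every type and method here
// is a hypothetical stand-in; none of them belongs to the Cloudflow API.
final class TapFlowSketch {

  // Stand-in for an inlet tap: test data bound to a named inlet port.
  static final class InletTap<T> {
    final String port;
    final List<T> data;
    InletTap(String port, List<T> data) { this.port = port; this.data = data; }
  }

  // Stand-in for an outlet tap: collects whatever is written to a named outlet port.
  static final class OutletTap<T> {
    final String port;
    final List<T> collected = new ArrayList<>();
    OutletTap(String port) { this.port = port; }
  }

  // Stand-in for the testkit's run method: pushes the tapped data through the
  // streamlet logic and captures the output in the outlet tap.
  static <I, O> void run(Function<List<I>, List<O>> logic, InletTap<I> in, OutletTap<O> out) {
    out.collected.addAll(logic.apply(in.data));
  }

  // Hypothetical streamlet logic: keep only the even numbers.
  static List<Integer> keepEven(List<Integer> input) {
    return input.stream().filter(i -> i % 2 == 0).collect(Collectors.toList());
  }

  // Walks through the steps once and returns what reached the outlet tap.
  static List<Integer> runFlow() {
    // Prepare data to be pushed into the inlet port
    List<Integer> data = IntStream.rangeClosed(1, 10).boxed().collect(Collectors.toList());
    // Set up the inlet tap and the outlet tap
    InletTap<Integer> in = new InletTap<>("in", data);
    OutletTap<Integer> out = new OutletTap<>("out");
    // Run the streamlet logic against the taps
    run(TapFlowSketch::keepEven, in, out);
    return out.collected;
  }

  public static void main(String[] args) {
    // Assert that the expected results match the actual ones
    List<Integer> result = runFlow();
    if (!result.equals(List.of(2, 4, 6, 8, 10))) {
      throw new AssertionError("unexpected outlet contents: " + result);
    }
    System.out.println("outlet tap received " + result);
  }
}
```

In a real test the taps and the run call would come from the testkit instance, and the assertions would be JUnit assertions inside a test method rather than a check in main.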
Details of the workflow in Java
Let’s consider a FlinkStreamlet class that reads data from an inlet, processes that data, and writes it to an outlet. To write unit tests for this class, we will follow the steps outlined in the previous section, using ScalaTest as the testing library.
We will discuss the steps for implementation in both Scala and Java.
Setting up a sample FlinkStreamlet
FlinkStreamlet is an abstract class. Let’s set up a concrete instance that we would like to test. For more details on how to implement a Flink streamlet, please refer to Building a Flink streamlet. Here’s a sample FlinkStreamlet that we would like to write unit tests for.
[Code listing unavailable: build-flink-streamlets-java/step3/src/main/java/com/example/FlinkProcessor.java, from docsnippets 2.3.0]
The unit test
Here is a list of imports needed for writing the test suite.
[Code listing unavailable: build-flink-streamlets-java/step3/src/test/java/com/example/FlinkProcessorTest.java, from docsnippets 2.3.0]
Here’s how we would write a unit test using ScalaTest. The logical steps of the test are annotated with inline comments explaining their rationale.
[Code listing unavailable: build-flink-streamlets-java/step3/src/test/java/com/example/FlinkProcessorTest.java, from docsnippets 2.3.0]