Get monitoring, observability, online education,
and expert support from Lightbend.
Learn More

Testing a Flink Streamlet

Integration with the Lyft Flink Operator has been deprecated since 2.2.0, and will be removed in version 3.x. Flink integration has moved to the Cloudflow-contrib project. Please see the Cloudflow-contrib getting started guide for instructions on how to use Flink Native Kubernetes integration. The documentation that follows describes the deprecated feature. The FlinkStreamlet API has not changed in cloudflow-contrib, though you do need to use a different dependency and add the CloudflowNativeFlinkPlugin, which is described in the Cloudflow contrib documentation for building Flink native streamlets.

A testkit is provided to make it easier to write unit tests for Flink streamlets. The unit tests are meant to facilitate local testing of streamlets. FlinkTestkit offers APIs to write unit tests for Flink streamlets in both Scala and Java.

Basic flow of testkit APIs in Scala

Here’s the basic flow that you need to follow when writing tests using the testkit:

  1. Extend the test class with the FlinkTestkit abstract class. This abstract class provides the basic initialization and cleanups and the core APIs of the testkit

  2. Create a Flink streamlet

  3. Setup inlet taps that tap the inlet ports of the streamlet

  4. Setup outlet taps for outlet ports

  5. Push data into inlet ports

  6. Run the streamlet using the run method that the testkit offers

  7. Write assertions to ensure that the expected results match the actual ones

Details of the workflow in Scala

Let’s consider an example where we would like to write unit tests for testing a FlinkStreamlet that reads data from an inlet, does some processing and writes processed data to an outlet. We will follow the steps that we outlined in the last section. We will use ScalaTest as the testing library.

Setting up a sample FlinkStreamlet

FlinkStreamlet is an abstract class. Let’s set up a concrete instance that we would like to test. For more details on how to implement a Flink streamlet, please refer to [_building_a_flink_streamlet]. We will now write unit tests for the following FlinkStreamlet class.

Unresolved include directive in modules/develop/pages/test-flink-streamlet.adoc - include::2.3.0@docsnippets:ROOT:example$build-flink-streamlets-scala/step3/src/main/scala/com/example/FlinkProcessor.scala[]

The unit test

Here is a list of imports needed for writing the test suite.

Unresolved include directive in modules/develop/pages/test-flink-streamlet.adoc - include::2.3.0@docsnippets:ROOT:example$build-flink-streamlets-scala/step3/src/test/scala/com/example/FlinkProcessorSpec.scala[]

Here’s how we would write a unit test using ScalaTest. The various logical steps of the test are annotated with inline comments explaining the rationale behind the step.

Unresolved include directive in modules/develop/pages/test-flink-streamlet.adoc - include::2.3.0@docsnippets:ROOT:example$build-flink-streamlets-scala/step3/src/test/scala/com/example/FlinkProcessorSpec.scala[]

Basic flow of testkit APIs in Java

Here’s the basic flow that you need to follow in Java when writing tests using the testkit:

  1. Extend the test class with the JUnitSuite trait from ScalaTest.

  2. Instantiate the testkit class

  3. Create the Flink streamlet that needs to be tested

  4. Prepare data to be pushed into inlet ports

  5. Setup inlet taps that tap the inlet ports of the streamlet

  6. Setup outlet taps for outlet ports

  7. Run the streamlet using the run method that the testkit offers

  8. Write assertions to ensure that the expected results match the actual ones

Details of the workflow in Java

Let’s consider a FlinkStreamlet class that reads data from an inlet, process that data, and writes it to an outlet. To write unit tests for this class, we will follow the steps that we outlined in the last section, using ScalaTest as the testing library.

We will discuss the steps for implementation in both Scala and Java.

Setting up a sample FlinkStreamlet

FlinkStreamlet is an abstract class. Let’s set up a concrete instance that we would like to test. For more details on how to implement a Flink streamlet, please refer to [_building_a_flink_streamlet]. Here’s a sample FlinkStreamlet that we would like to write unit tests for.

Unresolved include directive in modules/develop/pages/test-flink-streamlet.adoc - include::2.3.0@docsnippets:ROOT:example$build-flink-streamlets-java/step3/src/main/java/com/example/FlinkProcessor.java[]

The unit test

Here is a list of imports needed for writing the test suite.

Unresolved include directive in modules/develop/pages/test-flink-streamlet.adoc - include::2.3.0@docsnippets:ROOT:example$build-flink-streamlets-java/step3/src/test/java/com/example/FlinkProcessorTest.java[]

Here’s how we would write a unit test using ScalaTest. The logical steps of the test are annotated with inline comments explaining their rationale.

Unresolved include directive in modules/develop/pages/test-flink-streamlet.adoc - include::2.3.0@docsnippets:ROOT:example$build-flink-streamlets-java/step3/src/test/java/com/example/FlinkProcessorTest.java[]

The FlinkTestkit class

  1. Provides core APIs like inletAsTap, outletAsTap, getInletAsTap (Java API), getOutletAsTap (Java API) and run (both Java and Scala APIs).

  2. Supports adding values for configuration parameters.