Testing a Flink Streamlet

A testkit is provided to make it easier to write unit tests for Flink streamlets. The unit tests are meant to facilitate local testing of streamlets. FlinkTestkit offers APIs to write unit tests for Flink streamlets in both Scala and Java.

Basic flow of testkit APIs in Scala

Here’s the basic flow that you need to follow when writing tests using the testkit:

  1. Extend the test class with the FlinkTestkit abstract class. This abstract class provides the basic initialization and cleanups and the core APIs of the testkit

  2. Create a Flink streamlet

  3. Setup inlet taps that tap the inlet ports of the streamlet

  4. Setup outlet taps for outlet ports

  5. Push data into inlet ports

  6. Run the streamlet using the run method that the testkit offers

  7. Write assertions to ensure that the expected results match the actual ones

Details of the workflow in Scala

Let’s consider an example where we would like to write unit tests for testing a FlinkStreamlet that reads data from an inlet, does some processing and writes processed data to an outlet. We will follow the steps that we outlined in the last section. We will use ScalaTest as the testing library.

Setting up a sample FlinkStreamlet

FlinkStreamlet is an abstract class. Let’s set up a concrete instance that we would like to test. For more details on how to implement a Flink streamlet, please refer to [_building_a_flink_streamlet]. We will now write unit tests for the following FlinkStreamlet class.

import org.apache.flink.streaming.api.scala._
import cloudflow.streamlets.StreamletShape
import cloudflow.streamlets.avro._
import cloudflow.flink._

class FlinkProcessor extends FlinkStreamlet {

  // Step 1: Define inlets and outlets. Note for the outlet you need to specify
  //         the partitioner function explicitly
  val in  = AvroInlet[Data]("in")
  val out = AvroOutlet[Data]("out", _.id.toString)

  // Step 2: Define the shape of the streamlet. In this example the streamlet
  //         has 1 inlet and 1 outlet
  val shape = StreamletShape(in, out)

  // Step 3: Provide custom implementation of `FlinkStreamletLogic` that defines
  //         the behavior of the streamlet
  override def createLogic() = new FlinkStreamletLogic {
    override def buildExecutionGraph = {
      val ins: DataStream[Data]  = readStream(in)
      val outs: DataStream[Data] = ins.filter(_.id % 2 == 0)
      writeStream(out, outs)
    }
  }
}

The unit test

Here is a list of imports needed for writing the test suite.

import scala.collection.immutable.Seq

import org.apache.flink.streaming.api.scala._

import cloudflow.flink.testkit._
import org.scalatest._

Here’s how we would write a unit test using ScalaTest. The various logical steps of the test are annotated with inline comments explaining the rationale behind the step.

// 1. Extend from the abstract class FlinkTestkit
class FlinkProcessorSpec extends FlinkTestkit
  with WordSpecLike
  with Matchers
  with BeforeAndAfterAll {

  "FlinkProcessor" should {
    "process streaming data" in {
      @transient lazy val env = StreamExecutionEnvironment.getExecutionEnvironment

      // 2. Create the FlinkStreamlet to test
      val processor = new FlinkProcessor

      // 3. Prepare data to be pushed into inlet ports
      val data = (1 to 10).map(i ⇒ new Data(i, s"name$i"))

      // 4. Setup inlet taps that tap the inlet ports of the streamlet
      val in: FlinkInletTap[Data] = inletAsTap[Data](
        processor.in,
        env.addSource(FlinkSource.CollectionSourceFunction(data)))

      // 5. Setup outlet taps for outlet ports
      val out: FlinkOutletTap[Data] = outletAsTap[Data](processor.out)

      // 6. Run the streamlet using the `run` method that the testkit offers
      run(processor, Seq(in), Seq(out), env)

      // 7. Write assertions to ensure that the expected results match the actual ones
      TestFlinkStreamletContext.result should contain((Data(2, "name2")).toString())
      TestFlinkStreamletContext.result.size should equal(5)
    }
  }
}

Basic flow of testkit APIs in Java

Here’s the basic flow that you need to follow in Java when writing tests using the testkit:

  1. Extend the test class with the JUnitSuite trait from ScalaTest.

  2. Instantiate the testkit class

  3. Create the Flink streamlet that needs to be tested

  4. Prepare data to be pushed into inlet ports

  5. Setup inlet taps that tap the inlet ports of the streamlet

  6. Setup outlet taps for outlet ports

  7. Run the streamlet using the run method that the testkit offers

  8. Write assertions to ensure that the expected results match the actual ones

Details of the workflow in Java

Let’s consider a FlinkStreamlet class that reads data from an inlet, process that data, and writes it to an outlet. To write unit tests for this class, we will follow the steps that we outlined in the last section, using ScalaTest as the testing library.

We will discuss the steps for implementation in both Scala and Java.

Setting up a sample FlinkStreamlet

FlinkStreamlet is an abstract class. Let’s set up a concrete instance that we would like to test. For more details on how to implement a Flink streamlet, please refer to [_building_a_flink_streamlet]. Here’s a sample FlinkStreamlet that we would like to write unit tests for.

import java.util.List;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSink;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.serialization.*;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeinfo.TypeHint;
import cloudflow.streamlets.StreamletShape;
import cloudflow.streamlets.avro.*;
import cloudflow.flink.*;

public class FlinkProcessor extends FlinkStreamlet {

  // Step 1: Define inlets and outlets. Note for the outlet you need to specify
  //         the partitioner function explicitly or else RoundRobinPartitioner will
  //         be used : using `name` as the partitioner here
  AvroInlet<Data> in = AvroInlet.<Data>create("in", Data.class);
  AvroOutlet<Data> out = AvroOutlet.<Data>create("out", (Data d) -> Integer.toString(d.getId()), Data.class);

  // Step 2: Define the shape of the streamlet. In this example the streamlet
  //         has 1 inlet and 1 outlet
  @Override public StreamletShape shape() {
    return StreamletShape.createWithInlets(in).withOutlets(out);
  }

  // Step 3: Provide custom implementation of `FlinkStreamletLogic` that defines
  //         the behavior of the streamlet
  @Override public FlinkStreamletLogic createLogic() {
    return new FlinkStreamletLogic(getContext()) {
      @Override public void buildExecutionGraph() {

        DataStream<Data> ins =
          this.<Data>readStream(in, Data.class)
            .map((Data d) -> d)
            .returns(new TypeHint<Data>(){}.getTypeInfo());

        DataStream<Data> simples = ins.filter((Data d) -> d.getId() % 2 == 0);
        DataStreamSink<Data> sink = writeStream(out, simples, Data.class);
      }
    };
  }
}

The unit test

Here is a list of imports needed for writing the test suite.

import org.junit.*;
import static org.junit.Assert.*;
import junit.framework.TestCase;

import org.scalatestplus.junit.JUnitSuite;

import java.util.*;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import cloudflow.flink.testkit.*;

import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

import cloudflow.flink.*;

Here’s how we would write a unit test using ScalaTest. The logical steps of the test are annotated with inline comments explaining their rationale.

// 1. Extend from the abstract class JUnitSuite
public class FlinkProcessorTest extends JUnitSuite {

  @Test
  public void shouldProcessDataWhenItIsRun() {
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

    // 2. Instantiate the testkit class FlinkTestkit
    FlinkTestkit testkit = new FlinkTestkit() {};

    // 3. Create the FlinkStreamlet to test
    FlinkProcessor streamlet = new FlinkProcessor();

    // 4. Prepare data to be pushed into inlet ports
    List<Integer> range = IntStream.rangeClosed(1, 10).boxed().collect(Collectors.toList());
    List<Data> data = range.stream().map((Integer i) -> new Data(i, "name" + i.toString())).collect(Collectors.toList());

    // 5. Setup inlet taps that tap the inlet ports of the streamlet
    FlinkInletTap<Data> in = testkit.<Data>getInletAsTap(streamlet.in,
      env.<Data>addSource(
        FlinkSource.<Data>collectionSourceFunction(data),
        TypeInformation.<Data>of(Data.class)
      ),
      Data.class
    );

    // 6. Setup outlet taps for outlet ports
    FlinkOutletTap<Data> out = testkit.getOutletAsTap(streamlet.out, Data.class);

    // 7. Run the streamlet using the `run` method that the testkit offers
    testkit.run(streamlet, Collections.singletonList(in), Collections.singletonList(out), env);

    // 8. Write assertions to ensure that the expected results match the actual ones
    assertTrue(TestFlinkStreamletContext.result().contains((new Data(2 ,"name2").toString())));
    assertEquals(TestFlinkStreamletContext.result().size(), 5);
  }
}

The FlinkTestkit class

  1. Provides core APIs like inletAsTap, outletAsTap, getInletAsTap (Java API), getOutletAsTap (Java API) and run (both Java and Scala APIs).

  2. Supports adding values for configuration parameters.