Testing a Flink Streamlet
A testkit is provided to make it easier to write unit tests for Flink streamlets. The unit tests are meant to facilitate local testing of streamlets. FlinkTestkit offers APIs to write unit tests for Flink streamlets in both Scala and Java.
Basic flow of testkit APIs in Scala
Here’s the basic flow that you need to follow when writing tests using the testkit:
- Extend the test class with the FlinkTestkit abstract class. This abstract class provides the basic initialization and cleanup, and the core APIs of the testkit
- Create a Flink streamlet
- Set up inlet taps that tap the inlet ports of the streamlet
- Set up outlet taps for outlet ports
- Push data into inlet ports
- Run the streamlet using the run method that the testkit offers
- Write assertions to ensure that the expected results match the actual ones
Details of the workflow in Scala
Let’s consider an example where we would like to write unit tests for a FlinkStreamlet that reads data from an inlet, does some processing, and writes the processed data to an outlet. We will follow the steps outlined in the previous section and use ScalaTest as the testing library.
Setting up a sample FlinkStreamlet
FlinkStreamlet is an abstract class. Let’s set up a concrete instance that we would like to test. For more details on how to implement a Flink streamlet, please refer to [_building_a_flink_streamlet]. We will now write unit tests for the following FlinkStreamlet class.
import org.apache.flink.streaming.api.scala._
import cloudflow.streamlets.StreamletShape
import cloudflow.streamlets.avro._
import cloudflow.flink._
class FlinkProcessor extends FlinkStreamlet {
  // Step 1: Define inlets and outlets. Note for the outlet you need to specify
  // the partitioner function explicitly
  val in = AvroInlet[Data]("in")
  val out = AvroOutlet[Data]("out", _.id.toString)
  // Step 2: Define the shape of the streamlet. In this example the streamlet
  // has 1 inlet and 1 outlet
  val shape = StreamletShape(in, out)
  // Step 3: Provide custom implementation of `FlinkStreamletLogic` that defines
  // the behavior of the streamlet
  override def createLogic() = new FlinkStreamletLogic {
    override def buildExecutionGraph = {
      val ins: DataStream[Data] = readStream(in)
      val outs: DataStream[Data] = ins.filter(_.id % 2 == 0)
      writeStream(out, outs)
    }
  }
}
The unit test
Here is a list of imports needed for writing the test suite.
import scala.collection.immutable.Seq
import org.apache.flink.streaming.api.scala._
import cloudflow.flink.testkit._
import org.scalatest._
Here’s how we would write a unit test using ScalaTest. The various logical steps of the test are annotated with inline comments explaining the rationale behind the step.
// 1. Extend from the abstract class FlinkTestkit
class FlinkProcessorSpec extends FlinkTestkit
    with WordSpecLike
    with Matchers
    with BeforeAndAfterAll {
  "FlinkProcessor" should {
    "process streaming data" in {
      @transient lazy val env = StreamExecutionEnvironment.getExecutionEnvironment
      // 2. Create the FlinkStreamlet to test
      val processor = new FlinkProcessor
      // 3. Prepare data to be pushed into inlet ports
      val data = (1 to 10).map(i => new Data(i, s"name$i"))
      // 4. Set up inlet taps that tap the inlet ports of the streamlet
      val in: FlinkInletTap[Data] = inletAsTap[Data](
        processor.in,
        env.addSource(FlinkSource.CollectionSourceFunction(data)))
      // 5. Set up outlet taps for outlet ports
      val out: FlinkOutletTap[Data] = outletAsTap[Data](processor.out)
      // 6. Run the streamlet using the `run` method that the testkit offers
      run(processor, Seq(in), Seq(out), env)
      // 7. Write assertions to ensure that the expected results match the actual ones
      TestFlinkStreamletContext.result should contain(Data(2, "name2").toString())
      TestFlinkStreamletContext.result.size should equal(5)
    }
  }
}
Basic flow of testkit APIs in Java
Here’s the basic flow that you need to follow in Java when writing tests using the testkit:
- Extend the test class with the JUnitSuite class from ScalaTest
- Instantiate the testkit class
- Create the Flink streamlet that needs to be tested
- Prepare data to be pushed into inlet ports
- Set up inlet taps that tap the inlet ports of the streamlet
- Set up outlet taps for outlet ports
- Run the streamlet using the run method that the testkit offers
- Write assertions to ensure that the expected results match the actual ones
Details of the workflow in Java
Let’s consider a FlinkStreamlet class that reads data from an inlet, processes that data, and writes it to an outlet. To write unit tests for this class, we will follow the steps outlined in the previous section, using JUnit through the JUnitSuite class that ScalaTest provides.
Setting up a sample FlinkStreamlet
FlinkStreamlet is an abstract class. Let’s set up a concrete instance that we would like to test. For more details on how to implement a Flink streamlet, please refer to [_building_a_flink_streamlet]. Here’s a sample FlinkStreamlet that we would like to write unit tests for.
import java.util.List;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSink;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.serialization.*;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeinfo.TypeHint;
import cloudflow.streamlets.StreamletShape;
import cloudflow.streamlets.avro.*;
import cloudflow.flink.*;
public class FlinkProcessor extends FlinkStreamlet {
  // Step 1: Define inlets and outlets. Note for the outlet you need to specify
  // the partitioner function explicitly or else RoundRobinPartitioner will
  // be used: using `id` as the partitioner here
  AvroInlet<Data> in = AvroInlet.<Data>create("in", Data.class);
  AvroOutlet<Data> out = AvroOutlet.<Data>create("out", (Data d) -> d.getId().toString(), Data.class);
  // Step 2: Define the shape of the streamlet. In this example the streamlet
  // has 1 inlet and 1 outlet
  @Override public StreamletShape shape() {
    return StreamletShape.createWithInlets(in).withOutlets(out);
  }
  // Step 3: Provide custom implementation of `FlinkStreamletLogic` that defines
  // the behavior of the streamlet
  @Override public FlinkStreamletLogic createLogic() {
    return new FlinkStreamletLogic(getContext()) {
      @Override public void buildExecutionGraph() {
        DataStream<Data> ins =
          this.<Data>readStream(in, Data.class)
            .map((Data d) -> d)
            .returns(new TypeHint<Data>(){}.getTypeInfo());
        DataStream<Data> simples = ins.filter((Data d) -> d.getId() % 2 == 0);
        DataStreamSink<Data> sink = writeStream(out, simples, Data.class);
      }
    };
  }
}
The unit test
Here is a list of imports needed for writing the test suite.
import org.junit.*;
import static org.junit.Assert.*;
import junit.framework.TestCase;
import org.scalatestplus.junit.JUnitSuite;
import java.util.*;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import cloudflow.flink.testkit.*;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import cloudflow.flink.*;
Here’s how we would write a unit test using JUnit together with the JUnitSuite class from ScalaTest. The logical steps of the test are annotated with inline comments explaining their rationale.
// 1. Extend from the class JUnitSuite
public class FlinkProcessorTest extends JUnitSuite {
  @Test
  public void shouldProcessDataWhenItIsRun() {
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    // 2. Instantiate the testkit class FlinkTestkit
    FlinkTestkit testkit = new FlinkTestkit() {};
    // 3. Create the FlinkStreamlet to test
    FlinkProcessor streamlet = new FlinkProcessor();
    // 4. Prepare data to be pushed into inlet ports
    List<Integer> range = IntStream.rangeClosed(1, 10).boxed().collect(Collectors.toList());
    List<Data> data = range.stream().map((Integer i) -> new Data(i, "name" + i.toString())).collect(Collectors.toList());
    // 5. Set up inlet taps that tap the inlet ports of the streamlet
    FlinkInletTap<Data> in = testkit.<Data>getInletAsTap(streamlet.in,
      env.<Data>addSource(
        FlinkSource.<Data>collectionSourceFunction(data),
        TypeInformation.<Data>of(Data.class)
      ),
      Data.class
    );
    // 6. Set up outlet taps for outlet ports
    FlinkOutletTap<Data> out = testkit.getOutletAsTap(streamlet.out, Data.class);
    // 7. Run the streamlet using the `run` method that the testkit offers
    testkit.run(streamlet, Collections.singletonList(in), Collections.singletonList(out), env);
    // 8. Write assertions to ensure that the expected results match the actual ones
    assertTrue(TestFlinkStreamletContext.result().contains(new Data(2, "name2").toString()));
    // JUnit's assertEquals takes the expected value first, then the actual one
    assertEquals(5, TestFlinkStreamletContext.result().size());
  }
}
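The expected values in these assertions follow from the filter in FlinkProcessor: of the ids 1 to 10, only the even ones pass, which leaves 5 elements. As a sanity check that involves neither Flink nor the testkit, here is a minimal plain-Java sketch of that reasoning. The nested Data class, its toString format, and the helper names sampleData and expectedResult are hypothetical stand-ins for illustration and are not part of Cloudflow; the real Data is the Avro-generated class, and its string form will differ.

```java
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

public class ExpectedResultSketch {

  // Hypothetical stand-in for the Avro-generated Data class used in the test.
  static final class Data {
    private final int id;
    private final String name;

    Data(int id, String name) {
      this.id = id;
      this.name = name;
    }

    int getId() {
      return id;
    }

    // Assumed string form; the real generated class defines its own.
    @Override
    public String toString() {
      return "Data(" + id + ", " + name + ")";
    }
  }

  // Same input as the unit test: ids 1 to 10 with names "name1" to "name10".
  static List<Data> sampleData() {
    return IntStream.rangeClosed(1, 10)
        .mapToObj(i -> new Data(i, "name" + i))
        .collect(Collectors.toList());
  }

  // Mirrors the streamlet's filter (keep even ids) and collects string forms,
  // since the assertions in the test compare against toString() values.
  static List<String> expectedResult(List<Data> input) {
    return input.stream()
        .filter(d -> d.getId() % 2 == 0)
        .map(Data::toString)
        .collect(Collectors.toList());
  }

  public static void main(String[] args) {
    List<String> expected = expectedResult(sampleData());
    System.out.println(expected.size()); // prints 5
    System.out.println(expected.contains(new Data(2, "name2").toString())); // prints true
  }
}
```

If the filter in the streamlet changes, only the predicate in expectedResult needs to change to recompute the values the assertions should expect.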