Testing an Akka Streamlet
A testkit
is provided to make it easier to write unit tests for Akka Stream streamlets. The unit tests are meant to facilitate local testing of streamlets. The testkit
allows writing of tests in Scala, as well as Java.
Basic flow of testkit
APIs
Here’s the basic flow that you need to follow when writing tests using the testkit
:
-
Instantiate the
testkit
-
Setup inlet taps that tap the inlet ports of the streamlet
-
Setup outlet taps for outlet ports
-
Push data into inlet ports
-
Run the streamlet using the
testkit
and the setup inlet taps and outlet taps -
Use the probes of the outlet taps to a verify expected results
The testkit
connects taps to inlets and outlets of a streamlet. The taps provide means to the tester to write data to the inlets and read data from outlets, making it possible to assert that the streamlet behaves as expected.
Using the testkit
from Scala
Let’s consider an example where we would like to write unit tests for testing a FlowProcessor
. We will follow the steps that we outlined in the last section. We will use ScalaTest as the testing framework.
Imports
Here are the imports that we need for writing the tests. These include some obligatory inputs for ScalaTest and test kits for Cloudflow and Akka, and the code that is generated from the Avro schemas.
import akka.actor._
import akka.stream.scaladsl._
import akka.testkit._
import org.scalatest._
import org.scalatest.wordspec._
import org.scalatest.matchers.must._
import cloudflow.akkastream.testkit.scaladsl._
Setting up a sample TestProcessor
Let’s set up a TestProcessor
streamlet that we would like to test. It’s a simple one that filters events based on whether they contain keywords.
class TestProcessor extends AkkaStreamlet {
val in = AvroInlet[Data]("in")
val out = AvroOutlet[Data]("out", _.id.toString)
final override val shape = StreamletShape.withInlets(in).withOutlets(out)
val flow = Flow[Data].filter(_.id % 2 == 0)
override final def createLogic = new RunnableGraphStreamletLogic() {
def runnableGraph = plainSource(in).via(flow).to(plainSink(out))
}
}
The unit test
Here’s how we would write a unit test using AkkaStreamletTestkit
. The various logical steps of the test are annotated with inline comments explaining the rationale behind the step.
class TestProcessorSpec extends AnyWordSpec with Matchers with BeforeAndAfterAll {
private implicit val system = ActorSystem("AkkaStreamletSpec")
override def afterAll: Unit =
TestKit.shutdownActorSystem(system)
"A TestProcessor" should {
val testkit = AkkaStreamletTestKit(system)
"Allow for creating a 'flow processor'" in {
val data = Vector(Data(1, "a"), Data(2, "b"), Data(3, "c"))
val expectedData = Vector(Data(2, "b"))
val source = Source(data)
val proc = new TestProcessor
val in = testkit.inletFromSource(proc.in, source)
val out = testkit.outletAsTap(proc.out)
testkit.run(proc, in, out, () ⇒ out.probe.receiveN(1) mustBe expectedData.map(d ⇒ proc.out.partitioner(d) -> d))
out.probe.expectMsg(Completed)
}
}
}
Initialization and cleanups
As per ScalaTest guidelines, we can do custom cleanups in methods like afterAll()
and after()
depending on your requirements. In the current implementation we shutdown the actor system in afterAll()
:
override def afterAll: Unit =
TestKit.shutdownActorSystem(system)
Similarly you can have beforeAll()
and before()
for custom initializations.
If you have a number of tests that work based on similar initializations and cleanups you can also have a common base trait from which the test trait can extend.
Using the testkit
from Java
Using from Java is almost the same as from Scala, the only difference being that you need to use idiomatic Java abstractions and frameworks for writing the tests. In this example we will write the test for the same TestProcessor
streamlet using the Java DSL of the toolkit in JUnit 4.
Imports
Here are the imports that we need for writing the tests. These include some obligatory inputs for JUnit and test kits for Cloudflow and Akka, and the code that is generated from the Avro schemas.
import org.junit.*;
import akka.NotUsed;
import akka.actor.*;
import akka.stream.*;
import akka.stream.javadsl.*;
import akka.testkit.*;
import cloudflow.streamlets.*;
import cloudflow.akkastream.*;
import cloudflow.akkastream.javadsl.*;
import cloudflow.akkastream.testkit.javadsl.*;
import scala.concurrent.duration.Duration;
import scala.compat.java8.FutureConverters;
Setting up a sample TestProcessor
Let’s set up a TestProcessor
that we would like to test.
class TestProcessor extends AkkaStreamlet {
AvroInlet<Data> inlet = AvroInlet.<Data>create("in", Data.class);
AvroOutlet<Data> outlet = AvroOutlet.<Data>create("out", d -> Integer.toString(d.getId()), Data.class);
public StreamletShape shape() {
return StreamletShape.createWithInlets(inlet).withOutlets(outlet);
}
public AkkaStreamletLogic createLogic() {
return new RunnableGraphStreamletLogic(getContext()) {
public RunnableGraph<NotUsed> createRunnableGraph() {
return getPlainSource(inlet)
.via(Flow.<Data>create().filter(d -> d.getId() % 2 == 0))
.to(getPlainSink(outlet));
}
};
}
}
The unit test
Here’s how we would write a unit test using JUnit. The various logical steps of the test are annotated with inline comments explaining the rationale behind the step.
@Test
public void testFlowProcessor() {
TestProcessor sfp = new TestProcessor();
// 1. instantiate the testkit
AkkaStreamletTestKit testkit = AkkaStreamletTestKit.create(system);
// 2. Setup inlet taps that tap the inlet ports of the streamlet
QueueInletTap<Data> in = testkit.makeInletAsTap(sfp.inlet);
// 3. Setup outlet probes for outlet ports
ProbeOutletTap<Data> out = testkit.makeOutletAsTap(sfp.outlet);
// 4. Push data into inlet ports
in.queue().offer(new Data(1, "a"));
in.queue().offer(new Data(2, "b"));
// 5. Run the streamlet using the testkit and the setup inlet taps and outlet probes
testkit.<Data>run(sfp, in, out, () -> {
// 6. Assert
out.probe().expectMsg(new Pair<String, Data>("a", new Data(1, "a")));
return out.probe().expectMsg(new Pair<String, Data>("b", new Data(2, "b")));
});
// 6. Assert
out.probe().expectMsg(Completed.completed());
}
Initialization and Cleanups
As per JUnit guidelines, we can do custom initializations and cleanups in methods like setup()
and tearDown()
respectively depending on your requirements. One common practice is to set up a base class that does all common initializations and clean ups for your tests.
class TestProcessorTest {
static ActorMaterializer mat;
static ActorSystem system;
@BeforeClass
public static void setUp() throws Exception {
system = ActorSystem.create();
mat = ActorMaterializer.create(system);
}
@AfterClass
public static void tearDown() throws Exception {
TestKit.shutdownActorSystem(system, Duration.create(10, "seconds"), false);
system = null;
}
}