Testing an Akka Streamlet

A testkit is provided to make it easier to write unit tests for Akka Stream streamlets. The unit tests are meant for testing streamlets locally. Tests can be written in either Scala or Java.

Basic flow of testkit APIs

Here’s the basic flow that you need to follow when writing tests using the testkit:

  1. Instantiate the testkit

  2. Set up inlet taps that tap the inlet ports of the streamlet

  3. Set up outlet taps for outlet ports

  4. Push data into inlet ports

  5. Run the streamlet using the testkit and the inlet and outlet taps that you set up

  6. Use the probes of the outlet taps to verify the expected results

The testkit connects taps to the inlets and outlets of a streamlet. The taps let the tester write data to the inlets and read data from the outlets, making it possible to assert that the streamlet behaves as expected.
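To make the flow above concrete without depending on Cloudflow, here is a toy model in plain Java. It is only a sketch: `TapFlowSketch`, `runStreamlet`, and the `"COMPLETED"` marker are hypothetical stand-ins and not part of the testkit API. A queue plays the role of the inlet tap, a list plays the role of the outlet probe, and step 1 (instantiating the testkit) has no counterpart here.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;
import java.util.function.Predicate;

// Toy model only: none of these names are Cloudflow types.
class TapFlowSketch {

    // Stand-in for a streamlet: drains the inlet queue, keeps the elements
    // matching `keep`, and records them on the outlet probe.
    static <T> void runStreamlet(Queue<T> inletTap, List<Object> outletProbe, Predicate<T> keep) {
        T next;
        while ((next = inletTap.poll()) != null) {
            if (keep.test(next)) {
                outletProbe.add(next);
            }
        }
        // Hypothetical end-of-stream marker, similar in spirit to a completion message.
        outletProbe.add("COMPLETED");
    }

    static List<Object> example() {
        // Steps 2 and 3: create the inlet tap and the outlet probe.
        Queue<Integer> inletTap = new ArrayDeque<>();
        List<Object> outletProbe = new ArrayList<>();

        // Step 4: push data into the inlet.
        inletTap.add(1);
        inletTap.add(2);
        inletTap.add(3);

        // Step 5: run the logic under test (here: keep even numbers).
        runStreamlet(inletTap, outletProbe, n -> n % 2 == 0);

        // Step 6: the caller asserts on what the probe recorded.
        return outletProbe;
    }

    public static void main(String[] args) {
        System.out.println(example()); // prints [2, COMPLETED]
    }
}
```

The real testkit runs an actual Akka Streams graph and uses test probes rather than a list, but the shape of a test is the same: feed the inlet, run, then inspect what reached the outlet.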

Using the testkit from Scala

Let’s consider an example where we would like to write unit tests for a flow processor streamlet. We will follow the steps outlined in the previous section and use ScalaTest as the testing framework.

Imports

Here are the imports that we need for writing the tests. These include some obligatory imports for ScalaTest and test kits for Cloudflow and Akka, and the code that is generated from the Avro schemas.

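import akka.actor._
import akka.stream.scaladsl._
import akka.testkit._

import org.scalatest._
import org.scalatest.wordspec._
import org.scalatest.matchers.must._

// Streamlet API used by the TestProcessor defined below
import cloudflow.streamlets._
import cloudflow.streamlets.avro._
import cloudflow.akkastream._
import cloudflow.akkastream.scaladsl._

// Testkit (including the Completed message) and its Scala DSL
import cloudflow.akkastream.testkit._
import cloudflow.akkastream.testkit.scaladsl._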

Setting up a sample TestProcessor

Let’s set up a TestProcessor streamlet that we would like to test. It’s a simple one that keeps only the records whose id is even.

class TestProcessor extends AkkaStreamlet {
  val in                   = AvroInlet[Data]("in")
  val out                  = AvroOutlet[Data]("out", _.id.toString)
  final override val shape = StreamletShape.withInlets(in).withOutlets(out)

  val flow = Flow[Data].filter(_.id % 2 == 0)
  override final def createLogic = new RunnableGraphStreamletLogic() {
    def runnableGraph = plainSource(in).via(flow).to(plainSink(out))
  }
}

The unit test

Here’s how we would write a unit test using AkkaStreamletTestkit. The various logical steps of the test are annotated with inline comments explaining the rationale behind the step.

class TestProcessorSpec extends AnyWordSpec with Matchers with BeforeAndAfterAll {

  private implicit val system: ActorSystem = ActorSystem("AkkaStreamletSpec")

  override def afterAll(): Unit =
    TestKit.shutdownActorSystem(system)

  "A TestProcessor" should {

    // 1. Instantiate the testkit
    val testkit = AkkaStreamletTestKit(system)

    "Allow for creating a 'flow processor'" in {
      val data         = Vector(Data(1, "a"), Data(2, "b"), Data(3, "c"))
      // Only the record with an even id passes the filter
      val expectedData = Vector(Data(2, "b"))
      val source       = Source(data)
      val proc         = new TestProcessor

      // 2. and 4. Set up an inlet tap that feeds the inlet port from the source
      val in           = testkit.inletFromSource(proc.in, source)
      // 3. Set up an outlet tap for the outlet port
      val out          = testkit.outletAsTap(proc.out)

      // 5. and 6. Run the streamlet and assert on the (key, value) pairs received by the probe
      testkit.run(proc, in, out, () => out.probe.receiveN(1) mustBe expectedData.map(d => proc.out.partitioner(d) -> d))

      // 6. Assert that the stream has completed
      out.probe.expectMsg(Completed)
    }
  }
}

Initialization and cleanups

As per ScalaTest guidelines, you can do custom cleanups in methods like afterAll() and after(), depending on your requirements. In the example above we shut down the actor system in afterAll():

  override def afterAll(): Unit =
    TestKit.shutdownActorSystem(system)

Similarly you can have beforeAll() and before() for custom initializations.

If you have a number of tests that rely on similar initializations and cleanups, you can also define a common base trait that your test classes extend.

Using the testkit from Java

Using the testkit from Java is almost the same as from Scala, the only difference being that you need to use idiomatic Java abstractions and frameworks for writing the tests. In this example we will write the test for the same TestProcessor streamlet using the Java DSL of the testkit in JUnit 4.

Imports

Here are the imports that we need for writing the tests. These include some obligatory imports for JUnit and test kits for Cloudflow and Akka, and the code that is generated from the Avro schemas.

import org.junit.*;

import akka.NotUsed;
import akka.actor.*;
import akka.japi.Pair;
import akka.stream.*;
import akka.stream.javadsl.*;
import akka.testkit.*;

import cloudflow.streamlets.*;
import cloudflow.streamlets.avro.*;
import cloudflow.akkastream.*;
import cloudflow.akkastream.javadsl.*;

import cloudflow.akkastream.testkit.*;
import cloudflow.akkastream.testkit.javadsl.*;

import scala.concurrent.duration.Duration;
import scala.compat.java8.FutureConverters;

Setting up a sample TestProcessor

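Let’s set up a TestProcessor that we would like to test. As in the Scala example, it keeps only the records whose id is even, and the outlet uses the id, rendered as a string, as the partitioning key.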

class TestProcessor extends AkkaStreamlet {
  AvroInlet<Data> inlet = AvroInlet.<Data>create("in", Data.class);
  AvroOutlet<Data> outlet = AvroOutlet.<Data>create("out", d -> Integer.toString(d.getId()), Data.class);

  public StreamletShape shape() {
    return StreamletShape.createWithInlets(inlet).withOutlets(outlet);
  }

  public AkkaStreamletLogic createLogic() {
    return new RunnableGraphStreamletLogic(getContext()) {
      public RunnableGraph<NotUsed> createRunnableGraph() {
        return getPlainSource(inlet)
          .via(Flow.<Data>create().filter(d -> d.getId() % 2 == 0))
          .to(getPlainSink(outlet));
      }
    };
  }
}

The unit test

Here’s how we would write a unit test using JUnit. The various logical steps of the test are annotated with inline comments explaining the rationale behind the step.

  @Test
  public void testFlowProcessor() {
    TestProcessor sfp = new TestProcessor();

    // 1. Instantiate the testkit
    AkkaStreamletTestKit testkit = AkkaStreamletTestKit.create(system);

    // 2. Set up inlet taps that tap the inlet ports of the streamlet
    QueueInletTap<Data> in = testkit.makeInletAsTap(sfp.inlet);

    // 3. Set up outlet taps for outlet ports
    ProbeOutletTap<Data> out = testkit.makeOutletAsTap(sfp.outlet);

    // 4. Push data into inlet ports
    in.queue().offer(new Data(1, "a"));
    in.queue().offer(new Data(2, "b"));

    // 5. Run the streamlet using the testkit and the inlet and outlet taps
    testkit.<Data>run(sfp, in, out, () -> {
      // 6. Assert: only the record with an even id passes the filter,
      //    keyed by the partitioner (the id rendered as a string)
      return out.probe().expectMsg(new Pair<String, Data>("2", new Data(2, "b")));
    });

    // 6. Assert that the stream has completed
    out.probe().expectMsg(Completed.completed());
  }
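The elements that reach the outlet probe are (key, value) pairs, where the key is produced by the outlet's partitioner. The following plain-Java sketch shows how the expected pairs follow from the TestProcessor defined above, which filters on even ids and partitions by the id as a string. It does not use Cloudflow at all: `ExpectedOutputSketch` and `Rec` are hypothetical names, with `Rec` standing in for the Avro-generated Data class.

```java
import java.util.AbstractMap.SimpleImmutableEntry;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.function.Function;
import java.util.function.Predicate;

// Plain-Java illustration; Rec is a hypothetical stand-in for the generated Data class.
class ExpectedOutputSketch {

    static final class Rec {
        final int id;
        final String name;

        Rec(int id, String name) {
            this.id = id;
            this.name = name;
        }
    }

    // Applies the streamlet's filter, then pairs each surviving record with its partition key.
    static List<Map.Entry<String, Rec>> expected(List<Rec> input,
                                                 Predicate<Rec> filter,
                                                 Function<Rec, String> partitioner) {
        List<Map.Entry<String, Rec>> pairs = new ArrayList<>();
        for (Rec r : input) {
            if (filter.test(r)) {
                pairs.add(new SimpleImmutableEntry<>(partitioner.apply(r), r));
            }
        }
        return pairs;
    }

    static List<Map.Entry<String, Rec>> example() {
        // Same input as the test: ids 1 and 2.
        List<Rec> input = Arrays.asList(new Rec(1, "a"), new Rec(2, "b"));
        // Same filter and partitioner as the TestProcessor: even ids, key = id as a string.
        return expected(input, r -> r.id % 2 == 0, r -> Integer.toString(r.id));
    }

    public static void main(String[] args) {
        for (Map.Entry<String, Rec> e : example()) {
            System.out.println(e.getKey() + " -> " + e.getValue().name); // prints 2 -> b
        }
    }
}
```

Deriving the expected pairs from the streamlet's own filter and partitioner, as the Scala test does with `proc.out.partitioner(d) -> d`, keeps the assertion in sync if either of them changes.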

Initialization and cleanups

As per JUnit guidelines, you can do custom initializations and cleanups in methods like setUp() and tearDown() respectively, depending on your requirements. One common practice is to set up a base class that does all common initializations and cleanups for your tests.

class TestProcessorTest {

  static ActorMaterializer mat;
  static ActorSystem system;

  @BeforeClass
  public static void setUp() throws Exception {
    system = ActorSystem.create();
    mat = ActorMaterializer.create(system);
  }

  @AfterClass
  public static void tearDown() throws Exception {
    TestKit.shutdownActorSystem(system, Duration.create(10, "seconds"), false);
    system = null;
  }
}