Testing a Spark Streamlet

Integration with the Spark Operator has been deprecated since 2.2.0, and will be removed in version 3.x. Spark integration has moved to the Cloudflow-contrib project. Please see the Cloudflow-contrib getting started guide for instructions on how to use Spark Native Kubernetes integration. The documentation that follows describes the deprecated feature. The SparkStreamlet API has not changed in cloudflow-contrib, though you do need to use a different dependency and add the CloudflowNativeSparkPlugin, which is described in the Cloudflow contrib documentation for building Spark native streamlets.

A testkit is provided to make it easier to write unit tests for Spark streamlets. The unit tests are meant to facilitate local testing of streamlets.

Basic flow of testkit APIs

Here’s the basic flow that you need to follow when writing tests using the testkit:

  1. Extend the test class with the SparkScalaTestSupport trait. This trait manages the SparkSession, handles basic initialization and cleanup, and provides the core APIs of the testkit.

  2. Create a Spark streamlet testkit instance.

  3. Create the Spark streamlet that needs to be tested.

  4. Set up inlet taps on the inlet ports of the streamlet.

  5. Set up outlet taps on the outlet ports.

  6. Push data into inlet ports.

  7. Run the streamlet using the testkit and the inlet and outlet taps you set up.

  8. Write assertions to ensure that the expected results match the actual ones.

Details of the workflow

Let’s consider an example where we would like to write unit tests for a SparkStreamlet that reads data from an inlet, does some processing, and writes the processed data to an outlet. We will follow the steps outlined in the previous section, using ScalaTest as the testing library.

Setting up a sample SparkStreamlet

Here is the list of imports needed for writing the streamlet and the test suite.

import scala.collection.immutable.Seq

import org.apache.spark.sql.streaming.OutputMode

import cloudflow.streamlets._
import cloudflow.streamlets.avro._
import cloudflow.spark._
import cloudflow.spark.testkit._
import cloudflow.spark.sql.SQLImplicits._

SparkStreamlet is an abstract class. Let’s set up a concrete instance that we would like to test. For more details on how to implement a Spark streamlet, please refer to Building a Spark streamlet.

// create Spark Streamlet
class SparkProcessor extends SparkStreamlet {
  val in    = AvroInlet[Data]("in")
  val out   = AvroOutlet[Data]("out", _.id.toString)
  val shape = StreamletShape(in, out)

  override def createLogic() = new SparkStreamletLogic {
    override def buildStreamingQueries = {
      val dataset   = readStream(in)
      val outStream = dataset.filter(_.id % 2 == 0)
      val query     = writeStream(outStream, out, OutputMode.Append)
      query.toQueryExecution
    }
  }
}

The unit test

Here’s how we would write a unit test using ScalaTest. The various logical steps of the test are annotated with inline comments explaining the rationale behind the step.

class SparkProcessorSpec extends SparkScalaTestSupport { // 1. Extend SparkScalaTestSupport

  "SparkProcessor" should {

    // 2. Initialize the testkit
    val testkit = SparkStreamletTestkit(session)

    "process streaming data" in {

      // 3. create Spark streamlet
      val processor = new SparkProcessor()

      // 4. setup inlet tap on inlet port
      val in: SparkInletTap[Data] = testkit.inletAsTap[Data](processor.in)

      // 5. setup outlet tap on outlet port
      val out: SparkOutletTap[Data] = testkit.outletAsTap[Data](processor.out)

      // 6. build data and send to inlet tap
      val data = (1 to 10).map(i ⇒ Data(i, s"name$i"))
      in.addData(data)

      // 7. Run the streamlet using the testkit and the inlet and outlet taps
      val run = testkit.run(processor, Seq(in), Seq(out))

      // get data from outlet tap
      val results = out.asCollection(session)

      // 8. Assert that actual matches expectation
      results must contain(Data(2, "name2"))
      results.size must be(5)
      run.totalRows must be(10)
    }
  }
}

The SparkScalaTestSupport trait

The SparkScalaTestSupport trait provides session management and must be mixed into the main test class. It provides the following functionality:

  1. Manages a SparkSession shared by all tests, initialized when the test class is instantiated.

  2. Cleans up the session using afterAll. If you need custom cleanup logic, override the afterAll method and call super.afterAll() before adding your own logic.
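As a sketch, a custom afterAll override could look like the following (the temp-directory cleanup shown in the comment is purely illustrative):

class MyProcessorSpec extends SparkScalaTestSupport {

  override def afterAll(): Unit = {
    super.afterAll() // performs the testkit's own cleanup of the SparkSession first
    // custom cleanup goes here, e.g. deleting temporary checkpoint directories
  }

  // ... test cases ...
}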

The SparkStreamletTestkit class

  1. Provides the core APIs: inletAsTap, outletAsTap, and run.

  2. Supports supplying values for configuration parameters.
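As a sketch of how configuration parameter values can be supplied, consider a streamlet that declares a configuration parameter (the SparkProcessorWithConfig streamlet and its MaxRecords parameter below are hypothetical, not part of the SparkProcessor shown earlier):

// Hypothetical streamlet companion declaring an integer configuration parameter:
// object SparkProcessorWithConfig {
//   val MaxRecords = IntegerConfigParameter("max-records", "Maximum number of records to keep")
// }

// Supply a value for the parameter when creating the testkit instance
val testkit = SparkStreamletTestkit(session)
  .withConfigParameterValues(ConfigParameterValue(SparkProcessorWithConfig.MaxRecords, "10"))

Any streamlet run through this testkit instance then sees the supplied value when it reads the parameter in its logic.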