Testing a Spark Streamlet

A testkit is provided to make it easier to write unit tests for Spark streamlets. The unit tests are meant to facilitate local testing of streamlets.
Basic flow of testkit APIs

Here is the basic flow that you need to follow when writing tests using the testkit:
- Extend the test class with the SparkScalaTestSupport trait. This trait provides the basic functionality for managing the SparkSession, basic initialization and cleanup, and the core APIs of the testkit.
- Create a Spark streamlet testkit instance.
- Create the Spark streamlet that needs to be tested.
- Set up inlet taps that tap the inlet ports of the streamlet.
- Set up outlet taps for outlet ports.
- Push data into inlet ports.
- Run the streamlet using the testkit and the inlet and outlet taps that you set up.
- Write assertions to ensure that the expected results match the actual ones.
Details of the workflow
Let’s consider an example where we would like to write unit tests for a SparkStreamlet that reads data from an inlet, does some processing, and writes the processed data to an outlet. We will follow the steps outlined in the previous section, using ScalaTest as the testing library.
Setting up a sample SparkStreamlet
Here is a list of imports needed for writing the test suite.

```scala
import scala.collection.immutable.Seq

import org.apache.spark.sql.streaming.OutputMode

import cloudflow.streamlets.StreamletShape
import cloudflow.streamlets.avro._
import cloudflow.spark._
import cloudflow.spark.testkit._
import cloudflow.spark.sql.SQLImplicits._
```
SparkStreamlet is an abstract class. Let’s set up a concrete instance that we would like to test. For more details on how to implement a Spark streamlet, please refer to Building a Spark streamlet.
```scala
// create Spark Streamlet
class SparkProcessor extends SparkStreamlet {
  val in    = AvroInlet[Data]("in")
  val out   = AvroOutlet[Data]("out", _.id.toString)
  val shape = StreamletShape(in, out)

  override def createLogic() = new SparkStreamletLogic {
    override def buildStreamingQueries = {
      val dataset   = readStream(in)
      val outStream = dataset.filter(_.id % 2 == 0)
      val query     = writeStream(outStream, out, OutputMode.Append)
      query.toQueryExecution
    }
  }
}
```
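The streamlet above reads and writes an Avro-backed Data type. In a Cloudflow project this class would be generated from an Avro schema; for local experimentation you can assume a minimal stand-in whose fields are inferred from the test data used below (this definition is illustrative, not the generated class):

```scala
// Hypothetical stand-in for the Avro-generated Data class used in the examples.
// In a real project this would be generated from the streamlet's Avro schema.
case class Data(id: Int, name: String)
```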
The unit test
Here’s how we would write a unit test using ScalaTest. The various logical steps of the test are annotated with inline comments explaining the rationale behind each step.
```scala
class SparkProcessorSpec extends SparkScalaTestSupport { // 1. Extend SparkScalaTestSupport

  "SparkProcessor" should {

    // 2. Initialize the testkit
    val testkit = SparkStreamletTestkit(session)

    "process streaming data" in {

      // 3. Create the Spark streamlet
      val processor = new SparkProcessor()

      // 4. Set up an inlet tap on the inlet port
      val in: SparkInletTap[Data] = testkit.inletAsTap[Data](processor.in)

      // 5. Set up an outlet tap on the outlet port
      val out: SparkOutletTap[Data] = testkit.outletAsTap[Data](processor.out)

      // 6. Build data and send it to the inlet tap
      val data = (1 to 10).map(i ⇒ Data(i, s"name$i"))
      in.addData(data)

      // 7. Run the streamlet using the testkit and the inlet and outlet taps
      val run = testkit.run(processor, Seq(in), Seq(out))

      // Get data from the outlet tap
      val results = out.asCollection(session)

      // 8. Assert that the actual results match expectations
      results must contain(Data(2, "name2"))
      results.size must be(5)
      run.totalRows must be(10)
    }
  }
}
```
The SparkScalaTestSupport trait

This trait provides session management and needs to be mixed into the main test class. It provides the following functionality:
- Manages a SparkSession for all tests, initialized when the test class initializes.
- Cleans up the session using afterAll. If you want custom logic for cleanup, override the afterAll method and call super.afterAll() before adding your custom logic.
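The override pattern described above can be sketched with a minimal, self-contained stand-in (BaseSupport below is hypothetical and merely plays the role of SparkScalaTestSupport, which in real tests performs the SparkSession cleanup):

```scala
import scala.collection.mutable.ListBuffer

// Records cleanup order so the pattern can be observed.
object CleanupOrder {
  val log = ListBuffer.empty[String]
}

// Hypothetical stand-in for SparkScalaTestSupport's session cleanup.
trait BaseSupport {
  def afterAll(): Unit = CleanupOrder.log += "session cleanup"
}

class MySpec extends BaseSupport {
  override def afterAll(): Unit = {
    super.afterAll()                     // testkit cleanup runs first
    CleanupOrder.log += "custom cleanup" // then suite-specific cleanup
  }
}
```

Calling `new MySpec().afterAll()` runs the inherited session cleanup before the custom logic, matching the contract described above.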