package testkit

Type Members

  1. final case class ConfigParameterValue extends Product with Serializable
  2. case class ExecutionReport(totalRows: Long, totalQueries: Int, failures: Seq[String]) extends Product with Serializable
  3. class QueryExecutionMonitor extends StreamingQueryListener
  4. case class SparkInletTap[T: Encoder](portName: String, instream: MemoryStream[T]) extends Product with Serializable
  5. case class SparkOutletTap[T: Encoder](portName: String, queryName: String) extends Product with Serializable
  6. trait SparkScalaTestSupport extends AnyWordSpec with Matchers with BeforeAndAfterAll
  7. final case class SparkStreamletTestkit(session: SparkSession, config: Config = ConfigFactory.empty, maxDuration: Duration = 30.seconds) extends Product with Serializable

    Testkit for testing Spark streamlets.

    The steps to write a test using the testkit are:

    1. Create the test class and extend it with SparkScalaTestSupport
    2. Create the Spark streamlet testkit instance
    3. Create the Spark streamlet
    4. Set up an inlet tap on each inlet port
    5. Set up an outlet tap on each outlet port
    6. Build the input data and send it to the inlet tap
    7. Run the test
    8. Get the data from the outlet tap and assert on it

    // 1. Create the test class and extend it with `SparkScalaTestSupport`.
    //    Assumes `Data` and `Simple` are top-level case classes, for example
    //    `case class Data(id: Int, name: String)` and `case class Simple(name: String)`.
    class MySparkStreamletSpec extends SparkScalaTestSupport {

      "SparkProcessor" should {
        "process streaming data" in {
          // Provides `$"..."` and the Encoders for `Data` and `Simple`.
          import session.implicits._

          // 2. Create the Spark streamlet testkit instance
          val testKit = SparkStreamletTestkit(session)

          // 3. Create the Spark streamlet
          val processor = new SparkProcessor[Data, Simple] {
            override def createLogic(): ProcessorLogic[Data, Simple] = new ProcessorLogic[Data, Simple](OutputMode.Append) {
              override def process(inDataset: Dataset[Data]): Dataset[Simple] =
                inDataset.select($"name").as[Simple]
            }
          }

          // 4. Set up inlet tap(s) on the inlet port(s)
          val in: SparkInletTap[Data] = testKit.inletAsTap[Data](processor.shape.inlet)

          // 5. Set up outlet tap(s) on the outlet port(s)
          val out: SparkOutletTap[Simple] = testKit.outletAsTap[Simple](processor.shape.outlet)

          // 6. Prepare input data and send it to the inlet tap(s)
          val data = (1 to 10).map(i => Data(i, s"name$i"))
          in.addData(data)

          // 7. Run the test; call it on `testKit` so it does not resolve to ScalaTest's own `run`
          testKit.run(processor, Seq(in), Seq(out))

          // 8. Get data from the outlet tap(s) and assert
          val results = out.asCollection(session)

          results should contain(Simple("name1"))
        }
      }
    }

    Note: Every test runs against a SparkSession that is created and stopped as part of the test lifecycle methods.
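    The source of SparkScalaTestSupport is not shown here. As a rough illustration of the lifecycle described in the note, the hypothetical trait below (LocalSparkSupport is an assumed name, and the local[2] master and app name are assumed settings) mixes in the same ScalaTest traits as SparkScalaTestSupport, creates one shared local SparkSession lazily, and stops it in afterAll once every test in the suite has run. It is a sketch, not the testkit's implementation.

```scala
import org.apache.spark.sql.SparkSession
import org.scalatest.BeforeAndAfterAll
import org.scalatest.matchers.should.Matchers
import org.scalatest.wordspec.AnyWordSpec

// Hypothetical stand-in that mirrors SparkScalaTestSupport's declared parents;
// it is NOT the testkit's source.
trait LocalSparkSupport extends AnyWordSpec with Matchers with BeforeAndAfterAll {

  // One session per suite, created on first access. Because it is a stable
  // `lazy val`, tests can also write `import session.implicits._`.
  lazy val session: SparkSession =
    SparkSession.builder()
      .master("local[2]")              // assumed: small local master for unit tests
      .appName("spark-streamlet-tests") // assumed app name
      .getOrCreate()

  // Runs once, after every test in the suite has finished.
  override protected def afterAll(): Unit =
    try session.stop()
    finally super.afterAll()
}
```

    A suite would extend LocalSparkSupport exactly as MySparkStreamletSpec extends SparkScalaTestSupport. Stopping the session in afterAll rather than after each test keeps the relatively expensive SparkSession startup to once per suite.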

  8. case class TestContextException(portName: String, msg: String) extends RuntimeException with Product with Serializable
  9. class TestSparkStreamletContext extends SparkStreamletContext

    An implementation of SparkStreamletContext for unit testing.

    readStream reads from a streaming data source (a CSV source in this case) and prepares a Dataset[In].

    writeStream returns a StreamingQuery that pushes the input Dataset[Out] to a MemorySink.
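    Because SparkInletTap wraps a MemoryStream[T], SparkOutletTap is identified by a queryName, and writeStream targets a MemorySink, the taps appear to build on Spark's in-memory streaming source and memory sink. The sketch below exercises those Spark primitives directly, without the testkit, to show that round trip. It is an illustration written against Spark 3.x, not the testkit's code: MemoryRoundTrip, the Data and Simple case classes, and the query name "simple_out" are hypothetical, and MemoryStream lives in Spark's internal org.apache.spark.sql.execution.streaming package.

```scala
import org.apache.spark.sql.{SQLContext, SparkSession}
import org.apache.spark.sql.execution.streaming.MemoryStream
import org.apache.spark.sql.streaming.OutputMode

// Hypothetical element types, matching the shape used in the Scaladoc example.
case class Data(id: Int, name: String)
case class Simple(name: String)

object MemoryRoundTrip {
  def run(spark: SparkSession): Seq[Simple] = {
    import spark.implicits._
    // MemoryStream.apply needs an implicit SQLContext and an Encoder.
    implicit val sqlCtx: SQLContext = spark.sqlContext

    // Inlet side: an in-memory streaming source that the test pushes data into.
    val input = MemoryStream[Data]
    input.addData((1 to 10).map(i => Data(i, s"name$i")))

    // Logic under test: the same projection as the processor in the example.
    val output = input.toDS().select($"name").as[Simple]

    // Outlet side: the memory sink exposes results as a table named after the query.
    val query = output.writeStream
      .format("memory")
      .queryName("simple_out")
      .outputMode(OutputMode.Append())
      .start()

    try {
      // Block until all data added so far has been processed.
      query.processAllAvailable()
      spark.table("simple_out").as[Simple].collect().toSeq
    } finally query.stop()
  }
}
```

    processAllAvailable is what makes reading the sink table immediately afterwards deterministic; without it, the table may still be empty or partially filled. Row order in the sink is not guaranteed, so assertions are best made on contents rather than positions.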

Value Members

  1. object ConfigParameterValue extends Serializable
