
cloudflow.spark.testkit

SparkStreamletTestkit

final case class SparkStreamletTestkit(session: SparkSession, config: Config = ConfigFactory.empty, maxDuration: Duration = 30.seconds) extends Product with Serializable

Testkit for testing Spark streamlets.

The steps to write a test using the testkit are:

  1. Create the test class and extend it with SparkScalaTestSupport
  2. Create the Spark streamlet testkit instance
  3. Create the Spark streamlet
  4. Set up an inlet tap on the inlet port
  5. Set up an outlet tap on the outlet port
  6. Build input data and send it to the inlet tap
  7. Run the test
  8. Get data from the outlet tap and assert
// 1. Create the test class and extend it with `SparkScalaTestSupport`
class MySparkStreamletSpec extends SparkScalaTestSupport {

 "SparkProcessor" should {
   "process streaming data" in {

     // 2. Create Spark streamlet testkit instance
     val testKit = SparkStreamletTestkit(session)

     // 3. Create spark streamlet
     val processor = new SparkProcessor[Data, Simple] {
       override def createLogic(): ProcessorLogic[Data, Simple] = new ProcessorLogic[Data, Simple](OutputMode.Append) {
         override def process(inDataset: Dataset[Data]): Dataset[Simple] =
           inDataset.select($"name").as[Simple]
       }
     }

     // 4. Setup inlet(s) tap on inlet port(s)
     val in: SparkInletTap[Data] = inletAsTap[Data](processor.shape.inlet)

     // 5. Setup outlet tap(s) on outlet port(s)
     val out: SparkOutletTap[Simple] = outletAsTap[Simple](processor.shape.outlet)

     // 6. Prepare input data and send it to the inlet tap(s)
     val data = (1 to 10).map(i => Data(i, s"name$i"))
     in.addData(data)

     // 7. Run the test
     run(processor, Seq(in), Seq(out))

     // 8. Get data from outlet tap(s) and assert
     val results = out.asCollection(session)

     results should contain(Simple("name1"))
   }
 }
}

Note: Every test is executed against a SparkSession that is created and removed by the test lifecycle methods.
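SparkScalaTestSupport manages this session for you. If you need to supply one yourself, a minimal sketch follows; the master setting and app name are illustrative assumptions, not part of this API:

```scala
import org.apache.spark.sql.SparkSession

// A local SparkSession comparable to the one the test lifecycle provides.
// "local[2]" and the app name are arbitrary choices for a test environment.
val session: SparkSession = SparkSession
  .builder()
  .master("local[2]")
  .appName("spark-streamlet-testkit-spec")
  .getOrCreate()
```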

Annotations
@deprecated
Deprecated

(Since version 2.2.0) Use contrib-sbt-spark library instead, see https://github.com/lightbend/cloudflow-contrib

Linear Supertypes
Serializable, Serializable, Product, Equals, AnyRef, Any

Instance Constructors

  1. new SparkStreamletTestkit(session: SparkSession, config: Config = ConfigFactory.empty, maxDuration: Duration = 30.seconds)
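Both config and maxDuration have defaults and can be overridden at construction. A sketch, assuming a session: SparkSession is in scope; the config key shown is made up purely for illustration:

```scala
import scala.concurrent.duration._
import com.typesafe.config.ConfigFactory

// Build a testkit with a custom config and a longer run timeout.
// "my.streamlet.setting" is a hypothetical key, shown only to
// illustrate passing a non-empty Config.
val testkit = SparkStreamletTestkit(
  session,
  config = ConfigFactory.parseString("my.streamlet.setting = on"),
  maxDuration = 60.seconds
)
```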

Type Members

  1. implicit class InletTapOps[T] extends AnyRef
  2. implicit class OutletTapOps[T] extends AnyRef

Value Members

  1. val TestStreamletName: String
  2. val config: Config
  3. def inletAsTap[In](in: CodecInlet[In])(implicit arg0: Encoder[In]): SparkInletTap[In]
  4. val maxDuration: Duration
  5. def outletAsTap[Out](out: CodecOutlet[Out])(implicit arg0: Encoder[Out]): SparkOutletTap[Out]
  6. def run(sparkStreamlet: SparkStreamlet, inletTap: SparkInletTap[_], outletTaps: Seq[SparkOutletTap[_]]): ExecutionReport

    Runs the sparkStreamlet using inletTap as the source and outletTaps as the sinks. The inletTap abstracts a MemoryStream and an inlet port, to which the test input data is added. Each outletTap provides a port and a query name, giving a handle to the Spark StreamingQuery that gets executed.

    sparkStreamlet: the SparkStreamlet to run
    inletTap: the inlet stream and port
    outletTaps: the collection of outlet query names and ports
    returns: an ExecutionReport for the run

  7. def run(sparkStreamlet: SparkStreamlet, inletTaps: Seq[SparkInletTap[_]], outletTap: SparkOutletTap[_]): ExecutionReport

    Runs the sparkStreamlet using inletTaps as the sources and outletTap as the sink. Each inletTap abstracts a MemoryStream and an inlet port, to which the test input data is added. The outletTap provides a port and a query name, giving a handle to the Spark StreamingQuery that gets executed.

    sparkStreamlet: the SparkStreamlet to run
    inletTaps: the collection of inlet streams and ports
    outletTap: the outlet query and port
    returns: an ExecutionReport for the run

  8. def run(sparkStreamlet: SparkStreamlet, inletTap: SparkInletTap[_], outletTap: SparkOutletTap[_]): ExecutionReport

    Runs the sparkStreamlet using inletTap as the source and outletTap as the sink. The inletTap abstracts a MemoryStream and an inlet port, to which the test input data is added. The outletTap provides a port and a query name, giving a handle to the Spark StreamingQuery that gets executed.

    sparkStreamlet: the SparkStreamlet to run
    inletTap: the inlet stream and port
    outletTap: the outlet query and port
    returns: an ExecutionReport for the run

  9. def run(sparkStreamlet: SparkStreamlet, inletTaps: Seq[SparkInletTap[_]], outletTaps: Seq[SparkOutletTap[_]]): ExecutionReport

    Runs the sparkStreamlet using inletTaps as the sources and outletTaps as the sinks. Each inletTap abstracts a MemoryStream and an inlet port, to which the test input data is added. Each outletTap provides a port and a query name, giving a handle to the Spark StreamingQuery that gets executed.

    sparkStreamlet: the SparkStreamlet to run
    inletTaps: the collection of inlet streams and ports
    outletTaps: the collection of outlet query names and ports
    returns: an ExecutionReport for the run

  10. val session: SparkSession
  11. implicit lazy val sqlCtx: SQLContext
  12. def withConfigParameterValues(configParameterValues: ConfigParameterValue*): SparkStreamletTestkit

    Adds configuration parameters and their values to the configuration used in the test. A ConfigParameterValue takes a ConfigParameter and a string containing the value of the parameter.

    Annotations
    @varargs()
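A sketch of passing a configuration parameter to the testkit, where MyStreamlet.NameFilter stands in for a ConfigParameter that the streamlet under test would actually declare:

```scala
// Hypothetical: MyStreamlet.NameFilter is an assumed ConfigParameter
// declared by the streamlet under test; "name1" is its string value.
val configuredTestkit = SparkStreamletTestkit(session)
  .withConfigParameterValues(ConfigParameterValue(MyStreamlet.NameFilter, "name1"))
```

Since the method returns a new SparkStreamletTestkit, calls can be chained with further configuration.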
