final case class AkkaStreamletTestKit extends BaseAkkaStreamletTestKit[AkkaStreamletTestKit] with Product with Serializable

Testkit for testing Akka streamlets.

API:

// instantiate the testkit
val testkit = AkkaStreamletTestKit(system)

// set up the inlet and outlet taps
val in = testkit.inletAsTap(SimpleFlowProcessor.shape.inlet)
val out = testkit.outletAsTap(SimpleFlowProcessor.shape.outlet)

// put data
in.queue.offer(Data(1, "a"))
in.queue.offer(Data(2, "b"))

// run the testkit
testkit.run(SimpleFlowProcessor, in, out, () => {
  out.probe.expectMsg(("2", Data(2, "b")))
})

The following points are from akka.testkit.TestKit and mostly apply to this testkit as well:

Beware of two points:

  • the ActorSystem passed into the constructor needs to be shut down, otherwise thread pools and memory will be leaked
  • this class is not thread-safe (only one actor with one queue, one stack of within blocks); the code is expected to be executed from a constructor as shown above, which makes this a non-issue. Otherwise, take care not to run tests within a single test class instance in parallel.

Note that, for the benefit of CI servers and the like, all maximum Durations are scaled using their Duration.dilated method. The scaling factor is TestKitExtension.Settings.TestTimeFactor, which can be set via the akka.conf entry "akka.test.timefactor".
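
To illustrate what scaling a maximum duration by a time factor means, here is a minimal sketch that uses only the Scala standard library. It is a model, not the Akka implementation: the object DilationSketch, the value testTimeFactor and the method dilate are hypothetical names, and the factor 2.0 is an assumed stand-in for whatever "akka.test.timefactor" is set to.

```scala
import scala.concurrent.duration._

object DilationSketch {
  // Assumed stand-in for the value configured under "akka.test.timefactor".
  val testTimeFactor: Double = 2.0

  // Scales a finite timeout by the given factor (hypothetical helper,
  // modelling the effect of a dilated duration).
  def dilate(d: FiniteDuration, factor: Double = testTimeFactor): FiniteDuration =
    (d * factor) match {
      case finite: FiniteDuration => finite
      case other =>
        throw new IllegalArgumentException(s"Scaled duration is not finite: $other")
    }
}
```

With a factor of 2.0, a 3 second timeout in a test would be treated as a 6 second timeout, which gives slower CI machines more headroom without changing the test code.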

Linear Supertypes
Serializable, Serializable, Product, Equals, BaseAkkaStreamletTestKit[AkkaStreamletTestKit], AnyRef, Any

Value Members

  1. final def !=(arg0: Any): Boolean
    Definition Classes
    AnyRef → Any
  2. final def ##(): Int
    Definition Classes
    AnyRef → Any
  3. final def ==(arg0: Any): Boolean
    Definition Classes
    AnyRef → Any
  4. final def asInstanceOf[T0]: T0
    Definition Classes
    Any
  5. def clone(): AnyRef
    Attributes
    protected[lang]
    Definition Classes
    AnyRef
    Annotations
    @throws( ... ) @native()
  6. val config: Config
    Definition Classes
    AkkaStreamletTestKit → BaseAkkaStreamletTestKit
  7. final def eq(arg0: AnyRef): Boolean
    Definition Classes
    AnyRef
  8. def finalize(): Unit
    Attributes
    protected[lang]
    Definition Classes
    AnyRef
    Annotations
    @throws( classOf[java.lang.Throwable] )
  9. final def getClass(): Class[_]
    Definition Classes
    AnyRef → Any
    Annotations
    @native()
  10. def inletAsTap[T](inlet: CodecInlet[T]): QueueInletTap[T]

  11. def inletFromSource[T](inlet: CodecInlet[T], source: Source[T, NotUsed]): SourceInletTap[T]

  12. final def isInstanceOf[T0]: Boolean
    Definition Classes
    Any
  13. final def ne(arg0: AnyRef): Boolean
    Definition Classes
    AnyRef
  14. final def notify(): Unit
    Definition Classes
    AnyRef
    Annotations
    @native()
  15. final def notifyAll(): Unit
    Definition Classes
    AnyRef
    Annotations
    @native()
  16. def outletAsTap[T](outlet: CodecOutlet[T]): ProbeOutletTap[T]

    Creates an outlet tap. An outlet tap provides a probe that can be used to assert elements produced to the specified outlet.

    The data being written to the outlet will always be partitioned using the partitioner function of the outlet. This means that assertions should always expect a Scala tuple with the first element being the partitioning key (can be null in case the default RoundRobinPartitioner is used) and the second element being the actual data element.

    Example (see the full example above, at the class level):

    val testkit = AkkaStreamletTestKit(system)
    val out = testkit.outletAsTap(SimpleFlowProcessor.shape.outlet)
    
    ...
    
    testkit.run(SimpleFlowProcessor, in, out, () => {
      out.probe.expectMsg(("2", Data(2, "b")))
    })
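
    The tuple shape described above can be modelled without the testkit. The following sketch uses only the Scala standard library. The Data case class mirrors the example, while PartitionedOutputSketch, partitioner and asWritten are hypothetical names, and the id-based partitioner is an assumption chosen to match the key "2" in the example.

```scala
object PartitionedOutputSketch {
  final case class Data(id: Int, name: String)

  // Hypothetical partitioner: derives the partitioning key from the id.
  val partitioner: Data => String = d => d.id.toString

  // Models what a probe would observe: each element paired with its key.
  def asWritten(elements: Seq[Data]): Seq[(String, Data)] =
    elements.map(d => (partitioner(d), d))
}
```

    An assertion against the outlet therefore compares against a (key, element) tuple rather than against the bare element.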
  17. def outletToSink[T](outlet: CodecOutlet[T], sink: Sink[(String, T), NotUsed]): SinkOutletTap[T]

    Attaches the provided Sink to the specified outlet.

    The data being written to the Sink will always be partitioned using the partitioner function of the outlet. This means that the Sink should expect Scala tuples, with the first element being the partitioning key (can be null in case the default RoundRobinPartitioner is used) and the second element being the actual data element.

    This method can be used, for instance, to quickly collect all produced output into a simple sequence using Sink.seq[T].
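
    Once the output has been collected as a sequence of tuples, the keys usually need to be separated from the data. The sketch below shows one way to do that using only the Scala standard library. CollectedOutputSketch, values and keys are hypothetical helper names and are not part of the testkit.

```scala
object CollectedOutputSketch {
  // Drops the partitioning keys and keeps only the data elements.
  def values[T](collected: Seq[(String, T)]): Seq[T] =
    collected.map(_._2)

  // Wraps each key in an Option so that a null key becomes None.
  def keys[T](collected: Seq[(String, T)]): Seq[Option[String]] =
    collected.map { case (key, _) => Option(key) }
}
```

    Wrapping the key in an Option avoids a NullPointerException when the key is null, which the description above says can happen with the default RoundRobinPartitioner.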

  18. def run[T](streamlet: AkkaStreamlet, ip: List[testkit.InletTap[_]], op: List[testkit.OutletTap[_]]): StreamletExecution

    Use this method when starting the streamlet (testkit.run) and stopping it (StreamletExecution#stop) have to happen in different control flows.

    Definition Classes
    BaseAkkaStreamletTestKit
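
    A typical case of "different control flows" is a test fixture that starts something in a setup hook and stops it in a teardown hook. The sketch below models that pattern using only the Scala standard library. The Execution trait is a hypothetical stand-in for a handle with a stop method, and RecordingExecution, Fixture, beforeAll and afterAll are illustrative names; none of them are testkit types.

```scala
object SplitLifecycleSketch {
  // Hypothetical stand-in for a running execution that must be stopped explicitly.
  trait Execution { def stop(): Unit }

  // Test double that records whether stop() was called.
  final class RecordingExecution extends Execution {
    @volatile var stopped: Boolean = false
    def stop(): Unit = stopped = true
  }

  // Starts the execution in one hook and stops it in another.
  final class Fixture(start: () => Execution) {
    private var running: Option[Execution] = None

    def beforeAll(): Unit = running = Some(start())

    def afterAll(): Unit = {
      running.foreach(_.stop())
      running = None
    }
  }
}
```

    The handle is kept between the two hooks, so assertions can run in between while the execution is still active.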
  19. def run[T](streamlet: AkkaStreamlet, ip: testkit.InletTap[_], op: List[testkit.OutletTap[_]], assertions: () ⇒ Any): Unit

    Runs the streamlet using an ip as the source and a list of op as the sink. After running the streamlet it also runs the assertions.

    Definition Classes
    BaseAkkaStreamletTestKit
  20. def run[T](streamlet: AkkaStreamlet, ip: List[testkit.InletTap[_]], op: testkit.OutletTap[T], assertions: () ⇒ Any): Unit

    Runs the streamlet using a list of ip as the source and an op as the sink. After running the streamlet it also runs the assertions.

    Definition Classes
    BaseAkkaStreamletTestKit
  21. def run[T](streamlet: AkkaStreamlet, ip: List[testkit.InletTap[_]], op: List[testkit.OutletTap[_]], assertions: () ⇒ Any): Unit

    Runs the streamlet using a list of ip as the source and a list of op as the sink. After running the streamlet it also runs the assertions.

    Definition Classes
    BaseAkkaStreamletTestKit
  22. def run[T](streamlet: AkkaStreamlet, ip: testkit.InletTap[T], assertions: () ⇒ Any): Unit

    Runs the streamlet using ip as the source and an empty sink. After running the streamlet it also runs the assertions.

    Definition Classes
    BaseAkkaStreamletTestKit
  23. def run[T](streamlet: AkkaStreamlet, op: testkit.OutletTap[T], assertions: () ⇒ Any): Unit

    Runs the streamlet using an empty source and op as the sink. After running the streamlet it also runs the assertions.

    Definition Classes
    BaseAkkaStreamletTestKit
  24. def run[T](streamlet: AkkaStreamlet, ip: testkit.InletTap[_], op: testkit.OutletTap[T], assertions: () ⇒ Any): Unit

    Runs the streamlet using ip as the source and op as the sink. After running the streamlet it also runs the assertions.

    Definition Classes
    BaseAkkaStreamletTestKit
  25. final def synchronized[T0](arg0: ⇒ T0): T0
    Definition Classes
    AnyRef
  26. val system: ActorSystem
    Definition Classes
    AkkaStreamletTestKit → BaseAkkaStreamletTestKit
  27. val volumeMounts: List[VolumeMount]
    Definition Classes
    AkkaStreamletTestKit → BaseAkkaStreamletTestKit
  28. final def wait(): Unit
    Definition Classes
    AnyRef
    Annotations
    @throws( ... )
  29. final def wait(arg0: Long, arg1: Int): Unit
    Definition Classes
    AnyRef
    Annotations
    @throws( ... )
  30. final def wait(arg0: Long): Unit
    Definition Classes
    AnyRef
    Annotations
    @throws( ... ) @native()
  31. def withConfig(c: Config): AkkaStreamletTestKit

    Returns an instance of this Testkit with the specified configuration loaded into the Akka ActorSystem.

    Definition Classes
    AkkaStreamletTestKit → BaseAkkaStreamletTestKit
  32. def withConfigParameterValues(configParameterValues: ConfigParameterValue*): AkkaStreamletTestKit

    Adds configuration parameters and their values to the configuration used in the test.

    ConfigParameterValue takes a ConfigParameter and a string containing the value of the parameter.

    Definition Classes
    BaseAkkaStreamletTestKit
    Annotations
    @varargs()
  33. def withVolumeMounts(volumeMount: VolumeMount, volumeMounts: VolumeMount*): AkkaStreamletTestKit

    Returns an instance of this TestKit with the specified VolumeMounts available to the streamlets.

    Definition Classes
    AkkaStreamletTestKit → BaseAkkaStreamletTestKit
