final case class AkkaStreamletTestKit extends BaseAkkaStreamletTestKit[AkkaStreamletTestKit] with Product with Serializable

Java testkit for testing Akka streamlets.

API:

// instantiate the testkit
AkkaStreamletTestKit testkit = AkkaStreamletTestKit.create(system);

// setup inlet and outlet
SimpleFlowProcessor sfp = new SimpleFlowProcessor();

QueueInletTap<Data> in = testkit.makeInletAsTap(sfp.shape().inlet());
ProbeOutletTap<Data> out = testkit.makeOutletAsTap(sfp.shape().outlet());

// put data
in.queue().offer(new Data(1, "a"));
in.queue().offer(new Data(2, "b"));

// run the testkit; the probe receives pairs of (partitioning key, data element)
testkit.run(sfp, in, out, () -> {
  return out.probe().expectMsg(new akka.japi.Pair<String, Data>("2", new Data(2, "b")));
});

The following points come from akka.testkit.TestKit and mostly apply to this testkit as well:

Beware of two points:

  • the ActorSystem passed into the constructor must be shut down, otherwise thread pools and memory will be leaked
  • this class is not thread-safe (only one actor with one queue, one stack of within blocks); the code is expected to be executed from a constructor as shown above, which makes this a non-issue. Otherwise, take care not to run tests within a single test class instance in parallel.

For CI servers and the like, all maximum Durations are scaled using their Duration.dilated method, which multiplies them by TestKitExtension.Settings.TestTimeFactor. This factor can be set through the configuration entry "akka.test.timefactor".
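
The scaling of timeouts by a time factor can be sketched in plain Java. This is a minimal illustration of the arithmetic only: the class DilationSketch and its dilated method are hypothetical names introduced here, not part of Akka or Cloudflow, and rounding to whole milliseconds is an assumption of the sketch.

```java
import java.time.Duration;

// Hypothetical helper for illustration only; it is not part of Akka or
// Cloudflow and only mirrors the arithmetic of dilating a timeout.
final class DilationSketch {
    private DilationSketch() {}

    // Scales a timeout by the given factor, rounding to whole milliseconds.
    static Duration dilated(Duration timeout, double timeFactor) {
        if (!(timeFactor > 0.0)) {
            throw new IllegalArgumentException("timeFactor must be positive");
        }
        return Duration.ofMillis(Math.round(timeout.toMillis() * timeFactor));
    }

    public static void main(String[] args) {
        Duration base = Duration.ofSeconds(3);
        // With a factor of 1.0 the timeout is unchanged.
        System.out.println(dilated(base, 1.0));
        // A slower CI machine might be configured with a larger factor.
        System.out.println(dilated(base, 2.5));
    }
}
```

In a real test run the factor would come from the "akka.test.timefactor" entry rather than being passed by hand, so a timeout that works locally can be stretched on a slow CI machine without editing the test.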

Linear Supertypes
Serializable, Serializable, Product, Equals, BaseAkkaStreamletTestKit[AkkaStreamletTestKit], AnyRef, Any

Value Members

  1. final def !=(arg0: Any): Boolean
    Definition Classes
    AnyRef → Any
  2. final def ##(): Int
    Definition Classes
    AnyRef → Any
  3. final def ==(arg0: Any): Boolean
    Definition Classes
    AnyRef → Any
  4. final def asInstanceOf[T0]: T0
    Definition Classes
    Any
  5. def clone(): AnyRef
    Attributes
    protected[lang]
    Definition Classes
    AnyRef
    Annotations
    @throws( ... ) @native()
  6. val config: Config
    Definition Classes
    AkkaStreamletTestKit → BaseAkkaStreamletTestKit
  7. final def eq(arg0: AnyRef): Boolean
    Definition Classes
    AnyRef
  8. def finalize(): Unit
    Attributes
    protected[lang]
    Definition Classes
    AnyRef
    Annotations
    @throws( classOf[java.lang.Throwable] )
  9. final def getClass(): Class[_]
    Definition Classes
    AnyRef → Any
    Annotations
    @native()
  10. final def isInstanceOf[T0]: Boolean
    Definition Classes
    Any
  11. def makeInletAsTap[T](inlet: CodecInlet[T]): QueueInletTap[T]

  12. def makeInletFromSource[T](inlet: CodecInlet[T], source: Source[T, NotUsed]): SourceInletTap[T]

  13. def makeOutletAsTap[T](outlet: CodecOutlet[T]): ProbeOutletTap[T]

    Creates an outlet tap. An outlet tap provides a probe that can be used to assert elements produced to the specified outlet.

    The data being written to the outlet will always be partitioned using the partitioner function of the outlet. This means that assertions should always expect an instance of akka.japi.Pair with the first element being the partitioning key (can be null in case the default RoundRobinPartitioner is used) and the second element being the actual data element.

    Example (see the full example above, at the class level):

    AkkaStreamletTestKit testkit = AkkaStreamletTestKit.create(system);
    SimpleFlowProcessor sfp = new SimpleFlowProcessor();
    ProbeOutletTap<Data> out = testkit.makeOutletAsTap(sfp.shape().outlet());
    
    ...
    
    testkit.run(sfp, in, out, () -> {
      return out.probe().expectMsg(new akka.japi.Pair<String, Data>("2", new Data(2, "b")));
    });
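
The pairing of key and data element described for the outlet tap can be illustrated without Cloudflow on the classpath. The sketch below is a stand-in under stated assumptions: PartitionedOutletSketch and its methods are hypothetical names, java.util.Map.Entry takes the place of akka.japi.Pair, and a partitioner that returns null models the case in which no key is assigned.

```java
import java.util.AbstractMap.SimpleImmutableEntry;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.function.Function;

// Stand-in for an outlet: NOT the Cloudflow API. It pairs every written
// element with the key computed by a partitioner function, which is the
// shape of data that assertions on an outlet tap have to expect.
final class PartitionedOutletSketch<T> {
    private final Function<T, String> partitioner;
    private final List<Map.Entry<String, T>> written = new ArrayList<>();

    PartitionedOutletSketch(Function<T, String> partitioner) {
        this.partitioner = partitioner;
    }

    // Models an outlet whose partitioner assigns no key (the key is null).
    static <T> PartitionedOutletSketch<T> withoutKey() {
        return new PartitionedOutletSketch<>(element -> null);
    }

    // Records the element together with its partitioning key.
    void write(T element) {
        written.add(new SimpleImmutableEntry<>(partitioner.apply(element), element));
    }

    // Returns a copy of everything written so far, in write order.
    List<Map.Entry<String, T>> written() {
        return new ArrayList<>(written);
    }
}
```

An assertion against such an outlet compares whole pairs, for example the entry ("2", 2) for a partitioner that uses the element's string form as the key, rather than the bare element 2.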
  14. def makeOutletToSink[T](outlet: CodecOutlet[T], sink: Sink[Pair[String, T], NotUsed]): SinkOutletTap[T]

    Attaches the provided Sink to the specified outlet.

    The data being written to the Sink will always be partitioned using the partitioner function of the outlet. This means that the Sink should expect instances of akka.japi.Pair, with the first element being the partitioning key (can be null in case the default RoundRobinPartitioner is used) and the second element being the actual data element.

    This method can be used, for instance, to quickly collect all produced output into a simple list using Sink.seq[T].

  15. final def ne(arg0: AnyRef): Boolean
    Definition Classes
    AnyRef
  16. final def notify(): Unit
    Definition Classes
    AnyRef
    Annotations
    @native()
  17. final def notifyAll(): Unit
    Definition Classes
    AnyRef
    Annotations
    @native()
  18. def run[T](streamlet: AkkaStreamlet, inletTap: InletTap[_], outletTaps: List[OutletTap[_]], assertions: () ⇒ Any): Unit

    Runs the streamlet using an inletTap as the source and a list of outletTaps as the sink. After running the streamlet, it also runs the assertions.

  19. def run[T](streamlet: AkkaStreamlet, inletTaps: List[InletTap[_]], outletTap: OutletTap[T], assertions: () ⇒ Any): Unit

    Runs the streamlet using a list of inletTaps as the source and an outletTap as the sink. After running the streamlet it also runs the assertions.

  20. def run[T](streamlet: AkkaStreamlet, inletTaps: List[InletTap[_]], outletTaps: List[OutletTap[_]], assertions: () ⇒ Any): Unit

    Runs the streamlet using a list of inletTaps as the source and a list of outletTaps as the sink. After running the streamlet it also runs the assertions.

  21. def run[T](streamlet: AkkaStreamlet, ip: List[testkit.InletTap[_]], op: List[testkit.OutletTap[_]]): StreamletExecution

    Use this method when testkit.run and StreamletExecution#stop have to be called under different control flows.

    Definition Classes
    BaseAkkaStreamletTestKit
  22. def run[T](streamlet: AkkaStreamlet, ip: testkit.InletTap[_], op: List[testkit.OutletTap[_]], assertions: () ⇒ Any): Unit

    Runs the streamlet using an ip as the source and a list of op as the sink. After running the streamlet it also runs the assertions.

    Definition Classes
    BaseAkkaStreamletTestKit
  23. def run[T](streamlet: AkkaStreamlet, ip: List[testkit.InletTap[_]], op: testkit.OutletTap[T], assertions: () ⇒ Any): Unit

    Runs the streamlet using a list of ip as the source and an op as the sink. After running the streamlet it also runs the assertions.

    Definition Classes
    BaseAkkaStreamletTestKit
  24. def run[T](streamlet: AkkaStreamlet, ip: List[testkit.InletTap[_]], op: List[testkit.OutletTap[_]], assertions: () ⇒ Any): Unit

    Runs the streamlet using a list of ip as the source and a list of op as the sink. After running the streamlet it also runs the assertions.

    Definition Classes
    BaseAkkaStreamletTestKit
  25. def run[T](streamlet: AkkaStreamlet, ip: testkit.InletTap[T], assertions: () ⇒ Any): Unit

    Runs the streamlet using ip as the source and an empty sink. After running the streamlet it also runs the assertions.

    Definition Classes
    BaseAkkaStreamletTestKit
  26. def run[T](streamlet: AkkaStreamlet, op: testkit.OutletTap[T], assertions: () ⇒ Any): Unit

    Runs the streamlet using an empty source and op as the sink. After running the streamlet it also runs the assertions.

    Definition Classes
    BaseAkkaStreamletTestKit
  27. def run[T](streamlet: AkkaStreamlet, ip: testkit.InletTap[_], op: testkit.OutletTap[T], assertions: () ⇒ Any): Unit

    Runs the streamlet using ip as the source and op as the sink. After running the streamlet it also runs the assertions.

    Definition Classes
    BaseAkkaStreamletTestKit
  28. final def synchronized[T0](arg0: ⇒ T0): T0
    Definition Classes
    AnyRef
  29. val system: ActorSystem
    Definition Classes
    AkkaStreamletTestKit → BaseAkkaStreamletTestKit
  30. val volumeMounts: List[VolumeMount]
    Definition Classes
    AkkaStreamletTestKit → BaseAkkaStreamletTestKit
  31. final def wait(): Unit
    Definition Classes
    AnyRef
    Annotations
    @throws( ... )
  32. final def wait(arg0: Long, arg1: Int): Unit
    Definition Classes
    AnyRef
    Annotations
    @throws( ... )
  33. final def wait(arg0: Long): Unit
    Definition Classes
    AnyRef
    Annotations
    @throws( ... ) @native()
  34. def withConfig(c: Config): AkkaStreamletTestKit

    Returns an instance of this Testkit with the specified configuration loaded into the Akka ActorSystem.

    Definition Classes
    AkkaStreamletTestKit → BaseAkkaStreamletTestKit
  35. def withConfigParameterValues(configParameterValues: ConfigParameterValue*): AkkaStreamletTestKit

    Adds configuration parameters and their values to the configuration used in the test.

    ConfigParameterValue takes a ConfigParameter and a string containing the value of the parameter.

    Definition Classes
    BaseAkkaStreamletTestKit
    Annotations
    @varargs()
  36. def withVolumeMounts(volumeMount: VolumeMount, volumeMounts: VolumeMount*): AkkaStreamletTestKit

    Returns an instance of this TestKit with the specified VolumeMount available to the streamlets.

    Definition Classes
    AkkaStreamletTestKit → BaseAkkaStreamletTestKit
    Annotations
    @varargs()
