cloudflow.akkastream.testkit.scaladsl
AkkaStreamletTestKit
Companion object AkkaStreamletTestKit
final case class AkkaStreamletTestKit extends BaseAkkaStreamletTestKit[AkkaStreamletTestKit] with Product with Serializable
Testkit for testing akka streamlets.
API:
// instantiate the testkit val testkit = AkkaStreamletTestKit(system) // setup inlet and outlet val in = testkit.inletAsQueue(SimpleFlowProcessor.shape.inlet) val out = testkit.outletAsProbe(SimpleFlowProcessor.shape.outlet) // put data in.queue.offer(Data(1, "a")) in.queue.offer(Data(2, "b")) // run the testkit testkit.run(SimpleFlowProcessor, in, out, () => { out.probe.expectMsg(("2", Data(2, "b"))) })
The following point is from akka.testkit.Testkit
and is valid mostly for this testkit as well:
Beware of two points:
- the ActorSystem passed into the constructor needs to be shutdown, otherwise thread pools and memory will be leaked
- this class is not thread-safe (only one actor with one queue, one stack
of
within
blocks); it is expected that the code is executed from a constructor as shown above, which makes this a non-issue, otherwise take care not to run tests within a single test class instance in parallel.
It should be noted that for CI servers and the like all maximum Durations are scaled using their Duration.dilated method, which uses the TestKitExtension.Settings.TestTimeFactor settable via akka.conf entry "akka.test.timefactor".
- Alphabetic
- By Inheritance
- AkkaStreamletTestKit
- Serializable
- Serializable
- Product
- Equals
- BaseAkkaStreamletTestKit
- AnyRef
- Any
- Hide All
- Show All
- Public
- All
Value Members
-
final
def
!=(arg0: Any): Boolean
- Definition Classes
- AnyRef → Any
-
final
def
##(): Int
- Definition Classes
- AnyRef → Any
-
final
def
==(arg0: Any): Boolean
- Definition Classes
- AnyRef → Any
-
final
def
asInstanceOf[T0]: T0
- Definition Classes
- Any
-
def
clone(): AnyRef
- Attributes
- protected[lang]
- Definition Classes
- AnyRef
- Annotations
- @throws( ... ) @native()
-
val
config: Config
- Definition Classes
- AkkaStreamletTestKit → BaseAkkaStreamletTestKit
-
final
def
eq(arg0: AnyRef): Boolean
- Definition Classes
- AnyRef
-
def
finalize(): Unit
- Attributes
- protected[lang]
- Definition Classes
- AnyRef
- Annotations
- @throws( classOf[java.lang.Throwable] )
-
final
def
getClass(): Class[_]
- Definition Classes
- AnyRef → Any
- Annotations
- @native()
- def inletAsTap[T](inlet: CodecInlet[T]): QueueInletTap[T]
- def inletFromSource[T](inlet: CodecInlet[T], source: Source[T, NotUsed]): SourceInletTap[T]
-
final
def
isInstanceOf[T0]: Boolean
- Definition Classes
- Any
-
final
def
ne(arg0: AnyRef): Boolean
- Definition Classes
- AnyRef
-
final
def
notify(): Unit
- Definition Classes
- AnyRef
- Annotations
- @native()
-
final
def
notifyAll(): Unit
- Definition Classes
- AnyRef
- Annotations
- @native()
-
def
outletAsTap[T](outlet: CodecOutlet[T]): ProbeOutletTap[T]
Creates an outlet tap.
Creates an outlet tap. An outlet tap provides a probe that can be used to assert elements produced to the specified outlet.
The data being written to the outlet will always be partitioned using the partitioner function of the outlet. This means that assertions should always expect a Scala tuple with the first element being the partitioning key (can be null in case the default RoundRobinPartitioner is used) and the second element being the actual data element.
Example (see the full example above, on the class level:
val testkit = AkkaStreamletTestKit(system) val out = testkit.outletAsProbe(SimpleFlowProcessor.shape.outlet) ... testkit.run(SimpleFlowProcessor, in, out, () => { out.probe.expectMsg(("2", Data(2, "b"))) })
-
def
outletToSink[T](outlet: CodecOutlet[T], sink: Sink[(String, T), NotUsed]): SinkOutletTap[T]
Attaches the provided Sink to the specified outlet.
Attaches the provided Sink to the specified outlet.
The data being written to the Sink will always be partitioned using the partitioner function of the outlet. This means that the Sink should expect Scala tuples, with the first element being the partitioning key (can be null in case the default RoundRobinPartitioner is used) and the second element being the actual data element.
This method can be used to for instance quickly collect all output produced into a simple sequence using
Sink.seq[T]
. -
def
run[T](streamlet: AkkaStreamlet, ip: List[testkit.InletTap[_]], op: List[testkit.OutletTap[_]]): StreamletExecution
This method is used when
testkit.run
andStreamletExecution#stop
has to be done under different control flows.This method is used when
testkit.run
andStreamletExecution#stop
has to be done under different control flows.- Definition Classes
- BaseAkkaStreamletTestKit
-
def
run[T](streamlet: AkkaStreamlet, ip: testkit.InletTap[_], op: List[testkit.OutletTap[_]], assertions: () ⇒ Any): Unit
Runs the
streamlet
using anip
as the source and a list ofop
as the sink.Runs the
streamlet
using anip
as the source and a list ofop
as the sink. After running the streamlet it also runs the assertions.- Definition Classes
- BaseAkkaStreamletTestKit
-
def
run[T](streamlet: AkkaStreamlet, ip: List[testkit.InletTap[_]], op: testkit.OutletTap[T], assertions: () ⇒ Any): Unit
Runs the
streamlet
using a list ofip
as the source and anop
as the sink.Runs the
streamlet
using a list ofip
as the source and anop
as the sink. After running the streamlet it also runs the assertions.- Definition Classes
- BaseAkkaStreamletTestKit
-
def
run[T](streamlet: AkkaStreamlet, ip: List[testkit.InletTap[_]], op: List[testkit.OutletTap[_]], assertions: () ⇒ Any): Unit
Runs the
streamlet
using a list ofip
as the source and a list ofop
as the sink.Runs the
streamlet
using a list ofip
as the source and a list ofop
as the sink. After running the streamlet it also runs the assertions.- Definition Classes
- BaseAkkaStreamletTestKit
-
def
run[T](streamlet: AkkaStreamlet, ip: testkit.InletTap[T], assertions: () ⇒ Any): Unit
Runs the
streamlet
usingip
as the source and an empty sink.Runs the
streamlet
usingip
as the source and an empty sink. After running the streamlet it also runs the assertions.- Definition Classes
- BaseAkkaStreamletTestKit
-
def
run[T](streamlet: AkkaStreamlet, op: testkit.OutletTap[T], assertions: () ⇒ Any): Unit
Runs the
streamlet
using an empty source andop
as the sink.Runs the
streamlet
using an empty source andop
as the sink. After running the streamlet it also runs the assertions.- Definition Classes
- BaseAkkaStreamletTestKit
-
def
run[T](streamlet: AkkaStreamlet, ip: testkit.InletTap[_], op: testkit.OutletTap[T], assertions: () ⇒ Any): Unit
Runs the
streamlet
usingip
as the source andop
as the sink.Runs the
streamlet
usingip
as the source andop
as the sink. After running the streamlet it also runs the assertions.- Definition Classes
- BaseAkkaStreamletTestKit
-
final
def
synchronized[T0](arg0: ⇒ T0): T0
- Definition Classes
- AnyRef
-
val
system: ActorSystem
- Definition Classes
- AkkaStreamletTestKit → BaseAkkaStreamletTestKit
-
val
volumeMounts: List[VolumeMount]
- Definition Classes
- AkkaStreamletTestKit → BaseAkkaStreamletTestKit
-
final
def
wait(): Unit
- Definition Classes
- AnyRef
- Annotations
- @throws( ... )
-
final
def
wait(arg0: Long, arg1: Int): Unit
- Definition Classes
- AnyRef
- Annotations
- @throws( ... )
-
final
def
wait(arg0: Long): Unit
- Definition Classes
- AnyRef
- Annotations
- @throws( ... ) @native()
-
def
withConfig(c: Config): AkkaStreamletTestKit
Returns an instance of this Testkit with the specified configuration loaded into the Akka
ActorSystem
.Returns an instance of this Testkit with the specified configuration loaded into the Akka
ActorSystem
.- Definition Classes
- AkkaStreamletTestKit → BaseAkkaStreamletTestKit
-
def
withConfigParameterValues(configParameterValues: ConfigParameterValue*): AkkaStreamletTestKit
Adds configuration parameters and their values to the configuration used in the test.
Adds configuration parameters and their values to the configuration used in the test.
ConfigParameterValue takes a ConfigParameter and a string containing the value of the parameter.
- Definition Classes
- BaseAkkaStreamletTestKit
- Annotations
- @varargs()
-
def
withVolumeMounts(volumeMount: VolumeMount, volumeMounts: VolumeMount*): AkkaStreamletTestKit
Returns an instance of this TestKit with the specified VolumeMount available to the streamlets
Returns an instance of this TestKit with the specified VolumeMount available to the streamlets
- Definition Classes
- AkkaStreamletTestKit → BaseAkkaStreamletTestKit