Akka Streamlet utilities
The cloudflow.akka.util
library contains some predefined StreamletLogic
s:
-
Splitter
-
Merger
The following sections describe how you implement stream processing logic with these utilities.
Splitter
Use case
A Splitter
can be used to split a stream in two, writing elements to one of two outlets.
Every element from the outlet will be processed through a FlowWithCommittableContext
, which provides at-least-once semantics.
Example
The Splitter
defines a sink
method for the user to supply a FlowWithCommittableContext[I, Either[L, R]]
and a left
and right
outlet.
The Java version of Splitter
uses an Either
type that is bundled with cloudflow as cloudflow.akka.javadsl.util.Either
.
The examples below shows a Splitter
that validates metrics and splits the stream into valid and invalid metrics, which are written respectively to the valid
and invalid
outlets:
- Scala
-
import cloudflow.akkastream._ import cloudflow.akkastream.scaladsl._ import cloudflow.akkastream.util.scaladsl._ import cloudflow.streamlets._ import cloudflow.streamlets.avro._ class DataSplitter extends AkkaStreamlet { val in = AvroInlet[Data]("in") val invalid = AvroOutlet[DataInvalid]("invalid").withPartitioner(data => data.key) val valid = AvroOutlet[Data]("valid").withPartitioner(RoundRobinPartitioner) val shape = StreamletShape(in).withOutlets(invalid, valid) override def createLogic = new RunnableGraphStreamletLogic() { def runnableGraph = sourceWithCommittableContext(in).to(Splitter.sink(flow, invalid, valid)) def flow = FlowWithCommittableContext[Data] .map { data => if (data.value < 0) Left(DataInvalid(data.key, data.value, "All data must be positive numbers!")) else Right(data) } } }
- Java
-
import cloudflow.streamlets.*; import cloudflow.streamlets.avro.*; import cloudflow.akkastream.*; import cloudflow.akkastream.javadsl.*; import cloudflow.akkastream.javadsl.util.Either; import cloudflow.akkastream.util.javadsl.*; public class DataSplitter extends AkkaStreamlet { AvroInlet<Data> inlet = AvroInlet.<Data>create("in", Data.class); AvroOutlet<DataInvalid> invalidOutlet = AvroOutlet.<DataInvalid>create( "invalid", d -> d.getKey(), DataInvalid.class); AvroOutlet<Data> validOutlet = AvroOutlet.<Data>create( "valid", d -> RoundRobinPartitioner.apply(d), Data.class); public StreamletShape shape() { return StreamletShape.createWithInlets(inlet).withOutlets(invalidOutlet, validOutlet); } public AkkaStreamletLogic createLogic() { return new RunnableGraphStreamletLogic(getContext()) { public RunnableGraph createRunnableGraph() { return getSourceWithCommittableContext(inlet).to(Splitter.sink(createFlow(), invalidOutlet, validOutlet, getContext())); } }; } public FlowWithContext<Data, Committable, Either<DataInvalid, Data>, Committable, NotUsed> createFlow() { return FlowWithContext.<Data, Committable>create() .map(data -> { if (data.getValue() < 0) return Either.left(new DataInvalid(data.getKey(), data.getValue(), "All data must be positive numbers!")); else return Either.right(data); }); } }
Merger
Use case
A Merger
can be used to merge two or more inlets into one outlet.
Elements from all inlets will be processed with at-least-once semantics. The elements will be processed in semi-random order and with equal priority for all inlets.
Example
The examples below shows how to use a Merger.source
to combine elements from two inlets of the type Metric
into one outlet of the same type.
- Scala
-
import cloudflow.akkastream._ import cloudflow.akkastream.scaladsl._ import cloudflow.akkastream.util.scaladsl._ import cloudflow.streamlets.avro._ import cloudflow.streamlets._ class DataMerge extends AkkaStreamlet { val in0 = AvroInlet[Data]("in-0") val in1 = AvroInlet[Data]("in-1") val out = AvroOutlet[Data]("out", d => d.key) final override val shape = StreamletShape.withInlets(in0, in1).withOutlets(out) override final def createLogic = new RunnableGraphStreamletLogic { def runnableGraph = Merger.source(in0, in1).to(committableSink(out)) } }
- Java
-
import cloudflow.akkastream.*; import cloudflow.streamlets.*; import cloudflow.streamlets.avro.*; import cloudflow.akkastream.javadsl.*; import cloudflow.akkastream.util.javadsl.*; import akka.stream.javadsl.*; import java.util.*; public class DataMerge extends AkkaStreamlet { AvroInlet<Data> inlet1 = AvroInlet.<Data>create("in-0", Data.class); AvroInlet<Data> inlet2 = AvroInlet.<Data>create("in-1", Data.class); AvroOutlet<Data> outlet = AvroOutlet.<Data>create("out", d -> d.getKey(), Data.class); public StreamletShape shape() { return StreamletShape.createWithInlets(inlet1, inlet2).withOutlets(outlet); } public RunnableGraphStreamletLogic createLogic() { return new RunnableGraphStreamletLogic(getContext()) { public RunnableGraph<?> createRunnableGraph() { return Merger.source(getContext(), inlet1, inlet2).to(getCommittableSink(outlet)); } }; } }