Akka streamlet utilities
The cloudflow.akkastream.util library contains some predefined StreamletLogic implementations:
- HttpServerLogic
- Splitter
- Merger
The following sections describe how you implement stream processing logic with these utilities.
HttpServerLogic
Use case
An HttpServerLogic can be used to handle HTTP requests and write the received data to an outlet.
You need to extend your streamlet from AkkaServerStreamlet so that Cloudflow will expose an HTTP endpoint in Kubernetes.
The HttpServerLogic typically unmarshals incoming HTTP requests as they arrive and writes the data to the outlet.
HTTP requests that arrive while the HttpServerLogic is not running result in a 503 (Service Unavailable) response.
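To make the request handling described above concrete, here is a minimal plain-Java sketch. It is not the Cloudflow API: the IngressSketch class, its Function<byte[], T> unmarshaller and its in-memory List "outlet" are hypothetical stand-ins. Only the 503 response while the logic is not running comes from the text above; the 405, 400 and 202 status codes are assumptions chosen for illustration.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Function;

// Hypothetical plain-Java model of an HTTP ingress (not the Cloudflow API).
final class IngressSketch<T> {
    private final AtomicBoolean running = new AtomicBoolean(false);
    // Stands in for an Unmarshaller<ByteString, T>: turns the entity body into an element.
    private final Function<byte[], T> unmarshaller;
    // Stands in for the outlet: elements added here would go to the streamlet's outlet.
    private final List<T> outlet = new ArrayList<>();

    IngressSketch(Function<byte[], T> unmarshaller) {
        this.unmarshaller = unmarshaller;
    }

    void start() { running.set(true); }

    void stop() { running.set(false); }

    List<T> written() { return outlet; }

    // Returns the HTTP status code the ingress would answer with.
    int handle(String method, byte[] body) {
        if (!running.get()) {
            return 503; // from the docs: requests while the logic is not running get 503
        }
        if (!method.equals("PUT") && !method.equals("POST")) {
            return 405; // assumption: other methods are rejected as Method Not Allowed
        }
        T element;
        try {
            element = unmarshaller.apply(body);
        } catch (RuntimeException e) {
            return 400; // assumption: a body that cannot be unmarshalled is a Bad Request
        }
        outlet.add(element);
        return 202; // assumption: success status chosen for illustration only
    }
}
```

Keeping the unmarshaller as a parameter mirrors how the Java API takes an Unmarshaller<ByteString, T> argument, while the Scala API resolves it implicitly.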
Examples
The HttpServerLogic defines an abstract method through which you supply the processing logic as an HTTP route:
def route(): Route
The HttpServerLogic companion object provides default implementations of HttpServerLogic that support some pre-baked routes:
- the default method creates an HttpServerLogic that handles PUT and POST requests, where the data is read from the entity body.
- the defaultStreaming method creates an HttpServerLogic that handles streaming HTTP requests, where the data is read from the entity body as a framed stream.
Handle PUT / POST requests by default
The code snippet below shows an example of an HttpServerLogic using the default method:
Scala:
- The HttpServerLogic requires an implicit akka.http.scaladsl.unmarshalling.FromByteStringUnmarshaller[Out] to unmarshal the entity body into the data that you want to write to the outlet. (FromByteStringUnmarshaller[T] is an alias for Unmarshaller[ByteString, T].)
- An HttpServerLogic can only be used in combination with an AkkaServerStreamlet. The HttpServerLogic.default method requires a Server argument; the same applies to the HttpServerLogic.defaultStreaming method and the HttpServerLogic constructor, so you cannot construct an HttpServerLogic without one. The AkkaServerStreamlet implements Server, so you can simply pass this from inside the streamlet, as the snippet below does.
- In the example below, the JsonSupport object defines implicit spray-json JSON formats for the Data type.

import cloudflow.akkastream._
import cloudflow.akkastream.util.scaladsl._
import cloudflow.streamlets.avro._
import cloudflow.streamlets._

import akka.http.scaladsl.marshallers.sprayjson.SprayJsonSupport._
import JsonSupport._

class DataHttpIngress extends AkkaServerStreamlet {
  val out = AvroOutlet[Data]("out").withPartitioner(RoundRobinPartitioner)
  def shape = StreamletShape.withOutlets(out)
  def createLogic = HttpServerLogic.default(this, out)
}
Java:
In Java, the akka.http.javadsl.unmarshalling.Unmarshaller<ByteString, T> is passed as an explicit argument (to HttpServerLogic.createDefault in the example below). An Unmarshaller<ByteString, Data> is created for the Data class with the Jackson.byteStringUnmarshaller method, as shown below:

import cloudflow.akkastream.AkkaServerStreamlet;
import cloudflow.akkastream.AkkaStreamletLogic;
import cloudflow.akkastream.util.javadsl.HttpServerLogic;
import cloudflow.streamlets.RoundRobinPartitioner;
import cloudflow.streamlets.StreamletShape;
import cloudflow.streamlets.avro.AvroOutlet;

import akka.http.javadsl.marshallers.jackson.Jackson;

public class DataHttpIngress extends AkkaServerStreamlet {
  AvroOutlet<Data> out =
      AvroOutlet.<Data>create("out", Data.class)
          .withPartitioner(RoundRobinPartitioner.getInstance());

  public StreamletShape shape() {
    return StreamletShape.createWithOutlets(out);
  }

  public AkkaStreamletLogic createLogic() {
    return HttpServerLogic.createDefault(
        this, out, Jackson.byteStringUnmarshaller(Data.class), getContext());
  }
}
Handle streamed HTTP entities
The defaultStreaming method requires an implicit FromByteStringUnmarshaller[Out] and an EntityStreamingSupport.
Scala:
The Scala example below shows:
- How to use the defaultStreaming method to unmarshal a JSON stream and write the unmarshalled data to the outlet. It provides an EntityStreamingSupport.json() to read the JSON stream; the defaultStreaming logic uses this to read JSON from the request entity.
- The JsonSupport object in the example provides implicit spray-json JsonFormats to unmarshal the JSON elements in the stream.
- The akka.http.scaladsl.marshallers.sprayjson.SprayJsonSupport object implicitly provides a FromByteStringUnmarshaller based on the formats in the JsonSupport object.

import cloudflow.akkastream._
import cloudflow.akkastream.util.scaladsl._
import cloudflow.streamlets.avro._
import cloudflow.streamlets._

import akka.http.scaladsl.marshallers.sprayjson.SprayJsonSupport._
import akka.http.scaladsl.common.EntityStreamingSupport

import JsonSupport._

class DataStreamingIngress extends AkkaServerStreamlet {
  val out = AvroOutlet[Data]("out", RoundRobinPartitioner)
  def shape = StreamletShape.withOutlets(out)

  implicit val entityStreamingSupport = EntityStreamingSupport.json()

  override def createLogic = HttpServerLogic.defaultStreaming(this, out)
}
Java:
The Java example below shows how to use the createDefaultStreaming method in a similar manner:

import akka.http.javadsl.common.EntityStreamingSupport;
import akka.http.javadsl.marshallers.jackson.Jackson;

import cloudflow.akkastream.AkkaServerStreamlet;
import cloudflow.akkastream.AkkaStreamletLogic;
import cloudflow.akkastream.util.javadsl.HttpServerLogic;
import cloudflow.streamlets.RoundRobinPartitioner;
import cloudflow.streamlets.StreamletShape;
import cloudflow.streamlets.avro.AvroOutlet;

public class DataStreamingIngress extends AkkaServerStreamlet {
  private AvroOutlet<Data> out =
      AvroOutlet.create("out", Data.class)
          .withPartitioner(RoundRobinPartitioner.getInstance());

  public StreamletShape shape() {
    return StreamletShape.createWithOutlets(out);
  }

  public AkkaStreamletLogic createLogic() {
    EntityStreamingSupport ess = EntityStreamingSupport.json();
    return HttpServerLogic.createDefaultStreaming(
        this, out, Jackson.byteStringUnmarshaller(Data.class), ess, getContext());
  }
}
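With defaultStreaming, the request entity is not a single element but a framed stream of elements. The sketch below illustrates the idea behind JSON framing in plain Java: it splits incoming chunks into complete top-level JSON objects by tracking brace depth and string literals, even when an object is split across chunks. JsonObjectFramer is a hypothetical class written for illustration, not Akka HTTP's implementation, and it does not validate the JSON it frames.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical framer: splits a chunked JSON stream (e.g. a JSON array of objects)
// into complete top-level objects. Illustration only, not Akka's JSON framing.
final class JsonObjectFramer {
    private final StringBuilder current = new StringBuilder();
    private int depth = 0;           // current brace nesting level
    private boolean inString = false; // inside a "..." literal
    private boolean escaped = false;  // previous char was a backslash inside a string

    // Feeds one chunk of the entity; returns the objects completed by this chunk.
    List<String> offer(String chunk) {
        List<String> complete = new ArrayList<>();
        for (char c : chunk.toCharArray()) {
            if (depth == 0 && c != '{') {
                continue; // skip '[', ',', ']' and whitespace between top-level objects
            }
            current.append(c);
            if (inString) {
                if (escaped) {
                    escaped = false;
                } else if (c == '\\') {
                    escaped = true;
                } else if (c == '"') {
                    inString = false;
                }
            } else if (c == '"') {
                inString = true;
            } else if (c == '{') {
                depth++;
            } else if (c == '}') {
                depth--;
                if (depth == 0) {
                    complete.add(current.toString()); // one full object framed
                    current.setLength(0);
                }
            }
        }
        return complete;
    }
}
```

Each framed object would then be handed to the unmarshaller (spray-json or Jackson in the examples above) and written to the outlet.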
Define a custom Route
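If you want to provide your own akka-http Route, override the route method.
Inside the logic, call sinkRef(out) to obtain a WritableSinkRef for the outlet; its write method writes an element to the outlet and returns a Future that completes once the element has been written.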
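import akka.http.scaladsl.model.StatusCodes
import akka.http.scaladsl.server.Directives._
import akka.http.scaladsl.server.Route
import akka.http.scaladsl.marshallers.sprayjson.SprayJsonSupport._
import cloudflow.akkastream._
import cloudflow.akkastream.util.scaladsl._
import cloudflow.streamlets._
import cloudflow.streamlets.avro._
import JsonSupport._

class DataHttpIngressCustomRoute extends AkkaServerStreamlet {
  val out = AvroOutlet[Data]("out").withPartitioner(RoundRobinPartitioner)
  def shape = StreamletShape.withOutlets(out)

  def createLogic = new HttpServerLogic(this) {
    // WritableSinkRef used to write elements to the outlet
    val writer = sinkRef(out)
    override def route(): Route =
      put {
        entity(as[Data]) { data ⇒
          // complete the request only after the element has been written
          onSuccess(writer.write(data)) { _ ⇒
            complete(StatusCodes.OK)
          }
        }
      }
  }
}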
The Scala example above creates a route that handles PUT requests whose entity contains Data. Each element is written to the outlet using the WritableSinkRef, and the request is completed with 200 OK once the write has succeeded.
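The key detail in this route is that the response is completed only after writer.write(data) has succeeded. A plain-Java sketch of the same ordering, using CompletableFuture in place of Scala's Future, is shown below; WriteThenRespond and its writer function are hypothetical names, and mapping a failed write to 500 is an assumption for illustration.

```java
import java.util.concurrent.CompletableFuture;
import java.util.function.Function;

// Hypothetical sketch: derive the response status from the write's future,
// mirroring onSuccess(writer.write(data)) { _ => complete(StatusCodes.OK) }.
final class WriteThenRespond<T> {
    // Stands in for WritableSinkRef.write: completes once the element is written.
    private final Function<T, CompletableFuture<T>> writer;

    WriteThenRespond(Function<T, CompletableFuture<T>> writer) {
        this.writer = writer;
    }

    CompletableFuture<Integer> handlePut(T data) {
        return writer.apply(data)
                .thenApply(written -> 200)     // 200 OK only after the write succeeded
                .exceptionally(error -> 500);  // assumption: a failed write becomes 500
    }
}
```

Because the status is derived from the write's future, a slow outlet delays the HTTP response instead of acknowledging data that has not been written yet.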
Splitter
Use case
A Splitter can be used to split a stream in two, writing each element to one of two outlets.
Every element from the inlet is processed through a FlowWithCommittableContext, which provides at-least-once semantics.
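At-least-once here means that an element's position in the input is committed only after the element has been processed and written, so a crash between the write and the commit causes that element to be processed again after a restart. The simulation below illustrates this in plain Java; it is a hypothetical model, not Cloudflow or Kafka code, and it commits after every element, whereas a real committer may batch commits, in which case more elements could be replayed.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical simulation of at-least-once processing: an offset is committed
// only after the element's result has been written.
final class AtLeastOnceSimulation {
    final List<String> output = new ArrayList<>(); // stands in for the outlet
    long committedOffset = -1;                      // last offset known to be fully processed

    // Resumes after the committed offset. If crashAt matches an offset, the run
    // "crashes" after writing that element but before committing it.
    void run(List<String> records, long crashAt) {
        for (long offset = committedOffset + 1; offset < records.size(); offset++) {
            output.add(records.get((int) offset).toUpperCase()); // process and write
            if (offset == crashAt) {
                return; // crash: the write happened, the commit did not
            }
            committedOffset = offset; // commit only after the write
        }
    }
}
```

After a crash at offset 1 and a restart, the element at offset 1 is written twice but nothing is lost, which is exactly the at-least-once trade-off.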
Example
The Splitter defines a sink method to which you supply a FlowWithCommittableContext[I, Either[L, R]] together with a left and a right outlet. Elements for which the flow produces a Left are written to the left outlet, and elements for which it produces a Right are written to the right outlet.
The Java version of Splitter uses an Either type that is bundled with Cloudflow as cloudflow.akkastream.javadsl.util.Either.
The examples below show a Splitter that validates Data elements and splits the stream into valid and invalid elements, which are written to the valid and invalid outlets respectively:
Scala:

import cloudflow.akkastream._
import cloudflow.akkastream.scaladsl._
import cloudflow.akkastream.util.scaladsl._
import cloudflow.streamlets._
import cloudflow.streamlets.avro._

class DataSplitter extends AkkaStreamlet {
  val in = AvroInlet[Data]("in")
  val invalid = AvroOutlet[DataInvalid]("invalid").withPartitioner(data ⇒ data.key)
  val valid = AvroOutlet[Data]("valid").withPartitioner(RoundRobinPartitioner)
  val shape = StreamletShape(in).withOutlets(invalid, valid)

  override def createLogic = new RunnableGraphStreamletLogic() {
    def runnableGraph = sourceWithCommittableContext(in).to(Splitter.sink(flow, invalid, valid))
    def flow =
      FlowWithCommittableContext[Data]
        .map { data ⇒
          if (data.value < 0) Left(DataInvalid(data.key, data.value, "All data must be positive numbers!"))
          else Right(data)
        }
  }
}
Java:

import akka.NotUsed;
import akka.kafka.ConsumerMessage.Committable;
import akka.stream.javadsl.*;

import cloudflow.streamlets.*;
import cloudflow.streamlets.avro.*;
import cloudflow.akkastream.*;
import cloudflow.akkastream.javadsl.*;
import cloudflow.akkastream.javadsl.util.Either;
import cloudflow.akkastream.util.javadsl.*;

public class DataSplitter extends AkkaStreamlet {
  AvroInlet<Data> inlet = AvroInlet.<Data>create("in", Data.class);
  AvroOutlet<DataInvalid> invalidOutlet =
      AvroOutlet.<DataInvalid>create("invalid", d -> d.getKey(), DataInvalid.class);
  AvroOutlet<Data> validOutlet =
      AvroOutlet.<Data>create("valid", d -> RoundRobinPartitioner.apply(d), Data.class);

  public StreamletShape shape() {
    return StreamletShape.createWithInlets(inlet).withOutlets(invalidOutlet, validOutlet);
  }

  public AkkaStreamletLogic createLogic() {
    return new RunnableGraphStreamletLogic(getContext()) {
      public RunnableGraph<?> createRunnableGraph() {
        return getSourceWithCommittableContext(inlet)
            .to(Splitter.sink(createFlow(), invalidOutlet, validOutlet, getContext()));
      }
    };
  }

  public FlowWithContext<Data, Committable, Either<DataInvalid, Data>, Committable, NotUsed> createFlow() {
    return FlowWithContext.<Data, Committable>create()
        .map(data -> {
          if (data.getValue() < 0)
            return Either.left(new DataInvalid(data.getKey(), data.getValue(), "All data must be positive numbers!"));
          else
            return Either.right(data);
        });
  }
}
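The routing performed by Splitter.sink can be pictured with the plain-Java sketch below: each element goes through the validation function and lands in exactly one of two outputs, depending on whether the result is a left or a right value. SimpleEither and SplitSketch are hypothetical stand-ins (Cloudflow ships its own Either for Java), and in-memory lists replace the outlets.

```java
import java.util.List;
import java.util.function.Function;

// Hypothetical minimal Either: holds either a left (e.g. invalid) or a right (e.g. valid) value.
final class SimpleEither<L, R> {
    private final L left;
    private final R right;
    private final boolean isRight;

    private SimpleEither(L left, R right, boolean isRight) {
        this.left = left;
        this.right = right;
        this.isRight = isRight;
    }

    static <L, R> SimpleEither<L, R> left(L value) { return new SimpleEither<>(value, null, false); }

    static <L, R> SimpleEither<L, R> right(R value) { return new SimpleEither<>(null, value, true); }

    boolean isRight() { return isRight; }

    L getLeft() { return left; }

    R getRight() { return right; }
}

// Hypothetical helper: routes every element to exactly one of two "outlets" (plain lists here).
final class SplitSketch {
    static <I, L, R> void split(List<I> input, Function<I, SimpleEither<L, R>> flow,
                                List<L> leftOutlet, List<R> rightOutlet) {
        for (I element : input) {
            SimpleEither<L, R> result = flow.apply(element);
            if (result.isRight()) {
                rightOutlet.add(result.getRight());
            } else {
                leftOutlet.add(result.getLeft());
            }
        }
    }
}
```

As in the streamlet examples, the left side carries the invalid elements and the right side the valid ones.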
Merger
Use case
A Merger can be used to merge two or more inlets into one outlet.
Elements from all inlets are processed with at-least-once semantics. They are processed in semi-random order and with equal priority for all inlets.
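The merge behaviour can be approximated in plain Java: while any input still has elements, pick one of the non-empty inputs uniformly at random and take its next element. This mirrors the "equal priority, semi-random order" description and keeps the order of elements within each inlet. MergeSketch is a hypothetical helper, not the Merger or Akka Streams implementation.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;
import java.util.Random;

// Hypothetical helper (not Akka's Merge): fair, random interleaving of several inputs.
final class MergeSketch {
    static <T> List<T> merge(List<List<T>> inputs, Random random) {
        // One queue per inlet, preserving each inlet's own order.
        List<Deque<T>> queues = new ArrayList<>();
        for (List<T> input : inputs) {
            queues.add(new ArrayDeque<>(input));
        }
        List<T> merged = new ArrayList<>();
        while (true) {
            // Collect the inputs that still have an element available.
            List<Deque<T>> available = new ArrayList<>();
            for (Deque<T> queue : queues) {
                if (!queue.isEmpty()) {
                    available.add(queue);
                }
            }
            if (available.isEmpty()) {
                return merged;
            }
            // Equal priority: every available input is equally likely to be picked.
            Deque<T> chosen = available.get(random.nextInt(available.size()));
            merged.add(chosen.pollFirst());
        }
    }
}
```

The interleaving across inlets varies from run to run, but each inlet's elements always appear in their original relative order.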
Example
The examples below show how to use a Merger.source to combine elements from two inlets of the type Data into one outlet of the same type.
Scala:

import cloudflow.akkastream._
import cloudflow.akkastream.scaladsl._
import cloudflow.akkastream.util.scaladsl._
import cloudflow.streamlets.avro._
import cloudflow.streamlets._

class DataMerge extends AkkaStreamlet {
  val in0 = AvroInlet[Data]("in-0")
  val in1 = AvroInlet[Data]("in-1")
  val out = AvroOutlet[Data]("out", d ⇒ d.key)

  final override val shape = StreamletShape.withInlets(in0, in1).withOutlets(out)

  override final def createLogic = new RunnableGraphStreamletLogic {
    def runnableGraph =
      Merger.source(in0, in1).to(committableSink(out))
  }
}
Java:

import cloudflow.akkastream.*;
import cloudflow.streamlets.*;
import cloudflow.streamlets.avro.*;
import cloudflow.akkastream.javadsl.*;
import cloudflow.akkastream.util.javadsl.*;

import akka.stream.javadsl.*;

import java.util.*;

public class DataMerge extends AkkaStreamlet {
  AvroInlet<Data> inlet1 = AvroInlet.<Data>create("in-0", Data.class);
  AvroInlet<Data> inlet2 = AvroInlet.<Data>create("in-1", Data.class);
  AvroOutlet<Data> outlet = AvroOutlet.<Data>create("out", d -> d.getKey(), Data.class);

  public StreamletShape shape() {
    return StreamletShape.createWithInlets(inlet1, inlet2).withOutlets(outlet);
  }

  public RunnableGraphStreamletLogic createLogic() {
    return new RunnableGraphStreamletLogic(getContext()) {
      public RunnableGraph<?> createRunnableGraph() {
        return Merger.source(getContext(), inlet1, inlet2).to(getCommittableSink(outlet));
      }
    };
  }
}