Akka Streamlet utilities
HttpServerLogic
Use case
An HttpServerLogic can be used to handle HTTP requests and write the received data to an outlet.
You need to extend your streamlet from AkkaServerStreamlet so that Cloudflow will expose an HTTP endpoint in Kubernetes.
The HttpServerLogic typically unmarshals incoming HTTP requests as they arrive and writes the data to the outlet.
HTTP requests that are attempted while the HttpServerLogic is not running will result in a 503 response.
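Because a request can arrive while the HttpServerLogic is not running, a client may want to treat a 503 response as retryable. The following is a minimal client-side sketch, assuming Java 11 or later and the standard java.net.http client. The class name, the attempt limit, and the fixed delay are hypothetical choices for illustration and are not part of Cloudflow.

```java
import java.io.IOException;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;

// Hypothetical helper, not part of Cloudflow.
final class RetryOn503 {

    // 503 is the status described above for requests that arrive
    // while the HttpServerLogic is not running.
    static boolean isRetryable(int statusCode) {
        return statusCode == 503;
    }

    // Sends the request and repeats it after a fixed delay while the
    // response is retryable, up to maxAttempts sends in total.
    static HttpResponse<String> sendWithRetry(
            HttpClient client, HttpRequest request, int maxAttempts, long delayMillis)
            throws IOException, InterruptedException {
        HttpResponse<String> response =
            client.send(request, HttpResponse.BodyHandlers.ofString());
        for (int attempt = 1;
             attempt < maxAttempts && isRetryable(response.statusCode());
             attempt++) {
            Thread.sleep(delayMillis);
            response = client.send(request, HttpResponse.BodyHandlers.ofString());
        }
        return response;
    }
}
```

A fixed delay keeps the sketch short; a real client would more likely use a backoff and also handle connection failures, which this helper lets propagate as exceptions.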
Examples
The HttpServerLogic defines an abstract method for the user to supply the processing logic, an HTTP route:
    def route(): Route
The HttpServerLogic object provides default implementations of the HttpServerLogic, which support some pre-built routes:
- The default method creates an HttpServerLogic that handles PUT and POST requests, where the data is read from the entity body.
- The defaultStreaming method creates an HttpServerLogic that handles streaming HTTP requests, where the data is read from the entity body as a framed stream.
Handle PUT / POST requests by default
The code snippet below shows an example of an HttpServerLogic created with the default method (createDefault in Java):
- Scala
  In Scala:
  - The HttpServerLogic requires an implicit akka.http.scaladsl.unmarshalling.FromByteStringUnmarshaller[Out] to unmarshal the entity body into the data that you want to write to the outlet. (FromByteStringUnmarshaller[T] is an alias for Unmarshaller[ByteString, T].)
  - An HttpServerLogic can only be used in combination with an AkkaServerStreamlet. The HttpServerLogic.default method requires a Server argument. This also applies to the HttpServerLogic.defaultStreaming method and the HttpServerLogic constructor; you cannot construct an HttpServerLogic without it. The AkkaServerStreamlet implements Server, so you can just pass this to it from inside the streamlet, as you can see in the snippet below.
  - In the example below, the JsonSupport object defines implicit spray-json JSON formats for the Data type.

    import cloudflow.akkastream._
    import cloudflow.akkastream.util.scaladsl._
    import cloudflow.streamlets.avro._
    import cloudflow.streamlets._
    import akka.http.scaladsl.marshallers.sprayjson.SprayJsonSupport._
    import JsonSupport._

    class DataHttpIngress extends AkkaServerStreamlet {
      // Outlet that receives the unmarshalled Data records
      val out = AvroOutlet[Data]("out").withPartitioner(RoundRobinPartitioner)
      def shape = StreamletShape.withOutlets(out)
      // The streamlet itself is passed as the Server argument
      def createLogic = HttpServerLogic.default(this, out)
    }
- Java
  In Java, the akka.http.javadsl.unmarshalling.Unmarshaller<ByteString, T> is passed explicitly as an argument. A Jackson Unmarshaller<ByteString, Out> is created for the Data class, using the Jackson.byteStringUnmarshaller method, as shown below:

    import cloudflow.akkastream.AkkaServerStreamlet;
    import cloudflow.akkastream.AkkaStreamletLogic;
    import cloudflow.akkastream.util.javadsl.HttpServerLogic;
    import cloudflow.streamlets.RoundRobinPartitioner;
    import cloudflow.streamlets.StreamletShape;
    import cloudflow.streamlets.avro.AvroOutlet;
    import akka.http.javadsl.marshallers.jackson.Jackson;

    public class DataHttpIngress extends AkkaServerStreamlet {
      // Outlet that receives the unmarshalled Data records
      AvroOutlet<Data> out =
          AvroOutlet.<Data>create("out", Data.class)
              .withPartitioner(RoundRobinPartitioner.getInstance());

      public StreamletShape shape() {
        return StreamletShape.createWithOutlets(out);
      }

      public AkkaStreamletLogic createLogic() {
        // The streamlet itself is passed as the Server argument
        return HttpServerLogic.createDefault(
            this, out, Jackson.byteStringUnmarshaller(Data.class), getContext());
      }
    }
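To try out an ingress like the ones above, a client sends a PUT (or POST) request with the record in the entity body. The following sketch builds such a request with the standard java.net.http API (Java 11 or later). The class name, the endpoint URL, and the JSON field names are hypothetical, since the Data schema and the deployed address are not shown here.

```java
import java.net.URI;
import java.net.http.HttpRequest;

// Hypothetical client-side helper, not part of Cloudflow.
final class IngressRequests {

    // Builds a PUT request that carries one JSON record in the entity body.
    static HttpRequest putJson(String endpoint, String json) {
        return HttpRequest.newBuilder(URI.create(endpoint))
            .header("Content-Type", "application/json")
            .PUT(HttpRequest.BodyPublishers.ofString(json))
            .build();
    }

    public static void main(String[] args) {
        // Assumed URL and payload; replace them with the real endpoint
        // and a JSON document that matches the Data schema.
        HttpRequest request =
            putJson("http://localhost:3000/", "{\"id\":\"sensor-1\",\"value\":21.5}");
        System.out.println(request.method() + " " + request.uri());
        // To actually send it:
        // HttpClient.newHttpClient().send(request, HttpResponse.BodyHandlers.ofString());
    }
}
```

The request is only built and printed here, so the sketch runs without a deployed streamlet.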
Handle streamed HTTP entities
The defaultStreaming method requires an implicit FromByteStringUnmarshaller[Out] and an EntityStreamingSupport.
- Scala
  The Scala example below shows:
  - How to use the defaultStreaming method to unmarshal a JSON stream and write the unmarshalled data to the outlet. It provides an EntityStreamingSupport.json() to read the JSON stream. The logic created by defaultStreaming uses this to read JSON from the request entity.
  - The JsonSupport object in the example provides implicit spray-json JSON formats to unmarshal the JSON elements in the stream.
  - The akka.http.scaladsl.marshallers.sprayjson.SprayJsonSupport object implicitly provides a FromByteStringUnmarshaller based on the formats in the JsonSupport object.

    import cloudflow.akkastream._
    import cloudflow.akkastream.util.scaladsl._
    import cloudflow.streamlets.avro._
    import cloudflow.streamlets._
    import akka.http.scaladsl.marshallers.sprayjson.SprayJsonSupport._
    import akka.http.scaladsl.common.EntityStreamingSupport
    import JsonSupport._

    class DataStreamingIngress extends AkkaServerStreamlet {
      val out = AvroOutlet[Data]("out", RoundRobinPartitioner)
      def shape = StreamletShape.withOutlets(out)

      // Frames the request entity as a stream of JSON elements
      implicit val entityStreamingSupport: EntityStreamingSupport = EntityStreamingSupport.json()
      override def createLogic = HttpServerLogic.defaultStreaming(this, out)
    }
- Java
  The Java example below shows how to use the createDefaultStreaming method in a similar manner:

    import akka.http.javadsl.common.EntityStreamingSupport;
    import akka.http.javadsl.marshallers.jackson.Jackson;
    import cloudflow.akkastream.AkkaServerStreamlet;
    import cloudflow.akkastream.AkkaStreamletLogic;
    import cloudflow.akkastream.util.javadsl.HttpServerLogic;
    import cloudflow.streamlets.RoundRobinPartitioner;
    import cloudflow.streamlets.StreamletShape;
    import cloudflow.streamlets.avro.AvroOutlet;

    public class DataStreamingIngress extends AkkaServerStreamlet {
      private AvroOutlet<Data> out =
          AvroOutlet.create("out", Data.class)
              .withPartitioner(RoundRobinPartitioner.getInstance());

      public StreamletShape shape() {
        return StreamletShape.createWithOutlets(out);
      }

      public AkkaStreamletLogic createLogic() {
        // Frames the request entity as a stream of JSON elements
        EntityStreamingSupport ess = EntityStreamingSupport.json();
        return HttpServerLogic.createDefaultStreaming(
            this, out, Jackson.byteStringUnmarshaller(Data.class), ess, getContext());
      }
    }
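On the client side, a streamed request carries several records in one entity body. The sketch below builds such a body as a top-level JSON array and wraps it in a request, assuming the JSON framing configured with EntityStreamingSupport.json() accepts an array of objects. The class name, the use of POST, and the record fields are hypothetical and only serve as an illustration.

```java
import java.net.URI;
import java.net.http.HttpRequest;
import java.util.List;

// Hypothetical client-side helper, not part of Cloudflow.
final class StreamingIngressRequests {

    // Joins already-serialized JSON objects into a single JSON array.
    static String toJsonArray(List<String> jsonObjects) {
        return "[" + String.join(",", jsonObjects) + "]";
    }

    // Builds a POST request whose entity body holds all records.
    static HttpRequest postJsonArray(String endpoint, List<String> jsonObjects) {
        return HttpRequest.newBuilder(URI.create(endpoint))
            .header("Content-Type", "application/json")
            .POST(HttpRequest.BodyPublishers.ofString(toJsonArray(jsonObjects)))
            .build();
    }
}
```

The body is assembled in memory for brevity; for large inputs a streaming body publisher such as HttpRequest.BodyPublishers.ofInputStream would avoid holding everything at once.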
Define a custom Route
If you want to provide your own akka-http Route, override the route method.
Inside the logic, call sinkRef with the outlet to obtain a WritableSinkRef, which the route can use to write to the outlet.
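    import akka.http.scaladsl.marshallers.sprayjson.SprayJsonSupport._
    import akka.http.scaladsl.model.StatusCodes
    import akka.http.scaladsl.server.Directives._
    import akka.http.scaladsl.server.Route
    import cloudflow.akkastream._
    import cloudflow.akkastream.util.scaladsl._
    import cloudflow.streamlets._
    import cloudflow.streamlets.avro._
    import JsonSupport._

    class DataHttpIngressCustomRoute extends AkkaServerStreamlet {
      val out = AvroOutlet[Data]("out").withPartitioner(RoundRobinPartitioner)
      def shape = StreamletShape.withOutlets(out)
      def createLogic = new HttpServerLogic(this) {
        // WritableSinkRef used to write to the outlet
        val writer = sinkRef(out)
        override def route(): Route =
          put {
            entity(as[Data]) { data =>
              // Respond only after the record has been written
              onSuccess(writer.write(data)) { _ =>
                complete(StatusCodes.OK)
              }
            }
          }
      }
    }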
The above Scala example creates a route that handles PUT requests where the entity contains Data, which is written to the outlet using the WritableSinkRef.