Akka Streamlet utilities

HttpServerLogic

Use case

An HttpServerLogic can be used to handle HTTP requests and store the received data into an outlet. You need to extend your streamlet from AkkaServerStreamlet so that Cloudflow will expose an HTTP endpoint in Kubernetes. The HttpServerLogic typically unmarshalls incoming HTTP requests as they arrive and stores the data in the outlet. Http requests that are attempted while the HttpServerLogic is not running will result in a 503 response.

Examples

The HttpServerLogic defines an abstract method for the user to supply the processing logic, an HTTP route:

def route(sinkRef: WritableSinkRef[Out]): Route

The HttpServerLogic object has default implementations of the HttpServerLogic, which support some pre-baked routes:

  • the default method creates a HttpServerLogic that handles PUT and POST requests, where the data is read from the entity body.

  • the defaultStreaming method creates a HttpServerLogic that handles streaming HTTP requests, where the data is read from the entity body as a framed stream.

Handle PUT / POST requests by default

The code snippet below shows an example of an HttpServerLogic using the defaultLogic:

Scala

In Scala:

  • The HttpServerLogic requires an implicit akka.http.scaladsl.marshalling.FromByteStringUnmarshaller[Out] to unmarshal the entity body into the data that you want to write to the outlet. (FromByteStringUnmarshaller[T] is an alias for Unmarshaller[ByteString, T])

  • An HttpServerLogic can only be used in combination with an AkkaServerStreamlet. The HttpServerLogic.default method requires a Server argument (this also applies to the HttpServerLogic.defaultStreaming method and the HttpServerLogic constructor, you cannot construct it without it). The AkkaServerStreamlet implements Server, so you can just pass this to it from inside the streamlet, as you can see in the snippet below.

  • In the example below, the JsonSupport object defines implicit spray-json JSON formats for the SensorData type.

    import cloudflow.akkastream._
    import cloudflow.akkastream.util.scaladsl._
    
    import cloudflow.streamlets.avro._
    import cloudflow.streamlets._
    
    import akka.http.scaladsl.marshallers.sprayjson.SprayJsonSupport._
    import JsonSupport._
    
    class DataHttpIngress extends AkkaServerStreamlet {
      val out         = AvroOutlet[Data]("out").withPartitioner(RoundRobinPartitioner)
      def shape       = StreamletShape.withOutlets(out)
      def createLogic = HttpServerLogic.default(this, out)
    }
Java

In Java, the akka.http.javadsl.unmarshalling.Unmarshaller<ByteString, T> is an argument to the constructor. A Jackson Unmarshaller<ByteString, Out> is created for the SensorData class, using the Jackson.byteStringUnmarshaller method, as shown below

import cloudflow.akkastream.AkkaServerStreamlet;

import cloudflow.akkastream.AkkaStreamletLogic;
import cloudflow.akkastream.util.javadsl.HttpServerLogic;

import cloudflow.streamlets.RoundRobinPartitioner;
import cloudflow.streamlets.StreamletShape;
import cloudflow.streamlets.avro.AvroOutlet;

import akka.http.javadsl.marshallers.jackson.Jackson;

public class DataHttpIngress extends AkkaServerStreamlet {
  AvroOutlet<Data> out =
      AvroOutlet.<Data>create("out", Data.class)
          .withPartitioner(RoundRobinPartitioner.getInstance());

  public StreamletShape shape() {
    return StreamletShape.createWithOutlets(out);
  }

  public AkkaStreamletLogic createLogic() {
    return HttpServerLogic.createDefault(
        this, out, Jackson.byteStringUnmarshaller(Data.class), getContext());
  }
}

Handle streamed HTTP entities

The defaultStreaming requires an implicit FromByteStringUnmarshaller[Out] and an EntityStreamingSupport.

Scala

The Scala example below shows:

  • How to use the defaultStreaming method to unmarshal a JSON stream and write the unmarshalled data to the outlet. It provides a EntityStreamingSupport.json() to read the JSON stream. The defaultStreamingLogic uses this to read JSON from the request entity.

  • The SensorDataJsonSupport in the example provides implicit spray-json JSON Formats to unmarshal the JSON elements in the stream.

  • The akka.http.scaladsl.marshallers.sprayjson.SprayJsonSupport object implicitly provides a FromByteStringUnmarshaller based on the formats in the SensorDataJsonSupport object.

    import cloudflow.akkastream._
    import cloudflow.akkastream.util.scaladsl._
    
    import cloudflow.streamlets.avro._
    import cloudflow.streamlets._
    
    import akka.http.scaladsl.marshallers.sprayjson.SprayJsonSupport._
    import akka.http.scaladsl.common.EntityStreamingSupport
    import JsonSupport._
    
    class DataStreamingIngress extends AkkaServerStreamlet {
      val out   = AvroOutlet[Data]("out", RoundRobinPartitioner)
      def shape = StreamletShape.withOutlets(out)
    
      implicit val entityStreamingSupport = EntityStreamingSupport.json()
      override def createLogic            = HttpServerLogic.defaultStreaming(this, out)
    }
Java

The Java example below shows how to use the defaultStreaming in a similar manner:

import akka.http.javadsl.common.EntityStreamingSupport;
import akka.http.javadsl.marshallers.jackson.Jackson;

import cloudflow.akkastream.AkkaServerStreamlet;

import cloudflow.akkastream.AkkaStreamletLogic;
import cloudflow.akkastream.util.javadsl.HttpServerLogic;
import cloudflow.streamlets.RoundRobinPartitioner;
import cloudflow.streamlets.StreamletShape;
import cloudflow.streamlets.avro.AvroOutlet;

public class DataStreamingIngress extends AkkaServerStreamlet {

  private AvroOutlet<Data> out =
      AvroOutlet.create("out", Data.class)
          .withPartitioner(RoundRobinPartitioner.getInstance());

  public StreamletShape shape() {
    return StreamletShape.createWithOutlets(out);
  }

  public AkkaStreamletLogic createLogic() {
    EntityStreamingSupport ess = EntityStreamingSupport.json();
    return HttpServerLogic.createDefaultStreaming(
        this, out, Jackson.byteStringUnmarshaller(Data.class), ess, getContext());
  }
}

Define a custom Route

If you want to provide your own akka-http Route, override the route method. The route method provides a sinkRef argument which you can use to write to the outlet.

class DataHttpIngressCustomRoute extends AkkaServerStreamlet {
  val out   = AvroOutlet[Data]("out").withPartitioner(RoundRobinPartitioner)
  def shape = StreamletShape.withOutlets(out)
  def createLogic = new HttpServerLogic(this) {
    val writer = sinkRef(out)
    override def route(): Route =
      put {
        entity(as[Data]) { data =>
          onSuccess(writer.write(data)) { _ =>
            complete(StatusCodes.OK)
          }
        }
      }
  }
}

The above Scala example creates a route that will handle put requests where the entity contains Data, which is written to the outlet using the WritableSinkRef.