Akka streamlet utilities

The cloudflow.akkastream.util library contains some predefined StreamletLogics:

  • HttpServerLogic

  • Splitter

  • Merger

The following sections describe how you implement stream processing logic with these utilities.

HttpServerLogic

Use case

An HttpServerLogic handles HTTP requests and writes the received data to an outlet. Your streamlet must extend AkkaServerStreamlet so that Cloudflow exposes an HTTP endpoint in Kubernetes. The HttpServerLogic typically unmarshals incoming HTTP requests as they arrive and writes the data to the outlet. HTTP requests that arrive while the HttpServerLogic is not running receive a 503 response.
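From the client side, the behaviour described above can be sketched with the JDK's java.net.http API. The endpoint URL and the JSON field names are assumptions made for illustration; the real address depends on how the application is deployed, and the payload depends on the schema of the outlet type. Treating a 503 as "try again later" is one possible client policy, not something Cloudflow prescribes.

```java
import java.net.URI;
import java.net.http.HttpRequest;
import java.nio.charset.StandardCharsets;

final class IngressClientSketch {
  // Hypothetical endpoint; the real host, port and path depend on the deployment.
  static final URI ENDPOINT = URI.create("http://localhost:3000/");

  /** Builds a POST request that carries one JSON document in the entity body. */
  static HttpRequest postJson(URI endpoint, String json) {
    return HttpRequest.newBuilder(endpoint)
        .header("Content-Type", "application/json")
        .POST(HttpRequest.BodyPublishers.ofString(json, StandardCharsets.UTF_8))
        .build();
  }

  /** A 503 signals that the logic is not running, so the same request may be sent again later. */
  static boolean shouldRetry(int statusCode) {
    return statusCode == 503;
  }
}
```

The request can be sent with HttpClient.newHttpClient().send(request, HttpResponse.BodyHandlers.discarding()), and the status code of the response passed to shouldRetry.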

Examples

The HttpServerLogic defines an abstract method for the user to supply the processing logic, an HTTP route:

def route(): Route

The HttpServerLogic object provides default implementations of HttpServerLogic with some pre-baked routes:

  • the default method creates an HttpServerLogic that handles PUT and POST requests, where the data is read from the entity body.

  • the defaultStreaming method creates an HttpServerLogic that handles streaming HTTP requests, where the data is read from the entity body as a framed stream.

Handle PUT / POST requests by default

The code snippet below shows an example of an HttpServerLogic created with the default method:

Scala

In Scala:

  • The HttpServerLogic requires an implicit akka.http.scaladsl.unmarshalling.FromByteStringUnmarshaller[Out] to unmarshal the entity body into the data that you want to write to the outlet. (FromByteStringUnmarshaller[T] is an alias for Unmarshaller[ByteString, T].)

  • An HttpServerLogic can only be used in combination with an AkkaServerStreamlet. The HttpServerLogic.default method requires a Server argument, and so do the HttpServerLogic.defaultStreaming method and the HttpServerLogic constructor. AkkaServerStreamlet implements Server, so you can pass this from inside the streamlet, as the snippet below shows.

  • In the example below, the JsonSupport object defines implicit spray-json JSON formats for the Data type.

    import cloudflow.akkastream._
    import cloudflow.akkastream.util.scaladsl._
    
    import cloudflow.streamlets.avro._
    import cloudflow.streamlets._
    
    import akka.http.scaladsl.marshallers.sprayjson.SprayJsonSupport._
    import JsonSupport._
    
    class DataHttpIngress extends AkkaServerStreamlet {
      val out         = AvroOutlet[Data]("out").withPartitioner(RoundRobinPartitioner)
      def shape       = StreamletShape.withOutlets(out)
      def createLogic = HttpServerLogic.default(this, out)
    }

Java

In Java, the akka.http.javadsl.unmarshalling.Unmarshaller<ByteString, T> is an argument to the createDefault method. A Jackson Unmarshaller<ByteString, Data> is created for the Data class, using the Jackson.byteStringUnmarshaller method, as shown below:

import cloudflow.akkastream.AkkaServerStreamlet;

import cloudflow.akkastream.AkkaStreamletLogic;
import cloudflow.akkastream.util.javadsl.HttpServerLogic;

import cloudflow.streamlets.RoundRobinPartitioner;
import cloudflow.streamlets.StreamletShape;
import cloudflow.streamlets.avro.AvroOutlet;

import akka.http.javadsl.marshallers.jackson.Jackson;

public class DataHttpIngress extends AkkaServerStreamlet {
  AvroOutlet<Data> out =
      AvroOutlet.<Data>create("out", Data.class)
          .withPartitioner(RoundRobinPartitioner.getInstance());

  public StreamletShape shape() {
    return StreamletShape.createWithOutlets(out);
  }

  public AkkaStreamletLogic createLogic() {
    return HttpServerLogic.createDefault(
        this, out, Jackson.byteStringUnmarshaller(Data.class), getContext());
  }
}

Handle streamed HTTP entities

The defaultStreaming method requires an implicit FromByteStringUnmarshaller[Out] and an EntityStreamingSupport.
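"Framed" here means that the entity body is cut into individual elements before each element is unmarshalled. The sketch below illustrates the idea for JSON with a simple brace-balancing scanner in plain Java. It is a conceptual model only, not the Akka HTTP implementation, and it assumes the body is either a JSON array of objects or a sequence of objects separated by newlines or commas. The class and method names are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;

final class JsonFramingSketch {
  /** Splits a body that holds several JSON objects into one string per top-level object. */
  static List<String> frames(String body) {
    List<String> out = new ArrayList<>();
    int depth = 0;             // current nesting level of curly braces
    int start = -1;            // index at which the current top-level object began
    boolean inString = false;  // braces inside string literals must be ignored
    boolean escaped = false;   // true directly after a backslash inside a string
    for (int i = 0; i < body.length(); i++) {
      char c = body.charAt(i);
      if (inString) {
        if (escaped) escaped = false;
        else if (c == '\\') escaped = true;
        else if (c == '"') inString = false;
        continue;
      }
      if (c == '"') {
        inString = true;
      } else if (c == '{') {
        if (depth == 0) start = i;
        depth++;
      } else if (c == '}' && depth > 0) {
        depth--;
        if (depth == 0) {
          out.add(body.substring(start, i + 1)); // one complete frame
          start = -1;
        }
      }
    }
    return out;
  }
}
```

Each returned frame is what an unmarshaller would then turn into a single element for the outlet.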

Scala

The Scala example below shows:

  • How to use the defaultStreaming method to unmarshal a JSON stream and write the unmarshalled data to the outlet. The example provides an EntityStreamingSupport.json() to read the JSON stream; the defaultStreaming logic uses it to read JSON from the request entity.

  • The JsonSupport object in the example provides implicit spray-json JSON formats to unmarshal the JSON elements in the stream.

  • The akka.http.scaladsl.marshallers.sprayjson.SprayJsonSupport object implicitly provides a FromByteStringUnmarshaller based on the formats in the JsonSupport object.

    import cloudflow.akkastream._
    import cloudflow.akkastream.util.scaladsl._
    
    import cloudflow.streamlets.avro._
    import cloudflow.streamlets._
    
    import akka.http.scaladsl.marshallers.sprayjson.SprayJsonSupport._
    import akka.http.scaladsl.common.EntityStreamingSupport
    import JsonSupport._
    
    class DataStreamingIngress extends AkkaServerStreamlet {
      val out   = AvroOutlet[Data]("out", RoundRobinPartitioner)
      def shape = StreamletShape.withOutlets(out)
    
      implicit val entityStreamingSupport = EntityStreamingSupport.json()
      override def createLogic            = HttpServerLogic.defaultStreaming(this, out)
    }

Java

The Java example below shows how to use the createDefaultStreaming method in a similar manner:

import akka.http.javadsl.common.EntityStreamingSupport;
import akka.http.javadsl.marshallers.jackson.Jackson;

import cloudflow.akkastream.AkkaServerStreamlet;

import cloudflow.akkastream.AkkaStreamletLogic;
import cloudflow.akkastream.util.javadsl.HttpServerLogic;
import cloudflow.streamlets.RoundRobinPartitioner;
import cloudflow.streamlets.StreamletShape;
import cloudflow.streamlets.avro.AvroOutlet;

public class DataStreamingIngress extends AkkaServerStreamlet {

  private AvroOutlet<Data> out =
      AvroOutlet.create("out", Data.class)
          .withPartitioner(RoundRobinPartitioner.getInstance());

  public StreamletShape shape() {
    return StreamletShape.createWithOutlets(out);
  }

  public AkkaStreamletLogic createLogic() {
    EntityStreamingSupport ess = EntityStreamingSupport.json();
    return HttpServerLogic.createDefaultStreaming(
        this, out, Jackson.byteStringUnmarshaller(Data.class), ess, getContext());
  }
}

Define a custom Route

If you want to provide your own akka-http Route, override the route method. In the example below, sinkRef(out) provides a WritableSinkRef that the route uses to write to the outlet.

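import akka.http.scaladsl.model.StatusCodes
import akka.http.scaladsl.server.Route
import akka.http.scaladsl.server.Directives._
import akka.http.scaladsl.marshallers.sprayjson.SprayJsonSupport._

import cloudflow.akkastream._
import cloudflow.akkastream.util.scaladsl._
import cloudflow.streamlets._
import cloudflow.streamlets.avro._
import JsonSupport._

class DataHttpIngressCustomRoute extends AkkaServerStreamlet {
  val out   = AvroOutlet[Data]("out").withPartitioner(RoundRobinPartitioner)
  def shape = StreamletShape.withOutlets(out)
  def createLogic = new HttpServerLogic(this) {
    // The WritableSinkRef used by the route to write to the outlet.
    val writer = sinkRef(out)
    override def route(): Route =
      put {
        entity(as[Data]) { data ⇒
          // Respond only after the write to the outlet has completed.
          onSuccess(writer.write(data)) { _ ⇒
            complete(StatusCodes.OK)
          }
        }
      }
  }
}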

The Scala example above creates a route that handles PUT requests whose entity contains Data, which is written to the outlet using the WritableSinkRef.

Splitter

Use case

A Splitter can be used to split a stream in two, writing each element to one of two outlets. Every element from the inlet is processed through a FlowWithCommittableContext, which provides at-least-once semantics.
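At-least-once means that an element is only marked as processed after it has been written, so a failure between the write and the commit causes that element to be processed again after a restart. The following plain-Java model illustrates this with a list and an integer offset. It is a simplified sketch with hypothetical names, not how Cloudflow or Kafka implement commits.

```java
import java.util.ArrayList;
import java.util.List;

final class AtLeastOnceSketch {
  /**
   * Replays input from the last committed offset until everything is committed.
   * crashAfterWriteAt is the offset at which the first run fails after writing
   * but before committing (use -1 for a run without failure).
   */
  static List<String> run(List<String> input, int crashAfterWriteAt) {
    List<String> written = new ArrayList<>();
    int committed = 0; // next offset to read after a (re)start
    boolean crashPending = crashAfterWriteAt >= 0;
    while (committed < input.size()) {
      for (int offset = committed; offset < input.size(); offset++) {
        written.add(input.get(offset)); // 1. write downstream
        if (crashPending && offset == crashAfterWriteAt) {
          crashPending = false; // simulated crash: the commit below never happens
          break;                // restart from the last committed offset
        }
        committed = offset + 1; // 2. commit only after the write succeeded
      }
    }
    return written;
  }
}
```

With a simulated crash, the element at the crash offset appears twice in the output and no element is lost, which is why downstream consumers of such a stream should tolerate duplicates.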

Example

The Splitter defines a sink method for the user to supply a FlowWithCommittableContext[I, Either[L, R]] and a left and a right outlet. The Java version of Splitter uses an Either type that is bundled with Cloudflow as cloudflow.akkastream.javadsl.util.Either.

The examples below show a Splitter that validates data and splits the stream into valid and invalid elements, which are written to the valid and invalid outlets respectively:

Scala

import cloudflow.akkastream._
import cloudflow.akkastream.scaladsl._
import cloudflow.akkastream.util.scaladsl._

import cloudflow.streamlets._
import cloudflow.streamlets.avro._

class DataSplitter extends AkkaStreamlet {
  val in      = AvroInlet[Data]("in")
  val invalid = AvroOutlet[DataInvalid]("invalid").withPartitioner(data ⇒ data.key)
  val valid   = AvroOutlet[Data]("valid").withPartitioner(RoundRobinPartitioner)
  val shape   = StreamletShape(in).withOutlets(invalid, valid)

  override def createLogic = new RunnableGraphStreamletLogic() {
    def runnableGraph = sourceWithCommittableContext(in).to(Splitter.sink(flow, invalid, valid))
    def flow =
      FlowWithCommittableContext[Data]
        .map { data ⇒
          if (data.value < 0) Left(DataInvalid(data.key, data.value, "All data must be positive numbers!"))
          else Right(data)
        }
  }
}

Java

import akka.NotUsed;
import akka.kafka.ConsumerMessage.Committable;
import akka.stream.javadsl.*;

import cloudflow.streamlets.*;
import cloudflow.streamlets.avro.*;
import cloudflow.akkastream.*;
import cloudflow.akkastream.javadsl.*;
import cloudflow.akkastream.javadsl.util.Either;
import cloudflow.akkastream.util.javadsl.*;

public class DataSplitter extends AkkaStreamlet {
  AvroInlet<Data> inlet = AvroInlet.<Data>create("in", Data.class);
  AvroOutlet<DataInvalid> invalidOutlet =
      AvroOutlet.<DataInvalid>create(
          "invalid", d -> d.getKey(), DataInvalid.class);
  AvroOutlet<Data> validOutlet =
      AvroOutlet.<Data>create(
          "valid", d -> RoundRobinPartitioner.apply(d), Data.class);

  public StreamletShape shape() {
    return StreamletShape.createWithInlets(inlet).withOutlets(invalidOutlet, validOutlet);
  }

  public AkkaStreamletLogic createLogic() {
    return new RunnableGraphStreamletLogic(getContext()) {
      public RunnableGraph<?> createRunnableGraph() {
        return getSourceWithCommittableContext(inlet).to(Splitter.sink(createFlow(), invalidOutlet, validOutlet, getContext()));
      }
    };
  }

  public FlowWithContext<Data, Committable, Either<DataInvalid, Data>, Committable, NotUsed> createFlow() {
    return FlowWithContext.<Data, Committable>create()
      .map(data -> {
        if (data.getValue() < 0) return Either.left(new DataInvalid(data.getKey(), data.getValue(), "All data must be positive numbers!"));
        else return Either.right(data);
      });
  }
}
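
The routing decision in the examples above can be exercised without a running pipeline. The sketch below uses a minimal, hypothetical SimpleEither type as a stand-in for the Either bundled with Cloudflow, and plain lists in place of the two outlets. It only demonstrates the left/right routing rule, not the streaming or commit behaviour.

```java
import java.util.List;
import java.util.function.Function;

final class SplitSketch {
  /** Hypothetical stand-in for an Either: exactly one of left or right is set. */
  static final class SimpleEither<L, R> {
    final L left;
    final R right;
    final boolean isRight;

    private SimpleEither(L left, R right, boolean isRight) {
      this.left = left;
      this.right = right;
      this.isRight = isRight;
    }

    static <L, R> SimpleEither<L, R> left(L value) {
      return new SimpleEither<>(value, null, false);
    }

    static <L, R> SimpleEither<L, R> right(R value) {
      return new SimpleEither<>(null, value, true);
    }
  }

  /** Same rule as the examples above: negative values go left (invalid), all others go right. */
  static SimpleEither<String, Integer> validate(int value) {
    if (value < 0) return SimpleEither.left("invalid value: " + value);
    return SimpleEither.right(value);
  }

  /** Routes every element to exactly one of the two lists, preserving input order. */
  static <I, L, R> void split(
      List<I> input, Function<I, SimpleEither<L, R>> decide, List<L> leftOut, List<R> rightOut) {
    for (I element : input) {
      SimpleEither<L, R> decided = decide.apply(element);
      if (decided.isRight) rightOut.add(decided.right);
      else leftOut.add(decided.left);
    }
  }
}
```

Calling SplitSketch.split(input, SplitSketch::validate, invalid, valid) fills the two lists in the same left-then-right argument order that Splitter.sink uses for its outlets.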

Merger

Use case

A Merger can be used to merge two or more inlets into one outlet.

Elements from all inlets will be processed with at-least-once semantics. The elements will be processed in semi-random order and with equal priority for all inlets.
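
"Equal priority" can be pictured as taking turns between the inlets. The sketch below models this with a deterministic round-robin over plain lists. It is an illustration under that assumption only: the order in which a real Merger emits elements is not deterministic, and the names used here are hypothetical.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

final class MergeSketch {
  /** Interleaves the inlets round-robin so that no inlet is favoured over another. */
  @SafeVarargs
  static <T> List<T> merge(List<T>... inlets) {
    List<Iterator<T>> iterators = new ArrayList<>();
    for (List<T> inlet : inlets) iterators.add(inlet.iterator());

    List<T> out = new ArrayList<>();
    boolean progressed = true;
    while (progressed) {
      progressed = false;
      // One round: every inlet that still has elements contributes exactly one.
      for (Iterator<T> it : iterators) {
        if (it.hasNext()) {
          out.add(it.next());
          progressed = true;
        }
      }
    }
    return out;
  }
}
```

In this model every input element appears exactly once in the output, and an inlet that runs dry does not block the others.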

Example

The examples below show how to use Merger.source to combine elements from two inlets of the type Data into one outlet of the same type.

Scala

import cloudflow.akkastream._
import cloudflow.akkastream.scaladsl._
import cloudflow.akkastream.util.scaladsl._
import cloudflow.streamlets.avro._
import cloudflow.streamlets._

class DataMerge extends AkkaStreamlet {

  val in0 = AvroInlet[Data]("in-0")
  val in1 = AvroInlet[Data]("in-1")
  val out = AvroOutlet[Data]("out", d ⇒ d.key)

  final override val shape = StreamletShape.withInlets(in0, in1).withOutlets(out)

  override final def createLogic = new RunnableGraphStreamletLogic {
    def runnableGraph =
      Merger.source(in0, in1).to(committableSink(out))
  }
}

Java

import cloudflow.akkastream.*;
import cloudflow.streamlets.*;
import cloudflow.streamlets.avro.*;
import cloudflow.akkastream.javadsl.*;
import cloudflow.akkastream.util.javadsl.*;
import akka.stream.javadsl.*;

import java.util.*;

public class DataMerge extends AkkaStreamlet {
  AvroInlet<Data> inlet1 = AvroInlet.<Data>create("in-0", Data.class);
  AvroInlet<Data> inlet2 = AvroInlet.<Data>create("in-1", Data.class);
  AvroOutlet<Data> outlet = AvroOutlet.<Data>create("out",  d -> d.getKey(), Data.class);

  public StreamletShape shape() {
    return StreamletShape.createWithInlets(inlet1, inlet2).withOutlets(outlet);
  }

  public RunnableGraphStreamletLogic createLogic() {
    return new RunnableGraphStreamletLogic(getContext()) {
      public RunnableGraph<?> createRunnableGraph() {
        return Merger.source(getContext(), inlet1, inlet2).to(getCommittableSink(outlet));
      }
    };
  }
}