Akka streamlet utilities

The cloudflow.akkastream.util library contains some predefined StreamletLogics:

  • HttpServerLogic

  • Splitter

  • Merger

The following sections describe how you implement stream processing logic with these utilities.

HttpServerLogic

Use case

An HttpServerLogic can be used to handle HTTP requests and write the received data to an outlet. You need to extend your streamlet from AkkaServerStreamlet so that Cloudflow will expose an HTTP endpoint in Kubernetes. The HttpServerLogic typically unmarshals incoming HTTP requests as they arrive and stores the data in the outlet. HTTP requests that arrive while the HttpServerLogic is not running result in a 503 response.

Examples

The HttpServerLogic defines an abstract method for the user to supply the processing logic, an HTTP route:

def route(sinkRef: WritableSinkRef[Out]): Route

The HttpServerLogic object provides default implementations of the HttpServerLogic that support some pre-baked routes:

  • the default method creates an HttpServerLogic that handles PUT and POST requests, where the data is read from the entity body.

  • the defaultStreaming method creates an HttpServerLogic that handles streaming HTTP requests, where the data is read from the entity body as a framed stream.

Handle PUT / POST requests by default

The code snippet below shows an example of an HttpServerLogic that uses the default method:

Scala

In Scala:

  • The HttpServerLogic requires an implicit akka.http.scaladsl.marshalling.FromByteStringUnmarshaller[Out] to unmarshal the entity body into the data that you want to write to the outlet. (FromByteStringUnmarshaller[T] is an alias for Unmarshaller[ByteString, T])

  • An HttpServerLogic can only be used in combination with an AkkaServerStreamlet. The HttpServerLogic.default method requires a Server argument, as do the HttpServerLogic.defaultStreaming method and the HttpServerLogic constructor; you cannot construct one without it. The AkkaServerStreamlet implements Server, so you can simply pass this from inside the streamlet, as shown in the snippet below.

  • In the example below, the SensorDataJsonSupport object defines implicit spray-json JSON formats for the SensorData type.

    /*
     * Copyright (C) 2016-2020 Lightbend Inc. <https://www.lightbend.com>
     *
     * Licensed under the Apache License, Version 2.0 (the "License");
     * you may not use this file except in compliance with the License.
     * You may obtain a copy of the License at
     *
     *     http://www.apache.org/licenses/LICENSE-2.0
     *
     * Unless required by applicable law or agreed to in writing, software
     * distributed under the License is distributed on an "AS IS" BASIS,
     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
     * See the License for the specific language governing permissions and
     * limitations under the License.
     */
    
    package sensordata
    
    import akka.http.scaladsl.marshallers.sprayjson.SprayJsonSupport._
    import cloudflow.akkastream._
    import cloudflow.akkastream.util.scaladsl._
    
    import cloudflow.streamlets._
    import cloudflow.streamlets.avro._
    import SensorDataJsonSupport._
    
    class SensorDataHttpIngress extends AkkaServerStreamlet {
      val out                  = AvroOutlet[SensorData]("out").withPartitioner(RoundRobinPartitioner)
      def shape                = StreamletShape.withOutlets(out)
      override def createLogic = HttpServerLogic.default(this, out)
    }
Java

In Java, an akka.http.javadsl.unmarshalling.Unmarshaller<ByteString, T> is passed in as an argument. In the example below, a Jackson Unmarshaller<ByteString, SensorData> is created for the SensorData class using the Jackson.byteStringUnmarshaller method:

/*
 * Copyright (C) 2016-2020 Lightbend Inc. <https://www.lightbend.com>
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package sensordata;

import cloudflow.akkastream.AkkaServerStreamlet;

import cloudflow.akkastream.AkkaStreamletLogic;
import cloudflow.akkastream.util.javadsl.HttpServerLogic;

import cloudflow.streamlets.RoundRobinPartitioner;
import cloudflow.streamlets.StreamletShape;
import cloudflow.streamlets.avro.AvroOutlet;

import akka.http.javadsl.marshallers.jackson.Jackson;

public class SensorDataIngress extends AkkaServerStreamlet {
  AvroOutlet<SensorData> out =
      AvroOutlet.<SensorData>create("out", SensorData.class)
          .withPartitioner(RoundRobinPartitioner.getInstance());

  public StreamletShape shape() {
    return StreamletShape.createWithOutlets(out);
  }

  public AkkaStreamletLogic createLogic() {
    return HttpServerLogic.createDefault(
        this, out, Jackson.byteStringUnmarshaller(SensorData.class), getContext());
  }
}

Handle streamed HTTP entities

The defaultStreaming method requires an implicit FromByteStringUnmarshaller[Out] and an EntityStreamingSupport.

Scala

The Scala example below shows:

  • How to use the defaultStreaming method to unmarshal a JSON stream and write the unmarshalled data to the outlet. It provides an implicit EntityStreamingSupport.json() to read the JSON stream. The defaultStreaming logic uses this to read JSON from the request entity.

  • The SensorDataJsonSupport in the example provides implicit spray-json JSON Formats to unmarshal the JSON elements in the stream.

  • The akka.http.scaladsl.marshallers.sprayjson.SprayJsonSupport object implicitly provides a FromByteStringUnmarshaller based on the formats in the SensorDataJsonSupport object.

    /*
     * Copyright (C) 2016-2020 Lightbend Inc. <https://www.lightbend.com>
     *
     * Licensed under the Apache License, Version 2.0 (the "License");
     * you may not use this file except in compliance with the License.
     * You may obtain a copy of the License at
     *
     *     http://www.apache.org/licenses/LICENSE-2.0
     *
     * Unless required by applicable law or agreed to in writing, software
     * distributed under the License is distributed on an "AS IS" BASIS,
     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
     * See the License for the specific language governing permissions and
     * limitations under the License.
     */
    
    package sensordata
    
    import akka.http.scaladsl.common.EntityStreamingSupport
    import akka.http.scaladsl.marshallers.sprayjson.SprayJsonSupport._
    
    import SensorDataJsonSupport._
    import cloudflow.akkastream.AkkaServerStreamlet
    import cloudflow.akkastream.util.scaladsl._
    import cloudflow.streamlets.{ RoundRobinPartitioner, StreamletShape }
    import cloudflow.streamlets.avro._
    
    class SensorDataStreamingIngress extends AkkaServerStreamlet {
      val out   = AvroOutlet[SensorData]("out", RoundRobinPartitioner)
      def shape = StreamletShape.withOutlets(out)
    
      implicit val entityStreamingSupport = EntityStreamingSupport.json()
      override def createLogic            = HttpServerLogic.defaultStreaming(this, out)
    }
Java

The Java example below shows how to use the defaultStreaming method in a similar manner:

/*
 * Copyright (C) 2016-2020 Lightbend Inc. <https://www.lightbend.com>
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package sensordata;

import akka.http.javadsl.common.EntityStreamingSupport;
import akka.http.javadsl.marshallers.jackson.Jackson;

import cloudflow.akkastream.AkkaServerStreamlet;

import cloudflow.akkastream.AkkaStreamletLogic;
import cloudflow.akkastream.util.javadsl.HttpServerLogic;
import cloudflow.streamlets.RoundRobinPartitioner;
import cloudflow.streamlets.StreamletShape;
import cloudflow.streamlets.avro.AvroOutlet;

public class SensorDataStreamingIngress extends AkkaServerStreamlet {

  private AvroOutlet<SensorData> out =
      AvroOutlet.create("out", SensorData.class)
          .withPartitioner(RoundRobinPartitioner.getInstance());

  public StreamletShape shape() {
    return StreamletShape.createWithOutlets(out);
  }

  public AkkaStreamletLogic createLogic() {
    EntityStreamingSupport ess = EntityStreamingSupport.json();
    return HttpServerLogic.createDefaultStreaming(
        this, out, Jackson.byteStringUnmarshaller(SensorData.class), ess, getContext());
  }
}

Define a custom Route

If you want to provide your own akka-http Route, override the route method. The route method provides a sinkRef argument which you can use to write to the outlet.

override def createLogic = new HttpServerLogic(this, outlet) {
  def route(sinkRef: WritableSinkRef[Out]): Route = {
    put {
      entity(as[Data]) { data ⇒
        onSuccess(sinkRef.write(data)) { _ ⇒
          complete(StatusCodes.OK)
        }
      }
    }
  }
}

The Scala example above creates a route that handles PUT requests. The entity is unmarshalled as Data and written to the outlet using the WritableSinkRef.
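
For reference, the snippet below sketches a complete streamlet built around such a custom route. It reuses the SensorData type, its spray-json support, and the Avro outlet from the earlier examples; the streamlet name CustomRouteIngress is purely illustrative.

package sensordata

import akka.http.scaladsl.marshallers.sprayjson.SprayJsonSupport._
import akka.http.scaladsl.model.StatusCodes
import akka.http.scaladsl.server.Directives._
import akka.http.scaladsl.server.Route

import cloudflow.akkastream._
import cloudflow.akkastream.util.scaladsl._
import cloudflow.streamlets._
import cloudflow.streamlets.avro._
import SensorDataJsonSupport._

// Illustrative example: accepts PUT requests carrying a SensorData JSON body and
// writes each unmarshalled element to the outlet through the WritableSinkRef.
class CustomRouteIngress extends AkkaServerStreamlet {
  val out   = AvroOutlet[SensorData]("out").withPartitioner(RoundRobinPartitioner)
  def shape = StreamletShape.withOutlets(out)

  override def createLogic = new HttpServerLogic(this, out) {
    def route(sinkRef: WritableSinkRef[SensorData]): Route =
      put {
        entity(as[SensorData]) { data ⇒
          onSuccess(sinkRef.write(data)) { _ ⇒
            complete(StatusCodes.OK)
          }
        }
      }
  }
}

As with the default logic, requests received while the streamlet is not running will result in a 503 response.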

Splitter

Use case

A Splitter can be used to split a stream in two, writing elements to one of two outlets. Every element read from the inlet is processed through a FlowWithCommittableContext, which provides at-least-once semantics.

Example

The Splitter defines a sink method for the user to supply a FlowWithCommittableContext[I, Either[L, R]] and a left and right outlet. The Java version of Splitter uses an Either type that is bundled with Cloudflow as cloudflow.akkastream.javadsl.util.Either.

The examples below show a Splitter that validates metrics and splits the stream into valid and invalid metrics, which are written to the valid and invalid outlets, respectively:

Scala
/*
 * Copyright (C) 2016-2020 Lightbend Inc. <https://www.lightbend.com>
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package sensordata

import cloudflow.akkastream._
import cloudflow.akkastream.scaladsl._
import cloudflow.akkastream.util.scaladsl._
import cloudflow.streamlets._
import cloudflow.streamlets.avro._

class MetricsValidation extends AkkaStreamlet {
  val in      = AvroInlet[Metric]("in")
  val invalid = AvroOutlet[InvalidMetric]("invalid").withPartitioner(metric ⇒ metric.metric.deviceId.toString)
  val valid   = AvroOutlet[Metric]("valid").withPartitioner(RoundRobinPartitioner)
  val shape   = StreamletShape(in).withOutlets(invalid, valid)

  override def createLogic = new RunnableGraphStreamletLogic() {
    def runnableGraph = sourceWithOffsetContext(in).to(Splitter.sink(flow, invalid, valid))
    def flow =
      FlowWithCommittableContext[Metric]
        .map { metric ⇒
          if (!SensorDataUtils.isValidMetric(metric)) Left(InvalidMetric(metric, "All measurements must be positive numbers!"))
          else Right(metric)
        }
  }
}
Java
/*
 * Copyright (C) 2016-2020 Lightbend Inc. <https://www.lightbend.com>
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package sensordata;

import akka.stream.javadsl.*;

import akka.NotUsed;
import akka.actor.*;
import akka.kafka.ConsumerMessage.Committable;
import akka.stream.*;

import com.typesafe.config.Config;

import cloudflow.streamlets.*;
import cloudflow.streamlets.avro.*;
import cloudflow.akkastream.*;
import cloudflow.akkastream.javadsl.*;
import cloudflow.akkastream.javadsl.util.Either;
import cloudflow.akkastream.util.javadsl.*;

public class MetricsValidation extends AkkaStreamlet {
  AvroInlet<Metric> inlet = AvroInlet.<Metric>create("in", Metric.class);
  AvroOutlet<InvalidMetric> invalidOutlet =
      AvroOutlet.<InvalidMetric>create(
          "invalid", m -> m.getMetric().toString(), InvalidMetric.class);
  AvroOutlet<Metric> validOutlet =
      AvroOutlet.<Metric>create(
          "valid", m -> m.getDeviceId().toString() + m.getTimestamp().toString(), Metric.class);

  public StreamletShape shape() {
    return StreamletShape.createWithInlets(inlet).withOutlets(invalidOutlet, validOutlet);
  }

  public AkkaStreamletLogic createLogic() {
    return new RunnableGraphStreamletLogic(getContext()) {
      public RunnableGraph createRunnableGraph() {
        return getSourceWithCommittableContext(inlet)
            .to(Splitter.sink(createFlow(), invalidOutlet, validOutlet, getContext()));
      }
    };
  }

  private FlowWithContext<Metric, Committable, Either<InvalidMetric, Metric>, Committable, NotUsed>
      createFlow() {
    return FlowWithCommittableContext.<Metric>create()
        .map(
            metric -> {
              if (!SensorDataUtils.isValidMetric(metric))
                return Either.left(
                    new InvalidMetric(metric, "All measurements must be positive numbers!"));
              else return Either.right(metric);
            });
  }
}
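
The SensorDataUtils.isValidMetric helper used in both snippets is not shown above. A minimal sketch of such a check, assuming the Metric record carries a numeric value field, could look like this:

package sensordata

// Hypothetical sketch of the validation helper used by MetricsValidation.
// It assumes the generated Metric record exposes a numeric `value` field;
// adjust the check to whatever constraints your Metric schema actually has.
object SensorDataUtils {
  def isValidMetric(m: Metric): Boolean = m.value >= 0.0
}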

Merger

Use case

A Merger can be used to merge two or more inlets into one outlet.

Elements from all inlets are processed with at-least-once semantics, in semi-random order and with equal priority for all inlets.

Example

The examples below show how to use Merger.source to combine elements from two inlets of type Metric into one outlet of the same type.

Scala
import cloudflow.akkastream._
import cloudflow.akkastream.scaladsl._
import cloudflow.akkastream.util.scaladsl._
import cloudflow.streamlets._
import cloudflow.streamlets.avro._

object MetricMerge extends AkkaStreamlet {

  val in0 = AvroInlet[Metric]("in-0")
  val in1 = AvroInlet[Metric]("in-1")
  val out = AvroOutlet[Metric]("out", m ⇒ m.deviceId.toString + m.timestamp.toString)

  final override val shape = StreamletShape.withInlets(in0, in1).withOutlets(out)

  override final def createLogic = new RunnableGraphStreamletLogic() {
    def runnableGraph =
      Merger.source(in0, in1).to(committableSink(out))
  }
}

A merging streamlet can also be created by extending the predefined Merge streamlet, giving the element type and the number of inlets:

object MetricsMerge extends Merge[Metric](5)
Java
import akka.stream.javadsl.*;

import cloudflow.streamlets.*;
import cloudflow.streamlets.avro.*;
import cloudflow.akkastream.*;
import cloudflow.akkastream.javadsl.*;
import cloudflow.akkastream.util.javadsl.*;

public class TestMerger extends AkkaStreamlet {
  AvroInlet<Data> inlet1 = AvroInlet.<Data>create("in-0", Data.class);
  AvroInlet<Data> inlet2 = AvroInlet.<Data>create("in-1", Data.class);
  AvroOutlet<Data> outlet = AvroOutlet.<Data>create("out", d -> d.getName(), Data.class);

  public StreamletShape shape() {
    return StreamletShape.createWithInlets(inlet1, inlet2).withOutlets(outlet);
  }

  public RunnableGraphStreamletLogic createLogic() {
    return new RunnableGraphStreamletLogic(getContext()) {
      public RunnableGraph createRunnableGraph() {
        return Merger.source(getContext(), inlet1, inlet2).to(getCommittableSink(outlet));
      }
    };
  }
}