Akka Streamlet utilities

The cloudflow.akkastream.util library contains predefined utilities for common stream-processing patterns: Splitter and Merger.

The following sections describe how you implement stream processing logic with these utilities.

Splitter

Use case

A Splitter can be used to split a stream in two, writing each element to one of two outlets. Every element from the inlet will be processed through a FlowWithCommittableContext, which provides at-least-once semantics.

Example

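The Splitter defines a sink method for the user to supply a FlowWithCommittableContext[I, Either[L, R]] and a left and right outlet: elements the flow maps to Left are written to the left outlet, and elements mapped to Right are written to the right outlet. The Java version of Splitter uses an Either type that is bundled with Cloudflow as cloudflow.akkastream.javadsl.util.Either.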

The examples below show a Splitter that validates Data elements and splits the stream into valid and invalid elements, which are written to the valid and invalid outlets respectively:

Scala
import cloudflow.akkastream._
import cloudflow.akkastream.scaladsl._
import cloudflow.akkastream.util.scaladsl._

import cloudflow.streamlets._
import cloudflow.streamlets.avro._

/** Splits incoming `Data` elements into a valid and an invalid stream.
  *
  * Elements whose `value` is negative are wrapped in a `DataInvalid` and written to `invalid`;
  * every other element (zero included) is written unchanged to `valid`.
  */
class DataSplitter extends AkkaStreamlet {

  /** Elements to be validated. */
  val in = AvroInlet[Data]("in")

  /** Rejected elements, partitioned by their key. */
  val invalid = AvroOutlet[DataInvalid]("invalid").withPartitioner(_.key)

  /** Accepted elements, spread over partitions by the round-robin partitioner. */
  val valid = AvroOutlet[Data]("valid").withPartitioner(RoundRobinPartitioner)

  val shape = StreamletShape(in).withOutlets(invalid, valid)

  override def createLogic = new RunnableGraphStreamletLogic() {

    /** Classifies a single element: `Left` for a negative value, `Right` otherwise. */
    private def validate(data: Data): Either[DataInvalid, Data] =
      if (data.value < 0) Left(DataInvalid(data.key, data.value, "All data must be positive numbers!"))
      else Right(data)

    /** Validation flow; only the element is mapped, the committable context is left alone. */
    def flow = FlowWithCommittableContext[Data].map(validate)

    def runnableGraph = {
      // Left results are routed to `invalid`, Right results to `valid`.
      val splitter = Splitter.sink(flow, invalid, valid)
      sourceWithCommittableContext(in).to(splitter)
    }
  }
}
Java
import cloudflow.streamlets.*;
import cloudflow.streamlets.avro.*;
import cloudflow.akkastream.*;
import cloudflow.akkastream.javadsl.*;
import cloudflow.akkastream.javadsl.util.Either;
import cloudflow.akkastream.util.javadsl.*;

/**
 * Splits incoming {@code Data} elements into a valid and an invalid stream.
 *
 * <p>Elements whose value is negative are wrapped in a {@code DataInvalid} and written to
 * {@code invalidOutlet}; every other element (zero included) is written unchanged to
 * {@code validOutlet}.
 */
public class DataSplitter extends AkkaStreamlet {
  // Elements to be validated.
  AvroInlet<Data> inlet = AvroInlet.<Data>create("in", Data.class);
  // Rejected elements, partitioned by their key.
  AvroOutlet<DataInvalid> invalidOutlet =
      AvroOutlet.<DataInvalid>create(
          "invalid", d -> d.getKey(), DataInvalid.class);
  // Accepted elements, spread over partitions by the round-robin partitioner.
  AvroOutlet<Data> validOutlet =
      AvroOutlet.<Data>create(
          "valid", d -> RoundRobinPartitioner.apply(d), Data.class);

  public StreamletShape shape() {
    return StreamletShape.createWithInlets(inlet).withOutlets(invalidOutlet, validOutlet);
  }

  public AkkaStreamletLogic createLogic() {
    return new RunnableGraphStreamletLogic(getContext()) {
      // Declared as RunnableGraph<?> instead of the raw RunnableGraph type to avoid
      // raw-type (unchecked) warnings.
      public RunnableGraph<?> createRunnableGraph() {
        // Left results go to invalidOutlet, Right results go to validOutlet.
        return getSourceWithCommittableContext(inlet)
            .to(Splitter.sink(createFlow(), invalidOutlet, validOutlet, getContext()));
      }
    };
  }

  /**
   * Builds the validation flow used by the splitter.
   *
   * @return a flow mapping each element to {@code Either.left(DataInvalid)} when its value is
   *     negative and to {@code Either.right(data)} otherwise; the mapping function only sees the
   *     element, not its committable context.
   */
  public FlowWithContext<Data, Committable, Either<DataInvalid, Data>, Committable, NotUsed> createFlow() {
    return FlowWithContext.<Data, Committable>create()
        .map(data -> {
          if (data.getValue() < 0) {
            return Either.left(new DataInvalid(data.getKey(), data.getValue(), "All data must be positive numbers!"));
          } else {
            return Either.right(data);
          }
        });
  }
}

Merger

Use case

A Merger can be used to merge two or more inlets into one outlet.

Elements from all inlets will be processed with at-least-once semantics. The elements will be processed in semi-random order and with equal priority for all inlets.

Example

The examples below show how to use Merger.source to combine elements from two inlets of type Data into one outlet of the same type.

Scala
import cloudflow.akkastream._
import cloudflow.akkastream.scaladsl._
import cloudflow.akkastream.util.scaladsl._
import cloudflow.streamlets.avro._
import cloudflow.streamlets._

/** Merges the elements of two `Data` inlets into a single `Data` outlet.
  *
  * The merged source is written to `out` through a committable sink.
  */
class DataMerge extends AkkaStreamlet {

  /** First input stream. */
  val in0 = AvroInlet[Data]("in-0")

  /** Second input stream. */
  val in1 = AvroInlet[Data]("in-1")

  /** Merged output, partitioned by element key. */
  val out = AvroOutlet[Data]("out", _.key)

  override final val shape = StreamletShape.withInlets(in0, in1).withOutlets(out)

  final override def createLogic = new RunnableGraphStreamletLogic {
    def runnableGraph = {
      val merged = Merger.source(in0, in1)
      val sink   = committableSink(out)
      merged.to(sink)
    }
  }
}
Java
import cloudflow.akkastream.*;
import cloudflow.streamlets.*;
import cloudflow.streamlets.avro.*;
import cloudflow.akkastream.javadsl.*;
import cloudflow.akkastream.util.javadsl.*;
import akka.stream.javadsl.*;

import java.util.*;

/**
 * Merges the elements of two {@code Data} inlets into a single {@code Data} outlet.
 *
 * <p>The merged source is written to {@code outlet} through a committable sink.
 */
public class DataMerge extends AkkaStreamlet {
  // First input stream (port name "in-0").
  AvroInlet<Data> inlet1 = AvroInlet.<Data>create("in-0", Data.class);
  // Second input stream (port name "in-1").
  AvroInlet<Data> inlet2 = AvroInlet.<Data>create("in-1", Data.class);
  // Merged output, partitioned by element key.
  AvroOutlet<Data> outlet = AvroOutlet.<Data>create("out", Data::getKey, Data.class);

  public StreamletShape shape() {
    return StreamletShape
        .createWithInlets(inlet1, inlet2)
        .withOutlets(outlet);
  }

  public RunnableGraphStreamletLogic createLogic() {
    return new RunnableGraphStreamletLogic(getContext()) {
      public RunnableGraph<?> createRunnableGraph() {
        // Combine both inlets, then hand the result to a committable sink on the outlet.
        return Merger
            .source(getContext(), inlet1, inlet2)
            .to(getCommittableSink(outlet));
      }
    };
  }
}