Building Akka Streamlets

The following sections describe how you can create an Akka streamlet. Akka streamlets are Streamlets that offer Akka Streams as the user-facing API.

As mentioned in Using Akka Streamlets, an Akka streamlet is defined by the following features:

  • It has a shape - we call it StreamletShape - that declares the inlets and outlets of the streamlet. Any Akka streamlet needs to define a concrete shape by using the APIs available for the StreamletShape class.

  • It has a StreamletLogic that defines the business logic of the Akka streamlet.

In this tutorial, we’ll build a simple Akka streamlet that accepts data records in an inlet and writes them to the console. It can be useful for gaining insight into the data that arrives at its inlet. Let’s call the streamlet ReportPrinter.

Extending from AkkaStreamlet

Let’s start with building the ReportPrinter streamlet. The first thing to do is to extend the cloudflow.akkastream.AkkaStreamlet abstract class, as shown below:

Scala
package com.example

import cloudflow.akkastream._
import cloudflow.streamlets.StreamletShape

//TODO rename to ReportPrinter
object ReportPrinterStep0 extends AkkaStreamlet {
  // 1. TODO Create inlets and outlets
  // 2. TODO Define the shape of the streamlet
  override val shape: StreamletShape = StreamletShape.empty
  // 3. TODO Override createLogic to provide StreamletLogic
  override def createLogic: AkkaStreamletLogic = new AkkaStreamletLogic() { override def run: Unit = () }
}
Java
package com.example;

import akka.NotUsed;
import akka.stream.*;
import akka.stream.javadsl.*;

import cloudflow.streamlets.*;
import cloudflow.streamlets.avro.*;
import cloudflow.akkastream.*;
import cloudflow.akkastream.javadsl.*;

public class ReportPrinter extends AkkaStreamlet {
  // 1. TODO Create inlets and outlets
  // 2. TODO Define the shape of the streamlet
  public StreamletShape shape() { throw new UnsupportedOperationException("Not Implemented"); }
  // 3. TODO Override createLogic to provide StreamletLogic
  public RunnableGraphStreamletLogic createLogic() { throw new UnsupportedOperationException("Not Implemented"); }
}

The code snippet above shows an object ReportPrinter (a class in the Java version) that extends AkkaStreamlet. We have marked with TODOs the steps needed to complete the implementation, which we will do in the next few sections.

The next step is to implement inlets and outlets of the streamlet.

Inlets and outlets

The streamlet that we are building in this tutorial has one inlet and no outlet.

Scala
package com.example

import cloudflow.streamlets.avro._
import cloudflow.streamlets.StreamletShape

import cloudflow.akkastream._
import cloudflow.streamlets.Inlet

//TODO rename to ReportPrinter
object ReportPrinterStep1 extends AkkaStreamlet {
  // 1. Create inlets and outlets
  val inlet: Inlet = AvroInlet[Report]("report-in")

  // 2. TODO Define the shape of the streamlet
  override val shape: StreamletShape = StreamletShape.empty
  // 3. TODO Override createLogic to provide StreamletLogic
  override def createLogic: AkkaStreamletLogic = new AkkaStreamletLogic() { override def run: Unit = () }
}
Java
package com.example;

import akka.NotUsed;
import akka.stream.*;
import akka.stream.javadsl.*;

import cloudflow.streamlets.*;
import cloudflow.streamlets.avro.*;
import cloudflow.akkastream.*;
import cloudflow.akkastream.javadsl.*;

public class ReportPrinter extends AkkaStreamlet {
  // 1. Create inlets and outlets
  AvroInlet<Report> inlet = AvroInlet.<Report>create("report-in", Report.class);

  // 2. TODO Define the shape of the streamlet
  public StreamletShape shape() { throw new UnsupportedOperationException("Not Implemented"); }
  // 3. TODO Override createLogic to provide StreamletLogic
  public RunnableGraphStreamletLogic createLogic() { throw new UnsupportedOperationException("Not Implemented"); }
}

Cloudflow supports Avro encoded processing of data. We make this explicit by defining the inlet as AvroInlet. Report is the type of the data objects that are accepted by this inlet. This means that an inlet defined by AvroInlet[Report] will only accept Avro encoded data that follows the schema defined for the class Report. The class Report is generated by Cloudflow during application build time from the Avro schema that the user supplies. This ensures that the data which the inlet accepts conforms to that schema. As an example, we have the following Avro schema for the Report class. It contains a summary of the attributes of products from an inventory:

{
  "namespace": "com.example",
  "type": "record",
  "name": "Report",
  "fields": [
    {
      "name": "id",
      "type": "string"
    },
    {
      "name": "name",
      "type": "string"
    },
    {
      "name": "description",
      "type": "string"
    },
    {
      "name": "keywords",
      "type": {
        "type": "array",
        "items": "string"
      }
    }
  ]
}

In the definition of the inlet, "report-in" is the name of the inlet. As a best practice, we recommend that you use a domain-specific name for the inlet to indicate the nature of data that this inlet is supposed to accept.

This streamlet does not have any outlet, but in general outlets are defined similarly. For example, val out = AvroOutlet[Report]("report-out").withPartitioner(report ⇒ report.name) defines an outlet that writes Avro encoded data for objects of type Report. Here, "report-out" is the name that we give to the outlet. withPartitioner sets a partitioning function, which determines how the data is partitioned in the underlying streaming system. If we don’t specify a partitioner, the default RoundRobinPartitioner is used.
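To make the role of a partitioning function more concrete, here is a minimal plain-Java sketch of the two strategies mentioned above. It is not the Cloudflow API: the class PartitionSketch, its method names, and the partition count are hypothetical, and the real mapping from key to partition is done by the underlying streaming system.

```java
import java.util.concurrent.atomic.AtomicInteger;

/** Hypothetical model of key-based versus round-robin partition selection. */
final class PartitionSketch {
    private static final AtomicInteger counter = new AtomicInteger();

    /** Key-based: the same key (for example report.name) always maps to the same partition. */
    static int byKey(String key, int partitions) {
        return Math.floorMod(key.hashCode(), partitions);
    }

    /** Round-robin: partitions are used in turn, regardless of the record content. */
    static int roundRobin(int partitions) {
        return Math.floorMod(counter.getAndIncrement(), partitions);
    }
}
```

With a key-based function, all reports that share a name end up in the same partition and therefore keep their relative order. Round-robin spreads records evenly but gives no such guarantee.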

Streamlet shape

Let’s now define the shape of ReportPrinter by using the APIs in cloudflow.streamlets.StreamletShape:

Scala
package com.example

import cloudflow.streamlets.avro._
import cloudflow.streamlets.StreamletShape

import cloudflow.akkastream._
import cloudflow.streamlets.Inlet

//TODO rename to ReportPrinter
object ReportPrinterStep2 extends AkkaStreamlet {
  // 1. Create inlets and outlets
  val inlet: Inlet = AvroInlet[Report]("report-in")

  // 2. Define the shape of the streamlet
  override val shape: StreamletShape = StreamletShape.withInlets(inlet)

  // 3. TODO Override createLogic to provide StreamletLogic
  override def createLogic: AkkaStreamletLogic = new AkkaStreamletLogic() { override def run: Unit = () }
}
Java
package com.example;

import akka.NotUsed;
import akka.stream.*;
import akka.stream.javadsl.*;

import cloudflow.streamlets.*;
import cloudflow.streamlets.avro.*;
import cloudflow.akkastream.*;
import cloudflow.akkastream.javadsl.*;

public class ReportPrinter extends AkkaStreamlet {
  // 1. Create inlets and outlets
  AvroInlet<Report> inlet = AvroInlet.<Report>create("report-in", Report.class);

  // 2. Define the shape of the streamlet
  public StreamletShape shape() {
    return StreamletShape.createWithInlets(inlet);
  }
  // 3. TODO Override createLogic to provide StreamletLogic
  public RunnableGraphStreamletLogic createLogic() { throw new UnsupportedOperationException("Not Implemented"); }
}

The above code specifies that this streamlet has a "one inlet, no outlet" shape by overriding the shape method with this specific configuration. In general, StreamletShape offers methods to define arbitrary shapes for any streamlet. For example, to define a streamlet with 2 inlets and 2 outlets, we could write StreamletShape.withInlets(in0, in1).withOutlets(valid, invalid).

So far, we have defined the inlets and outlets and used them to declare the particular shape of this streamlet. The next, and arguably the most important, step is to define the StreamletLogic that contains our business logic.

Defining the StreamletLogic

The StreamletLogic class makes it possible for a user to specify domain logic. It is defined as an abstract class in cloudflow.akkastream.StreamletLogic and provides an abstract method run() where the user can define the desired business logic for the Akka Streamlet.

If the streamlet needs to define local state required for processing logic, for example, a Map to resolve values or a List of the most recent events, it must be put inside the StreamletLogic class and not as part of the outer Streamlet class. The Streamlet class is used by Cloudflow for extraction of metadata. Cloudflow instantiates Streamlets when the blueprint is verified, which can have unwanted side effects.
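The separation described above can be pictured with a small plain-Java model. It is only a sketch and does not use Cloudflow classes: Descriptor stands in for the outer Streamlet class, Logic stands in for the StreamletLogic, and both names are hypothetical.

```java
import java.util.ArrayDeque;
import java.util.Deque;

/** Hypothetical model: metadata lives in the outer class, mutable state lives in the logic. */
final class StateSketch {

    /** Stands in for the StreamletLogic: created per run, owns the processing state. */
    static final class Logic {
        private final Deque<String> recent = new ArrayDeque<>();

        /** Remembers only the three most recent events. */
        void onEvent(String event) {
            recent.addLast(event);
            if (recent.size() > 3) {
                recent.removeFirst();
            }
        }

        int size() {
            return recent.size();
        }
    }

    /** Stands in for the Streamlet: cheap to instantiate, holds no processing state. */
    static final class Descriptor {
        final String inletName = "report-in";

        Logic createLogic() {
            return new Logic();
        }
    }
}
```

Because the descriptor holds nothing but metadata, instantiating it for verification has no side effects, and every call to createLogic starts with fresh, independent state.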

For this next step, we need to override createLogic from AkkaStreamlet in our ReportPrinter object. createLogic needs to return an instance of StreamletLogic, which processes the incoming reports according to the requirements of the ReportPrinter object.

Scala
package com.example

import akka.stream.scaladsl.Sink

import cloudflow.streamlets._
import cloudflow.streamlets.avro._

import cloudflow.akkastream._
import cloudflow.akkastream.scaladsl._

object ReportPrinter extends AkkaStreamlet {
  // 1. Create inlets and outlets
  val inlet = AvroInlet[Report]("report-in")

  // 2. Define the shape of the streamlet
  val shape = StreamletShape.withInlets(inlet)

  // 3. Override createLogic to provide StreamletLogic
  def createLogic = new RunnableGraphStreamletLogic() {
    def format(report: Report) = s"${report.name}\n\n${report.description}"
    def runnableGraph =
      plainSource(inlet)
        .to(Sink.foreach(report ⇒ println(format(report))))
  }
}
Java
package com.example;

import akka.NotUsed;
import akka.stream.*;
import akka.stream.javadsl.*;

import cloudflow.streamlets.*;
import cloudflow.streamlets.avro.*;
import cloudflow.akkastream.*;
import cloudflow.akkastream.javadsl.*;

public class ReportPrinter extends AkkaStreamlet {
  // 1. Create inlets and outlets
  AvroInlet<Report> inlet = AvroInlet.<Report>create("report-in", Report.class);

  // 2. Define the shape of the streamlet
  public StreamletShape shape() {
    return StreamletShape.createWithInlets(inlet);
  }
  // 3. Override createLogic to provide StreamletLogic
  public RunnableGraphStreamletLogic createLogic() {
    return new RunnableGraphStreamletLogic(getContext()) {
      public String format(Report report) {
        return report.getName() + "\n\n" +report.getDescription();
      }
      public RunnableGraph<NotUsed> createRunnableGraph() {
        return getPlainSource(inlet).to(Sink.foreach(report -> System.out.println(format(report))));
      }
    };
  }
}

In the above code, we override createLogic to supply the domain logic for the streamlet.

In this case, since we are implementing a printer streamlet for the console, all we need to do is read from the inlet that we defined earlier, val inlet = AvroInlet[Report]("report-in"), and do some processing on it to transform the record into a printable string.

We provide RunnableGraphStreamletLogic to simplify creating the StreamletLogic when all you need is a RunnableGraph that defines the operation of this streamlet.

It only requires you to define a runnableGraph method (createRunnableGraph in the Java API) that specifies the graph to be run, as we have done in the above code. The runnableGraph method specifies that we create a Source from the inlet to read the reports and connect it to a Sink that prints out the reports.

Here are the steps that we take as part of the processing logic:

  • Since we want to pretty print to the console, we define a helper format method to transform the input Report records into a formatted String.

  • Every report read from the Source is printed by using Sink.foreach, which is part of the akka.stream.scaladsl package (akka.stream.javadsl in Java).

Note that the processing logic can be quite complex and we can maintain local ephemeral state as part of the implementation of StreamletLogic.

In summary, here are the steps for defining an Akka streamlet:

  • Define the inlets and outlets.

  • Define the concrete shape using the inlets and outlets.

  • Define the custom processing logic that reads data from inlets and writes data to outlets.

Using ReportPrinter in the blueprint

An example of a blueprint using the ReportPrinter could look like this:

blueprint {
  streamlets {
    ingress = com.example.ReportIngress
    report-printer = com.example.ReportPrinter
  }

  topics {
    reports {
      producers = [ingress.out]
      consumers = [report-printer.report-in]
    }
  }
}

The omitted ReportIngress could, for instance, be another AkkaStreamlet that writes Reports to its outlet.

At-least-once or At-most-once processing

You can access the inlet and outlet streams through methods on the StreamletLogic that return Akka Streams Sources and Sinks.

So far, we’ve used the plainSource method to read from the inlet. When the streamlet is started, it will only receive elements that arrive after it is started. The same applies when the streamlet is restarted for any reason. In short, the plainSource provides at-most-once message delivery semantics, as described in Message delivery semantics.

The plainSource method optionally takes a ResetPosition argument. By default this is set to Latest, which means that it will only provide records as they arrive, starting at the latest offset known for every partition. If ResetPosition is set to Earliest, plainSource will start providing records from the earliest offset known for every partition, potentially re-reading all received records on every restart.
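The difference between the two reset positions can be illustrated with a small plain-Java model of a reader that has no stored position. This is a sketch under simplifying assumptions (a single partition, hypothetical class and method names) and does not call the Cloudflow API.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical model of what a reader without a stored position sees after a (re)start. */
final class ResetPositionSketch {
    enum Reset { EARLIEST, LATEST }

    /**
     * alreadyInTopic: records written before the reader started.
     * arrivingLater:  records written after the reader started.
     */
    static List<String> delivered(List<String> alreadyInTopic,
                                  List<String> arrivingLater,
                                  Reset reset) {
        List<String> out = new ArrayList<>();
        if (reset == Reset.EARLIEST) {
            // Earliest: replay everything that is still available.
            out.addAll(alreadyInTopic);
        }
        // Both positions deliver records that arrive after the start.
        out.addAll(arrivingLater);
        return out;
    }
}
```

For a topic that already holds "a" and "b" when the reader starts and later receives "c", LATEST yields only "c", while EARLIEST yields "a", "b", "c" on every restart.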

StreamletLogic also provides a source type that can pass along context; see Scaladocs SourceWithContext / Javadocs SourceWithContext. It is used to pass along offsets to the end of the stream, where they can be marked as read after the associated messages have been processed.

The sourceWithOffsetContext method creates a SourceWithContext, where every record is automatically associated with a CommittableOffset.

Anything that can be marked as read in the stream to prevent re-processing at restart extends Committable. CommittableOffset and CommittableOffsetBatch are examples of Committable objects.

Marking the messages as read is done by writing the Committable to a committableSink. We’ll show the details later.

Many readers will likely think of commits as ensuring database writes to occur in a single transaction. It’s important to note that in the context of Akka streamlets, committing messages means something else entirely. It does not refer to ACID transactions, and does not refer to writing or producing records to the pub/sub system. Rather, in Akka streamlets, committing means that the streamlet instance indicates that it has read up to a particular message and will not want to process the message again on restart.

Since you likely want to acknowledge that you have read messages after you have done processing, the most natural way to do this in Akka Streams is to have a Sink eventually process the Committables. This does mean that you have to pass along the Committables through your stream.

The following types move the committable offset along with the data but allow you to apply normal operators to the stream that treat the data transparently, as if it were without context:

SourceWithContext

is used to pass through the committable offsets, along with the records you want to read, further downstream.

FlowWithContext

is used to pass through the committable offsets while transforming and processing the messages.

A Sink is provided by a committableSink method that makes sure that offsets are committed automatically while the stream is processed. We explain these options in detail in the next sections.

Sources for inlets

As described, the plainSource method returns an Akka Streams Source, which always starts reading elements as they arrive.

The sourceWithOffsetContext method returns an akka.stream.scaladsl.SourceWithContext for an inlet. The Java equivalent is getSourceWithOffsetContext, which returns an akka.stream.javadsl.SourceWithContext.

A FlowWithContext, which contains a subset of Flow operators, automatically propagates the context for every record, so that you don’t have to worry about it when using it to process the stream. It can be easily attached to a SourceWithContext using the via method, similar to how a Source and Flow are connected.

In the Scala API, we provide type aliases cloudflow.akkastream.scaladsl.SourceWithOffsetContext[T] and cloudflow.akkastream.scaladsl.FlowWithCommittableContext[I, O], which are short-hand for akka.stream.scaladsl.SourceWithContext[T, CommittableOffset, NotUsed] and akka.stream.scaladsl.FlowWithContext[I, Committable, O, Committable, NotUsed], respectively.

Sinks for outlets

The plainSink method (Java API equivalent is getPlainSink) returns a sink to write to an outlet.

The committableSink method (Java API equivalent is getCommittableSink) returns a sink to write records to an outlet. There is also a version of the method that just commits the Committables.

During stream processing with Akka Streams operators, using the SourceWithContext and FlowWithContext types and others, it is possible to pass along the Committables of any records that were originally read from inlets. At the end of the stream, these Committables can accompany any records in your process that you want to write to outlets.

The committableSink method commits any Committables that you have passed along in your stream, which could refer to any messages that you have read from any inlet on the streamlet.

The committableSink will process the Committables associated with the records in batches, acknowledging that associated records read from inlets are processed and should not be resent on restart. At-least-once message delivery semantics apply here, so you have to be prepared for receiving duplicates, for instance when the streamlet crashes before a batch could be committed.
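The duplicate scenario can be traced with a small plain-Java model of one partition and one committed position. It is a sketch only: CommitSketch and its members are hypothetical names, and the real batching and committing is done by the committableSink.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical single-partition model of process-then-commit (at-least-once). */
final class CommitSketch {
    private final List<String> log;     // records in the partition
    private int committed = 0;          // position to resume from after a restart
    final List<String> processed = new ArrayList<>();

    CommitSketch(List<String> log) {
        this.log = log;
    }

    /**
     * Processes up to count records, starting at the committed position.
     * Passing commitAtEnd = false models a crash before the batch is committed.
     */
    void run(int count, boolean commitAtEnd) {
        int position = committed;
        int end = Math.min(log.size(), position + count);
        while (position < end) {
            processed.add(log.get(position));
            position++;
        }
        if (commitAtEnd) {
            committed = position;
        }
    }
}
```

Running run(2, false) followed by run(3, true) on the records "a", "b", "c" processes "a" and "b" twice: once before the simulated crash and once after the restart. No record is lost, but downstream processing has to tolerate the duplicates.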

The committableSink takes an optional CommitterSettings argument which controls how commits are batched. By default, the settings in akka.kafka.committer are used, which you can tune in your application.conf.

You can connect a SourceWithContext or a FlowWithContext to this type of sink with a to method, similar to how you would connect a Source or Flow to a Sink. There is also a committableSink method that takes no arguments. It only commits the offsets associated with the records; the records themselves are ignored.

FlowWithContext

The FlowWithContext provides a constrained set of operators compared to the Akka Streams Flow. This subset of operators processes records in order. It is provided for your convenience: as a user you don’t have to worry about how the Committable per record is passed along.

Operators that turn T into Seq[T]

The FlowWithContext propagates the Committable per record that it operates on. In the case of operators that create a Seq[T] of records for every record, like grouped, the sequencing operation is also applied to the context. This means that the context is no longer Committable, but instead is turned into a Seq[Committable].

If you transform the Seq[T] to some type that is written to an outlet, you have to map the committables with mapContext and create a CommittableOffsetBatch, since now there is a Seq[Committable] where a single Committable is required (CommittableOffsetBatch extends Committable). In this case you would use flowWithContext.mapContext(committables ⇒ CommittableOffsetBatch(committables)) to create a batch of offsets associated with the grouped input records. This ensures that all the related offsets are committed.
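The effect of grouping on the context can be modeled in plain Java. The sketch below is not Akka or Cloudflow code: it assumes a single partition, represents each Committable by a numeric offset, and uses the hypothetical names GroupedContextSketch, Group, and batchOffset. Collapsing a group to its highest offset is a simplification of what a batch of offsets does.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

/** Hypothetical model: grouping records also groups their offsets (contexts). */
final class GroupedContextSketch {

    /** A group of records together with the offsets of its members. */
    static final class Group {
        final List<String> records = new ArrayList<>();
        final List<Long> offsets = new ArrayList<>();

        /** Collapses the member offsets into one value that covers the whole group. */
        long batchOffset() {
            return Collections.max(offsets);
        }
    }

    /** Groups records n at a time; offsets.get(i) is the offset of records.get(i). */
    static List<Group> grouped(List<String> records, List<Long> offsets, int n) {
        List<Group> groups = new ArrayList<>();
        for (int i = 0; i < records.size(); i++) {
            if (i % n == 0) {
                groups.add(new Group());
            }
            Group current = groups.get(groups.size() - 1);
            current.records.add(records.get(i));
            current.offsets.add(offsets.get(i));
        }
        return groups;
    }
}
```

Grouping the records "a", "b", "c" with offsets 0, 1, 2 in groups of two gives one group with offsets [0, 1] and one with [2]. Each group then needs a single commit marker again, which is the role mapContext plays in the text above.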

Converting between FlowWithContext and Flow

The Akka Streams Flow supports more operations than the FlowWithContext and allows for integrating with any kind of Graph, including custom GraphStages. The FlowWithContext[In, Committable, Out, Committable, Mat] can be converted to a Flow[(In, Committable), (Out, Committable), Mat] with asFlow. As you can see from the type signature, every element in the resulting Flow is a Tuple2 of an element and its Committable. (In Java an akka.japi.Pair type is used instead of the Scala Tuple2 type). The Flow[(In, Committable), (Out, Committable), Mat] can be converted (back) to a FlowWithContext[In, Committable, Out, Committable, Mat] with asFlowWithContext.

Being able to convert back and forth between FlowWithContext and Flow means that you can stop automatic context propagation, apply more advanced operations on the stream, and once you are finished, convert the Flow back into a FlowWithContext, as long as you pass along the elements with their contexts as tuples.

If the order of elements is changed in custom operations on the Flow, it is likely that offsets will be advanced too early, which can result in data loss on Streamlet restart.
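A short plain-Java model shows why reordering is dangerous. It assumes, as a simplification, that committing an offset marks every record up to and including that offset as read; ReorderSketch and lostOnRestart are hypothetical names and not part of any library.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

/** Hypothetical model of committing offsets in the order in which elements leave a flow. */
final class ReorderSketch {

    /**
     * Returns the offsets that would be skipped after a restart if the stream
     * crashed once the first `emitted` elements of emissionOrder were committed.
     */
    static List<Long> lostOnRestart(List<Long> emissionOrder, int emitted) {
        long committed = -1;
        Set<Long> done = new HashSet<>();
        for (int i = 0; i < emitted; i++) {
            long offset = emissionOrder.get(i);
            done.add(offset);
            committed = Math.max(committed, offset);
        }
        // Everything at or below the committed offset is treated as read,
        // whether or not it was actually processed.
        List<Long> lost = new ArrayList<>();
        for (long offset = 0; offset <= committed; offset++) {
            if (!done.contains(offset)) {
                lost.add(offset);
            }
        }
        return lost;
    }
}
```

If a custom operation emits offset 2 before offsets 0 and 1 and the streamlet crashes right after that first commit, offsets 0 and 1 are never processed. With in-order emission the same crash loses nothing.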

The same is true for the SourceWithContext[Out, CommittableOffset, Mat], which can be turned into a Source[(Out, CommittableOffset), Mat] with asSource. A Source can be turned into a SourceWithContext with asSourceWithContext.

At-least-once ReportPrinter

The plainSource does not provide any tracking of where the streamlet left off. If the streamlet is restarted, it starts again from the position given by ResetPosition: with the default Latest it skips any records that arrived while it was not running, and with Earliest it prints all elements again from the earliest available data in the topic it is connected to.

In this section, we’re going to use the sourceWithOffsetContext to track offsets and commit them automatically with a committableSink.

The sourceWithOffsetContext method provides a SourceWithOffsetContext[In] for an inlet. Internally, a CommittableOffset is paired with every element to keep track of the associated offset. The SourceWithContext has many of the operators of a Source; it just automatically propagates the offsets. In the code below, it is connected to a committableSink, which automatically commits the offset associated with every record.

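The code below shows how the ReportPrinter can be changed to use at-least-once semantics. All we need to do is change the streamlet logic. Note that the code obtains its source through sourceWithCommittableContext (getSourceWithCommittableContext in Java):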

Scala
  def createLogic = new RunnableGraphStreamletLogic() {
    def format(report: Report) = s"${report.name}\n\n${report.description}"
    def runnableGraph =
      sourceWithCommittableContext(inlet)
        .map { report ⇒
          println(format(report))
          report
        }
        .to(committableSink)
  }
Java
      public RunnableGraph<NotUsed> createRunnableGraph() {
        return getSourceWithCommittableContext(inlet)
          .map(report -> {
              System.out.println(format(report));
              return report;
          })
          .toMat(getCommittableSink(), Keep.right());
      }

This completes the code for the ReportPrinter. The next step is to use it in a blueprint, as described in Composing applications using blueprints. Then, you will want to test your streamlets.