Building Akka Streamlets
The following sections describe how you can create an Akka streamlet.
Akka streamlets are Streamlets that offer Akka Streams as the user-facing API.
As mentioned in Using Akka Streamlets, an Akka streamlet is defined by the following features:
- It has a shape, which we call the StreamletShape, that declares the inlets and outlets of the streamlet. Any Akka streamlet needs to define a concrete shape by using the APIs available for the StreamletShape class.
- It has a StreamletLogic that defines the business logic of the Akka streamlet.
In this tutorial, we’ll build a simple Akka streamlet that accepts data records in an inlet and writes them to the console. Such a streamlet can be useful to gain insight into the data that arrives at its inlet. Let’s call the streamlet ReportPrinter.
Extending from AkkaStreamlet
Let’s start with building the ReportPrinter streamlet. The first thing to do is to extend the cloudflow.akkastream.AkkaStreamlet abstract class, as shown below:
- Scala

package com.example

import cloudflow.akkastream._
import cloudflow.streamlets.StreamletShape

//TODO rename to ReportPrinter
object ReportPrinterStep0 extends AkkaStreamlet {
  // 1. TODO Create inlets and outlets

  // 2. TODO Define the shape of the streamlet
  override val shape: StreamletShape = StreamletShape.empty

  // 3. TODO Override createLogic to provide StreamletLogic
  override def createLogic: AkkaStreamletLogic = new AkkaStreamletLogic() {
    override def run: Unit = ()
  }
}
- Java

package com.example;

import akka.NotUsed;
import akka.stream.*;
import akka.stream.javadsl.*;
import cloudflow.streamlets.*;
import cloudflow.streamlets.avro.*;
import cloudflow.akkastream.*;
import cloudflow.akkastream.javadsl.*;

public class ReportPrinter extends AkkaStreamlet {
  // 1. TODO Create inlets and outlets

  // 2. TODO Define the shape of the streamlet
  public StreamletShape shape() {
    throw new UnsupportedOperationException("Not Implemented");
  }

  // 3. TODO Override createLogic to provide StreamletLogic
  public RunnableGraphStreamletLogic createLogic() {
    throw new UnsupportedOperationException("Not Implemented");
  }
}
The code snippet above shows an object ReportPrinter that extends AkkaStreamlet. We have annotated each of the steps needed to complete the implementation with a TODO, which we are going to do in the next few sections.
The next step is to implement the inlets and outlets of the streamlet.
Inlets and outlets
The streamlet that we are building in this tutorial has one inlet and no outlet.
- Scala

package com.example

import cloudflow.streamlets.avro._
import cloudflow.streamlets.StreamletShape
import cloudflow.akkastream._
import cloudflow.streamlets.Inlet

//TODO rename to ReportPrinter
object ReportPrinterStep1 extends AkkaStreamlet {
  // 1. Create inlets and outlets
  val inlet: Inlet = AvroInlet[Report]("report-in")

  // 2. TODO Define the shape of the streamlet
  override val shape: StreamletShape = StreamletShape.empty

  // 3. TODO Override createLogic to provide StreamletLogic
  override def createLogic: AkkaStreamletLogic = new AkkaStreamletLogic() {
    override def run: Unit = ()
  }
}
- Java

package com.example;

import akka.NotUsed;
import akka.stream.*;
import akka.stream.javadsl.*;
import cloudflow.streamlets.*;
import cloudflow.streamlets.avro.*;
import cloudflow.akkastream.*;
import cloudflow.akkastream.javadsl.*;

public class ReportPrinter extends AkkaStreamlet {
  // 1. Create inlets and outlets
  AvroInlet<Report> inlet = AvroInlet.<Report>create("report-in", Report.class);

  // 2. TODO Define the shape of the streamlet
  public StreamletShape shape() {
    throw new UnsupportedOperationException("Not Implemented");
  }

  // 3. TODO Override createLogic to provide StreamletLogic
  public RunnableGraphStreamletLogic createLogic() {
    throw new UnsupportedOperationException("Not Implemented");
  }
}
Cloudflow supports Avro-encoded processing of data. We make this explicit by defining the inlet as an AvroInlet. Report is the type of the data objects accepted by this inlet. This means that an inlet defined by AvroInlet[Report] will only accept Avro-encoded data that follows the schema defined for the class Report.
The class Report is generated by Cloudflow at application build time from the Avro schema that the user supplies - this ensures that the data which the inlet accepts conforms to the schema that the user supplied earlier.
As an example, we have the following Avro schema for the Report class. It contains a summary of the attributes of products from an inventory:
{
  "namespace": "com.example",
  "type": "record",
  "name": "Report",
  "fields": [
    {
      "name": "id",
      "type": "string"
    },
    {
      "name": "name",
      "type": "string"
    },
    {
      "name": "description",
      "type": "string"
    },
    {
      "name": "keywords",
      "type": {
        "type": "array",
        "items": "string"
      }
    }
  ]
}
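For reference, the generated Report class roughly corresponds to a case class like the following sketch; the exact code depends on the Avro code generator used, and the real generated class also implements Avro's SpecificRecord:

// a sketch, not the actual generated code
case class Report(
  id: String,            // "id" field of the schema
  name: String,          // "name" field
  description: String,   // "description" field
  keywords: Seq[String]  // "keywords" array field
)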
In the definition of the inlet, "report-in" is the name of the inlet. As a best practice, we recommend that you use a domain-specific name for the inlet to indicate the nature of the data that the inlet is supposed to accept.
This streamlet does not have any outlets, but in general outlets are defined similarly, as the sketch below shows.
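For instance, a hypothetical outlet for the same Report type could be declared like this; the second argument is a partitioning function, here keying outgoing records by the report id:

// a sketch: an outlet declaration, analogous to the inlet above
val outlet = AvroOutlet[Report]("report-out", _.id)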
Streamlet shape
Let’s now define the shape of ReportPrinter by using the APIs in cloudflow.streamlets.StreamletShape:
- Scala

package com.example

import cloudflow.streamlets.avro._
import cloudflow.streamlets.StreamletShape
import cloudflow.akkastream._
import cloudflow.streamlets.Inlet

//TODO rename to ReportPrinter
object ReportPrinterStep2 extends AkkaStreamlet {
  // 1. Create inlets and outlets
  val inlet: Inlet = AvroInlet[Report]("report-in")

  // 2. Define the shape of the streamlet
  override val shape: StreamletShape = StreamletShape.withInlets(inlet)

  // 3. TODO Override createLogic to provide StreamletLogic
  override def createLogic: AkkaStreamletLogic = new AkkaStreamletLogic() {
    override def run: Unit = ()
  }
}
- Java

package com.example;

import akka.NotUsed;
import akka.stream.*;
import akka.stream.javadsl.*;
import cloudflow.streamlets.*;
import cloudflow.streamlets.avro.*;
import cloudflow.akkastream.*;
import cloudflow.akkastream.javadsl.*;

public class ReportPrinter extends AkkaStreamlet {
  // 1. Create inlets and outlets
  AvroInlet<Report> inlet = AvroInlet.<Report>create("report-in", Report.class);

  // 2. Define the shape of the streamlet
  public StreamletShape shape() {
    return StreamletShape.createWithInlets(inlet);
  }

  // 3. TODO Override createLogic to provide StreamletLogic
  public RunnableGraphStreamletLogic createLogic() {
    throw new UnsupportedOperationException("Not Implemented");
  }
}
The above code specifies that this streamlet has a "one inlet, no outlet" shape by overriding the shape method with this specific configuration. In general, StreamletShape offers methods to define arbitrary shapes for any streamlet. For example, to define a streamlet with 2 inlets and 2 outlets, we could write StreamletShape.withInlets(in0, in1).withOutlets(valid, invalid), as in the sketch below.
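As a sketch, a hypothetical streamlet that reads reports from two inlets and routes them to a valid and an invalid outlet could declare its shape like this:

// hypothetical inlets and outlets for a two-in, two-out streamlet
val in0     = AvroInlet[Report]("report-in-0")
val in1     = AvroInlet[Report]("report-in-1")
val valid   = AvroOutlet[Report]("valid", _.id)
val invalid = AvroOutlet[Report]("invalid", _.id)

override val shape: StreamletShape =
  StreamletShape.withInlets(in0, in1).withOutlets(valid, invalid)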
So far, we have defined the inlets and outlets and used them to declare the particular shape of this streamlet. The next, and arguably the most important, step is to define the StreamletLogic that contains our business logic.
Defining the StreamletLogic
The StreamletLogic class makes it possible for a user to specify domain logic. It is defined as an abstract class in cloudflow.akkastream.StreamletLogic and provides an abstract method run() where the user can define the desired business logic for the Akka streamlet.
If the streamlet needs to define local state required for the processing logic, for example a mutable collection used as a cache, that state should be defined inside the StreamletLogic and not in the streamlet class itself.
For this next step, we need to override createLogic from AkkaStreamlet in our ReportPrinter object. createLogic needs to return an instance of StreamletLogic that processes the incoming reports according to the requirements of the ReportPrinter object.
- Scala

package com.example

import akka.stream.scaladsl.Sink
import cloudflow.streamlets._
import cloudflow.streamlets.avro._
import cloudflow.akkastream._
import cloudflow.akkastream.scaladsl._

object ReportPrinter extends AkkaStreamlet {
  // 1. Create inlets and outlets
  val inlet = AvroInlet[Report]("report-in")

  // 2. Define the shape of the streamlet
  val shape = StreamletShape.withInlets(inlet)

  // 3. Override createLogic to provide StreamletLogic
  def createLogic = new RunnableGraphStreamletLogic() {
    def format(report: Report) = s"${report.name}\n\n${report.description}"

    def runnableGraph =
      plainSource(inlet)
        .to(Sink.foreach(report ⇒ println(format(report))))
  }
}
- Java

package com.example;

import akka.NotUsed;
import akka.stream.*;
import akka.stream.javadsl.*;
import cloudflow.streamlets.*;
import cloudflow.streamlets.avro.*;
import cloudflow.akkastream.*;
import cloudflow.akkastream.javadsl.*;

public class ReportPrinter extends AkkaStreamlet {
  // 1. Create inlets and outlets
  AvroInlet<Report> inlet = AvroInlet.<Report>create("report-in", Report.class);

  // 2. Define the shape of the streamlet
  public StreamletShape shape() {
    return StreamletShape.createWithInlets(inlet);
  }

  // 3. Override createLogic to provide StreamletLogic
  public RunnableGraphStreamletLogic createLogic() {
    return new RunnableGraphStreamletLogic(getContext()) {
      public String format(Report report) {
        return report.getName() + "\n\n" + report.getDescription();
      }

      public RunnableGraph<NotUsed> createRunnableGraph() {
        return getPlainSource(inlet)
          .to(Sink.foreach(report -> System.out.println(format(report))));
      }
    };
  }
}
In the above code, we override createLogic to supply the domain logic for the streamlet. In this case, since we are implementing a printer streamlet for the console, all we need to do is read from the inlet that we defined earlier, val inlet = AvroInlet[Report]("report-in"), and do some processing on it to transform each record into a printable string.
We provide RunnableGraphStreamletLogic to facilitate the creation of the StreamletLogic when you only want to define a RunnableGraph that describes the operation of this streamlet. It only requires you to define a runnableGraph method that specifies the graph to be run, as we have done in the above code. The runnableGraph method specifies that we create a Source from the inlet to read the reports and connect it to a Sink that prints out the reports.
Here are the steps that we take as part of the processing logic:
- Since we want to pretty-print to the console, we define a helper format method to transform the input Report records into a formatted String.
- Every report read from the Source is printed by using Sink.foreach, which is part of the akka.stream.scaladsl package.
Note that the processing logic can be quite complex and we can maintain local ephemeral state as part of the implementation of StreamletLogic.
In summary, here are the steps for defining an Akka streamlet:
- Define the inlets and outlets.
- Define the concrete shape using the inlets and outlets.
- Define the custom processing logic that reads data from inlets and writes data to outlets.
Using ReportPrinter in the blueprint

An example of a blueprint using the ReportPrinter could look like this:
blueprint {
  streamlets {
    ingress = com.example.ReportIngress
    report-printer = com.example.ReportPrinter
  }
  topics {
    reports {
      producers = [ingress.out]
      consumers = [report-printer.report-in]
    }
  }
}
The omitted ReportIngress could, for instance, be another AkkaStreamlet that writes Reports to its outlet.
At-least-once or At-most-once processing
You can access the inlet and outlet streams through methods on the StreamletLogic that return Akka Streams Sources and Sinks.
So far, we’ve used the plainSource method to read from the inlet. When the streamlet is started, it will only receive elements that arrive after it is started. The same applies when the streamlet is restarted for any reason. In short, the plainSource provides at-most-once message delivery semantics, as described in Message delivery semantics.
The StreamletLogic also provides a source type that can pass along context; see the Scaladocs and Javadocs for SourceWithContext. It is used to pass offsets along to the end of the stream, where they can be marked as read after the associated messages have been processed.
The sourceWithOffsetContext method creates a SourceWithContext where every record is automatically associated with a CommittableOffset. Anything that can be marked as read in the stream, to prevent re-processing at restart, extends Committable. CommittableOffset and CommittableOffsetBatch are examples of Committable objects. Marking the messages as read is done by writing the Committable to a committableSink. We’ll show the details later.
Many readers will likely think of commits as ensuring that database writes occur in a single transaction. It’s important to note that in the context of Akka streamlets, committing messages means something else entirely. It does not refer to ACID transactions, nor to writing or producing records to the pub/sub system. Rather, in Akka streamlets, committing means that the streamlet instance indicates that it has read up to a particular message and does not want to process that message again on restart.
Since you likely want to acknowledge that you have read messages only after you have processed them, the most natural way to do this in Akka Streams is to have a Sink eventually process the Committables. This does mean that you have to pass the Committables along through your stream.
The following types move the committable offset along with the data, but allow you to apply normal operators to the stream that treat the data transparently, as if it had no context:
- SourceWithContext - is used to pass the committable offsets, along with the records you want to read, further downstream.
- FlowWithContext - is used to pass the committable offsets through while transforming and processing the messages.
A Sink is provided by the committableSink method, which makes sure that offsets are committed automatically while the stream is processed.
We explain these options in detail in the next sections.
Sources for inlets
As described, the plainSource method returns an Akka Streams Source, which always starts reading elements as they arrive. The sourceWithOffsetContext method returns an akka.stream.scaladsl.SourceWithContext for an inlet; the Java equivalent is getSourceWithOffsetContext, which returns an akka.stream.javadsl.SourceWithContext.
A FlowWithContext, which contains a subset of the Flow operators, automatically propagates the context for every record, so that you don’t have to worry about it when using it to process the stream. It can easily be attached to a SourceWithContext using the via method, similar to how a Source and a Flow are connected, as the sketch below shows.
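Here is a brief sketch of that wiring, assuming a hypothetical cleanup step that trims report names; the Committable context of each record flows through the map untouched:

import akka.NotUsed
import akka.kafka.ConsumerMessage.Committable
import akka.stream.scaladsl.FlowWithContext

// a hypothetical FlowWithContext that normalizes reports; the context
// of every record is propagated automatically
val cleanup: FlowWithContext[Report, Committable, Report, Committable, NotUsed] =
  FlowWithContext[Report, Committable].map(report => report.copy(name = report.name.trim))

def runnableGraph =
  sourceWithCommittableContext(inlet)
    .via(cleanup)        // attach the flow, as with Source.via(Flow)
    .to(committableSink)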
In the Scala API, we also provide type aliases, such as SourceWithOffsetContext[Out], that abbreviate the full SourceWithContext and FlowWithContext type signatures for streams whose context is a CommittableOffset.
Sinks for outlets
The plainSink method (the Java API equivalent is getPlainSink) returns a sink to write to an outlet.
The committableSink method (the Java API equivalent is getCommittableSink) returns a sink to write records to an outlet. There is also a version of the method that just commits the Committables.
During stream processing with Akka Streams operators, using the SourceWithContext and FlowWithContext types and others, it is possible to pass along the Committables of any records that were originally read from inlets. At the end of the stream, these Committables can accompany any records in your process that you want to write to outlets.
The committableSink method commits any Committables that you have passed along in your stream, which could refer to any messages that you have read from any inlet on the streamlet. The committableSink processes the Committables associated with the records in batches, acknowledging that the associated records read from inlets have been processed and should not be resent on restart. At-least-once message delivery semantics apply here, so you have to be prepared to receive duplicates, for instance when the streamlet crashes before a batch could be committed.
The committableSink takes an optional CommitterSettings argument, which controls how commits are batched. By default, the settings in the akka.kafka.committer section are used, which you can tune in your application.conf, as the sketch below illustrates.
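As a sketch, assuming you want to adjust the defaults programmatically rather than in application.conf, you can derive CommitterSettings from the actor system’s configuration and pass them to committableSink (the batch size of 100 here is an arbitrary example value):

import akka.kafka.CommitterSettings

def createLogic = new RunnableGraphStreamletLogic() {
  // start from the akka.kafka.committer defaults and lower the batch size
  val committerSettings = CommitterSettings(system).withMaxBatch(100L)

  def runnableGraph =
    sourceWithCommittableContext(inlet)
      .to(committableSink(committerSettings))
}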
You can connect a SourceWithContext or a FlowWithContext to this type of sink with the to method, similar to how you would connect a Source or a Flow to a Sink.
There is also a committableSink method that takes no arguments. It only commits the offsets associated with the records; the records themselves are ignored.
FlowWithContext
The FlowWithContext provides a constrained set of operators compared to the Akka Streams Flow. This subset of operators processes records in order. It is provided for your convenience: as a user, you don’t have to worry about how the Committable of each record is passed along.
Operators that turn T into Seq[T]

The FlowWithContext propagates the Committable of every record that it operates on. In the case of operators that create a Seq[T] of records for every record, like grouped, the sequencing operation is also applied to the context. This means that the context is no longer a Committable, but is instead turned into a Seq[Committable].
If you transform the Seq[T] into some type that is written to an outlet, you have to map the committables with mapContext and create a CommittableOffsetBatch, since there is now a Seq[Committable] where a single Committable is required. (CommittableOffsetBatch extends Committable.)
In this case you would use flowWithContext.mapContext(committables ⇒ CommittableOffsetBatch(committables)) to create a batch of offsets associated with the grouped input records. This ensures that all the related offsets are committed, as the sketch below shows.
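Putting the pieces together, a sketch of a runnableGraph that prints reports in groups of ten and commits their offsets as single batches could look like this (reusing the format helper from earlier):

import akka.kafka.ConsumerMessage.CommittableOffsetBatch

def runnableGraph =
  sourceWithOffsetContext(inlet)
    .grouped(10)                                                 // data: Seq[Report], context: Seq[CommittableOffset]
    .map(reports => println(reports.map(format).mkString("\n"))) // print each group of ten
    .mapContext(offsets => CommittableOffsetBatch(offsets))      // collapse the offsets into one Committable
    .to(committableSink)                                         // commits the batches; the records are ignored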
Converting between FlowWithContext and Flow
The Akka Streams Flow supports more operations than the FlowWithContext and allows for integration with any kind of Graph, including custom GraphStages.
The FlowWithContext[In, Committable, Out, Committable, Mat] can be converted to a Flow[(In, Committable), (Out, Committable), Mat] with asFlow. As you can see from the type signature, every element in the resulting Flow is a Tuple2 of an element and its Committable. (In Java, an akka.japi.Pair type is used instead of the Scala Tuple2 type.) The Flow[(In, Committable), (Out, Committable), Mat] can be converted back to a FlowWithContext[In, Committable, Out, Committable, Mat] with asFlowWithContext.
Being able to convert back and forth between FlowWithContext and Flow means that you can stop automatic context propagation, apply more advanced operations on the stream, and, once you are finished, convert the Flow back into a FlowWithContext, as long as you pass along the elements with their contexts as tuples.
If the order of elements is changed in custom operations on the Flow, it is likely that offsets will be advanced too early, which can result in data loss on streamlet restart.
The same is true for the SourceWithContext[Out, CommittableOffset, Mat], which can be turned into a Source[(Out, CommittableOffset), Mat] with asSource. A Source can be turned into a SourceWithContext with asSourceWithContext, as the sketch below illustrates.
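Here is a short sketch of this round trip, assuming a hypothetical filter that needs plain Source operators on the (record, offset) tuples:

// drop down to a tuple-based Source, filter, then restore the context
def runnableGraph =
  sourceWithOffsetContext(inlet)
    .asSource                                           // Source[(Report, CommittableOffset), _]
    .filter { case (report, _) => report.keywords.nonEmpty }
    .asSourceWithContext { case (_, offset) => offset } // re-attach the offset as the context
    .map { case (report, _) => report }                 // unwrap the data element
    .to(committableSink)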
At-least-once ReportPrinter
The plainSource does not provide any tracking of where the streamlet left off, so if the streamlet is restarted it will print all elements from the earliest available data in the outlet it is connected to.
In this section, we’re going to use sourceWithCommittableContext to track offsets and commit them automatically with a committableSink.
The sourceWithCommittableContext method provides a SourceWithCommittableContext[In] for an inlet. Internally, a CommittableOffset is paired with every element to keep track of the associated offset. The SourceWithContext has many of the operators of a Source; it just automatically propagates the offsets. In the code below, it is connected to a committableSink, which automatically commits the offset associated with every record.
The code below shows how the ReportPrinter can be changed to use at-least-once semantics. All we need to do is change the streamlet logic:
- Scala

def createLogic = new RunnableGraphStreamletLogic() {
  def format(report: Report) = s"${report.name}\n\n${report.description}"

  def runnableGraph =
    sourceWithCommittableContext(inlet)
      .map { report ⇒
        println(format(report))
        report
      }
      .to(committableSink)
}
- Java

public RunnableGraph<NotUsed> createRunnableGraph() {
  return getSourceWithCommittableContext(inlet)
    .map(report -> {
      System.out.println(format(report));
      return report;
    })
    .toMat(getCommittableSink(), Keep.right());
}
This completes the code for the ReportPrinter. The next step is to use it in a blueprint, as described in Composing applications using blueprints. Then, you will want to test your streamlets.