Clustering Akka Streamlet

Akka Cluster is a feature of Akka that allows Akka nodes to form a cluster and communicate. This feature can be used within Cloudflow to build stateful Akka Streamlets.

Akka Cluster

Use Case

Akka Clustering in a Cloudflow Akka Streamlet lets you use existing Akka tools for managing data in memory. Akka Cluster Sharding can be used where data must be consistent, and Akka Distributed Data can be used for eventually consistent, highly available data. Many real-world use cases need stateful streams, including model updates in real-time ML serving and sharding IoT device state.

Add the Clustering trait to your streamlet as shown below to activate Clustering on a specific Akka Streamlet.

object ConnectedCarCluster extends AkkaStreamlet with Clustering

When you include the Clustering trait on your Akka Streamlet, Cloudflow automatically sets up all the configuration your streamlet needs to form a cluster, both when running the application locally and when the application is deployed to a Kubernetes cluster.

Kafka External Sharding Source

When you use Akka Cluster Sharding with a regular Cloudflow source, the node hosting a given Akka shard is often not the node that consumes the Kafka partition carrying that shard's messages. In that case, every message read from Kafka has to be sent over the network to the shard's node, and any reply has to travel back.

To avoid this and keep Akka shards on the same nodes as their Kafka partitions, you can use shardedSourceWithCommittableContext and shardedPlainSource. These sources use an Akka feature called ExternalShardAllocationStrategy, with shard locations following the Kafka partition assignments.
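The effect of co-locating shards and partitions can be illustrated with a toy model. The sketch below uses neither Akka nor Kafka; every name in it (ColocationSketch, partitionOf, consumerNodeOf, and so on) is hypothetical, and the hash-based placements are only stand-ins for the real partitioner and shard allocation logic. It counts how many messages would have to leave the node that consumed them under two placement schemes.

```scala
// Toy model only: no Akka or Kafka involved. All names are hypothetical.
object ColocationSketch {
  val nodes      = 3 // streamlet instances in the cluster (assumed)
  val partitions = 6 // Kafka partitions on the inlet topic (assumed)

  // Simplified keyed partitioning: partition = hash(key) mod partitions.
  def partitionOf(entityId: String): Int =
    math.floorMod(entityId.hashCode, partitions)

  // Node that consumes a given partition (simple round-robin assignment).
  def consumerNodeOf(partition: Int): Int = partition % nodes

  // Stand-in for an allocation that ignores Kafka: the shard's node is
  // chosen by an unrelated hash of the entity id.
  def independentShardNode(entityId: String): Int =
    math.floorMod(entityId.reverse.hashCode, nodes)

  // Stand-in for external allocation: the shard is placed on the node
  // that consumes the entity's partition.
  def colocatedShardNode(entityId: String): Int =
    consumerNodeOf(partitionOf(entityId))

  // Number of messages whose consuming node differs from the shard's node.
  def remoteHops(ids: Seq[String], shardNode: String => Int): Int =
    ids.count(id => consumerNodeOf(partitionOf(id)) != shardNode(id))

  def main(args: Array[String]): Unit = {
    val ids = (1 to 1000).map(i => s"car-$i")
    println(s"independent placement: ${remoteHops(ids, independentShardNode)} of ${ids.size} messages leave the consuming node")
    println(s"co-located placement:  ${remoteHops(ids, colocatedShardNode)} of ${ids.size} messages leave the consuming node")
  }
}
```

With co-located placement the count is zero by construction, because the shard's node is derived from the partition's consumer. With independent placement, a message stays local only when the two unrelated choices happen to coincide.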

To use these sources, you must first add the Clustering trait to your Akka Streamlet and then follow the examples below.

Like its non-sharded counterpart, shardedSourceWithCommittableContext carries Kafka offset information as context. In addition, it applies the Kafka sharding strategy.

val entity = Entity(typeKey)(createBehavior = entityContext => ConnectedCarActor(entityContext.entityId))

val source: SourceWithContext[ConnectedCarERecord, CommittableOffset, _] = shardedSourceWithCommittableContext(in, entity)

shardedPlainSource does not carry Kafka offset information as context, but it applies the Kafka sharding strategy in the same way.

val entity = Entity(typeKey)(createBehavior = entityContext => ConnectedCarActor(entityContext.entityId))

val source: Source[ConnectedCarERecord, _] = shardedPlainSource(in, entity)
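The snippets above refer to typeKey and ConnectedCarActor without defining them. As a rough sketch of what such definitions could look like with Akka Typed and Akka Cluster Sharding Typed on the classpath, the following uses a hypothetical CarCounter actor with an invented message protocol (Record, Summary). It is not the ConnectedCarActor from the Cloudflow example, only an illustration of the shape an entity definition usually takes.

```scala
import akka.actor.typed.{ActorRef, Behavior}
import akka.actor.typed.scaladsl.Behaviors
import akka.cluster.sharding.typed.scaladsl.{Entity, EntityTypeKey}

// Hypothetical entity actor: keeps a running average speed per car.
object CarCounter {
  sealed trait Command
  // One reading for this car; the updated summary is sent to replyTo.
  final case class Record(speed: Double, replyTo: ActorRef[Summary]) extends Command
  final case class Summary(carId: String, count: Long, averageSpeed: Double)

  // Identifies this entity type within Cluster Sharding.
  val typeKey: EntityTypeKey[Command] = EntityTypeKey[Command]("Car")

  def apply(carId: String): Behavior[Command] = running(carId, 0L, 0.0)

  // State lives in the behavior's parameters, so it stays in memory
  // on whichever node currently hosts the entity.
  private def running(carId: String, count: Long, total: Double): Behavior[Command] =
    Behaviors.receiveMessage {
      case Record(speed, replyTo) =>
        val newCount = count + 1
        val newTotal = total + speed
        replyTo ! Summary(carId, newCount, newTotal / newCount)
        running(carId, newCount, newTotal)
    }

  // Entity definition in the same form the snippets above pass to the sharded sources.
  val entity = Entity(typeKey)(entityContext => CarCounter(entityContext.entityId))
}
```

Keeping the state in the behavior's parameters rather than in mutable fields is a common Akka Typed style; an actor with mutable state inside Behaviors.setup would serve equally well here.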