Developing Cloudflow streamlets

Please read the introduction to get an overview of Cloudflow. This section goes into more depth on streamlets and how to develop them. Akka and Flink streamlets can be developed in Scala or Java, Spark streamlets can only be developed in Scala.

Schema-first approach

Cloudflow has adopted a schema-first approach for building streaming data pipelines. You simply need to supply an Avro schema as the starting point of the domain model for which a streaming data flow needs to be built. Cloudflow plugs in the appropriate plug-ins into the build system of the application to generate Java / Scala classes corresponding to the Avro schema.

This approach has the advantage that the you only need to take care of the core domain model and Cloudflow does the heavy lifting of generating the classes and integrating them with the main application.

However since Cloudflow takes the schema as the input, it needs to ensure that the corresponding data inlets and outlets honor the schema when allowing data to flow through them. This needs to be done to ensure data consistency across all the inlets and outlets. We discuss this in the next section.

Schema code generation

You can choose what programming language to generate their schemas into by defining settings in the SBT project. When no settings are defined, by default the sbt-cloudflow plugin will look for Avro schemas in src/main/avro and will generate Scala classes.

If you wish to override the location of your Avro schemas in your project, or if you wish to generate Java classes instead, you can do so by defining a Setting in your SBT project.

  • schemaCodeGenerator (Default: SchemaCodeGenerator.Scala) - The programming language to generate Avro schemas classes into.

  • schemaPaths (Default: Map(SchemaFormat.Avro → "src/main/avro")) - The relative path to the Avro schemas.

  • schemaFormat (Default: Seq(SchemaFormat.Avro)) - The schema formats to generate. Avro is the only format currently supported.

For example, to generate Java classes from Avro schemas in a non-default location.

lazy val datamodel = (project in file("./my-cloudflow-library"))
  .enablePlugins(CloudflowLibraryPlugin)
  .settings(
    schemaCodeGenerator := SchemaCodeGenerator.Java,
    schemaPaths := Map(SchemaFormat.Avro -> "src/main/resources/avroschemas")
  )

Schema-aware inlets and outlets

In Cloudflow, all streamlet inlets and outlets are schema-aware. This means two things:

  • Every inlet and outlet must allow only data to flow through them that honors their schema.

  • Inlets and outlets can be connected together only if the schemas of an outlet and the corresponding inlet are compatible.

Let’s take a look at how Cloudflow ensures both of the above guarantees.

Data safety guarantee through types

As mentioned above, any pipeline definition starts with the Avro schema definition for the domain object. Let’s assume you provide the following Avro schema as an input. It’s a definition of a call record as used by a telephone company.

{
    "namespace": "carly.data",
    "type": "record",
    "name": "CallRecord",
    "fields":[
         {
            "name": "user",
            "type": "string"
         },
         {
             "name": "other",
             "type": "string"
         },
         {
             "name": "direction",
             "type": "string"
         },
         {
              "name": "duration",
              "type": "long"
         },
         {
            "name": "timestamp",
            "type": "long"
         }
    ]
}

In case of a Scala application, Cloudflow will generate a Scala case class cloudflow.examples.CallRecord corresponding to the above schema. This class will now be made available for use within the application.

When we define a streamlet where objects of type CallRecord will be passing through its inlet, we define the inlet as val in = AvroInlet[CallRecord]("call-record-in"). Cloudflow ensures the following compile time guarantees through this type definition:

  • The inlet only allows a codec of type Avro.

  • Cloudflow only allows the inlet to be used with an object of type CallRecord.

    For example, when implementing createLogic if you do readStream(in) where in is an inlet parameterized by a type other than CallRecord, the compiler will complain.

Outlet and the partitioning function

Similar to inlets, the user can define an outlet as:

/*
 * Copyright (C) 2016-2020 Lightbend Inc. <https://www.lightbend.com>
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package carly.ingestor

import akka.http.scaladsl.marshallers.sprayjson.SprayJsonSupport._

import JsonCallRecord._
import cloudflow.streamlets.avro._
import carly.data._
import cloudflow.streamlets._
import cloudflow.akkastream._
import cloudflow.akkastream.util.scaladsl.HttpServerLogic

class CallRecordIngress extends AkkaServerStreamlet {

  //tag::docs-outlet-partitioner-example[]
  val out = AvroOutlet[CallRecord]("out").withPartitioner(RoundRobinPartitioner)
  //end::docs-outlet-partitioner-example[]

  final override val shape       = StreamletShape.withOutlets(out)
  final override def createLogic = HttpServerLogic.default(this, out)
}

Besides the name of the outlet, it may also have a partitioning function that defines how data will be partitioned when writing to Kafka topics. Data partitioning in Kafka ensures scalability. If no partitioning function is specified, it will default to the RoundRobinPartitioner.

All logic regarding data safety that we discussed for inlets in the last section applies for outlets as well.

Schema-aware StreamletLogic

When we implement StreamletLogic for a streamlet, we use the inlets and outlets which, as we discussed above, are schema aware. Note that in the following code fragment, all inlet and outlet types are parameterized with domain classes CallRecord and AggregatedCallStats that have been generated using the schema. Here’s an example:

  val in    = AvroInlet[CallRecord]("in")
  val out   = AvroOutlet[AggregatedCallStats]("out", _.startTime.toString)
  val shape = StreamletShape(in, out)

In the above example, we have one inlet that allows data of type CallRecord and one outlet that allows data of type AggregatedCallStats. Here the user had supplied the schema for both of the above types from which Scala classes have been generated by Cloudflow. And the entire StreamletLogic code is based on these two classes - we read CallRecord from the inlet, do processing and generate AggregatedCallStats to be sent to the outlet.

Hence the entire streamlet is guaranteed only to process data that conforms to the schema which the user had supplied.

Composing streamlets using blueprints

In Cloudflow, a streaming application is composed of Streamlets. Each Streamlet defines one or more inlets and outlets. To create a functional pipeline, we must define how the Streamlets connect together by declaring which outlet from one Streamlet should connect with an inlet of another.

A blueprint specifies which streamlets should be used in a pipeline and how they should be connected together. It must be defined in the file src/main/blueprint/blueprint.conf within the file structure of the Cloudflow application sbt project.

An example of a blueprint is shown below:

blueprint {
  streamlets {
    sensor-data = cloudflow.examples.sensordata.SensorDataIngress
    metrics = cloudflow.examples.sensordata.SensorDataToMetrics
    validation = cloudflow.examples.sensordata.MetricsValidation
    reporter = cloudflow.examples.sensordata.MetricsEgress
  }

  connections {
    sensor-data.out = [metrics.in]
    metrics.out = [validation.in]
    validation.out-0 = [reporter.in]
  }
}
Blueprint files are specified in the HOCON format, a superset of JSON.

The above example shows four streamlets, sensor-data, metrics, validation, and reporter. The streamlets section declares the streamlets used in that blueprint and gives them a short name that we use to refer to them in the next section. The connections section declares how the inlets and outlets of the different participating streamlets are connected.

A blueprint file always consists of one blueprint section. The streamlets section defines which streamlets must be used in the blueprint and assigns a reference to every streamlet. The streamlet references are later on used for connecting streamlets together. Lets zoom in on the streamlets section in the example below:

  streamlets {
    sensor-data = cloudflow.examples.sensordata.SensorDataIngress
    metrics = cloudflow.examples.sensordata.SensorDataToMetrics
    validation = cloudflow.examples.sensordata.MetricsValidation
    reporter = cloudflow.examples.sensordata.MetricsEgress
  }

The above example shows four streamlet types defined in the cloudflow.examples.sensordata package. Streamlet types (classes or objects) are specified by their fully qualified names.

Every assignment in the streamlets section assigns a streamlet reference to an instance of a type of streamlet. Once deployed, Cloudflow will run at least one streamlet–depending on the requested number of replicas–for each streamlet reference that has been defined in the blueprint.

The Streamlets can be defined in Java or Scala, as Java or Scala classes with default constructors or as Scala objects. These classes must be available on the classpath, which can be defined directly in the Cloudflow application sbt project or in any dependency that the project has specified, as you would expect of any sbt project.

You can define more than one streamlet reference to the same Scala object. In that case, as many streamlets as assigned streamlet references will be run once everything is deployed (assuming a scaling factor of one). You should view a Streamlet, defined as a Scala object as a template that can be used many times to instantiate streamlets, not as a singleton object that will only run once as a streamlet.

The streamlet references assigned in the streamlets section can be used in the connections section to connect streamlets together. Lets zoom in on the streamlets section in the example below:

  connections {
    sensor-data.out = [metrics.in]
    metrics.out = [validation.in]
    validation.out-0 = [reporter.in]
  }

Every expression in a connections section defines how an outlet connects to one or more inlets. Every assignment follows the following pattern:

  <streamlet-reference-a>.<outlet> = [<streamlet-reference-b>.<inlet>, <streamlet-reference-c>.<inlet>, ...]

Streamlet outlets and inlets are always prefixed by a streamlet-reference, followed by a dot ('.'). As you can see from the above pattern, it is possible to connect many inlets to the same outlet.

Every streamlet type has a shape, defining how many inlets and outlets it has. All streamlets implement a shape() method which returns the shape of the streamlet. In this case, the streamlet referenced by sensor-data has a single outlet named "out". Similarly, the streamlet referenced by "metrics" has one inlet named "in" and one outlet named "out".

As discussed earlier, inlets and outlets of streamlets are explicitly typed, e.g. they only handle data that conform to specific Avro schemas. Inlets can only be connected to outlets if their schemas are compatible. You can verify if the blueprint connects all the streamlets correctly by using:

  sbt verifyBlueprint

The blueprint is automatically verified when sbt buildAndPublish is used.

Message delivery semantics

Cloudflow follows the 'let it crash' principle and can recover from most failure scenarios, except those deemed catastrophic, where the data used for recovery (snapshots) may have been lost. This approach also follows the general policy of Kubernetes, where processes are ephemeral and can be restarted, replaced, or scaled up/down at any time.

The sections that follow mention different models for message delivery semantics provided by Spark-based and Akka-based streamlets. By message delivery semantics, we refer to the expected message delivery guaranties in the case of failure recovery. In a distributed application such as Cloudflow, failure may happen at several different execution levels: from a failed task in an individual executor, to a pod that goes down, to a complete application crash.

After a failure recovery, we recognize these different message delivery guarantees:

At most once

Data may have been processed but will never be processed twice. In this case, data may be lost but processing will never result in duplicate records.

At-least-once

Data that has been processed may be replayed and processed again. In this case, each data record is guaranteed to be processed and may result in duplicate records.

Exactly once

Data is processed once and only once. All data is guaranteed to be processed and no duplicate records are generated. This is the most desirable guarantee for many enterprise applications, but it’s considered impossible to achieve in a distributed environment.

Effectively Exactly Once

is a variant of exactly once delivery semantics that tolerates duplicates during data processing and requires the producer side of the process to be idempotent. That is, producing the same record more than once is the same as producing it only once. In practical terms, this translates to writing the data to a system that can preserve the uniqueness of keys or use a deduplication process to prevent duplicate records from being produced to an external system.

Streamlet configuration parameters

A streamlet can require dynamic configuration parameters at deployment time. Configuration parameters can be used to change the way the streamlet functions when it is run.

Examples of configuration parameters are database connection strings, URLs, credentials, or anything else that you want to specify at deployment time.

A streamlet specifies that it requires particular config parameters by expressing them in code. The values for these parameters will be requested, validated, and set when kubectl cloudflow deploy is used to deploy the Cloudflow application.

There are a number of predefined configuration parameter types:

IntegerConfigParameter

A signed 32 bit integer value.

StringConfigParameter

A string with the max length of 1k characters.

DoubleConfigParameter

A 64 bit floating point value.

BooleanConfigParameter

A boolean value.

RegExpConfigParameter

A string validated using a regular expression.

DurationConfigParameter

A duration string, for example "2 minutes".

MemorySizeConfigParameter

A memory size string, for example "32M".

In addition to the predefined types, you can also define your own types.

Using a configuration parameter in a streamlet

The following section will break down how we can use an Integer configuration parameter type in a streamlet to request the value for a maximum number of records within a time window.

import cloudflow.streamlets._

object RecordSumFlow extends AkkaStreamlet {

  val recordsInWindowParameter = IntegerConfigParameter("records-in-window","This value describes how many records of data should be processed together, default 64 records", Some(64))

  override def configParameters = Set(recordsInWindowParameter)

  val inlet = AvroInlet[Metric]("metric")
  val outlet = AvroOutlet[SummedMetric]("summed-metric")
  val shape = StreamletShape.withInlets(inlet).withOutlets(outlet)

  def createLogic = new RunnableGraphStreamletLogic() {
    def runnableGraph() = {
      val recordsInWindow = streamletConfig.getInt(recordsInWindowParameter.key)
      val flow = FlowWithCommittableContext[Metric].grouped(recordsInWindow).map(sumRecords)

      sourceWithOffsetContext(inlet)
        .via(flow)
        .to(committableSink(outlet))
    }
  }

  private def sumRecords(records: Seq[Metric]) : SummedMetric {....}
}

As seen in the example below, we first need to create an instance of IntegerConfigParameter.

val recordsInWindowParameter = IntegerConfigParameter("records-in-window","This value describes how many records of data should be processed together, default 64 records", Some(64))

The arguments provided to IntegerConfigParameter() are the following:

  • A key, which has to be unique within the streamlet.

  • Optionally, a description, which will be shown by the CLI.

  • Optionally, a default value, which will be used by the CLI when no value is passed during deploy.

After the configuration parameter is defined, we can use it to extract its value from the runtime configuration in the createLogic function:

val recordsInWindow = streamletConfig.getInt(recordsInWindowParameter.key)

Note that its up to the developer to use the correct config method to extract the value of the parameter. Since the type being used here is IntegerConfigParameter the config method used is getInt.

Custom validation

It is easy to create your own custom validation for a configuration parameter using the RegExpConfigParameter type. This type allows you to validate the entered value using a regular expression.

For example, if we want to validate a 24 hour timestamp, this is how it could be defined and used in a streamlet.

import java.time._
import cloudflow.streamlets._

object RecordFilterFlow extends AkkaStreamlet {

  private def militaryTimeConfigParameter(key: String, defaultValue: Option[String] = None) = {
    RegExpConfigParameter(
      key,
      "This parameter type validates that the users enter the time in 24h format.",
      "^(0[0-9]|1[0-9]|2[0-3]|[0-9]):[0-5][0-9]$",
      defaultValue
    )
  }

  val purgeRecordsBeforeTime = militaryTimeConfigParameter("purge-records-before-this-time",Some("20:00"))

  override def configParameters = Set(purgeRecordsBeforeTime)

  val inlet = AvroInlet[Metric]("metric")
  val outlet = AvroOutlet[SummedMetric]("metric")
  val shape = StreamletShape.withInlets(inlet).withOutlets(outlet)

  def createLogic = new RunnableGraphStreamletLogic() {
    def runnableGraph() = {
      val filterTime = LocalTime.parse(streamletConfig.getString(purgeRecordsBeforeTime.key))
      val flow = FlowWithCommittableContext[Metric].filter(record.time.isAfter(filterTime))

      sourceWithOffsetContext(inlet)
        .via(flow)
        .to(committableSink(outlet))
    }
  }
}

Providing values for configuration parameters when testing streamlets

When writing tests for streamlets, you can provide values for configuration parameters when you initialize the runner-specific testkit.

If we want to write a test for the example streamlet RecordSumFlow, we could add values for the recordsInWindowParameter configuration parameter like this:

val testkit = AkkaStreamletTestKit(system, mat).withConfigParameterValues(ConfigParameterValue(RecordSumFlow.recordsInWindowParameter, "20"))

The Spark testkit has a similar function for adding values to configuration parameters when testing a streamlet.

val configTestKit = SparkStreamletTestkit(session).withConfigParameterValues(ConfigParameterValue(MySparkProcessor.NameFilter, "some-name"))

The Java API is slightly different as you can see in the example below:

AkkaStreamletTestKit testkit = AkkaStreamletTestKit.create(system, mat).withConfigParameterValues(ConfigParameterValue.create(RecordSumFlow.recordsInWindowParameter, "20"));

Using configuration parameters in Java

Using the Configuration parameters in Java is similar to the Scala version. The main difference is how class instantiation is done and how to retrieve the config parameter key.

Creating an instance of a StringConfigParameter in Java:

StringConfigParameter nameFilter = StringConfigParameter.create("name-filter-value","Filters out the data in the stream that matches this name.");

Example of accessing the value of a configuration parameter in Java:

/*
 * Copyright (C) 2016-2020 Lightbend Inc. <https://www.lightbend.com>
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package sensordata;

import akka.NotUsed;
import akka.actor.ActorSystem;
import akka.japi.Pair;
import akka.stream.ActorMaterializer;
import akka.stream.alpakka.file.DirectoryChange;
import akka.stream.alpakka.file.javadsl.DirectoryChangesSource;
import akka.stream.javadsl.*;
import akka.util.ByteString;
import com.typesafe.config.Config;
import cloudflow.akkastream.*;
import cloudflow.akkastream.javadsl.RunnableGraphStreamletLogic;
import cloudflow.streamlets.*;
import cloudflow.streamlets.avro.AvroInlet;
import cloudflow.streamlets.avro.AvroOutlet;
import scala.concurrent.duration.FiniteDuration;

import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.Collections;
import java.util.concurrent.TimeUnit;

public class FilterStreamlet extends AkkaStreamlet {

  // Declare the volume mount
  private final VolumeMount referenceFiles =
      VolumeMount.createReadWriteMany("configuration", "/mnt/data");

  // Make the filter filename configurable
  private final StringConfigParameter filterFilenameConfig =
      StringConfigParameter.create(
              "filter-filename",
              "Name of the text file in the volume mount directory that contains the list of device ids to filter out.")
          .withDefaultValue("device-ids.txt");

  // Make polling interval configurable
  private final IntegerConfigParameter filterPollingInterval =
      IntegerConfigParameter.create(
              "filter-pollinginterval",
              "The interval in seconds the streamlet should check for updates to the filter file.")
          .withDefaultValue(10);

  @Override
  public VolumeMount[] defineVolumeMounts() {
    return new VolumeMount[] {referenceFiles};
  }

  @Override
  public ConfigParameter[] defineConfigParameters() {
    return new ConfigParameter[] {filterFilenameConfig, filterPollingInterval};
  }

  AvroInlet<Metric> inlet = AvroInlet.<Metric>create("in", Metric.class);
  AvroOutlet<Metric> outlet = AvroOutlet.<Metric>create("out", Metric.class);

  public StreamletShape shape() {
    return StreamletShape.createWithInlets(inlet).withOutlets(outlet);
  }

  private Boolean findDeviceIdInFilterFile(String deviceId, ArrayList<String> filterDeviceIds) {
    for (String current : filterDeviceIds) {
      if (current.equals(deviceId)) {
        return false;
      }
    }
    return true;
  }

  public AkkaStreamletLogic createLogic() {
    return new RunnableGraphStreamletLogic(getContext()) {
      final Config streamletConfig = getStreamletConfig();
      final Path referenceFilesPath = getMountedPath(referenceFiles);

      final Path filterFilenamePath =
          Paths.get(
              referenceFilesPath.toString(),
              streamletConfig.getString(filterFilenameConfig.getKey()));

      final FiniteDuration pollingInterval =
          FiniteDuration.create(
              streamletConfig.getInt(filterPollingInterval.getKey()), TimeUnit.SECONDS);

      final Source<ArrayList<String>, NotUsed> filterFileContent =
          DirectoryChangesSource.create(referenceFilesPath, pollingInterval, Integer.MAX_VALUE)
              .filter(
                  changedFile ->
                      changedFile.second() != DirectoryChange.Deletion
                          && changedFile.first().equals(filterFilenamePath))
              .map(Pair::first)
              .mapAsync(
                  1,
                  path ->
                      FileIO.fromPath(path)
                          .via(Framing.delimiter(ByteString.fromString("\n"), Integer.MAX_VALUE))
                          .runFold(
                              new ArrayList<String>(),
                              (acc, entry) -> {
                                acc.addAll(Collections.singletonList(entry.utf8String()));
                                return acc;
                              },
                              getMaterializer()));

      public RunnableGraph createRunnableGraph() {
        return getPlainSource(inlet)
            .via(Flow.create())
            .zipLatest(filterFileContent)
            .filter(
                filterFileAndMetric ->
                    findDeviceIdInFilterFile(
                        filterFileAndMetric.first().getDeviceId(), filterFileAndMetric.second()))
            .map(Pair::first)
            .to(getPlainSink(outlet));
      }
    };
  }
}

Providing configuration parameters when deploying a Cloudflow application

Configuration parameters will need to be provided with values during deployment of the application. The deploy command accepts these values as a set of key/value pairs.

The format for specifying configuration parameter values is as follows:

[streamlet name].[configuration parameter name]="[value]"

Deploying an application without specifying values for all required configuration parameters will fail and result in an error message like this:

$ oc plugin cloudflow deploy registry-default.my.kubernetes.cluster/cloudflow/call-record-pipeline:292-c183d80 cdr-aggregator.group-by-window="7 minute" cdr-aggregator.watermark="1 minute"

[Error] Please provide values for the following configuration parameter(s):
- cdr-generator2.records-per-second - Records per second to process.
- cdr-generator1.records-per-second - Records per second to process.

To successfully deploy the application, all configuration parameter values have to be provided on the command line:

$ oc plugin cloudflow deploy registry-default.my.kubernetes.cluster/cloudflow/call-record-pipeline:292-c183d80 cdr-aggregator.group-by-window="7 minute" cdr-aggregator.watermark="1 minute" cdr-generator1.records-per-second="10" cdr-generator2.records-per-second="10"

[Done] Deployment of application `call-record-aggregator` has started.

Configuration parameters can be omitted from the deployment command line as long as they have default values.

$ oc plugin cloudflow deploy registry-default.my.kubernetes.cluster/cloudflow/sensor-data-java:439-a5837b5

Default value 'device-ids.txt' will be used for configuration parameter 'filter.filter-filename'
Default value '10' will be used for configuration parameter 'filter.filter-pollinginterval'

[Done] Deployment of application `sensor-data-java` has started.

Streamlet volume mounts

Sometimes a streamlet needs to read and/or write files from/to some shared file system. Since streamlets run as processes on Kubernetes, they do not automatically have such a file system available. Cloudflow makes it possible for a streamlet to declare the need for a shared file system (e.g. a "volume" in Kubernetes terms) that should be mounted at a specific path. At deployment time the user can then indicate where that file system is actually located using a Kubernetes Persistent Volume Claim (PVC). Cloudflow will then make sure that the PVC will be mounted at the specified path at runtime and the streamlet can then treat it like a local file system.

The following example streamlet shows how to declare and use a volume mount:

/*
 * Copyright (C) 2016-2020 Lightbend Inc. <https://www.lightbend.com>
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package sensordata

import java.nio.file
import java.nio.file._

import akka.NotUsed
import akka.stream.IOResult
import akka.stream.alpakka.file.scaladsl.Directory
import akka.stream.scaladsl._
import akka.util.ByteString
import cloudflow.akkastream._
import cloudflow.akkastream.scaladsl._
import cloudflow.streamlets._
import cloudflow.streamlets.avro._
import spray.json.JsonParser

import scala.concurrent.Future
import scala.concurrent.duration._

class SensorDataFileIngress extends AkkaStreamlet {

  import SensorDataJsonSupport._

  val out   = AvroOutlet[SensorData]("out").withPartitioner(RoundRobinPartitioner)
  def shape = StreamletShape.withOutlets(out)

  private val sourceData = VolumeMount("source-data-mount", "/mnt/data", ReadWriteMany)

  override def volumeMounts = Vector(sourceData)

  // Streamlet processing steps
  // 1. Every X seconds
  // 2. Enumerate all files in the mounted path
  // 3. Read each file *)
  // 4. Deserialize file content to a SensorData value *)

  // *) Note that reading and deserializing the file content is done in separate steps for readability only, in production they should be merged into one step for performance reasons.

  override def createLogic = new RunnableGraphStreamletLogic() {
    val listFiles: NotUsed ⇒ Source[file.Path, NotUsed] = { _ ⇒
      Directory.ls(getMountedPath(sourceData))
    }
    val readFile: Path ⇒ Source[ByteString, Future[IOResult]] = { path: Path ⇒
      FileIO.fromPath(path).via(JsonFraming.objectScanner(Int.MaxValue))
    }
    val parseFile: ByteString ⇒ SensorData = { jsonByteString ⇒
      JsonParser(jsonByteString.utf8String).convertTo[SensorData]
    }

    val emitFromFilesContinuously = Source
      .tick(1.second, 5.second, NotUsed)
      .flatMapConcat(listFiles)
      .flatMapConcat(readFile)
      .map(parseFile)
    def runnableGraph = emitFromFilesContinuously.to(plainSink(out))
  }
}

Java API

The Java API is slightly different from the Scala API. The example belows shows a streamlet that uses a read only volume mount.

/*
 * Copyright (C) 2016-2020 Lightbend Inc. <https://www.lightbend.com>
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package sensordata;

import akka.NotUsed;
import akka.actor.ActorSystem;
import akka.japi.Pair;
import akka.stream.ActorMaterializer;
import akka.stream.alpakka.file.DirectoryChange;
import akka.stream.alpakka.file.javadsl.DirectoryChangesSource;
import akka.stream.javadsl.*;
import akka.util.ByteString;
import com.typesafe.config.Config;
import cloudflow.akkastream.*;
import cloudflow.akkastream.javadsl.RunnableGraphStreamletLogic;
import cloudflow.streamlets.*;
import cloudflow.streamlets.avro.AvroInlet;
import cloudflow.streamlets.avro.AvroOutlet;
import scala.concurrent.duration.FiniteDuration;

import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.Collections;
import java.util.concurrent.TimeUnit;

public class FilterStreamlet extends AkkaStreamlet {

  // Declare the volume mount
  private final VolumeMount referenceFiles =
      VolumeMount.createReadWriteMany("configuration", "/mnt/data");

  // Make the filter filename configurable
  private final StringConfigParameter filterFilenameConfig =
      StringConfigParameter.create(
              "filter-filename",
              "Name of the text file in the volume mount directory that contains the list of device ids to filter out.")
          .withDefaultValue("device-ids.txt");

  // Make polling interval configurable
  private final IntegerConfigParameter filterPollingInterval =
      IntegerConfigParameter.create(
              "filter-pollinginterval",
              "The interval in seconds the streamlet should check for updates to the filter file.")
          .withDefaultValue(10);

  @Override
  public VolumeMount[] defineVolumeMounts() {
    return new VolumeMount[] {referenceFiles};
  }

  @Override
  public ConfigParameter[] defineConfigParameters() {
    return new ConfigParameter[] {filterFilenameConfig, filterPollingInterval};
  }

  AvroInlet<Metric> inlet = AvroInlet.<Metric>create("in", Metric.class);
  AvroOutlet<Metric> outlet = AvroOutlet.<Metric>create("out", Metric.class);

  public StreamletShape shape() {
    return StreamletShape.createWithInlets(inlet).withOutlets(outlet);
  }

  private Boolean findDeviceIdInFilterFile(String deviceId, ArrayList<String> filterDeviceIds) {
    for (String current : filterDeviceIds) {
      if (current.equals(deviceId)) {
        return false;
      }
    }
    return true;
  }

  public AkkaStreamletLogic createLogic() {
    return new RunnableGraphStreamletLogic(getContext()) {
      final Config streamletConfig = getStreamletConfig();
      final Path referenceFilesPath = getMountedPath(referenceFiles);

      final Path filterFilenamePath =
          Paths.get(
              referenceFilesPath.toString(),
              streamletConfig.getString(filterFilenameConfig.getKey()));

      final FiniteDuration pollingInterval =
          FiniteDuration.create(
              streamletConfig.getInt(filterPollingInterval.getKey()), TimeUnit.SECONDS);

      final Source<ArrayList<String>, NotUsed> filterFileContent =
          DirectoryChangesSource.create(referenceFilesPath, pollingInterval, Integer.MAX_VALUE)
              .filter(
                  changedFile ->
                      changedFile.second() != DirectoryChange.Deletion
                          && changedFile.first().equals(filterFilenamePath))
              .map(Pair::first)
              .mapAsync(
                  1,
                  path ->
                      FileIO.fromPath(path)
                          .via(Framing.delimiter(ByteString.fromString("\n"), Integer.MAX_VALUE))
                          .runFold(
                              new ArrayList<String>(),
                              (acc, entry) -> {
                                acc.addAll(Collections.singletonList(entry.utf8String()));
                                return acc;
                              },
                              getMaterializer()));

      public RunnableGraph createRunnableGraph() {
        return getPlainSource(inlet)
            .via(Flow.create())
            .zipLatest(filterFileContent)
            .filter(
                filterFileAndMetric ->
                    findDeviceIdInFilterFile(
                        filterFileAndMetric.first().getDeviceId(), filterFileAndMetric.second()))
            .map(Pair::first)
            .to(getPlainSink(outlet));
      }
    };
  }
}

If you want to use a writable volume mount you can replace createReadOnlyMany with createReadWriteMany above.

Access Modes and PVC Mounting

The PVC associated with the streamlet volume mount is required to have the same access mode as the volume mount declared in the streamlet. When deploying the application the access mode will be checked, if the access mode differs from the access mode declared in the streamlet, the deployment of the application will fail.

The following access modes are available:

  • ReadOnlyMany: all streamlet instances get read-only access to the same volume.

  • ReadWriteMany: all streamlet instances get read and write access to the same volume.

Cluster Security Considerations

When deploying a Cloudflow application that contains streamlets with a volume mount, you may have to apply additional Kubernetes security configuration resources to the Kubernetes cluster for the application to deploy successfully.

The pod in which the streamlet is running may need to be associated with a Pod Security Context (PSP) or (on OpenShift) a Security Context Constraint (SCC).

This can be done by associating the Cloudflow application service account, called cloudflow-app-serviceaccount and located in the namespace of the application, with a PSP/SCC.

The PSP and SCC must allow the application pods to mount a writable volume as group id 185. This is the group id of the user running in the streamlet container.

Security context constraints example

The following example shows an SCC that would allow a Cloudflow application with a writable volume mount to deploy correctly to an Openshift cluster with an activated SCC controller. See the OpenShift documentation on Managing Security Context Constraints for more information.

kind: SecurityContextConstraints
apiVersion: v1
metadata:
  name: cloudflow-application-scc
allowPrivilegedContainer: true
runAsUser:
  type: MustRunAsNonRoot
seLinuxContext:
  type: RunAsAny
fsGroup:
  type: MustRunAs
  ranges:
  - min: 185
    max: 186
supplementalGroups:
  type: RunAsAny
volumes:
- '*'

Pod security policy example

This is an example of a PSP that would allow a Cloudflow application with a writable volume mount to deploy correctly.

apiVersion: extensions/v1beta1
kind: PodSecurityPolicy
metadata:
  name: cloudflow-volume-mount-psp
spec:
  runAsUser:
    rule: 'MustRunAsNonRoot'
  seLinux:
    rule: 'RunAsAny'
  supplementalGroups:
    rule: 'RunAsAny'
  seLinux:
    rule: 'RunAsAny'
  fsGroup:
    rule: 'MustRunAs'
    ranges:
    - min: 185
      max: 186
  volumes:
  - '*'

Deploying applications using volume mounts

When deploying a Cloudflow application with streamlets that use the volume mount feature, a Kubernetes Persistent Volume Claim (PVC) will need to be specified for each of the volume mounts.

Before the application can be deployed, the PVC needs to be created in the application namespace.

When the PVC has been created, you can deploy the application and associate the PVC with the streamlet volume mount name using a CLI flag.

Deploying an application without a required volume mount will fail and result in an error message:

$ oc plugin cloudflow deploy sensor-data-java:427-a20fc62-dirty

[Error] The following volume mount needs to be bound to a Persistence Volume claim using the --volume-mount flag

- filter.configuration

To successfully deploy the application, the volume mount has to be bound to a PVC.

In the example below, the streamlet filter requires a volume mount called configuration. This volume mount is associated with the PVC named source-data-claim using the --volume-mount flag.

$ oc plugin cloudflow deploy sensor-data-java:427-a20fc62-dirty --volume-mount filter.configuration=source-data-claim

The following volume mount is now bound to Persistent Volume Claim `source-data-claim`:

- filter.configuration

[Done] Deployment of application `sensor-data-java` has started.

What’s next

After this general overview of streamlets, learn more about the specifics of using Akka, Spark, and Flink streamlets. Cloudflow also offers a way to test your streamlets in a local sandbox.