Streamlet Configuration

The Streamlet API provides methods to declare configuration parameters for a streamlet. Configuration parameters can be used to indicate that a streamlet requires configuration before it is run.

Examples of configuration parameters are database connection strings, URLs, credentials, or anything else that you want to specify at deployment time.

A streamlet specifies that it requires particular config parameters by expressing them in code. The values for these parameters will be requested, validated, and set when kubectl cloudflow deploy or kubectl cloudflow configure is used to deploy or configure the Cloudflow application.

There are a number of predefined configuration parameter types:

IntegerConfigParameter

A signed 32-bit integer value.

StringConfigParameter

A string with a maximum length of 1k characters.

DoubleConfigParameter

A 64-bit floating-point value.

BooleanConfigParameter

A boolean value.

RegExpConfigParameter

A string validated using a regular expression.

DurationConfigParameter

A duration string, for example "2 minutes".

MemorySizeConfigParameter

A memory size string, for example "32M".

In addition to the predefined types, you can also define your own types.

Using a configuration parameter in a streamlet

The following example shows how to use an IntegerConfigParameter in a streamlet to request the maximum number of records to process together within a window.

import cloudflow.streamlets._
import cloudflow.streamlets.avro._
import cloudflow.akkastream._
import cloudflow.akkastream.scaladsl._

object RecordSumFlow extends AkkaStreamlet {

  val recordsInWindowParameter = IntegerConfigParameter(
    "records-in-window",
    "This value describes how many records of data should be processed together, default 64 records",
    Some(64)
  )

  override def configParameters = Vector(recordsInWindowParameter)

  val inlet  = AvroInlet[Metric]("metric")
  val outlet = AvroOutlet[SummedMetric]("summed-metric")
  val shape  = StreamletShape.withInlets(inlet).withOutlets(outlet)

  def createLogic = new RunnableGraphStreamletLogic() {
    def runnableGraph() = {
      val recordsInWindow = streamletConfig.getInt(recordsInWindowParameter.key)
      val flow = FlowWithCommittableContext[Metric].grouped(recordsInWindow).map(sumRecords).mapContext(_.last)

      sourceWithCommittableContext(inlet)
        .via(flow)
        .to(committableSink(outlet))
    }
  }

  private def sumRecords(records: Seq[Metric]): SummedMetric =
    ??? // ...
}

First, we create an instance of IntegerConfigParameter, as in this excerpt from the example above:

  val recordsInWindowParameter = IntegerConfigParameter(
    "records-in-window",
    "This value describes how many records of data should be processed together, default 64 records",
    Some(64)
  )

The arguments provided to IntegerConfigParameter() are the following:

  • A key, which has to be unique within the streamlet.

  • Optionally, a description, which will be shown by the CLI.

  • Optionally, a default value, which will be used by the CLI when no value is passed during deploy.

After the configuration parameter is defined, we can use it to extract its value from the runtime configuration in the createLogic function:

      val recordsInWindow = streamletConfig.getInt(recordsInWindowParameter.key)

Note that it's up to the developer to use the correct config method to extract the value of the parameter. Since the type being used here is IntegerConfigParameter, the config method used is getInt.
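The pairing of parameter type and getter can be tried outside a streamlet. The sketch below assumes that the runtime configuration behaves like a Typesafe Config object, and it uses ConfigFactory.parseString as a stand-in for the configuration that Cloudflow would supply. The keys group-by-window and buffer-size and all values are illustrative only.

```scala
import com.typesafe.config.{ Config, ConfigFactory }
import scala.util.Try

object TypedGetterSketch {
  // Stand-in for the streamlet's runtime configuration (assumption: a Typesafe Config).
  val config: Config = ConfigFactory.parseString(
    """
      |records-in-window = 64
      |group-by-window = "7 minute"
      |buffer-size = "32M"
      |""".stripMargin
  )

  // Integer parameter: read with getInt.
  val recordsInWindow: Int = config.getInt("records-in-window")

  // Duration string: read with getDuration.
  val window: java.time.Duration = config.getDuration("group-by-window")

  // Memory size string: read with getMemorySize.
  val bufferBytes: Long = config.getMemorySize("buffer-size").toBytes

  // A getter that does not fit the value fails at runtime, not at compile time.
  val wrongGetter: Try[Int] = Try(config.getInt("group-by-window"))

  def main(args: Array[String]): Unit = {
    println(recordsInWindow)       // 64
    println(window.getSeconds)     // 420
    println(wrongGetter.isFailure) // true
  }
}
```

Because the mismatch only surfaces when the value is read, it is worth covering each parameter with a test that actually reads it.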

Custom validation

It is easy to create your own custom validation for a configuration parameter using the RegExpConfigParameter type. This type allows you to validate the entered value using a regular expression.

For example, to validate a time in 24-hour format, the parameter could be defined in a streamlet like this:

  val militaryTimeParameter =
    RegExpConfigParameter(
      "time",
      "This parameter type validates that the users enter the time in 24h format.",
      "^(0[0-9]|1[0-9]|2[0-3]|[0-9]):[0-5][0-9]$",
      Some("08:00")
    )
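To see which inputs the pattern accepts, it can be checked locally with plain Scala before it is used in a parameter. The sketch below applies the same regular expression through String.matches. It does not call Cloudflow, so it only shows the behavior of the pattern itself, not how the CLI reports a rejected value.

```scala
object MilitaryTimeCheck {
  // The same pattern that is passed to RegExpConfigParameter above.
  val pattern = "^(0[0-9]|1[0-9]|2[0-3]|[0-9]):[0-5][0-9]$"

  // String.matches succeeds only if the whole input matches the pattern.
  def isValid(value: String): Boolean = value.matches(pattern)

  def main(args: Array[String]): Unit =
    Seq("08:00", "23:59", "9:30", "24:00", "12:60", "8am").foreach { v =>
      println(v + " -> " + isValid(v))
    }
}
```

The first three inputs are accepted. "24:00", "12:60" and "8am" are rejected because the hour or minute part falls outside the allowed ranges or the separator is missing.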

Providing values for configuration parameters when testing streamlets

When writing tests for streamlets, you can provide values for configuration parameters when you initialize the runner-specific testkit.

If we want to write a test for the example streamlet RecordSumFlow, we could add values for the recordsInWindowParameter configuration parameter like this:

    val testkit =
      AkkaStreamletTestKit(system).withConfigParameterValues(ConfigParameterValue(RecordSumFlow.recordsInWindowParameter, "20"))

The Spark testkit has a similar function for adding values to configuration parameters when testing a streamlet.

    val testKit = SparkStreamletTestkit(session).withConfigParameterValues(ConfigParameterValue(RecordSumFlow.recordsInWindowParameter, "20"))

The Java API is slightly different as you can see in the example below:

    AkkaStreamletTestKit testkit = AkkaStreamletTestKit.create(system).withConfigParameterValues(ConfigParameterValue.create(RecordSumFlow.recordsInWindowParameter, "20"));

Using configuration parameters in Java

Using configuration parameters in Java is similar to using them in Scala. The main differences are how classes are instantiated and how the config parameter key is retrieved.

Creating an instance of a StringConfigParameter in Java:

    // Make the filter filename configurable
    private final StringConfigParameter filterFilenameConfig = StringConfigParameter.create(
                "filter-filename",
                "Name of the text file in the volume mount directory that contains the list of keys to filter out."
            ).withDefaultValue("device-ids.txt");

Example of accessing the value of a configuration parameter in Java:

            final Path filterFilenamePath = Paths.get(referenceFilesPath.toString(),
                    filterFilenameConfig.getValue(getContext()));

Providing Configuration Parameters when Deploying a Cloudflow Application

Values for configuration parameters must be provided when the application is deployed. The deploy command accepts these values as a set of key/value pairs or in a configuration file.

The format for specifying configuration parameter values is as follows:

[config-path]="[value]"
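Judging by the examples in this section, the config path is made up of the streamlet's name in the application and the key the streamlet declared for the parameter. The sketch below assembles such an argument. The configArg helper is hypothetical and is not part of the Cloudflow API or CLI.

```scala
object ConfigPathSketch {
  // Hypothetical helper for illustration; not part of the Cloudflow API or CLI.
  // streamlet: the streamlet's name in the application, e.g. "cdr-aggregator"
  // key:       the key declared by the configuration parameter, e.g. "group-by-window"
  def configArg(streamlet: String, key: String, value: String): String =
    "cloudflow.streamlets." + streamlet + ".config-parameters." + key + "=\"" + value + "\""

  def main(args: Array[String]): Unit =
    // prints: cloudflow.streamlets.cdr-aggregator.config-parameters.group-by-window="7 minute"
    println(configArg("cdr-aggregator", "group-by-window", "7 minute"))
}
```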

Deploying an application without specifying values for all required configuration parameters fails with an error message like the following:

$ kubectl cloudflow deploy target/call-record-pipeline.json \
  cloudflow.streamlets.cdr-aggregator.config-parameters.group-by-window="7 minute" \
  cloudflow.streamlets.cdr-aggregator.config-parameters.watermark="1 minute"

[Error] Please provide values for the following configuration parameter(s):
- cloudflow.streamlets.cdr-generator1.config-parameters.records-per-second - Records per second to process.
- cloudflow.streamlets.cdr-generator2.config-parameters.records-per-second - Records per second to process.

To successfully deploy the application, all configuration parameter values have to be provided via a configuration file using --conf file or directly on the command line:

$ kubectl cloudflow deploy target/call-record-pipeline.json --conf test-config.conf

[Done] Deployment of application `call-record-aggregator` has started.
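As a sketch, test-config.conf for the call-record example could look like the following HOCON fragment. It assumes that the file uses the same config paths as the command line, written here as nested blocks. The records-per-second values are illustrative and are not taken from the original application.

```hocon
cloudflow.streamlets {
  # Illustrative values, chosen only for this sketch.
  cdr-generator1.config-parameters.records-per-second = 10
  cdr-generator2.config-parameters.records-per-second = 10

  cdr-aggregator.config-parameters {
    group-by-window = "7 minute"
    watermark = "1 minute"
  }
}
```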

Configuration parameters can be omitted from the configuration file or the deployment command line as long as they have default values.

$ kubectl cloudflow deploy target/sensor-data-java.json

Default value 'device-ids.txt' will be used for configuration parameter 'cloudflow.streamlets.filter.config-parameters.filter-filename'
Default value '10' will be used for configuration parameter 'cloudflow.streamlets.filter.config-parameters.filter-pollinginterval'

[Done] Deployment of application `sensor-data-java` has started.

For more information, see The Cloudflow Configuration Model.