Using a local sandbox

A common concern among distributed application developers is the fairly long and troublesome cycle of developing and testing their applications on a cluster. End-to-end testing of a distributed streaming application usually requires building Docker images, a lengthy upload of those images to a repository, and cluster resources available for testing.

Cloudflow includes a local execution mode, called 'Sandbox'. It lets you execute your complete application as a lightweight process on your machine. It only requires enough free memory to execute a scaled-down version of every streamlet.

The Cloudflow Sandbox accelerates the development of streaming applications by shortening the deployment and test cycle of functional requirements.

The Sandbox is provided as an sbt auto-plugin included in the cloudflow-streamlets package. It’s automatically available when an application enables support for Akka Streams, Spark, or Flink streamlets.

Using the Sandbox

The Sandbox is accessible as an sbt task called runLocal in the build of your Cloudflow application.

It can be called directly from the command line:

$ sbt runLocal

Or it can be run from an active sbt session, with the application executing in the background and its current state displayed in the console:

$ sbt
...
[project]> runLocal

In the example below, we see the summary output of sensor-data-scala, one of the Cloudflow examples available at https://github.com/lightbend/cloudflow/tree/master/examples/sensor-data-scala:

---------------------------------- Streamlets ----------------------------------
file-ingress [sensordata.SensorDataFileIngress]
  - mount [source-data-mount] available at [/tmp/local-cloudflow2714008686931483476/source-data-mount]
http-ingress [sensordata.SensorDataHttpIngress]
  - HTTP port [3000]
invalid-logger [sensordata.InvalidMetricLogger]
merge [sensordata.SensorDataMerge]
metrics [sensordata.SensorDataToMetrics]
rotor-avg-logger [sensordata.RotorspeedWindowLogger]
rotorizer [sensordata.RotorSpeedFilter]
valid-logger [sensordata.ValidMetricLogger]
validation [sensordata.MetricsValidation]
--------------------------------------------------------------------------------

--------------------------------- Connections ---------------------------------
validation.valid -> valid-logger.in
validation.valid -> rotorizer.in
rotorizer.out -> rotor-avg-logger.in
validation.invalid -> invalid-logger.in
merge.out -> metrics.in
http-ingress.out -> merge.in-0
metrics.out -> validation.in
file-ingress.out -> merge.in-1
--------------------------------------------------------------------------------

------------------------------------ Output ------------------------------------
Pipeline log output available in file: /tmp/local-cloudflow2714008686931483476/local-cloudflow3796438553652935419.log
--------------------------------------------------------------------------------

Running sensor-data-scala
To terminate, press [ENTER]

This info panel contains three main sections: Streamlets, Connections, and Output.

Streamlets

The Streamlets panel lists the streamlets instantiated in this application. Below each streamlet name, there may be one or more local resources printed, like a volume mount or a TCP port. For example, in the output above, file-ingress is using a volume mount assigned to a local directory:

file-ingress [sensordata.SensorDataFileIngress]
  - mount [source-data-mount] available at [/tmp/local-cloudflow2714008686931483476/source-data-mount]

The http-ingress streamlet offers an HTTP endpoint on port 3000:

http-ingress [sensordata.SensorDataHttpIngress]
  - HTTP port [3000]

Connections

The Connections panel shows how the streamlets are connected to one another. It represents the connection schema specified in the blueprint.

Output

The Output panel shows where the output of the running application is made available. You can use your favorite text editor or command-line tools to inspect the output and verify that your application is doing what you expect.

Terminating a Running Application

The application executes in the background until [ENTER] is pressed, which terminates the application process. The file containing the output of the running application is preserved for later examination.

Streamlet Features in the Sandbox

In a Cloudflow application, streamlets offer several customization options, such as configuration requirements, volume mounts, and server ports. The Sandbox provides local implementations of those options that are meaningful in a local environment.

The local configuration file

Applications running in the Sandbox can specify custom values for the local environment by making use of a local configuration file in HOCON format. This file is called local.conf by default and is expected to be available on the classpath, usually under the application/src/main/resources folder. A custom local configuration file can be specified in the build, using the runLocalConfigFile key. For example, in this build.sbt file, we change the local configuration file to myruntime.conf in the root directory of the project:

runLocalConfigFile := Some("./myruntime.conf")
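
For context, here is a minimal sketch of how that setting can sit in the project definition. The project name and the enabled plugins are illustrative; which Cloudflow sbt plugins to enable depends on your Cloudflow version and the runtimes your application uses:

lazy val sensorData = (project in file("."))
  .enablePlugins(CloudflowApplicationPlugin, CloudflowAkkaPlugin) // illustrative plugin names
  .settings(
    name := "sensor-data-scala",
    // Point the Sandbox to a custom local configuration file
    runLocalConfigFile := Some("./myruntime.conf")
  )
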
The contents of this file are organized by streamlet, using the streamlet names that you specified in the blueprint:

file-ingress {
  //config-key = value  -- values for the file-ingress streamlet
}

rotor-avg-logger {
  //config-key = value  -- values for the rotor-avg-logger streamlet
}

Note that because this file is in HOCON format, dot-notation is also supported:

file-ingress.<config-key>=value
rotor-avg-logger.<config-key>=value

Using configuration values

The Streamlet API lets us declare configuration parameters that can be specified at deployment time. For example, this declaration allows us to provide a custom prefix value as a String:

class ValidMetricLogger extends AkkaStreamlet {
  // ...
  val MsgPrefix = StringConfigParameter("msg-prefix", "Provide a prefix for the log lines", Some("valid-logger"))
  // ...
}
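
For the parameter to be resolved, it also needs to be registered in the streamlet's configParameters. The condensed sketch below shows the declaration together with its registration and one way of reading the resolved value inside the streamlet logic. Metric stands in for the application's Avro domain class, and the exact value accessor may differ between Cloudflow versions:

import akka.stream.scaladsl.Sink
import cloudflow.akkastream._
import cloudflow.streamlets._
import cloudflow.streamlets.avro._

class ValidMetricLogger extends AkkaStreamlet {
  val in    = AvroInlet[Metric]("in")
  val shape = StreamletShape.withInlets(in)

  val MsgPrefix = StringConfigParameter("msg-prefix", "Provide a prefix for the log lines", Some("valid-logger"))

  // Register the parameter so it can be resolved at (local) deployment time.
  override def configParameters = Vector(MsgPrefix)

  override def createLogic = new RunnableGraphStreamletLogic() {
    // Read the resolved value: the Sandbox takes it from local.conf,
    // falling back to the declared default ("valid-logger").
    val prefix = MsgPrefix.value

    def runnableGraph =
      plainSource(in).to(Sink.foreach(metric ⇒ system.log.info(s"[$prefix] $metric")))
  }
}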

Let’s assume that we have declared a metric-logger streamlet in the blueprint of this application. When running in the Sandbox, we can specify a custom value for this configuration parameter in the local configuration file as:

metric-logger {
  msg-prefix = "local"
}
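
Here, metric-logger is the streamlet name declared in the blueprint. For illustration (this particular streamlet entry is hypothetical), the corresponding blueprint declaration could look like:

blueprint {
  streamlets {
    // The name chosen here is the key to use in the local configuration file
    metric-logger = sensordata.ValidMetricLogger
  }
}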

Using Volume Mounts

In the Streamlet API, Volume Mounts are declared in a similar way to configuration parameters, using a programmatic description of the Volume Mount that includes its desired mount path. In the following example, we declare a Volume Mount named source-data-mount, requested to be mounted at /mnt/data, with the ReadWriteMany access mode:

  private val sourceData    = VolumeMount("source-data-mount", "/mnt/data", ReadWriteMany)
  override def volumeMounts = Vector(sourceData)

In a Kubernetes deployment, that Volume Mount gets mapped to a Persistent Volume Claim. When we use the Sandbox to run an application containing one or more streamlets that declare a Volume Mount, the requested mount path is replaced by a local path. The local path assignment can be configured to point to a specific directory. Otherwise, the mount path is assigned to a temporary directory, created on the fly.

The override configuration for volume mounts must be specified in the local configuration file that we discussed earlier.

This example shows a configuration for the Volume Mount named source-data-mount, which we declared earlier in this section, pointing it to the local directory /tmp/cloudflow:

file-ingress {
  source-data-mount="/tmp/cloudflow"
}

Note that for this feature to work properly, it’s important to request the assigned mount path from the StreamletContext, instead of relying on a hardcoded value, like this:

// In the streamlet code:

// Volume mount declaration
private val sourceData    = VolumeMount("source-data-mount", "/mnt/data", ReadWriteMany)
override def volumeMounts = Vector(sourceData)

// Use: getMountedPath resolves the path that the StreamletContext actually assigned
val listFiles: NotUsed ⇒ Source[Path, NotUsed] = { _ ⇒
  Directory.ls(getMountedPath(sourceData))
}

When programming a streamlet, do not assume that the mounted path is the same as the requested path. Do not do this:

// Do not access the mount path directly!
    val files = Directory.ls(FileSystems.getDefault().getPath("/mnt/data"))

What’s next

Learn more about the specifics of using Akka, Spark, and Flink streamlets.