Using the Local Sandbox

A common concern for developers of distributed (cloud) applications is the long and cumbersome cycle of developing and testing their applications on a cluster. End-to-end testing of a distributed streaming application usually requires building artifacts, such as Docker images, a lengthy upload of those artifacts to some repository, and available cluster resources for testing.

Cloudflow includes a local execution mode, called the Sandbox. It lets you execute your complete application as a lightweight process on your machine. The Sandbox only requires sufficient free memory to execute a scaled-down version of every streamlet. As a rough guideline, a 5-streamlet application runs in about 1 GB of memory.

The Cloudflow Sandbox accelerates the development of streaming applications by shortening the deployment and test cycle of functional requirements.

The Sandbox is provided as an auto-plugin included in the sbt-cloudflow plugin. It is automatically available when an application enables support for Akka, Spark, or Flink streamlets.
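
Each sub-project enables the corresponding runtime plugin in build.sbt. As a minimal sketch, assuming the Cloudflow 2.x plugin names (they may differ in your version):

// build.sbt (sketch): enable Akka streamlet support for a sub-project.
// CloudflowAkkaPlugin is the Cloudflow 2.x name; check the sbt-cloudflow
// documentation for the plugin names in your version.
lazy val ingestor = (project in file("akka-cdr-ingestor"))
  .enablePlugins(CloudflowAkkaPlugin)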

Using the Sandbox

The Sandbox is accessible as an sbt task called runLocal in the root build of your Cloudflow application.

It can be called directly from the command line, as we show here:

$ sbt runLocal

Or from an active sbt session:

$ sbt
...
[project]> runLocal

The Sandbox runs the application in the background and provides a summary of the application's characteristics: its ports (if any), volume mounts (if any), and the set of files where we can inspect the output of the different streamlets.

In the example below, we see the summary output of call-record-aggregator, one of the Cloudflow examples available at examples/call-record-aggregator:

(1)
[success] /cloudflow/examples/call-record-aggregator/call-record-pipeline/src/main/blueprint/blueprint.conf verified.
 ┌──────────────┐ ┌──────────────┐ ┌───────────┐
 │cdr-generator1│ │cdr-generator2│ │cdr-ingress│
 └───────┬──────┘ └──────┬───────┘ └─────┬─────┘
         │               │               │
         └─────────┐     │     ┌─────────┘
                   │     │     │
                   v     v     v
            ┌────────────────────────┐
            │[generated-call-records]│
            └────────────┬───────────┘
                         │
                         v
                      ┌─────┐
                      │split│
                      └─┬─┬─┘
                        │ │
                        │ └───────────┐
                        │             └┐
                        v              │
         ┌────────────────────┐        │
         │[valid-call-records]│        │
         └─────────┬──────────┘        │
                   │                   │
                   v                   │
           ┌──────────────┐            │
           │cdr-aggregator│            │
           └─┬────────────┘            │
             │                         │
             v                         v
 ┌───────────────────────┐ ┌──────────────────────┐
 │[aggregated-call-stats]│ │[invalid-call-records]│
 └────────────┬──────────┘ └───────┬──────────────┘
              │                    │
              v                    v
      ┌──────────────┐      ┌────────────┐
      │console-egress│      │error-egress│
      └──────────────┘      └────────────┘
(2)
---------------------------- Streamlets per project ----------------------------
spark-aggregation - output file: file:/tmp/cloudflow-local-run3031754564524796564/spark-aggregation-local.log

	cdr-aggregator [carly.aggregator.CallStatsAggregator]
	cdr-generator1 [carly.aggregator.CallRecordGeneratorIngress]
	cdr-generator2 [carly.aggregator.CallRecordGeneratorIngress]

akka-java-aggregation-output - output file: file:/tmp/cloudflow-local-run3031754564524796564/akka-java-aggregation-output-local.log

	console-egress [carly.output.AggregateRecordEgress]
	error-egress [carly.output.InvalidRecordEgress]

akka-cdr-ingestor - output file: file:/tmp/cloudflow-local-run3031754564524796564/akka-cdr-ingestor-local.log

	cdr-ingress [carly.ingestor.CallRecordIngress]
	- HTTP port [3000]
	split [carly.ingestor.CallRecordSplit]

--------------------------------------------------------------------------------

(3)
------------------------------------ Topics ------------------------------------
[aggregated-call-stats]
[generated-call-records]
[invalid-call-records]
[valid-call-records]
--------------------------------------------------------------------------------

(4)
----------------------------- Local Configuration -----------------------------
No local configuration provided.
--------------------------------------------------------------------------------

(5)
------------------------------------ Output ------------------------------------
Pipeline log output available in folder: /tmp/cloudflow-local-run3031754564524796564
--------------------------------------------------------------------------------

Running call-record-aggregator
To terminate, press [ENTER]
(1) Topology View
(2) Streamlets
(3) Topics
(4) Local Configuration
(5) Output Information

This info panel has five main sections: the Topology View, Streamlets, Topics, Local Configuration, and Output Information.

The Topology View

The first part of the output of the runLocal command is an ASCII-art representation of the topology of the application: the directed graph of connections between streamlets and topics, showing how the components are connected to one another and how data flows within the application.
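
For orientation, this topology is derived from the application's blueprint, where streamlets are named and then wired to topics. The following abridged sketch shows how a fragment of the graph above could be expressed; the outlet and inlet names (out, in) are assumptions for illustration:

// blueprint.conf (abridged sketch; outlet/inlet names "out" and "in"
// are assumed for illustration)
blueprint {
  streamlets {
    cdr-ingress = carly.ingestor.CallRecordIngress
    split       = carly.ingestor.CallRecordSplit
  }
  topics {
    generated-call-records {
      producers = [cdr-ingress.out]
      consumers = [split.in]
    }
  }
}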

Streamlets

The streamlets info panel provides a list of the streamlets instantiated in this application. As we see in the example above, the streamlets are grouped by sub-project (if sub-projects are used). For each group, we also have an output file that aggregates the output of all streamlets in that group.

Internally, each group of streamlets runs in a separate JVM to isolate the dependencies of each sub-project. Below each streamlet name, one or more local resources may be printed, like a volume mount or a TCP port. In the example above, cdr-ingress offers an HTTP endpoint on port 3000.

Topics

The name of this panel is self-explanatory: it lists the topics used by this application. In the Sandbox, all Cloudflow-managed topics are created on an in-memory Kafka instance. If the application uses external topics, they must be reachable from the local machine; otherwise, connections to them will fail.
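
As a sketch of what this could look like (the exact connection keys depend on your Cloudflow version; consult the blueprint reference), a topic section in the blueprint can carry settings for an external Kafka cluster:

topics {
  valid-call-records {
    // Outlet and inlet names are assumed for illustration.
    producers = [split.valid]
    consumers = [cdr-aggregator.in]
    // Assumed connection setting for an external cluster; verify the exact
    // key against the blueprint reference of your Cloudflow version.
    bootstrap.servers = "external-kafka.example.com:9092"
  }
}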

Local Configuration

It's possible to provide a custom configuration for the streamlets running in local mode, for example, to connect to a local instance of a database instead of an external (cloud) service. The local configuration is explained below, in The local configuration file section.

Output

The Output panel shows the directory where all the output of the running application is made available. You can use your favorite text editor or command-line tools to inspect the output and verify that your application is doing what you expect.

The Running Application

The runLocal task remains active until the ENTER key is pressed. While the application is running, you can interact with it through its open interfaces. If you included a streamlet with a server endpoint, its HTTP ports will be available to receive data. Another interesting way of exercising your application is to include data-generating streamlets that simulate and inject the expected data into the running system.
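
For example, with the call-record-aggregator application running as shown above, you could exercise the cdr-ingress endpoint on port 3000. This is a hypothetical sketch; the payload must match the schema expected by the ingress inlet:

$ curl -i -X POST localhost:3000 \
    -H "Content-Type: application/json" \
    -d '{ "...": "..." }'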

To inspect the output, use command-line tools, like tail, or a text editor to consume the output generated in the temporary files that capture each streamlet's output and logs.
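
For example, following the output file reported for the akka-cdr-ingestor project in the summary above:

$ tail -f /tmp/cloudflow-local-run3031754564524796564/akka-cdr-ingestor-local.log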

The file:// URLs provided by the runLocal output are clickable on most systems.

Terminating a Running Application

The application executes in the background until the [ENTER] key is pressed, which terminates all the application processes. The files containing the output of the running application are preserved for later examination.

Streamlet Features in Sandbox

In a Cloudflow application, streamlets offer several customization options, such as configuration requirements, volume mounts, and server ports. The Sandbox provides local implementations of those options that are meaningful in a local environment.

The local configuration file

Applications running in the Sandbox can specify custom values for the local environment by using a local configuration file in HOCON format. This file is called local.conf by default and is expected to be available on the classpath, usually under the application/src/main/resources folder. A custom local configuration file can be specified in build.sbt, using the runLocalConfigFile key. For example, in this build.sbt file, we change the local configuration file to myruntime.conf in the root directory of the project:

runLocalConfigFile := Some("./myruntime.conf")

The contents of this file are organized by streamlet name, using the streamlet names that you specified in the blueprint.

file-ingress {
  //config-key = value  -- values for the file-ingress streamlet
}

rotor-avg-logger {
  //config-key = value  -- values for the rotor-avg-logger streamlet
}

Note that because this file is in HOCON format, dot-notation is also supported:

file-ingress.<config-key>=value
rotor-avg-logger.<config-key>=value

Using configuration values

The Streamlet API lets us declare configuration parameters that can be specified at deployment time. For example, this declaration allows us to provide a custom prefix value as a String:

class ValidMetricLogger extends AkkaStreamlet {
//...

  val MsgPrefix = StringConfigParameter("msg-prefix", "Provide a prefix for the log lines", Some("valid-logger"))
//...

Let’s assume that we have declared a metric-logger streamlet in the blueprint of this application. When running in the Sandbox, we can specify a custom value for this configuration parameter in the local configuration file as:

metric-logger {
  msg-prefix = "local"
}
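
At runtime, the streamlet reads the resolved value through the Streamlet API. A minimal sketch, assuming the streamletConfig accessor is available inside the streamlet logic (verify against your Cloudflow version):

// Sketch: reading the resolved "msg-prefix" value inside the streamlet logic.
// In the Sandbox, with the local configuration above, this yields "local";
// otherwise it yields the deployment-time value or the default "valid-logger".
override def createLogic = new RunnableGraphStreamletLogic() {
  val msgPrefix = streamletConfig.getString(MsgPrefix.key)
  def runnableGraph = ??? // stream definition elided
}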

Using Volume Mounts

In the Streamlet API, Volume Mounts are declared in a similar way to configuration parameters, using a programmatic description of the Volume Mount that includes its desired mount path. In the following example, we declare a Volume Mount named source-data-mount, requested to be mounted at /mnt/data, with the ReadWriteMany access mode:

  private val sourceData    = VolumeMount("source-data-mount", "/mnt/data", ReadWriteMany)
  override def volumeMounts = Vector(sourceData)

In a Kubernetes deployment, that Volume Mount gets mapped to a Persistent Volume Claim. When we use the Sandbox to run an application containing one or more streamlets that declare a Volume Mount, the requested mount path is replaced by a local path. The local path assignment can be configured to point to a specific directory; otherwise, the mount path is assigned to a temporary directory, created on the fly.

The override configuration for volume mounts must be specified in the local configuration file that we discussed earlier.

This example shows a configuration for the Volume Mount named source-data-mount, which we declared earlier in this section; it points to the local directory /tmp/cloudflow:

file-ingress {
  source-data-mount="/tmp/cloudflow"
}

Note that for this feature to work properly, it’s important to request the assigned mount path from the StreamletContext, instead of relying on a hardcoded value, like this:

// In the streamlet code:

import java.nio.file.Path

import akka.NotUsed
import akka.stream.alpakka.file.scaladsl.Directory
import akka.stream.scaladsl.Source

// volume mount declaration
  private val sourceData    = VolumeMount("source-data-mount", "/mnt/data", ReadWriteMany)
  override def volumeMounts = Vector(sourceData)

// use: resolve the actual mount path through the StreamletContext at runtime
    val listFiles: NotUsed ⇒ Source[Path, NotUsed] = { _ ⇒
      Directory.ls(getMountedPath(sourceData))
    }

When programming a streamlet, do not assume that the mounted path is the same as the requested path. Do not do this:

// Do not access the mount path directly!
    val files = Directory.ls(FileSystems.getDefault().getPath("/mnt/data"))

What’s next

Learn more about the specifics of using Akka, Spark, and Flink streamlets.