Using the Local Sandbox

A common concern for distributed (cloud) application developers is the long and cumbersome cycle of developing and testing their applications on a cluster. End-to-end testing of a distributed streaming application usually requires building artifacts, such as Docker images, a lengthy upload of those artifacts to a repository, and available cluster resources for testing.

Cloudflow includes a local execution mode, called Sandbox. It lets you execute your complete application as a lightweight process on your machine. The sandbox only requires sufficient free memory to execute a scaled-down version of every streamlet. As a rough guideline, we can run a 5-streamlet application in 1 GB of memory.

The Cloudflow Sandbox accelerates the development of streaming applications by shortening the deployment and test cycle of functional requirements.

The Sandbox is provided as an auto-plugin included in the sbt-cloudflow plugin. It is automatically available when an application enables support for Akka, Spark, or Flink streamlets.
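
For reference, this is roughly what the setup looks like, shown here as a minimal sketch that assumes Cloudflow 2.2.2 and an Akka-based application (the project name myApp is hypothetical; adjust the plugin version and runtime plugins to your project):

project/plugins.sbt
    // makes the sbt-cloudflow plugin, and with it the Sandbox, available to the build
    addSbtPlugin("com.lightbend.cloudflow" % "sbt-cloudflow" % "2.2.2")

build.sbt
    // hypothetical project; enabling CloudflowAkkaPlugin brings in Akka streamlet (and Sandbox) support
    lazy val myApp = (project in file("."))
      .enablePlugins(CloudflowApplicationPlugin, CloudflowAkkaPlugin)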

Using the Sandbox

The Sandbox is accessible as an sbt task called runLocal in the root build of your Cloudflow application.

If you are trying to run the examples contained in the upstream cloudflow repository, remember to run export CLOUDFLOW_VERSION=2.2.2 before invoking sbt.

It can be called directly from the command line, as we show here:

$ sbt runLocal

Or from an active sbt session:

$ sbt
...
[project]> runLocal

The Sandbox runs the application in the background and provides a summary of the application characteristics, ports (if any), volume mounts (if any), and the set of files where we can inspect the output of the different streamlets.

In the example below, we see the summary output of call-record-aggregator, one of the Cloudflow examples available at examples/call-record-aggregator:

(1)
[success] /cloudflow/examples/call-record-aggregator/call-record-pipeline/src/main/blueprint/blueprint.conf verified.
 ┌──────────────┐ ┌──────────────┐ ┌───────────┐
 │cdr-generator1│ │cdr-generator2│ │cdr-ingress│
 └───────┬──────┘ └──────┬───────┘ └─────┬─────┘
         │               │               │
         └─────────┐     │     ┌─────────┘
                   │     │     │
                   v     v     v
            ┌────────────────────────┐
            │[generated-call-records]│
            └────────────┬───────────┘
                         │
                         v
                      ┌─────┐
                      │split│
                      └─┬─┬─┘
                        │ │
                        │ └───────────┐
                        │             └┐
                        v              │
         ┌────────────────────┐        │
         │[valid-call-records]│        │
         └─────────┬──────────┘        │
                   │                   │
                   v                   │
           ┌──────────────┐            │
           │cdr-aggregator│            │
           └─┬────────────┘            │
             │                         │
             v                         v
 ┌───────────────────────┐ ┌──────────────────────┐
 │[aggregated-call-stats]│ │[invalid-call-records]│
 └────────────┬──────────┘ └───────┬──────────────┘
              │                    │
              v                    v
      ┌──────────────┐      ┌────────────┐
      │console-egress│      │error-egress│
      └──────────────┘      └────────────┘
(2)
---------------------------- Streamlets per project ----------------------------
spark-aggregation - output file: file:/tmp/cloudflow-local-run3031754564524796564/spark-aggregation-local.log

	cdr-aggregator [carly.aggregator.CallStatsAggregator]
	cdr-generator1 [carly.aggregator.CallRecordGeneratorIngress]
	cdr-generator2 [carly.aggregator.CallRecordGeneratorIngress]

akka-java-aggregation-output - output file: file:/tmp/cloudflow-local-run3031754564524796564/akka-java-aggregation-output-local.log

	console-egress [carly.output.AggregateRecordEgress]
	error-egress [carly.output.InvalidRecordEgress]

akka-cdr-ingestor - output file: file:/tmp/cloudflow-local-run3031754564524796564/akka-cdr-ingestor-local.log

	cdr-ingress [carly.ingestor.CallRecordIngress]
	- HTTP port [3000]
	split [carly.ingestor.CallRecordSplit]

--------------------------------------------------------------------------------

(3)
------------------------------------ Topics ------------------------------------
[aggregated-call-stats]
[generated-call-records]
[invalid-call-records]
[valid-call-records]
--------------------------------------------------------------------------------

(4)
----------------------------- Local Configuration -----------------------------
No local configuration provided.
--------------------------------------------------------------------------------

(5)
------------------------------------ Output ------------------------------------
Pipeline log output available in folder: /tmp/cloudflow-local-run3031754564524796564
--------------------------------------------------------------------------------

Running call-record-aggregator
To terminate, press [ENTER]

(1) Topology View
(2) Streamlets
(3) Topics
(4) Local Configuration
(5) Output Information

This info panel has five main sections: the Topology View, Streamlets, Topics, Local Configuration, and Output Information.

The Topology View

The first part of the output of the runLocal command is an ASCII-art representation of the application topology: the directed graph of streamlets and topics, showing how the components are connected to one another and how data flows within the application.

Streamlets

The streamlets info panel provides a list of the streamlets instantiated in this application. As we see in the example above, the streamlets are grouped by sub-project (if sub-projects are used). For each group, we also have an output file that aggregates the output of all streamlets in that group.

Internally, each group of streamlets runs in a separate JVM to isolate the dependencies of each sub-project. Below each streamlet name, one or more local resources may be printed, such as a volume mount or a TCP port. In the example above, cdr-ingress offers an HTTP endpoint on port 3000.

Topics

As the name suggests, this panel lists the topics used by the application. In the sandbox, all Cloudflow-managed topics are created on an in-memory Kafka instance. If the application uses external topics, they must be reachable from the local machine; otherwise, the connection to them will fail.

Local Configuration

It’s possible to provide a custom configuration for the streamlets running in local mode, for example, to connect to a local database instance instead of an external (cloud) service. The local configuration is explained in the Using a Local Configuration File section below.

Output

The Output panel shows the directory where all the output of the running application is made available. You can use your favorite text editor or command-line tools to inspect the output and verify that your application is doing what you expect.

The Running Application

The runLocal task remains active until the ENTER key is pressed. While the application is running, you can interact with it through its open interfaces. If you included a Streamlet with a server endpoint, its HTTP ports will be available to receive data. Another interesting way of exercising your application is to include data-generating streamlets that simulate and inject the expected data into the running system.
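
For instance, in the call-record-aggregator example above, the cdr-ingress endpoint listens on HTTP port 3000 while the application runs. A hypothetical way to push a record into it from the command line (the payload file a-call-record.json is a placeholder; its JSON contents must match the schema the ingress expects):

$ curl -i -X POST localhost:3000 -H "Content-Type: application/json" --data @a-call-record.json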

To inspect the output, use command-line tools, like tail, or text editors to consume the output generated in the temporary files that capture the streamlets’ output and logs.
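
For example, to follow the output of the akka-cdr-ingestor streamlets from the run shown earlier:

$ tail -f /tmp/cloudflow-local-run3031754564524796564/akka-cdr-ingestor-local.log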

The file:// URLs provided in the runLocal output are clickable on most systems.

Terminating a Running Application

The application executes in the background until the [ENTER] key is pressed, which terminates all the application processes. The files containing the output of the running application are preserved for later examination.

Streamlet Features in Sandbox

In a Cloudflow application, Streamlets offer several customization options such as configuration requirements, volume mounts, and server ports. The Sandbox offers a local implementation of these options that are meaningful in a local environment.

Using a Local Configuration File

Applications running in the sandbox can specify custom values for the local environment by making use of a local configuration file in HOCON format.

This file is usually located in the src/main/resources folder of your application project. The full relative path to this local configuration file must be specified in build.sbt, using the runLocalConfigFile key, as we show in the following snippet:

build.sbt
lazy val sensorData =  (project in file("."))
    .enablePlugins(CloudflowApplicationPlugin, CloudflowAkkaPlugin)
    .settings(
      scalaVersion := "2.13.3",
      runLocalConfigFile := Some("src/main/resources/local.conf"), (1)
      runLocalLog4jConfigFile := Some("src/main/resources/log4j.xml"), (2)
      name := "sensor-data-scala",
...

The above example also shows how to set a custom Log4j configuration with the runLocalLog4jConfigFile key (2). If this key is omitted, a default Log4j configuration packaged with the sbt-cloudflow plugin is used.

Providing Configuration Values

With the local configuration file, you can provide values to streamlet configuration parameters and point volume mounts to locally available folders.

The configuration provided in this file follows the same principles as the general streamlet configuration system in Cloudflow: It uses HOCON syntax to define configuration entries, and it’s hierarchically structured by streamlet under the cloudflow.streamlets.<streamlet-name> key.

For example, in the following configuration file, we give a local path to a volume-mount named file-ingress and provide a local value for a configuration parameter called valid-logger.

Example of a local.conf file
cloudflow {
  streamlets {
    # configures the log-level configuration parameter of the valid-logger streamlet
    valid-logger {
      config-parameters {
        log-level = "info"
      }
    }
    # configures the volume mount for the file-ingress streamlet, when added to the blueprint
    file-ingress {
      volume-mounts {
        source-data-mount = "/tmp/cloudflow"
      }
    }
  }
}

Note that because this file is in HOCON format, dot-notation is also supported.

cloudflow.streamlets.<streamlet-name>.<config-scope>.<config-key>=value

Supported Configuration Scopes

In the sandbox runtime, only two configuration scopes of streamlets are currently supported:

  • configuration parameters, under the config-parameters scope.

  • volume mounts, under the volume-mounts scope.

The general syntax for a configuration entry is:

cloudflow.streamlets.valid-logger.<config-scope>.<config-key>=value

where <config-scope> may be either config-parameters or volume-mounts.

We can provide values for these two scopes as we explain in the following section.

Using config-parameters

The Streamlet API lets us declare configuration parameters that can be specified at deployment time. For example, this declaration allows us to provide a custom prefix value as a String:

class ValidMetricLogger extends AkkaStreamlet {
//...

  val MsgPrefix = StringConfigParameter("msg-prefix", "Provide a prefix for the log lines", Some("valid-logger"))
//...
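
To make the parameter available to the streamlet, it is registered through configParameters and read inside the streamlet logic. The following is a minimal sketch continuing the excerpt above; the MsgPrefix.value accessor shown here reflects the Cloudflow 2.x Scala API and resolves to the deployed value or, if none is provided, to the declared default:

  // register the parameter with the streamlet
  override def configParameters = Vector(MsgPrefix)

  override def createLogic = new RunnableGraphStreamletLogic() {
    // the configured value, e.g. as set in the local configuration file when running in the Sandbox
    val msgPrefix = MsgPrefix.value
    //...
  }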

Let’s assume that we have declared a valid-logger streamlet in the blueprint of this application. When running in the Sandbox, we can specify custom values for its configuration parameters in the local configuration file, for example:

# configures the log-level configuration parameter of the valid-logger streamlet
cloudflow.streamlets.valid-logger.config-parameters.log-level = "info"

Using Volume Mounts

In the Streamlet API, volume mounts (a Kubernetes construct to access disk storage) are declared in a similar way to configuration parameters, using a programmatic description of the volume mount that includes its desired mount path. In the following example, we declare a volume mount named source-data-mount, requested to be mounted at /mnt/data in the pod’s filesystem and requiring a ReadWriteMany access mode:

  private val sourceData    = VolumeMount("source-data-mount", "/mnt/data", ReadWriteMany)
  override def volumeMounts = Vector(sourceData)

In a Kubernetes deployment, that volume mount gets mapped to a PersistentVolumeClaim (PVC).

When running in the sandbox, the requested mount path must be replaced by a local folder for the application to run successfully on the local machine. The local path assignment may be configured to point to a specific directory. If no explicit configuration is provided, the mount path is assigned to a temporary directory created at runtime.

This example shows a local configuration for the volume mount named source-data-mount, which we declared earlier in this section, pointing it to the local directory /tmp/cloudflow:

# configures the volume mount for the file-ingress streamlet, when added to the blueprint
cloudflow.streamlets.file-ingress.volume-mounts.source-data-mount="/tmp/cloudflow"

Note that for this feature to work properly, it’s important to request the assigned mount path from the StreamletContext, instead of relying on a hardcoded value, like this:

// in the streamlet code
// volume mount declaration
  private val sourceData    = VolumeMount("source-data-mount", "/mnt/data", ReadWriteMany)
  override def volumeMounts = Vector(sourceData)

// use
    val listFiles: NotUsed => Source[Path, NotUsed] = { _ =>
      Directory.ls(getMountedPath(sourceData))
    }

When programming streamlets, do not assume that the mounted path is the same as the requested path. Do not do this:

// Do not access the mount path directly!
    val files = Directory.ls(FileSystems.getDefault().getPath("/mnt/data"))

What’s next

Learn more about the specifics of using Akka, Spark, and Flink streamlets.