Using the Local Sandbox
A common concern for developers of distributed (cloud) applications is the long and cumbersome cycle of developing and testing their applications on a cluster. End-to-end testing of a distributed streaming application usually requires creating artifacts such as Docker images, uploading those artifacts to a repository (often a lengthy step), and having cluster resources available for testing.
Cloudflow includes a local execution mode, called Sandbox. It lets you execute your complete application as a lightweight process on your machine. The sandbox only requires sufficient free memory to execute a scaled-down version of every streamlet. As a rough guideline, we can run a 5-streamlet application in 1 GB of memory.
The Cloudflow Sandbox accelerates the development of streaming applications by shortening the deployment and test cycle of functional requirements.
The Sandbox is provided as an auto-plugin included in the sbt-cloudflow plugin.
It is automatically available when an application enables support for Akka, Spark, or Flink streamlets.
Using the Sandbox
The Sandbox is accessible as an sbt task called runLocal in the root build of your Cloudflow application.
It can be called directly from the command line, as shown here:
$ sbt runLocal
Or from an active sbt session:
$ sbt
...
[project]> runLocal
The Sandbox runs the application in the background and provides a summary of the application characteristics, ports (if any), mount-volumes (if any), and the set of files where we can inspect the output of the different streamlets.
In the example below, we see the summary output of call-record-aggregator, one of the Cloudflow examples available at examples/call-record-aggregator:
(1)
[success] /cloudflow/examples/call-record-aggregator/call-record-pipeline/src/main/blueprint/blueprint.conf verified.
┌──────────────┐ ┌──────────────┐ ┌───────────┐
│cdr-generator1│ │cdr-generator2│ │cdr-ingress│
└───────┬──────┘ └──────┬───────┘ └─────┬─────┘
        │               │               │
        └─────────┐     │     ┌─────────┘
                  │     │     │
                  v     v     v
           ┌────────────────────────┐
           │[generated-call-records]│
           └────────────┬───────────┘
                        │
                        v
                     ┌─────┐
                     │split│
                     └─┬─┬─┘
                       │ │
                       │ └───────────┐
                       │             └┐
                       v              │
             ┌────────────────────┐   │
             │[valid-call-records]│   │
             └─────────┬──────────┘   │
                       │              │
                       v              │
                     ┌──────────────┐ │
                     │cdr-aggregator│ │
                     └─┬────────────┘ │
                       │              │
                       v              v
  ┌───────────────────────┐ ┌──────────────────────┐
  │[aggregated-call-stats]│ │[invalid-call-records]│
  └────────────┬──────────┘ └───────┬──────────────┘
               │                    │
               v                    v
       ┌──────────────┐       ┌────────────┐
       │console-egress│       │error-egress│
       └──────────────┘       └────────────┘
(2)
---------------------------- Streamlets per project ----------------------------
spark-aggregation - output file: file:/tmp/cloudflow-local-run3031754564524796564/spark-aggregation-local.log
    cdr-aggregator [carly.aggregator.CallStatsAggregator]
    cdr-generator1 [carly.aggregator.CallRecordGeneratorIngress]
    cdr-generator2 [carly.aggregator.CallRecordGeneratorIngress]

akka-java-aggregation-output - output file: file:/tmp/cloudflow-local-run3031754564524796564/akka-java-aggregation-output-local.log
    console-egress [carly.output.AggregateRecordEgress]
    error-egress [carly.output.InvalidRecordEgress]

akka-cdr-ingestor - output file: file:/tmp/cloudflow-local-run3031754564524796564/akka-cdr-ingestor-local.log
    cdr-ingress [carly.ingestor.CallRecordIngress]
        - HTTP port [3000]
    split [carly.ingestor.CallRecordSplit]
--------------------------------------------------------------------------------
(3)
------------------------------------ Topics ------------------------------------
[aggregated-call-stats]
[generated-call-records]
[invalid-call-records]
[valid-call-records]
--------------------------------------------------------------------------------
(4)
----------------------------- Local Configuration -----------------------------
No local configuration provided.
--------------------------------------------------------------------------------
(5)
------------------------------------ Output ------------------------------------
Pipeline log output available in folder: /tmp/cloudflow-local-run3031754564524796564
--------------------------------------------------------------------------------
Running call-record-aggregator
To terminate, press [ENTER]
(1) Topology View
(2) Streamlets
(3) Topics
(4) Local Configuration
(5) Output Information
This info panel has five main sections: the Topology View, Streamlets, Topics, Local Configuration, and Output Information.
- The Topology View
The first part of the output of the runLocal command is an ASCII-art representation of the application's topology. This directed graph of connections between streamlets and topics shows how the components are connected to one another and how data flows through the application.
- Streamlets
The Streamlets panel lists the streamlets instantiated in this application. As the example above shows, the streamlets are grouped by sub-project (if sub-projects are used). Each group also has an output file that aggregates the output of all streamlets in that group.
Internally, each group of streamlets runs in a separate JVM to isolate the dependencies of each sub-project. Below each streamlet name, one or more local resources may be printed, such as a volume mount or a TCP port. In the example above, cdr-ingress offers an HTTP endpoint on port 3000.
- Topics
As its name suggests, this panel lists the topics used by the application. In the Sandbox, all Cloudflow-managed topics are created on an in-memory Kafka instance. If the application uses external topics, they must be reachable from the local machine; otherwise, the connection to them fails.
- Local Configuration
You can provide a custom configuration for the streamlets running in local mode, for example, to connect to a local database instance instead of an external (cloud) service. The local configuration is explained below in The local configuration file section.
- Output
The Output panel shows the directory where all the output of the running application is made available. You can use your favorite text editor or command-line tools to inspect the output and verify that your application is doing what you expect.
The Running Application
The runLocal task remains active until the ENTER key is pressed.
While the application is running, you can interact with it through its open interfaces.
If you included a streamlet with a server endpoint, its HTTP port is available to receive data.
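For example, while call-record-aggregator is running, you could post a test record to the HTTP endpoint that cdr-ingress opens on port 3000. The Scala sketch below uses the JDK's java.net.http client (Java 11 or later). It assumes that the ingress accepts a JSON body via HTTP POST on the root path; the object name SendTestRecord is hypothetical, and you should check the streamlet's schema for the actual record format.

```scala
import java.net.URI
import java.net.http.{HttpClient, HttpRequest, HttpResponse}

// Hypothetical helper for exercising an HTTP ingress during `sbt runLocal`.
object SendTestRecord {
  // Assumption: the ingress accepts a JSON body via POST on the root path.
  def buildRequest(port: Int, json: String): HttpRequest =
    HttpRequest
      .newBuilder(URI.create(s"http://localhost:$port/"))
      .header("Content-Type", "application/json")
      .POST(HttpRequest.BodyPublishers.ofString(json))
      .build()

  // Sends the request and returns the HTTP status code.
  // This only succeeds while the Sandbox is running and the port is open.
  def send(request: HttpRequest): Int =
    HttpClient
      .newHttpClient()
      .send(request, HttpResponse.BodyHandlers.ofString())
      .statusCode()
}
```

Building the request is separate from sending it, so the request can be checked without a running Sandbox; SendTestRecord.send(SendTestRecord.buildRequest(3000, json)) only works while runLocal is active.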
Another useful way to exercise your application is to include data-generating streamlets that simulate the expected data and inject it into the running system.
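A data-generating streamlet needs a source of plausible records. The plain-Scala sketch below shows one way to produce them; the SimulatedCall record type, its fields, and the CallSimulator object are hypothetical stand-ins for your application's own schema. Seeding the random generator makes each local run reproducible. Inside an Akka-based ingress streamlet, such an iterator could, for example, be turned into a stream with Akka Streams' Source.fromIterator(() => CallSimulator.records(seed)).

```scala
import scala.util.Random

// Hypothetical record type; use your application's actual schema instead.
final case class SimulatedCall(caller: String, callee: String, durationSeconds: Int)

object CallSimulator {
  // Produces an endless sequence of simulated calls between `users` users.
  // A fixed seed makes every local run generate the same data.
  def records(seed: Long, users: Int = 10): Iterator[SimulatedCall] = {
    require(users >= 2, "need at least two users")
    val rnd = new Random(seed)
    Iterator.continually {
      val caller = rnd.nextInt(users)
      // An offset in [1, users - 1] guarantees that nobody calls themselves.
      val callee = (caller + 1 + rnd.nextInt(users - 1)) % users
      SimulatedCall(s"user-$caller", s"user-$callee", 1 + rnd.nextInt(600))
    }
  }
}
```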
To inspect the output, use command-line tools such as tail, or a text editor, to read the temporary files that capture the streamlets' output and logs.
The file:// URLs provided by the runLocal output are clickable in most systems.
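If you prefer to inspect the logs programmatically, for example from a test harness, the Scala sketch below lists the per-project log files in the output folder and returns the last lines of one of them, similar to tail -n. It relies only on the naming pattern visible in the runLocal summary (files ending in -local.log); the object name LocalRunLogs is hypothetical.

```scala
import java.nio.file.{Files, Path}
import scala.jdk.CollectionConverters._

// Hypothetical helper for reading the Sandbox output folder from Scala code.
object LocalRunLogs {
  // Per-project log files end with "-local.log", as in the runLocal summary.
  def logFiles(outputDir: Path): List[Path] = {
    val entries = Files.list(outputDir)
    try {
      entries.iterator().asScala
        .filter(p => p.getFileName.toString.endsWith("-local.log"))
        .toList
        .sortBy(p => p.getFileName.toString)
    } finally entries.close()
  }

  // Returns the last n lines of a file, similar to `tail -n`.
  def lastLines(file: Path, n: Int): List[String] =
    Files.readAllLines(file).asScala.toList.takeRight(n)
}
```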
Streamlet Features in Sandbox
In a Cloudflow application, Streamlets offer several customization options such as configuration requirements, volume mounts, and server ports. The Sandbox offers a local implementation of these options that are meaningful in a local environment.
The local configuration file
Applications running in the Sandbox can specify custom values for the local environment by making use of a local configuration file in HOCON format.
This file is called local.conf by default and is assumed to be available on the classpath, usually under the application/src/main/resources folder.
A custom local configuration file can be specified in build.sbt using the runLocalConfigFile key.
For example, in this build.sbt file, we change the local configuration file to myruntime.conf in the root directory of the project:
runLocalConfigFile := Some("./myruntime.conf"),
The contents of this file are organized by streamlet name, using the streamlet name that you specified in the blueprint.
file-ingress {
//config-key = value -- values for the file-ingress streamlet
}
rotor-avg-logger {
//config-key = value -- values for the rotor-avg-logger streamlet
}
Note that because this file is in HOCON format, dot-notation is also supported:
file-ingress.<config-key> = value
rotor-avg-logger.<config-key> = value
Using configuration values
The Streamlet API lets us declare configuration parameters that can be specified at deployment time. For example, this declaration allows us to provide a custom prefix value as a String:
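class ValidMetricLogger extends AkkaStreamlet {
  //...
  val MsgPrefix = StringConfigParameter("msg-prefix", "Provide a prefix for the log lines", Some("valid-logger"))
  // declare the parameter so that it can be set at deployment time (or in the local configuration file)
  override def configParameters = Vector(MsgPrefix)
  //...
}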
Let’s assume that we have declared a metric-logger streamlet in the blueprint of this application. When running in the Sandbox, we can specify a custom value for this configuration parameter in the local configuration file as:
metric-logger {
msg-prefix = "local"
}
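Conceptually, a value set for the streamlet in the local configuration file takes precedence over the default declared in the streamlet, and the default applies when no local value is given. The plain-Scala model below illustrates that lookup using the dot-notation form of the keys. It is not Cloudflow's implementation; StringParam and LocalConfigModel are hypothetical names.

```scala
// Conceptual model only; this is not how Cloudflow is implemented.
final case class StringParam(key: String, default: Option[String])

object LocalConfigModel {
  // localConf holds dot-notation keys such as "metric-logger.msg-prefix".
  // A value set for this streamlet wins; otherwise the declared default applies.
  def resolve(localConf: Map[String, String], streamlet: String, param: StringParam): Option[String] =
    localConf.get(s"$streamlet.${param.key}").orElse(param.default)
}
```

Note that the lookup is scoped by streamlet name, so a value configured for another streamlet does not affect metric-logger.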
Using Volume Mounts
In the Streamlet API, Volume Mounts are declared in a similar way to configuration parameters, using a programmatic description of the Volume Mount that includes its desired mount path. In the following example, we declare a Volume Mount named source-data-mount, to be mounted at /mnt/data with the ReadWriteMany access mode:
private val sourceData = VolumeMount("source-data-mount", "/mnt/data", ReadWriteMany)
override def volumeMounts = Vector(sourceData)
In a Kubernetes deployment, that Volume Mount is mapped to a PersistentVolumeClaim. When we use the Sandbox to run an application containing one or more streamlets that declare a Volume Mount, the requested mount path is replaced by a local path. This local path can be configured to point to a specific directory; otherwise, it is assigned to a temporary directory created on the fly.
The override configuration for volume mounts must be specified in the local configuration file that we discussed earlier.
This example shows a configuration for the Volume Mount named source-data-mount, declared earlier in this section, which points it to the local directory /tmp/cloudflow:
file-ingress {
source-data-mount="/tmp/cloudflow"
}
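The rule for choosing the local path (a configured directory if one is given, otherwise a temporary directory created on the fly) can be modeled in a few lines of plain Scala. This is a conceptual sketch, not the Sandbox's implementation; LocalMountModel is a hypothetical name, and the map stands in for the parsed local configuration file.

```scala
import java.nio.file.{Files, Path, Paths}

// Conceptual model only; not the Sandbox's actual implementation.
object LocalMountModel {
  // localConf stands in for local.conf, keyed as "<streamlet>.<volume-mount-name>".
  def localPathFor(localConf: Map[String, String], streamlet: String, mountName: String): Path =
    localConf.get(s"$streamlet.$mountName") match {
      case Some(dir) => Paths.get(dir)                       // configured directory
      case None      => Files.createTempDirectory(mountName) // temporary directory, created on the fly
    }
}
```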
Note that for this feature to work properly, it’s important to request the assigned mount path from the StreamletContext, instead of relying on a hardcoded value, like this:
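// in the streamlet code
import java.nio.file.Path
import akka.NotUsed
import akka.stream.scaladsl.Source
import akka.stream.alpakka.file.scaladsl.Directory

// volume mount declaration
private val sourceData = VolumeMount("source-data-mount", "/mnt/data", ReadWriteMany)
override def volumeMounts = Vector(sourceData)

// use, inside the streamlet logic: ask for the path actually assigned to the mount
val listFiles: NotUsed => Source[Path, NotUsed] = { _ =>
  Directory.ls(getMountedPath(sourceData))
}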
In streamlet code, do not assume that the mounted path is the same as the requested path. Do not do this:
// Do not access the mount path directly!
val files = Directory.ls(FileSystems.getDefault().getPath("/mnt/data"))
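One way to keep to this rule is to write file-handling helpers that take the directory as a parameter and to pass them the value returned by getMountedPath. The helper below is a hypothetical plain-Scala example (SourceFiles is not part of the Cloudflow API). Because it never mentions /mnt/data, the same code works in the Sandbox, on a cluster, and in a unit test that points it at a temporary directory.

```scala
import java.nio.file.{Files, Path}
import scala.jdk.CollectionConverters._

// Hypothetical helper: the directory is always supplied by the caller.
object SourceFiles {
  // Lists the regular files directly under mountedPath, sorted by name.
  // In the streamlet, call it as SourceFiles.regularFiles(getMountedPath(sourceData)).
  def regularFiles(mountedPath: Path): List[Path] = {
    val entries = Files.list(mountedPath)
    try {
      entries.iterator().asScala
        .filter(p => Files.isRegularFile(p))
        .toList
        .sortBy(p => p.getFileName.toString)
    } finally entries.close()
  }
}
```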