Using Spark Streamlets
A Spark streamlet has the following responsibilities:
It needs to capture your stream processing logic.
It needs to publish metadata which will be used by the
sbt-cloudflowplugin to verify that a blueprint is correct. This metadata consists of the shape of the streamlet (
StreamletShape) defined by the inlets and outlets of the streamlet. Connecting streamlets need to match on inlets and outlets to make a valid cloudflow topology, as mentioned previously in Composing applications using blueprints. Cloudflow automatically extracts the shapes from the streamlets to verify this match.
For the Cloudflow runtime, it needs to provide metadata so that it can be configured, scaled, and run as part of an application.
The inlets and outlets of a
Streamlethave two functions:
To specify to Cloudflow that the streamlet needs certain data streams to exist at runtime, which it will read from and write to, and
To provide handles inside the stream processing logic to connect to the data streams that Cloudflow provides. The
StreamletLogicprovides methods that take an inlet or outlet argument to read from or write to. These will be the specific data streams that Cloudflow has set up for you.
The next sections will go into the details of defining a Spark streamlet:
defining inlets and outlets
creating a streamlet shape from the inlets and outlets
creating a streamlet logic that uses the inlets and outlets to read and write data
A streamlet can have one or more inlets and outlets.
Cloudflow offers classes for
Outlets based on the
codec of the data they manipulate.
Currently, Cloudflow supports Avro and hence the classes are named
Each outlet also allows the user to define a partitioning function that will be used to partition the data.
Currently Cloudflow supports two different data encoding types - Avro and Protobuf. Both can be used for Spark streamlet,
but you should remember several rules. Protobuf support is based on Scala-PB Spark,
which comes with its own
scalapb.spark.Implicits. which is not compatible with
When using these
implicits the following rules have to be observed:
If protobuf’s encoding is not used, always use
If you are using at least one protobuf encoding, you need to use
scalapb.spark.Implicits._. Note that this
implicitsdo not include default implicit supporting Scala classes. So if in addition to protobuf, you are using Avro and or any custom scala types in your Spark processing, you have to add implicit encoders, for these types. For example, if you are using the following type in your calculations:
case class Rate(timestamp: Timestamp, value: Long)
In your code you should have a definition of the corresponding encoder:
implicit val rateEncoder = Encoders.product[Rate]
StreamletShape captures the connectivity and compatibility details of a Spark-based streamlet.
It captures which—and how many—inlets and outlets the streamlet has.
sbt-cloudflow plugin extracts—amongst other things—the shape from the streamlet that it finds on the classpath.
This metadata is used to verify that the blueprint connects the streamlets correctly.
Cloudflow offers an API
StreamletShape to define various shapes with inlets and outlets. Each inlet and outlet can be defined separately with specific names.
The outlet also allows for the definition of a partitioning function.
This partitioning function is used to distribute the output data among partitions.
When you build your own Spark streamlet, you need to define the shape. You will learn how to do this in Building a Spark streamlet. The next section describes how the
SparkStreamletLogic captures stream processing logic.
The stream processing logic is captured in a
SparkStreamletLogic, which is an abstract class.
SparkStreamletLogic provides methods to read and write data streams, and provides access to the configuration of the streamlet, among other things.
SparkStreamletLogic must setup one or more Structured Streaming Queries, represented by a collection of
StreamingQuerys, through the method
These jobs will be run by the
run method of
SparkStreamlet to produce a
StreamletExecution is a simple class to manage a collection of
SparkStreamletLogic is only constructed when the streamlet is run, so it is safe to put instance values and variables in it that you would like to use when the streamlet is running. Note that the
Streamlet is created for extracting metadata and hence no instance values should be put inside a streamlet.
SparkStreamletContext provides the necessary context under which a streamlet runs.
It contains the following context data and contracts:
SparkSessionto run Spark streaming jobs.
Configloaded from the classpath through a
configmethod, which can be used to read configuration settings.
The name used in the blueprint for the specific instance of this streamlet being run.
checkpointDirwhich returns a directory on a persistent storage where checkpoints can be safely kept, making them available across restarts.
Spark Structured streaming provides a mechanism for controlling the timing of streaming data processing in the form of triggers. The trigger settings of a streaming query define whether the query is going to be executed as micro-batch query with a fixed batch interval or as a continuous processing query. Different kinds of triggers that are supported by Spark are here.
writeStream method in stream logic allows to specify trigger used for Streamlet execution.
In general terms, when you publish and deploy a Cloudflow application, each streamlet definition becomes a physical deployment as one or more Kubernetes artifacts. In the case of Spark streamlets, each streamlet is submitted as a Spark application through the Spark Operator as a Spark Application resource. Upon deployment, it becomes a set of running pods, with one pod as a Spark Driver and n-pods as executors, where n is the scale factor for that streamlet.
The Spark Operator is a Kubernetes operator dedicated to managing Spark applications.
The Spark Operator takes care of monitoring the streamlets and is responsible for their resilient execution, like restarting the Spark application in case of failure.
In the Spark streamlet deployment model shown above, we can visualize the chain of delegation involved in the deployment of a Spark streamlet:
The Cloudflow Operator prepares a Custom Resource describing the Spark streamlet and submits it to the Spark Operator.
The Spark Operator processes the Custom Resource and issues the deployment of a Spark driver pod.
The Spark Driver then requests executor resources from Kubernetes to deploy the distributed processing.
Finally, if and when resources are available, the Spark-bound executors start as Kubernetes pods. The executors are the components tasked with the actual data processing, while the Spark driver serves as coordinator of the (stream) data process.
In this architecture, the Spark driver runs the Cloudflow-specific logic that connects the streamlet to our managed data streams, at which point the streamlet starts consuming from inlets. The streamlet advances through the data streams that are provided on inlets and writes data to outlets.
If you make any changes to the streamlet and deploy the application again, the existing Spark applications will be stopped, and the new version will be started to reflect the changes. It could be that Spark streamlets are restarted in other ways, for instance by administrators.
This means that a streamlet can get stopped, started or restarted at any moment in time. The next section about message delivery semantics explains the options that are available for how to continue processing data after a restart.
The message delivery semantics provided by Spark streamlets are determined by the guarantees provided by the underlying Spark sources and sinks used in the streamlet. Recall that we defined the different message delivery guarantees in Message delivery semantics.
Let’s consider the following types of streamlets as forming a topology as illustrated in Ingress-Processor-Egress Streamlets:
an ingress, a streamlet that reads from an external streaming source and makes data available to the pipeline
a processor, a streamlet that has an inlet and an outlet - it does domain logic processing on data that it reads and passes the processed data downstream
an egress, a streamlet that receives data on its inlet and writes to an external sink
The following sections will use these types of streamlets for describing message delivery semantics.
Spark-based ingresses use a Structured Streaming source to obtain data from an external streaming source and provide it to the Cloudflow application. In this case, message delivery semantics are dependent on the capabilities of the source. In general, streaming sources deemed resilient will provide at-least-once semantics in a Pipeline application.
A Spark based processor is a streamlet that receives and produces data internal to the Cloudflow application. In this scenario, processors consume data from inlets and produce data to outlets using Kafka topics as the underlying physical implementation.
At the level of a Spark-based processor, this translates to using Kafka sources and sinks. Given that Kafka is a reliable data source from the Structured Streaming perspective, the message delivery semantics are considered to be at-least-once.
Testing At-Least-Once Message Delivery
Bundled with our examples,
The message delivery guarantees of a Spark-based egress are determined by the combination of the Kafka-based inlet and the target system where the egress will produce its data.
The data provided to the egress is delivered with at-least-once semantics. The egress is responsible to reliably produce this data to the target system. The overall message delivery semantics of the Pipeline will depend on the reliability characteristics of this egress.
In particular, it is possible to 'upgrade' the end-to-end message delivery guarantee to effectively-exactly-once by making the egress idempotent and ensuring that all other streamlets used provide at-least-once semantics.
For this purpose, we could use Structured Streaming’s deduplication feature, or use a target system able to preserve uniqueness of primary keys, such as an RDBMS.
The at-least-once delivery is guaranteed within the Kafka retention configuration. This retention is a configuration proper to the Kafka brokers that dictates when old data can be evicted from the log. If a Spark-based processor is offline for a longer time than the configured retention, it will restart from the earliest offset available in the corresponding Kafka topic, which might silently result in data loss.
In a Pipeline application, it’s possible to use the full capabilities of Structured Streaming to implement the business logic required in a streamlet.
This includes stateful processing, from time-based aggregations to arbitrary stateful computations that use
All stateful processing relies on snapshots and a state store for the bookkeeping of the offsets processed and the computed state at any time.
In Cloudflow, this state store is deployed on Kubernetes using Persistent Volume Claims (PVCs) backed by a storage class that allows for access mode
To deploy a Spark application it is necessary that a PVC already exists in the namespace for the application. Cloudflow will automatically mount this PVC in the streamlets themselves to allow state storage, as long as it has a
/mnt/spark/storage mount path. Cloudflow will look for a PVC named
cloudflow-spark by default that mounts this path. It is also possible to create a differently named PVC, that mounts
/mnt/spark/storage and mount it using a configuration file, as described in The Configuration Model. The access mode for the PVC has to be
We can use the managed storage to safely store checkpoint data.
Checkpoint information is managed by Cloudflow for all streamlets that use the
In case we need a directory to store state data that must persist across restarts of the streamlet, we can obtain a directory mounted on the managed PVC using the
This method takes as parameter the
name of the directory we want for our particular use and returns a path to a persistent storage location.
When implementing egresses that use Spark’s Structured Streaming sinks, ensure that each query uses a unique
name for its
In Example of the use of
checkpointDir, we can see the use of the
checkpointDir method to provide the checkpoint directory to a console-based egress.
override def buildStreamingQueries = readStream(in).writeStream .format("console") .option("checkpointLocation", context.checkpointDir("console-egress")) .outputMode(OutputMode.Append()) .start() .toQueryExecution
Storage Size is Currently Fixed
Please note that — currently — the volume size allocated for storage is fixed per Cloudflow platform deployment.
The storage of snapshots and state has certain impact on the upgradability of the Spark-based components. Once a state representation is deployed, the state definition (schema) may not change.
Upgrading an application that uses stateful computation requires planning ahead of time to avoid making incompatible changes that prevent recovery using the saved state. Significant changes to the application logic, addition of sources, or changes to the schema representing the state are not allowed.
For details on the degrees of freedom and upgrade options, please refer to the Structured Streaming documentation.
If you enable plugin
CloudflowSparkPlugin, Cloudflow will add all of the Spark jars, necessary for a typical Spark streamlet implementation. But sometimes
you might need to add additional libraries. When doing so, it is necessary to make sure, that the versions
of the libraries that you are adding have the same version that the rest of of the Spark libraries.
The version used by Cloudflow are currently defined in the variable
So, if for example, you want to add local Spark ML library, you can do it as shown below:
libraryDependencies ++= Seq("org.apache.spark"%%"spark-mllib-local"%cloudflow.sbt.CloudflowSparkPlugin.SparkVersion)