Building a Spark Streamlet
Integration with the Spark Operator has been deprecated since 2.2.0, and will be removed in version 2.3.x. Spark integration has moved to the Cloudflow-contrib project. Please see the Cloudflow-contrib getting started guide for instructions on how to use Spark Native Kubernetes integration. The documentation that follows describes the deprecated feature. The SparkStreamlet API has not changed in cloudflow-contrib, though you do need to use a different dependency and add the
The following sections describe how you can create a Spark streamlet. As mentioned in Using Spark streamlets, a Spark streamlet is defined by the following features:
It is a Streamlet. Cloudflow offers a class for implementing Spark streamlets, SparkStreamlet, which extends cloudflow.streamlets.Streamlet. Any Spark streamlet needs to extend SparkStreamlet.
It has a shape, which we call StreamletShape. Any Spark streamlet needs to define a concrete shape using the APIs available for the StreamletShape class, which defines the inlets and outlets of the streamlet.
It has a StreamletLogic that defines how the streamlet generates StreamingQuerys from the business logic.
In this tutorial we’ll build a simple Spark streamlet that accepts data in an inlet and writes it to the console. It can be used to print reports from data that arrives at its inlet. Let’s call the streamlet ReportPrinter.
Let’s start with building the ReportPrinter streamlet. The first thing to do is extend the cloudflow.spark.SparkStreamlet abstract class, as shown below:
[Code listing not available: build-spark-streamlets-scala/step0/src/main/scala/com/example/ReportPrinter.scala]
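The listing referenced here is not available, so the following is only a minimal sketch of what such a skeleton could look like, assuming the Cloudflow Spark library is on the classpath. The package name com.example is taken from the listing's path, and the numbered comments are placeholders for the steps covered in the next sections.

```scala
package com.example

import cloudflow.spark.SparkStreamlet

// Sketch only: a streamlet skeleton that extends SparkStreamlet.
// It stays abstract until the shape and the logic are supplied.
abstract class SparkConsoleEgress extends SparkStreamlet {
  // 1. Create inlets and outlets
  // 2. Define the shape of the streamlet
  // 3. Override createLogic to provide the SparkStreamletLogic
}
```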
The code snippet above shows an abstract class SparkConsoleEgress that extends SparkStreamlet. We have shown the steps needed to complete the implementation, which we will do in the next few sections.
The next step is to implement inlets and outlets of the streamlet.
The streamlet that we are building in this tutorial will have an inlet and no outlet.
[Code listing not available: build-spark-streamlets-scala/step1/src/main/scala/com/example/ReportPrinter.scala]
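As a rough sketch of this step, the inlet could be declared as follows. The sketch assumes that Report is the class generated from the Avro schema at build time and that AvroInlet lives in the cloudflow.streamlets.avro package; please check both against the Cloudflow version you use.

```scala
package com.example

import cloudflow.spark.SparkStreamlet
import cloudflow.streamlets.avro.AvroInlet

// Sketch only: `Report` is assumed to be generated from the Avro schema at build time.
// The class is still abstract because the shape and the logic are not defined yet.
abstract class ReportPrinter extends SparkStreamlet {
  // 1. Create inlets and outlets: a single Avro inlet and no outlet
  val in = AvroInlet[Report]("report-in")
}
```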
Cloudflow supports Avro encoded processing of data. We make this explicit by defining the inlet as AvroInlet. Report is the class of objects that will be accepted by this inlet. This means that an inlet defined by AvroInlet[Report] will only accept Avro encoded data for the class Report. The class Report will be generated by Cloudflow during application build time from the Avro schema that the user supplies. This ensures that the data which the inlet accepts conforms to the schema that the user supplied.
As an example, we can have the following Avro schema for the Report object that contains a report of some of the attributes of products from an inventory:
[Schema listing not available: build-spark-streamlets-scala/step1/src/main/avro/Report.avsc]
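Since the schema listing is not available here, the following is an illustrative sketch of what a Report schema could look like. Only the record name Report and a string field called name (used later as the partitioning key) follow from the text; the namespace is taken from the listing's path, and the fields id, description and keywords are hypothetical.

```json
{
  "namespace": "com.example",
  "type": "record",
  "name": "Report",
  "fields": [
    { "name": "id", "type": "string" },
    { "name": "name", "type": "string" },
    { "name": "description", "type": "string" },
    { "name": "keywords", "type": { "type": "array", "items": "string" } }
  ]
}
```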
In the definition of the inlet, "report-in" is the name of the inlet. It’s recommended that you use a domain specific name for the inlet which indicates the nature of data that this inlet is supposed to accept. We will use this inlet later to read data from it.
This streamlet does not have any outlet, but in general outlets are defined similarly: val out = AvroOutlet[Report]("report-out", _.name) will define an outlet which will write Avro encoded data for objects of type Report. Here "report-out" is the name of the outlet and _.name is the partitioning function that partitions the data from the outlet.
Let’s now define the shape of ReportPrinter by using the APIs in StreamletShape:
[Code listing not available: build-spark-streamlets-scala/step2/src/main/scala/com/example/ReportPrinter.scala]
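A minimal sketch of this step, under the same assumptions as before (Report is the generated Avro class, and the import paths should be checked against your Cloudflow version), could look like this:

```scala
package com.example

import cloudflow.spark.SparkStreamlet
import cloudflow.streamlets.StreamletShape
import cloudflow.streamlets.avro.AvroInlet

// Sketch only: still abstract because createLogic is not implemented yet.
abstract class ReportPrinter extends SparkStreamlet {
  // 1. Create inlets and outlets
  val in = AvroInlet[Report]("report-in")

  // 2. Define the shape of the streamlet: one inlet, no outlets
  override val shape = StreamletShape.withInlets(in)
}
```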
The above code overrides the shape method with a value that defines the shape of the streamlet. StreamletShape offers methods to define shapes. For example, to define a streamlet with two inlets and two outlets, we could write StreamletShape.withInlets(in0, in1).withOutlets(valid, invalid).
The next step is to define the SparkStreamletLogic.
The SparkStreamletLogic class makes it possible for a user to specify domain logic. It is defined as an abstract class in cloudflow.spark.SparkStreamletLogic and provides an abstract method buildStreamingQueries where the user can define the specific logic for the Spark streamlet.
In this step we need to override createLogic from SparkStreamlet in our ReportPrinter class. createLogic needs to return an instance of SparkStreamletLogic which will do the processing based on the requirements of ReportPrinter.
[Code listing not available: build-spark-streamlets-scala/step3/src/main/scala/com/example/ReportPrinter.scala]
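Because the listing is missing here, the following sketch shows one way the complete streamlet could be written. It assumes that Report is the generated Avro class, that readStream and toQueryExecution are available inside SparkStreamletLogic, and that cloudflow.spark.sql.SQLImplicits supplies the required encoders. The values chosen for numRows and truncate are illustrative, and the exact signatures should be verified against the Cloudflow version in use.

```scala
package com.example

import cloudflow.spark.{ SparkStreamlet, SparkStreamletLogic }
import cloudflow.spark.sql.SQLImplicits._
import cloudflow.streamlets.StreamletShape
import cloudflow.streamlets.avro.AvroInlet
import org.apache.spark.sql.streaming.OutputMode

class ReportPrinter extends SparkStreamlet {
  // 1. Create inlets and outlets
  val in = AvroInlet[Report]("report-in")

  // 2. Define the shape of the streamlet
  override val shape = StreamletShape.withInlets(in)

  // 3. Override createLogic to provide the SparkStreamletLogic
  override def createLogic(): SparkStreamletLogic = new SparkStreamletLogic() {
    // Formatting attributes for the console sink (illustrative values)
    val numRows  = 50L
    val truncate = false

    override def buildStreamingQueries = {
      // Read the inlet as a streaming Dataset[Report]
      val inDataset = readStream(in)
      // Print every micro-batch to the console
      val query = inDataset.writeStream
        .format("console")
        .option("numRows", numRows)
        .option("truncate", truncate)
        .outputMode(OutputMode.Append())
        .start()
      query.toQueryExecution
    }
  }
}
```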
In the above code we override createLogic to return a SparkStreamletLogic instance that overrides buildStreamingQueries to supply the domain logic for the streamlet. In this case, since we are implementing a printer streamlet for the console, all we need to do is read from the inlet that we defined earlier, val in = AvroInlet[Report]("report-in"), and do some processing on it.
Here are the steps that we do as part of the processing logic:
Since it’s a console printer, we would like to write to the console, as specified by .format("console") in the implementation above.
We use two parameters to control the display: (a) how many rows to display at once and (b) whether to truncate long lines. These are defined by the values numRows and truncate in the concrete implementation of SparkStreamletLogic.
Note that the processing logic can be quite complex and we can maintain state as part of the implementation of SparkStreamletLogic.
If the streamlet needs to have local state (
In summary, here are the steps for defining a Spark streamlet:
Define the inlets and outlets.
Define the concrete shape using the inlets and outlets. The shape of the streamlet is the metadata that will be used by Cloudflow.
Define the custom processing logic that will read data from inlets and write data to outlets.
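The three steps above can be sketched for a streamlet that has both an inlet and an outlet. ReportFilter is a hypothetical streamlet introduced only for illustration, and the filter condition is arbitrary. The sketch assumes that Report is the generated Avro class with a string field name, and that writeStream and toQueryExecution are available inside SparkStreamletLogic; verify these against your Cloudflow version.

```scala
package com.example

import cloudflow.spark.{ SparkStreamlet, SparkStreamletLogic }
import cloudflow.spark.sql.SQLImplicits._
import cloudflow.streamlets.StreamletShape
import cloudflow.streamlets.avro.{ AvroInlet, AvroOutlet }
import org.apache.spark.sql.streaming.OutputMode

// Hypothetical streamlet: forwards only the reports that have a non-empty name.
class ReportFilter extends SparkStreamlet {
  // 1. Define the inlets and outlets
  val in  = AvroInlet[Report]("report-in")
  val out = AvroOutlet[Report]("report-out", _.name)

  // 2. Define the concrete shape using the inlets and outlets
  override val shape = StreamletShape.withInlets(in).withOutlets(out)

  // 3. Define the processing logic that reads from inlets and writes to outlets
  override def createLogic(): SparkStreamletLogic = new SparkStreamletLogic() {
    override def buildStreamingQueries = {
      val reports = readStream(in)
      val named   = reports.filter(_.name.nonEmpty)
      writeStream(named, out, OutputMode.Append()).toQueryExecution
    }
  }
}
```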
An example of a blueprint using the ReportPrinter could look like this:
[Blueprint listing not available: build-spark-streamlets-scala/app/src/main/blueprint/blueprint.conf]
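As the blueprint listing is not available here, the following is a sketch of how such a blueprint could be laid out, assuming the topic-based blueprint format. The streamlet references ingress and report-printer, the topic name reports, and the outlet name out on ReportIngress are hypothetical; only the inlet name report-in and the com.example package come from the text above.

```hocon
blueprint {
  streamlets {
    # reference name = fully qualified streamlet class
    ingress = com.example.ReportIngress
    report-printer = com.example.ReportPrinter
  }
  topics {
    reports {
      # ReportIngress writes to the topic, ReportPrinter reads from it
      producers = [ingress.out]
      consumers = [report-printer.report-in]
    }
  }
}
```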
ReportIngress could, for instance, be another SparkStreamlet that writes Reports to its outlet.