Building a Spark Streamlet
Integration with the Spark Operator has been deprecated since 2.2.0, and will be removed in version 3.x. Spark integration has moved to the Cloudflow-contrib project. Please see the Cloudflow-contrib getting started guide for instructions on how to use Spark Native Kubernetes integration. The documentation that follows describes the deprecated feature. The SparkStreamlet API has not changed in cloudflow-contrib, though you do need to use a different dependency and add the CloudflowNativeSparkPlugin, which is described in the Cloudflow contrib documentation for building Spark native streamlets.
The following sections describe how you can create a Spark streamlet. As mentioned in Using Spark streamlets, a Spark streamlet is defined by the following features:
- It is a Streamlet. Cloudflow offers a class for implementing Spark streamlets, SparkStreamlet, which extends cloudflow.streamlets.Streamlet. Any Spark streamlet needs to extend SparkStreamlet.
- It has a shape - we call it StreamletShape. Any Spark streamlet needs to define a concrete shape using the APIs available for the StreamletShape class, which defines the inlets and outlets of the streamlet.
- It has a StreamletLogic that defines how the streamlet generates StreamingQuery instances from the business logic.
In this tutorial we’ll build a simple Spark streamlet that accepts data in an inlet and writes it to the console. It can be used to print reports from data that arrives at its inlet. Let’s call the streamlet ReportPrinter.
Extending from SparkStreamlet
Let’s start with building the ReportPrinter streamlet.
The first thing to do is extend the cloudflow.spark.SparkStreamlet
abstract class, as shown below:
package com.example

import cloudflow.spark._

object ReportPrinter extends SparkStreamlet {
  // 1. TODO Create inlets and outlets

  // 2. TODO Define the shape of the streamlet
  override val shape = ???

  // 3. TODO Override createLogic to provide StreamletLogic
  override def createLogic = ???
}
The code snippet above shows the object ReportPrinter that extends the abstract class SparkStreamlet.
We have shown the steps needed to complete the implementation, which we will do in the next few sections.
The next step is to implement inlets and outlets of the streamlet.
Inlets and outlets
The streamlet that we are building in this tutorial will have an inlet and no outlet.
package com.example

import cloudflow.streamlets.avro._
import cloudflow.spark._

object ReportPrinter extends SparkStreamlet {
  // 1. Create inlets and outlets
  val in = AvroInlet[Report]("report-in")

  // 2. TODO Define the shape of the streamlet
  override val shape = ???

  // 3. TODO Override createLogic to provide StreamletLogic
  override def createLogic = ???
}
Cloudflow supports Avro encoded processing of data - we make this explicit by defining the inlet as AvroInlet. Report is the class of objects that will be accepted by this inlet. This means that an inlet defined by AvroInlet[Report] will only accept Avro encoded data for the class Report. The class Report will be generated by Cloudflow during application build time from the Avro schema that the user supplies - this ensures that the data which the inlet accepts conforms to the schema that the user supplied earlier.
As an example we can have the following Avro schema for the Report object that contains a report of some of the attributes of products from an inventory:
{
  "namespace": "com.example",
  "type": "record",
  "name": "Report",
  "fields": [
    {
      "name": "id",
      "type": "string"
    },
    {
      "name": "name",
      "type": "string"
    },
    {
      "name": "description",
      "type": "string"
    },
    {
      "name": "keywords",
      "type": {
        "type": "array",
        "items": "string"
      }
    }
  ]
}
In the definition of the inlet, "report-in" is the name of the inlet. It’s recommended that you use a domain specific name for the inlet which indicates the nature of data that this inlet is supposed to accept. We will use this inlet later to read data from it.
This streamlet does not have any outlet. But in general outlets are defined similarly: val out = AvroOutlet[Report]("report-out", _.name) will define an outlet which will write Avro encoded data for the object of type Report. Here "report-out" is the name of the outlet and _.name is the partitioning function that partitions the data from the outlet.
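For reference, the Report class that Cloudflow generates from the Avro schema above is an ordinary Scala case class. The exact generated code depends on Cloudflow’s Avro code generation, but it is roughly equivalent to the following sketch:
package com.example

// Rough sketch of the case class generated from the Report Avro schema;
// the actual generated source may differ in details.
case class Report(
  id: String,
  name: String,
  description: String,
  keywords: Seq[String]
)
This is also why a partitioning function such as _.name works: it simply selects a field of each Report value.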
Streamlet shape
Let’s now define the shape of ReportPrinter by using the APIs in cloudflow.streamlets.StreamletShape:
package com.example

import cloudflow.streamlets._
import cloudflow.streamlets.avro._
import cloudflow.spark._

object ReportPrinter extends SparkStreamlet {
  // 1. Create inlets and outlets
  val in = AvroInlet[Report]("report-in")

  // 2. Define the shape of the streamlet
  override val shape = StreamletShape.withInlets(in)

  // 3. TODO Override createLogic to provide StreamletLogic
  override def createLogic = ???
}
The above code overrides the shape method with a value that defines the shape of the streamlet. StreamletShape offers methods to define shapes, e.g. to define a streamlet with two inlets and two outlets, we could write StreamletShape.withInlets(in0, in1).withOutlets(valid, invalid).
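As an illustration of that API, a hypothetical streamlet with two inlets and two outlets could be declared as in the sketch below. The ReportValidator name and the inlet/outlet string names are made up for this example:
package com.example

import cloudflow.streamlets._
import cloudflow.streamlets.avro._
import cloudflow.spark._

// Hypothetical streamlet illustrating a shape with two inlets and two outlets.
object ReportValidator extends SparkStreamlet {
  val in0     = AvroInlet[Report]("report-in-0")
  val in1     = AvroInlet[Report]("report-in-1")
  val valid   = AvroOutlet[Report]("valid-reports", _.id)
  val invalid = AvroOutlet[Report]("invalid-reports", _.id)

  override val shape = StreamletShape.withInlets(in0, in1).withOutlets(valid, invalid)

  override def createLogic = ??? // logic omitted in this sketch
}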
The next step is to define the SparkStreamletLogic.
Defining the SparkStreamletLogic
The SparkStreamletLogic class makes it possible for a user to specify domain logic. It is defined as an abstract class in cloudflow.spark.SparkStreamletLogic and provides an abstract method buildStreamingQueries where the user can define the specific logic for the Spark streamlet.
In this step we need to override createLogic from SparkStreamlet in our ReportPrinter object. createLogic needs to return an instance of SparkStreamletLogic which will do the processing based on the requirements of the ReportPrinter object.
package com.example

import org.apache.spark.sql.streaming._

import cloudflow.streamlets._
import cloudflow.streamlets.avro._
import cloudflow.spark._
import cloudflow.spark.sql.SQLImplicits._

object ReportPrinter extends SparkStreamlet {
  // 1. Create inlets and outlets
  val in = AvroInlet[Report]("report-in")

  // 2. Define the shape of the streamlet
  override val shape = StreamletShape.withInlets(in)

  // 3. Override createLogic to provide StreamletLogic
  override def createLogic = new SparkStreamletLogic {
    // Define some formatting attributes
    val numRows  = 50
    val truncate = false

    override def buildStreamingQueries = {
      val inDataset = readStream(in)
      val query = inDataset.writeStream
        .format("console")
        .option("numRows", numRows)
        .option("truncate", truncate)
        .outputMode(OutputMode.Append())
        .start()
      query.toQueryExecution
    }
  }
}
In the above code we override createLogic from SparkStreamlet with an instance of SparkStreamletLogic that overrides buildStreamingQueries to supply the domain logic for the streamlet. In this case, since we are implementing a printer streamlet for the console, all we need to do is read from the inlet that we defined earlier, val in = AvroInlet[Report]("report-in"), and do some processing on it.
Here are the steps that we do as part of the processing logic:
- Since it’s a console printer, we would like to write to the console, as specified by .format("console") in the implementation above.
- We use two parameters that control how the data is displayed: (a) how many rows to display at once and (b) whether to truncate long lines. These are defined by the values numRows and truncate in the concrete implementation of SparkStreamletLogic.
Note that the processing logic can be quite complex and we can maintain state as part of the implementation of SparkStreamletLogic. If the streamlet needs to have local state, it should be kept inside the SparkStreamletLogic implementation.
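To make that concrete, here is a hypothetical variant of the logic that keeps a simple value inside the SparkStreamletLogic and uses it to filter the incoming Dataset before printing. The filterKeyword value and the filter step are illustrative additions, not part of the ReportPrinter above:
// Drop-in replacement for createLogic in the ReportPrinter object above,
// using the same imports as the full example.
override def createLogic = new SparkStreamletLogic {
  // Hypothetical local state, kept inside the logic rather than in the streamlet object.
  val filterKeyword = "sale"

  override def buildStreamingQueries = {
    val inDataset = readStream(in)
    // Only print reports whose keywords contain the configured value.
    val filtered = inDataset.filter(report => report.keywords.contains(filterKeyword))
    val query = filtered.writeStream
      .format("console")
      .outputMode(OutputMode.Append())
      .start()
    query.toQueryExecution
  }
}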
In summary, here are the steps for defining a Spark streamlet:
- Define the inlets and outlets.
- Define the concrete shape using the inlets and outlets. The shape of the streamlet is the metadata that will be used by Cloudflow.
- Define the custom processing logic that will read data from inlets and write data to outlets.
Using ReportPrinter in the blueprint
An example of a blueprint using the ReportPrinter could look like this:
blueprint {
  streamlets {
    ingress = com.example.ReportIngress
    report-printer = com.example.ReportPrinter
  }
  topics {
    reports {
      producers = [ingress.out]
      consumers = [report-printer.report-in]
    }
  }
}
The omitted ReportIngress could for instance be another SparkStreamlet that writes Report records to its outlet.
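For completeness, a hypothetical ReportIngress along those lines could look roughly like the sketch below. It uses Spark’s built-in rate source to generate synthetic Report records; the source, the field values, and the outlet partitioning are assumptions made for this example:
package com.example

import org.apache.spark.sql.streaming.OutputMode

import cloudflow.streamlets._
import cloudflow.streamlets.avro._
import cloudflow.spark._
import cloudflow.spark.sql.SQLImplicits._

// Hypothetical ingress that emits synthetic Report records on its "out" outlet.
object ReportIngress extends SparkStreamlet {
  val out = AvroOutlet[Report]("out", _.id)

  override val shape = StreamletShape.withOutlets(out)

  override def createLogic = new SparkStreamletLogic {
    override def buildStreamingQueries = {
      // Generate one row per second with Spark's "rate" test source,
      // then map each row to a synthetic Report.
      val reports = session.readStream
        .format("rate")
        .option("rowsPerSecond", "1")
        .load()
        .select("value")
        .as[Long]
        .map { i =>
          Report(
            id = i.toString,
            name = s"product-$i",
            description = s"Synthetic report $i",
            keywords = Seq("demo")
          )
        }

      writeStream(reports, out, OutputMode.Append()).toQueryExecution
    }
  }
}
Its outlet name "out" matches the ingress.out producer reference in the blueprint above.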