Get monitoring, observability, online education,
and expert support from Lightbend.
Learn More

Building a Spark Streamlet

The following sections describe how you can create a Spark streamlet. As mentioned in Using Spark streamlets, a Spark streamlet is defined by the following features:

  • It is a Streamlet. Cloudflow offers a class for implementing Spark streamlets, SparkStreamlet which extends cloudflow.streamlets.Streamlet. Any Spark streamlet needs to extend SparkStreamlet.

  • It has a shape - we call it StreamletShape. Any Spark streamlet needs to define a concrete shape using the APIs available for the StreamletShape class, which defines the inlets and outlets of the streamlet.

  • It has a StreamletLogic that defines how the streamlet generates StreamingQuerys from the business logic.

In this tutorial we’ll build a simple Spark streamlet that accepts data in an inlet and writes them to the console. It can be used to print reports from data that arrives at its inlet. Let’s call the streamlet ReportPrinter.

Extending from SparkStreamlet

Lets start with building the ReportPrinter streamlet. The first thing to do is extend the cloudflow.spark.SparkStreamlet abstract class, as shown below:

package com.example

import cloudflow.spark._

object ReportPrinter extends SparkStreamlet {
  // 1. TODO Create inlets and outlets
  // 2. TODO Define the shape of the streamlet
  override val shape = ???
  // 3. TODO Override createLogic to provide StreamletLogic
  override def createLogic = ???
}

The code snippet above shows an abstract class SparkConsoleEgress that extends SparkStreamlet. We have shown the steps needed to complete the implementation, which we will do in the next few sections.

The next step is to implement inlets and outlets of the streamlet.

Inlets and outlets

The streamlet that we are building in this tutorial will have an inlet and no outlet.

package com.example

import cloudflow.streamlets.avro._
import cloudflow.spark._

object ReportPrinter extends SparkStreamlet {
  // 1. Create inlets and outlets
  val in = AvroInlet[Report]("report-in")

  // 2. TODO Define the shape of the streamlet
  override val shape = ???
  // 3. TODO Override createLogic to provide StreamletLogic
  override def createLogic = ???
}

Cloudflow supports Avro encoded processing of data - we make this explicit by defining the inlet as AvroInlet. Report is the class of objects that will be accepted by this inlet. This means that an inlet defined by AvroInlet[Report] will only accept Avro encoded data for the class Report. The class Report will be generated by Cloudflow during application build time from the Avro schema that the user supplies - this ensures that the data which the inlet accepts conforms to the schema that the user had supplied earlier. As an example we can have the following Avro schema for the Report object that contains a report of some of the attributes of products from an inventory:

{
  "namespace": "com.example",
  "type": "record",
  "name": "Report",
  "fields": [
    {
      "name": "id",
      "type": "string"
    },
    {
      "name": "name",
      "type": "string"
    },
    {
      "name": "description",
      "type": "string"
    },
    {
      "name": "keywords",
      "type": {
        "type": "array",
        "items": "string"
      }
    }
  ]
}

In the definition of the inlet, "report-in" is the name of the inlet. It’s recommended that you use a domain specific name for the inlet which indicates the nature of data that this inlet is supposed to accept. We will use this inlet later to read data from it.

This streamlet does not have any outlet. But in general outlets are defined similarly, val out = AvroOutlet[Report]("report-out", _.name) will define an outlet which will write Avro encoded data for the object of type Report. Here "report-out" is the name of the outlet and _.name is the partitioning function that partitions the data from the outlet.

Streamlet shape

Lets now define the shape of ReportPrinter by using the APIs in cloudflow.streamlets.StreamletShape:

package com.example

import cloudflow.streamlets._
import cloudflow.streamlets.avro._
import cloudflow.spark._

object ReportPrinter extends SparkStreamlet {
  // 1. Create inlets and outlets
  val in = AvroInlet[Report]("report-in")

  // 2. Define the shape of the streamlet
  override val shape = StreamletShape.withInlets(in)

  // 3. TODO Override createLogic to provide StreamletLogic
  override def createLogic = ???
}

The above code overrides the shape method with a value that defines the shape of the streamlet. StreamletShape offers methods to define shapes, e.g. to define a streamlet with two inlets and two outlets, we could write StreamletShape.withInlets(in0, in1).withOutlets(valid, invalid).

The next step is to define the SparkStreamletLogic.

Defining the SparkStreamletLogic

The SparkStreamletLogic class makes it possible for a user to specify domain logic. It is defined as an abstract class in cloudflow.spark.SparkStreamletLogic and provides an abstract method buildStreamingJobs where the user can define the specific logic for the Spark Streamlet.

In this step we need to override createLogic from SparkStreamlet in our ReportPrinter object. createLogic needs to return an instance of SparkStreamletLogic which will do the processing based on the requirements of ReportPrinter object.

package com.example

import org.apache.spark.sql.streaming._

import cloudflow.streamlets._
import cloudflow.streamlets.avro._
import cloudflow.spark._

import cloudflow.spark.sql.SQLImplicits._

object ReportPrinter extends SparkStreamlet {
  // 1. Create inlets and outlets
  val in = AvroInlet[Report]("report-in")

  // 2. Define the shape of the streamlet
  override val shape = StreamletShape.withInlets(in)

  // 3. Override createLogic to provide StreamletLogic
  override def createLogic = new SparkStreamletLogic {
    // Define some formatting attributes
    val numRows  = 50
    val truncate = false

    override def buildStreamingQueries = {
      val inDataset = readStream(in)
      val query = inDataset.writeStream
        .format("console")
        .option("numRows", numRows)
        .option("truncate", truncate)
        .outputMode(OutputMode.Append())
        .start()
      query.toQueryExecution
    }
  }
}

In the above code we override createLogic from SparkStreamletLogic with an instance that overrides buildStreamingQueries to supply the domain logic for the streamlet. In this case, since we are implementing a printer streamlet for console, all we need to do is read from the inlet that we defined earlier, val in = AvroInlet[Report]("report-in"), and do some processing on it.

Here are the steps that we do as part of the processing logic:

  • Since it’s a console printer, we would like to write to console as specified by .format("console") in the implementation above.

  • We use two parameters on how to display (a) how many rows to display at once and (b) if we would like to truncate long lines. These are defined by values numRows and truncate in the concrete implementation of SparkStreamletLogic.

Note that the processing logic can be quite complex and we can maintain state as part of the implementation of SparkStreamletLogic.

If the streamlet needs to have local state (vals, vars) for processing logic, it has to be put inside the SparkStreamletLogic class and not as part of the Streamlet class. The Streamlet class is used by Cloudflow for extraction of metadata. Cloudflow instantiates Streamlets when the blueprint is verified, which can have an unwanted side effects.

In summary, here are the steps for defining a Spark streamlet:

  • Define the inlets and outlets.

  • Define the concrete shape using the inlets and outlets. The shape of the streamlet is the metadata that will be used by Cloudflow.

  • Define the custom processing logic that will read data from inlets and write data to outlets.

Using ReportPrinter in the blueprint

An example of a blueprint using the ReportPrinter could look like this:

blueprint {
  streamlets {
    ingress = com.example.ReportIngress
    report-printer = com.example.ReportPrinter
  }

  topics {
    reports {
      producers = [ingress.out]
      consumers = [report-printer.report-in]
    }
  }
}

The omitted ReportIngress could for instance be another SparkStreamlet that writes Reports to its outlet.