Building a Flink streamlet

The following sections describe how you can create a Flink streamlet. As mentioned in Using Flink streamlets, a Flink streamlet is defined by the following features:

  • It is a Streamlet. Cloudflow offers a class for implementing Flink streamlets, FlinkStreamlet, which extends cloudflow.streamlets.Streamlet. Any Flink streamlet needs to extend FlinkStreamlet.

  • It has a shape - we call it StreamletShape. Any Flink streamlet needs to define a concrete shape using the APIs available for the StreamletShape class, which defines the inlets and outlets of the streamlet.

  • It has a StreamletLogic that defines how the streamlet builds its Flink execution graph from the business logic.

In this tutorial, we’ll build a simple Flink streamlet that accepts data on an inlet and writes it to the console. It can be used to print reports from the data that arrives at its inlet. Let’s call the streamlet ReportPrinter.

Extending from FlinkStreamlet

Let’s start with building the ReportPrinter streamlet. The first thing to do is extend the cloudflow.flink.FlinkStreamlet abstract class, as shown below:

Scala
package com.example

import cloudflow.flink._

class ReportPrinter extends FlinkStreamlet {
  // 1. TODO Create inlets and outlets
  // 2. TODO Define the shape of the streamlet
  val shape = ???
  // 3. TODO Override createLogic to provide StreamletLogic
  def createLogic(): FlinkStreamletLogic = ???
}
Java
package com.example;

import cloudflow.streamlets.StreamletShape;
import cloudflow.flink.*;

public class ReportPrinter extends FlinkStreamlet {
  // 1. TODO Create inlets and outlets
  // 2. TODO Define the shape of the streamlet
  public StreamletShape shape() { throw new UnsupportedOperationException("Not Implemented"); }
  // 3. TODO Override createLogic to provide StreamletLogic
  public FlinkStreamletLogic createLogic() { throw new UnsupportedOperationException("Not Implemented"); }
}

The code snippet above shows a class ReportPrinter that extends FlinkStreamlet. We have identified the steps needed to complete the implementation (shown by the TODO markers). In the next few sections we will complete the implementation one step at a time.

First let’s implement inlets and outlets of the streamlet.

Inlets and outlets

The streamlet that we are building in this tutorial will have one inlet and no outlet.

Scala
package com.example

import cloudflow.streamlets.avro._
import cloudflow.flink._

class ReportPrinter extends FlinkStreamlet {
  // 1. Create inlets and outlets
  @transient val in = AvroInlet[Report]("report-in")

  // 2. TODO Define the shape of the streamlet
  val shape = ???
  // 3. TODO Override createLogic to provide StreamletLogic
  def createLogic(): FlinkStreamletLogic = ???
}
Java
package com.example;

import cloudflow.streamlets.StreamletShape;
import cloudflow.streamlets.avro.*;
import cloudflow.flink.*;

public class ReportPrinter extends FlinkStreamlet {
  // 1. Create inlets and outlets
  transient AvroInlet<Report> in = AvroInlet.<Report>create("report-in", Report.class);

  // 2. TODO Define the shape of the streamlet
  public StreamletShape shape() { throw new UnsupportedOperationException("Not Implemented"); }
  // 3. TODO Override createLogic to provide StreamletLogic
  public FlinkStreamletLogic createLogic() { throw new UnsupportedOperationException("Not Implemented"); }
}

In Scala, all inlets, outlets, and the shape (defined below) need to be annotated with @transient to prevent serialization; in Java, the inlet and outlet fields are declared with the transient keyword for the same reason. These structures contain objects, such as an Avro Schema, that cannot be serialized.

Cloudflow supports Avro-encoded processing of data, which we make explicit by defining the inlet as AvroInlet. Report is the class of objects that this inlet accepts: an inlet defined as AvroInlet[Report] only accepts Avro-encoded data for the class Report. Cloudflow generates the Report class at application build time from the Avro schema that the user supplies, which ensures that the data the inlet accepts conforms to that schema. As an example, the following Avro schema for Report describes some attributes of products from an inventory:

{
  "namespace": "com.example",
  "type": "record",
  "name": "Report",
  "fields": [
    {
      "name": "id",
      "type": "string"
    },
    {
      "name": "name",
      "type": "string"
    },
    {
      "name": "description",
      "type": "string"
    },
    {
      "name": "keywords",
      "type": {
        "type": "array",
        "items": "string"
      }
    }
  ]
}

In the definition of the inlet, "report-in" is the name of the inlet. The inlet should have a domain-specific name that describes the nature of the data ingested. We will use this inlet later to read data from it.

This streamlet does not have any outlets. In general, outlets are defined similarly: @transient val out = AvroOutlet[Report]("report-out", _.name) defines an outlet that writes Avro-encoded data for objects of type Report. Here "report-out" is the name of the outlet and _.name is the partitioning function that partitions the data written to the outlet.
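
To make the role of a partitioning function such as _.name more concrete, the following self-contained Java sketch models it as a key extractor whose result selects a partition. The names ReportKeyed and PartitionSketch, the partition count of 4, and the hash-modulo scheme are assumptions made for illustration only. They are not part of the Cloudflow API and do not describe how Cloudflow or its messaging layer actually assigns partitions. The only point is that records with equal keys end up in the same partition.

```java
import java.util.function.Function;

// Hypothetical stand-in for the Avro-generated Report class (only the field needed here).
final class ReportKeyed {
  private final String name;

  ReportKeyed(String name) {
    this.name = name;
  }

  String getName() {
    return name;
  }
}

final class PartitionSketch {
  // Maps a record to a partition index in [0, partitions) using the key
  // returned by the partitioning function. Hash-modulo is an assumption
  // of this sketch, not Cloudflow's actual algorithm.
  static <T> int partitionFor(T record, Function<T, String> partitioner, int partitions) {
    String key = partitioner.apply(record);
    return Math.floorMod(key.hashCode(), partitions);
  }

  public static void main(String[] args) {
    // Java analogue of the Scala partitioning function _.name
    Function<ReportKeyed, String> byName = ReportKeyed::getName;

    int first = partitionFor(new ReportKeyed("widget"), byName, 4);
    int second = partitionFor(new ReportKeyed("widget"), byName, 4);

    // Equal keys always map to the same partition.
    System.out.println(first == second);
  }
}
```

Choosing a key with many distinct values (here the report name) spreads records across partitions, while still keeping all records for one key together.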

Streamlet Shape

Let’s now define the shape of ReportPrinter by using the APIs in cloudflow.streamlets.StreamletShape:

Scala
package com.example

import cloudflow.streamlets._
import cloudflow.streamlets.avro._
import cloudflow.flink._

class ReportPrinter extends FlinkStreamlet {
  // 1. Create inlets and outlets
  @transient val in = AvroInlet[Report]("report-in")

  // 2. Define the shape of the streamlet
  @transient val shape = StreamletShape.withInlets(in)

  // 3. TODO Override createLogic to provide StreamletLogic
  def createLogic(): FlinkStreamletLogic = ???
}
Java
package com.example;

import cloudflow.streamlets.StreamletShape;
import cloudflow.streamlets.avro.*;
import cloudflow.flink.*;

public class ReportPrinter extends FlinkStreamlet {
  // 1. Create inlets and outlets
  transient AvroInlet<Report> in = AvroInlet.<Report>create("report-in", Report.class);

  // 2. Define the shape of the streamlet
  @Override public StreamletShape shape() {
    return StreamletShape.createWithInlets(in);
  }

  // 3. TODO Override createLogic to provide StreamletLogic
  public FlinkStreamletLogic createLogic() { throw new UnsupportedOperationException("Not Implemented"); }
}

The above code overrides shape with a value that defines the shape of the streamlet. StreamletShape offers methods to define shapes. For example, to define a streamlet with two inlets and two outlets, we could write StreamletShape.withInlets(in0, in1).withOutlets(valid, invalid).

The next step is to define the FlinkStreamletLogic.

Defining the FlinkStreamletLogic

The FlinkStreamletLogic class makes it possible for a user to specify domain logic. It is defined as an abstract class in cloudflow.flink.FlinkStreamletLogic and provides an abstract method buildExecutionGraph where the user can define the specific logic for the Flink Streamlet.

In this step, we need to override createLogic from FlinkStreamlet in our ReportPrinter class. createLogic needs to return an instance of FlinkStreamletLogic that performs the processing ReportPrinter requires.

Scala
package com.example

import cloudflow.streamlets._
import cloudflow.streamlets.avro._
import cloudflow.flink._

import org.apache.flink.streaming.api.scala._

class ReportPrinter extends FlinkStreamlet {
  // 1. Create inlets and outlets
  @transient val in = AvroInlet[Report]("report-in")

  // 2. Define the shape of the streamlet
  @transient val shape = StreamletShape.withInlets(in)

  // 3. Override createLogic to provide StreamletLogic, where the inlets and outlets are used to read and write streams.
  override def createLogic() = new FlinkStreamletLogic {
    def format(report: Report) = s"${report.name}\n\n${report.description}"

    override def buildExecutionGraph =
      readStream(in).map(r => format(r)).print()

  }
}
Java
package com.example;

import org.apache.flink.streaming.api.datastream.DataStream;

import cloudflow.streamlets.StreamletShape;
import cloudflow.streamlets.avro.*;
import cloudflow.flink.*;

public class ReportPrinter extends FlinkStreamlet {
  // 1. Create inlets and outlets
  transient AvroInlet<Report> in = AvroInlet.<Report>create("report-in", Report.class);

  // 2. Define the shape of the streamlet
  @Override public StreamletShape shape() {
    return StreamletShape.createWithInlets(in);
  }

  // 3. Override createLogic to provide StreamletLogic
  @Override public FlinkStreamletLogic createLogic() {
    return new FlinkStreamletLogic(getContext()) {
      public String format(Report r) {
        return new StringBuilder()
          .append(r.getName())
          .append("\n\n")
          .append(r.getDescription())
          .toString();
      }

      @Override public void buildExecutionGraph() {

        DataStream<Report> ins =
          this.<Report>readStream(in, Report.class);
        ins
          .map((Report r) -> format(r))
          .print();
      }
    };
  }
}

In the above code, we override createLogic from FlinkStreamlet to return a FlinkStreamletLogic instance that overrides buildExecutionGraph to supply the domain logic for the streamlet. In this case, since we are implementing a streamlet that prints to the console, all we need to do is read from the inlet that we defined earlier, val in = AvroInlet[Report]("report-in"), format each report, and print the result.
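
Because the formatting step is a pure function of a report, it can be exercised without a Flink runtime. The following plain-Java sketch illustrates this under stated assumptions: SimpleReport is a hand-written stand-in for the Avro-generated Report class, and ReportFormatter is a hypothetical helper name that is not part of Cloudflow.

```java
// Hand-written stand-in for the Avro-generated Report class (assumption for this sketch).
final class SimpleReport {
  private final String name;
  private final String description;

  SimpleReport(String name, String description) {
    this.name = name;
    this.description = description;
  }

  String getName() {
    return name;
  }

  String getDescription() {
    return description;
  }
}

// Hypothetical helper that holds the same formatting logic as the streamlet.
final class ReportFormatter {
  // Joins the name and the description, separated by a blank line.
  static String format(SimpleReport report) {
    return report.getName() + "\n\n" + report.getDescription();
  }

  public static void main(String[] args) {
    SimpleReport report = new SimpleReport("Widget", "A small widget");
    System.out.println(format(report));
  }
}
```

Keeping such helpers free of Flink types makes them easy to cover with ordinary unit tests, leaving buildExecutionGraph responsible only for wiring the stream together.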

Note that the processing logic can be quite complex and we can maintain local state as part of the implementation of FlinkStreamletLogic.

If the streamlet needs to have local state (vals, vars) for processing logic, it has to be put inside the FlinkStreamletLogic class and not as part of the Streamlet class. The Streamlet class is used by Cloudflow for extraction of streamlets using reflection and hence cannot have any state within it.
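
The rule above can be pictured with a small plain-Java sketch. It does not use the Cloudflow API: PrinterDefinition and CountingLogic are hypothetical stand-ins for a streamlet class and its logic, chosen only to show where mutable state belongs.

```java
// Hypothetical stand-in for a streamlet logic: it owns its mutable state.
final class CountingLogic {
  private long seen = 0; // local state lives in the logic instance

  // Prefixes each record with a running count of records seen so far.
  String process(String record) {
    seen++;
    return seen + ": " + record;
  }

  long seen() {
    return seen;
  }
}

// Hypothetical stand-in for a streamlet class: it holds no state and
// only creates logic instances.
final class PrinterDefinition {
  CountingLogic createLogic() {
    return new CountingLogic();
  }
}

final class LocalStateSketch {
  public static void main(String[] args) {
    PrinterDefinition definition = new PrinterDefinition();
    CountingLogic first = definition.createLogic();
    CountingLogic second = definition.createLogic();

    first.process("a");
    first.process("b");

    // Each logic instance has its own counter; the definition has none.
    System.out.println(first.seen() + " " + second.seen());
  }
}
```

With this split, the definition class can be instantiated and inspected freely, since creating it has no side effects and carries no processing state.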

In summary, here are the steps for defining a Flink streamlet:

  • Define the inlets and outlets

  • Define the concrete shape using the inlets and outlets. The shape of the streamlet is the metadata that will be used by Cloudflow

  • Define the custom processing logic that will read data from inlets and write data to outlets

Using ReportPrinter in the blueprint

An example of a blueprint using the ReportPrinter could look like this:

blueprint {
  streamlets {
    ingress = com.example.ReportIngress
    report-printer = com.example.ReportPrinter
  }

  topics {
    reports {
      producers = [ingress.out]
      consumers = [report-printer.report-in]
    }
  }
}

The omitted ReportIngress could, for instance, be another FlinkStreamlet that writes Report objects to its outlet.