Building a Flink Streamlet
Integration with the Lyft Flink Operator has been deprecated since 2.2.0 and will be removed in version 3.x. Flink integration has moved to the Cloudflow-contrib project. Please see the Cloudflow-contrib getting started guide for instructions on how to use the Flink Native Kubernetes integration. The documentation that follows describes the deprecated feature. The FlinkStreamlet API has not changed in Cloudflow-contrib, though you do need to use a different dependency and add the CloudflowNativeFlinkPlugin, which is described in the Cloudflow-contrib documentation for building Flink native streamlets.
The following sections describe how you can create a Flink streamlet. As mentioned in Using Flink streamlets, a Flink streamlet is defined by the following features:
- It is a Streamlet. Cloudflow offers a class for implementing Flink streamlets, FlinkStreamlet, which extends cloudflow.streamlets.Streamlet. Any Flink streamlet needs to extend FlinkStreamlet.
- It has a shape, which we call StreamletShape. Any Flink streamlet needs to define a concrete shape using the APIs available for the StreamletShape class, which defines the inlets and outlets of the streamlet.
- It has a StreamletLogic that defines how the streamlet generates Flink streaming queries from the business logic.
In this tutorial, we’ll build a simple Flink streamlet that accepts data on an inlet and writes it to the console. It can be used to print reports from data that arrives at its inlet. Let’s call the streamlet ReportPrinter.
Extending from FlinkStreamlet
Let’s start with building the ReportPrinter streamlet. The first thing to do is extend the cloudflow.flink.FlinkStreamlet abstract class, as shown below:
Scala:
package com.example

import cloudflow.flink._
import cloudflow.streamlets.StreamletShape

// TODO rename to ReportPrinter
class ReportPrinterStep0 extends FlinkStreamlet {
  // 1. TODO Create inlets and outlets

  // 2. TODO Define the shape of the streamlet
  val shape = StreamletShape.empty

  // 3. TODO Override createLogic to provide StreamletLogic
  def createLogic(): FlinkStreamletLogic = new FlinkStreamletLogic() {
    def buildExecutionGraph = ()
  }
}
The code snippet above shows a class ReportPrinter that extends FlinkStreamlet. We have identified the steps needed to complete the implementation (shown by the TODO markers). In the next few sections, we will complete the implementation one step at a time.
First, let’s implement the inlets and outlets of the streamlet.
Inlets and outlets
The streamlet that we are building in this tutorial will have one inlet and no outlet.
Scala:
package com.example

import cloudflow.streamlets.avro._
import cloudflow.streamlets.StreamletShape
import cloudflow.flink._

// TODO rename to ReportPrinter
class ReportPrinterStep1 extends FlinkStreamlet {
  // 1. Create inlets and outlets
  @transient val in = AvroInlet[Report]("report-in")

  // 2. TODO Define the shape of the streamlet
  val shape = StreamletShape.empty

  // 3. TODO Override createLogic to provide StreamletLogic
  def createLogic(): FlinkStreamletLogic = new FlinkStreamletLogic() {
    def buildExecutionGraph = ()
  }
}
All inlets, outlets and the shape (defined below) need to be annotated with @transient.
Cloudflow supports processing of Avro-encoded data, and we make this explicit by defining the inlet as an AvroInlet. Report is the class of objects that this inlet accepts. An inlet defined by AvroInlet[Report] only accepts Avro-encoded data for the class Report. The class Report is generated by Cloudflow at application build time from the Avro schema that the user supplies. This ensures that the data the inlet accepts conforms to the schema the user supplied.
As an example, we can have the following Avro schema for the Report object, which contains some of the attributes of products from an inventory:
{
"namespace": "com.example",
"type": "record",
"name": "Report",
"fields": [
{
"name": "id",
"type": "string"
},
{
"name": "name",
"type": "string"
},
{
"name": "description",
"type": "string"
},
{
"name": "keywords",
"type": {
"type": "array",
"items": "string"
}
}
]
}
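The Java logic later in this guide reads the record through getters such as r.getName() and r.getDescription(). As a rough mental model only, the sketch below is a hand-written plain Java class with the same four fields as the schema above. It is a hypothetical stand-in, not the class that Cloudflow generates at build time.

```java
import java.util.List;
import java.util.Objects;

// Hypothetical, hand-written stand-in for the Report record described by the
// Avro schema above. The real class is generated at build time; this sketch
// only mirrors the schema's four fields and their types.
final class Report {
    private final String id;             // "id": string
    private final String name;           // "name": string
    private final String description;    // "description": string
    private final List<String> keywords; // "keywords": array of string

    Report(String id, String name, String description, List<String> keywords) {
        // Non-union Avro types cannot be null, so reject nulls up front
        this.id = Objects.requireNonNull(id);
        this.name = Objects.requireNonNull(name);
        this.description = Objects.requireNonNull(description);
        // Defensive, unmodifiable copy of the array field
        this.keywords = List.copyOf(keywords);
    }

    String getId() { return id; }
    String getName() { return name; }
    String getDescription() { return description; }
    List<String> getKeywords() { return keywords; }
}
```

Because every field in the schema is a plain (non-union) type, none of them is optional, which is why the sketch rejects null values.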
In the definition of the inlet, "report-in" is the name of the inlet. The inlet should have a domain-specific name that describes the nature of the data ingested. We will use this inlet later to read data from it.
This streamlet does not have any outlet, but in general, outlets are defined similarly to inlets.
Streamlet Shape
Let’s now define the shape of ReportPrinter by using the APIs in cloudflow.streamlets.StreamletShape:
Scala:
package com.example

import cloudflow.streamlets._
import cloudflow.streamlets.avro._
import cloudflow.streamlets.StreamletShape
import cloudflow.flink._

// TODO rename to ReportPrinter
class ReportPrinterStep2 extends FlinkStreamlet {
  // 1. Create inlets and outlets
  @transient val in = AvroInlet[Report]("report-in")

  // 2. Define the shape of the streamlet
  @transient val shape = StreamletShape.withInlets(in)

  // 3. TODO Override createLogic to provide StreamletLogic
  def createLogic(): FlinkStreamletLogic = new FlinkStreamletLogic() {
    def buildExecutionGraph = ()
  }
}
The code above overrides the shape method with a value that defines the shape of the streamlet. StreamletShape offers methods to define shapes; for example, to define a streamlet with two inlets and two outlets, we could write StreamletShape.withInlets(in0, in1).withOutlets(valid, invalid).
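Conceptually, a shape is metadata: the ordered inlets and outlets of a streamlet. The plain Java sketch below models only that idea with a hypothetical immutable Shape class whose builder calls loosely mirror StreamletShape.withInlets(...).withOutlets(...). Ports are represented by their names as strings; the class, its method names, and that representation are assumptions for illustration, not the Cloudflow API.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

// Hypothetical model of a streamlet shape: two ordered lists of port names.
// It mirrors the builder style of StreamletShape but is NOT the Cloudflow class.
final class Shape {
    private final List<String> inlets;
    private final List<String> outlets;

    private Shape(List<String> inlets, List<String> outlets) {
        // Copy, then wrap, so no caller can mutate a Shape after construction
        this.inlets = Collections.unmodifiableList(new ArrayList<>(inlets));
        this.outlets = Collections.unmodifiableList(new ArrayList<>(outlets));
    }

    // A shape with no ports at all
    static Shape empty() {
        return new Shape(List.of(), List.of());
    }

    // Entry point comparable in spirit to StreamletShape.withInlets(...)
    static Shape withInlets(String... names) {
        return empty().addInlets(names);
    }

    // Each call returns a new Shape; the receiver is never modified
    Shape addInlets(String... names) {
        List<String> in = new ArrayList<>(inlets);
        in.addAll(Arrays.asList(names));
        return new Shape(in, outlets);
    }

    Shape withOutlets(String... names) {
        List<String> out = new ArrayList<>(outlets);
        out.addAll(Arrays.asList(names));
        return new Shape(inlets, out);
    }

    List<String> inlets() { return inlets; }
    List<String> outlets() { return outlets; }
}
```

In this model, the ReportPrinter shape would be Shape.withInlets("report-in") with an empty outlet list. Returning a new instance from every builder call keeps shapes safe to share.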
The next step is to define the FlinkStreamletLogic.
Defining the FlinkStreamletLogic
The FlinkStreamletLogic class makes it possible for a user to specify domain logic. It is defined as an abstract class in cloudflow.flink.FlinkStreamletLogic and provides an abstract method, buildExecutionGraph, where the user can define the specific logic for the Flink streamlet.
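The split between createLogic and buildExecutionGraph is a template-method arrangement: the streamlet hands out a logic object, and the runtime then calls its abstract hook. The plain Java sketch below models only that control flow. Streamlet, Logic, run() and RecordingStreamlet are hypothetical names; they contain no Flink or Cloudflow behavior and do not reproduce what Cloudflow actually does around these calls.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for FlinkStreamletLogic: user code fills in the
// abstract hook, and the "framework" decides when it is called.
abstract class Logic {
    abstract void buildExecutionGraph();
}

// Hypothetical stand-in for FlinkStreamlet: subclasses supply the logic.
abstract class Streamlet {
    abstract Logic createLogic();

    // Models the framework side: obtain a fresh logic instance, then ask it
    // to build its graph. Returns the instance so callers can inspect it.
    final Logic run() {
        Logic logic = createLogic();
        logic.buildExecutionGraph();
        return logic;
    }
}

// Example subclass, loosely shaped like ReportPrinter; it only records calls.
final class RecordingStreamlet extends Streamlet {
    final List<String> calls = new ArrayList<>();

    @Override
    Logic createLogic() {
        calls.add("createLogic");
        return new Logic() {
            @Override
            void buildExecutionGraph() {
                calls.add("buildExecutionGraph");
            }
        };
    }
}
```

The point of the arrangement is that the streamlet never calls buildExecutionGraph itself; it only describes what to build, and the caller controls when.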
In this step, we need to override createLogic from FlinkStreamlet in our ReportPrinter class. createLogic needs to return an instance of FlinkStreamletLogic that does the processing required by ReportPrinter.
Scala:
package com.example

import cloudflow.streamlets._
import cloudflow.streamlets.avro._
import cloudflow.flink._
import org.apache.flink.streaming.api.scala._

class ReportPrinter extends FlinkStreamlet {
  // 1. Create inlets and outlets
  @transient val in = AvroInlet[Report]("report-in")

  // 2. Define the shape of the streamlet
  @transient val shape = StreamletShape.withInlets(in)

  // 3. Override createLogic to provide StreamletLogic, where the inlets and outlets are used to read and write streams.
  override def createLogic() = new FlinkStreamletLogic {
    def format(report: Report) = s"${report.name}\n\n${report.description}"

    override def buildExecutionGraph =
      readStream(in).map(r => format(r)).print()
  }
}
Java:
package com.example;

import org.apache.flink.streaming.api.datastream.DataStream;

import cloudflow.streamlets.StreamletShape;
import cloudflow.streamlets.avro.*;
import cloudflow.flink.*;

public class ReportPrinter extends FlinkStreamlet {
  // 1. Create inlets and outlets
  transient AvroInlet<Report> in = AvroInlet.<Report>create("report-in", Report.class);

  // 2. Define the shape of the streamlet
  @Override
  public StreamletShape shape() {
    return StreamletShape.createWithInlets(in);
  }

  // 3. Override createLogic to provide StreamletLogic
  @Override
  public FlinkStreamletLogic createLogic() {
    return new FlinkStreamletLogic(getContext()) {
      public String format(Report r) {
        return new StringBuilder()
          .append(r.getName())
          .append("\n\n")
          .append(r.getDescription())
          .toString();
      }

      @Override
      public void buildExecutionGraph() {
        DataStream<Report> ins = this.<Report>readStream(in, Report.class);
        ins
          .map((Report r) -> format(r))
          .print();
      }
    };
  }
}
In the code above, we override createLogic from FlinkStreamlet with an instance of FlinkStreamletLogic that overrides buildExecutionGraph to supply the domain logic for the streamlet. In this case, since we are implementing a streamlet that prints to the console, all we need to do is read from the inlet that we defined earlier, val in = AvroInlet[Report]("report-in"), format each Report, and print the result.
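To make the data flow of buildExecutionGraph concrete without a Flink cluster, the sketch below replays its three steps (read the inlet, map each Report through format, print) over an in-memory list with java.util.stream. The two-field Report class and ReportPrinterSimulation are hypothetical stand-ins, and a Consumer<String> sink takes the place of print() so the output can be inspected; the format method produces the same string as the Java example above.

```java
import java.util.List;
import java.util.function.Consumer;
import java.util.stream.Collectors;

// Hypothetical minimal stand-in for the generated Report class:
// only the two fields used by format(...) are modelled.
final class Report {
    private final String name;
    private final String description;

    Report(String name, String description) {
        this.name = name;
        this.description = description;
    }

    String getName() { return name; }
    String getDescription() { return description; }
}

final class ReportPrinterSimulation {
    // Same formatting as the ReportPrinter logic: name, blank line, description
    static String format(Report r) {
        return new StringBuilder()
            .append(r.getName())
            .append("\n\n")
            .append(r.getDescription())
            .toString();
    }

    // In-memory analogue of readStream(in).map(format).print():
    // each input element is formatted and handed to the sink in order.
    static List<String> run(List<Report> input, Consumer<String> sink) {
        List<String> formatted = input.stream()
            .map(ReportPrinterSimulation::format)
            .collect(Collectors.toList());
        formatted.forEach(sink);
        return formatted;
    }
}
```

For console output in the spirit of print(), pass System.out::println as the sink.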
Note that the processing logic can be quite complex, and we can maintain local state as part of the implementation of FlinkStreamletLogic.
If the streamlet needs to have local state (…
In summary, here are the steps for defining a Flink streamlet:
- Define the inlets and outlets.
- Define the concrete shape using the inlets and outlets. The shape of the streamlet is the metadata that will be used by Cloudflow.
- Define the custom processing logic that will read data from inlets and write data to outlets.
Using ReportPrinter in the blueprint
An example of a blueprint using the ReportPrinter could look like this:
blueprint {
streamlets {
ingress = com.example.ReportIngress
report-printer = com.example.ReportPrinter
}
topics {
reports {
producers = [ingress.out]
consumers = [report-printer.report-in]
}
}
}
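In the blueprint, each entry under topics lists producers, which are outlets, and consumers, which are inlets, in the form streamlet-name.port-name, where the streamlet name is a key under streamlets. The plain Java sketch below checks those references against known port names. Topic, BlueprintCheck and the port maps are hypothetical; Cloudflow verifies blueprints with its own tooling, and its exact rules are not reproduced here.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Hypothetical model of one entry under "topics" in a blueprint.
final class Topic {
    final String name;
    final List<String> producers; // e.g. "ingress.out"
    final List<String> consumers; // e.g. "report-printer.report-in"

    Topic(String name, List<String> producers, List<String> consumers) {
        this.name = name;
        this.producers = producers;
        this.consumers = consumers;
    }
}

final class BlueprintCheck {
    // outlets/inlets map a streamlet name to the port names it exposes.
    // Returns a list of problems; an empty list means every reference resolved.
    static List<String> validate(Map<String, Set<String>> outlets,
                                 Map<String, Set<String>> inlets,
                                 List<Topic> topics) {
        List<String> problems = new ArrayList<>();
        for (Topic t : topics) {
            for (String ref : t.producers) check(t.name, ref, outlets, "outlet", problems);
            for (String ref : t.consumers) check(t.name, ref, inlets, "inlet", problems);
        }
        return problems;
    }

    // Splits "streamlet.port" at the first dot and looks the port up.
    private static void check(String topic, String ref, Map<String, Set<String>> ports,
                              String kind, List<String> problems) {
        int dot = ref.indexOf('.');
        if (dot <= 0 || dot == ref.length() - 1) {
            problems.add(topic + ": '" + ref + "' is not of the form streamlet.port");
            return;
        }
        String streamlet = ref.substring(0, dot);
        String port = ref.substring(dot + 1);
        if (!ports.getOrDefault(streamlet, Set.of()).contains(port)) {
            problems.add(topic + ": no " + kind + " '" + port + "' on streamlet '" + streamlet + "'");
        }
    }
}
```

For the blueprint above, outlets would map ingress to {out} and inlets would map report-printer to {report-in}, and validate would return an empty list.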
The omitted ReportIngress could, for instance, be another FlinkStreamlet that writes Report objects to its outlet.