Using Flink Streamlets
Integration with the Lyft Flink Operator has been deprecated since 2.2.0, and will be removed in version 3.x. Flink integration has moved to the Cloudflow-contrib project. Please see the Cloudflow-contrib getting started guide for instructions on how to use the Flink Native Kubernetes integration. The documentation that follows describes the deprecated feature. The FlinkStreamlet API has not changed in cloudflow-contrib, though you do need to use a different dependency and add the CloudflowNativeFlinkPlugin, which is described in the Cloudflow-contrib documentation for building Flink native streamlets.
A Flink streamlet has the following responsibilities:
- It needs to capture your stream processing logic.
- It needs to publish metadata used by the sbt-cloudflow plugin to verify that a blueprint is correct. This metadata consists of the shape of the streamlet (StreamletShape), defined by the inlets and outlets of the streamlet. Connecting streamlets need to match on inlets and outlets to make a valid Cloudflow topology, as mentioned previously in Composing applications using blueprints. Cloudflow automatically extracts the shapes from the streamlets to verify this match.
- For the Cloudflow runtime, it needs to provide metadata so that it can be configured, scaled, and run as part of an application.
The inlets and outlets of a Streamlet have two functions:
- To specify to Cloudflow that the streamlet needs certain data streams to exist at runtime, which it will read from and write to, and
- To provide handles inside the stream processing logic to connect to the data streams that Cloudflow provides. The StreamletLogic provides methods that take an inlet or outlet argument to read from or write to. These will be the specific data streams that Cloudflow has set up for you.
The next sections go into the details of defining a Flink streamlet:
- defining inlets and outlets,
- creating a streamlet shape from the inlets and outlets, and
- creating a streamlet logic that uses the inlets and outlets to read and write data.
Inlets and outlets
A streamlet can have one or more inlets and outlets. Cloudflow offers classes for Inlets and Outlets based on the codec of the data they manipulate. Currently, Cloudflow supports Avro, and therefore the classes are named AvroInlet and AvroOutlet. Each outlet also allows the user to define a partitioning function that will be used to partition the data.
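For illustration, here is a minimal sketch of how such ports might be declared inside a Flink streamlet. Data, its id field, and the port names are placeholders, assuming an Avro-generated Data class is available:
import cloudflow.streamlets.avro._

// "in" and "out" are the port names that the blueprint refers to.
val in  = AvroInlet[Data]("in")
// The second argument to AvroOutlet is the partitioning function;
// here records are partitioned by their id field.
val out = AvroOutlet[Data]("out", _.id.toString)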
StreamletShape
The StreamletShape captures the connectivity and compatibility details of a Flink-based streamlet. It captures which inlets and outlets the streamlet has, and how many. The sbt-cloudflow plugin extracts, amongst other things, the shape from the streamlet that it finds on the classpath. This metadata is used to verify that the blueprint connects the streamlets correctly.
Cloudflow offers an API, StreamletShape, to define various shapes with inlets and outlets. Each inlet and outlet can be defined separately with specific names. The outlet also allows for the definition of a partitioning function, which is used to distribute the output data among partitions.
When you build your own Flink streamlet, you need to define the shape. You will learn how to do this in Building a Flink streamlet.
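As a sketch, reusing the hypothetical in and out ports from the previous example (in2 and out2 stand for additional, equally hypothetical ports), a shape can be built like this:
import cloudflow.streamlets.StreamletShape

// A streamlet with one inlet and one outlet.
val shape = StreamletShape(in, out)

// Builder style, convenient when a streamlet has several inlets or outlets.
val multiShape = StreamletShape.withInlets(in, in2).withOutlets(out, out2)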
The next section describes how the FlinkStreamletLogic captures stream processing logic.
FlinkStreamletLogic
The stream processing logic is captured in the abstract class FlinkStreamletLogic. The FlinkStreamletLogic provides methods to read and write data streams, and provides access to the configuration of the streamlet, among other things.
A FlinkStreamletLogic must provide a method for executing streaming queries that process the Flink computation graph. The method buildExecutionGraph has to be overridden by implementation classes that process one or more DataStreams. The resulting graph is then submitted by executeStreamingQueries to the Flink runtime StreamExecutionEnvironment to generate the final output. These jobs are run by the run method of FlinkStreamlet, which produces a StreamletExecution instance; the StreamletExecution manages the execution of the Flink job.
The FlinkStreamletLogic may contain instance values, since it is only constructed at runtime. The Streamlet, however, is also instantiated at compile time to extract metadata, and must not contain any instance values.
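To make this concrete, here is a minimal sketch of a Flink streamlet whose logic overrides buildExecutionGraph, reading from an inlet and writing to an outlet. The FlinkProcessor name and the Data class (assumed to be generated from an Avro schema with a numeric id field) are placeholders for illustration:
import cloudflow.flink._
import cloudflow.streamlets.StreamletShape
import cloudflow.streamlets.avro._
import org.apache.flink.streaming.api.scala._

class FlinkProcessor extends FlinkStreamlet {
  @transient val in    = AvroInlet[Data]("in")
  @transient val out   = AvroOutlet[Data]("out", _.id.toString)
  @transient val shape = StreamletShape(in, out)

  override def createLogic() = new FlinkStreamletLogic {
    override def buildExecutionGraph = {
      val inputs: DataStream[Data]  = readStream(in)          // stream bound to the inlet
      val results: DataStream[Data] = inputs.filter(_.id > 0) // your stream processing logic
      writeStream(out, results)                               // stream bound to the outlet
    }
  }
}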
FlinkStreamletContext
The FlinkStreamletContext provides the necessary context under which a streamlet runs. It contains the following context data and contracts:
- An active StreamExecutionEnvironment that will be used to submit streaming jobs to the Flink runtime.
- The Typesafe Config loaded from the classpath through a config method, which can be used to read configuration settings.
- The name used in the blueprint for the specific instance of this streamlet being run.
- A mapping that gives the name of the Kafka topic from the port name.
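A hedged sketch of how this context might be used from inside the logic follows. The ContextProbe streamlet, the Data class, and the my-app.window-seconds key are placeholders, and the context members shown (streamletRef, config, env) are assumptions based on the description above:
import cloudflow.flink._
import cloudflow.streamlets.StreamletShape
import cloudflow.streamlets.avro._
import org.apache.flink.streaming.api.scala._

class ContextProbe extends FlinkStreamlet {
  @transient val in    = AvroInlet[Data]("in")
  @transient val shape = StreamletShape.withInlets(in)

  override def createLogic() = new FlinkStreamletLogic {
    override def buildExecutionGraph = {
      val instanceName  = context.streamletRef                            // blueprint instance name
      val windowSeconds = context.config.getLong("my-app.window-seconds") // hypothetical config key
      context.env.setParallelism(1)                                       // the active StreamExecutionEnvironment
      readStream(in)
        .map(d => s"$instanceName window=${windowSeconds}s got $d")
        .print()
    }
  }
}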
Lifecycle
In general terms, when you publish and deploy a Cloudflow application, each streamlet definition becomes a physical deployment as one or more Kubernetes artifacts. In the case of Flink streamlets, each streamlet is submitted as a Flink application through the Flink Operator as a Flink Application resource. Upon deployment, it becomes a set of running pods, with one pod as a Flink Job Manager and n pods as Task Managers, where n is the scale factor for that streamlet.
The Flink Operator is a Kubernetes operator dedicated to managing Flink applications.
The Flink Operator takes care of monitoring the streamlets and is responsible for their resilient execution, like restarting the Flink application in case of failure.
In Flink Streamlet Deployment Model, we can visualize the chain of delegation involved in the deployment of a Flink streamlet:
- The Cloudflow Operator prepares a Custom Resource describing the Flink streamlet and submits it to the Flink Operator.
- The Flink Operator processes the Custom Resource and issues the deployment of a Flink Job Manager pod.
- The Flink Job Manager then requests Task Manager resources from Kubernetes to deploy the distributed processing.
- Finally, if and when resources are available, the Flink-bound Task Managers start as Kubernetes pods. The Task Managers are the components tasked with the actual data processing, while the Job Manager serves as coordinator of the (stream) data process.
If you are using the cloudflow-contrib model of integration, you need to go through some additional steps to complete the deployment of your Flink streamlets. This section on cloudflow-contrib has more details.
If you make any changes to the streamlet and deploy the application again, the existing Flink application will be stopped, and the new version will be started to reflect the changes. It could be that Flink streamlets are restarted in other ways, for instance, by administrators.
This means that a streamlet can get stopped, started or restarted at any moment in time. The next section about message delivery semantics explains the options that are available for how to continue processing data after a restart.
Stream processing semantics
The message processing semantics provided by Flink streamlets are determined by the guarantees provided by the underlying sources, sinks, and connectors. Flink offers at-least-once or exactly-once semantics depending on whether checkpointing is enabled.
If checkpointing is enabled, Flink guarantees end-to-end exactly-once processing with supported sources, sinks, and connectors.
Checkpointing is enabled by default in Cloudflow, using the exactly_once checkpointing mode. You can override this through the Configuration system with specific execution.checkpointing settings, as shown here.
When we say “exactly-once semantics”, what we mean is that each incoming event affects the final results exactly once. Even in case of a machine or software failure, there’s no duplicate data and no data that goes unprocessed. This post explains in detail how TwoPhaseCommitSinkFunction
implements the two-phase commit protocol and makes it possible to build end-to-end exactly-once applications with Flink and a selection of data sources and sinks, including Apache Kafka versions 0.11 and beyond.
For more details of how the various types of processing semantics impact Kafka producers and consumers when interacting with Flink, please refer to this guide.
In Cloudflow, streamlets communicate with each other through Kafka, hence the semantics here depend on the settings used in our streamlet implementations. For Flink we use at-least-once in the FlinkKafkaProducer. To use exactly-once, the Kafka broker needs to have transaction.max.timeout.ms set to at least 1 hour, which can have an impact on other runtimes in the application.
Reliable restart of stateful processes
A Cloudflow application can be stateful and use the full capabilities of keyed state and operator state in Flink. Stateful operators can be used within a Flink streamlet, as sketched below. All state is stored in state stores deployed on Kubernetes using Persistent Volume Claims (PVCs) backed by a storage class that allows for access mode ReadWriteMany.
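As a hedged sketch of a stateful keyed operation inside a Flink streamlet's logic: the surrounding streamlet is omitted, and Data (with id and value fields) is again a hypothetical Avro-generated class. Flink keeps the state of the keyed reduce in its state backend and includes it in checkpoints, which is the state recovered on restart.
// Inside buildExecutionGraph of a Flink streamlet: a running total per id,
// kept by Flink in keyed state and restored from checkpoints after a restart.
val totals =
  readStream(in)
    .keyBy(_.id)
    .reduce((a, b) => Data(a.id, a.value + b.value))
writeStream(out, totals)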
To deploy a Flink application, it is necessary that a PVC already exists in the namespace for the application. Cloudflow automatically mounts this PVC in the streamlets themselves to allow state storage, as long as it has a /mnt/flink/storage mount path. Cloudflow looks for a PVC named cloudflow-flink by default that mounts this path. It is also possible to create a differently named PVC that mounts /mnt/flink/storage and mount it using a configuration file, as described in The Configuration Model. The access mode for the PVC has to be ReadWriteMany.
Flink’s runtime encodes all state and writes it into checkpoints. And since checkpoints in Flink offer an exactly-once guarantee, all application state is preserved safely throughout the lifetime of the streamlet. In case of failures, recovery is done from checkpoints, and there is no data loss.
Adding additional Flink libraries
If you enable the CloudflowFlinkPlugin plugin, Cloudflow adds all of the Flink jars necessary for a typical Flink streamlet implementation. But sometimes you might need to add additional libraries. When doing so, it is necessary to make sure that the versions of the libraries you are adding match the version of the rest of the Flink libraries. The version used by Cloudflow is currently defined in the variable cloudflow.sbt.CloudflowFlinkPlugin.FlinkVersion.
So if, for example, you want to add the Azure file system, you can do it as shown below:
libraryDependencies ++= Seq("org.apache.flink" % "flink-azure-fs-hadoop" % cloudflow.sbt.CloudflowFlinkPlugin.FlinkVersion)
Flink local execution
If you are testing Flink streamlets locally [Using the Local Sandbox], you should be aware of several things:
- Restartability. Unlike a cluster deployment, a local Flink execution will not restart in the case of crashes.
- Checkpointing. By default, checkpointing for local runs is done in memory, which means that checkpoints do not survive beyond an individual execution.
- In the case of local execution, an instance of a Flink server is created for each sbt module for which the CloudflowFlinkPlugin is enabled. A single Flink server (in the case of local execution) runs a single streamlet. As a result, if you want to be able to test locally, each sbt module should contain a single Flink streamlet. If you are developing several Flink streamlets, make sure that each one of them is in a separate module.
The checkpointing behaviour in this case (along with many other parameters) can be overridden using [The Cloudflow Configuration Model]. The default configuration for Flink is here. Complete documentation of the configuration parameters can be found here.
In addition to "standard" Flink parameters, you can enable the Web UI when using runLocal. If the parameter local.web is set in the configuration (for the Flink runtime or a particular streamlet), the Flink execution is started with the Web UI, as shown here. By default, the Web UI is available at http://localhost:8081, but the port can be overridden using the configuration system with rest.port under cloudflow.runtimes.flink.config.flink or, for a specific streamlet, under cloudflow.streamlets.<streamlet>.config.
An example of modifying some Flink configuration is shown below:
cloudflow.streamlets.processor.config {
local.web = on
rest.port = 5000
}
If you have more than one Flink streamlet in your local project, DO NOT enable the Web UI in cloudflow.runtimes.flink.config: this will set the same UI port for all Flink servers and will prevent them from starting. Make sure that you are enabling the Web UI for individual streamlets and using different ports for them.
In the above example the Web UI is enabled for the processors streamlet. which will listen on port 5000. You need to provide the location of the local configuration in your sbt build file:
lazy val taxiRidePipeline = appModule("taxi-ride-pipeline")
.enablePlugins(CloudflowApplicationPlugin)
.settings(commonSettings)
.settings(
name := "taxi-ride-fare",
runLocalConfigFile := Some("taxi-ride-pipeline/src/main/resources/local.conf"),
)
If you set execution.checkpointing.interval, you need to set all other relevant checkpointing settings as well; the default checkpointing settings will not be used when execution.checkpointing.interval is set.
In the above example, the runLocalConfigFile setting points to a local.conf file in the taxi-ride-fare example project.
Azure Blob Storage
Flink supports several file systems for reading and writing data and for its state backend. In this section, we will look at how to use Microsoft’s Azure Blob Storage for these purposes.
Please refer to the Flink documentation on Azure Blob Storage.
To make this work in a Cloudflow context, there are a couple of things you need to do, described below.
Checkpoints
In the configuration file, you need to specify that the state.backend is filesystem, set state.checkpoints.dir to match the location of your Blob, and provide your access key.
cloudflow.runtimes.flink.config {
flink.state.backend = filesystem
flink.state.checkpoints.dir = "wasbs://<your-container>@<your-azure-account>.blob.core.windows.net/<object-path>"
flink.fs.azure.account.key.<your-azure-account>.blob.core.windows.net = "<blob-access-key>"
}
Flink needs flink-azure-fs-hadoop support to be built into your streamlet in order to be able to use Azure as a filesystem backend. To facilitate this, add the following library dependency to the build.sbt.
libraryDependencies += "org.apache.flink" % "flink-azure-fs-hadoop" % cloudflow.sbt.CloudflowFlinkPlugin.FlinkVersion
Default Cloudflow Checkpointing
By default, Cloudflow sets checkpointing with the values below:
execution.checkpointing.interval = 10000 millis
execution.checkpointing.min-pause = 500 millis
execution.checkpointing.timeout = 10 mins
execution.checkpointing.max-concurrent-checkpoints = 1
execution.checkpointing.externalized-checkpoint-retention = RETAIN_ON_CANCELLATION
These parameters can be overridden by passing at least the execution.checkpointing.interval setting when configuring streamlets. The interval must be changed for any other change to take effect.
It is also possible to disable the settings shown above, and therefore fall back to the Flink defaults, by configuring the streamlet or the Flink runtime with config.cloudflow.checkpointing.default = off.
You can learn how in Configuring Streamlets.