Using Flink Streamlets

A Flink streamlet has the following responsibilities:

  • It needs to capture your stream processing logic.

  • It needs to publish metadata used by the sbt-cloudflow plugin to verify that a blueprint is correct. This metadata consists of the shape of the streamlet (StreamletShape) defined by the inlets and outlets of the streamlet. Connected streamlets need to match on inlets and outlets to make a valid Cloudflow topology, as mentioned previously in Composing applications using blueprints. Cloudflow automatically extracts the shapes from the streamlets to verify this match.

  • It needs to provide metadata to the Cloudflow runtime so that the streamlet can be configured, scaled, and run as part of an application.

  • The inlets and outlets of a Streamlet have two functions:

    • To specify to Cloudflow that the streamlet needs certain data streams to exist at runtime, which it will read from and write to, and

    • To provide handles inside the stream processing logic to connect to the data streams that Cloudflow provides. The StreamletLogic provides methods that take an inlet or outlet argument to read from or write to. These will be the specific data streams that Cloudflow has set up for you.

The next sections will go into the details of defining a Flink streamlet:

  • defining inlets and outlets

  • creating a streamlet shape from the inlets and outlets

  • creating a streamlet logic that uses the inlets and outlets to read and write data

Inlets and outlets

A streamlet can have one or more inlets and outlets. Cloudflow offers classes for Inlets and Outlets based on the codec of the data they manipulate. Currently, Cloudflow supports Avro, and therefore, the classes are named AvroInlet and AvroOutlet. Each outlet also allows the user to define a partitioning function that will be used to partition the data.

StreamletShape

The StreamletShape captures the connectivity and compatibility details of a Flink-based streamlet. It captures which—and how many—inlets and outlets the streamlet has.

The sbt-cloudflow plugin extracts—amongst other things—the shape from the streamlet that it finds on the classpath. This metadata is used to verify that the blueprint connects the streamlets correctly.

Cloudflow offers the StreamletShape API to define various shapes with inlets and outlets. Each inlet and outlet can be defined separately with specific names. The outlet also allows for the definition of a partitioning function. This partitioning function is used to distribute the output data among partitions.

When you build your own Flink streamlet, you need to define the shape. You will learn how to do this in Building a Flink streamlet. The next section describes how the FlinkStreamletLogic captures stream processing logic.

FlinkStreamletLogic

The stream processing logic is captured in the abstract class FlinkStreamletLogic.

The FlinkStreamletLogic provides methods to read and write data streams, and provides access to the configuration of the streamlet, among other things.

A FlinkStreamletLogic must provide a method for executing streaming queries that process the Flink computation graph. The method buildExecutionGraph has to be overridden by implementation classes that process one or more DataStreams. The resulting graph is then submitted by executeStreamingQueries to the Flink runtime StreamExecutionEnvironment to generate the final output. These jobs are run by the run method of FlinkStreamlet, which produces a StreamletExecution. The StreamletExecution class manages the execution of a Flink job.

The FlinkStreamletLogic may contain instance values since it is only constructed at runtime. The Streamlet, however, is also instantiated at compile time to extract metadata, and must not contain any instance values.

FlinkStreamletContext

The FlinkStreamletContext provides the necessary context under which a streamlet runs. It contains the following context data and contracts:

  • An active StreamExecutionEnvironment that will be used to submit streaming jobs to the Flink runtime.

  • The Typesafe Config loaded from the classpath through a config method, which can be used to read configuration settings.

  • The name used in the blueprint for the specific instance of this streamlet being run.

  • A mapping that gives the name of the Kafka topic from the port name.

Lifecycle

In general terms, when you publish and deploy a Cloudflow application, each streamlet definition becomes a physical deployment as one or more Kubernetes artifacts. In the case of Flink streamlets, each streamlet is submitted as a Flink application through the Flink Operator as a Flink Application resource. Upon deployment, it becomes a set of running pods, with one pod as a Flink Job Manager and n pods as Task Managers, where n is the scale factor for that streamlet.

The Flink Operator is a Kubernetes operator dedicated to managing Flink applications.

The Flink Operator takes care of monitoring the streamlets and is responsible for their resilient execution, like restarting the Flink application in case of failure.

In Flink Streamlet Deployment Model, we can visualize the chain of delegation involved in the deployment of a Flink streamlet:

  • The Cloudflow Operator prepares a Custom Resource describing the Flink streamlet and submits it to the Flink Operator.

  • The Flink Operator processes the Custom Resource and issues the deployment of a Flink job manager pod.

  • The Flink Job Manager then requests task manager resources from Kubernetes to deploy the distributed processing.

  • Finally, if and when resources are available, the Flink-bound task managers start as Kubernetes pods. The task managers are the components tasked with the actual data processing, while the Job Manager serves as coordinator of the (stream) data process.

In case you are using the cloudflow-contrib model of integration, you need to go through some additional steps to complete the deployment of your Flink streamlets. This section on cloudflow-contrib has more details.

If you make any changes to the streamlet and deploy the application again, the existing Flink application will be stopped, and the new version will be started to reflect the changes. It could be that Flink streamlets are restarted in other ways, for instance, by administrators.

This means that a streamlet can get stopped, started or restarted at any moment in time. The next section about message delivery semantics explains the options that are available for how to continue processing data after a restart.

The message processing semantics provided by Flink streamlets are determined by the guarantees provided by the underlying sources, sinks and connectors. Flink offers at-least-once or exactly-once semantics depending on whether checkpointing is enabled.

If checkpointing is enabled, Flink guarantees end-to-end exactly-once processing with sources, sinks and connectors. Checkpointing is enabled by default in Cloudflow, using exactly_once checkpointing mode. You can override this using the Configuration system by specifying execution.checkpointing settings, as shown here.

When we say “exactly-once semantics”, what we mean is that each incoming event affects the final results exactly once. Even in case of a machine or software failure, there’s no duplicate data and no data that goes unprocessed. This post explains in detail how TwoPhaseCommitSinkFunction implements the two-phase commit protocol and makes it possible to build end-to-end exactly-once applications with Flink and a selection of data sources and sinks, including Apache Kafka versions 0.11 and beyond.

For more details of how the various types of processing semantics impact Kafka producers and consumers when interacting with Flink, please refer to this guide.

In Cloudflow, streamlets communicate with each other through Kafka, so the semantics depend on the settings used in the streamlet implementation. For Flink, Cloudflow uses at-least-once in FlinkKafkaProducer. To use exactly-once, the Kafka broker needs to have transaction.max.timeout.ms set to at least 1 hour, which can have an impact on other runtimes in the application.

Reliable restart of stateful processes

A Cloudflow application can be stateful and use the full capabilities of keyed state and operator state in Flink. Stateful operators can be used within a Flink streamlet. All state is stored in state stores deployed on Kubernetes using Persistent Volume Claims (PVCs) backed by a storage class that allows for access mode ReadWriteMany.

To deploy a Flink application, a PVC must already exist in the namespace for the application. Cloudflow will automatically mount this PVC in the streamlets themselves to allow state storage, as long as it has a /mnt/flink/storage mount path. By default, Cloudflow looks for a PVC named cloudflow-flink that mounts this path. It is also possible to create a differently named PVC that mounts /mnt/flink/storage and mount it using a configuration file, as described in The Configuration Model. The access mode for the PVC has to be ReadWriteMany.

Flink’s runtime encodes all state and writes it into checkpoints. Since checkpoints in Flink offer an exactly-once guarantee, all application state is preserved safely throughout the lifetime of the streamlet. In case of failures, recovery is done from checkpoints, and there is no data loss.
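
A minimal sketch of mounting a differently named PVC through a configuration file might look as follows. The claim name my-flink-pvc and the volume name flink-storage are hypothetical, and the kubernetes.pods.pod layout is an assumption based on The Configuration Model, so check it against that section for your Cloudflow version.

```hocon
# Sketch only: the key layout is assumed from The Configuration Model.
cloudflow.runtimes.flink.kubernetes.pods.pod {
  volumes {
    # "flink-storage" is a hypothetical volume name
    flink-storage {
      pvc {
        # "my-flink-pvc" is a hypothetical PVC that must already exist
        # in the application namespace with access mode ReadWriteMany
        name = my-flink-pvc
        read-only = false
      }
    }
  }
  containers.container {
    volume-mounts {
      flink-storage {
        # the mount path Cloudflow expects for Flink state storage
        mount-path = "/mnt/flink/storage"
        read-only = false
      }
    }
  }
}
```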

If you enable the CloudflowFlinkPlugin plugin, Cloudflow adds all of the Flink jars necessary for a typical Flink streamlet implementation. Sometimes, however, you need additional libraries. When adding them, make sure that their versions match the version of the rest of the Flink libraries. The version used by Cloudflow is currently defined in the variable cloudflow.sbt.CloudflowFlinkPlugin.FlinkVersion. For example, to add the Azure file system, you can do it as shown below:

libraryDependencies ++= Seq("org.apache.flink" % "flink-azure-fs-hadoop" % cloudflow.sbt.CloudflowFlinkPlugin.FlinkVersion)

If you are testing Flink streamlets locally (see Using the Local Sandbox), you should be aware of several things:

  • Restartability. Unlike cluster deployment, a local Flink execution will not restart in the case of crashes.

  • Checkpointing. By default, checkpointing in local runs is done in memory, which means that checkpoints do not survive beyond an individual execution.

  • In the case of local execution, an instance of a Flink server is created for each SBT module for which CloudflowFlinkPlugin is enabled. A single Flink server (in the case of local execution) is running a single streamlet. As a result, if you want to be able to test locally, each SBT module should contain a single Flink streamlet. If you are developing several Flink streamlets, make sure that each one of them is in a separate module.

The checkpointing behaviour in this case (along with many other parameters) can be overridden using The Cloudflow Configuration Model. The default configuration for Flink is here. Complete documentation of the configuration parameters can be found here.

In addition to "standard" Flink parameters, you can enable the Web UI when using runLocal. If the parameter local.web is set in the configuration (for the Flink runtime or a particular streamlet), the Flink execution is started with the Web UI, as shown here. By default the Web UI is available at http://localhost:8081, but the port can be overridden using the configuration system with rest.port under cloudflow.runtimes.flink.config.flink or specifically for a streamlet under cloudflow.streamlets.<streamlet>.config.

An example of modifying some Flink configuration is shown below:

cloudflow.streamlets.processor.config {
  local.web = on
  rest.port = 5000
}
If you have more than one Flink streamlet in your local project, DO NOT enable the Web UI in cloudflow.runtimes.flink.config. Doing so sets the same UI port for all Flink servers and prevents them from starting. Make sure that you enable the Web UI for individual streamlets and use a different port for each of them.

In the above example the Web UI is enabled for the processor streamlet, which will listen on port 5000. You need to provide the location of the local configuration in your sbt build file:

lazy val taxiRidePipeline = appModule("taxi-ride-pipeline")
  .enablePlugins(CloudflowApplicationPlugin)
  .settings(commonSettings)
  .settings(
    name := "taxi-ride-fare",
    runLocalConfigFile := Some("taxi-ride-pipeline/src/main/resources/local.conf"),
  )
If you set execution.checkpointing.interval, you need to set all other relevant checkpointing settings as well, because the default checkpointing will not be used when execution.checkpointing.interval is set.

In the above example the runLocalConfigFile setting is shown to point to a local.conf in the taxi-ride-fare example project.
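
To illustrate the earlier warning about several Flink streamlets in one local project, the sketch below enables the Web UI per streamlet and gives each one its own port. The second streamlet name, logger, and the port 5001 are hypothetical. The keys themselves mirror the processor example shown earlier.

```hocon
# Web UI for the "processor" streamlet, as in the example above
cloudflow.streamlets.processor.config {
  local.web = on
  rest.port = 5000
}

# "logger" is a hypothetical second Flink streamlet; it must use a
# different port so that both local Flink servers can start
cloudflow.streamlets.logger.config {
  local.web = on
  rest.port = 5001
}
```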

Azure Blob Storage

Flink supports several file systems for reading and writing data and for its state backend. In this section, we will look at how to use Microsoft’s Azure Blob Storage for these purposes.

For details, refer to the Azure Blob Storage page in the Flink documentation.

To make this work in a Cloudflow context, you need to do the following.

Checkpoints

In the configuration file, you need to specify that the state.backend is filesystem, set state.checkpoints.dir to match the location of your Blob, and provide your access key.

cloudflow.runtimes.flink.config {
  flink.state.backend = filesystem
  flink.state.checkpoints.dir = "wasbs://<your-container>@$<your-azure-account>.blob.core.windows.net/<object-path>"
  flink.fs.azure.account.key.<your-azure-account>.blob.core.windows.net = "<blob-access-key>"
}

Flink needs flink-azure-fs-hadoop support to be built into your streamlet in order to use Azure as a filesystem backend. To facilitate this, add the following library dependency to build.sbt.

libraryDependencies += "org.apache.flink" % "flink-azure-fs-hadoop" % cloudflow.sbt.CloudflowFlinkPlugin.FlinkVersion

Default Cloudflow Checkpointing

By default Cloudflow sets checkpointing with the values below.

execution.checkpointing.interval = 10000 millis
execution.checkpointing.min-pause = 500 millis
execution.checkpointing.timeout = 10 mins
execution.checkpointing.max-concurrent-checkpoints = 1
execution.checkpointing.externalized-checkpoint-retention = RETAIN_ON_CANCELLATION

These parameters are overridden when you pass at least the execution.checkpointing.interval setting when configuring streamlets. The interval must be set for any other checkpointing change to take effect.
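
A sketch of such an override is shown below. It assumes the same cloudflow.runtimes.flink.config section and flink. key prefix used in the Azure checkpoint example above, and the values are purely illustrative. Because setting the interval switches off the Cloudflow defaults, the other checkpointing settings are listed explicitly as well.

```hocon
# Sketch only: section and "flink." prefix are assumed to follow the
# Azure checkpoint example; all values are illustrative.
cloudflow.runtimes.flink.config {
  # setting the interval disables the Cloudflow default checkpointing values
  flink.execution.checkpointing.interval = 30000 millis
  # so the remaining settings are stated explicitly
  flink.execution.checkpointing.min-pause = 1000 millis
  flink.execution.checkpointing.timeout = 5 mins
  flink.execution.checkpointing.max-concurrent-checkpoints = 1
  flink.execution.checkpointing.externalized-checkpoint-retention = RETAIN_ON_CANCELLATION
}
```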

It is also possible to disable the settings shown above, and therefore fall back to the Flink defaults, by configuring the streamlet or the Flink runtime with config.cloudflow.checkpointing.default = off. You can learn how in Configuring Streamlets.
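
As a rough sketch, and assuming that config here refers to the config section of the Flink runtime or of an individual streamlet, disabling the defaults could look like one of the following. The streamlet name processor is reused from the local Web UI example and is otherwise hypothetical.

```hocon
# Option 1 (assumed path): disable the Cloudflow checkpointing defaults
# for all Flink streamlets in the application
cloudflow.runtimes.flink.config {
  cloudflow.checkpointing.default = off
}

# Option 2 (assumed path): disable them only for the "processor" streamlet
cloudflow.streamlets.processor.config {
  cloudflow.checkpointing.default = off
}
```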