Schema-first approach

Cloudflow uses a schema-first approach for building streaming data pipelines. You simply need to supply an Avro schema as the starting point of the domain model for which a streaming data flow needs to be built. Cloudflow adds the appropriate plug-ins to the build system of the application to generate Java / Scala classes corresponding to the Avro schema.

This approach has the advantage that you only need to take care of the core domain model and Cloudflow does the heavy lifting of generating the classes and integrating them with the main application.

However since Cloudflow takes the schema as the input, it needs to ensure that the corresponding data inlets and outlets honor the schema when allowing data to flow through them. This needs to be done to ensure data consistency across all the inlets and outlets. We discuss this in the next section.

Schema code generation

You can choose what programming language to generate their schemas into by defining settings in the SBT project. When no settings are defined, by default the sbt-cloudflow plugin will look for Avro schemas in src/main/avro and will generate Scala classes.

If you wish to override the location of your Avro schemas in your project, or if you wish to generate Java classes instead, you can do so by defining a Setting in your SBT project.

  • schemaCodeGenerator (Default: SchemaCodeGenerator.Scala) - The programming language to generate Avro schemas classes into.

  • schemaPaths (Default: Map(SchemaFormat.Avro → "src/main/avro")) - The relative path to the Avro schemas.

  • schemaFormat (Default: Seq(SchemaFormat.Avro)) - The schema formats to generate. Avro is the default format. Since Cloudflow 2.0, there’s experimental support for Protobuf (SchemaFormat.Proto).

For example, to generate Java classes from Avro schemas in a non-default location, you can use a configuration similar to the following:

lazy val datamodel = (project in file("./my-cloudflow-library"))
    schemaCodeGenerator := SchemaCodeGenerator.Java,
    schemaPaths := Map(
      SchemaFormat.Avro -> "src/main/resources/avroschemas",
      SchemaFormat.Proto -> "src/main/resources/protobuf"

Schema-aware inlets and outlets

In Cloudflow, all streamlet inlets and outlets are schema-aware. This means two things:

  • Every inlet and outlet must allow only data to flow through them that honors their schema.

  • Inlets and outlets can be connected together only if the schemas of an outlet and the corresponding inlet are compatible.

Let’s take a look at how Cloudflow ensures both of the above guarantees.

Data integrity guarantee through schemas and types

As mentioned above, any application definition starts with the schema definition for the domain object. Let’s assume you provide the following Avro schema as an input. It’s a definition of a call record as used by a telephone company.

    "namespace": "cloudflow.callrecordaggregator",
    "type": "record",
    "name": "CallRecord",
            "name": "user",
            "type": "string"
             "name": "other",
             "type": "string"
             "name": "direction",
             "type": "string"
              "name": "duration",
              "type": "long"
            "name": "timestamp",
            "type": "long"

In case of a Scala application, Cloudflow will generate a Scala case class cloudflow.examples.CallRecord corresponding to the above schema. This class will now be made available for use within the application.

When we define a streamlet where objects of type CallRecord will be passing through its inlet, we define the inlet as follows:

val in = AvroInlet[CallRecord]("call-record-in")

Cloudflow ensures the following compile time guarantees through this type definition:

  • The inlet only allows a codec of type Avro.

  • Cloudflow only allows the inlet to be used with an object of type CallRecord.

    For example, when implementing createLogic if you do readStream(in) where in is an inlet parameterized by a type other than CallRecord, the compiler will complain.

Dealing with "bad" messages

Despite data integrity guarantee through schemas provided by Cloudflow, there is still cases of "bad" messages coming into inlet. This can be a result of different schema versions or the case when inlet is listening on the topic produced by non Cloudflow applications.

There are two distict flavors of "bad" data:

  • Syntactically bad data - the data that does not adhere to inlet’s schema and as a result can’t be unmarshalled properly

  • Semantically bad data is the data that is syntactically correct, but its content does not adhere to business requirements for streamlet processing.

Dealing with Semantically bad data has to be implemented directly by Streamlet’s logic, while support for dealing with semantically bad data is provided by Cloudflow.

There are several possible strategies for dealing with semantically bad data:

  • You can skip bad data records.

If the bad record is skipped, its offset is not committed. This is ok, if the occurance of of the "bad" records is rare. The next record’s offset will be committed. If all the records are "bad", then the offset will be never committed.
  • You can replace bad message with the predefined ones.

In addition there several possible approaches to reporting "bad" records, for example:

  • Logging

  • Writing to the dead-leter queue/topic

  • etc.

By default, cloudflow will log "bad" records and skip them. This behavior can be overwritten (on an Inlet level) by adding a custom error handler to inlet:

val in = AvroInlet[CallRecord]("in").withErrorHandler(CustomHandler)

Here CustomHandler is any function adhering to the following interface

errorHandler: (Array[Byte], Throwable) => Option[T]

External Inlets and outlets

As mentioned earlier, Cloudflow uses a schema-first approach for building streaming data pipelines. This works great for intra streamlet communications, but often falls short for integration with existing applications which might use proprietary encoding for messages. To accommodate these use cases Cloudflow introduces ExternalInlet and ExternalOutlet, which implement Kafka communications in ByteArray format, allowing user to explicitly marshal/unmarshal data as part of the application code.

When using ExternalInlet each data record is read by streamlet as an array of bytes. When using ExternalOutlet each data record should be converted to an array of bytes before writing it into an outlet.

ExternalInlet and ExternalOutlet are not intended for intra-streamlet communications. They should only be used for Kafka-based integration with existing applications supporting proprietary message formats.

Outlets and the partitioning function

Similar to inlets, the user can define an outlet as:

  val out = AvroOutlet[CallRecord]("out").withPartitioner(RoundRobinPartitioner)

Besides the name of the outlet, it may also have a partitioning function that defines how data will be partitioned when writing to Kafka topics. Data partitioning in Kafka ensures scalability. If no partitioning function is specified, it will default to the RoundRobinPartitioner.

All logic regarding data safety that we discussed for inlets in the last section applies for outlets as well.

Schema-aware StreamletLogic

When we implement StreamletLogic for a streamlet, we use the inlets and outlets which, as we discussed above, are schema aware. Note that in the following code fragment, all inlet and outlet types are parameterized with domain classes CallRecord and AggregatedCallStats that have been generated using the schema. Here’s an example:

  val in    = AvroInlet[CallRecord]("in")
  val out   = AvroOutlet[AggregatedCallStats]("out", _.startTime.toString)
  val shape = StreamletShape(in, out)

In the above example, we have one inlet that allows data of type CallRecord and one outlet that allows data of type AggregatedCallStats. Here the user had supplied the schema for both of the above types from which Scala classes have been generated by Cloudflow. And the entire StreamletLogic code is based on these two classes - we read CallRecord from the inlet, do processing and generate AggregatedCallStats to be sent to the outlet.

Hence the entire streamlet is guaranteed only to process data that conforms to the schema which the user had supplied.