Schema-first approach
Cloudflow uses a schema-first approach for building streaming data pipelines. You supply an Avro schema as the starting point of the domain model for which a streaming data flow needs to be built, and Cloudflow adds the appropriate plug-ins to the application's build system to generate Java or Scala classes corresponding to that schema.
This approach has the advantage that you only need to take care of the core domain model, while Cloudflow does the heavy lifting of generating the classes and integrating them with the main application.
However, since Cloudflow takes the schema as its input, it needs to ensure that the corresponding data inlets and outlets honor the schema when allowing data to flow through them. This guarantees data consistency across all inlets and outlets. We discuss this in the next section.
Schema code generation
You can choose which programming language to generate your schema classes in by defining settings in the SBT project. When no settings are defined, the sbt-cloudflow plugin by default looks for Avro schemas in src/main/avro and generates Scala classes. If you wish to override the location of your Avro schemas, or to generate Java classes instead, you can do so by defining a Setting in your SBT project.
- schemaCodeGenerator (default: SchemaCodeGenerator.Scala) - the programming language in which to generate Avro schema classes.
- schemaPaths (default: Map(SchemaFormat.Avro -> "src/main/avro")) - the relative path to the Avro schemas.
- schemaFormat (default: Seq(SchemaFormat.Avro)) - the schema formats to generate. Avro is the default format. Since Cloudflow 2.0, there is experimental support for Protobuf (SchemaFormat.Proto).
For example, to generate Java classes from Avro schemas in a non-default location, you can use a configuration similar to the following:
lazy val datamodel = (project in file("./my-cloudflow-library"))
  .enablePlugins(CloudflowLibraryPlugin)
  .settings(
    schemaCodeGenerator := SchemaCodeGenerator.Java,
    schemaPaths := Map(
      SchemaFormat.Avro -> "src/main/resources/avroschemas",
      SchemaFormat.Proto -> "src/main/resources/protobuf"
    )
  )
Schema-aware inlets and outlets
In Cloudflow, all streamlet inlets and outlets are schema-aware. This means two things:
- Every inlet and outlet allows only data that honors its schema to flow through it.
- An outlet can be connected to an inlet only if their schemas are compatible.
Let’s take a look at how Cloudflow ensures both of the above guarantees.
Data integrity guarantee through schemas and types
As mentioned above, any application definition starts with the schema definition for the domain object. Let’s assume you provide the following Avro schema as an input. It’s a definition of a call record as used by a telephone company.
{
  "namespace": "cloudflow.callrecordaggregator",
  "type": "record",
  "name": "CallRecord",
  "fields": [
    {
      "name": "user",
      "type": "string"
    },
    {
      "name": "other",
      "type": "string"
    },
    {
      "name": "direction",
      "type": "string"
    },
    {
      "name": "duration",
      "type": "long"
    },
    {
      "name": "timestamp",
      "type": "long"
    }
  ]
}
For a Scala application, Cloudflow generates a Scala case class cloudflow.callrecordaggregator.CallRecord corresponding to the above schema. This class is then available for use within the application.
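The generated class is, roughly, a plain case class with one field per schema field. The following is only a sketch of its shape; the actual generated code also carries Avro serialization support and a companion object holding the schema:

package cloudflow.callrecordaggregator

// Approximate shape of the class generated from the CallRecord schema (sketch only).
case class CallRecord(
  user: String,
  other: String,
  direction: String,
  duration: Long,
  timestamp: Long
)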
When we define a streamlet where objects of type CallRecord will be passing through its inlet, we define the inlet as follows:
val in = AvroInlet[CallRecord]("call-record-in")
Cloudflow ensures the following compile-time guarantees through this type definition:
- The inlet only allows a codec of type Avro.
- Cloudflow only allows the inlet to be used with an object of type CallRecord. For example, when implementing createLogic, if you do readStream(in) where in is an inlet parameterized by a type other than CallRecord, the compiler will complain.
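As an illustration, here is a minimal sketch of a streamlet using the Akka Streamlets API (one of the supported runtimes): the logic can only ever see CallRecord elements, because that is the inlet's type parameter. The streamlet name and the logging are illustrative, not part of the Cloudflow API:

import akka.stream.scaladsl.Sink

import cloudflow.akkastream._
import cloudflow.akkastream.scaladsl._
import cloudflow.streamlets._
import cloudflow.streamlets.avro._

// Sketch: the inlet's type parameter fixes the element type seen by the logic.
class CallRecordPrinter extends AkkaStreamlet {
  val in    = AvroInlet[CallRecord]("call-record-in")
  val shape = StreamletShape.withInlets(in)

  def createLogic = new RunnableGraphStreamletLogic() {
    def runnableGraph =
      plainSource(in)                  // a source of CallRecord, not of arbitrary records
        .to(Sink.foreach { record =>   // record is a CallRecord, so its fields are type checked
          system.log.info(s"${record.user} -> ${record.other}: ${record.duration}s")
        })
  }
}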
Dealing with "bad" messages
Despite the data integrity guarantees that Cloudflow provides through schemas, "bad" messages can still arrive at an inlet. This can be the result of differing schema versions, or of an inlet listening on a topic produced by non-Cloudflow applications.
There are two distinct flavors of "bad" data:
- Syntactically bad data - data that does not adhere to the inlet's schema and as a result cannot be unmarshalled properly.
- Semantically bad data - data that is syntactically correct, but whose content does not adhere to the business requirements of the streamlet's processing.
Dealing with semantically bad data has to be implemented directly in the streamlet's logic, while support for dealing with syntactically bad data is provided by Cloudflow.
There are several possible strategies for dealing with syntactically bad data:
- You can skip bad records. If a bad record is skipped, its offset is not committed; the offset of the next good record will be committed instead. This is acceptable if the occurrence of "bad" records is rare. If all records are "bad", no offset will ever be committed.
- You can replace bad messages with predefined ones.
In addition, there are several possible approaches to reporting "bad" records, for example:
- Logging the record
- Writing it to a dead-letter queue or topic
By default, Cloudflow logs "bad" records and skips them. This behavior can be overridden (per inlet) by adding a custom error handler to the inlet:
val in = AvroInlet[CallRecord]("in").withErrorHandler(CustomHandler)
Here CustomHandler is any function adhering to the following interface:
errorHandler: (Array[Byte], Throwable) => Option[T]
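For illustration, a handler that logs the failure and skips the record might look like the following sketch. Returning None drops the record, while returning Some(value) substitutes a replacement; the handler and logger names here are just examples:

import org.slf4j.LoggerFactory

object CallRecordErrorHandler {
  private val log = LoggerFactory.getLogger("call-record-errors")

  // Matches the errorHandler signature (Array[Byte], Throwable) => Option[CallRecord].
  // Returning None skips the record; returning Some(record) injects a replacement value.
  def handle(bytes: Array[Byte], cause: Throwable): Option[CallRecord] = {
    log.warn(s"Skipping ${bytes.length} bytes that could not be decoded", cause)
    None
  }
}

// Hooking the handler up to the inlet:
// val in = AvroInlet[CallRecord]("in").withErrorHandler(CallRecordErrorHandler.handle)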
External inlets and outlets
As mentioned earlier, Cloudflow uses a schema-first approach for building streaming data pipelines. This works great for communication between the streamlets of a Cloudflow application, but often falls short for integration with existing applications, which might use proprietary encodings for their messages. To accommodate these use cases, Cloudflow introduces ExternalInlet and ExternalOutlet, which implement Kafka communication in byte-array format, allowing the user to explicitly marshal and unmarshal data as part of the application code.
When using ExternalInlet, each data record is read by the streamlet as an array of bytes. When using ExternalOutlet, each data record should be converted to an array of bytes before writing it to the outlet.
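For illustration, conversion functions for a hypothetical proprietary format (one comma-separated line of text per call record) might look like the following sketch. The object, the helper names, and the encoding are assumptions, not part of the Cloudflow API; they would be applied inside the streamlet logic when reading from an ExternalInlet or writing to an ExternalOutlet:

import java.nio.charset.StandardCharsets
import scala.util.Try

object CallRecordBytes {
  // Decode a byte array read from an ExternalInlet; assumes a hypothetical
  // "user,other,direction,duration,timestamp" text encoding.
  def decode(bytes: Array[Byte]): Option[CallRecord] = Try {
    val Array(user, other, direction, duration, timestamp) =
      new String(bytes, StandardCharsets.UTF_8).split(",")
    CallRecord(user, other, direction, duration.toLong, timestamp.toLong)
  }.toOption

  // Encode a CallRecord into bytes before writing it to an ExternalOutlet.
  def encode(r: CallRecord): Array[Byte] =
    s"${r.user},${r.other},${r.direction},${r.duration},${r.timestamp}"
      .getBytes(StandardCharsets.UTF_8)
}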
Note that ExternalInlet and ExternalOutlet are not intended for communication between the streamlets of a Cloudflow application. They should only be used for Kafka-based integration with existing applications that use proprietary message formats.
Outlets and the partitioning function
Similar to inlets, the user can define an outlet as:
val out = AvroOutlet[CallRecord]("out").withPartitioner(RoundRobinPartitioner)
Besides the name of the outlet, it may also have a partitioning function that defines how data will be partitioned when writing to Kafka topics.
Data partitioning in Kafka ensures scalability.
If no partitioning function is specified, it defaults to the RoundRobinPartitioner.
All logic regarding data safety that we discussed for inlets in the last section applies for outlets as well.
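As an example of a custom partitioning function: it is simply a function from the record to a String key, so a partitioner that keys records by the calling user (putting all of a user's records on the same Kafka partition) can be sketched as follows:

// The partitioning function has type CallRecord => String; here the key is the user field,
// so all call records for a given user end up in the same Kafka partition.
val byUser: CallRecord => String = record => record.user

val out = AvroOutlet[CallRecord]("out").withPartitioner(byUser)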
Schema-aware StreamletLogic
When we implement the StreamletLogic for a streamlet, we use the inlets and outlets which, as we discussed above, are schema-aware. Note that in the following code fragment all inlet and outlet types are parameterized with the domain classes CallRecord and AggregatedCallStats that have been generated from the schemas.
Here's an example:
val in = AvroInlet[CallRecord]("in")
val out = AvroOutlet[AggregatedCallStats]("out", _.startTime.toString)
val shape = StreamletShape(in, out)
In the above example, we have one inlet that allows data of type CallRecord and one outlet that allows data of type AggregatedCallStats. Here the user supplied the schemas for both types, and Cloudflow generated the corresponding Scala classes. The entire StreamletLogic code is based on these two classes: we read CallRecord from the inlet, process it, and produce AggregatedCallStats to be sent to the outlet. Hence the entire streamlet is guaranteed to process only data that conforms to the schemas the user supplied.
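To make this concrete, here is a hedged sketch of such a streamlet using the Akka Streamlets API as the runtime. It assumes AggregatedCallStats has startTime and totalCallDuration fields; the actual fields come from the user-supplied schema, and the windowing used here is purely illustrative:

import scala.concurrent.duration._

import cloudflow.akkastream._
import cloudflow.akkastream.scaladsl._
import cloudflow.streamlets._
import cloudflow.streamlets.avro._

// Sketch: aggregate CallRecords into AggregatedCallStats over a simple time/size window.
class CallStatsAggregator extends AkkaStreamlet {
  val in    = AvroInlet[CallRecord]("in")
  val out   = AvroOutlet[AggregatedCallStats]("out", _.startTime.toString)
  val shape = StreamletShape(in, out)

  def createLogic = new RunnableGraphStreamletLogic() {
    def runnableGraph =
      plainSource(in)                       // elements are CallRecord, enforced by the inlet type
        .groupedWithin(1000, 60.seconds)    // collect up to 1000 records or 60 seconds of data
        .map { records =>
          AggregatedCallStats(              // hypothetical fields, taken from the user's schema
            startTime = records.map(_.timestamp).min,
            totalCallDuration = records.map(_.duration).sum
          )
        }
        .to(plainSink(out))                 // only AggregatedCallStats is accepted here
  }
}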