Developing a Hello World Cloudflow application
In this section we will develop a simple Hello World application that demonstrates the major features of Cloudflow. It leaves out many of the more advanced features Cloudflow offers. The main idea is to give you a feel for what it takes to build a complete application and get it up and running in a GKE cluster.
We will develop a simple pipeline that processes events from a wind turbine farm. The application receives streaming data that passes through the following stages:
- ingested by a streamlet (ingress)
- processed by a streamlet that converts it into a domain object (processor)
- validated by a streamlet (splitter) that separates valid and invalid records
- logged by logger streamlets so that the output can be checked (egress)
Each of these terms (ingress, processor, splitter and egress) is explained in [Basic Concepts](Basic\ Concepts.md). Here’s an overview of the application pipeline architecture:
![Application Pipeline](images/pipe.001.jpeg?raw=true "Application Pipeline")
One of the important features of the Cloudflow architecture is the complete separation of the components from the way they are connected in the pipeline. The streamlets described above are the individual building blocks of the pipeline. You connect them using a declarative language that forms the blueprint of the pipeline. Streamlets can be shared across blueprints, which makes them individually reusable. Like streamlets, blueprints are an independent component of a Cloudflow application.
Project structure
Here’s how we would structure a typical Cloudflow application as a project.
```
|-project
|-src
|---main
|-----avro
|-----blueprint
|-----resources
|-----scala
|-------sensordata
|-build.sbt
```
This is a Scala project. Its main structural components are:
- `avro`: contains the Avro schemas of the domain objects
- `blueprint`: contains the blueprint of the application in a file named `blueprint.conf`
- `scala`: contains the source code of the application under the package name `sensordata`
- `build.sbt`: the sbt build script
The sbt build script
Here’s a minimal version of the sbt build script:
build.sbt:
```scala
import sbt._
import sbt.Keys._

lazy val sensorData = (project in file("."))
  // Cloudflow plugin for Akka Streams based applications
  .enablePlugins(CloudflowAkkaStreamsApplicationPlugin)
  .settings(
    libraryDependencies ++= Seq(
      "com.lightbend.akka" %% "akka-stream-alpakka-file" % "1.1.2",
      // spray-json support used by the HTTP ingress
      "com.typesafe.akka"  %% "akka-http-spray-json"     % "10.1.10",
      "ch.qos.logback"     %  "logback-classic"          % "1.2.3",
      "com.typesafe.akka"  %% "akka-http-testkit"        % "10.1.10" % "test"
    ),
    name := "sensor-data",
    organization := "com.lightbend",
    scalaVersion := "2.12.10",
    crossScalaVersions := Vector(scalaVersion.value)
  )
```
Cloudflow offers several sbt plugins that abstract away much of the boilerplate needed to build a complete application. In this example we use the plugin `CloudflowAkkaStreamsApplicationPlugin`, which provides all the building blocks for developing an Akka Streams based Cloudflow application.
Note: You can use multiple plugins to develop an application that uses multiple runtimes (Akka, Spark, Flink, etc.). To keep this example simple, we use only one.
Apart from the plugin, which is provided as part of Cloudflow, the build script above is a standard Scala sbt build.
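The plugin also has to be made available to sbt, which conventionally loads plugins from `project/plugins.sbt`. The following is a minimal sketch. It assumes the Cloudflow sbt plugin is published as `com.lightbend.cloudflow` % `sbt-cloudflow`. The version number is an assumption, so use the one that matches your Cloudflow release.

```scala
// project/plugins.sbt
// Assumption: coordinates of the Cloudflow sbt plugin; the version is illustrative only.
addSbtPlugin("com.lightbend.cloudflow" % "sbt-cloudflow" % "1.3.0")
```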
Schema first approach
In Cloudflow, streamlets have zero or more inputs and outputs (inlets and outlets), and each of them is statically typed. The types represent the objects that a specific inlet or outlet can handle. The first step in application development is to define these objects as [Avro schemas](https://avro.apache.org/docs/current/). Cloudflow then generates the appropriate classes for each schema.
Let’s start by writing the Avro schemas for the domain objects that the application needs. These schema files have the extension `.avsc` and go directly under `src/main/avro` in the project structure described earlier.
SensorData: the data that we receive from the source and ingest through our ingress (`SensorData.avsc`).
```json
{
  "namespace": "sensordata",
  "type": "record",
  "name": "SensorData",
  "fields": [
    {
      "name": "deviceId",
      "type": {
        "type": "string",
        "logicalType": "uuid"
      }
    },
    {
      "name": "timestamp",
      "type": {
        "type": "long",
        "logicalType": "timestamp-millis"
      }
    },
    {
      "name": "measurements", "type": "sensordata.Measurements"
    }
  ]
}
```
Measurements: a substructure of `SensorData` (`Measurements.avsc`).
```json
{
  "namespace": "sensordata",
  "type": "record",
  "name": "Measurements",
  "fields": [
    {
      "name": "power", "type": "double"
    },
    {
      "name": "rotorSpeed", "type": "double"
    },
    {
      "name": "windSpeed", "type": "double"
    }
  ]
}
```
Metric: a domain object that we would like to track and measure (`Metric.avsc`).
```json
{
  "namespace": "sensordata",
  "type": "record",
  "name": "Metric",
  "fields": [
    {
      "name": "deviceId",
      "type": {
        "type": "string",
        "logicalType": "uuid"
      }
    },
    {
      "name": "timestamp",
      "type": {
        "type": "long",
        "logicalType": "timestamp-millis"
      }
    },
    {
      "name": "name", "type": "string"
    },
    {
      "name": "value", "type": "double"
    }
  ]
}
```
InvalidMetric: an object that wraps an erroneous metric together with an error description (`InvalidMetric.avsc`).
```json
{
  "namespace": "sensordata",
  "type": "record",
  "name": "InvalidMetric",
  "fields": [
    {
      "name": "metric", "type": "sensordata.Metric"
    },
    {
      "name": "error", "type": "string"
    }
  ]
}
```
Note: The Cloudflow plugin processes the schema files above during the build. For each schema file, Cloudflow generates Scala case classes that can be used directly from within the application.
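To show what the generated classes look like from the application's point of view, here is a hand-written approximation of the four classes as plain Scala case classes. This is a sketch, not the generator's actual output, because the real classes also carry Avro plumbing. The sketch assumes that the `uuid` and `timestamp-millis` logical types map to `java.util.UUID` and `java.time.Instant`. That assumption is consistent with the JSON formats defined later in `JsonFormats.scala`.

```scala
import java.time.Instant
import java.util.UUID

// Hand-written approximation of the classes generated from the .avsc files.
// Assumption: uuid -> java.util.UUID and timestamp-millis -> java.time.Instant.
case class Measurements(power: Double, rotorSpeed: Double, windSpeed: Double)

// A field whose type is another record becomes a nested case class field.
case class SensorData(deviceId: UUID, timestamp: Instant, measurements: Measurements)

case class Metric(deviceId: UUID, timestamp: Instant, name: String, value: Double)

case class InvalidMetric(metric: Metric, error: String)

// Illustrative values only.
val sample = SensorData(
  UUID.fromString("c75cb448-df0e-4692-8e06-0321b7703992"),
  Instant.ofEpochMilli(1495545346279L),
  Measurements(power = 1.7, rotorSpeed = 3.9, windSpeed = 105.9)
)
```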
Let’s build some streamlets
All streamlets reside under `src/main/scala/sensordata`, where `sensordata` is the package name. Let’s start with the ingress, which we implement in a class named `SensorDataHttpIngress`.
SensorDataHttpIngress.scala
```scala
package sensordata

import akka.http.scaladsl.marshallers.sprayjson.SprayJsonSupport._
import cloudflow.akkastream._
import cloudflow.akkastream.util.scaladsl._
import cloudflow.streamlets._
import cloudflow.streamlets.avro._
import SensorDataJsonSupport._

class SensorDataHttpIngress extends AkkaServerStreamlet {
  // The only outlet; records are distributed across partitions round-robin.
  val out = AvroOutlet[SensorData]("out").withPartitioner(RoundRobinPartitioner)
  def shape = StreamletShape.withOutlets(out)
  // Default HTTP server behavior that writes the received SensorData to `out`.
  override def createLogic = HttpServerLogic.default(this, out)
}
```
The ingress above has a single outlet that passes ingested data downstream through the pipeline. In every streamlet, the `StreamletLogic` returned by `createLogic` defines the behavior. Here we use the default behavior that `HttpServerLogic` offers out of the box. It unmarshals incoming HTTP requests into `SensorData` using the imported JSON support.
The ingested data is then passed to another streamlet, `metrics`, which converts objects of type `SensorData` into domain objects of type `Metric`.
SensorDataToMetrics.scala
```scala
package sensordata

import cloudflow.akkastream._
import cloudflow.akkastream.scaladsl._
import cloudflow.streamlets.{ RoundRobinPartitioner, StreamletShape }
import cloudflow.streamlets.avro._

class SensorDataToMetrics extends AkkaStreamlet {
  val in = AvroInlet[SensorData]("in")
  val out = AvroOutlet[Metric]("out").withPartitioner(RoundRobinPartitioner)
  val shape = StreamletShape(in, out)

  // Fan out each SensorData record into one Metric per measurement.
  def flow = {
    FlowWithOffsetContext[SensorData]
      .mapConcat { data ⇒
        List(
          Metric(data.deviceId, data.timestamp, "power", data.measurements.power),
          Metric(data.deviceId, data.timestamp, "rotorSpeed", data.measurements.rotorSpeed),
          Metric(data.deviceId, data.timestamp, "windSpeed", data.measurements.windSpeed)
        )
      }
  }

  // Read from the inlet, transform, and write to the outlet, carrying the offset context along.
  override def createLogic = new RunnableGraphStreamletLogic() {
    def runnableGraph = sourceWithOffsetContext(in).via(flow).to(sinkWithOffsetContext(out))
  }
}
```
This streamlet has an inlet and an outlet and applies the business logic to the data it receives. Here each `SensorData` record is converted into three `Metric` records, one per measurement. The inlet and outlet are typed accordingly.
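Outside of Akka Streams, the core of this `mapConcat` step is a plain function from one record to a list of records. The sketch below uses simplified stand-in case classes with `String`/`Long` fields, so they are not the generated classes. `MetricsConversion` is a hypothetical name used only for illustration. Because the function does not depend on a running pipeline, it can be tested on its own.

```scala
// Simplified stand-ins for the generated classes (assumption: String/Long fields for brevity).
case class Measurements(power: Double, rotorSpeed: Double, windSpeed: Double)
case class SensorData(deviceId: String, timestamp: Long, measurements: Measurements)
case class Metric(deviceId: String, timestamp: Long, name: String, value: Double)

object MetricsConversion {
  // One SensorData record fans out into one Metric per measurement.
  def toMetrics(data: SensorData): List[Metric] = {
    val m = data.measurements
    List(
      "power"      -> m.power,
      "rotorSpeed" -> m.rotorSpeed,
      "windSpeed"  -> m.windSpeed
    ).map { case (name, value) => Metric(data.deviceId, data.timestamp, name, value) }
  }
}

val metrics = MetricsConversion.toMetrics(
  SensorData("device-1", 1495545346279L, Measurements(1.7, 3.9, 105.9))
)
```

With the conversion extracted like this, the streamlet's `flow` could simply call `.mapConcat(MetricsConversion.toMetrics)`, and the business logic could be unit-tested separately from the streamlet.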
Now that we have the metrics we would like to measure and track, we need to validate them against our business rules. A separate streamlet, `validation`, does exactly this.
MetricsValidation.scala
```scala
package sensordata

import cloudflow.akkastream._
import cloudflow.akkastream.util.scaladsl._
import cloudflow.streamlets._
import cloudflow.streamlets.avro._

class MetricsValidation extends AkkaStreamlet {
  val in = AvroInlet[Metric]("in")
  // Invalid metrics are keyed by device id; valid metrics are distributed round-robin.
  val invalid = AvroOutlet[InvalidMetric]("invalid").withPartitioner(metric ⇒ metric.metric.deviceId.toString)
  val valid = AvroOutlet[Metric]("valid").withPartitioner(RoundRobinPartitioner)
  val shape = StreamletShape(in).withOutlets(invalid, valid)

  // SplitterLogic sends Left values to `invalid` and Right values to `valid`.
  override def createLogic = new SplitterLogic(in, invalid, valid) {
    def flow = flowWithOffsetContext()
      .map { metric ⇒
        if (!SensorDataUtils.isValidMetric(metric)) Left(InvalidMetric(metric, "All measurements must be non-negative numbers!"))
        else Right(metric)
      }
  }
}
```
This streamlet has one inlet and two outlets, for invalid and valid metrics respectively, and each is typed according to the data it is expected to handle. `SplitterLogic` sends each `Left` to the first outlet (`invalid`) and each `Right` to the second (`valid`). Inside the logic returned by `createLogic`, `SensorDataUtils.isValidMetric(..)` performs the business validation, which we implement in the next class.
SensorDataUtils.scala
```scala
package sensordata

object SensorDataUtils {
  // Demo rule: a metric is valid when its value is non-negative.
  def isValidMetric(m: Metric): Boolean = m.value >= 0.0
}
```
A real application would have more complex business logic. This rule is for demonstration only.
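The routing that `SplitterLogic` performs can be illustrated with plain Scala collections. Every metric is mapped to an `Either`: lefts go to the `invalid` outlet and rights go to the `valid` outlet. The sketch below is stream-free and uses simplified stand-in case classes. `ValidationSketch`, `validate` and `split` are hypothetical helpers for illustration only, and the error text is illustrative.

```scala
// Simplified stand-ins for the generated classes.
case class Metric(deviceId: String, timestamp: Long, name: String, value: Double)
case class InvalidMetric(metric: Metric, error: String)

object ValidationSketch {
  // Same rule as SensorDataUtils.isValidMetric.
  def isValidMetric(m: Metric): Boolean = m.value >= 0.0

  // Left = invalid, Right = valid, matching the order of the splitter's outlets.
  def validate(m: Metric): Either[InvalidMetric, Metric] =
    if (!isValidMetric(m)) Left(InvalidMetric(m, "measurement must not be negative"))
    else Right(m)

  // Hypothetical helper: separates results the way the two outlets do.
  def split(metrics: Seq[Metric]): (Seq[InvalidMetric], Seq[Metric]) = {
    val results = metrics.map(validate)
    (results.collect { case Left(bad) => bad }, results.collect { case Right(good) => good })
  }
}

val (invalid, valid) = ValidationSketch.split(Seq(
  Metric("device-1", 0L, "power", 1.7),
  Metric("device-1", 0L, "windSpeed", -3.0),
  Metric("device-2", 0L, "rotorSpeed", 0.0)
))
```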
Finally, we can log the valid and invalid metrics separately in two streamlets, `valid-logger` and `invalid-logger`.
ValidMetricLogger.scala
```scala
package sensordata

import cloudflow.akkastream._
import cloudflow.akkastream.scaladsl._
import cloudflow.streamlets._
import cloudflow.streamlets.avro._

class ValidMetricLogger extends AkkaStreamlet {
  val inlet = AvroInlet[Metric]("in")
  val shape = StreamletShape.withInlets(inlet)

  override def createLogic = new RunnableGraphStreamletLogic() {
    def log(metric: Metric) = {
      system.log.info(metric.toString)
    }

    // Log each valid metric and pass it on unchanged.
    def flow = {
      FlowWithOffsetContext[Metric]
        .map { validMetric ⇒
          log(validMetric)
          validMetric
        }
    }

    // No outlet: the stream ends in a sink that commits the consumed offsets.
    def runnableGraph = {
      sourceWithOffsetContext(inlet).via(flow).to(sinkWithOffsetContext)
    }
  }
}
```
InvalidMetricLogger.scala
```scala
package sensordata

import cloudflow.akkastream._
import cloudflow.akkastream.scaladsl._
import cloudflow.streamlets._
import cloudflow.streamlets.avro._

class InvalidMetricLogger extends AkkaStreamlet {
  val inlet = AvroInlet[InvalidMetric]("in")
  val shape = StreamletShape.withInlets(inlet)

  override def createLogic = new RunnableGraphStreamletLogic() {
    // Log each invalid metric as a warning and pass it on unchanged.
    val flow = FlowWithOffsetContext[InvalidMetric]
      .map { invalidMetric ⇒
        system.log.warning(s"Invalid metric detected! $invalidMetric")
        invalidMetric
      }

    // No outlet: the stream ends in a sink that commits the consumed offsets.
    def runnableGraph = {
      sourceWithOffsetContext(inlet).via(flow).to(sinkWithOffsetContext)
    }
  }
}
```
Finally, we need some support classes to process JSON records through the Cloudflow pipeline.
JsonFormats.scala
```scala
package sensordata

import java.time.Instant
import java.util.UUID

import scala.util.Try

import spray.json._

// Reads and writes UUIDs as JSON strings.
trait UUIDJsonSupport extends DefaultJsonProtocol {
  implicit object UUIDFormat extends JsonFormat[UUID] {
    def write(uuid: UUID) = JsString(uuid.toString)

    def read(json: JsValue): UUID = json match {
      case JsString(uuid) ⇒ Try(UUID.fromString(uuid)).getOrElse(deserializationError(s"Expected valid UUID but got '$uuid'."))
      case other          ⇒ deserializationError(s"Expected UUID as JsString, but got: $other")
    }
  }
}

// Reads and writes Instants as JSON numbers holding epoch milliseconds.
trait InstantJsonSupport extends DefaultJsonProtocol {
  implicit object InstantFormat extends JsonFormat[Instant] {
    def write(instant: Instant) = JsNumber(instant.toEpochMilli)

    def read(json: JsValue): Instant = json match {
      case JsNumber(value) ⇒ Instant.ofEpochMilli(value.toLong)
      case other           ⇒ deserializationError(s"Expected Instant as JsNumber, but got: $other")
    }
  }
}

object MeasurementsJsonSupport extends DefaultJsonProtocol {
  implicit val measurementFormat: RootJsonFormat[Measurements] = jsonFormat3(Measurements.apply)
}

// Format for the ingress payload, combining the UUID, Instant and Measurements formats.
object SensorDataJsonSupport extends DefaultJsonProtocol with UUIDJsonSupport with InstantJsonSupport {
  import MeasurementsJsonSupport._
  implicit val sensorDataFormat: RootJsonFormat[SensorData] = jsonFormat3(SensorData.apply)
}
```
package.scala
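```scala
import java.time.Instant
import scala.language.implicitConversions
package object sensordata {
  // Lets code in the sensordata package pass epoch milliseconds (Long) where an Instant is expected.
  implicit def toInstant(millis: Long): Instant = Instant.ofEpochMilli(millis)
}
```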
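With the formats in `JsonFormats.scala`, a `SensorData` record travels over HTTP as a JSON object with three fields:

- `deviceId` is a UUID string.
- `timestamp` is a number of epoch milliseconds.
- `measurements` is a nested object with `power`, `rotorSpeed` and `windSpeed`.

The stdlib-only sketch below mirrors the two custom conversions and the package object's implicit `Long` to `Instant` conversion without depending on spray-json. `WireFormatSketch`, `decodeUuid`, `decodeInstant` and `encodeInstant` are hypothetical names used only for illustration.

```scala
import java.time.Instant
import java.util.UUID
import scala.language.implicitConversions
import scala.util.Try

object WireFormatSketch {
  // Mirrors UUIDFormat.read: the JSON string must parse as a UUID.
  def decodeUuid(s: String): Either[String, UUID] =
    Try(UUID.fromString(s)).toOption.toRight(s"Expected valid UUID but got '$s'.")

  // Mirrors InstantFormat: epoch milliseconds (a JSON number) <-> Instant.
  def decodeInstant(millis: BigDecimal): Instant = Instant.ofEpochMilli(millis.toLong)
  def encodeInstant(instant: Instant): BigDecimal = BigDecimal(instant.toEpochMilli)

  // Same idea as the package object's implicit: a Long is accepted where an Instant is expected.
  implicit def toInstant(millis: Long): Instant = Instant.ofEpochMilli(millis)
}

import WireFormatSketch._

// The Long literal is converted implicitly by toInstant.
val ts: Instant = 1495545346279L
```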
Let’s join Streamlets to form the Pipeline
As mentioned above, we need a blueprint that declares which streamlets are joined together to form the pipeline. Here’s `blueprint.conf` in `src/main/blueprint`, which specifies the connections:
```hocon
blueprint {
  streamlets {
    http-ingress = sensordata.SensorDataHttpIngress
    metrics = sensordata.SensorDataToMetrics
    validation = sensordata.MetricsValidation
    valid-logger = sensordata.ValidMetricLogger
    invalid-logger = sensordata.InvalidMetricLogger
  }
  connections {
    http-ingress.out = [metrics.in]
    metrics.out = [validation.in]
    validation.invalid = [invalid-logger.in]
    validation.valid = [valid-logger.in]
  }
}
```
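The blueprint is just data: a set of named streamlets plus a mapping from each outlet to the inlets it feeds. To show why this separation makes wiring mistakes easy to detect, here is a plain-Scala sketch that models the blueprint above as maps. It reports connection endpoints that refer to undeclared streamlets. The `Blueprint` type and the `undeclaredRefs` helper are hypothetical and are not Cloudflow's API. Cloudflow verifies blueprints with its own tooling, which this sketch does not reproduce.

```scala
// Hypothetical model of blueprint.conf as plain data (not Cloudflow's API).
case class Blueprint(streamlets: Map[String, String], connections: Map[String, List[String]])

object BlueprintCheck {
  // "validation.invalid" -> "validation": the streamlet name precedes the last dot.
  private def streamletOf(port: String): String = {
    val dot = port.lastIndexOf('.')
    if (dot < 0) port else port.substring(0, dot)
  }

  // Every connection endpoint whose streamlet is not declared in `streamlets`.
  def undeclaredRefs(bp: Blueprint): List[String] = {
    val endpoints = bp.connections.toList.flatMap { case (out, ins) => out :: ins }
    endpoints.filterNot(p => bp.streamlets.contains(streamletOf(p))).distinct
  }
}

val sensorBlueprint = Blueprint(
  streamlets = Map(
    "http-ingress"   -> "sensordata.SensorDataHttpIngress",
    "metrics"        -> "sensordata.SensorDataToMetrics",
    "validation"     -> "sensordata.MetricsValidation",
    "valid-logger"   -> "sensordata.ValidMetricLogger",
    "invalid-logger" -> "sensordata.InvalidMetricLogger"
  ),
  connections = Map(
    "http-ingress.out"   -> List("metrics.in"),
    "metrics.out"        -> List("validation.in"),
    "validation.invalid" -> List("invalid-logger.in"),
    "validation.valid"   -> List("valid-logger.in")
  )
)
```

Because streamlets are referenced here only by name, the same streamlet classes could be wired into a different blueprint without changing their code.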