Developing a Hello World Cloudflow application

In this section we will develop a simple Hello World application that demonstrates the major features of Cloudflow. It will not use many of the advanced features that Cloudflow offers - the main idea is to give you a feel for what it takes to build a complete application and get it running in a GKE cluster.

We will develop a simple pipeline that processes events from a wind turbine farm. The application receives streaming data that passes through the following stages:

  • ingested by a streamlet (ingress)

  • processed by a streamlet that converts it into a domain object (processor)

  • validated by a streamlet (splitter) that separates valid and invalid records

  • logged by logger streamlets so the output can be checked (egress)

Each of the above terms (ingress, processor, splitter and egress) is explained in [Basic Concepts](Basic\ Concepts.md). Here’s an overview of the application pipeline architecture:

![Application Pipeline](images/pipe.001.jpeg?raw=true "Application Pipeline")

One of the important features of the Cloudflow architecture is the complete separation of the components from how they are connected as part of the pipeline. The streamlets described above are the individual building blocks of the pipeline. You connect them using a declarative language that forms the blueprint of the pipeline. Streamlets can be shared across blueprints, which makes them individually reusable objects. And just like streamlets, blueprints form an independent component of a Cloudflow application.
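
To give a flavor of what this looks like, here is a small excerpt of the blueprint that we will write in full at the end of this section. Each streamlet is given a name, and an outlet of one streamlet is connected to an inlet of the next:

blueprint {
  streamlets {
    http-ingress = sensordata.SensorDataHttpIngress
    metrics = sensordata.SensorDataToMetrics
  }
  connections {
    http-ingress.out = [metrics.in]
  }
}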

Project structure

Here’s how we would structure a typical Cloudflow application as a project.

   |-project
   |-src
   |---main
   |-----avro
   |-----blueprint
   |-----resources
   |-----scala
   |-------sensordata
   |-build.sbt

This is a Scala project and we have the following structural components at the leaf level of the above tree:

  • avro : contains the Avro schemas of the domain objects

  • blueprint : contains the blueprint of the application in a file named blueprint.conf

  • scala : contains the source code of the application under the package name sensordata

  • build.sbt : the sbt build script

The sbt build script

Here’s a minimal version of the sbt build script:

build.sbt:

import sbt._
import sbt.Keys._

lazy val sensorData =  (project in file("."))
  .enablePlugins(CloudflowAkkaStreamsApplicationPlugin)
  .settings(
    libraryDependencies ++= Seq(
      "com.lightbend.akka"     %% "akka-stream-alpakka-file"  % "1.1.2",
      "com.typesafe.akka"      %% "akka-http-spray-json"      % "10.1.10",
      "ch.qos.logback"         %  "logback-classic"           % "1.2.3",
      "com.typesafe.akka"      %% "akka-http-testkit"         % "10.1.10" % "test"
    ),
    name := "sensor-data",
    organization := "com.lightbend",

    scalaVersion := "2.12.10",
    crossScalaVersions := Vector(scalaVersion.value)
  )

Cloudflow offers several sbt plugins that abstract away quite a bit of the boilerplate necessary to build a complete application. In this example we use the CloudflowAkkaStreamsApplicationPlugin, which provides all the building blocks for developing an Akka Streams based Cloudflow application.

Note: You can use multiple plugins to develop an application that uses multiple runtimes (Akka, Spark, Flink etc.). For simplicity, this example uses only one.

The above build script is a standard sbt build definition - the only difference is the plugin, which is provided as part of Cloudflow.
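
For the plugin to be available, it also has to be added to the build in project/plugins.sbt. The exact artifact coordinates and version depend on the Cloudflow release you are using, so treat the line below as a sketch and check the Cloudflow documentation for the values that match your setup:

// project/plugins.sbt
// The coordinates and version below are assumptions for illustration - verify them
// against the Cloudflow documentation for the release you are targeting.
addSbtPlugin("com.lightbend.cloudflow" % "sbt-cloudflow" % "<cloudflow-version>")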

Schema-first approach

In Cloudflow, streamlets work with optional inlets and outlets that are statically typed. The types represent the objects that the specific inlet / outlet can handle. The first step in application development is to encode these objects in the form of [Avro schemas](https://avro.apache.org/docs/current/). Cloudflow will generate appropriate classes corresponding to each schema.

Let’s start by building the Avro schemas for the domain objects that we need for the application. These schema files have the extension .avsc and go directly under src/main/avro in the project structure that we discussed earlier.

SensorData : The data that we receive from the source and ingest through our ingress (SensorData.avsc).

{
    "namespace": "sensordata",
    "type": "record",
    "name": "SensorData",
    "fields":[
         {
            "name": "deviceId",
            "type": {
                "type": "string",
                "logicalType": "uuid"
            }
         },
         {
            "name": "timestamp",
            "type": {
                "type": "long",
                "logicalType": "timestamp-millis"
            }
         },
         {
            "name": "measurements", "type": "sensordata.Measurements"
         }
    ]
}

Measurements : A substructure of SensorData (Measurements.avsc)

{
    "namespace": "sensordata",
    "type": "record",
    "name": "Measurements",
    "fields":[
         {
            "name": "power", "type": "double"
         },
         {
            "name": "rotorSpeed", "type": "double"
         },
         {
            "name": "windSpeed", "type": "double"
         }
    ]
}

Metric : A domain object that we would like to track and measure (Metric.avsc)

{
    "namespace": "sensordata",
    "type": "record",
    "name": "Metric",
    "fields":[
         {
            "name": "deviceId",
            "type": {
                "type": "string",
                "logicalType": "uuid"
            }
         },
         {
            "name": "timestamp",
            "type": {
                "type": "long",
                "logicalType": "timestamp-millis"
            }
         },
         {
            "name": "name", "type": "string"
         },
         {
            "name": "value", "type": "double"
         }
    ]
}

InvalidMetric : An object that represents an erroneous metric along with the validation error (InvalidMetric.avsc)

{
    "namespace": "sensordata",
    "type": "record",
    "name": "InvalidMetric",
    "fields":[
         {
            "name": "metric", "type": "sensordata.Metric"
         },
         {
            "name": "error", "type": "string"
         }
    ]
}

Note: The above schema files are processed during the build by the Cloudflow plugin infrastructure. For each of these schema files, Cloudflow will generate Scala case classes that can be used directly from within the application.
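
For example, the Metric schema above roughly corresponds to a generated case class like the one sketched below. This is only an approximation - the exact shape of the generated code (how the uuid and timestamp-millis logical types are mapped, companion objects, Avro-specific plumbing) depends on the code generator used by your Cloudflow version:

// Approximate shape of the class generated from Metric.avsc - a sketch, not the actual generated source.
case class Metric(
  deviceId: java.util.UUID,     // "string" with logicalType "uuid"
  timestamp: java.time.Instant, // "long" with logicalType "timestamp-millis"
  name: String,
  value: Double
)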

Let’s build some streamlets

All streamlets will reside under src/main/scala/sensordata where sensordata is the package name. Let’s start with the ingress, which we implement in a class named SensorDataHttpIngress.

SensorDataHttpIngress.scala

package sensordata

import akka.http.scaladsl.marshallers.sprayjson.SprayJsonSupport._

import cloudflow.akkastream._
import cloudflow.akkastream.util.scaladsl._

import cloudflow.streamlets._
import cloudflow.streamlets.avro._
import SensorDataJsonSupport._

class SensorDataHttpIngress extends AkkaServerStreamlet {
  val out = AvroOutlet[SensorData]("out").withPartitioner(RoundRobinPartitioner)
  def shape = StreamletShape.withOutlets(out)
  override def createLogic = HttpServerLogic.default(this, out)
}

The above ingress has an outlet through which ingested data is passed downstream through the pipeline. In any streamlet class, the StreamletLogic abstracts the behavior; here we use the default behavior that HttpServerLogic offers out of the box.
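
The ingress accepts JSON over HTTP and unmarshals it into SensorData using the SensorDataJsonSupport formats shown later in this section. Based on those formats, a request body would look roughly like the following (the values are made up for illustration):

{
  "deviceId": "c75cb448-df0e-4692-8e06-0321b7703992",
  "timestamp": 1495545346279,
  "measurements": {
    "power": 1.7,
    "rotorSpeed": 3.9,
    "windSpeed": 25.3
  }
}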

Ingested data is then passed to another streamlet, SensorDataToMetrics, which converts objects of type SensorData into the domain object Metric.

SensorDataToMetrics.scala

package sensordata

import cloudflow.akkastream._
import cloudflow.akkastream.scaladsl._
import cloudflow.streamlets.{ RoundRobinPartitioner, StreamletShape }
import cloudflow.streamlets.avro._

class SensorDataToMetrics extends AkkaStreamlet {
  val in = AvroInlet[SensorData]("in")
  val out = AvroOutlet[Metric]("out").withPartitioner(RoundRobinPartitioner)
  val shape = StreamletShape(in, out)
  def flow = {
    FlowWithOffsetContext[SensorData]
      .mapConcat { data ⇒
        List(
          Metric(data.deviceId, data.timestamp, "power", data.measurements.power),
          Metric(data.deviceId, data.timestamp, "rotorSpeed", data.measurements.rotorSpeed),
          Metric(data.deviceId, data.timestamp, "windSpeed", data.measurements.windSpeed)
        )
      }
  }
  override def createLogic = new RunnableGraphStreamletLogic() {
    def runnableGraph = sourceWithOffsetContext(in).via(flow).to(sinkWithOffsetContext(out))
  }
}

The above streamlet has an inlet and an outlet and processes the data it receives using its business logic. In this example we convert SensorData to Metric - note that the inlet and outlet are typed accordingly.

Now that we have the metrics that we would like to measure and track, we need to validate them according to business rules. We have a separate streamlet, MetricsValidation, for doing exactly this.

MetricsValidation.scala

package sensordata

import cloudflow.akkastream._
import cloudflow.akkastream.util.scaladsl._
import cloudflow.streamlets._
import cloudflow.streamlets.avro._

class MetricsValidation extends AkkaStreamlet {
  val in = AvroInlet[Metric]("in")
  val invalid = AvroOutlet[InvalidMetric]("invalid").withPartitioner(metric ⇒ metric.metric.deviceId.toString)
  val valid = AvroOutlet[Metric]("valid").withPartitioner(RoundRobinPartitioner)
  val shape = StreamletShape(in).withOutlets(invalid, valid)

  override def createLogic = new SplitterLogic(in, invalid, valid) {
    def flow = flowWithOffsetContext()
      .map { metric ⇒
        if (!SensorDataUtils.isValidMetric(metric)) Left(InvalidMetric(metric, "All measurements must be positive numbers!"))
        else Right(metric)
      }
  }
}

The above streamlet has an inlet and two outlets, one for valid and one for invalid metrics, all typed according to the data they are expected to handle. In the behavior defined by the createLogic method, SensorDataUtils.isValidMetric(..) handles the business validation. We implement that logic in the next class.

SensorDataUtils.scala

package sensordata

object SensorDataUtils {
  def isValidMetric(m: Metric) = m.value >= 0.0
}

In real life the validation logic will be more complex - this is for demonstration only.
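
For illustration only, a slightly richer (entirely hypothetical) validation could apply different rules per metric name. The bounds below are made up:

// A hypothetical, more elaborate validation - the per-metric rules and bounds are invented for illustration.
object SensorDataUtils {
  def isValidMetric(m: Metric): Boolean = m.name match {
    case "power"      ⇒ m.value >= 0.0
    case "rotorSpeed" ⇒ m.value >= 0.0
    case "windSpeed"  ⇒ m.value >= 0.0 && m.value <= 120.0 // assumed upper bound in m/s
    case _            ⇒ false                              // reject unknown metric names
  }
}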

Finally we are at the point where we can log the valid and invalid metrics separately in two streamlets - valid-logger and invalid-logger.

ValidMetricLogger.scala

package sensordata

import cloudflow.akkastream._
import cloudflow.akkastream.scaladsl._
import cloudflow.streamlets._
import cloudflow.streamlets.avro._

class ValidMetricLogger extends AkkaStreamlet {
  val inlet = AvroInlet[Metric]("in")
  val shape = StreamletShape.withInlets(inlet)

  override def createLogic = new RunnableGraphStreamletLogic() {

    def log(metric: Metric) = {
      system.log.info(metric.toString)
    }

    def flow = {
      FlowWithOffsetContext[Metric]
        .map { validMetric ⇒
          log(validMetric)
          validMetric
        }
    }

    def runnableGraph = {
      sourceWithOffsetContext(inlet).via(flow).to(sinkWithOffsetContext)
    }
  }
}

InvalidMetricLogger.scala

package sensordata

import cloudflow.akkastream._
import cloudflow.akkastream.scaladsl._
import cloudflow.streamlets._
import cloudflow.streamlets.avro._

class InvalidMetricLogger extends AkkaStreamlet {
  val inlet = AvroInlet[InvalidMetric]("in")
  val shape = StreamletShape.withInlets(inlet)

  override def createLogic = new RunnableGraphStreamletLogic() {
    val flow = FlowWithOffsetContext[InvalidMetric]
      .map { invalidMetric ⇒
        system.log.warning(s"Invalid metric detected! $invalidMetric")
        invalidMetric
      }

    def runnableGraph = {
      sourceWithOffsetContext(inlet).via(flow).to(sinkWithOffsetContext)
    }
  }
}

Finally, we have some support classes that we need in order to process JSON records through the Cloudflow pipeline.

JsonFormats.scala

package sensordata

import java.time.Instant
import java.util.UUID

import scala.util.Try

import spray.json._

trait UUIDJsonSupport extends DefaultJsonProtocol {
  implicit object UUIDFormat extends JsonFormat[UUID] {
    def write(uuid: UUID) = JsString(uuid.toString)

    def read(json: JsValue): UUID = json match {
      case JsString(uuid) ⇒ Try(UUID.fromString(uuid)).getOrElse(deserializationError(s"Expected valid UUID but got '$uuid'."))
      case other          ⇒ deserializationError(s"Expected UUID as JsString, but got: $other")
    }
  }
}

trait InstantJsonSupport extends DefaultJsonProtocol {
  implicit object InstantFormat extends JsonFormat[Instant] {
    def write(instant: Instant) = JsNumber(instant.toEpochMilli)

    def read(json: JsValue): Instant = json match {
      case JsNumber(value) ⇒ Instant.ofEpochMilli(value.toLong)
      case other           ⇒ deserializationError(s"Expected Instant as JsNumber, but got: $other")
    }
  }
}

object MeasurementsJsonSupport extends DefaultJsonProtocol {
  implicit val measurementFormat = jsonFormat3(Measurements.apply)
}

object SensorDataJsonSupport extends DefaultJsonProtocol with UUIDJsonSupport with InstantJsonSupport {
  import MeasurementsJsonSupport._
  implicit val sensorDataFormat = jsonFormat3(SensorData.apply)
}

package.scala

import java.time.Instant

package object sensordata {
  implicit def toInstant(millis: Long): Instant = Instant.ofEpochMilli(millis)
}

Let’s join streamlets to form the pipeline

As we mentioned above, we need a blueprint of the pipeline that declares which streamlets join together to form our pipeline. Here’s blueprint.conf in src/main/blueprint, which specifies the connections:

blueprint {
  streamlets {
    http-ingress = sensordata.SensorDataHttpIngress
    metrics = sensordata.SensorDataToMetrics
    validation = sensordata.MetricsValidation
    valid-logger = sensordata.ValidMetricLogger
    invalid-logger = sensordata.InvalidMetricLogger
  }

  connections {
    http-ingress.out = [metrics.in]
    metrics.out = [validation.in]
    validation.invalid = [invalid-logger.in]
    validation.valid = [valid-logger.in]
  }
}
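
At this point the application can be built with sbt. Assuming the standard Cloudflow sbt tasks are available in your version of the plugin (an assumption - check the documentation for your release), sbt verifyBlueprint checks that the blueprint connects the streamlets consistently, and sbt runLocal runs the complete pipeline locally before you deploy it to a cluster:

sbt verifyBlueprint
sbt runLocal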