Develop example streamlets

All streamlets belong under the src/main/scala/sensordata directory, where sensordata is the package name. To develop the streamlets, copy the provided code into the indicated source files. The complete code is available in the sensor-data-scala example.
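
For reference, the streamlets below use the domain classes generated from the Avro schemas defined in the previous step. Roughly, the generated classes look like this (a sketch for orientation only; the build generates the real definitions, so do not add this code yourself):

case class Measurements(power: Double, rotorSpeed: Double, windSpeed: Double)
case class SensorData(deviceId: java.util.UUID, timestamp: java.time.Instant, measurements: Measurements)
case class Metric(deviceId: java.util.UUID, timestamp: java.time.Instant, name: String, value: Double)
case class InvalidMetric(metric: Metric, error: String)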

Ingress streamlet

Let’s start with the ingress, which we implement in a class named SensorDataHttpIngress. The ingress has an outlet through which ingested data flows downstream through the pipeline. In every streamlet class, a StreamletLogic defines the streamlet’s behavior. In this example we use the default behavior that HttpServerLogic offers out of the box.

In SensorDataHttpIngress.scala, include the following code:

package sensordata

import akka.http.scaladsl.marshallers.sprayjson.SprayJsonSupport._
import cloudflow.akkastream._
import cloudflow.akkastream.util.scaladsl._

import cloudflow.streamlets._
import cloudflow.streamlets.avro._
import SensorDataJsonSupport._

class SensorDataHttpIngress extends AkkaServerStreamlet {
  val out: CodecOutlet[SensorData]               = AvroOutlet[SensorData]("out").withPartitioner(RoundRobinPartitioner)
  override val shape: StreamletShape             = StreamletShape.withOutlets(out)
  override def createLogic(): AkkaStreamletLogic = HttpServerLogic.default(this, out)
}
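
HttpServerLogic.default accepts HTTP requests and uses the implicit spray-json format imported above to unmarshal each request body into a SensorData record before writing it to the outlet. You can sanity-check that unmarshalling in isolation; a minimal sketch, assuming the generated SensorData classes are on the classpath and using made-up sample values:

import spray.json._
import sensordata._
import sensordata.SensorDataJsonSupport._

// A record as the ingress would receive it (sample values):
val json = """{"deviceId":"c75cb448-df0e-4692-8e06-0321b7703992","timestamp":1495545346279,"measurements":{"power":1.7,"rotorSpeed":3.9,"windSpeed":105.9}}"""
// UUID and epoch-millis timestamp are handled by the formats we define later in JsonFormats.scala:
val data = json.parseJson.convertTo[SensorData]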

Convert sensor to metric streamlet

We have the ingress pass its data to another streamlet, SensorDataToMetrics. This streamlet has one inlet and one outlet, and processes the data it receives using business logic. In this example we convert objects of type SensorData to domain Metric objects; note that the inlet and outlet are typed accordingly.

In SensorDataToMetrics.scala, include the following code:

package sensordata

import akka.stream.scaladsl._
import cloudflow.akkastream._
import cloudflow.akkastream.scaladsl._
import cloudflow.streamlets._
import cloudflow.streamlets.avro._

class SensorDataToMetrics extends AkkaStreamlet {
  val in: CodecInlet[SensorData]     = AvroInlet[SensorData]("in")
  val out: CodecOutlet[Metric]       = AvroOutlet[Metric]("out").withPartitioner(RoundRobinPartitioner)
  override val shape: StreamletShape = StreamletShape(in, out)
  def flow =
    FlowWithCommittableContext[SensorData]()
      .mapConcat { data =>
        List(
          Metric(data.deviceId, data.timestamp, "power", data.measurements.power),
          Metric(data.deviceId, data.timestamp, "rotorSpeed", data.measurements.rotorSpeed),
          Metric(data.deviceId, data.timestamp, "windSpeed", data.measurements.windSpeed)
        )
      }
  override def createLogic(): AkkaStreamletLogic = new RunnableGraphStreamletLogic() {
    override def runnableGraph(): RunnableGraph[_] = sourceWithCommittableContext(in).via(flow).to(committableSink(out))
  }
}
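
The mapConcat in the flow fans each sensor record out into three metrics, one per measurement. Conceptually, for a single input element (a sketch with hypothetical values, assuming the generated classes):

import java.time.Instant
import java.util.UUID

val deviceId  = UUID.randomUUID()
val timestamp = Instant.now()
// One incoming record...
val data = SensorData(deviceId, timestamp, Measurements(power = 1.7, rotorSpeed = 3.9, windSpeed = 105.9))
// ...makes the flow emit three elements downstream:
//   Metric(deviceId, timestamp, "power", 1.7)
//   Metric(deviceId, timestamp, "rotorSpeed", 3.9)
//   Metric(deviceId, timestamp, "windSpeed", 105.9)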

Validation streamlets

Now that we have the metrics that we would like to measure and track, we need to validate them using business rules. A separate MetricsValidation streamlet does exactly this.

This streamlet has one inlet and two outlets, for valid and invalid metrics respectively, each typed according to the data it is expected to handle. In the logic created by the createLogic method, the flow wraps each metric in an Either, and Splitter.sink routes Left values (invalid metrics) to the invalid outlet and Right values to the valid outlet. The business validation itself is done by SensorDataUtils.isValidMetric(..), which we implement next.

In MetricsValidation.scala, include the following code:

package sensordata

import akka.stream.scaladsl.RunnableGraph
import cloudflow.akkastream._
import cloudflow.akkastream.scaladsl._
import cloudflow.akkastream.util.scaladsl._
import cloudflow.streamlets._
import cloudflow.streamlets.avro._

class MetricsValidation extends AkkaStreamlet {
  val in: CodecInlet[Metric]              = AvroInlet[Metric]("in")
  val invalid: CodecOutlet[InvalidMetric] = AvroOutlet[InvalidMetric]("invalid").withPartitioner(metric => metric.metric.deviceId.toString)
  val valid: CodecOutlet[Metric]          = AvroOutlet[Metric]("valid").withPartitioner(RoundRobinPartitioner)
  override val shape: StreamletShape      = StreamletShape(in).withOutlets(invalid, valid)

  override def createLogic(): AkkaStreamletLogic = new RunnableGraphStreamletLogic() {
    override def runnableGraph(): RunnableGraph[_] = sourceWithCommittableContext(in).to(Splitter.sink(flow, invalid, valid))
    def flow =
      FlowWithCommittableContext[Metric]()
        .map { metric =>
          if (!SensorDataUtils.isValidMetric(metric)) Left(InvalidMetric(metric, "All measurements must be positive numbers!"))
          else Right(metric)
        }
  }
}

In SensorDataUtils.scala, include the following code. A real validator would have more complex business logic; this one is just for demonstration:

package sensordata

object SensorDataUtils {
  def isValidMetric(m: Metric): Boolean = m.value >= 0.0
}
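
For example, a more realistic validator might apply per-metric bounds instead of a single sign check. A sketch that could replace the object above, with invented limits and assuming the Metric field names from the schema sketch at the start of this section:

object SensorDataUtils {
  // Hypothetical per-metric rules; the bounds are made up for illustration.
  def isValidMetric(m: Metric): Boolean = m.name match {
    case "power"      => m.value >= 0.0
    case "rotorSpeed" => m.value >= 0.0 && m.value <= 30.0   // invented maximum
    case "windSpeed"  => m.value >= 0.0 && m.value <= 150.0  // invented maximum
    case _            => false                               // reject unknown metric names
  }
}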

Logging streamlets

Next, we want to log the valid and invalid metrics separately, using two streamlets: valid-logger and invalid-logger.

In ValidMetricLogger.scala, include the following code:

package sensordata

import akka.stream.scaladsl._
import cloudflow.akkastream._
import cloudflow.akkastream.scaladsl._
import cloudflow.streamlets._
import cloudflow.streamlets.avro._

class ValidMetricLogger extends AkkaStreamlet {

  val inlet: CodecInlet[Metric]      = AvroInlet[Metric]("in")
  override val shape: StreamletShape = StreamletShape.withInlets(inlet)

  val LogLevel = RegExpConfigParameter(
    "log-level",
    "Provide one of the following log levels: debug, info, warning or error",
    "^(debug|info|warning|error)$",
    Some("debug")
  )

  val MsgPrefix = StringConfigParameter("msg-prefix", "Provide a prefix for the log lines", Some("valid-logger"))

  override def configParameters = Vector(LogLevel, MsgPrefix)

  override def createLogic(): AkkaStreamletLogic = new RunnableGraphStreamletLogic() {
    val logF: String => Unit = LogLevel.value.toLowerCase match {
      case "debug"   => system.log.debug _
      case "info"    => system.log.info _
      case "warning" => system.log.warning _
      case "error"   => system.log.error _
    }

    val msgPrefix = MsgPrefix.value

    def log(metric: Metric): Unit =
      logF(s"$msgPrefix $metric")

    def flow =
      FlowWithCommittableContext[Metric]()
        .map { validMetric =>
          log(validMetric)
          validMetric
        }

    override def runnableGraph(): RunnableGraph[_] =
      sourceWithCommittableContext(inlet).via(flow).to(committableSink)
  }
}
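
Both configuration parameters can be overridden when the application is deployed. A minimal configuration-file sketch, assuming the streamlet is named valid-logger in the blueprint we create in the next step:

cloudflow.streamlets.valid-logger.config-parameters {
  log-level  = "info"
  msg-prefix = "valid"
}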

In InvalidMetricLogger.scala, include the following code:

package sensordata

import akka.stream.scaladsl._
import cloudflow.akkastream._
import cloudflow.akkastream.scaladsl._
import cloudflow.streamlets._
import cloudflow.streamlets.avro._

class InvalidMetricLogger extends AkkaStreamlet {
  val inlet: CodecInlet[InvalidMetric] = AvroInlet[InvalidMetric]("in")
  override val shape: StreamletShape   = StreamletShape.withInlets(inlet)

  override def createLogic(): AkkaStreamletLogic = new RunnableGraphStreamletLogic() {
    val flow = FlowWithCommittableContext[InvalidMetric]()
      .map { invalidMetric =>
        system.log.warning(s"Invalid metric detected! $invalidMetric")
        invalidMetric
      }

    override def runnableGraph(): RunnableGraph[_] =
      sourceWithCommittableContext(inlet).via(flow).to(committableSink)
  }
}

Process JSON records

Finally, we need to put in place some supporting classes to process JSON records through the Cloudflow pipeline.

In JsonFormats.scala, include the following code:

package sensordata

import java.time.Instant
import java.util.UUID

import scala.util.Try

import spray.json._

trait UUIDJsonSupport extends DefaultJsonProtocol {
  implicit object UUIDFormat extends JsonFormat[UUID] {
    def write(uuid: UUID) = JsString(uuid.toString)

    def read(json: JsValue): UUID = json match {
      case JsString(uuid) => Try(UUID.fromString(uuid)).getOrElse(deserializationError(s"Expected valid UUID but got '$uuid'."))
      case other          => deserializationError(s"Expected UUID as JsString, but got: $other")
    }
  }
}

trait InstantJsonSupport extends DefaultJsonProtocol {
  implicit object InstantFormat extends JsonFormat[Instant] {
    def write(instant: Instant) = JsNumber(instant.toEpochMilli)

    def read(json: JsValue): Instant = json match {
      case JsNumber(value) => Instant.ofEpochMilli(value.toLong)
      case other           => deserializationError(s"Expected Instant as JsNumber, but got: $other")
    }
  }
}

object MeasurementsJsonSupport extends DefaultJsonProtocol {
  implicit val measurementFormat: RootJsonFormat[Measurements] = jsonFormat3(Measurements.apply)
}

object SensorDataJsonSupport extends DefaultJsonProtocol with UUIDJsonSupport with InstantJsonSupport {
  import MeasurementsJsonSupport._
  implicit val sensorDataFormat: RootJsonFormat[SensorData] = jsonFormat3(SensorData.apply)
}

In package.scala, which lives one level above the sensordata directory (directly under src/main/scala), include the following code:

import java.time.Instant

package object sensordata {
  implicit def toInstant(millis: Long): Instant = Instant.ofEpochMilli(millis)
}
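
This implicit conversion is a small convenience: it lets you pass epoch milliseconds wherever the generated classes expect an Instant, which keeps test data terse. A sketch with hypothetical values, assuming the generated SensorData classes:

import java.util.UUID
import sensordata._

// The Long literal is converted to an Instant by toInstant:
val data = SensorData(UUID.randomUUID(), 1495545346279L, Measurements(1.7, 3.9, 105.9))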

What’s next

With the schema and streamlets ready, we proceed to wire the pipeline together by creating a blueprint.