Streamlet Volume Mounts

Sometimes a streamlet needs to read and/or write files from/to some shared file system. Since streamlets run as processes on Kubernetes, they do not automatically have such a file system available. Cloudflow makes it possible for a streamlet to declare the need for a shared file system (e.g. a "volume" in Kubernetes terms) that should be mounted at a specific path. At deployment time the user can then indicate where that file system is actually located using a Kubernetes Persistent Volume Claim (PVC). Cloudflow will then make sure that the PVC will be mounted at the specified path at runtime and the streamlet can then treat it like a local file system.

The following example streamlet shows how to declare and use a volume mount:

/*
 * Copyright (C) 2016-2021 Lightbend Inc. <https://www.lightbend.com>
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package cloudflow.akkastreamsdoc

import java.nio.file
import java.nio.file._

import akka.NotUsed
import akka.stream.IOResult
import akka.stream.alpakka.file.scaladsl.Directory
import akka.stream.scaladsl._
import akka.util.ByteString
import cloudflow.akkastream._
import cloudflow.akkastream.scaladsl._
import cloudflow.streamlets._
import cloudflow.streamlets.avro._
import spray.json.JsonParser

import scala.concurrent.Future
import scala.concurrent.duration._

class DataFileIngress extends AkkaStreamlet {

  import JsonSupport._

  val out   = AvroOutlet[Data]("out").withPartitioner(RoundRobinPartitioner)
  def shape = StreamletShape.withOutlets(out)

  private val sourceData = VolumeMount("source-data-mount", "/mnt/data", ReadWriteMany)

  override def volumeMounts = Vector(sourceData)

  // Streamlet processing steps
  // 1. Every X seconds
  // 2. Enumerate all files in the mounted path
  // 3. Read each file *)
  // 4. Deserialize file content to a Data value *)

  // *) Note that reading and deserializing the file content is done in separate steps for readability only, in production they should be merged into one step for performance reasons.

  override def createLogic = new RunnableGraphStreamletLogic() {
    val listFiles: NotUsed => Source[file.Path, NotUsed] = { _ =>
      Directory.ls(getMountedPath(sourceData))
    }
    val readFile: Path => Source[ByteString, Future[IOResult]] = { path: Path =>
      FileIO.fromPath(path).via(JsonFraming.objectScanner(Int.MaxValue))
    }
    val parseFile: ByteString => Data = { jsonByteString =>
      JsonParser(jsonByteString.utf8String).convertTo[Data]
    }

    val emitFromFilesContinuously = Source
      .tick(1.second, 5.second, NotUsed)
      .flatMapConcat(listFiles)
      .flatMapConcat(readFile)
      .map(parseFile)
    def runnableGraph = emitFromFilesContinuously.to(plainSink(out))
  }
}

Java API

The Java API is slightly different from the Scala API. The example belows shows a streamlet that uses a read only volume mount.

/*
 * Copyright (C) 2016-2021 Lightbend Inc. <https://www.lightbend.com>
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package cloudflow.akkastreamsdoc;

import akka.NotUsed;
import akka.actor.ActorSystem;
import akka.japi.Pair;
import akka.stream.ActorMaterializer;
import akka.stream.alpakka.file.DirectoryChange;
import akka.stream.alpakka.file.javadsl.DirectoryChangesSource;
import akka.stream.javadsl.*;
import akka.util.ByteString;
import com.typesafe.config.Config;
import cloudflow.akkastream.*;
import cloudflow.akkastream.javadsl.RunnableGraphStreamletLogic;
import cloudflow.streamlets.*;
import cloudflow.streamlets.avro.AvroInlet;
import cloudflow.streamlets.avro.AvroOutlet;

import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.Collections;
import java.time.Duration;
import java.util.concurrent.TimeUnit;

public class FilterStreamlet extends AkkaStreamlet {

    // Declare the volume mount
    private final VolumeMount referenceFiles = VolumeMount.createReadWriteMany("configuration", "/mnt/data");

    //tag::definition[]
    // Make the filter filename configurable
    private final StringConfigParameter filterFilenameConfig = StringConfigParameter.create(
                "filter-filename",
                "Name of the text file in the volume mount directory that contains the list of keys to filter out."
            ).withDefaultValue("device-ids.txt");
    //end::definition[]

    // Make polling interval configurable
    private final IntegerConfigParameter filterPollingInterval = IntegerConfigParameter.create(
                "filter-pollinginterval",
                "The interval in seconds the streamlet should check for updates to the filter file."
            ).withDefaultValue(10);

    @Override
    public VolumeMount[] defineVolumeMounts() {
        return new VolumeMount[] { referenceFiles };
    }

    @Override
    public ConfigParameter[] defineConfigParameters() {
        return new ConfigParameter[] { filterFilenameConfig, filterPollingInterval };
    }

    AvroInlet<Data> inlet = AvroInlet.<Data>create("in", Data.class);
    AvroOutlet<Data> outlet = AvroOutlet.<Data>create("out", Data.class);

    public StreamletShape shape() {
        return StreamletShape.createWithInlets(inlet).withOutlets(outlet);
    }

    private Boolean findDeviceIdInFilterFile(String key, ArrayList<String> filterKeys) {
        for (String current : filterKeys) {
            if (current.equals(key)) {
                return false;
            }
        }
        return true;
    }

    public AkkaStreamletLogic createLogic() {
        return new RunnableGraphStreamletLogic(getContext()) {
            final Config streamletConfig = getStreamletConfig();
            final Path referenceFilesPath = getMountedPath(referenceFiles);

            //tag::usage[]
            final Path filterFilenamePath = Paths.get(referenceFilesPath.toString(),
                    filterFilenameConfig.getValue(getContext()));
            //end::usage[]

            final Duration pollingInterval = java.time.Duration.ofSeconds(filterPollingInterval.getValue(getContext()));

            final Source<ArrayList<String>, NotUsed> filterFileContent =
                DirectoryChangesSource
                .create(referenceFilesPath, pollingInterval, Integer.MAX_VALUE)
                .filter(changedFile ->
                    changedFile.second() != DirectoryChange.Deletion
                    &&
                    changedFile.first().equals(filterFilenamePath)
                )
                .map(Pair::first)
                .mapAsync(1, path ->
                    FileIO.fromPath(path)
                        .via(Framing.delimiter(ByteString.fromString("\n"), Integer.MAX_VALUE))
                        .runFold(
                            new ArrayList<String>(), (acc, entry) -> {
                                acc.addAll(Collections.singletonList(entry.utf8String()));
                                return acc;
                            },
                            system()
                        )
                );

            public RunnableGraph createRunnableGraph() {
                return getPlainSource(inlet)
                    .via(Flow.create())
                    .zipLatest(filterFileContent)
                    .filter(filterFileAndMetric ->
                        findDeviceIdInFilterFile(
                            filterFileAndMetric.first().getKey(),
                            filterFileAndMetric.second()
                        )
                    )
                    .map(Pair::first).to(getPlainSink(outlet));
            }
        };
    }
}

If you want to use a writable volume mount you can replace createReadOnlyMany with createReadWriteMany above.

Access Modes and PVC Mounting

The PVC associated with the streamlet volume mount is required to have the same access mode as the volume mount declared in the streamlet. When deploying the application the access mode will be checked, if the access mode differs from the access mode declared in the streamlet, the deployment of the application will fail.

The following access modes are available:

  • ReadOnlyMany: all streamlet instances get read-only access to the same volume.

  • ReadWriteMany: all streamlet instances get read and write access to the same volume.

Cluster Security Considerations

When deploying a Cloudflow application that contains streamlets with a volume mount, you may have to apply additional Kubernetes security configuration resources to the Kubernetes cluster for the application to deploy successfully.

The pod in which the streamlet is running may need to be associated with a Pod Security Context (PSP) or (on OpenShift) a Security Context Constraint (SCC).

This can be done by associating the Cloudflow application service account, called cloudflow-app-serviceaccount and located in the namespace of the application, with a PSP/SCC.

The PSP and SCC must allow the application pods to mount a writable volume as group id 185. This is the group id of the user running in the streamlet container.

Security context constraints example

The following example shows an SCC that would allow a Cloudflow application with a writable volume mount to deploy correctly to an Openshift cluster with an activated SCC controller. See the OpenShift documentation on Managing Security Context Constraints for more information.

kind: SecurityContextConstraints
apiVersion: v1
metadata:
  name: cloudflow-application-scc
allowPrivilegedContainer: true
runAsUser:
  type: MustRunAsNonRoot
seLinuxContext:
  type: RunAsAny
fsGroup:
  type: MustRunAs
  ranges:
  - min: 185
    max: 186
supplementalGroups:
  type: RunAsAny
volumes:
- '*'

Pod security policy example

This is an example of a PSP that would allow a Cloudflow application with a writable volume mount to deploy correctly.

apiVersion: extensions/v1beta1
kind: PodSecurityPolicy
metadata:
  name: cloudflow-volume-mount-psp
spec:
  runAsUser:
    rule: 'MustRunAsNonRoot'
  seLinux:
    rule: 'RunAsAny'
  supplementalGroups:
    rule: 'RunAsAny'
  seLinux:
    rule: 'RunAsAny'
  fsGroup:
    rule: 'MustRunAs'
    ranges:
    - min: 185
      max: 186
  volumes:
  - '*'

Deploying applications using volume mounts

When deploying a Cloudflow application with streamlets that use the volume mount feature, a Kubernetes Persistent Volume Claim (PVC) will need to be specified for each of the volume mounts.

Before the application can be deployed, the PVC needs to be created in the application namespace.

When the PVC has been created, you can deploy the application and associate the PVC with the streamlet volume mount name using a CLI flag.

Deploying an application without a required volume mount will fail and result in an error message:

$ kubectl cloudflow deploy sensor-data-java:427-a20fc62-dirty

[Error] The following volume mount needs to be bound to a Persistence Volume claim using the --volume-mount flag

- filter.configuration

To successfully deploy the application, the volume mount has to be bound to a PVC.

In the example below, the streamlet filter requires a volume mount called configuration. This volume mount is associated with the PVC named source-data-claim using the --volume-mount flag.

$ kubectl cloudflow deploy sensor-data-java:427-a20fc62-dirty --volume-mount filter.configuration=source-data-claim

The following volume mount is now bound to Persistent Volume Claim `source-data-claim`:

- filter.configuration

[Done] Deployment of application `sensor-data-java` has started.