Streamlet Volume Mounts
Sometimes a streamlet needs to read or write files on a shared file system. Since streamlets run as processes on Kubernetes, they do not automatically have such a file system available. Cloudflow lets a streamlet declare that it needs a shared file system (a "volume" in Kubernetes terms) mounted at a specific path. At deployment time, the user indicates where that file system is actually located by using a Kubernetes Persistent Volume Claim (PVC). Cloudflow then mounts the PVC at the specified path at runtime, and the streamlet can treat it like a local file system.
The following example streamlet shows how to declare and use a volume mount:
/*
* Copyright (C) 2016-2020 Lightbend Inc. <https://www.lightbend.com>
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package cloudflow.akkastreamsdoc
import java.nio.file
import java.nio.file._
import akka.NotUsed
import akka.stream.IOResult
import akka.stream.alpakka.file.scaladsl.Directory
import akka.stream.scaladsl._
import akka.util.ByteString
import cloudflow.akkastream._
import cloudflow.akkastream.scaladsl._
import cloudflow.streamlets._
import cloudflow.streamlets.avro._
import spray.json.JsonParser
import scala.concurrent.Future
import scala.concurrent.duration._
class DataFileIngress extends AkkaStreamlet {
  import JsonSupport._
  val out = AvroOutlet[Data]("out").withPartitioner(RoundRobinPartitioner)
  def shape = StreamletShape.withOutlets(out)
  // Declare the volume mount
  private val sourceData = VolumeMount("source-data-mount", "/mnt/data", ReadWriteMany)
  override def volumeMounts = Vector(sourceData)
  // Streamlet processing steps
  // 1. Every X seconds
  // 2. Enumerate all files in the mounted path
  // 3. Read each file *)
  // 4. Deserialize file content to a Data value *)
  // *) Note that reading and deserializing the file content is done in separate steps for readability only.
  //    In production they should be merged into one step for performance reasons.
  override def createLogic = new RunnableGraphStreamletLogic() {
    val listFiles: NotUsed ⇒ Source[file.Path, NotUsed] = { _ ⇒
      Directory.ls(getMountedPath(sourceData))
    }
    val readFile: Path ⇒ Source[ByteString, Future[IOResult]] = { path: Path ⇒
      FileIO.fromPath(path).via(JsonFraming.objectScanner(Int.MaxValue))
    }
    val parseFile: ByteString ⇒ Data = { jsonByteString ⇒
      JsonParser(jsonByteString.utf8String).convertTo[Data]
    }
    val emitFromFilesContinuously = Source
      .tick(1.second, 5.seconds, NotUsed)
      .flatMapConcat(listFiles)
      .flatMapConcat(readFile)
      .map(parseFile)
    def runnableGraph = emitFromFilesContinuously.to(plainSink(out))
  }
}
Java API
The Java API is slightly different from the Scala API. The example below shows a streamlet that uses a read-only volume mount.
package cloudflow.akkastreamsdoc;
import akka.NotUsed;
import akka.actor.ActorSystem;
import akka.japi.Pair;
import akka.stream.ActorMaterializer;
import akka.stream.alpakka.file.DirectoryChange;
import akka.stream.alpakka.file.javadsl.DirectoryChangesSource;
import akka.stream.javadsl.*;
import akka.util.ByteString;
import com.typesafe.config.Config;
import cloudflow.akkastream.*;
import cloudflow.akkastream.javadsl.RunnableGraphStreamletLogic;
import cloudflow.streamlets.*;
import cloudflow.streamlets.avro.AvroInlet;
import cloudflow.streamlets.avro.AvroOutlet;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.Collections;
import java.time.Duration;
import java.util.concurrent.TimeUnit;
public class FilterStreamlet extends AkkaStreamlet {
  // Declare the volume mount
  private final VolumeMount referenceFiles = VolumeMount.createReadOnlyMany("configuration", "/mnt/data");
  // Make the filter filename configurable
  private final StringConfigParameter filterFilenameConfig = StringConfigParameter.create(
    "filter-filename",
    "Name of the text file in the volume mount directory that contains the list of keys to filter out."
  ).withDefaultValue("device-ids.txt");
  // Make polling interval configurable
  private final IntegerConfigParameter filterPollingInterval = IntegerConfigParameter.create(
    "filter-pollinginterval",
    "The interval in seconds the streamlet should check for updates to the filter file."
  ).withDefaultValue(10);
  @Override
  public VolumeMount[] defineVolumeMounts() {
    return new VolumeMount[] { referenceFiles };
  }
  @Override
  public ConfigParameter[] defineConfigParameters() {
    return new ConfigParameter[] { filterFilenameConfig, filterPollingInterval };
  }
  AvroInlet<Data> inlet = AvroInlet.<Data>create("in", Data.class);
  AvroOutlet<Data> outlet = AvroOutlet.<Data>create("out", Data.class);
  public StreamletShape shape() {
    return StreamletShape.createWithInlets(inlet).withOutlets(outlet);
  }
  // Used as the filter predicate: returns false when the key is listed in the
  // filter file (the element is dropped) and true otherwise (the element is kept).
  private Boolean findDeviceIdInFilterFile(String key, ArrayList<String> filterKeys) {
    for (String current : filterKeys) {
      if (current.equals(key)) {
        return false;
      }
    }
    return true;
  }
  public AkkaStreamletLogic createLogic() {
    return new RunnableGraphStreamletLogic(getContext()) {
      final Config streamletConfig = getStreamletConfig();
      // Resolve the path at which the volume is mounted at runtime
      final Path referenceFilesPath = getMountedPath(referenceFiles);
      final Path filterFilenamePath = Paths.get(referenceFilesPath.toString(),
        filterFilenameConfig.getValue(getContext()));
      final Duration pollingInterval = java.time.Duration.ofSeconds(filterPollingInterval.getValue(getContext()));
      // Re-read the filter file whenever it is created or modified
      final Source<ArrayList<String>, NotUsed> filterFileContent =
        DirectoryChangesSource
          .create(referenceFilesPath, pollingInterval, Integer.MAX_VALUE)
          .filter(changedFile ->
            changedFile.second() != DirectoryChange.Deletion
              &&
              changedFile.first().equals(filterFilenamePath)
          )
          .map(Pair::first)
          .mapAsync(1, path ->
            FileIO.fromPath(path)
              .via(Framing.delimiter(ByteString.fromString("\n"), Integer.MAX_VALUE))
              .runFold(
                new ArrayList<String>(), (acc, entry) -> {
                  acc.add(entry.utf8String());
                  return acc;
                },
                system()
              )
          );
      public RunnableGraph createRunnableGraph() {
        return getPlainSource(inlet)
          .via(Flow.create())
          .zipLatest(filterFileContent)
          .filter(filterFileAndMetric ->
            findDeviceIdInFilterFile(
              filterFileAndMetric.first().getKey(),
              filterFileAndMetric.second()
            )
          )
          .map(Pair::first).to(getPlainSink(outlet));
      }
    };
  }
}
If you want to use a writable volume mount, you can replace createReadOnlyMany with createReadWriteMany above.
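Because a mounted volume behaves like a local directory, the file handling of such a streamlet can be tried out without a cluster. The following is a minimal sketch that uses only java.nio, with a temporary directory standing in for the mount path /mnt/data. The class FilterFileSketch and its methods are hypothetical helpers for illustration and are not part of the Cloudflow API. Unlike the streamlet above, the sketch trims lines and skips empty ones.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.stream.Collectors;

// Hypothetical helper class, not part of the Cloudflow API.
class FilterFileSketch {

  // Reads one key per line from the filter file inside the mounted directory.
  static Set<String> readFilterKeys(Path mountedPath, String filterFilename) {
    Path filterFile = mountedPath.resolve(filterFilename);
    try {
      Set<String> keys = new HashSet<>();
      for (String line : Files.readAllLines(filterFile, StandardCharsets.UTF_8)) {
        String key = line.trim();
        if (!key.isEmpty()) {
          keys.add(key);
        }
      }
      return keys;
    } catch (IOException e) {
      throw new UncheckedIOException(e);
    }
  }

  // Keeps only the device ids that are not listed in the filter file.
  static List<String> dropFiltered(List<String> deviceIds, Set<String> filterKeys) {
    return deviceIds.stream()
        .filter(id -> !filterKeys.contains(id))
        .collect(Collectors.toList());
  }

  public static void main(String[] args) throws IOException {
    // Assumption: a temporary directory stands in for the mounted volume.
    Path mountedPath = Files.createTempDirectory("mnt-data");
    Files.write(mountedPath.resolve("device-ids.txt"),
        "device-1\ndevice-3\n".getBytes(StandardCharsets.UTF_8));

    Set<String> keys = readFilterKeys(mountedPath, "device-ids.txt");
    // prints [device-2]
    System.out.println(dropFiltered(Arrays.asList("device-1", "device-2", "device-3"), keys));
  }
}
```

In a deployed streamlet, the directory would instead come from getMountedPath, as in the example above.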
Access Modes and PVC Mounting
The PVC associated with the streamlet volume mount must have the same access mode as the volume mount declared in the streamlet. The access mode is checked when the application is deployed. If it differs from the access mode declared in the streamlet, the deployment fails.
The following access modes are available:
- ReadOnlyMany: all streamlet instances get read-only access to the same volume.
- ReadWriteMany: all streamlet instances get read and write access to the same volume.
Cluster Security Considerations
When deploying a Cloudflow application that contains streamlets with a volume mount, you may have to apply additional Kubernetes security configuration resources to the Kubernetes cluster for the application to deploy successfully.
The pod in which the streamlet is running may need to be associated with a Pod Security Policy (PSP) or (on OpenShift) a Security Context Constraint (SCC).
This can be done by associating the Cloudflow application service account, called cloudflow-app-serviceaccount and located in the namespace of the application, with a PSP/SCC.
The PSP and SCC must allow the application pods to mount a writable volume as group id 185. This is the group id of the user running in the streamlet container.
Security context constraints example
The following example shows an SCC that would allow a Cloudflow application with a writable volume mount to deploy correctly to an OpenShift cluster with an activated SCC controller. See the OpenShift documentation on Managing Security Context Constraints for more information.
kind: SecurityContextConstraints
apiVersion: v1
metadata:
  name: cloudflow-application-scc
allowPrivilegedContainer: true
runAsUser:
  type: MustRunAsNonRoot
seLinuxContext:
  type: RunAsAny
fsGroup:
  type: MustRunAs
  ranges:
  - min: 185
    max: 186
supplementalGroups:
  type: RunAsAny
volumes:
- '*'
Pod security policy example
This is an example of a PSP that would allow a Cloudflow application with a writable volume mount to deploy correctly.
apiVersion: extensions/v1beta1
kind: PodSecurityPolicy
metadata:
  name: cloudflow-volume-mount-psp
spec:
  runAsUser:
    rule: 'MustRunAsNonRoot'
  seLinux:
    rule: 'RunAsAny'
  supplementalGroups:
    rule: 'RunAsAny'
  fsGroup:
    rule: 'MustRunAs'
    ranges:
    - min: 185
      max: 186
  volumes:
  - '*'
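One common way to associate a service account with a PSP is Kubernetes RBAC: a role grants the use verb on the policy, and a role binding attaches that role to the service account. The following is a sketch under stated assumptions rather than a required configuration. The namespace sensor-data-java and the names of the role and the binding are hypothetical. Only the service account name and the PSP name come from this page.

```yaml
apiVersion: rbac.authorization.k8s.io/v1
kind: ClusterRole
metadata:
  name: cloudflow-volume-mount-psp-user   # hypothetical name
rules:
# 'policy' is the current API group for PSPs; 'extensions' covers older
# clusters that still serve PSPs from extensions/v1beta1.
- apiGroups: ['policy', 'extensions']
  resources: ['podsecuritypolicies']
  verbs: ['use']
  resourceNames: ['cloudflow-volume-mount-psp']
---
apiVersion: rbac.authorization.k8s.io/v1
kind: RoleBinding
metadata:
  name: cloudflow-volume-mount-psp-binding   # hypothetical name
  namespace: sensor-data-java                # assumption: the application namespace
roleRef:
  apiGroup: rbac.authorization.k8s.io
  kind: ClusterRole
  name: cloudflow-volume-mount-psp-user
subjects:
- kind: ServiceAccount
  name: cloudflow-app-serviceaccount
  namespace: sensor-data-java                # assumption: the application namespace
```

Using a RoleBinding rather than a ClusterRoleBinding limits the grant to the application namespace.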
Deploying applications using volume mounts
When deploying a Cloudflow application with streamlets that use the volume mount feature, a Kubernetes Persistent Volume Claim (PVC) will need to be specified for each of the volume mounts.
Before the application can be deployed, the PVC needs to be created in the application namespace.
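A PVC of this kind could look like the following sketch. The claim name source-data-claim matches the deployment example on this page. The namespace, the requested storage size, and the storage class are assumptions that depend on the cluster. The storage class must be backed by a provisioner that supports the requested access mode.

```yaml
apiVersion: v1
kind: PersistentVolumeClaim
metadata:
  name: source-data-claim
  namespace: sensor-data-java   # assumption: the application namespace
spec:
  accessModes:
  # Must match the access mode declared by the streamlet volume mount.
  # Replace with ReadOnlyMany if the streamlet declares a read-only mount.
  - ReadWriteMany
  resources:
    requests:
      storage: 1Gi              # assumption: size depends on the data
  storageClassName: nfs         # assumption: a class that supports the access mode
```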
When the PVC has been created, you can deploy the application and associate the PVC with the streamlet volume mount name using a CLI flag.
Deploying an application without a required volume mount will fail and result in an error message:
$ kubectl cloudflow deploy sensor-data-java:427-a20fc62-dirty
[Error] The following volume mount needs to be bound to a Persistence Volume claim using the --volume-mount flag
- filter.configuration
To successfully deploy the application, the volume mount has to be bound to a PVC.
In the example below, the streamlet filter requires a volume mount called configuration. This volume mount is associated with the PVC named source-data-claim using the --volume-mount flag.
$ kubectl cloudflow deploy sensor-data-java:427-a20fc62-dirty --volume-mount filter.configuration=source-data-claim
The following volume mount is now bound to Persistent Volume Claim `source-data-claim`:
- filter.configuration
[Done] Deployment of application `sensor-data-java` has started.