Akka Streamlet utilities


Use case

A GrpcServerLogic can be used to handle gRPC requests. You need to extend your streamlet from AkkaServerStreamlet so that Cloudflow will expose an HTTP endpoint in Kubernetes.


gRPC is a contract-first technology, so you start by adding the protobuf definition of the service you want to expose under src/main/protobuf. This example implements a simple request-response service:

syntax = "proto3";

option java_multiple_files = true;
option java_package = "sensordata.grpc";
option java_outer_classname = "SensorDataProto";

package sensordata;

service SensorDataService {
    rpc Provide (SensorData) returns (SensorReply) {}
}

message SensorData {
    string payload = 1;
}

message SensorReply {
    string message = 1;
}

Because the Akka gRPC plugin is embedded in Cloudflow, the code for the service is generated automatically when you compile the project. In this case, a sensordata.grpc.SensorDataService is generated, which you can then implement.

To expose your implementation, extend AkkaServerStreamlet and use GrpcServerLogic to add your gRPC handlers:

// Scala
import akka.grpc.scaladsl.ServerReflection
import cloudflow.akkastream.AkkaServerStreamlet
import cloudflow.akkastream.util.scaladsl.GrpcServerLogic
import sensordata.grpc.{ SensorData, SensorDataService, SensorDataServiceHandler }

class SensorDataIngress extends AkkaServerStreamlet {
  // ...

  override def createLogic = new GrpcServerLogic(this) {
    // One partial handler per service, plus server reflection for tooling
    override def handlers() =
      List(
        SensorDataServiceHandler.partial(new SensorDataServiceImpl(sinkRef(out))),
        ServerReflection.partial(List(SensorDataService))
      )
  }
}

// Java
import akka.grpc.javadsl.ServerReflection;
import cloudflow.akkastream.AkkaServerStreamlet;
import cloudflow.akkastream.util.javadsl.GrpcServerLogic;
import sensordata.grpc.SensorDataServiceHandlerFactory;

public class SensorDataIngress extends AkkaServerStreamlet {
    // ...

    public AkkaStreamletLogic createLogic() {
        return new GrpcServerLogic(this, getContext()) {
            // One partial handler per service, plus server reflection for tooling
            public List<Function<HttpRequest, CompletionStage<HttpResponse>>> handlers() {
                return Arrays.asList(
                        SensorDataServiceHandlerFactory.partial(new SensorDataServiceImpl(sinkRef(out)), SensorDataService.name, system()),
                        ServerReflection.create(Arrays.asList(SensorDataService.description), system()));
            }
        };
    }
}

In this example, SensorDataServiceImpl is your implementation that extends the generated sensordata.grpc.SensorDataService. SensorDataServiceHandler (Scala) and SensorDataServiceHandlerFactory (Java) are generated classes as well.

ServerReflection comes from the akka-grpc-runtime library and allows tools like grpcurl to use Server Reflection to discover your APIs dynamically.

External access

gRPC services use HTTP/2 as a transport mechanism. Cloudflow supports both HTTP and HTTP/2 on the port exposed by the pod.

There are 3 ways to use HTTP/2:

  • With TLS

  • Without TLS ('h2c'), using the Upgrade mechanism to negotiate between using HTTP/1 and HTTP/2

  • Without TLS ('h2c'), without negotiation (which assumes the client has prior knowledge that the server supports HTTP/2)
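
From the client side, these modes differ in how the connection is opened. The following is a minimal sketch rather than anything Cloudflow-specific: it assumes JDK 11 or later and a hypothetical endpoint address (`http://localhost:3000/` is a placeholder). A `java.net.http.HttpClient` that prefers HTTP/2 negotiates the protocol during the TLS handshake for `https` URIs and attempts an upgrade for plain `http` URIs, falling back to HTTP/1.1 if the upgrade fails. The sketch sends a plain GET (not a gRPC call) and reports which protocol version was actually negotiated; the class and method names are illustrative only.

```java
import java.io.IOException;
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;
import java.time.Duration;

public class Http2Probe {

    // Builds a client that prefers HTTP/2. For https URIs the protocol is
    // negotiated during the TLS handshake; for plain http URIs the client
    // attempts an upgrade and falls back to HTTP/1.1 if that fails.
    static HttpClient newClient() {
        return HttpClient.newBuilder()
                .version(HttpClient.Version.HTTP_2)
                .connectTimeout(Duration.ofSeconds(5))
                .build();
    }

    // Plain GET used only to observe the negotiated protocol version;
    // this is not a gRPC request.
    static HttpRequest newRequest(URI endpoint) {
        return HttpRequest.newBuilder(endpoint).GET().build();
    }

    // Sends the request and returns the protocol version of the response.
    static HttpClient.Version negotiatedVersion(URI endpoint)
            throws IOException, InterruptedException {
        HttpResponse<Void> response =
                newClient().send(newRequest(endpoint), HttpResponse.BodyHandlers.discarding());
        return response.version();
    }

    public static void main(String[] args) throws InterruptedException {
        // Hypothetical address; pass the real endpoint as the first argument.
        URI endpoint = URI.create(args.length > 0 ? args[0] : "http://localhost:3000/");
        try {
            System.out.println("Negotiated: " + negotiatedVersion(endpoint));
        } catch (IOException e) {
            System.out.println("Could not reach " + endpoint + ": " + e);
        }
    }
}
```

A result of `HTTP_1_1` for an `http` URI suggests that the upgrade was not accepted somewhere along the path, which is a useful check when a proxy or ingress sits between the client and the streamlet.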

LoadBalancer Service

By creating a service of type LoadBalancer you can expose your services using a direct TCP load balancer. Because traffic is passed through at the TCP level, all features supported by the underlying implementation are available.

Limitations of this approach are that you are restricted to 1 service per IP, and more advanced features such as TLS termination and path-based routing are not available.
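
As a rough illustration of this approach, a Service manifest of type LoadBalancer could look like the sketch below. Everything specific in it is an assumption: the Service name, namespace, selector labels, and port numbers are placeholders, not values generated by Cloudflow, and the selector must be adapted to the labels actually present on your streamlet pods.

```yaml
# Hypothetical manifest: name, namespace, selector labels, and ports are
# placeholders and must be adapted to your deployment.
apiVersion: v1
kind: Service
metadata:
  name: sensor-data-grpc          # placeholder name
  namespace: my-cloudflow-app     # placeholder namespace
spec:
  type: LoadBalancer
  selector:
    app: sensor-data-ingress      # placeholder; must match the streamlet pod labels
  ports:
    - name: grpc
      protocol: TCP
      port: 80                    # port exposed by the load balancer
      targetPort: 3000            # placeholder; container port the streamlet listens on
```

Running `kubectl get pods --show-labels` in the application's namespace shows the labels available for the selector.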

Nginx Ingress

The approach described in the Setting up external access section will technically work for gRPC services as well, but there are a number of caveats:

By default, Nginx supports h2c only via the Upgrade mechanism. Because this mechanism is more complicated than the other two approaches, client support for it is not ubiquitous. In addition, the load balancer will use HTTP/1 rather than HTTP/2 towards the service. While this may happen to work, it is not optimal.

GCE Ingress

The gce ingress class (as used by the internal and external HTTP(S) load balancers on Google Kubernetes Engine) requires the target service to be of type NodePort. Because Cloudflow creates services of type ClusterIP, this ingress type cannot be used.