Skip to content

Commit

Permalink
feat: propagate FTL headers through kotlin-runtime
Browse files Browse the repository at this point in the history
There's some weirdness at play, in that the client uses okhttp3, and the
server uses grpc.io. This appears to be how Wire works, but there's
basically zero documentation so I can't be sure.

Also switched to using bit for builds in scripts. Bit has much better
dependency specificiation than make so, for example, when repeatedly
running integration tests, it won't constantly rebuild the JARs.
  • Loading branch information
alecthomas committed Sep 15, 2023
1 parent ddb2c6e commit 2b897ea
Show file tree
Hide file tree
Showing 10 changed files with 113 additions and 57 deletions.
29 changes: 21 additions & 8 deletions Bitfile
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,8 @@ SCHEMA_IN = backend/schema/schema.go backend/schema/protobuf.go \
SCHEMA_OUT = protos/xyz/block/ftl/v1/schema/schema.proto

PROTO_IN = **/*.proto **/buf.*
# There's no real way to mechanically generate the list of outputs, so we just
# explicitly list them here.
PROTO_OUT = protos/xyz/block/ftl/v1/ftlv1connect/ftl.connect.go \
protos/xyz/block/ftl/v1/schema/schema.pb.go \
protos/xyz/block/ftl/v1/console/console.pb.go \
Expand All @@ -23,9 +25,12 @@ PROTO_OUT = protos/xyz/block/ftl/v1/ftlv1connect/ftl.connect.go \
COMMON_LOG_IN = backend/common/log/api.go
COMMON_LOG_OUT = backend/common/log/log_level_string.go

KT_RUNTIME_IN = kotlin-runtime/ftl-runtime/**/*.{kt,kts} pom.xml kotlin-runtime/**/pom.xml
KT_MVN_OUT = kotlin-runtime/ftl-runtime/target/ftl-runtime-1.0-SNAPSHOT-jar-with-dependencies.jar
KT_RUNTIME_OUT = build/template/ftl/jars/ftl-runtime.jar
KT_RUNTIME_IN = kotlin-runtime/ftl-runtime/**/*.{kt,kts} pom.xml kotlin-runtime/ftl-runtime/**/pom.xml
KT_RUNTIME_OUT = kotlin-runtime/ftl-runtime/target/ftl-runtime-1.0-SNAPSHOT-jar-with-dependencies.jar
KT_RUNTIME_RUNNER_TEMPLATE_OUT = build/template/ftl/jars/ftl-runtime.jar

KT_GENERATOR_IN = kotlin-runtime/ftl-generator/**/*.{kt,kts} pom.xml kotlin-runtime/ftl-runtime/**/pom.xml %{KT_RUNTIME_OUT}
KT_GENERATOR_OUT = kotlin-runtime/ftl-generator/target/ftl-generator-1.0-SNAPSHOT-jar-with-dependencies.jar

CLIENT_OUT = console/client/dist/index.html
CLIENT_IN = console/client/src/**/*
Expand Down Expand Up @@ -59,15 +64,23 @@ implicit %{RELEASE}/%{1}: cmd/*
(cd backend/common/3rdparty/protos && buf generate)
-clean

%{KT_MVN_OUT}: %{KT_RUNTIME_IN} %{PROTO_IN}
build: mvn -pl :ftl-runtime package
%{KT_RUNTIME_OUT}: %{KT_RUNTIME_IN} %{PROTO_IN}
build:
mvn -N install
mvn -pl :ftl-runtime install
+clean: mvn -pl :ftl-runtime clean

%(dirname %{KT_RUNTIME_OUT})%:
%{KT_GENERATOR_OUT}: %{KT_GENERATOR_IN}
build:
mvn -N install
mvn -pl :ftl-generator install
+clean: mvn -pl :ftl-generator clean

%(dirname %{KT_RUNTIME_RUNNER_TEMPLATE_OUT})%:
build: install -m 0700 -d %{OUT}

%{KT_RUNTIME_OUT}: %{KT_MVN_OUT} %(dirname %{KT_RUNTIME_OUT})%
build: install -m 0600 %{KT_MVN_OUT} %{OUT}
%{KT_RUNTIME_RUNNER_TEMPLATE_OUT}: %{KT_RUNTIME_OUT} %(dirname %{KT_RUNTIME_RUNNER_TEMPLATE_OUT})%
build: install -m 0600 %{KT_RUNTIME_OUT} %{OUT}

%{COMMON_LOG_OUT}: %{COMMON_LOG_IN}
build: go generate %{IN}
Expand Down
2 changes: 0 additions & 2 deletions backend/common/rpc/headers/headers.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,6 @@ const (
VerbHeader = "FTL-Verb"
// RequestIDHeader is the header used to pass the inbound request ID.
RequestIDHeader = "FTL-Request-ID"
// RequestOriginHeader is the header used to pass the origin of the request.
RequestOriginHeader = "FTL-Request-Origin"
)

func IsDirectRouted(header http.Header) bool {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,12 +1,11 @@
package xyz.block.ftl.client

import com.squareup.wire.GrpcClient
import okio.ByteString.Companion.encodeUtf8
import xyz.block.ftl.Context
import xyz.block.ftl.registry.Registry
import xyz.block.ftl.registry.VerbRef
import xyz.block.ftl.v1.CallRequest
import xyz.block.ftl.v1.VerbServiceClient as VerbServiceClientProto
import xyz.block.ftl.v1.VerbServiceWireGrpc.VerbServiceBlockingStub

/**
* Client for calling verbs. Concrete implementations of this interface may call via gRPC or directly.
Expand All @@ -22,9 +21,7 @@ interface VerbServiceClient {
fun call(context: Context, ref: VerbRef, req: String): String
}

class GrpcVerbServiceClient(grpcClient: GrpcClient) : VerbServiceClient {
val client = grpcClient.create(VerbServiceClientProto::class)

class GrpcVerbServiceClient(val client: VerbServiceBlockingStub) : VerbServiceClient {
override fun call(context: Context, ref: VerbRef, req: String): String {
val request = CallRequest(
verb = xyz.block.ftl.v1.schema.VerbRef(
Expand All @@ -33,7 +30,7 @@ class GrpcVerbServiceClient(grpcClient: GrpcClient) : VerbServiceClient {
),
body = req.encodeUtf8(),
)
val response = client.Call().executeBlocking(request)
val response = client.Call(request)
return when {
response.error != null -> throw RuntimeException(response.error.message)
response.body != null -> response.body.utf8()
Expand Down
Original file line number Diff line number Diff line change
@@ -1,20 +1,41 @@
package xyz.block.ftl.client

import com.squareup.wire.GrpcClient
import okhttp3.OkHttpClient
import okhttp3.Protocol
import java.time.Duration
import io.grpc.*
import io.grpc.netty.NettyChannelBuilder
import io.grpc.netty.NettyServerBuilder
import xyz.block.ftl.logging.Logging
import xyz.block.ftl.server.ServerInterceptor
import java.net.InetSocketAddress
import java.net.URL
import java.util.concurrent.TimeUnit.SECONDS

internal fun makeGrpcClient(endpoint: String): GrpcClient {
return GrpcClient.Builder()
.client(
OkHttpClient.Builder()
.readTimeout(Duration.ofSeconds(10))
.writeTimeout(Duration.ofSeconds(10))
.callTimeout(Duration.ofSeconds(10))
.protocols(listOf(Protocol.H2_PRIOR_KNOWLEDGE))
.build()
)
.baseUrl(endpoint)
internal fun makeGrpcClient(endpoint: String): ManagedChannel {
val url = URL(endpoint)
// TODO: Check if URL is https and use SSL?
return NettyChannelBuilder
.forAddress(InetSocketAddress(url.host, url.port))
.keepAliveTime(5, SECONDS)
.intercept(VerbServiceClientInterceptor())
.usePlaintext()
.build()
}

private class VerbServiceClientInterceptor : ClientInterceptor {
override fun <ReqT : Any?, RespT : Any?> interceptCall(
method: MethodDescriptor<ReqT, RespT>?,
callOptions: CallOptions?,
next: Channel?
): ClientCall<ReqT, RespT> {
val call = next?.newCall(method, callOptions)
return object : ForwardingClientCall.SimpleForwardingClientCall<ReqT, RespT>(call) {
override fun start(responseListener: Listener<RespT>?, headers: Metadata?) {
ServerInterceptor.callers.get().forEach { caller ->
headers?.put(ServerInterceptor.callersMetadata, caller)
}
headers?.put(ServerInterceptor.requestIdMetadata, ServerInterceptor.requestId.get())
super.start(responseListener, headers)
}
}
}

}
Original file line number Diff line number Diff line change
@@ -1,10 +1,13 @@
package xyz.block.ftl.main

import io.grpc.ServerInterceptors
import io.grpc.netty.NettyServerBuilder
import xyz.block.ftl.client.GrpcVerbServiceClient
import xyz.block.ftl.client.makeGrpcClient
import xyz.block.ftl.registry.Registry
import xyz.block.ftl.server.Server
import xyz.block.ftl.server.ServerInterceptor
import xyz.block.ftl.v1.VerbServiceWireGrpc.VerbServiceBlockingStub
import java.net.InetSocketAddress
import java.net.URL

Expand All @@ -20,10 +23,11 @@ fun main() {
println("Registered verb: ${verb.module}.${verb.name}")
}
val ftlEndpoint = System.getenv("FTL_ENDPOINT") ?: defaultFtlEndpoint
val verbRoutingClient = GrpcVerbServiceClient(makeGrpcClient(ftlEndpoint))
val grpcClient = VerbServiceBlockingStub(makeGrpcClient(ftlEndpoint))
val verbRoutingClient = GrpcVerbServiceClient(grpcClient)
val server = Server(registry, verbRoutingClient)
val grpcServer = NettyServerBuilder.forAddress(addr)
.addService(server)
.addService(ServerInterceptors.intercept(server, ServerInterceptor()))
.build()
grpcServer.start()
grpcServer.awaitTermination()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,11 +7,7 @@ import xyz.block.ftl.client.VerbServiceClient
import xyz.block.ftl.registry.Registry
import xyz.block.ftl.registry.defaultJvmModuleName
import xyz.block.ftl.registry.toModel
import xyz.block.ftl.v1.CallRequest
import xyz.block.ftl.v1.CallResponse
import xyz.block.ftl.v1.PingRequest
import xyz.block.ftl.v1.PingResponse
import xyz.block.ftl.v1.VerbServiceWireGrpc
import xyz.block.ftl.v1.*

/**
* FTL verb server.
Expand All @@ -33,6 +29,7 @@ class Server(
response.onError(IllegalArgumentException("verb is required"))
return
}

val out = registry.invoke(
Context(jvmModule, routingClient),
verbRef.toModel(),
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
package xyz.block.ftl.server

import io.grpc.*
import io.grpc.ServerInterceptor

const val ftlVerbHeader = "FTL-Verb"
const val ftlRequestIdHeader = "FTL-Request-ID"

internal class ServerInterceptor : ServerInterceptor {

companion object {
internal var callersMetadata = Metadata.Key.of(ftlVerbHeader, Metadata.ASCII_STRING_MARSHALLER)
internal var requestIdMetadata = Metadata.Key.of(ftlRequestIdHeader, Metadata.ASCII_STRING_MARSHALLER)

internal var callers = Context.key<List<String>>(ftlVerbHeader)
internal var requestId = Context.key<String>(ftlRequestIdHeader)
}

override fun <ReqT : Any?, RespT : Any?> interceptCall(
call: ServerCall<ReqT, RespT>?,
headers: Metadata?,
next: ServerCallHandler<ReqT, RespT>?
): ServerCall.Listener<ReqT> {
var context = Context.current()

headers?.getAll(callersMetadata)?.apply {
context = context.withValue(callers, this.toList())
}
headers?.get(requestIdMetadata)?.apply {
context = context.withValue(requestId, this)
}

return Contexts.interceptCall(context, call, headers, next)
}
}
2 changes: 1 addition & 1 deletion scripts/ftl-dev
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ top="$(git rev-parse --show-toplevel)"
cd "${top}"

prepare_template() (
make build/template/ftl/jars/ftl-runtime.jar
bit build/template/ftl/jars/ftl-runtime.jar
)

prepare_template
Expand Down
10 changes: 3 additions & 7 deletions scripts/ftl-run
Original file line number Diff line number Diff line change
Expand Up @@ -4,17 +4,13 @@ set -euo pipefail
top="$(git rev-parse --show-toplevel)"
cd "${top}"

prepare_template() (
make build/template/ftl/jars/ftl-runtime.jar
)

build() {
make console/client/dist/index.html
go build -o build/release/ftl-controller -tags release ./cmd/ftl-controller
go build -o build/release/ftl-runner -tags release ./cmd/ftl-runner
bit build/template/ftl/jars/ftl-runtime.jar \
build/release/ftl-controller \
build/release/ftl-runner
}

prepare_template
build

overmind start -f Procfile.nowatch
17 changes: 6 additions & 11 deletions scripts/integration-tests
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,12 @@ error() {

build_release() {
info "Building release"
rm -rf build
make release
bit build/release/ftl-controller \
build/release/ftl-runner \
build/release/ftl \
kotlin-runtime/ftl-runtime/target/ftl-runtime-1.0-SNAPSHOT-jar-with-dependencies.jar \
kotlin-runtime/ftl-generator/target/ftl-generator-1.0-SNAPSHOT-jar-with-dependencies.jar \
build/template/ftl/jars/ftl-runtime.jar
}

wipe_database() {
Expand All @@ -41,14 +45,6 @@ start_cluster() {
trap "overmind quit" EXIT INT TERM
}

prepare_runner() (
info "Preparing runner template directory"
mvn install
mkdir -p build/template/ftl/jars build/runner0 build/runner1
test -r build/template/ftl/jars/ftl-runtime.jar && return 0
make build/template/ftl/jars/ftl-runtime.jar
)

deploy_echo_kotlin() (
info "Deploying echo-kotlin"
cd examples/echo-kotlin
Expand All @@ -70,7 +66,6 @@ wait_for_deploys() {

build_release
wipe_database
prepare_runner
start_cluster

# Cluster is up, start interacting with it.
Expand Down

0 comments on commit 2b897ea

Please sign in to comment.