From 13914eb4a1b186349c6daebde2b22da64a53f2ee Mon Sep 17 00:00:00 2001 From: Alec Thomas Date: Fri, 15 Sep 2023 11:41:50 +1000 Subject: [PATCH] feat: propagate FTL headers through kotlin-runtime (#338) As part of this, switched to using plain gRPC for client calls (rather than okhttp). Also switched to using bit for builds in scripts. Bit has much better dependency specificiation than make so, for example, when repeatedly running integration tests, it won't constantly rebuild the JARs. ![echo-trace](https://github.com/TBD54566975/ftl/assets/41767/0dce3832-ba6f-434f-ba9f-9eb23a0dd8e0) --- Bitfile | 29 ++++++++--- backend/common/rpc/headers/headers.go | 2 - .../xyz/block/ftl/client/VerbServiceClient.kt | 9 ++-- .../main/kotlin/xyz/block/ftl/client/grpc.kt | 51 +++++++++++++------ .../main/kotlin/xyz/block/ftl/main/main.kt | 8 ++- .../kotlin/xyz/block/ftl/server/Server.kt | 7 +-- .../xyz/block/ftl/server/ServerInterceptor.kt | 35 +++++++++++++ scripts/ftl-dev | 2 +- scripts/ftl-run | 10 ++-- scripts/integration-tests | 17 +++---- 10 files changed, 113 insertions(+), 57 deletions(-) create mode 100644 kotlin-runtime/ftl-runtime/src/main/kotlin/xyz/block/ftl/server/ServerInterceptor.kt diff --git a/Bitfile b/Bitfile index c91a6b4c46..aba4688c01 100644 --- a/Bitfile +++ b/Bitfile @@ -9,6 +9,8 @@ SCHEMA_IN = backend/schema/schema.go backend/schema/protobuf.go \ SCHEMA_OUT = protos/xyz/block/ftl/v1/schema/schema.proto PROTO_IN = **/*.proto **/buf.* +# There's no real way to mechanically generate the list of outputs, so we just +# explicitly list them here. PROTO_OUT = protos/xyz/block/ftl/v1/ftlv1connect/ftl.connect.go \ protos/xyz/block/ftl/v1/schema/schema.pb.go \ protos/xyz/block/ftl/v1/console/console.pb.go \ @@ -23,9 +25,12 @@ PROTO_OUT = protos/xyz/block/ftl/v1/ftlv1connect/ftl.connect.go \ COMMON_LOG_IN = backend/common/log/api.go COMMON_LOG_OUT = backend/common/log/log_level_string.go -KT_RUNTIME_IN = kotlin-runtime/ftl-runtime/**/*.{kt,kts} pom.xml kotlin-runtime/**/pom.xml -KT_MVN_OUT = kotlin-runtime/ftl-runtime/target/ftl-runtime-1.0-SNAPSHOT-jar-with-dependencies.jar -KT_RUNTIME_OUT = build/template/ftl/jars/ftl-runtime.jar +KT_RUNTIME_IN = kotlin-runtime/ftl-runtime/**/*.{kt,kts} pom.xml kotlin-runtime/ftl-runtime/**/pom.xml +KT_RUNTIME_OUT = kotlin-runtime/ftl-runtime/target/ftl-runtime-1.0-SNAPSHOT-jar-with-dependencies.jar +KT_RUNTIME_RUNNER_TEMPLATE_OUT = build/template/ftl/jars/ftl-runtime.jar + +KT_GENERATOR_IN = kotlin-runtime/ftl-generator/**/*.{kt,kts} pom.xml kotlin-runtime/ftl-runtime/**/pom.xml %{KT_RUNTIME_OUT} +KT_GENERATOR_OUT = kotlin-runtime/ftl-generator/target/ftl-generator-1.0-SNAPSHOT-jar-with-dependencies.jar CLIENT_OUT = console/client/dist/index.html CLIENT_IN = console/client/src/**/* @@ -59,15 +64,23 @@ implicit %{RELEASE}/%{1}: cmd/* (cd backend/common/3rdparty/protos && buf generate) -clean -%{KT_MVN_OUT}: %{KT_RUNTIME_IN} %{PROTO_IN} - build: mvn -pl :ftl-runtime package +%{KT_RUNTIME_OUT}: %{KT_RUNTIME_IN} %{PROTO_IN} + build: + mvn -N install + mvn -pl :ftl-runtime install +clean: mvn -pl :ftl-runtime clean -%(dirname %{KT_RUNTIME_OUT})%: +%{KT_GENERATOR_OUT}: %{KT_GENERATOR_IN} + build: + mvn -N install + mvn -pl :ftl-generator install + +clean: mvn -pl :ftl-generator clean + +%(dirname %{KT_RUNTIME_RUNNER_TEMPLATE_OUT})%: build: install -m 0700 -d %{OUT} -%{KT_RUNTIME_OUT}: %{KT_MVN_OUT} %(dirname %{KT_RUNTIME_OUT})% - build: install -m 0600 %{KT_MVN_OUT} %{OUT} +%{KT_RUNTIME_RUNNER_TEMPLATE_OUT}: %{KT_RUNTIME_OUT} %(dirname %{KT_RUNTIME_RUNNER_TEMPLATE_OUT})% + build: install -m 0600 %{KT_RUNTIME_OUT} %{OUT} %{COMMON_LOG_OUT}: %{COMMON_LOG_IN} build: go generate %{IN} diff --git a/backend/common/rpc/headers/headers.go b/backend/common/rpc/headers/headers.go index 5f56b5d083..522d98ddb3 100644 --- a/backend/common/rpc/headers/headers.go +++ b/backend/common/rpc/headers/headers.go @@ -19,8 +19,6 @@ const ( VerbHeader = "FTL-Verb" // RequestIDHeader is the header used to pass the inbound request ID. RequestIDHeader = "FTL-Request-ID" - // RequestOriginHeader is the header used to pass the origin of the request. - RequestOriginHeader = "FTL-Request-Origin" ) func IsDirectRouted(header http.Header) bool { diff --git a/kotlin-runtime/ftl-runtime/src/main/kotlin/xyz/block/ftl/client/VerbServiceClient.kt b/kotlin-runtime/ftl-runtime/src/main/kotlin/xyz/block/ftl/client/VerbServiceClient.kt index 197ba8fd5f..cae7d8fa8f 100644 --- a/kotlin-runtime/ftl-runtime/src/main/kotlin/xyz/block/ftl/client/VerbServiceClient.kt +++ b/kotlin-runtime/ftl-runtime/src/main/kotlin/xyz/block/ftl/client/VerbServiceClient.kt @@ -1,12 +1,11 @@ package xyz.block.ftl.client -import com.squareup.wire.GrpcClient import okio.ByteString.Companion.encodeUtf8 import xyz.block.ftl.Context import xyz.block.ftl.registry.Registry import xyz.block.ftl.registry.VerbRef import xyz.block.ftl.v1.CallRequest -import xyz.block.ftl.v1.VerbServiceClient as VerbServiceClientProto +import xyz.block.ftl.v1.VerbServiceWireGrpc.VerbServiceBlockingStub /** * Client for calling verbs. Concrete implementations of this interface may call via gRPC or directly. @@ -22,9 +21,7 @@ interface VerbServiceClient { fun call(context: Context, ref: VerbRef, req: String): String } -class GrpcVerbServiceClient(grpcClient: GrpcClient) : VerbServiceClient { - val client = grpcClient.create(VerbServiceClientProto::class) - +class GrpcVerbServiceClient(val client: VerbServiceBlockingStub) : VerbServiceClient { override fun call(context: Context, ref: VerbRef, req: String): String { val request = CallRequest( verb = xyz.block.ftl.v1.schema.VerbRef( @@ -33,7 +30,7 @@ class GrpcVerbServiceClient(grpcClient: GrpcClient) : VerbServiceClient { ), body = req.encodeUtf8(), ) - val response = client.Call().executeBlocking(request) + val response = client.Call(request) return when { response.error != null -> throw RuntimeException(response.error.message) response.body != null -> response.body.utf8() diff --git a/kotlin-runtime/ftl-runtime/src/main/kotlin/xyz/block/ftl/client/grpc.kt b/kotlin-runtime/ftl-runtime/src/main/kotlin/xyz/block/ftl/client/grpc.kt index 7ae9805c76..75314d1a5d 100644 --- a/kotlin-runtime/ftl-runtime/src/main/kotlin/xyz/block/ftl/client/grpc.kt +++ b/kotlin-runtime/ftl-runtime/src/main/kotlin/xyz/block/ftl/client/grpc.kt @@ -1,20 +1,41 @@ package xyz.block.ftl.client -import com.squareup.wire.GrpcClient -import okhttp3.OkHttpClient -import okhttp3.Protocol -import java.time.Duration +import io.grpc.* +import io.grpc.netty.NettyChannelBuilder +import io.grpc.netty.NettyServerBuilder +import xyz.block.ftl.logging.Logging +import xyz.block.ftl.server.ServerInterceptor +import java.net.InetSocketAddress +import java.net.URL +import java.util.concurrent.TimeUnit.SECONDS -internal fun makeGrpcClient(endpoint: String): GrpcClient { - return GrpcClient.Builder() - .client( - OkHttpClient.Builder() - .readTimeout(Duration.ofSeconds(10)) - .writeTimeout(Duration.ofSeconds(10)) - .callTimeout(Duration.ofSeconds(10)) - .protocols(listOf(Protocol.H2_PRIOR_KNOWLEDGE)) - .build() - ) - .baseUrl(endpoint) +internal fun makeGrpcClient(endpoint: String): ManagedChannel { + val url = URL(endpoint) + // TODO: Check if URL is https and use SSL? + return NettyChannelBuilder + .forAddress(InetSocketAddress(url.host, url.port)) + .keepAliveTime(5, SECONDS) + .intercept(VerbServiceClientInterceptor()) + .usePlaintext() .build() } + +private class VerbServiceClientInterceptor : ClientInterceptor { + override fun interceptCall( + method: MethodDescriptor?, + callOptions: CallOptions?, + next: Channel? + ): ClientCall { + val call = next?.newCall(method, callOptions) + return object : ForwardingClientCall.SimpleForwardingClientCall(call) { + override fun start(responseListener: Listener?, headers: Metadata?) { + ServerInterceptor.callers.get().forEach { caller -> + headers?.put(ServerInterceptor.callersMetadata, caller) + } + headers?.put(ServerInterceptor.requestIdMetadata, ServerInterceptor.requestId.get()) + super.start(responseListener, headers) + } + } + } + +} diff --git a/kotlin-runtime/ftl-runtime/src/main/kotlin/xyz/block/ftl/main/main.kt b/kotlin-runtime/ftl-runtime/src/main/kotlin/xyz/block/ftl/main/main.kt index 9c856fd802..95c114ec74 100644 --- a/kotlin-runtime/ftl-runtime/src/main/kotlin/xyz/block/ftl/main/main.kt +++ b/kotlin-runtime/ftl-runtime/src/main/kotlin/xyz/block/ftl/main/main.kt @@ -1,10 +1,13 @@ package xyz.block.ftl.main +import io.grpc.ServerInterceptors import io.grpc.netty.NettyServerBuilder import xyz.block.ftl.client.GrpcVerbServiceClient import xyz.block.ftl.client.makeGrpcClient import xyz.block.ftl.registry.Registry import xyz.block.ftl.server.Server +import xyz.block.ftl.server.ServerInterceptor +import xyz.block.ftl.v1.VerbServiceWireGrpc.VerbServiceBlockingStub import java.net.InetSocketAddress import java.net.URL @@ -20,10 +23,11 @@ fun main() { println("Registered verb: ${verb.module}.${verb.name}") } val ftlEndpoint = System.getenv("FTL_ENDPOINT") ?: defaultFtlEndpoint - val verbRoutingClient = GrpcVerbServiceClient(makeGrpcClient(ftlEndpoint)) + val grpcClient = VerbServiceBlockingStub(makeGrpcClient(ftlEndpoint)) + val verbRoutingClient = GrpcVerbServiceClient(grpcClient) val server = Server(registry, verbRoutingClient) val grpcServer = NettyServerBuilder.forAddress(addr) - .addService(server) + .addService(ServerInterceptors.intercept(server, ServerInterceptor())) .build() grpcServer.start() grpcServer.awaitTermination() diff --git a/kotlin-runtime/ftl-runtime/src/main/kotlin/xyz/block/ftl/server/Server.kt b/kotlin-runtime/ftl-runtime/src/main/kotlin/xyz/block/ftl/server/Server.kt index 401469cb22..daa17a8343 100644 --- a/kotlin-runtime/ftl-runtime/src/main/kotlin/xyz/block/ftl/server/Server.kt +++ b/kotlin-runtime/ftl-runtime/src/main/kotlin/xyz/block/ftl/server/Server.kt @@ -7,11 +7,7 @@ import xyz.block.ftl.client.VerbServiceClient import xyz.block.ftl.registry.Registry import xyz.block.ftl.registry.defaultJvmModuleName import xyz.block.ftl.registry.toModel -import xyz.block.ftl.v1.CallRequest -import xyz.block.ftl.v1.CallResponse -import xyz.block.ftl.v1.PingRequest -import xyz.block.ftl.v1.PingResponse -import xyz.block.ftl.v1.VerbServiceWireGrpc +import xyz.block.ftl.v1.* /** * FTL verb server. @@ -33,6 +29,7 @@ class Server( response.onError(IllegalArgumentException("verb is required")) return } + val out = registry.invoke( Context(jvmModule, routingClient), verbRef.toModel(), diff --git a/kotlin-runtime/ftl-runtime/src/main/kotlin/xyz/block/ftl/server/ServerInterceptor.kt b/kotlin-runtime/ftl-runtime/src/main/kotlin/xyz/block/ftl/server/ServerInterceptor.kt new file mode 100644 index 0000000000..a08bc21bea --- /dev/null +++ b/kotlin-runtime/ftl-runtime/src/main/kotlin/xyz/block/ftl/server/ServerInterceptor.kt @@ -0,0 +1,35 @@ +package xyz.block.ftl.server + +import io.grpc.* +import io.grpc.ServerInterceptor + +const val ftlVerbHeader = "FTL-Verb" +const val ftlRequestIdHeader = "FTL-Request-ID" + +internal class ServerInterceptor : ServerInterceptor { + + companion object { + internal var callersMetadata = Metadata.Key.of(ftlVerbHeader, Metadata.ASCII_STRING_MARSHALLER) + internal var requestIdMetadata = Metadata.Key.of(ftlRequestIdHeader, Metadata.ASCII_STRING_MARSHALLER) + + internal var callers = Context.key>(ftlVerbHeader) + internal var requestId = Context.key(ftlRequestIdHeader) + } + + override fun interceptCall( + call: ServerCall?, + headers: Metadata?, + next: ServerCallHandler? + ): ServerCall.Listener { + var context = Context.current() + + headers?.getAll(callersMetadata)?.apply { + context = context.withValue(callers, this.toList()) + } + headers?.get(requestIdMetadata)?.apply { + context = context.withValue(requestId, this) + } + + return Contexts.interceptCall(context, call, headers, next) + } +} diff --git a/scripts/ftl-dev b/scripts/ftl-dev index eb3441df5b..541800c91f 100755 --- a/scripts/ftl-dev +++ b/scripts/ftl-dev @@ -5,7 +5,7 @@ top="$(git rev-parse --show-toplevel)" cd "${top}" prepare_template() ( - make build/template/ftl/jars/ftl-runtime.jar + bit build/template/ftl/jars/ftl-runtime.jar ) prepare_template diff --git a/scripts/ftl-run b/scripts/ftl-run index 5b30538c7a..4da1ec7f85 100755 --- a/scripts/ftl-run +++ b/scripts/ftl-run @@ -4,17 +4,13 @@ set -euo pipefail top="$(git rev-parse --show-toplevel)" cd "${top}" -prepare_template() ( - make build/template/ftl/jars/ftl-runtime.jar -) build() { - make console/client/dist/index.html - go build -o build/release/ftl-controller -tags release ./cmd/ftl-controller - go build -o build/release/ftl-runner -tags release ./cmd/ftl-runner + bit build/template/ftl/jars/ftl-runtime.jar \ + build/release/ftl-controller \ + build/release/ftl-runner } -prepare_template build overmind start -f Procfile.nowatch diff --git a/scripts/integration-tests b/scripts/integration-tests index 3559d8613d..582513418d 100755 --- a/scripts/integration-tests +++ b/scripts/integration-tests @@ -13,8 +13,12 @@ error() { build_release() { info "Building release" - rm -rf build - make release + bit build/release/ftl-controller \ + build/release/ftl-runner \ + build/release/ftl \ + kotlin-runtime/ftl-runtime/target/ftl-runtime-1.0-SNAPSHOT-jar-with-dependencies.jar \ + kotlin-runtime/ftl-generator/target/ftl-generator-1.0-SNAPSHOT-jar-with-dependencies.jar \ + build/template/ftl/jars/ftl-runtime.jar } wipe_database() { @@ -41,14 +45,6 @@ start_cluster() { trap "overmind quit" EXIT INT TERM } -prepare_runner() ( - info "Preparing runner template directory" - mvn install - mkdir -p build/template/ftl/jars build/runner0 build/runner1 - test -r build/template/ftl/jars/ftl-runtime.jar && return 0 - make build/template/ftl/jars/ftl-runtime.jar -) - deploy_echo_kotlin() ( info "Deploying echo-kotlin" cd examples/echo-kotlin @@ -70,7 +66,6 @@ wait_for_deploys() { build_release wipe_database -prepare_runner start_cluster # Cluster is up, start interacting with it.