diff --git a/Bitfile b/Bitfile index c91a6b4c46..aba4688c01 100644 --- a/Bitfile +++ b/Bitfile @@ -9,6 +9,8 @@ SCHEMA_IN = backend/schema/schema.go backend/schema/protobuf.go \ SCHEMA_OUT = protos/xyz/block/ftl/v1/schema/schema.proto PROTO_IN = **/*.proto **/buf.* +# There's no real way to mechanically generate the list of outputs, so we just +# explicitly list them here. PROTO_OUT = protos/xyz/block/ftl/v1/ftlv1connect/ftl.connect.go \ protos/xyz/block/ftl/v1/schema/schema.pb.go \ protos/xyz/block/ftl/v1/console/console.pb.go \ @@ -23,9 +25,12 @@ PROTO_OUT = protos/xyz/block/ftl/v1/ftlv1connect/ftl.connect.go \ COMMON_LOG_IN = backend/common/log/api.go COMMON_LOG_OUT = backend/common/log/log_level_string.go -KT_RUNTIME_IN = kotlin-runtime/ftl-runtime/**/*.{kt,kts} pom.xml kotlin-runtime/**/pom.xml -KT_MVN_OUT = kotlin-runtime/ftl-runtime/target/ftl-runtime-1.0-SNAPSHOT-jar-with-dependencies.jar -KT_RUNTIME_OUT = build/template/ftl/jars/ftl-runtime.jar +KT_RUNTIME_IN = kotlin-runtime/ftl-runtime/**/*.{kt,kts} pom.xml kotlin-runtime/ftl-runtime/**/pom.xml +KT_RUNTIME_OUT = kotlin-runtime/ftl-runtime/target/ftl-runtime-1.0-SNAPSHOT-jar-with-dependencies.jar +KT_RUNTIME_RUNNER_TEMPLATE_OUT = build/template/ftl/jars/ftl-runtime.jar + +KT_GENERATOR_IN = kotlin-runtime/ftl-generator/**/*.{kt,kts} pom.xml kotlin-runtime/ftl-runtime/**/pom.xml %{KT_RUNTIME_OUT} +KT_GENERATOR_OUT = kotlin-runtime/ftl-generator/target/ftl-generator-1.0-SNAPSHOT-jar-with-dependencies.jar CLIENT_OUT = console/client/dist/index.html CLIENT_IN = console/client/src/**/* @@ -59,15 +64,23 @@ implicit %{RELEASE}/%{1}: cmd/* (cd backend/common/3rdparty/protos && buf generate) -clean -%{KT_MVN_OUT}: %{KT_RUNTIME_IN} %{PROTO_IN} - build: mvn -pl :ftl-runtime package +%{KT_RUNTIME_OUT}: %{KT_RUNTIME_IN} %{PROTO_IN} + build: + mvn -N install + mvn -pl :ftl-runtime install +clean: mvn -pl :ftl-runtime clean -%(dirname %{KT_RUNTIME_OUT})%: +%{KT_GENERATOR_OUT}: %{KT_GENERATOR_IN} + build: + mvn -N install + mvn -pl :ftl-generator install + +clean: mvn -pl :ftl-generator clean + +%(dirname %{KT_RUNTIME_RUNNER_TEMPLATE_OUT})%: build: install -m 0700 -d %{OUT} -%{KT_RUNTIME_OUT}: %{KT_MVN_OUT} %(dirname %{KT_RUNTIME_OUT})% - build: install -m 0600 %{KT_MVN_OUT} %{OUT} +%{KT_RUNTIME_RUNNER_TEMPLATE_OUT}: %{KT_RUNTIME_OUT} %(dirname %{KT_RUNTIME_RUNNER_TEMPLATE_OUT})% + build: install -m 0600 %{KT_RUNTIME_OUT} %{OUT} %{COMMON_LOG_OUT}: %{COMMON_LOG_IN} build: go generate %{IN} diff --git a/backend/common/rpc/headers/headers.go b/backend/common/rpc/headers/headers.go index 5f56b5d083..522d98ddb3 100644 --- a/backend/common/rpc/headers/headers.go +++ b/backend/common/rpc/headers/headers.go @@ -19,8 +19,6 @@ const ( VerbHeader = "FTL-Verb" // RequestIDHeader is the header used to pass the inbound request ID. RequestIDHeader = "FTL-Request-ID" - // RequestOriginHeader is the header used to pass the origin of the request. - RequestOriginHeader = "FTL-Request-Origin" ) func IsDirectRouted(header http.Header) bool { diff --git a/kotlin-runtime/ftl-runtime/src/main/kotlin/xyz/block/ftl/client/VerbServiceClient.kt b/kotlin-runtime/ftl-runtime/src/main/kotlin/xyz/block/ftl/client/VerbServiceClient.kt index 197ba8fd5f..cae7d8fa8f 100644 --- a/kotlin-runtime/ftl-runtime/src/main/kotlin/xyz/block/ftl/client/VerbServiceClient.kt +++ b/kotlin-runtime/ftl-runtime/src/main/kotlin/xyz/block/ftl/client/VerbServiceClient.kt @@ -1,12 +1,11 @@ package xyz.block.ftl.client -import com.squareup.wire.GrpcClient import okio.ByteString.Companion.encodeUtf8 import xyz.block.ftl.Context import xyz.block.ftl.registry.Registry import xyz.block.ftl.registry.VerbRef import xyz.block.ftl.v1.CallRequest -import xyz.block.ftl.v1.VerbServiceClient as VerbServiceClientProto +import xyz.block.ftl.v1.VerbServiceWireGrpc.VerbServiceBlockingStub /** * Client for calling verbs. Concrete implementations of this interface may call via gRPC or directly. @@ -22,9 +21,7 @@ interface VerbServiceClient { fun call(context: Context, ref: VerbRef, req: String): String } -class GrpcVerbServiceClient(grpcClient: GrpcClient) : VerbServiceClient { - val client = grpcClient.create(VerbServiceClientProto::class) - +class GrpcVerbServiceClient(val client: VerbServiceBlockingStub) : VerbServiceClient { override fun call(context: Context, ref: VerbRef, req: String): String { val request = CallRequest( verb = xyz.block.ftl.v1.schema.VerbRef( @@ -33,7 +30,7 @@ class GrpcVerbServiceClient(grpcClient: GrpcClient) : VerbServiceClient { ), body = req.encodeUtf8(), ) - val response = client.Call().executeBlocking(request) + val response = client.Call(request) return when { response.error != null -> throw RuntimeException(response.error.message) response.body != null -> response.body.utf8() diff --git a/kotlin-runtime/ftl-runtime/src/main/kotlin/xyz/block/ftl/client/grpc.kt b/kotlin-runtime/ftl-runtime/src/main/kotlin/xyz/block/ftl/client/grpc.kt index 7ae9805c76..75314d1a5d 100644 --- a/kotlin-runtime/ftl-runtime/src/main/kotlin/xyz/block/ftl/client/grpc.kt +++ b/kotlin-runtime/ftl-runtime/src/main/kotlin/xyz/block/ftl/client/grpc.kt @@ -1,20 +1,41 @@ package xyz.block.ftl.client -import com.squareup.wire.GrpcClient -import okhttp3.OkHttpClient -import okhttp3.Protocol -import java.time.Duration +import io.grpc.* +import io.grpc.netty.NettyChannelBuilder +import io.grpc.netty.NettyServerBuilder +import xyz.block.ftl.logging.Logging +import xyz.block.ftl.server.ServerInterceptor +import java.net.InetSocketAddress +import java.net.URL +import java.util.concurrent.TimeUnit.SECONDS -internal fun makeGrpcClient(endpoint: String): GrpcClient { - return GrpcClient.Builder() - .client( - OkHttpClient.Builder() - .readTimeout(Duration.ofSeconds(10)) - .writeTimeout(Duration.ofSeconds(10)) - .callTimeout(Duration.ofSeconds(10)) - .protocols(listOf(Protocol.H2_PRIOR_KNOWLEDGE)) - .build() - ) - .baseUrl(endpoint) +internal fun makeGrpcClient(endpoint: String): ManagedChannel { + val url = URL(endpoint) + // TODO: Check if URL is https and use SSL? + return NettyChannelBuilder + .forAddress(InetSocketAddress(url.host, url.port)) + .keepAliveTime(5, SECONDS) + .intercept(VerbServiceClientInterceptor()) + .usePlaintext() .build() } + +private class VerbServiceClientInterceptor : ClientInterceptor { + override fun interceptCall( + method: MethodDescriptor?, + callOptions: CallOptions?, + next: Channel? + ): ClientCall { + val call = next?.newCall(method, callOptions) + return object : ForwardingClientCall.SimpleForwardingClientCall(call) { + override fun start(responseListener: Listener?, headers: Metadata?) { + ServerInterceptor.callers.get().forEach { caller -> + headers?.put(ServerInterceptor.callersMetadata, caller) + } + headers?.put(ServerInterceptor.requestIdMetadata, ServerInterceptor.requestId.get()) + super.start(responseListener, headers) + } + } + } + +} diff --git a/kotlin-runtime/ftl-runtime/src/main/kotlin/xyz/block/ftl/main/main.kt b/kotlin-runtime/ftl-runtime/src/main/kotlin/xyz/block/ftl/main/main.kt index 9c856fd802..95c114ec74 100644 --- a/kotlin-runtime/ftl-runtime/src/main/kotlin/xyz/block/ftl/main/main.kt +++ b/kotlin-runtime/ftl-runtime/src/main/kotlin/xyz/block/ftl/main/main.kt @@ -1,10 +1,13 @@ package xyz.block.ftl.main +import io.grpc.ServerInterceptors import io.grpc.netty.NettyServerBuilder import xyz.block.ftl.client.GrpcVerbServiceClient import xyz.block.ftl.client.makeGrpcClient import xyz.block.ftl.registry.Registry import xyz.block.ftl.server.Server +import xyz.block.ftl.server.ServerInterceptor +import xyz.block.ftl.v1.VerbServiceWireGrpc.VerbServiceBlockingStub import java.net.InetSocketAddress import java.net.URL @@ -20,10 +23,11 @@ fun main() { println("Registered verb: ${verb.module}.${verb.name}") } val ftlEndpoint = System.getenv("FTL_ENDPOINT") ?: defaultFtlEndpoint - val verbRoutingClient = GrpcVerbServiceClient(makeGrpcClient(ftlEndpoint)) + val grpcClient = VerbServiceBlockingStub(makeGrpcClient(ftlEndpoint)) + val verbRoutingClient = GrpcVerbServiceClient(grpcClient) val server = Server(registry, verbRoutingClient) val grpcServer = NettyServerBuilder.forAddress(addr) - .addService(server) + .addService(ServerInterceptors.intercept(server, ServerInterceptor())) .build() grpcServer.start() grpcServer.awaitTermination() diff --git a/kotlin-runtime/ftl-runtime/src/main/kotlin/xyz/block/ftl/server/Server.kt b/kotlin-runtime/ftl-runtime/src/main/kotlin/xyz/block/ftl/server/Server.kt index 401469cb22..daa17a8343 100644 --- a/kotlin-runtime/ftl-runtime/src/main/kotlin/xyz/block/ftl/server/Server.kt +++ b/kotlin-runtime/ftl-runtime/src/main/kotlin/xyz/block/ftl/server/Server.kt @@ -7,11 +7,7 @@ import xyz.block.ftl.client.VerbServiceClient import xyz.block.ftl.registry.Registry import xyz.block.ftl.registry.defaultJvmModuleName import xyz.block.ftl.registry.toModel -import xyz.block.ftl.v1.CallRequest -import xyz.block.ftl.v1.CallResponse -import xyz.block.ftl.v1.PingRequest -import xyz.block.ftl.v1.PingResponse -import xyz.block.ftl.v1.VerbServiceWireGrpc +import xyz.block.ftl.v1.* /** * FTL verb server. @@ -33,6 +29,7 @@ class Server( response.onError(IllegalArgumentException("verb is required")) return } + val out = registry.invoke( Context(jvmModule, routingClient), verbRef.toModel(), diff --git a/kotlin-runtime/ftl-runtime/src/main/kotlin/xyz/block/ftl/server/ServerInterceptor.kt b/kotlin-runtime/ftl-runtime/src/main/kotlin/xyz/block/ftl/server/ServerInterceptor.kt new file mode 100644 index 0000000000..a08bc21bea --- /dev/null +++ b/kotlin-runtime/ftl-runtime/src/main/kotlin/xyz/block/ftl/server/ServerInterceptor.kt @@ -0,0 +1,35 @@ +package xyz.block.ftl.server + +import io.grpc.* +import io.grpc.ServerInterceptor + +const val ftlVerbHeader = "FTL-Verb" +const val ftlRequestIdHeader = "FTL-Request-ID" + +internal class ServerInterceptor : ServerInterceptor { + + companion object { + internal var callersMetadata = Metadata.Key.of(ftlVerbHeader, Metadata.ASCII_STRING_MARSHALLER) + internal var requestIdMetadata = Metadata.Key.of(ftlRequestIdHeader, Metadata.ASCII_STRING_MARSHALLER) + + internal var callers = Context.key>(ftlVerbHeader) + internal var requestId = Context.key(ftlRequestIdHeader) + } + + override fun interceptCall( + call: ServerCall?, + headers: Metadata?, + next: ServerCallHandler? + ): ServerCall.Listener { + var context = Context.current() + + headers?.getAll(callersMetadata)?.apply { + context = context.withValue(callers, this.toList()) + } + headers?.get(requestIdMetadata)?.apply { + context = context.withValue(requestId, this) + } + + return Contexts.interceptCall(context, call, headers, next) + } +} diff --git a/scripts/ftl-dev b/scripts/ftl-dev index eb3441df5b..541800c91f 100755 --- a/scripts/ftl-dev +++ b/scripts/ftl-dev @@ -5,7 +5,7 @@ top="$(git rev-parse --show-toplevel)" cd "${top}" prepare_template() ( - make build/template/ftl/jars/ftl-runtime.jar + bit build/template/ftl/jars/ftl-runtime.jar ) prepare_template diff --git a/scripts/ftl-run b/scripts/ftl-run index 5b30538c7a..4da1ec7f85 100755 --- a/scripts/ftl-run +++ b/scripts/ftl-run @@ -4,17 +4,13 @@ set -euo pipefail top="$(git rev-parse --show-toplevel)" cd "${top}" -prepare_template() ( - make build/template/ftl/jars/ftl-runtime.jar -) build() { - make console/client/dist/index.html - go build -o build/release/ftl-controller -tags release ./cmd/ftl-controller - go build -o build/release/ftl-runner -tags release ./cmd/ftl-runner + bit build/template/ftl/jars/ftl-runtime.jar \ + build/release/ftl-controller \ + build/release/ftl-runner } -prepare_template build overmind start -f Procfile.nowatch diff --git a/scripts/integration-tests b/scripts/integration-tests index 3559d8613d..582513418d 100755 --- a/scripts/integration-tests +++ b/scripts/integration-tests @@ -13,8 +13,12 @@ error() { build_release() { info "Building release" - rm -rf build - make release + bit build/release/ftl-controller \ + build/release/ftl-runner \ + build/release/ftl \ + kotlin-runtime/ftl-runtime/target/ftl-runtime-1.0-SNAPSHOT-jar-with-dependencies.jar \ + kotlin-runtime/ftl-generator/target/ftl-generator-1.0-SNAPSHOT-jar-with-dependencies.jar \ + build/template/ftl/jars/ftl-runtime.jar } wipe_database() { @@ -41,14 +45,6 @@ start_cluster() { trap "overmind quit" EXIT INT TERM } -prepare_runner() ( - info "Preparing runner template directory" - mvn install - mkdir -p build/template/ftl/jars build/runner0 build/runner1 - test -r build/template/ftl/jars/ftl-runtime.jar && return 0 - make build/template/ftl/jars/ftl-runtime.jar -) - deploy_echo_kotlin() ( info "Deploying echo-kotlin" cd examples/echo-kotlin @@ -70,7 +66,6 @@ wait_for_deploys() { build_release wipe_database -prepare_runner start_cluster # Cluster is up, start interacting with it.