The verb return type
+ */
+public interface VerbClient {
+
+ R call(P param);
+
+}
diff --git a/java-runtime/ftl-runtime/runtime/src/main/java/xyz/block/ftl/VerbClientDefinition.java b/java-runtime/ftl-runtime/runtime/src/main/java/xyz/block/ftl/VerbClientDefinition.java
new file mode 100644
index 0000000000..1e258f7fa6
--- /dev/null
+++ b/java-runtime/ftl-runtime/runtime/src/main/java/xyz/block/ftl/VerbClientDefinition.java
@@ -0,0 +1,18 @@
+package xyz.block.ftl;
+
+import java.lang.annotation.ElementType;
+import java.lang.annotation.Retention;
+import java.lang.annotation.RetentionPolicy;
+import java.lang.annotation.Target;
+
+/**
+ * Annotation that is used to define a verb client
+ */
+@Retention(RetentionPolicy.RUNTIME)
+@Target(ElementType.TYPE)
+public @interface VerbClientDefinition {
+
+ String module() default "";
+
+ String name();
+}
diff --git a/java-runtime/ftl-runtime/runtime/src/main/java/xyz/block/ftl/VerbClientEmpty.java b/java-runtime/ftl-runtime/runtime/src/main/java/xyz/block/ftl/VerbClientEmpty.java
new file mode 100644
index 0000000000..2d68c8d88d
--- /dev/null
+++ b/java-runtime/ftl-runtime/runtime/src/main/java/xyz/block/ftl/VerbClientEmpty.java
@@ -0,0 +1,5 @@
+package xyz.block.ftl;
+
+public interface VerbClientEmpty {
+ void call();
+}
diff --git a/java-runtime/ftl-runtime/runtime/src/main/java/xyz/block/ftl/VerbClientSink.java b/java-runtime/ftl-runtime/runtime/src/main/java/xyz/block/ftl/VerbClientSink.java
new file mode 100644
index 0000000000..cb05af1038
--- /dev/null
+++ b/java-runtime/ftl-runtime/runtime/src/main/java/xyz/block/ftl/VerbClientSink.java
@@ -0,0 +1,5 @@
+package xyz.block.ftl;
+
+public interface VerbClientSink
{
+ void call(P param);
+}
diff --git a/java-runtime/ftl-runtime/runtime/src/main/java/xyz/block/ftl/VerbClientSource.java b/java-runtime/ftl-runtime/runtime/src/main/java/xyz/block/ftl/VerbClientSource.java
new file mode 100644
index 0000000000..95efc04c78
--- /dev/null
+++ b/java-runtime/ftl-runtime/runtime/src/main/java/xyz/block/ftl/VerbClientSource.java
@@ -0,0 +1,5 @@
+package xyz.block.ftl;
+
+public interface VerbClientSource {
+ R call();
+}
diff --git a/java-runtime/ftl-runtime/runtime/src/main/java/xyz/block/ftl/VerbName.java b/java-runtime/ftl-runtime/runtime/src/main/java/xyz/block/ftl/VerbName.java
new file mode 100644
index 0000000000..1a9a7e2548
--- /dev/null
+++ b/java-runtime/ftl-runtime/runtime/src/main/java/xyz/block/ftl/VerbName.java
@@ -0,0 +1,12 @@
+package xyz.block.ftl;
+
+import java.lang.annotation.Retention;
+import java.lang.annotation.RetentionPolicy;
+
+/**
+ * Used to override the name of a verb. Without this annotation it defaults to the method name.
+ */
+@Retention(RetentionPolicy.RUNTIME)
+public @interface VerbName {
+ String value();
+}
diff --git a/java-runtime/ftl-runtime/runtime/src/main/java/xyz/block/ftl/runtime/FTLConfigSource.java b/java-runtime/ftl-runtime/runtime/src/main/java/xyz/block/ftl/runtime/FTLConfigSource.java
new file mode 100644
index 0000000000..15ff77949d
--- /dev/null
+++ b/java-runtime/ftl-runtime/runtime/src/main/java/xyz/block/ftl/runtime/FTLConfigSource.java
@@ -0,0 +1,65 @@
+package xyz.block.ftl.runtime;
+
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.util.Set;
+
+import org.eclipse.microprofile.config.spi.ConfigSource;
+
+public class FTLConfigSource implements ConfigSource {
+
+ final static String SEPARATE_SERVER = "quarkus.grpc.server.use-separate-server";
+ final static String PORT = "quarkus.http.port";
+ final static String HOST = "quarkus.http.host";
+
+ final static String FTL_BIND = "FTL_BIND";
+
+ @Override
+ public Set getPropertyNames() {
+ return Set.of(SEPARATE_SERVER, PORT, HOST);
+ }
+
+ @Override
+ public int getOrdinal() {
+ return 1;
+ }
+
+ @Override
+ public String getValue(String s) {
+ switch (s) {
+ case SEPARATE_SERVER -> {
+ return "false";
+ }
+ case PORT -> {
+ String bind = System.getenv(FTL_BIND);
+ if (bind == null) {
+ return null;
+ }
+ try {
+ URI uri = new URI(bind);
+ return Integer.toString(uri.getPort());
+ } catch (URISyntaxException e) {
+ return null;
+ }
+ }
+ case HOST -> {
+ String bind = System.getenv(FTL_BIND);
+ if (bind == null) {
+ return null;
+ }
+ try {
+ URI uri = new URI(bind);
+ return uri.getHost();
+ } catch (URISyntaxException e) {
+ return null;
+ }
+ }
+ }
+ return null;
+ }
+
+ @Override
+ public String getName() {
+ return "FTL Config";
+ }
+}
diff --git a/java-runtime/ftl-runtime/runtime/src/main/java/xyz/block/ftl/runtime/FTLController.java b/java-runtime/ftl-runtime/runtime/src/main/java/xyz/block/ftl/runtime/FTLController.java
new file mode 100644
index 0000000000..28b65e3cd9
--- /dev/null
+++ b/java-runtime/ftl-runtime/runtime/src/main/java/xyz/block/ftl/runtime/FTLController.java
@@ -0,0 +1,182 @@
+package xyz.block.ftl.runtime;
+
+import java.net.URI;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
+
+import jakarta.inject.Singleton;
+
+import org.eclipse.microprofile.config.inject.ConfigProperty;
+import org.jboss.logging.Logger;
+
+import com.google.protobuf.ByteString;
+
+import io.grpc.ManagedChannelBuilder;
+import io.grpc.stub.StreamObserver;
+import io.quarkus.runtime.Startup;
+import xyz.block.ftl.v1.CallRequest;
+import xyz.block.ftl.v1.CallResponse;
+import xyz.block.ftl.v1.ModuleContextRequest;
+import xyz.block.ftl.v1.ModuleContextResponse;
+import xyz.block.ftl.v1.PublishEventRequest;
+import xyz.block.ftl.v1.PublishEventResponse;
+import xyz.block.ftl.v1.VerbServiceGrpc;
+import xyz.block.ftl.v1.schema.Ref;
+
+@Singleton
+@Startup
+public class FTLController {
+ private static final Logger log = Logger.getLogger(FTLController.class);
+ final String moduleName;
+
+ private Throwable currentError;
+ private volatile ModuleContextResponse moduleContextResponse;
+ private boolean waiters = false;
+
+ final VerbServiceGrpc.VerbServiceStub verbService;
+ final StreamObserver moduleObserver = new StreamObserver<>() {
+ @Override
+ public void onNext(ModuleContextResponse moduleContextResponse) {
+ synchronized (this) {
+ currentError = null;
+ FTLController.this.moduleContextResponse = moduleContextResponse;
+ if (waiters) {
+ this.notifyAll();
+ waiters = false;
+ }
+ }
+
+ }
+
+ @Override
+ public void onError(Throwable throwable) {
+ log.error("GRPC connection error", throwable);
+ synchronized (this) {
+ currentError = throwable;
+ if (waiters) {
+ this.notifyAll();
+ waiters = false;
+ }
+ }
+ }
+
+ @Override
+ public void onCompleted() {
+ verbService.getModuleContext(ModuleContextRequest.newBuilder().setModule(moduleName).build(), moduleObserver);
+ }
+ };
+
+ public FTLController(@ConfigProperty(name = "ftl.endpoint", defaultValue = "http://localhost:8892") URI uri,
+ @ConfigProperty(name = "ftl.module.name") String moduleName) {
+ this.moduleName = moduleName;
+ var channelBuilder = ManagedChannelBuilder.forAddress(uri.getHost(), uri.getPort());
+ if (uri.getScheme().equals("http")) {
+ channelBuilder.usePlaintext();
+ }
+ var channel = channelBuilder.build();
+ verbService = VerbServiceGrpc.newStub(channel);
+ verbService.getModuleContext(ModuleContextRequest.newBuilder().setModule(moduleName).build(), moduleObserver);
+
+ }
+
+ public byte[] getSecret(String secretName) {
+ var context = getModuleContext();
+ if (context.containsSecrets(secretName)) {
+ return context.getSecretsMap().get(secretName).toByteArray();
+ }
+ throw new RuntimeException("Secret not found: " + secretName);
+ }
+
+ public byte[] getConfig(String secretName) {
+ var context = getModuleContext();
+ if (context.containsConfigs(secretName)) {
+ return context.getConfigsMap().get(secretName).toByteArray();
+ }
+ throw new RuntimeException("Config not found: " + secretName);
+ }
+
+ public byte[] callVerb(String name, String module, byte[] payload) {
+ CompletableFuture cf = new CompletableFuture<>();
+ verbService.call(CallRequest.newBuilder().setVerb(Ref.newBuilder().setModule(module).setName(name))
+ .setBody(ByteString.copyFrom(payload)).build(), new StreamObserver<>() {
+
+ @Override
+ public void onNext(CallResponse callResponse) {
+ if (callResponse.hasError()) {
+ cf.completeExceptionally(new RuntimeException(callResponse.getError().getMessage()));
+ } else {
+ cf.complete(callResponse.getBody().toByteArray());
+ }
+ }
+
+ @Override
+ public void onError(Throwable throwable) {
+ cf.completeExceptionally(throwable);
+ }
+
+ @Override
+ public void onCompleted() {
+
+ }
+ });
+ try {
+ return cf.get();
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ public void publishEvent(String topic, String callingVerbName, byte[] event) {
+ CompletableFuture> cf = new CompletableFuture<>();
+ verbService.publishEvent(PublishEventRequest.newBuilder()
+ .setCaller(callingVerbName).setBody(ByteString.copyFrom(event))
+ .setTopic(Ref.newBuilder().setModule(moduleName).setName(topic).build()).build(),
+ new StreamObserver() {
+ @Override
+ public void onNext(PublishEventResponse publishEventResponse) {
+ cf.complete(null);
+ }
+
+ @Override
+ public void onError(Throwable throwable) {
+ cf.completeExceptionally(throwable);
+ }
+
+ @Override
+ public void onCompleted() {
+ cf.complete(null);
+ }
+ });
+ try {
+ cf.get();
+ } catch (InterruptedException | ExecutionException e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ private ModuleContextResponse getModuleContext() {
+ var moduleContext = moduleContextResponse;
+ if (moduleContext != null) {
+ return moduleContext;
+ }
+ synchronized (moduleObserver) {
+ for (;;) {
+ moduleContext = moduleContextResponse;
+ if (moduleContext != null) {
+ return moduleContext;
+ }
+ if (currentError != null) {
+ throw new RuntimeException(currentError);
+ }
+ waiters = true;
+ try {
+ moduleObserver.wait();
+ } catch (InterruptedException e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ }
+ }
+
+}
diff --git a/java-runtime/ftl-runtime/runtime/src/main/java/xyz/block/ftl/runtime/FTLHttpHandler.java b/java-runtime/ftl-runtime/runtime/src/main/java/xyz/block/ftl/runtime/FTLHttpHandler.java
new file mode 100644
index 0000000000..48596bffe5
--- /dev/null
+++ b/java-runtime/ftl-runtime/runtime/src/main/java/xyz/block/ftl/runtime/FTLHttpHandler.java
@@ -0,0 +1,245 @@
+package xyz.block.ftl.runtime;
+
+import java.io.ByteArrayOutputStream;
+import java.net.InetSocketAddress;
+import java.nio.channels.Channels;
+import java.nio.channels.WritableByteChannel;
+import java.nio.charset.StandardCharsets;
+import java.util.*;
+import java.util.concurrent.CompletableFuture;
+
+import jakarta.inject.Singleton;
+import jakarta.ws.rs.core.MediaType;
+
+import org.jboss.logging.Logger;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.protobuf.ByteString;
+
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.Unpooled;
+import io.netty.channel.FileRegion;
+import io.netty.handler.codec.http.*;
+import io.netty.util.ReferenceCountUtil;
+import io.quarkus.netty.runtime.virtual.VirtualClientConnection;
+import io.quarkus.netty.runtime.virtual.VirtualResponseHandler;
+import io.quarkus.vertx.http.runtime.QuarkusHttpHeaders;
+import io.quarkus.vertx.http.runtime.VertxHttpRecorder;
+import xyz.block.ftl.v1.CallRequest;
+import xyz.block.ftl.v1.CallResponse;
+
+@SuppressWarnings("unused")
+@Singleton
+public class FTLHttpHandler implements VerbInvoker {
+
+ public static final String CONTENT_TYPE = "Content-Type";
+ final ObjectMapper mapper;
+ private static final Logger log = Logger.getLogger("quarkus.amazon.lambda.http");
+
+ private static final int BUFFER_SIZE = 8096;
+
+ private static final Map> ERROR_HEADERS = Map.of();
+
+ private static final String COOKIE_HEADER = "Cookie";
+
+ // comma headers for headers that have comma in value and we don't want to split it up into
+ // multiple headers
+ private static final Set COMMA_HEADERS = Set.of("access-control-request-headers");
+
+ public FTLHttpHandler(ObjectMapper mapper) {
+ this.mapper = mapper;
+ }
+
+ @Override
+ public CallResponse handle(CallRequest in) {
+ try {
+ var body = mapper.createParser(in.getBody().newInput())
+ .readValueAs(xyz.block.ftl.runtime.builtin.HttpRequest.class);
+ body.getHeaders().put(FTLRecorder.X_FTL_VERB, List.of(in.getVerb().getName()));
+ var ret = handleRequest(body);
+ var mappedResponse = mapper.writer().writeValueAsBytes(ret);
+ return CallResponse.newBuilder().setBody(ByteString.copyFrom(mappedResponse)).build();
+ } catch (Exception e) {
+ return CallResponse.newBuilder().setError(CallResponse.Error.newBuilder().setMessage(e.getMessage()).build())
+ .build();
+ }
+
+ }
+
+ public xyz.block.ftl.runtime.builtin.HttpResponse handleRequest(xyz.block.ftl.runtime.builtin.HttpRequest request) {
+ InetSocketAddress clientAddress = null;
+ try {
+ return nettyDispatch(clientAddress, request);
+ } catch (Exception e) {
+ log.error("Request Failure", e);
+ xyz.block.ftl.runtime.builtin.HttpResponse res = new xyz.block.ftl.runtime.builtin.HttpResponse();
+ res.setStatus(500);
+ res.setError(e);
+ res.setHeaders(ERROR_HEADERS);
+ return res;
+ }
+
+ }
+
+ private class NettyResponseHandler implements VirtualResponseHandler {
+ xyz.block.ftl.runtime.builtin.HttpResponse responseBuilder = new xyz.block.ftl.runtime.builtin.HttpResponse();
+ ByteArrayOutputStream baos;
+ WritableByteChannel byteChannel;
+ final xyz.block.ftl.runtime.builtin.HttpRequest request;
+ CompletableFuture future = new CompletableFuture<>();
+
+ public NettyResponseHandler(xyz.block.ftl.runtime.builtin.HttpRequest request) {
+ this.request = request;
+ }
+
+ public CompletableFuture getFuture() {
+ return future;
+ }
+
+ @Override
+ public void handleMessage(Object msg) {
+ try {
+ //log.info("Got message: " + msg.getClass().getName());
+
+ if (msg instanceof HttpResponse) {
+ HttpResponse res = (HttpResponse) msg;
+ responseBuilder.setStatus(res.status().code());
+
+ final Map> headers = new HashMap<>();
+ responseBuilder.setHeaders(headers);
+ for (String name : res.headers().names()) {
+ final List allForName = res.headers().getAll(name);
+ if (allForName == null || allForName.isEmpty()) {
+ continue;
+ }
+ headers.put(name, allForName);
+ }
+ }
+ if (msg instanceof HttpContent) {
+ HttpContent content = (HttpContent) msg;
+ int readable = content.content().readableBytes();
+ if (baos == null && readable > 0) {
+ baos = createByteStream();
+ }
+ for (int i = 0; i < readable; i++) {
+ baos.write(content.content().readByte());
+ }
+ }
+ if (msg instanceof FileRegion) {
+ FileRegion file = (FileRegion) msg;
+ if (file.count() > 0 && file.transferred() < file.count()) {
+ if (baos == null)
+ baos = createByteStream();
+ if (byteChannel == null)
+ byteChannel = Channels.newChannel(baos);
+ file.transferTo(byteChannel, file.transferred());
+ }
+ }
+ if (msg instanceof LastHttpContent) {
+ if (baos != null) {
+ List ct = responseBuilder.getHeaders().get(CONTENT_TYPE);
+ if (ct == null || ct.isEmpty()) {
+ //TODO: how to handle this
+ responseBuilder.setBody(baos.toString(StandardCharsets.UTF_8));
+ } else if (ct.get(0).contains(MediaType.TEXT_PLAIN)) {
+ // need to encode as JSON string
+ responseBuilder.setBody(mapper.writer().writeValueAsString(baos.toString(StandardCharsets.UTF_8)));
+ } else {
+ responseBuilder.setBody(baos.toString(StandardCharsets.UTF_8));
+ }
+ }
+ future.complete(responseBuilder);
+ }
+ } catch (Throwable ex) {
+ future.completeExceptionally(ex);
+ } finally {
+ if (msg != null) {
+ ReferenceCountUtil.release(msg);
+ }
+ }
+ }
+
+ @Override
+ public void close() {
+ if (!future.isDone())
+ future.completeExceptionally(new RuntimeException("Connection closed"));
+ }
+ }
+
+ private xyz.block.ftl.runtime.builtin.HttpResponse nettyDispatch(InetSocketAddress clientAddress,
+ xyz.block.ftl.runtime.builtin.HttpRequest request)
+ throws Exception {
+ QuarkusHttpHeaders quarkusHeaders = new QuarkusHttpHeaders();
+ quarkusHeaders.setContextObject(xyz.block.ftl.runtime.builtin.HttpRequest.class, request);
+ HttpMethod httpMethod = HttpMethod.valueOf(request.getMethod());
+ if (httpMethod == null) {
+ throw new IllegalStateException("Missing HTTP method in request event");
+ }
+ //TODO: encoding schenanigans
+ StringBuilder path = new StringBuilder(request.getPath());
+ if (request.getQuery() != null && !request.getQuery().isEmpty()) {
+ path.append("?");
+ var first = true;
+ for (var entry : request.getQuery().entrySet()) {
+ for (var val : entry.getValue()) {
+ if (first) {
+ first = false;
+ } else {
+ path.append("&");
+ }
+ path.append(entry.getKey()).append("=").append(val);
+ }
+ }
+ }
+ DefaultHttpRequest nettyRequest = new DefaultHttpRequest(HttpVersion.HTTP_1_1,
+ httpMethod, path.toString(), quarkusHeaders);
+ if (request.getHeaders() != null) {
+ for (Map.Entry> header : request.getHeaders().entrySet()) {
+ if (header.getValue() != null) {
+ for (String val : header.getValue()) {
+ nettyRequest.headers().add(header.getKey(), val);
+ }
+ }
+ }
+ }
+ nettyRequest.headers().add(CONTENT_TYPE, MediaType.APPLICATION_JSON);
+
+ if (!nettyRequest.headers().contains(HttpHeaderNames.HOST)) {
+ nettyRequest.headers().add(HttpHeaderNames.HOST, "localhost");
+ }
+
+ HttpContent requestContent = LastHttpContent.EMPTY_LAST_CONTENT;
+ if (request.getBody() != null) {
+ // See https://www.w3.org/Protocols/rfc2616/rfc2616-sec4.html#sec4.3
+ nettyRequest.headers().add(HttpHeaderNames.TRANSFER_ENCODING, "chunked");
+ ByteBuf body = Unpooled.copiedBuffer(request.getBody().toString(), StandardCharsets.UTF_8); //TODO: do we need to look at the request encoding?
+ requestContent = new DefaultLastHttpContent(body);
+ }
+ NettyResponseHandler handler = new NettyResponseHandler(request);
+ VirtualClientConnection connection = VirtualClientConnection.connect(handler, VertxHttpRecorder.VIRTUAL_HTTP,
+ clientAddress);
+
+ connection.sendMessage(nettyRequest);
+ connection.sendMessage(requestContent);
+ try {
+ return handler.getFuture().get();
+ } finally {
+ connection.close();
+ }
+ }
+
+ private ByteArrayOutputStream createByteStream() {
+ ByteArrayOutputStream baos;
+ baos = new ByteArrayOutputStream(BUFFER_SIZE);
+ return baos;
+ }
+
+ private boolean isBinary(String contentType) {
+ if (contentType != null) {
+ String ct = contentType.toLowerCase(Locale.ROOT);
+ return !(ct.startsWith("text") || ct.contains("json") || ct.contains("xml") || ct.contains("yaml"));
+ }
+ return false;
+ }
+
+}
diff --git a/java-runtime/ftl-runtime/runtime/src/main/java/xyz/block/ftl/runtime/FTLRecorder.java b/java-runtime/ftl-runtime/runtime/src/main/java/xyz/block/ftl/runtime/FTLRecorder.java
new file mode 100644
index 0000000000..6b839518c1
--- /dev/null
+++ b/java-runtime/ftl-runtime/runtime/src/main/java/xyz/block/ftl/runtime/FTLRecorder.java
@@ -0,0 +1,110 @@
+package xyz.block.ftl.runtime;
+
+import java.lang.reflect.Constructor;
+import java.lang.reflect.InvocationTargetException;
+import java.util.List;
+import java.util.function.BiFunction;
+
+import org.jboss.resteasy.reactive.server.core.ResteasyReactiveRequestContext;
+import org.jboss.resteasy.reactive.server.core.parameters.ParameterExtractor;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+
+import io.quarkus.arc.Arc;
+import io.quarkus.runtime.annotations.Recorder;
+import xyz.block.ftl.v1.CallRequest;
+
+@Recorder
+public class FTLRecorder {
+
+ public static final String X_FTL_VERB = "X-ftl-verb";
+
+ public void registerVerb(String module, String verbName, String methodName, List> parameterTypes,
+ Class> verbHandlerClass, List> paramMappers) {
+ //TODO: this sucks
+ try {
+ var method = verbHandlerClass.getDeclaredMethod(methodName, parameterTypes.toArray(new Class[0]));
+ var handlerInstance = Arc.container().instance(verbHandlerClass);
+ Arc.container().instance(VerbRegistry.class).get().register(module, verbName, handlerInstance, method,
+ paramMappers);
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ public void registerHttpIngress(String module, String verbName) {
+ try {
+ Arc.container().instance(VerbRegistry.class).get().register(module, verbName,
+ Arc.container().instance(FTLHttpHandler.class).get());
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ public BiFunction topicSupplier(String className, String callingVerb) {
+ try {
+ var cls = Thread.currentThread().getContextClassLoader().loadClass(className.replace("/", "."));
+ var topic = cls.getDeclaredConstructor(String.class).newInstance(callingVerb);
+ return new BiFunction() {
+ @Override
+ public Object apply(ObjectMapper mapper, CallRequest callRequest) {
+ return topic;
+ }
+ };
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ public BiFunction verbClientSupplier(String className) {
+ try {
+ var cls = Thread.currentThread().getContextClassLoader().loadClass(className.replace("/", "."));
+ var client = cls.getDeclaredConstructor().newInstance();
+ return new BiFunction() {
+ @Override
+ public Object apply(ObjectMapper mapper, CallRequest callRequest) {
+ return client;
+ }
+ };
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ public ParameterExtractor topicParamExtractor(String className) {
+
+ try {
+ var cls = Thread.currentThread().getContextClassLoader().loadClass(className.replace("/", "."));
+ Constructor> ctor = cls.getDeclaredConstructor(String.class);
+ return new ParameterExtractor() {
+ @Override
+ public Object extractParameter(ResteasyReactiveRequestContext context) {
+
+ try {
+ Object topic = ctor.newInstance(context.getHeader(X_FTL_VERB, true));
+ return topic;
+ } catch (InstantiationException | IllegalAccessException | InvocationTargetException e) {
+ throw new RuntimeException(e);
+ }
+ }
+ };
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ public ParameterExtractor verbParamExtractor(String className) {
+ try {
+ var cls = Thread.currentThread().getContextClassLoader().loadClass(className.replace("/", "."));
+ var client = cls.getDeclaredConstructor().newInstance();
+ return new ParameterExtractor() {
+ @Override
+ public Object extractParameter(ResteasyReactiveRequestContext context) {
+ return client;
+ }
+ };
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+ }
+}
diff --git a/java-runtime/ftl-runtime/runtime/src/main/java/xyz/block/ftl/runtime/TopicHelper.java b/java-runtime/ftl-runtime/runtime/src/main/java/xyz/block/ftl/runtime/TopicHelper.java
new file mode 100644
index 0000000000..aa1e0fb20b
--- /dev/null
+++ b/java-runtime/ftl-runtime/runtime/src/main/java/xyz/block/ftl/runtime/TopicHelper.java
@@ -0,0 +1,32 @@
+package xyz.block.ftl.runtime;
+
+import jakarta.inject.Singleton;
+
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.ObjectMapper;
+
+import io.quarkus.arc.Arc;
+
+@Singleton
+public class TopicHelper {
+
+ final FTLController controller;
+ final ObjectMapper mapper;
+
+ public TopicHelper(FTLController controller, ObjectMapper mapper) {
+ this.controller = controller;
+ this.mapper = mapper;
+ }
+
+ public void publish(String topic, String verb, Object message) {
+ try {
+ controller.publishEvent(topic, verb, mapper.writeValueAsBytes(message));
+ } catch (JsonProcessingException e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ public static TopicHelper instance() {
+ return Arc.container().instance(TopicHelper.class).get();
+ }
+}
diff --git a/java-runtime/ftl-runtime/runtime/src/main/java/xyz/block/ftl/runtime/VerbClientHelper.java b/java-runtime/ftl-runtime/runtime/src/main/java/xyz/block/ftl/runtime/VerbClientHelper.java
new file mode 100644
index 0000000000..b28037c76c
--- /dev/null
+++ b/java-runtime/ftl-runtime/runtime/src/main/java/xyz/block/ftl/runtime/VerbClientHelper.java
@@ -0,0 +1,48 @@
+package xyz.block.ftl.runtime;
+
+import java.util.Map;
+
+import jakarta.inject.Singleton;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+
+import io.quarkus.arc.Arc;
+
+@Singleton
+public class VerbClientHelper {
+
+ final FTLController controller;
+ final ObjectMapper mapper;
+
+ public VerbClientHelper(FTLController controller, ObjectMapper mapper) {
+ this.controller = controller;
+ this.mapper = mapper;
+ }
+
+ public Object call(String verb, String module, Object message, Class> returnType, boolean listReturnType,
+ boolean mapReturnType) {
+ try {
+ if (message == null) {
+ //Unit must be an empty map
+ //TODO: what about optional?
+ message = Map.of();
+ }
+ var result = controller.callVerb(verb, module, mapper.writeValueAsBytes(message));
+ if (listReturnType) {
+ return mapper.readerForArrayOf(returnType).readValue(result);
+ } else if (mapReturnType) {
+ return mapper.readerForMapOf(returnType).readValue(result);
+ }
+ if (result == null) {
+ return null;
+ }
+ return mapper.readerFor(returnType).readValue(result);
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ public static VerbClientHelper instance() {
+ return Arc.container().instance(VerbClientHelper.class).get();
+ }
+}
diff --git a/java-runtime/ftl-runtime/runtime/src/main/java/xyz/block/ftl/runtime/VerbHandler.java b/java-runtime/ftl-runtime/runtime/src/main/java/xyz/block/ftl/runtime/VerbHandler.java
new file mode 100644
index 0000000000..714f197a05
--- /dev/null
+++ b/java-runtime/ftl-runtime/runtime/src/main/java/xyz/block/ftl/runtime/VerbHandler.java
@@ -0,0 +1,51 @@
+package xyz.block.ftl.runtime;
+
+import jakarta.inject.Singleton;
+
+import io.grpc.stub.StreamObserver;
+import io.quarkus.grpc.GrpcService;
+import xyz.block.ftl.v1.*;
+
+@Singleton
+@GrpcService
+public class VerbHandler extends VerbServiceGrpc.VerbServiceImplBase {
+
+ final VerbRegistry registry;
+
+ public VerbHandler(VerbRegistry registry) {
+ this.registry = registry;
+ }
+
+ @Override
+ public void call(CallRequest request, StreamObserver responseObserver) {
+ var response = registry.invoke(request);
+ responseObserver.onNext(response);
+ responseObserver.onCompleted();
+ }
+
+ @Override
+ public void publishEvent(PublishEventRequest request, StreamObserver responseObserver) {
+ super.publishEvent(request, responseObserver);
+ }
+
+ @Override
+ public void sendFSMEvent(SendFSMEventRequest request, StreamObserver responseObserver) {
+ super.sendFSMEvent(request, responseObserver);
+ }
+
+ @Override
+ public StreamObserver acquireLease(StreamObserver responseObserver) {
+ return super.acquireLease(responseObserver);
+ }
+
+ @Override
+ public void getModuleContext(ModuleContextRequest request, StreamObserver responseObserver) {
+ super.getModuleContext(request, responseObserver);
+ }
+
+ @Override
+ public void ping(PingRequest request, StreamObserver responseObserver) {
+ responseObserver.onNext(PingResponse.newBuilder().build());
+ responseObserver.onCompleted();
+ }
+}
diff --git a/java-runtime/ftl-runtime/runtime/src/main/java/xyz/block/ftl/runtime/VerbInvoker.java b/java-runtime/ftl-runtime/runtime/src/main/java/xyz/block/ftl/runtime/VerbInvoker.java
new file mode 100644
index 0000000000..d27c233dcd
--- /dev/null
+++ b/java-runtime/ftl-runtime/runtime/src/main/java/xyz/block/ftl/runtime/VerbInvoker.java
@@ -0,0 +1,9 @@
+package xyz.block.ftl.runtime;
+
+import xyz.block.ftl.v1.CallRequest;
+import xyz.block.ftl.v1.CallResponse;
+
+public interface VerbInvoker {
+
+ CallResponse handle(CallRequest in);
+}
diff --git a/java-runtime/ftl-runtime/runtime/src/main/java/xyz/block/ftl/runtime/VerbRegistry.java b/java-runtime/ftl-runtime/runtime/src/main/java/xyz/block/ftl/runtime/VerbRegistry.java
new file mode 100644
index 0000000000..a0202690b4
--- /dev/null
+++ b/java-runtime/ftl-runtime/runtime/src/main/java/xyz/block/ftl/runtime/VerbRegistry.java
@@ -0,0 +1,179 @@
+package xyz.block.ftl.runtime;
+
+import java.io.IOException;
+import java.lang.reflect.Method;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.function.BiFunction;
+
+import jakarta.inject.Singleton;
+
+import org.jboss.logging.Logger;
+import org.jboss.resteasy.reactive.server.core.ResteasyReactiveRequestContext;
+import org.jboss.resteasy.reactive.server.core.parameters.ParameterExtractor;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.protobuf.ByteString;
+
+import io.quarkus.arc.Arc;
+import io.quarkus.arc.InstanceHandle;
+import xyz.block.ftl.v1.CallRequest;
+import xyz.block.ftl.v1.CallResponse;
+
+@Singleton
+public class VerbRegistry {
+
+ private static final Logger log = Logger.getLogger(VerbRegistry.class);
+
+ final ObjectMapper mapper;
+
+ private final Map verbs = new ConcurrentHashMap<>();
+
+ public VerbRegistry(ObjectMapper mapper) {
+ this.mapper = mapper;
+ }
+
+ public void register(String module, String name, InstanceHandle> verbHandlerClass, Method method,
+ List> paramMappers) {
+ verbs.put(new Key(module, name), new AnnotatedEndpointHandler(verbHandlerClass, method, paramMappers));
+ }
+
+ public void register(String module, String name, VerbInvoker verbInvoker) {
+ verbs.put(new Key(module, name), verbInvoker);
+ }
+
+ public CallResponse invoke(CallRequest request) {
+ VerbInvoker handler = verbs.get(new Key(request.getVerb().getModule(), request.getVerb().getName()));
+ if (handler == null) {
+ return CallResponse.newBuilder().setError(CallResponse.Error.newBuilder().setMessage("Verb not found").build())
+ .build();
+ }
+ return handler.handle(request);
+ }
+
+ private record Key(String module, String name) {
+
+ }
+
+ private class AnnotatedEndpointHandler implements VerbInvoker {
+ final InstanceHandle> verbHandlerClass;
+ final Method method;
+ final List> parameterSuppliers;
+
+ private AnnotatedEndpointHandler(InstanceHandle> verbHandlerClass, Method method,
+ List> parameterSuppliers) {
+ this.verbHandlerClass = verbHandlerClass;
+ this.method = method;
+ this.parameterSuppliers = parameterSuppliers;
+ }
+
+ public CallResponse handle(CallRequest in) {
+ try {
+ Object[] params = new Object[parameterSuppliers.size()];
+ for (int i = 0; i < parameterSuppliers.size(); i++) {
+ params[i] = parameterSuppliers.get(i).apply(mapper, in);
+ }
+ Object ret;
+ ret = method.invoke(verbHandlerClass.get(), params);
+ var mappedResponse = mapper.writer().writeValueAsBytes(ret);
+ return CallResponse.newBuilder().setBody(ByteString.copyFrom(mappedResponse)).build();
+ } catch (Exception e) {
+ log.errorf(e, "Failed to invoke verb %s.%s", in.getVerb().getModule(), in.getVerb().getName());
+ return CallResponse.newBuilder().setError(CallResponse.Error.newBuilder().setMessage(e.getMessage()).build())
+ .build();
+ }
+ }
+ }
+
+ public record BodySupplier(Class> inputClass) implements BiFunction {
+
+ @Override
+ public Object apply(ObjectMapper mapper, CallRequest in) {
+ try {
+ return mapper.createParser(in.getBody().newInput()).readValueAs(inputClass);
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ }
+ }
+ }
+
+ public static class SecretSupplier implements BiFunction, ParameterExtractor {
+
+ final String name;
+ final Class> inputClass;
+
+ volatile FTLController ftlController;
+
+ public SecretSupplier(String name, Class> inputClass) {
+ this.name = name;
+ this.inputClass = inputClass;
+ }
+
+ @Override
+ public Object apply(ObjectMapper mapper, CallRequest in) {
+ if (ftlController == null) {
+ ftlController = Arc.container().instance(FTLController.class).get();
+ }
+ var secret = ftlController.getSecret(name);
+ try {
+ return mapper.createParser(secret).readValueAs(inputClass);
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ public String getName() {
+ return name;
+ }
+
+ public Class> getInputClass() {
+ return inputClass;
+ }
+
+ @Override
+ public Object extractParameter(ResteasyReactiveRequestContext context) {
+ return apply(Arc.container().instance(ObjectMapper.class).get(), null);
+ }
+ }
+
+ public static class ConfigSupplier implements BiFunction, ParameterExtractor {
+
+ final String name;
+ final Class> inputClass;
+
+ volatile FTLController ftlController;
+
+ public ConfigSupplier(String name, Class> inputClass) {
+ this.name = name;
+ this.inputClass = inputClass;
+ }
+
+ @Override
+ public Object apply(ObjectMapper mapper, CallRequest in) {
+ if (ftlController == null) {
+ ftlController = Arc.container().instance(FTLController.class).get();
+ }
+ var secret = ftlController.getConfig(name);
+ try {
+ return mapper.createParser(secret).readValueAs(inputClass);
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ @Override
+ public Object extractParameter(ResteasyReactiveRequestContext context) {
+ return apply(Arc.container().instance(ObjectMapper.class).get(), null);
+ }
+
+ public Class> getInputClass() {
+ return inputClass;
+ }
+
+ public String getName() {
+ return name;
+ }
+ }
+
+}
diff --git a/java-runtime/ftl-runtime/runtime/src/main/java/xyz/block/ftl/runtime/builtin/HttpRequest.java b/java-runtime/ftl-runtime/runtime/src/main/java/xyz/block/ftl/runtime/builtin/HttpRequest.java
new file mode 100644
index 0000000000..59b5d97c60
--- /dev/null
+++ b/java-runtime/ftl-runtime/runtime/src/main/java/xyz/block/ftl/runtime/builtin/HttpRequest.java
@@ -0,0 +1,66 @@
+package xyz.block.ftl.runtime.builtin;
+
+import java.util.List;
+import java.util.Map;
+
+import com.fasterxml.jackson.databind.JsonNode;
+
+/**
+ * TODO: should this be generated?
+ */
+public class HttpRequest {
+ private String method;
+ private String path;
+ private Map pathParameters;
+ private Map> query;
+ private Map> headers;
+ private JsonNode body;
+
+ public String getMethod() {
+ return method;
+ }
+
+ public void setMethod(String method) {
+ this.method = method;
+ }
+
+ public String getPath() {
+ return path;
+ }
+
+ public void setPath(String path) {
+ this.path = path;
+ }
+
+ public Map getPathParameters() {
+ return pathParameters;
+ }
+
+ public void setPathParameters(Map pathParameters) {
+ this.pathParameters = pathParameters;
+ }
+
+ public Map> getQuery() {
+ return query;
+ }
+
+ public void setQuery(Map> query) {
+ this.query = query;
+ }
+
+ public Map> getHeaders() {
+ return headers;
+ }
+
+ public void setHeaders(Map> headers) {
+ this.headers = headers;
+ }
+
+ public JsonNode getBody() {
+ return body;
+ }
+
+ public void setBody(JsonNode body) {
+ this.body = body;
+ }
+}
diff --git a/java-runtime/ftl-runtime/runtime/src/main/java/xyz/block/ftl/runtime/builtin/HttpResponse.java b/java-runtime/ftl-runtime/runtime/src/main/java/xyz/block/ftl/runtime/builtin/HttpResponse.java
new file mode 100644
index 0000000000..6db4d8aaf9
--- /dev/null
+++ b/java-runtime/ftl-runtime/runtime/src/main/java/xyz/block/ftl/runtime/builtin/HttpResponse.java
@@ -0,0 +1,49 @@
+package xyz.block.ftl.runtime.builtin;
+
+import java.util.List;
+import java.util.Map;
+
+import com.fasterxml.jackson.annotation.JsonRawValue;
+
+/**
+ * TODO: should this be generated
+ */
+public class HttpResponse {
+ private long status;
+ private Map> headers;
+ @JsonRawValue
+ private String body;
+ private Throwable error;
+
+ public long getStatus() {
+ return status;
+ }
+
+ public void setStatus(long status) {
+ this.status = status;
+ }
+
+ public Map> getHeaders() {
+ return headers;
+ }
+
+ public void setHeaders(Map> headers) {
+ this.headers = headers;
+ }
+
+ public String getBody() {
+ return body;
+ }
+
+ public void setBody(String body) {
+ this.body = body;
+ }
+
+ public Throwable getError() {
+ return error;
+ }
+
+ public void setError(Throwable error) {
+ this.error = error;
+ }
+}
diff --git a/java-runtime/ftl-runtime/runtime/src/main/resources/META-INF/quarkus-extension.yaml b/java-runtime/ftl-runtime/runtime/src/main/resources/META-INF/quarkus-extension.yaml
new file mode 100644
index 0000000000..6f5b0bb2b5
--- /dev/null
+++ b/java-runtime/ftl-runtime/runtime/src/main/resources/META-INF/quarkus-extension.yaml
@@ -0,0 +1,9 @@
+name: Ftl Java Runtime
+#description: Do something useful.
+metadata:
+# keywords:
+# - ftl-java-runtime
+# guide: ... # To create and publish this guide, see https://github.com/quarkiverse/quarkiverse/wiki#documenting-your-extension
+# categories:
+# - "miscellaneous"
+# status: "preview"
diff --git a/java-runtime/ftl-runtime/runtime/src/main/resources/META-INF/services/org.eclipse.microprofile.config.spi.ConfigSource b/java-runtime/ftl-runtime/runtime/src/main/resources/META-INF/services/org.eclipse.microprofile.config.spi.ConfigSource
new file mode 100644
index 0000000000..28ef804d0b
--- /dev/null
+++ b/java-runtime/ftl-runtime/runtime/src/main/resources/META-INF/services/org.eclipse.microprofile.config.spi.ConfigSource
@@ -0,0 +1 @@
+xyz.block.ftl.runtime.FTLConfigSource
\ No newline at end of file
diff --git a/java-runtime/ftl-runtime/test-framework/pom.xml b/java-runtime/ftl-runtime/test-framework/pom.xml
new file mode 100644
index 0000000000..c66028219f
--- /dev/null
+++ b/java-runtime/ftl-runtime/test-framework/pom.xml
@@ -0,0 +1,32 @@
+
+
+ 4.0.0
+
+
+ xyz.block
+ ftl-java-runtime-parent
+ 1.0.0-SNAPSHOT
+
+ ftl-java-test-framework
+ Ftl Java Runtime - Test Framework
+
+
+
+
+ xyz.block
+ ftl-java-runtime
+
+
+ io.quarkus
+ quarkus-junit5
+ compile
+
+
+ io.rest-assured
+ rest-assured
+ test
+
+
+
+
+
diff --git a/java-runtime/ftl-runtime/test-framework/src/main/java/xyz/block/ftl/java/test/FTLManaged.java b/java-runtime/ftl-runtime/test-framework/src/main/java/xyz/block/ftl/java/test/FTLManaged.java
new file mode 100644
index 0000000000..bfcf6f7b98
--- /dev/null
+++ b/java-runtime/ftl-runtime/test-framework/src/main/java/xyz/block/ftl/java/test/FTLManaged.java
@@ -0,0 +1,8 @@
+package xyz.block.ftl.java.test;
+
+import java.lang.annotation.Retention;
+import java.lang.annotation.RetentionPolicy;
+
+@Retention(RetentionPolicy.RUNTIME)
+public @interface FTLManaged {
+}
diff --git a/java-runtime/ftl-runtime/test-framework/src/main/java/xyz/block/ftl/java/test/TestFTL.java b/java-runtime/ftl-runtime/test-framework/src/main/java/xyz/block/ftl/java/test/TestFTL.java
new file mode 100644
index 0000000000..0259f26c9a
--- /dev/null
+++ b/java-runtime/ftl-runtime/test-framework/src/main/java/xyz/block/ftl/java/test/TestFTL.java
@@ -0,0 +1,16 @@
+package xyz.block.ftl.java.test;
+
+public class TestFTL {
+
+ public static TestFTL FTL = new TestFTL();
+
+ public static TestFTL ftl() {
+ return FTL;
+ }
+
+ public TestFTL setSecret(String secret, byte[] value) {
+
+ return this;
+ }
+
+}
diff --git a/java-runtime/ftl-runtime/test-framework/src/main/java/xyz/block/ftl/java/test/internal/FTLTestResource.java b/java-runtime/ftl-runtime/test-framework/src/main/java/xyz/block/ftl/java/test/internal/FTLTestResource.java
new file mode 100644
index 0000000000..f7fb23accf
--- /dev/null
+++ b/java-runtime/ftl-runtime/test-framework/src/main/java/xyz/block/ftl/java/test/internal/FTLTestResource.java
@@ -0,0 +1,27 @@
+package xyz.block.ftl.java.test.internal;
+
+import java.util.Map;
+
+import io.quarkus.test.common.QuarkusTestResourceLifecycleManager;
+
+public class FTLTestResource implements QuarkusTestResourceLifecycleManager {
+
+ FTLTestServer server;
+
+ @Override
+ public Map start() {
+ server = new FTLTestServer();
+ server.start();
+ return Map.of("ftl.endpoint", "http://127.0.0.1:" + server.getPort());
+ }
+
+ @Override
+ public void stop() {
+ server.stop();
+ }
+
+ @Override
+ public void inject(TestInjector testInjector) {
+
+ }
+}
diff --git a/java-runtime/ftl-runtime/test-framework/src/main/java/xyz/block/ftl/java/test/internal/FTLTestServer.java b/java-runtime/ftl-runtime/test-framework/src/main/java/xyz/block/ftl/java/test/internal/FTLTestServer.java
new file mode 100644
index 0000000000..163a3ccad6
--- /dev/null
+++ b/java-runtime/ftl-runtime/test-framework/src/main/java/xyz/block/ftl/java/test/internal/FTLTestServer.java
@@ -0,0 +1,33 @@
+package xyz.block.ftl.java.test.internal;
+
+import java.io.IOException;
+import java.net.InetSocketAddress;
+
+import io.grpc.Server;
+import io.grpc.netty.NettyServerBuilder;
+
+public class FTLTestServer {
+
+ Server grpcServer;
+
+ public void start() {
+
+ var addr = new InetSocketAddress("127.0.0.1", 0);
+ grpcServer = NettyServerBuilder.forAddress(addr)
+ .addService(new TestVerbServer())
+ .build();
+ try {
+ grpcServer.start();
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ public int getPort() {
+ return grpcServer.getPort();
+ }
+
+ public void stop() {
+ grpcServer.shutdown();
+ }
+}
diff --git a/java-runtime/ftl-runtime/test-framework/src/main/java/xyz/block/ftl/java/test/internal/TestVerbServer.java b/java-runtime/ftl-runtime/test-framework/src/main/java/xyz/block/ftl/java/test/internal/TestVerbServer.java
new file mode 100644
index 0000000000..bd46c97b0c
--- /dev/null
+++ b/java-runtime/ftl-runtime/test-framework/src/main/java/xyz/block/ftl/java/test/internal/TestVerbServer.java
@@ -0,0 +1,109 @@
+package xyz.block.ftl.java.test.internal;
+
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.function.Function;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.protobuf.ByteString;
+
+import io.grpc.ManagedChannelBuilder;
+import io.grpc.stub.StreamObserver;
+import io.quarkus.arc.Arc;
+import xyz.block.ftl.v1.AcquireLeaseRequest;
+import xyz.block.ftl.v1.AcquireLeaseResponse;
+import xyz.block.ftl.v1.CallRequest;
+import xyz.block.ftl.v1.CallResponse;
+import xyz.block.ftl.v1.ModuleContextRequest;
+import xyz.block.ftl.v1.ModuleContextResponse;
+import xyz.block.ftl.v1.PingRequest;
+import xyz.block.ftl.v1.PingResponse;
+import xyz.block.ftl.v1.PublishEventRequest;
+import xyz.block.ftl.v1.PublishEventResponse;
+import xyz.block.ftl.v1.SendFSMEventRequest;
+import xyz.block.ftl.v1.SendFSMEventResponse;
+import xyz.block.ftl.v1.VerbServiceGrpc;
+
+public class TestVerbServer extends VerbServiceGrpc.VerbServiceImplBase {
+
+ final VerbServiceGrpc.VerbServiceStub verbService;
+
+ /**
+ * TODO: this is so hacked up
+ */
+ static final Map> fakeVerbs = new HashMap<>();
+
+ public TestVerbServer() {
+ var channelBuilder = ManagedChannelBuilder.forAddress("127.0.0.1", 8081);
+ channelBuilder.usePlaintext();
+ var channel = channelBuilder.build();
+ verbService = VerbServiceGrpc.newStub(channel);
+ }
+
+ @Override
+ public void call(CallRequest request, StreamObserver responseObserver) {
+ Key key = new Key(request.getVerb().getModule(), request.getVerb().getName());
+ if (fakeVerbs.containsKey(key)) {
+ //TODO: YUCK YUCK YUCK
+ //This all needs a refactor
+ ObjectMapper mapper = Arc.container().instance(ObjectMapper.class).get();
+
+ Function, ?> function = fakeVerbs.get(key);
+ Class> type = null;
+ for (var m : function.getClass().getMethods()) {
+ if (m.getName().equals("apply") && m.getParameterCount() == 1) {
+ type = m.getParameterTypes()[0];
+ if (type != Object.class) {
+ break;
+ }
+ }
+ }
+ try {
+ var result = function.apply(mapper.readerFor(type).readValue(request.getBody().newInput()));
+ responseObserver.onNext(
+ CallResponse.newBuilder().setBody(ByteString.copyFrom(mapper.writeValueAsBytes(result))).build());
+ responseObserver.onCompleted();
+ } catch (IOException e) {
+ responseObserver.onError(e);
+ }
+ return;
+ }
+ verbService.call(request, responseObserver);
+ }
+
+ @Override
+ public void publishEvent(PublishEventRequest request, StreamObserver responseObserver) {
+ super.publishEvent(request, responseObserver);
+ }
+
+ @Override
+ public void sendFSMEvent(SendFSMEventRequest request, StreamObserver responseObserver) {
+ super.sendFSMEvent(request, responseObserver);
+ }
+
+ @Override
+ public StreamObserver acquireLease(StreamObserver responseObserver) {
+ return super.acquireLease(responseObserver);
+ }
+
+ @Override
+ public void getModuleContext(ModuleContextRequest request, StreamObserver responseObserver) {
+ responseObserver.onNext(ModuleContextResponse.newBuilder().setModule("test")
+ .putConfigs("test", ByteString.copyFrom("test", StandardCharsets.UTF_8)).build());
+ }
+
+ @Override
+ public void ping(PingRequest request, StreamObserver responseObserver) {
+ responseObserver.onNext(PingResponse.newBuilder().build());
+ responseObserver.onCompleted();
+ }
+
+ public static void registerFakeVerb(String module, String verb, Function
verbFunction) {
+ fakeVerbs.put(new Key(module, verb), verbFunction);
+ }
+
+ record Key(String module, String verb) {
+ }
+}
diff --git a/java-runtime/ftl-runtime/test-framework/src/main/resources/application.properties b/java-runtime/ftl-runtime/test-framework/src/main/resources/application.properties
new file mode 100644
index 0000000000..e69de29bb2