From 2ce1deb9c35076bdd7fe1678e210b3c6b03d2716 Mon Sep 17 00:00:00 2001 From: Stuart Douglas Date: Thu, 15 Aug 2024 06:48:44 +1000 Subject: [PATCH] fix: fix Java leases and add test --- .../leases/lease_integration_test.go | 5 +- .../leases/testdata/java/leases/ftl.toml | 2 + .../leases/testdata/java/leases/pom.xml | 141 ++++++++++++++++++ .../ftl/java/test/leases/TestLeases.java | 22 +++ integration/actions.go | 13 ++ integration/harness.go | 5 + .../main/java/xyz/block/ftl/LeaseClient.java | 10 +- .../main/java/xyz/block/ftl/LeaseHandle.java | 7 + .../xyz/block/ftl/runtime/FTLController.java | 73 ++++----- .../xyz/block/ftl/runtime/VerbHandler.java | 10 +- .../xyz/block/ftl/runtime/VerbRegistry.java | 13 +- 11 files changed, 252 insertions(+), 49 deletions(-) create mode 100644 backend/controller/leases/testdata/java/leases/ftl.toml create mode 100644 backend/controller/leases/testdata/java/leases/pom.xml create mode 100644 backend/controller/leases/testdata/java/leases/src/main/java/xyz/block/ftl/java/test/leases/TestLeases.java create mode 100644 java-runtime/ftl-runtime/runtime/src/main/java/xyz/block/ftl/LeaseHandle.java diff --git a/backend/controller/leases/lease_integration_test.go b/backend/controller/leases/lease_integration_test.go index 3af001258c..0f68a9558f 100644 --- a/backend/controller/leases/lease_integration_test.go +++ b/backend/controller/leases/lease_integration_test.go @@ -19,10 +19,12 @@ import ( func TestLease(t *testing.T) { in.Run(t, + in.WithLanguages("go", "java"), + in.WithJava(), in.CopyModule("leases"), in.Build("leases"), // checks if leases work in a unit test environment - in.ExecModuleTest("leases"), + in.IfLanguage("go", in.ExecModuleTest("leases")), in.Deploy("leases"), // checks if it leases work with a real controller func(t testing.TB, ic in.TestContext) { @@ -34,6 +36,7 @@ func TestLease(t *testing.T) { Verb: &schemapb.Ref{Module: "leases", Name: "acquire"}, Body: []byte("{}"), })) + assert.NoError(t, err) if respErr := resp.Msg.GetError(); respErr != nil { return fmt.Errorf("received error on first call: %v", respErr) } diff --git a/backend/controller/leases/testdata/java/leases/ftl.toml b/backend/controller/leases/testdata/java/leases/ftl.toml new file mode 100644 index 0000000000..970a945305 --- /dev/null +++ b/backend/controller/leases/testdata/java/leases/ftl.toml @@ -0,0 +1,2 @@ +module = "leases" +language = "java" diff --git a/backend/controller/leases/testdata/java/leases/pom.xml b/backend/controller/leases/testdata/java/leases/pom.xml new file mode 100644 index 0000000000..ff9711ab38 --- /dev/null +++ b/backend/controller/leases/testdata/java/leases/pom.xml @@ -0,0 +1,141 @@ + + + 4.0.0 + xyz.block.ftl.examples + leases + 1.0.0-SNAPSHOT + + + 1.0-SNAPSHOT + 3.13.0 + 2.0.0 + 17 + UTF-8 + UTF-8 + quarkus-bom + io.quarkus.platform + 3.12.3 + true + 3.2.5 + + + + + + ${quarkus.platform.group-id} + ${quarkus.platform.artifact-id} + ${quarkus.platform.version} + pom + import + + + + + + + xyz.block + ftl-java-runtime + 1.0.0-SNAPSHOT + + + io.quarkus + quarkus-kotlin + + + io.quarkus + quarkus-jackson + + + io.quarkus + quarkus-rest-jackson + + + io.quarkus + quarkus-junit5 + test + + + org.jetbrains.kotlin + kotlin-stdlib-jdk8 + + + io.rest-assured + kotlin-extensions + test + + + + + + + ${quarkus.platform.group-id} + quarkus-maven-plugin + ${quarkus.platform.version} + true + + + + build + generate-code + generate-code-tests + native-image-agent + + + + + + maven-compiler-plugin + ${compiler-plugin.version} + + + -parameters + + + + + maven-surefire-plugin + ${surefire-plugin.version} + + + org.jboss.logmanager.LogManager + ${maven.home} + + + + + maven-failsafe-plugin + ${surefire-plugin.version} + + + + integration-test + verify + + + + + + ${project.build.directory}/${project.build.finalName}-runner + org.jboss.logmanager.LogManager + ${maven.home} + + + + + + + + + native + + + native + + + + false + true + + + + diff --git a/backend/controller/leases/testdata/java/leases/src/main/java/xyz/block/ftl/java/test/leases/TestLeases.java b/backend/controller/leases/testdata/java/leases/src/main/java/xyz/block/ftl/java/test/leases/TestLeases.java new file mode 100644 index 0000000000..e9fd132a7c --- /dev/null +++ b/backend/controller/leases/testdata/java/leases/src/main/java/xyz/block/ftl/java/test/leases/TestLeases.java @@ -0,0 +1,22 @@ +package xyz.block.ftl.java.test.leases; + +import io.quarkus.logging.Log; +import xyz.block.ftl.Export; +import xyz.block.ftl.LeaseClient; +import xyz.block.ftl.Verb; + +import java.time.Duration; + +public class TestLeases { + + @Export + @Verb + public void acquire(LeaseClient leaseClient) throws Exception { + Log.info("Acquiring lease"); + try (var lease = leaseClient.acquireLease(Duration.ofSeconds(10), "lease")) { + Log.info("Acquired lease"); + Thread.sleep(5000); + } + } + +} diff --git a/integration/actions.go b/integration/actions.go index 7f460bf9cf..a3b4e76884 100644 --- a/integration/actions.go +++ b/integration/actions.go @@ -12,6 +12,7 @@ import ( "net/url" "os" "path/filepath" + "slices" "strings" "testing" "time" @@ -514,6 +515,18 @@ func HttpCall(method string, path string, headers map[string][]string, body []by } } +func IfLanguage(language string, action Action) Action { + return IfLanguages(action, language) +} + +func IfLanguages(action Action, languages ...string) Action { + return func(t testing.TB, ic TestContext) { + if slices.Contains(languages, ic.language) { + action(t, ic) + } + } +} + // Run "go test" in the given module. func ExecModuleTest(module string) Action { return Chdir(module, Exec("go", "test", "./...")) diff --git a/integration/harness.go b/integration/harness.go index 74273f4635..79fb915aa6 100644 --- a/integration/harness.go +++ b/integration/harness.go @@ -183,6 +183,7 @@ func run(t *testing.T, actionsOrOptions ...ActionOrOption) { }) for _, language := range opts.languages { + ctx, done := context.WithCancel(ctx) t.Run(language, func(t *testing.T) { verbs := rpc.Dial(ftlv1connect.NewVerbServiceClient, "http://localhost:8892", log.Debug) @@ -204,6 +205,7 @@ func run(t *testing.T, actionsOrOptions ...ActionOrOption) { binDir: binDir, Verbs: verbs, realT: t, + language: language, } if opts.startController { @@ -223,6 +225,7 @@ func run(t *testing.T, actionsOrOptions ...ActionOrOption) { ic.AssertWithRetry(t, action) } }) + done() } } @@ -236,6 +239,8 @@ type TestContext struct { testData string // Path to the "bin" directory. binDir string + // The Language under test + language string Controller ftlv1connect.ControllerServiceClient Console pbconsoleconnect.ConsoleServiceClient diff --git a/java-runtime/ftl-runtime/runtime/src/main/java/xyz/block/ftl/LeaseClient.java b/java-runtime/ftl-runtime/runtime/src/main/java/xyz/block/ftl/LeaseClient.java index 4f5cc51242..759e7785f0 100644 --- a/java-runtime/ftl-runtime/runtime/src/main/java/xyz/block/ftl/LeaseClient.java +++ b/java-runtime/ftl-runtime/runtime/src/main/java/xyz/block/ftl/LeaseClient.java @@ -7,5 +7,13 @@ */ public interface LeaseClient { - void acquireLease(Duration duration, String... keys) throws LeaseFailedException; + /** + * Acquire a lease for the given keys. The lease will be held for the given duration. + * + * @param duration The time to acquire the lease for + * @param keys The lease keys + * @return A handle that can be used to release the lease + * @throws LeaseFailedException + */ + LeaseHandle acquireLease(Duration duration, String... keys) throws LeaseFailedException; } diff --git a/java-runtime/ftl-runtime/runtime/src/main/java/xyz/block/ftl/LeaseHandle.java b/java-runtime/ftl-runtime/runtime/src/main/java/xyz/block/ftl/LeaseHandle.java new file mode 100644 index 0000000000..6d1eff7a7f --- /dev/null +++ b/java-runtime/ftl-runtime/runtime/src/main/java/xyz/block/ftl/LeaseHandle.java @@ -0,0 +1,7 @@ +package xyz.block.ftl; + +public interface LeaseHandle extends AutoCloseable { + + public void close(); + +} diff --git a/java-runtime/ftl-runtime/runtime/src/main/java/xyz/block/ftl/runtime/FTLController.java b/java-runtime/ftl-runtime/runtime/src/main/java/xyz/block/ftl/runtime/FTLController.java index bee619b956..dbf1d3d7ff 100644 --- a/java-runtime/ftl-runtime/runtime/src/main/java/xyz/block/ftl/runtime/FTLController.java +++ b/java-runtime/ftl-runtime/runtime/src/main/java/xyz/block/ftl/runtime/FTLController.java @@ -3,10 +3,8 @@ import java.net.URI; import java.time.Duration; import java.util.Arrays; -import java.util.Deque; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutionException; -import java.util.concurrent.LinkedBlockingDeque; import jakarta.annotation.PreDestroy; import jakarta.inject.Singleton; @@ -21,6 +19,7 @@ import io.quarkus.runtime.Startup; import xyz.block.ftl.LeaseClient; import xyz.block.ftl.LeaseFailedException; +import xyz.block.ftl.LeaseHandle; import xyz.block.ftl.v1.AcquireLeaseRequest; import xyz.block.ftl.v1.AcquireLeaseResponse; import xyz.block.ftl.v1.CallRequest; @@ -37,8 +36,6 @@ public class FTLController implements LeaseClient { private static final Logger log = Logger.getLogger(FTLController.class); final String moduleName; - private StreamObserver leaseClient; - private final Deque> leaseWaiters = new LinkedBlockingDeque<>(); private Throwable currentError; private volatile ModuleContextResponse moduleContextResponse; @@ -96,33 +93,6 @@ public FTLController(@ConfigProperty(name = "ftl.endpoint", defaultValue = "http var channel = channelBuilder.build(); verbService = VerbServiceGrpc.newStub(channel); verbService.getModuleContext(ModuleContextRequest.newBuilder().setModule(moduleName).build(), moduleObserver); - synchronized (this) { - this.leaseClient = verbService.acquireLease(new StreamObserver() { - @Override - public void onNext(AcquireLeaseResponse value) { - leaseWaiters.pop().complete(null); - } - - @Override - public void onError(Throwable t) { - synchronized (FTLController.this) { - while (!leaseWaiters.isEmpty()) { - leaseWaiters.pop().completeExceptionally(t); - } - if (!closed) { - leaseClient = verbService.acquireLease(this); - } - } - } - - @Override - public void onCompleted() { - //if we have any waiters error them out - //if we have not shut down we can try and connect again - onError(new RuntimeException("stream closed")); - } - }); - } } public byte[] getSecret(String secretName) { @@ -201,21 +171,42 @@ public void onCompleted() { } } - public void acquireLease(Duration duration, String... keys) throws LeaseFailedException { + public LeaseHandle acquireLease(Duration duration, String... keys) throws LeaseFailedException { CompletableFuture cf = new CompletableFuture<>(); - synchronized (this) { - leaseWaiters.push(cf); - leaseClient.onNext(AcquireLeaseRequest.newBuilder().setModule(moduleName) - .addAllKey(Arrays.asList(keys)) - .setTtl(com.google.protobuf.Duration.newBuilder() - .setSeconds(duration.toSeconds())) - .build()); - } + var client = verbService.acquireLease(new StreamObserver() { + @Override + public void onNext(AcquireLeaseResponse value) { + cf.complete(null); + } + + @Override + public void onError(Throwable t) { + cf.completeExceptionally(t); + } + + @Override + public void onCompleted() { + if (!cf.isDone()) { + onError(new RuntimeException("stream closed")); + } + } + }); + client.onNext(AcquireLeaseRequest.newBuilder().setModule(moduleName) + .addAllKey(Arrays.asList(keys)) + .setTtl(com.google.protobuf.Duration.newBuilder() + .setSeconds(duration.toSeconds())) + .build()); try { cf.get(); } catch (Exception e) { - throw new LeaseFailedException(e); + throw new LeaseFailedException("lease already held", e); } + return new LeaseHandle() { + @Override + public void close() { + client.onCompleted(); + } + }; } private ModuleContextResponse getModuleContext() { diff --git a/java-runtime/ftl-runtime/runtime/src/main/java/xyz/block/ftl/runtime/VerbHandler.java b/java-runtime/ftl-runtime/runtime/src/main/java/xyz/block/ftl/runtime/VerbHandler.java index 714f197a05..8d5103c2ad 100644 --- a/java-runtime/ftl-runtime/runtime/src/main/java/xyz/block/ftl/runtime/VerbHandler.java +++ b/java-runtime/ftl-runtime/runtime/src/main/java/xyz/block/ftl/runtime/VerbHandler.java @@ -18,9 +18,13 @@ public VerbHandler(VerbRegistry registry) { @Override public void call(CallRequest request, StreamObserver responseObserver) { - var response = registry.invoke(request); - responseObserver.onNext(response); - responseObserver.onCompleted(); + try { + var response = registry.invoke(request); + responseObserver.onNext(response); + responseObserver.onCompleted(); + } catch (Exception e) { + responseObserver.onError(e); + } } @Override diff --git a/java-runtime/ftl-runtime/runtime/src/main/java/xyz/block/ftl/runtime/VerbRegistry.java b/java-runtime/ftl-runtime/runtime/src/main/java/xyz/block/ftl/runtime/VerbRegistry.java index 42166c0719..2208fe51ea 100644 --- a/java-runtime/ftl-runtime/runtime/src/main/java/xyz/block/ftl/runtime/VerbRegistry.java +++ b/java-runtime/ftl-runtime/runtime/src/main/java/xyz/block/ftl/runtime/VerbRegistry.java @@ -1,6 +1,7 @@ package xyz.block.ftl.runtime; import java.io.IOException; +import java.lang.reflect.InvocationTargetException; import java.lang.reflect.Method; import java.nio.charset.StandardCharsets; import java.util.List; @@ -91,9 +92,15 @@ public CallResponse handle(CallRequest in) { var mappedResponse = mapper.writer().writeValueAsBytes(ret); return CallResponse.newBuilder().setBody(ByteString.copyFrom(mappedResponse)).build(); } - } catch (Exception e) { - log.errorf(e, "Failed to invoke verb %s.%s", in.getVerb().getModule(), in.getVerb().getName()); - return CallResponse.newBuilder().setError(CallResponse.Error.newBuilder().setMessage(e.getMessage()).build()) + } catch (Throwable e) { + if (e.getClass() == InvocationTargetException.class) { + e = e.getCause(); + } + var message = String.format("Failed to invoke verb %s.%s", in.getVerb().getModule(), in.getVerb().getName()); + log.error(message, e); + return CallResponse.newBuilder() + .setError(CallResponse.Error.newBuilder().setStack(e.toString()) + .setMessage(message + " " + e.getMessage()).build()) .build(); } }