From 5699da2941ba8d0bbecc636d990f8e23dcaa5146 Mon Sep 17 00:00:00 2001 From: Lucas Pedroza Date: Mon, 28 Oct 2024 12:04:13 +0100 Subject: [PATCH] Use stats collector for queries and streams --- .../com/fauna/client/BaseFaunaClient.java | 2 +- .../java/com/fauna/client/FaunaClient.java | 15 +++++--- .../java/com/fauna/client/FaunaConfig.java | 30 ++++++++++++++++ .../java/com/fauna/client/FaunaStream.java | 10 +++++- .../com/fauna/client/ScopedFaunaClient.java | 2 +- .../com/fauna/response/QueryResponse.java | 8 ++++- .../com/fauna/client/FaunaClientTest.java | 15 ++++---- .../com/fauna/client/FaunaConfigTest.java | 5 +++ .../java/com/fauna/e2e/E2EPaginationTest.java | 35 +++++++++++++++++++ src/test/java/com/fauna/e2e/E2EQueryTest.java | 34 ++++++++++++++++++ .../java/com/fauna/e2e/E2EStreamingTest.java | 24 +++++++++++-- .../exception/ConstraintFailureTest.java | 4 +-- .../com/fauna/exception/TestErrorHandler.java | 2 +- .../com/fauna/response/QueryResponseTest.java | 6 ++-- 14 files changed, 167 insertions(+), 25 deletions(-) diff --git a/src/main/java/com/fauna/client/BaseFaunaClient.java b/src/main/java/com/fauna/client/BaseFaunaClient.java index 585ef89b..d2204c84 100644 --- a/src/main/java/com/fauna/client/BaseFaunaClient.java +++ b/src/main/java/com/fauna/client/BaseFaunaClient.java @@ -25,7 +25,7 @@ public final class BaseFaunaClient extends FaunaClient { */ public BaseFaunaClient(FaunaConfig faunaConfig, HttpClient httpClient, RetryStrategy retryStrategy) { - super(faunaConfig.getSecret(), faunaConfig.getLogHandler()); + super(faunaConfig.getSecret(), faunaConfig.getLogHandler(), faunaConfig.getStatsCollector()); this.httpClient = httpClient; if (Objects.isNull(faunaConfig)) { throw new IllegalArgumentException("FaunaConfig cannot be null."); diff --git a/src/main/java/com/fauna/client/FaunaClient.java b/src/main/java/com/fauna/client/FaunaClient.java index 081f4056..0e00703e 100644 --- a/src/main/java/com/fauna/client/FaunaClient.java +++ b/src/main/java/com/fauna/client/FaunaClient.java @@ -45,22 +45,25 @@ public abstract class FaunaClient { private final CodecProvider codecProvider = new DefaultCodecProvider(new DefaultCodecRegistry()); private final AtomicLong lastTransactionTs = new AtomicLong(-1); private final Logger logger; + private final StatsCollector statsCollector; abstract RetryStrategy getRetryStrategy(); abstract HttpClient getHttpClient(); abstract RequestBuilder getRequestBuilder(); abstract RequestBuilder getStreamRequestBuilder(); - public FaunaClient(String secret, Logger logger) { + public FaunaClient(String secret, Logger logger, StatsCollector statsCollector) { this.faunaSecret = secret; this.logger = logger; + this.statsCollector = statsCollector; } - public FaunaClient(String secret, Handler logHandler) { + public FaunaClient(String secret, Handler logHandler, StatsCollector statsCollector) { this.faunaSecret = secret; this.logger = Logger.getLogger(this.getClass().getCanonicalName()); this.logger.addHandler(logHandler); this.logger.setLevel(logHandler.getLevel()); + this.statsCollector = statsCollector; } protected String getFaunaSecret() { @@ -71,6 +74,10 @@ public Logger getLogger() { return this.logger; } + public Optional getStatsCollector() { + return Optional.ofNullable(this.statsCollector); + } + public Optional getLastTransactionTs() { long ts = lastTransactionTs.get(); return ts > 0 ? Optional.of(ts) : Optional.empty(); @@ -111,7 +118,7 @@ private Supplier>> makeAsyncRequest(HttpCl return () -> client.sendAsync(request, HttpResponse.BodyHandlers.ofInputStream()).thenApply( response -> { logResponse(response); - return QueryResponse.parseResponse(response, codec); + return QueryResponse.parseResponse(response, codec, statsCollector); }).whenComplete(this::completeRequest); } @@ -406,7 +413,7 @@ public CompletableFuture> asyncStream(StreamRequest streamReq return getHttpClient().sendAsync(streamReq, HttpResponse.BodyHandlers.ofPublisher()).thenCompose(response -> { CompletableFuture> publisher = new CompletableFuture<>(); - FaunaStream fstream = new FaunaStream<>(elementClass); + FaunaStream fstream = new FaunaStream<>(elementClass, this.statsCollector); response.body().subscribe(fstream); publisher.complete(fstream); return publisher; diff --git a/src/main/java/com/fauna/client/FaunaConfig.java b/src/main/java/com/fauna/client/FaunaConfig.java index 7f725161..51eb8339 100644 --- a/src/main/java/com/fauna/client/FaunaConfig.java +++ b/src/main/java/com/fauna/client/FaunaConfig.java @@ -22,6 +22,7 @@ public static class FaunaEndpoint { private final int maxContentionRetries; private final Duration clientTimeoutBuffer; private final Handler logHandler; + private final StatsCollector statsCollector; public static final FaunaConfig DEFAULT = FaunaConfig.builder().build(); public static final FaunaConfig LOCAL = FaunaConfig.builder().endpoint( FaunaEndpoint.LOCAL).secret("secret").build(); @@ -37,6 +38,7 @@ private FaunaConfig(Builder builder) { this.maxContentionRetries = builder.maxContentionRetries; this.clientTimeoutBuffer = builder.clientTimeoutBuffer; this.logHandler = builder.logHandler; + this.statsCollector = builder.statsCollector; } /** @@ -82,6 +84,14 @@ public Handler getLogHandler() { return logHandler; } + /** + * Gets the stats collector for the client. + * @return A log handler instance. + */ + public StatsCollector getStatsCollector() { + return statsCollector; + } + /** * Creates a new builder for FaunaConfig. * @@ -100,6 +110,7 @@ public static class Builder { private int maxContentionRetries = 3; private Duration clientTimeoutBuffer = Duration.ofSeconds(5); private Handler logHandler = defaultLogHandler(); + private StatsCollector statsCollector; static Level getLogLevel(String debug) { if (debug == null || debug.isBlank()) { @@ -170,6 +181,25 @@ public Builder logHandler(Handler handler) { return this; } + /** + * Set a StatsCollector. + * @param statsCollector A stats collector instance. + * @return The current Builder instance. + */ + public Builder statsCollector(StatsCollector statsCollector) { + this.statsCollector = statsCollector; + return this; + } + + /** + * Set a default StatsCollector. + * @return The current Builder instance. + */ + public Builder defaultStatsCollector() { + this.statsCollector = new StatsCollectorImpl(); + return this; + } + /** * Builds and returns a new FaunaConfig instance. * diff --git a/src/main/java/com/fauna/client/FaunaStream.java b/src/main/java/com/fauna/client/FaunaStream.java index d6ba39c6..fd1f4c7b 100644 --- a/src/main/java/com/fauna/client/FaunaStream.java +++ b/src/main/java/com/fauna/client/FaunaStream.java @@ -13,6 +13,7 @@ import java.nio.ByteBuffer; import java.text.MessageFormat; import java.util.List; +import java.util.Optional; import java.util.concurrent.Flow.Processor; import java.util.concurrent.Flow.Subscriber; import java.util.concurrent.Flow.Subscription; @@ -25,8 +26,10 @@ public class FaunaStream extends SubmissionPublisher> implemen private Subscription subscription; private Subscriber> eventSubscriber; private MultiByteBufferInputStream buffer = null; + private final StatsCollector statsCollector; - public FaunaStream(Class elementClass) { + public FaunaStream(Class elementClass, StatsCollector statsCollector) { + this.statsCollector = statsCollector; this.dataCodec = DefaultCodecProvider.SINGLETON.get(elementClass); } @@ -60,6 +63,11 @@ public void onNext(List buffers) { try { JsonParser parser = JSON_FACTORY.createParser(buffer); StreamEvent event = StreamEvent.parse(parser, dataCodec); + + if (statsCollector != null) { + statsCollector.add(event.getStats()); + } + if (event.getType() == StreamEvent.EventType.ERROR) { ErrorInfo error = event.getError(); this.onComplete(); diff --git a/src/main/java/com/fauna/client/ScopedFaunaClient.java b/src/main/java/com/fauna/client/ScopedFaunaClient.java index ae039d2b..27049c88 100644 --- a/src/main/java/com/fauna/client/ScopedFaunaClient.java +++ b/src/main/java/com/fauna/client/ScopedFaunaClient.java @@ -9,7 +9,7 @@ public class ScopedFaunaClient extends FaunaClient { public ScopedFaunaClient(FaunaClient client, FaunaScope scope) { - super(client.getFaunaSecret(), client.getLogger()); + super(client.getFaunaSecret(), client.getLogger(), client.getStatsCollector().orElse(null)); this.client = client; this.requestBuilder = client.getRequestBuilder().scopedRequestBuilder(scope.getToken(client.getFaunaSecret())); this.streamRequestBuilder = client.getStreamRequestBuilder().scopedRequestBuilder(scope.getToken(client.getFaunaSecret())); diff --git a/src/main/java/com/fauna/response/QueryResponse.java b/src/main/java/com/fauna/response/QueryResponse.java index eec68a9c..9758409e 100644 --- a/src/main/java/com/fauna/response/QueryResponse.java +++ b/src/main/java/com/fauna/response/QueryResponse.java @@ -3,6 +3,7 @@ import com.fasterxml.jackson.core.JsonFactory; import com.fasterxml.jackson.core.JsonParser; import com.fasterxml.jackson.core.JsonToken; +import com.fauna.client.StatsCollector; import com.fauna.codec.Codec; import com.fauna.codec.UTF8FaunaParser; import com.fauna.exception.ClientResponseException; @@ -135,7 +136,7 @@ private static Builder handleField(Builder builder, JsonParser parser) } } - public static QuerySuccess parseResponse(HttpResponse response, Codec codec) throws FaunaException { + public static QuerySuccess parseResponse(HttpResponse response, Codec codec, StatsCollector statsCollector) throws FaunaException { try { JsonParser parser = JSON_FACTORY.createParser(response.body()); @@ -147,6 +148,11 @@ public static QuerySuccess parseResponse(HttpResponse respo while (parser.nextToken() == JsonToken.FIELD_NAME) { builder = handleField(builder, parser); } + + if (statsCollector != null && builder.stats != null) { + statsCollector.add(builder.stats); + } + int httpStatus = response.statusCode(); if (httpStatus >= 400) { QueryFailure failure = new QueryFailure(httpStatus, builder); diff --git a/src/test/java/com/fauna/client/FaunaClientTest.java b/src/test/java/com/fauna/client/FaunaClientTest.java index d125a0b9..3343d891 100644 --- a/src/test/java/com/fauna/client/FaunaClientTest.java +++ b/src/test/java/com/fauna/client/FaunaClientTest.java @@ -81,18 +81,12 @@ static HttpResponse mockResponse(String body) { void defaultClient() { FaunaClient client = Fauna.client(); assertTrue(client.getHttpClient().connectTimeout().isEmpty()); + assertTrue(client.getStatsCollector().isEmpty()); assertEquals(URI.create("https://db.fauna.com/query/1"), client.getRequestBuilder().buildRequest( fql("hello"), QueryOptions.builder().build(), DefaultCodecProvider.SINGLETON, 1L).uri()); } - @Test - void defaultConfigBuilder() { - FaunaConfig config = FaunaConfig.builder().build(); - assertEquals("https://db.fauna.com", config.getEndpoint()); - assertEquals("", config.getSecret()); - } - @Test void customConfigBuilder() { FaunaConfig config = FaunaConfig.builder() @@ -108,8 +102,13 @@ void customConfigBuilder() { @Test void customConfigConstructor() { - FaunaClient client = Fauna.client(FaunaConfig.builder().secret("foo").build()); + FaunaConfig cfg = FaunaConfig.builder() + .secret("foo") + .defaultStatsCollector() + .build(); + FaunaClient client = Fauna.client(cfg); assertTrue(client.toString().startsWith("com.fauna.client.BaseFaunaClient")); + assertTrue(client.getStatsCollector().isPresent()); } @Test diff --git a/src/test/java/com/fauna/client/FaunaConfigTest.java b/src/test/java/com/fauna/client/FaunaConfigTest.java index 170c7a88..a9e20e48 100644 --- a/src/test/java/com/fauna/client/FaunaConfigTest.java +++ b/src/test/java/com/fauna/client/FaunaConfigTest.java @@ -8,6 +8,8 @@ import java.util.logging.ConsoleHandler; import java.util.logging.Level; +import static org.junit.jupiter.api.Assertions.assertNotNull; +import static org.junit.jupiter.api.Assertions.assertNull; import static org.junit.jupiter.api.Assertions.assertEquals; public class FaunaConfigTest { @@ -21,6 +23,7 @@ public void testDefaultFaunaConfig() { assertEquals(Level.WARNING, config.getLogHandler().getLevel()); assertEquals("", config.getSecret()); assertEquals(3, config.getMaxContentionRetries()); + assertNull(config.getStatsCollector()); } @Test @@ -35,11 +38,13 @@ public void testOverridingDefaultFaunaConfig() { .logHandler(handler) .maxContentionRetries(1) .clientTimeoutBuffer(Duration.ofSeconds(1)) + .defaultStatsCollector() .build(); assertEquals("endpoint", config.getEndpoint()); assertEquals(Level.ALL, config.getLogHandler().getLevel()); assertEquals("foo", config.getSecret()); assertEquals(1, config.getMaxContentionRetries()); + assertNotNull(config.getStatsCollector()); } @ParameterizedTest diff --git a/src/test/java/com/fauna/e2e/E2EPaginationTest.java b/src/test/java/com/fauna/e2e/E2EPaginationTest.java index 676cc6c8..c545b130 100644 --- a/src/test/java/com/fauna/e2e/E2EPaginationTest.java +++ b/src/test/java/com/fauna/e2e/E2EPaginationTest.java @@ -2,6 +2,7 @@ import com.fauna.client.Fauna; import com.fauna.client.FaunaClient; +import com.fauna.client.FaunaConfig; import com.fauna.client.PageIterator; import com.fauna.e2e.beans.Product; import com.fauna.response.QuerySuccess; @@ -111,4 +112,38 @@ public void query_all_flattened() { } assertEquals(50, products.size()); } + + @Test + public void query_statsAreTrackedForExplicitPagination() { + var cfg = FaunaConfig.builder() + .secret("secret") + .endpoint("http://localhost:8443") + .defaultStatsCollector() + .build(); + var client = Fauna.client(cfg); + PageIterator iter = client.paginate(fql("Product.all()"), Product.class); + iter.forEachRemaining(page -> {}); + + var stats = client.getStatsCollector().get().read(); + assertEquals(82, stats.getReadOps()); + assertEquals(4, stats.getComputeOps()); + } + + @Test + public void query_statsAreTrackedForFlattenedPagination() { + var cfg = FaunaConfig.builder() + .secret("secret") + .endpoint("http://localhost:8443") + .defaultStatsCollector() + .build(); + var client = Fauna.client(cfg); + PageIterator iter = client.paginate(fql("Product.all()"), Product.class); + Iterator productIter = iter.flatten(); + for (Product p : (Iterable) () -> productIter) { + } + + var stats = client.getStatsCollector().get().read(); + assertEquals(82, stats.getReadOps()); + assertEquals(4, stats.getComputeOps()); + } } diff --git a/src/test/java/com/fauna/e2e/E2EQueryTest.java b/src/test/java/com/fauna/e2e/E2EQueryTest.java index 69cd990e..ad43d026 100644 --- a/src/test/java/com/fauna/e2e/E2EQueryTest.java +++ b/src/test/java/com/fauna/e2e/E2EQueryTest.java @@ -2,6 +2,7 @@ import com.fauna.client.Fauna; import com.fauna.client.FaunaClient; +import com.fauna.client.FaunaConfig; import com.fauna.e2e.beans.Author; import com.fauna.exception.AbortException; import com.fauna.exception.QueryRuntimeException; @@ -277,4 +278,37 @@ public void query_withTags() { assertEquals("1", success.getQueryTags().get("first")); assertEquals("2", success.getQueryTags().get("second")); } + + @Test + public void query_trackStatsOnSuccess() { + var cfg = FaunaConfig.builder() + .secret("secret") + .endpoint("http://localhost:8443") + .defaultStatsCollector() + .build(); + var client = Fauna.client(cfg); + + var q = fql("Author.all().toArray()"); + + client.query(q, listOf(Author.class)); + var stats = client.getStatsCollector().get().read(); + assertEquals(10, stats.getReadOps()); + assertEquals(1, stats.getComputeOps()); + } + + @Test + public void query_trackStatsOnFailure() throws IOException { + var cfg = FaunaConfig.builder() + .secret("secret") + .endpoint("http://localhost:8443") + .defaultStatsCollector() + .build(); + var client = Fauna.client(cfg); + + var q = fql("Author.all().toArray()\nabort(null)"); + assertThrows(AbortException.class, () -> client.query(q)); + var stats = client.getStatsCollector().get().read(); + assertEquals(8, stats.getReadOps()); + assertEquals(1, stats.getComputeOps()); + } } diff --git a/src/test/java/com/fauna/e2e/E2EStreamingTest.java b/src/test/java/com/fauna/e2e/E2EStreamingTest.java index 2ffd4ac3..37984215 100644 --- a/src/test/java/com/fauna/e2e/E2EStreamingTest.java +++ b/src/test/java/com/fauna/e2e/E2EStreamingTest.java @@ -2,6 +2,7 @@ import com.fauna.client.Fauna; import com.fauna.client.FaunaClient; +import com.fauna.client.FaunaConfig; import com.fauna.client.FaunaStream; import com.fauna.e2e.beans.Product; import com.fauna.exception.ClientException; @@ -113,17 +114,29 @@ public String status() { @Test public void query_streamOfProduct() throws InterruptedException { - FaunaStream stream = client.stream(fql("Product.all().toStream()"), Product.class); + var cfg = FaunaConfig.builder() + .secret("secret") + .endpoint("http://localhost:8443") + .defaultStatsCollector() + .build(); + var streamClient = Fauna.client(cfg); + + FaunaStream stream = streamClient.stream(fql("Product.all().toStream()"), Product.class); + var stats = streamClient.getStatsCollector().get().readAndReset(); + assertEquals(1, stats.getComputeOps()); + InventorySubscriber inventory = new InventorySubscriber(); stream.subscribe(inventory); assertFalse(stream.isClosed()); + List products = new ArrayList<>(); products.add(client.query(fql("Product.create({name: 'cheese', quantity: 1})"), Product.class).getData()); products.add(client.query(fql("Product.create({name: 'bread', quantity: 2})"), Product.class).getData()); products.add(client.query(fql("Product.create({name: 'wine', quantity: 3})"), Product.class).getData()); + long start = System.currentTimeMillis(); int events = inventory.countEvents(); - System.out.println("Events: " + events); + while (System.currentTimeMillis() < start + 2_000) { Thread.sleep(10); int latest = inventory.countEvents(); @@ -132,12 +145,17 @@ public void query_streamOfProduct() throws InterruptedException { } } inventory.onComplete(); - System.out.println(inventory.status()); Integer total = products.stream().map(Product::getQuantity).reduce(0, Integer::sum); + assertEquals(total, inventory.countInventory()); + assertFalse(stream.isClosed()); stream.close(); assertTrue(stream.isClosed()); + + stats = streamClient.getStatsCollector().get().read(); + assertEquals(11, stats.getReadOps()); + assertEquals(events, stats.getComputeOps()); } @Test diff --git a/src/test/java/com/fauna/exception/ConstraintFailureTest.java b/src/test/java/com/fauna/exception/ConstraintFailureTest.java index c61e1d6f..779bdf1a 100644 --- a/src/test/java/com/fauna/exception/ConstraintFailureTest.java +++ b/src/test/java/com/fauna/exception/ConstraintFailureTest.java @@ -97,7 +97,7 @@ public void TestConstraintFailureFromBodyWithPath() throws JsonProcessingExcepti HttpResponse resp = mock(HttpResponse.class); when(resp.body()).thenReturn(new ByteArrayInputStream(body.getBytes())); when(resp.statusCode()).thenReturn(400); - ConstraintFailureException exc = assertThrows(ConstraintFailureException.class,() -> QueryResponse.parseResponse(resp, null)); + ConstraintFailureException exc = assertThrows(ConstraintFailureException.class,() -> QueryResponse.parseResponse(resp, null, null)); assertEquals(Optional.of(List.of("name")), exc.getConstraintFailures()[0].getPathStrings()); } @@ -109,7 +109,7 @@ public void TestConstraintFailureFromBodyWithIntegerInPath() throws JsonProcessi HttpResponse resp = mock(HttpResponse.class); when(resp.body()).thenReturn(new ByteArrayInputStream(body.getBytes())); when(resp.statusCode()).thenReturn(400); - ConstraintFailureException exc = assertThrows(ConstraintFailureException.class,() -> QueryResponse.parseResponse(resp, null)); + ConstraintFailureException exc = assertThrows(ConstraintFailureException.class,() -> QueryResponse.parseResponse(resp, null, null)); assertEquals(Optional.of(List.of("name", "name2.1.2.name3")), exc.getConstraintFailures()[0].getPathStrings()); } diff --git a/src/test/java/com/fauna/exception/TestErrorHandler.java b/src/test/java/com/fauna/exception/TestErrorHandler.java index 1441135a..8b53f89f 100644 --- a/src/test/java/com/fauna/exception/TestErrorHandler.java +++ b/src/test/java/com/fauna/exception/TestErrorHandler.java @@ -65,7 +65,7 @@ public void testHandleBadRequest(TestArgs args) throws JsonProcessingException { HttpResponse resp = mock(HttpResponse.class); when(resp.body()).thenReturn(new ByteArrayInputStream(root.toString().getBytes())); when(resp.statusCode()).thenReturn(args.httpStatus); - assertThrows(args.exception, () -> parseResponse(resp, null)); + assertThrows(args.exception, () -> parseResponse(resp, null, null)); } } diff --git a/src/test/java/com/fauna/response/QueryResponseTest.java b/src/test/java/com/fauna/response/QueryResponseTest.java index afbaf835..85c584fb 100644 --- a/src/test/java/com/fauna/response/QueryResponseTest.java +++ b/src/test/java/com/fauna/response/QueryResponseTest.java @@ -55,7 +55,7 @@ public void getFromResponseBody_Success() throws IOException { HttpResponse resp = mockResponse(body); when(resp.statusCode()).thenReturn(200); - QuerySuccess success = QueryResponse.parseResponse(resp, codec); + QuerySuccess success = QueryResponse.parseResponse(resp, codec, null); assertEquals(baz.getFirstName(), success.getData().getFirstName()); assertEquals("PersonWithAttributes", success.getStaticType().get()); @@ -68,14 +68,14 @@ public void handleResponseWithInvalidJsonThrowsClientResponseException() { HttpResponse resp = mockResponse("{\"not valid json\""); when(resp.statusCode()).thenReturn(400); - ClientResponseException exc = assertThrows(ClientResponseException.class, () -> QueryResponse.parseResponse(resp, codecProvider.get(Object.class))); + ClientResponseException exc = assertThrows(ClientResponseException.class, () -> QueryResponse.parseResponse(resp, codecProvider.get(Object.class), null)); assertEquals("ClientResponseException HTTP 400: Failed to handle error response.", exc.getMessage()); } @Test public void handleResponseWithEmptyFieldsDoesNotThrow() { HttpResponse resp = mockResponse("{}"); - QuerySuccess response = QueryResponse.parseResponse(resp, codecProvider.get(Object.class)); + QuerySuccess response = QueryResponse.parseResponse(resp, codecProvider.get(Object.class), null); assertEquals(QuerySuccess.class, response.getClass()); assertNull(response.getSchemaVersion()); assertNull(response.getSummary());