Skip to content

Commit

Permalink
Use stats collector for queries and streams
Browse files Browse the repository at this point in the history
  • Loading branch information
pnwpedro committed Oct 28, 2024
1 parent e931c32 commit c861aad
Show file tree
Hide file tree
Showing 14 changed files with 167 additions and 25 deletions.
2 changes: 1 addition & 1 deletion src/main/java/com/fauna/client/BaseFaunaClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ public final class BaseFaunaClient extends FaunaClient {
*/
public BaseFaunaClient(FaunaConfig faunaConfig,
HttpClient httpClient, RetryStrategy retryStrategy) {
super(faunaConfig.getSecret(), faunaConfig.getLogHandler());
super(faunaConfig.getSecret(), faunaConfig.getLogHandler(), faunaConfig.getStatsCollector());
this.httpClient = httpClient;
if (Objects.isNull(faunaConfig)) {
throw new IllegalArgumentException("FaunaConfig cannot be null.");
Expand Down
15 changes: 11 additions & 4 deletions src/main/java/com/fauna/client/FaunaClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -45,22 +45,25 @@ public abstract class FaunaClient {
private final CodecProvider codecProvider = new DefaultCodecProvider(new DefaultCodecRegistry());
private final AtomicLong lastTransactionTs = new AtomicLong(-1);
private final Logger logger;
private final StatsCollector statsCollector;

abstract RetryStrategy getRetryStrategy();
abstract HttpClient getHttpClient();
abstract RequestBuilder getRequestBuilder();
abstract RequestBuilder getStreamRequestBuilder();

public FaunaClient(String secret, Logger logger) {
public FaunaClient(String secret, Logger logger, StatsCollector statsCollector) {
this.faunaSecret = secret;
this.logger = logger;
this.statsCollector = statsCollector;
}

public FaunaClient(String secret, Handler logHandler) {
public FaunaClient(String secret, Handler logHandler, StatsCollector statsCollector) {
this.faunaSecret = secret;
this.logger = Logger.getLogger(this.getClass().getCanonicalName());
this.logger.addHandler(logHandler);
this.logger.setLevel(logHandler.getLevel());
this.statsCollector = statsCollector;
}

protected String getFaunaSecret() {
Expand All @@ -71,6 +74,10 @@ public Logger getLogger() {
return this.logger;
}

public Optional<StatsCollector> getStatsCollector() {
return Optional.of(this.statsCollector);
}

public Optional<Long> getLastTransactionTs() {
long ts = lastTransactionTs.get();
return ts > 0 ? Optional.of(ts) : Optional.empty();
Expand Down Expand Up @@ -111,7 +118,7 @@ private <T> Supplier<CompletableFuture<QuerySuccess<T>>> makeAsyncRequest(HttpCl
return () -> client.sendAsync(request, HttpResponse.BodyHandlers.ofInputStream()).thenApply(
response -> {
logResponse(response);
return QueryResponse.parseResponse(response, codec);
return QueryResponse.parseResponse(response, codec, statsCollector);
}).whenComplete(this::completeRequest);
}

Expand Down Expand Up @@ -406,7 +413,7 @@ public <E> CompletableFuture<FaunaStream<E>> asyncStream(StreamRequest streamReq
return getHttpClient().sendAsync(streamReq,
HttpResponse.BodyHandlers.ofPublisher()).thenCompose(response -> {
CompletableFuture<FaunaStream<E>> publisher = new CompletableFuture<>();
FaunaStream<E> fstream = new FaunaStream<>(elementClass);
FaunaStream<E> fstream = new FaunaStream<>(elementClass, this.statsCollector);
response.body().subscribe(fstream);
publisher.complete(fstream);
return publisher;
Expand Down
30 changes: 30 additions & 0 deletions src/main/java/com/fauna/client/FaunaConfig.java
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ public static class FaunaEndpoint {
private final int maxContentionRetries;
private final Duration clientTimeoutBuffer;
private final Handler logHandler;
private final StatsCollector statsCollector;
public static final FaunaConfig DEFAULT = FaunaConfig.builder().build();
public static final FaunaConfig LOCAL = FaunaConfig.builder().endpoint(
FaunaEndpoint.LOCAL).secret("secret").build();
Expand All @@ -37,6 +38,7 @@ private FaunaConfig(Builder builder) {
this.maxContentionRetries = builder.maxContentionRetries;
this.clientTimeoutBuffer = builder.clientTimeoutBuffer;
this.logHandler = builder.logHandler;
this.statsCollector = builder.statsCollector;
}

/**
Expand Down Expand Up @@ -82,6 +84,14 @@ public Handler getLogHandler() {
return logHandler;
}

/**
* Gets the stats collector for the client.
* @return A log handler instance.
*/
public StatsCollector getStatsCollector() {
return statsCollector;
}

/**
* Creates a new builder for FaunaConfig.
*
Expand All @@ -100,6 +110,7 @@ public static class Builder {
private int maxContentionRetries = 3;
private Duration clientTimeoutBuffer = Duration.ofSeconds(5);
private Handler logHandler = defaultLogHandler();
private StatsCollector statsCollector;

static Level getLogLevel(String debug) {
if (debug == null || debug.isBlank()) {
Expand Down Expand Up @@ -170,6 +181,25 @@ public Builder logHandler(Handler handler) {
return this;
}

/**
* Set a StatsCollector.
* @param statsCollector A stats collector instance.
* @return The current Builder instance.
*/
public Builder statsCollector(StatsCollector statsCollector) {
this.statsCollector = statsCollector;
return this;
}

/**
* Set a default StatsCollector.
* @return The current Builder instance.
*/
public Builder defaultStatsCollector() {
this.statsCollector = new StatsCollectorImpl();
return this;
}

/**
* Builds and returns a new FaunaConfig instance.
*
Expand Down
10 changes: 9 additions & 1 deletion src/main/java/com/fauna/client/FaunaStream.java
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
import java.nio.ByteBuffer;
import java.text.MessageFormat;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.Flow.Processor;
import java.util.concurrent.Flow.Subscriber;
import java.util.concurrent.Flow.Subscription;
Expand All @@ -25,8 +26,10 @@ public class FaunaStream<E> extends SubmissionPublisher<StreamEvent<E>> implemen
private Subscription subscription;
private Subscriber<? super StreamEvent<E>> eventSubscriber;
private MultiByteBufferInputStream buffer = null;
private final StatsCollector statsCollector;

public FaunaStream(Class<E> elementClass) {
public FaunaStream(Class<E> elementClass, StatsCollector statsCollector) {
this.statsCollector = statsCollector;
this.dataCodec = DefaultCodecProvider.SINGLETON.get(elementClass);
}

Expand Down Expand Up @@ -60,6 +63,11 @@ public void onNext(List<ByteBuffer> buffers) {
try {
JsonParser parser = JSON_FACTORY.createParser(buffer);
StreamEvent<E> event = StreamEvent.parse(parser, dataCodec);

if (statsCollector != null) {
statsCollector.add(event.getStats());
}

if (event.getType() == StreamEvent.EventType.ERROR) {
ErrorInfo error = event.getError();
this.onComplete();
Expand Down
2 changes: 1 addition & 1 deletion src/main/java/com/fauna/client/ScopedFaunaClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ public class ScopedFaunaClient extends FaunaClient {


public ScopedFaunaClient(FaunaClient client, FaunaScope scope) {
super(client.getFaunaSecret(), client.getLogger());
super(client.getFaunaSecret(), client.getLogger(), client.getStatsCollector().orElse(null));
this.client = client;
this.requestBuilder = client.getRequestBuilder().scopedRequestBuilder(scope.getToken(client.getFaunaSecret()));
this.streamRequestBuilder = client.getStreamRequestBuilder().scopedRequestBuilder(scope.getToken(client.getFaunaSecret()));
Expand Down
8 changes: 7 additions & 1 deletion src/main/java/com/fauna/response/QueryResponse.java
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
import com.fasterxml.jackson.core.JsonFactory;
import com.fasterxml.jackson.core.JsonParser;
import com.fasterxml.jackson.core.JsonToken;
import com.fauna.client.StatsCollector;
import com.fauna.codec.Codec;
import com.fauna.codec.UTF8FaunaParser;
import com.fauna.exception.ClientResponseException;
Expand Down Expand Up @@ -135,7 +136,7 @@ private static <T> Builder<T> handleField(Builder<T> builder, JsonParser parser)
}
}

public static <T> QuerySuccess<T> parseResponse(HttpResponse<InputStream> response, Codec<T> codec) throws FaunaException {
public static <T> QuerySuccess<T> parseResponse(HttpResponse<InputStream> response, Codec<T> codec, StatsCollector statsCollector) throws FaunaException {
try {
JsonParser parser = JSON_FACTORY.createParser(response.body());

Expand All @@ -147,6 +148,11 @@ public static <T> QuerySuccess<T> parseResponse(HttpResponse<InputStream> respo
while (parser.nextToken() == JsonToken.FIELD_NAME) {
builder = handleField(builder, parser);
}

if (statsCollector != null && builder.stats != null) {
statsCollector.add(builder.stats);
}

int httpStatus = response.statusCode();
if (httpStatus >= 400) {
QueryFailure failure = new QueryFailure(httpStatus, builder);
Expand Down
15 changes: 7 additions & 8 deletions src/test/java/com/fauna/client/FaunaClientTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -81,18 +81,12 @@ static HttpResponse<InputStream> mockResponse(String body) {
void defaultClient() {
FaunaClient client = Fauna.client();
assertTrue(client.getHttpClient().connectTimeout().isEmpty());
assertTrue(client.getStatsCollector().isEmpty());
assertEquals(URI.create("https://db.fauna.com/query/1"),
client.getRequestBuilder().buildRequest(
fql("hello"), QueryOptions.builder().build(), DefaultCodecProvider.SINGLETON, 1L).uri());
}

@Test
void defaultConfigBuilder() {
FaunaConfig config = FaunaConfig.builder().build();
assertEquals("https://db.fauna.com", config.getEndpoint());
assertEquals("", config.getSecret());
}

@Test
void customConfigBuilder() {
FaunaConfig config = FaunaConfig.builder()
Expand All @@ -108,8 +102,13 @@ void customConfigBuilder() {

@Test
void customConfigConstructor() {
FaunaClient client = Fauna.client(FaunaConfig.builder().secret("foo").build());
FaunaConfig cfg = FaunaConfig.builder()
.secret("foo")
.defaultStatsCollector()
.build();
FaunaClient client = Fauna.client();
assertTrue(client.toString().startsWith("com.fauna.client.BaseFaunaClient"));
assertTrue(client.getStatsCollector().isPresent());
}

@Test
Expand Down
5 changes: 5 additions & 0 deletions src/test/java/com/fauna/client/FaunaConfigTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@
import java.util.logging.ConsoleHandler;
import java.util.logging.Level;

import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertNull;
import static org.junit.jupiter.api.Assertions.assertEquals;

public class FaunaConfigTest {
Expand All @@ -21,6 +23,7 @@ public void testDefaultFaunaConfig() {
assertEquals(Level.WARNING, config.getLogHandler().getLevel());
assertEquals("", config.getSecret());
assertEquals(3, config.getMaxContentionRetries());
assertNull(config.getStatsCollector());
}

@Test
Expand All @@ -35,11 +38,13 @@ public void testOverridingDefaultFaunaConfig() {
.logHandler(handler)
.maxContentionRetries(1)
.clientTimeoutBuffer(Duration.ofSeconds(1))
.defaultStatsCollector()
.build();
assertEquals("endpoint", config.getEndpoint());
assertEquals(Level.ALL, config.getLogHandler().getLevel());
assertEquals("foo", config.getSecret());
assertEquals(1, config.getMaxContentionRetries());
assertNotNull(config.getStatsCollector());
}

@ParameterizedTest
Expand Down
35 changes: 35 additions & 0 deletions src/test/java/com/fauna/e2e/E2EPaginationTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

import com.fauna.client.Fauna;
import com.fauna.client.FaunaClient;
import com.fauna.client.FaunaConfig;
import com.fauna.client.PageIterator;
import com.fauna.e2e.beans.Product;
import com.fauna.response.QuerySuccess;
Expand Down Expand Up @@ -111,4 +112,38 @@ public void query_all_flattened() {
}
assertEquals(50, products.size());
}

@Test
public void query_statsAreTrackedForExplicitPagination() {
var cfg = FaunaConfig.builder()
.secret("secret")
.endpoint("http://localhost:8443")
.defaultStatsCollector()
.build();
var client = Fauna.client(cfg);
PageIterator<Product> iter = client.paginate(fql("Product.all()"), Product.class);
iter.forEachRemaining(page -> {});

var stats = client.getStatsCollector().get().read();
assertEquals(82, stats.getReadOps());
assertEquals(4, stats.getComputeOps());
}

@Test
public void query_statsAreTrackedForFlattenedPagination() {
var cfg = FaunaConfig.builder()
.secret("secret")
.endpoint("http://localhost:8443")
.defaultStatsCollector()
.build();
var client = Fauna.client(cfg);
PageIterator<Product> iter = client.paginate(fql("Product.all()"), Product.class);
Iterator<Product> productIter = iter.flatten();
for (Product p : (Iterable<Product>) () -> productIter) {
}

var stats = client.getStatsCollector().get().read();
assertEquals(82, stats.getReadOps());
assertEquals(4, stats.getComputeOps());
}
}
34 changes: 34 additions & 0 deletions src/test/java/com/fauna/e2e/E2EQueryTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

import com.fauna.client.Fauna;
import com.fauna.client.FaunaClient;
import com.fauna.client.FaunaConfig;
import com.fauna.e2e.beans.Author;
import com.fauna.exception.AbortException;
import com.fauna.exception.QueryRuntimeException;
Expand Down Expand Up @@ -277,4 +278,37 @@ public void query_withTags() {
assertEquals("1", success.getQueryTags().get("first"));
assertEquals("2", success.getQueryTags().get("second"));
}

@Test
public void query_trackStatsOnSuccess() {
var cfg = FaunaConfig.builder()
.secret("secret")
.endpoint("http://localhost:8443")
.defaultStatsCollector()
.build();
var client = Fauna.client(cfg);

var q = fql("Author.all().toArray()");

client.query(q, listOf(Author.class));
var stats = client.getStatsCollector().get().read();
assertEquals(10, stats.getReadOps());
assertEquals(1, stats.getComputeOps());
}

@Test
public void query_trackStatsOnFailure() throws IOException {
var cfg = FaunaConfig.builder()
.secret("secret")
.endpoint("http://localhost:8443")
.defaultStatsCollector()
.build();
var client = Fauna.client(cfg);

var q = fql("Author.all().toArray()\nabort(null)");
assertThrows(AbortException.class, () -> client.query(q));
var stats = client.getStatsCollector().get().read();
assertEquals(8, stats.getReadOps());
assertEquals(1, stats.getComputeOps());
}
}
Loading

0 comments on commit c861aad

Please sign in to comment.