Skip to content

Commit

Permalink
PR feedback:
Browse files Browse the repository at this point in the history
  * Refactor FeedRequest into EventSource, and FeedOptions.
  * Rename StreamTokenResponse -> EventSourceResponse (and codec).
  * Rename FeedSuccess -> FeedPage.
  * Add E2EFeedsTest.manualTest to show how customers can control
    the requests being made.
  * Rename some of the methods on FaunaClient.
  • Loading branch information
David Griffin committed Oct 30, 2024
1 parent 1d96ef3 commit 144fd79
Show file tree
Hide file tree
Showing 17 changed files with 332 additions and 211 deletions.
51 changes: 26 additions & 25 deletions src/main/java/com/fauna/client/FaunaClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -9,13 +9,14 @@
import com.fauna.exception.FaunaException;
import com.fauna.exception.ProtocolException;
import com.fauna.exception.ServiceException;
import com.fauna.feed.FeedRequest;
import com.fauna.feed.FeedSuccess;
import com.fauna.feed.EventSource;
import com.fauna.feed.FeedOptions;
import com.fauna.feed.FeedPage;
import com.fauna.query.AfterToken;
import com.fauna.query.QueryOptions;
import com.fauna.response.QueryFailure;
import com.fauna.stream.StreamRequest;
import com.fauna.query.StreamTokenResponse;
import com.fauna.query.EventSourceResponse;
import com.fauna.query.builder.Query;
import com.fauna.response.QueryResponse;
import com.fauna.response.QuerySuccess;
Expand Down Expand Up @@ -108,10 +109,9 @@ private <T> void completeRequest(QuerySuccess<T> success, Throwable throwable) {
}
}

private <E> void completeFeedRequest(FeedSuccess<E> success, Throwable throwable) {
if (success != null) {
// TODO: Update TS?
} else if (throwable != null) {
private <E> void completeFeedRequest(FeedPage<E> success, Throwable throwable) {
// Feeds do not update the clients latest transaction timestamp.
if (throwable != null) {
extractServiceException(throwable).ifPresent(exc -> updateTs(exc.getResponse()));
}
}
Expand All @@ -130,7 +130,7 @@ private <T> Supplier<CompletableFuture<QuerySuccess<T>>> makeAsyncRequest(HttpCl
}).whenComplete(this::completeRequest);
}

private <E> Supplier<CompletableFuture<FeedSuccess<E>>> makeAsyncFeedRequest(HttpClient client, HttpRequest request, Codec<E> codec) {
private <E> Supplier<CompletableFuture<FeedPage<E>>> makeAsyncFeedRequest(HttpClient client, HttpRequest request, Codec<E> codec) {
return () -> client.sendAsync(request, HttpResponse.BodyHandlers.ofInputStream()).thenApply(
response -> {
logResponse(response);
Expand All @@ -143,7 +143,7 @@ private <E> Supplier<CompletableFuture<FeedSuccess<E>>> makeAsyncFeedRequest(Htt
// Fall back on ProtocolException.
throw new ProtocolException(response.statusCode(), failure);
}
return FeedSuccess.parseResponse(response, codec);
return FeedPage.parseResponse(response, codec);
}).whenComplete(this::completeFeedRequest);
}

Expand Down Expand Up @@ -467,7 +467,7 @@ public <E> FaunaStream<E> stream(StreamRequest streamRequest, Class<E> elementCl
* @throws FaunaException If the query does not succeed, an exception will be thrown.
*/
public <E> CompletableFuture<FaunaStream<E>> asyncStream(Query fql, Class<E> elementClass) {
return this.asyncQuery(fql, StreamTokenResponse.class).thenApply(
return this.asyncQuery(fql, EventSourceResponse.class).thenApply(
queryResponse -> this.stream(StreamRequest.fromTokenResponse(queryResponse.getData()), elementClass));
}

Expand All @@ -494,53 +494,54 @@ public <E> FaunaStream<E> stream(Query fql, Class<E> elementClass) {
//region Event Feeds

/**
* Send a request to the Fauna feed endpoint, and return a CompletableFuture that completes with the response.
* @param feedRequest The Feed Request object.
* Send a request to the Fauna feed endpoint, and return a CompletableFuture that completes with the feed page.
* @param source An EventSource object with the feed token.
* @param feedOptions The FeedOptions object.
* @param elementClass The expected class &lt;E&gt; of the feed events.
* @return FeedSuccess A CompletableFuture that completes with the successful feed response.
* @param <E> The type of the feed events.
*/
public <E> CompletableFuture<FeedSuccess<E>> asyncFeed(FeedRequest feedRequest, Class<E> elementClass) {
return new RetryHandler<FeedSuccess<E>>(getRetryStrategy(), logger).execute(makeAsyncFeedRequest(
getHttpClient(), getFeedRequestBuilder().buildFeedRequest(feedRequest), codecProvider.get(elementClass)));
public <E> CompletableFuture<FeedPage<E>> feedPage(EventSource source, FeedOptions feedOptions, Class<E> elementClass) {
return new RetryHandler<FeedPage<E>>(getRetryStrategy(), logger).execute(makeAsyncFeedRequest(
getHttpClient(), getFeedRequestBuilder().buildFeedRequest(source, feedOptions), codecProvider.get(elementClass)));
}

/**
* Return a CompletableFuture that completes with a FeedIterator based on an FQL query. This method sends two
* requests, one to the query endpoint to get the stream/feed token, and then another request to the feed endpoint
* requests, one to the query endpoint to get the event source token, and then another request to the feed endpoint
* to get the first page of results.
* @param fql The FQL query to be executed. It must return a token, e.g. ends in `.changesOn()`.
* @param startTs Stream start time in microseconds since the Unix epoch.
* @param feedOptions The FeedOptions object (must not be null).
* @param elementClass The expected class &lt;E&gt; of the feed events.
* @return FeedIterator A CompletableFuture that completes with a feed iterator that returns pages of Feed events.
* @param <E> The type of the feed events.
*/
public <E> CompletableFuture<FeedIterator<E>> asyncFeed(Query fql, long startTs, Class<E> elementClass) {
return this.asyncQuery(fql, StreamTokenResponse.class).thenApply(str -> this.feed(FeedRequest.builder(str.getData().getToken()).startTs(startTs).build(), elementClass));
public <E> CompletableFuture<FeedIterator<E>> asyncFeed(Query fql, FeedOptions feedOptions, Class<E> elementClass) {
return this.asyncQuery(fql, EventSourceResponse.class).thenApply(success -> this.feed(EventSource.fromResponse(success.getData()), feedOptions, elementClass));
}

/**
* Return a FeedIterator based on an FQL query. This method sends two requests, one to the query endpoint to get
* the stream/feed token, and then another request to the feed endpoint to get the first page of results.
* @param fql The FQL query to be executed. It must return a token, e.g. ends in `.changesOn()`.
* @param startTs Stream start time in microseconds since the Unix epoch.
* @param feedOptions The Feed Op
* @param elementClass The expected class &lt;E&gt; of the feed events.
* @return FeedIterator An iterator that returns pages of Feed events.
* @param <E> The type of the feed events.
*/
public <E> FeedIterator<E> feed(Query fql, long startTs, Class<E> elementClass) {
return completeAsync(asyncFeed(fql, startTs, elementClass), FEED_SUBSCRIPTION);
public <E> FeedIterator<E> feed(Query fql, FeedOptions feedOptions, Class<E> elementClass) {
return completeAsync(asyncFeed(fql, feedOptions, elementClass), FEED_SUBSCRIPTION);
}

/**
* Send a request to the Feed endpoint and return a FeedIterator.
* @param request The Feed Request object.
* @param eventSource The Fauna Event Source.
* @param elementClass The expected class &lt;E&gt; of the feed events.
* @return FeedIterator An iterator that returns pages of Feed events.
* @param <E> The type of the feed events.
*/
public <E> FeedIterator<E> feed(FeedRequest request, Class<E> elementClass) {
return new FeedIterator<>(this, request, elementClass);
public <E> FeedIterator<E> feed(EventSource eventSource, FeedOptions feedOptions, Class<E> elementClass) {
return new FeedIterator<>(this, eventSource, feedOptions, elementClass);
}

//endregion
Expand Down
41 changes: 24 additions & 17 deletions src/main/java/com/fauna/client/FeedIterator.java
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,9 @@


import com.fauna.exception.FaunaException;
import com.fauna.feed.FeedRequest;
import com.fauna.feed.FeedSuccess;
import com.fauna.feed.EventSource;
import com.fauna.feed.FeedOptions;
import com.fauna.feed.FeedPage;
import com.fauna.response.StreamEvent;

import java.util.Iterator;
Expand All @@ -15,23 +16,27 @@
* FeedIterator iterates over Event Feed pages from Fauna.
* @param <E>
*/
public class FeedIterator<E> implements Iterator<FeedSuccess<E>> {
public class FeedIterator<E> implements Iterator<FeedPage<E>> {
private final FaunaClient client;
private final Class<E> resultClass;
private final FeedRequest initialRequest;
private CompletableFuture<FeedSuccess<E>> feedFuture;
private final EventSource eventSource;
private FeedOptions latestOptions;
private CompletableFuture<FeedPage<E>> feedFuture;

/**
* Construct a new PageIterator.
*
* @param client A client that makes requests to Fauna.
* @param request The Feed Request object.
* @param eventSource The Fauna Event Source.
* @param feedOptions The FeedOptions object.
* @param resultClass The class of the elements returned from Fauna (i.e. the rows).
*/
public FeedIterator(FaunaClient client, FeedRequest request, Class<E> resultClass) {
public FeedIterator(FaunaClient client, EventSource eventSource, FeedOptions feedOptions, Class<E> resultClass) {
this.client = client;
this.resultClass = resultClass;
this.initialRequest = request;
this.feedFuture = client.asyncFeed(request, resultClass);
this.eventSource = eventSource;
this.latestOptions = feedOptions;
this.feedFuture = client.feedPage(eventSource, feedOptions, resultClass);
}

@Override
Expand All @@ -41,15 +46,17 @@ public boolean hasNext() {

/**
* Returns a CompletableFuture that will complete with the next page (or throw a FaunaException).
* @return A c
* When the future completes, the next page will be fetched in the background.
*
* @return A CompletableFuture that completes with a new FeedPage instance.
*/
public CompletableFuture<FeedSuccess<E>> nextAsync() {
public CompletableFuture<FeedPage<E>> nextAsync() {
if (this.feedFuture != null) {
return this.feedFuture.thenApply(fs -> {
if (fs.hasNext()) {
FeedRequest.Builder builder = FeedRequest.builder(initialRequest.getToken()).cursor(fs.getCursor());
initialRequest.getPageSize().ifPresent(builder::pageSize);
this.feedFuture = client.asyncFeed(builder.build(), resultClass);
FeedOptions options = this.latestOptions.nextPage(fs);
this.latestOptions = options;
this.feedFuture = client.feedPage(this.eventSource, options, resultClass);
} else {
this.feedFuture = null;
}
Expand All @@ -62,12 +69,12 @@ public CompletableFuture<FeedSuccess<E>> nextAsync() {


/**
* Get the next Page.
* @return The next Page of elements E.
* Get the next Page (synchronously).
* @return FeedPage The next Page of elements E.
* @throws FaunaException If there is an error getting the next page.
*/
@Override
public FeedSuccess<E> next() {
public FeedPage<E> next() {
try {
return nextAsync().join();
} catch (CompletionException ce) {
Expand Down
43 changes: 12 additions & 31 deletions src/main/java/com/fauna/client/RequestBuilder.java
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@
import com.fauna.codec.CodecProvider;
import com.fauna.env.DriverEnvironment;
import com.fauna.exception.ClientException;
import com.fauna.feed.EventSource;
import com.fauna.feed.FeedOptions;
import com.fauna.feed.FeedRequest;
import com.fauna.query.QueryOptions;
import com.fauna.stream.StreamRequest;
Expand Down Expand Up @@ -37,12 +39,12 @@ public class RequestBuilder {
private final Duration clientTimeoutBuffer;
private final Logger logger;

static class FieldNames {
public static class FieldNames {
static final String QUERY = "query";
static final String TOKEN = "token";
static final String CURSOR = "cursor";
static final String START_TS = "start_ts";
static final String PAGE_SIZE = "page_size";
public static final String TOKEN = "token";
public static final String CURSOR = "cursor";
public static final String START_TS = "start_ts";
public static final String PAGE_SIZE = "page_size";
}

static class Headers {
Expand Down Expand Up @@ -156,28 +158,6 @@ public String buildStreamRequestBody(StreamRequest request) throws IOException {
return requestBytes.toString(StandardCharsets.UTF_8);
}

public String buildFeedRequestBody(FeedRequest request) throws IOException {
// Use JsonGenerator directly rather than UTF8FaunaGenerator because this is not FQL. For example,
// start_ts is a JSON numeric/integer, not a tagged '@long'.
ByteArrayOutputStream requestBytes = new ByteArrayOutputStream();
JsonGenerator gen = new JsonFactory().createGenerator(requestBytes);
gen.writeStartObject();
gen.writeStringField(FieldNames.TOKEN, request.getToken());
// Cannot use ifPresent(val -> ...) because gen.write methods can throw an IOException.
if (request.getCursor().isPresent()) {
gen.writeStringField(FieldNames.CURSOR, request.getCursor().get());
}
if (request.getStartTs().isPresent()) {
gen.writeNumberField(FieldNames.START_TS, request.getStartTs().get());
}
if (request.getPageSize().isPresent()) {
gen.writeNumberField(FieldNames.PAGE_SIZE, request.getPageSize().get());
}
gen.writeEndObject();
gen.flush();
return requestBytes.toString(StandardCharsets.UTF_8);
}

public HttpRequest buildStreamRequest(StreamRequest request) {
HttpRequest.Builder builder = baseRequestBuilder.copy();
request.getTimeout().ifPresent(builder::timeout);
Expand All @@ -191,15 +171,16 @@ public HttpRequest buildStreamRequest(StreamRequest request) {
}
}

public HttpRequest buildFeedRequest(FeedRequest request) {
public HttpRequest buildFeedRequest(EventSource eventSource, FeedOptions options) {
FeedRequest request = new FeedRequest(eventSource, options);
HttpRequest.Builder builder = baseRequestBuilder.copy();
request.getTimeout().ifPresent(val -> {
options.getTimeout().ifPresent(val -> {
builder.timeout(val.plus(clientTimeoutBuffer));
builder.header(Headers.QUERY_TIMEOUT_MS, String.valueOf(val.toMillis()));
});
try {
String body = buildFeedRequestBody(request);
HttpRequest req = builder.POST(HttpRequest.BodyPublishers.ofString(body)).build();
String body = request.serialize();
HttpRequest req = builder.POST(HttpRequest.BodyPublishers.ofString(request.serialize())).build();
logRequest(body, req);
return req;
} catch (IOException e) {
Expand Down
6 changes: 3 additions & 3 deletions src/main/java/com/fauna/codec/DefaultCodecProvider.java
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,8 @@
import com.fauna.codec.codecs.QueryCodec;
import com.fauna.codec.codecs.QueryLiteralCodec;
import com.fauna.codec.codecs.QueryObjCodec;
import com.fauna.codec.codecs.StreamTokenResponseCodec;
import com.fauna.query.StreamTokenResponse;
import com.fauna.codec.codecs.EventSourceResponseCodec;
import com.fauna.query.EventSourceResponse;
import com.fauna.codec.codecs.QueryValCodec;
import com.fauna.query.builder.Query;
import com.fauna.query.builder.QueryArr;
Expand Down Expand Up @@ -47,7 +47,7 @@ public DefaultCodecProvider(CodecRegistry registry) {
registry.put(CodecRegistryKey.from(QueryVal.class), new QueryValCodec(this));
registry.put(CodecRegistryKey.from(QueryLiteral.class), new QueryLiteralCodec());

registry.put(CodecRegistryKey.from(StreamTokenResponse.class), new StreamTokenResponseCodec());
registry.put(CodecRegistryKey.from(EventSourceResponse.class), new EventSourceResponseCodec());


var bdc = new BaseDocumentCodec(this);
Expand Down
4 changes: 2 additions & 2 deletions src/main/java/com/fauna/codec/codecs/DynamicCodec.java
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
import com.fauna.codec.UTF8FaunaGenerator;
import com.fauna.codec.UTF8FaunaParser;
import com.fauna.exception.CodecException;
import com.fauna.query.StreamTokenResponse;
import com.fauna.query.EventSourceResponse;
import com.fauna.types.Document;
import com.fauna.types.DocumentRef;
import com.fauna.types.Page;
Expand Down Expand Up @@ -43,7 +43,7 @@ public Object decode(UTF8FaunaParser parser) throws CodecException {
case START_DOCUMENT:
return provider.get(Document.class).decode(parser);
case STREAM:
return provider.get(StreamTokenResponse.class).decode(parser);
return provider.get(EventSourceResponse.class).decode(parser);
case MODULE:
return parser.getValueAsModule();
case INT:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,28 +5,28 @@
import com.fauna.codec.UTF8FaunaParser;
import com.fauna.codec.FaunaTokenType;
import com.fauna.exception.CodecException;
import com.fauna.query.StreamTokenResponse;
import com.fauna.query.EventSourceResponse;

public class StreamTokenResponseCodec extends BaseCodec<StreamTokenResponse> {
public class EventSourceResponseCodec extends BaseCodec<EventSourceResponse> {

@Override
public StreamTokenResponse decode(UTF8FaunaParser parser) throws CodecException {
public EventSourceResponse decode(UTF8FaunaParser parser) throws CodecException {
if (parser.getCurrentTokenType() == FaunaTokenType.STREAM) {
return new StreamTokenResponse(parser.getTaggedValueAsString());
return new EventSourceResponse(parser.getTaggedValueAsString());
} else {
throw new CodecException(this.unsupportedTypeDecodingMessage(parser.getCurrentTokenType().getFaunaType(), getSupportedTypes()));
}
}

@Override
public void encode(UTF8FaunaGenerator gen, StreamTokenResponse obj) throws CodecException {
public void encode(UTF8FaunaGenerator gen, EventSourceResponse obj) throws CodecException {
throw new CodecException("Cannot encode StreamTokenResponse");

}

@Override
public Class<?> getCodecClass() {
return StreamTokenResponse.class;
return EventSourceResponse.class;
}

@Override
Expand Down
23 changes: 23 additions & 0 deletions src/main/java/com/fauna/feed/EventSource.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
package com.fauna.feed;

import com.fauna.query.EventSourceResponse;

public class EventSource {
private final String token;

public EventSource(String token) {
this.token = token;
}

public String getToken() {
return token;
}

public static EventSource fromToken(String token) {
return new EventSource(token);
}

public static EventSource fromResponse(EventSourceResponse response) {
return new EventSource(response.getToken());
}
}
Loading

0 comments on commit 144fd79

Please sign in to comment.