diff --git a/src/main/java/com/fauna/client/FaunaClient.java b/src/main/java/com/fauna/client/FaunaClient.java index 2e5dd4f2..c79a7c58 100644 --- a/src/main/java/com/fauna/client/FaunaClient.java +++ b/src/main/java/com/fauna/client/FaunaClient.java @@ -9,13 +9,14 @@ import com.fauna.exception.FaunaException; import com.fauna.exception.ProtocolException; import com.fauna.exception.ServiceException; -import com.fauna.feed.FeedRequest; -import com.fauna.feed.FeedSuccess; +import com.fauna.feed.EventSource; +import com.fauna.feed.FeedOptions; +import com.fauna.feed.FeedPage; import com.fauna.query.AfterToken; import com.fauna.query.QueryOptions; import com.fauna.response.QueryFailure; import com.fauna.stream.StreamRequest; -import com.fauna.query.StreamTokenResponse; +import com.fauna.query.EventSourceResponse; import com.fauna.query.builder.Query; import com.fauna.response.QueryResponse; import com.fauna.response.QuerySuccess; @@ -108,10 +109,9 @@ private void completeRequest(QuerySuccess success, Throwable throwable) { } } - private void completeFeedRequest(FeedSuccess success, Throwable throwable) { - if (success != null) { - // TODO: Update TS? - } else if (throwable != null) { + private void completeFeedRequest(FeedPage success, Throwable throwable) { + // Feeds do not update the clients latest transaction timestamp. + if (throwable != null) { extractServiceException(throwable).ifPresent(exc -> updateTs(exc.getResponse())); } } @@ -130,7 +130,7 @@ private Supplier>> makeAsyncRequest(HttpCl }).whenComplete(this::completeRequest); } - private Supplier>> makeAsyncFeedRequest(HttpClient client, HttpRequest request, Codec codec) { + private Supplier>> makeAsyncFeedRequest(HttpClient client, HttpRequest request, Codec codec) { return () -> client.sendAsync(request, HttpResponse.BodyHandlers.ofInputStream()).thenApply( response -> { logResponse(response); @@ -143,7 +143,7 @@ private Supplier>> makeAsyncFeedRequest(Htt // Fall back on ProtocolException. throw new ProtocolException(response.statusCode(), failure); } - return FeedSuccess.parseResponse(response, codec); + return FeedPage.parseResponse(response, codec); }).whenComplete(this::completeFeedRequest); } @@ -467,7 +467,7 @@ public FaunaStream stream(StreamRequest streamRequest, Class elementCl * @throws FaunaException If the query does not succeed, an exception will be thrown. */ public CompletableFuture> asyncStream(Query fql, Class elementClass) { - return this.asyncQuery(fql, StreamTokenResponse.class).thenApply( + return this.asyncQuery(fql, EventSourceResponse.class).thenApply( queryResponse -> this.stream(StreamRequest.fromTokenResponse(queryResponse.getData()), elementClass)); } @@ -494,53 +494,54 @@ public FaunaStream stream(Query fql, Class elementClass) { //region Event Feeds /** - * Send a request to the Fauna feed endpoint, and return a CompletableFuture that completes with the response. - * @param feedRequest The Feed Request object. + * Send a request to the Fauna feed endpoint, and return a CompletableFuture that completes with the feed page. + * @param source An EventSource object with the feed token. + * @param feedOptions The FeedOptions object. * @param elementClass The expected class <E> of the feed events. * @return FeedSuccess A CompletableFuture that completes with the successful feed response. * @param The type of the feed events. */ - public CompletableFuture> asyncFeed(FeedRequest feedRequest, Class elementClass) { - return new RetryHandler>(getRetryStrategy(), logger).execute(makeAsyncFeedRequest( - getHttpClient(), getFeedRequestBuilder().buildFeedRequest(feedRequest), codecProvider.get(elementClass))); + public CompletableFuture> feedPage(EventSource source, FeedOptions feedOptions, Class elementClass) { + return new RetryHandler>(getRetryStrategy(), logger).execute(makeAsyncFeedRequest( + getHttpClient(), getFeedRequestBuilder().buildFeedRequest(source, feedOptions), codecProvider.get(elementClass))); } /** * Return a CompletableFuture that completes with a FeedIterator based on an FQL query. This method sends two - * requests, one to the query endpoint to get the stream/feed token, and then another request to the feed endpoint + * requests, one to the query endpoint to get the event source token, and then another request to the feed endpoint * to get the first page of results. * @param fql The FQL query to be executed. It must return a token, e.g. ends in `.changesOn()`. - * @param startTs Stream start time in microseconds since the Unix epoch. + * @param feedOptions The FeedOptions object (must not be null). * @param elementClass The expected class <E> of the feed events. * @return FeedIterator A CompletableFuture that completes with a feed iterator that returns pages of Feed events. * @param The type of the feed events. */ - public CompletableFuture> asyncFeed(Query fql, long startTs, Class elementClass) { - return this.asyncQuery(fql, StreamTokenResponse.class).thenApply(str -> this.feed(FeedRequest.builder(str.getData().getToken()).startTs(startTs).build(), elementClass)); + public CompletableFuture> asyncFeed(Query fql, FeedOptions feedOptions, Class elementClass) { + return this.asyncQuery(fql, EventSourceResponse.class).thenApply(success -> this.feed(EventSource.fromResponse(success.getData()), feedOptions, elementClass)); } /** * Return a FeedIterator based on an FQL query. This method sends two requests, one to the query endpoint to get * the stream/feed token, and then another request to the feed endpoint to get the first page of results. * @param fql The FQL query to be executed. It must return a token, e.g. ends in `.changesOn()`. - * @param startTs Stream start time in microseconds since the Unix epoch. + * @param feedOptions The Feed Op * @param elementClass The expected class <E> of the feed events. * @return FeedIterator An iterator that returns pages of Feed events. * @param The type of the feed events. */ - public FeedIterator feed(Query fql, long startTs, Class elementClass) { - return completeAsync(asyncFeed(fql, startTs, elementClass), FEED_SUBSCRIPTION); + public FeedIterator feed(Query fql, FeedOptions feedOptions, Class elementClass) { + return completeAsync(asyncFeed(fql, feedOptions, elementClass), FEED_SUBSCRIPTION); } /** * Send a request to the Feed endpoint and return a FeedIterator. - * @param request The Feed Request object. + * @param eventSource The Fauna Event Source. * @param elementClass The expected class <E> of the feed events. * @return FeedIterator An iterator that returns pages of Feed events. * @param The type of the feed events. */ - public FeedIterator feed(FeedRequest request, Class elementClass) { - return new FeedIterator<>(this, request, elementClass); + public FeedIterator feed(EventSource eventSource, FeedOptions feedOptions, Class elementClass) { + return new FeedIterator<>(this, eventSource, feedOptions, elementClass); } //endregion diff --git a/src/main/java/com/fauna/client/FeedIterator.java b/src/main/java/com/fauna/client/FeedIterator.java index 32026180..e397a03d 100644 --- a/src/main/java/com/fauna/client/FeedIterator.java +++ b/src/main/java/com/fauna/client/FeedIterator.java @@ -2,8 +2,9 @@ import com.fauna.exception.FaunaException; -import com.fauna.feed.FeedRequest; -import com.fauna.feed.FeedSuccess; +import com.fauna.feed.EventSource; +import com.fauna.feed.FeedOptions; +import com.fauna.feed.FeedPage; import com.fauna.response.StreamEvent; import java.util.Iterator; @@ -15,23 +16,27 @@ * FeedIterator iterates over Event Feed pages from Fauna. * @param */ -public class FeedIterator implements Iterator> { +public class FeedIterator implements Iterator> { private final FaunaClient client; private final Class resultClass; - private final FeedRequest initialRequest; - private CompletableFuture> feedFuture; + private final EventSource eventSource; + private FeedOptions latestOptions; + private CompletableFuture> feedFuture; /** * Construct a new PageIterator. + * * @param client A client that makes requests to Fauna. - * @param request The Feed Request object. + * @param eventSource The Fauna Event Source. + * @param feedOptions The FeedOptions object. * @param resultClass The class of the elements returned from Fauna (i.e. the rows). */ - public FeedIterator(FaunaClient client, FeedRequest request, Class resultClass) { + public FeedIterator(FaunaClient client, EventSource eventSource, FeedOptions feedOptions, Class resultClass) { this.client = client; this.resultClass = resultClass; - this.initialRequest = request; - this.feedFuture = client.asyncFeed(request, resultClass); + this.eventSource = eventSource; + this.latestOptions = feedOptions; + this.feedFuture = client.feedPage(eventSource, feedOptions, resultClass); } @Override @@ -41,15 +46,17 @@ public boolean hasNext() { /** * Returns a CompletableFuture that will complete with the next page (or throw a FaunaException). - * @return A c + * When the future completes, the next page will be fetched in the background. + * + * @return A CompletableFuture that completes with a new FeedPage instance. */ - public CompletableFuture> nextAsync() { + public CompletableFuture> nextAsync() { if (this.feedFuture != null) { return this.feedFuture.thenApply(fs -> { if (fs.hasNext()) { - FeedRequest.Builder builder = FeedRequest.builder(initialRequest.getToken()).cursor(fs.getCursor()); - initialRequest.getPageSize().ifPresent(builder::pageSize); - this.feedFuture = client.asyncFeed(builder.build(), resultClass); + FeedOptions options = this.latestOptions.nextPage(fs); + this.latestOptions = options; + this.feedFuture = client.feedPage(this.eventSource, options, resultClass); } else { this.feedFuture = null; } @@ -62,12 +69,12 @@ public CompletableFuture> nextAsync() { /** - * Get the next Page. - * @return The next Page of elements E. + * Get the next Page (synchronously). + * @return FeedPage The next Page of elements E. * @throws FaunaException If there is an error getting the next page. */ @Override - public FeedSuccess next() { + public FeedPage next() { try { return nextAsync().join(); } catch (CompletionException ce) { diff --git a/src/main/java/com/fauna/client/RequestBuilder.java b/src/main/java/com/fauna/client/RequestBuilder.java index 80796ca9..32b03f13 100644 --- a/src/main/java/com/fauna/client/RequestBuilder.java +++ b/src/main/java/com/fauna/client/RequestBuilder.java @@ -6,6 +6,8 @@ import com.fauna.codec.CodecProvider; import com.fauna.env.DriverEnvironment; import com.fauna.exception.ClientException; +import com.fauna.feed.EventSource; +import com.fauna.feed.FeedOptions; import com.fauna.feed.FeedRequest; import com.fauna.query.QueryOptions; import com.fauna.stream.StreamRequest; @@ -37,12 +39,12 @@ public class RequestBuilder { private final Duration clientTimeoutBuffer; private final Logger logger; - static class FieldNames { + public static class FieldNames { static final String QUERY = "query"; - static final String TOKEN = "token"; - static final String CURSOR = "cursor"; - static final String START_TS = "start_ts"; - static final String PAGE_SIZE = "page_size"; + public static final String TOKEN = "token"; + public static final String CURSOR = "cursor"; + public static final String START_TS = "start_ts"; + public static final String PAGE_SIZE = "page_size"; } static class Headers { @@ -156,28 +158,6 @@ public String buildStreamRequestBody(StreamRequest request) throws IOException { return requestBytes.toString(StandardCharsets.UTF_8); } - public String buildFeedRequestBody(FeedRequest request) throws IOException { - // Use JsonGenerator directly rather than UTF8FaunaGenerator because this is not FQL. For example, - // start_ts is a JSON numeric/integer, not a tagged '@long'. - ByteArrayOutputStream requestBytes = new ByteArrayOutputStream(); - JsonGenerator gen = new JsonFactory().createGenerator(requestBytes); - gen.writeStartObject(); - gen.writeStringField(FieldNames.TOKEN, request.getToken()); - // Cannot use ifPresent(val -> ...) because gen.write methods can throw an IOException. - if (request.getCursor().isPresent()) { - gen.writeStringField(FieldNames.CURSOR, request.getCursor().get()); - } - if (request.getStartTs().isPresent()) { - gen.writeNumberField(FieldNames.START_TS, request.getStartTs().get()); - } - if (request.getPageSize().isPresent()) { - gen.writeNumberField(FieldNames.PAGE_SIZE, request.getPageSize().get()); - } - gen.writeEndObject(); - gen.flush(); - return requestBytes.toString(StandardCharsets.UTF_8); - } - public HttpRequest buildStreamRequest(StreamRequest request) { HttpRequest.Builder builder = baseRequestBuilder.copy(); request.getTimeout().ifPresent(builder::timeout); @@ -191,15 +171,16 @@ public HttpRequest buildStreamRequest(StreamRequest request) { } } - public HttpRequest buildFeedRequest(FeedRequest request) { + public HttpRequest buildFeedRequest(EventSource eventSource, FeedOptions options) { + FeedRequest request = new FeedRequest(eventSource, options); HttpRequest.Builder builder = baseRequestBuilder.copy(); - request.getTimeout().ifPresent(val -> { + options.getTimeout().ifPresent(val -> { builder.timeout(val.plus(clientTimeoutBuffer)); builder.header(Headers.QUERY_TIMEOUT_MS, String.valueOf(val.toMillis())); }); try { - String body = buildFeedRequestBody(request); - HttpRequest req = builder.POST(HttpRequest.BodyPublishers.ofString(body)).build(); + String body = request.serialize(); + HttpRequest req = builder.POST(HttpRequest.BodyPublishers.ofString(request.serialize())).build(); logRequest(body, req); return req; } catch (IOException e) { diff --git a/src/main/java/com/fauna/codec/DefaultCodecProvider.java b/src/main/java/com/fauna/codec/DefaultCodecProvider.java index d7126bb8..88c69ebc 100644 --- a/src/main/java/com/fauna/codec/DefaultCodecProvider.java +++ b/src/main/java/com/fauna/codec/DefaultCodecProvider.java @@ -13,8 +13,8 @@ import com.fauna.codec.codecs.QueryCodec; import com.fauna.codec.codecs.QueryLiteralCodec; import com.fauna.codec.codecs.QueryObjCodec; -import com.fauna.codec.codecs.StreamTokenResponseCodec; -import com.fauna.query.StreamTokenResponse; +import com.fauna.codec.codecs.EventSourceResponseCodec; +import com.fauna.query.EventSourceResponse; import com.fauna.codec.codecs.QueryValCodec; import com.fauna.query.builder.Query; import com.fauna.query.builder.QueryArr; @@ -47,7 +47,7 @@ public DefaultCodecProvider(CodecRegistry registry) { registry.put(CodecRegistryKey.from(QueryVal.class), new QueryValCodec(this)); registry.put(CodecRegistryKey.from(QueryLiteral.class), new QueryLiteralCodec()); - registry.put(CodecRegistryKey.from(StreamTokenResponse.class), new StreamTokenResponseCodec()); + registry.put(CodecRegistryKey.from(EventSourceResponse.class), new EventSourceResponseCodec()); var bdc = new BaseDocumentCodec(this); diff --git a/src/main/java/com/fauna/codec/codecs/DynamicCodec.java b/src/main/java/com/fauna/codec/codecs/DynamicCodec.java index 4a5f6287..917003ee 100644 --- a/src/main/java/com/fauna/codec/codecs/DynamicCodec.java +++ b/src/main/java/com/fauna/codec/codecs/DynamicCodec.java @@ -6,7 +6,7 @@ import com.fauna.codec.UTF8FaunaGenerator; import com.fauna.codec.UTF8FaunaParser; import com.fauna.exception.CodecException; -import com.fauna.query.StreamTokenResponse; +import com.fauna.query.EventSourceResponse; import com.fauna.types.Document; import com.fauna.types.DocumentRef; import com.fauna.types.Page; @@ -43,7 +43,7 @@ public Object decode(UTF8FaunaParser parser) throws CodecException { case START_DOCUMENT: return provider.get(Document.class).decode(parser); case STREAM: - return provider.get(StreamTokenResponse.class).decode(parser); + return provider.get(EventSourceResponse.class).decode(parser); case MODULE: return parser.getValueAsModule(); case INT: diff --git a/src/main/java/com/fauna/codec/codecs/StreamTokenResponseCodec.java b/src/main/java/com/fauna/codec/codecs/EventSourceResponseCodec.java similarity index 69% rename from src/main/java/com/fauna/codec/codecs/StreamTokenResponseCodec.java rename to src/main/java/com/fauna/codec/codecs/EventSourceResponseCodec.java index 5ac0425f..50f4334b 100644 --- a/src/main/java/com/fauna/codec/codecs/StreamTokenResponseCodec.java +++ b/src/main/java/com/fauna/codec/codecs/EventSourceResponseCodec.java @@ -5,28 +5,28 @@ import com.fauna.codec.UTF8FaunaParser; import com.fauna.codec.FaunaTokenType; import com.fauna.exception.CodecException; -import com.fauna.query.StreamTokenResponse; +import com.fauna.query.EventSourceResponse; -public class StreamTokenResponseCodec extends BaseCodec { +public class EventSourceResponseCodec extends BaseCodec { @Override - public StreamTokenResponse decode(UTF8FaunaParser parser) throws CodecException { + public EventSourceResponse decode(UTF8FaunaParser parser) throws CodecException { if (parser.getCurrentTokenType() == FaunaTokenType.STREAM) { - return new StreamTokenResponse(parser.getTaggedValueAsString()); + return new EventSourceResponse(parser.getTaggedValueAsString()); } else { throw new CodecException(this.unsupportedTypeDecodingMessage(parser.getCurrentTokenType().getFaunaType(), getSupportedTypes())); } } @Override - public void encode(UTF8FaunaGenerator gen, StreamTokenResponse obj) throws CodecException { + public void encode(UTF8FaunaGenerator gen, EventSourceResponse obj) throws CodecException { throw new CodecException("Cannot encode StreamTokenResponse"); } @Override public Class getCodecClass() { - return StreamTokenResponse.class; + return EventSourceResponse.class; } @Override diff --git a/src/main/java/com/fauna/feed/EventSource.java b/src/main/java/com/fauna/feed/EventSource.java new file mode 100644 index 00000000..2fa4a3ce --- /dev/null +++ b/src/main/java/com/fauna/feed/EventSource.java @@ -0,0 +1,23 @@ +package com.fauna.feed; + +import com.fauna.query.EventSourceResponse; + +public class EventSource { + private final String token; + + public EventSource(String token) { + this.token = token; + } + + public String getToken() { + return token; + } + + public static EventSource fromToken(String token) { + return new EventSource(token); + } + + public static EventSource fromResponse(EventSourceResponse response) { + return new EventSource(response.getToken()); + } +} diff --git a/src/main/java/com/fauna/feed/FeedOptions.java b/src/main/java/com/fauna/feed/FeedOptions.java new file mode 100644 index 00000000..797e58c1 --- /dev/null +++ b/src/main/java/com/fauna/feed/FeedOptions.java @@ -0,0 +1,99 @@ +package com.fauna.feed; + +import com.fauna.query.QueryOptions; + +import java.time.Duration; +import java.util.Optional; + +public class FeedOptions { + private final String cursor; + private final Long startTs; + private final Integer pageSize; + private final Duration timeout; + + public static FeedOptions DEFAULT = FeedOptions.builder().build(); + + public FeedOptions(String cursor, Long startTs, Integer pageSize, Duration timeout) { + this.cursor = cursor; + this.startTs = startTs; + this.pageSize = pageSize; + this.timeout = timeout; + if (cursor != null && startTs != null) { + throw new IllegalArgumentException("Only one of cursor, and startTs can be set."); + } + } + + public Optional getCursor() { + return Optional.ofNullable(cursor); + } + + public Optional getStartTs() { + return Optional.ofNullable(startTs); + } + + public Optional getPageSize() { + return Optional.ofNullable(pageSize); + } + + public Optional getTimeout() { + return Optional.ofNullable(timeout); + } + + public static class Builder { + public String cursor = null; + public Long startTs = null; + public Integer pageSize = null; + public Duration timeout = QueryOptions.DEFAULT_TIMEOUT; + + public Builder cursor(String cursor) { + if (startTs != null) { + throw new IllegalArgumentException("Only one of cursor, and startTs can be set."); + } + this.cursor = cursor; + return this; + } + + public Builder startTs(Long startTs) { + if (cursor != null) { + throw new IllegalArgumentException("Only one of cursor, and startTs can be set."); + } + this.startTs = startTs; + return this; + } + + public Builder pageSize(Integer pageSize) { + this.pageSize = pageSize; + return this; + } + + public Builder timeout(Duration timeout) { + this.timeout = timeout; + return this; + } + + public FeedOptions build() { + return new FeedOptions(cursor, startTs, pageSize, timeout); + } + + } + + public static Builder builder() { + return new Builder(); + } + + /** + * Return the FeedOptions for the next page, based on the cursor of the given page. + * + * This method copies options, like pageSize, and timeout. + * + * @param page The current, or latest page. + * @return A new FeedOptions instance. + */ + public FeedOptions nextPage(FeedPage page) { + FeedOptions.Builder builder = FeedOptions.builder().cursor(page.getCursor()); + // Do not set or copy startTs, because we are using cursor. + getPageSize().ifPresent(builder::pageSize); + getTimeout().ifPresent(builder::timeout); + return builder.build(); + } +} diff --git a/src/main/java/com/fauna/feed/FeedSuccess.java b/src/main/java/com/fauna/feed/FeedPage.java similarity index 87% rename from src/main/java/com/fauna/feed/FeedSuccess.java rename to src/main/java/com/fauna/feed/FeedPage.java index e6623a27..f22d9917 100644 --- a/src/main/java/com/fauna/feed/FeedSuccess.java +++ b/src/main/java/com/fauna/feed/FeedPage.java @@ -3,8 +3,6 @@ import com.fasterxml.jackson.core.JsonFactory; import com.fasterxml.jackson.core.JsonParser; import com.fauna.codec.Codec; -import com.fauna.codec.FaunaTokenType; -import com.fauna.codec.UTF8FaunaParser; import com.fauna.exception.ClientResponseException; import com.fauna.response.QueryStats; import com.fauna.response.StreamEvent; @@ -21,21 +19,20 @@ import static com.fasterxml.jackson.core.JsonToken.START_OBJECT; import static com.fauna.constants.ResponseFields.EVENTS_FIELD_NAME; import static com.fauna.constants.ResponseFields.FEED_HAS_NEXT_FIELD_NAME; -import static com.fauna.constants.ResponseFields.LAST_SEEN_TXN_FIELD_NAME; import static com.fauna.constants.ResponseFields.STATS_FIELD_NAME; import static com.fauna.constants.ResponseFields.CURSOR_FIELD_NAME; -public class FeedSuccess { +public class FeedPage { private final List> events; private final String cursor; private final boolean hasNext; private final QueryStats stats; private static final JsonFactory JSON_FACTORY = new JsonFactory(); - public FeedSuccess(final List> events, - final String cursor, - final boolean hasNext, - final QueryStats stats) { + public FeedPage(final List> events, + final String cursor, + final boolean hasNext, + final QueryStats stats) { this.events = events; this.cursor = cursor; this.hasNext = hasNext; @@ -108,8 +105,8 @@ public Builder parseEvents(JsonParser parser) throws IOException { return this; } - public FeedSuccess build() { - return new FeedSuccess<>(events, cursor, hasNext, stats); + public FeedPage build() { + return new FeedPage<>(events, cursor, hasNext, stats); } public Builder parseField(JsonParser parser) throws IOException { @@ -134,11 +131,11 @@ public static Builder builder(Codec elementCodec) { return new Builder<>(elementCodec); } - public static FeedSuccess parseResponse(HttpResponse response, Codec elementCodec) { + public static FeedPage parseResponse(HttpResponse response, Codec elementCodec) { try { JsonParser parser = JSON_FACTORY.createParser(response.body()); if (parser.nextToken() == START_OBJECT) { - Builder builder = FeedSuccess.builder(elementCodec); + Builder builder = FeedPage.builder(elementCodec); while (parser.nextToken() == FIELD_NAME) { builder.parseField(parser); } diff --git a/src/main/java/com/fauna/feed/FeedRequest.java b/src/main/java/com/fauna/feed/FeedRequest.java index bf73c1b2..e6ad347c 100644 --- a/src/main/java/com/fauna/feed/FeedRequest.java +++ b/src/main/java/com/fauna/feed/FeedRequest.java @@ -1,82 +1,53 @@ package com.fauna.feed; -import com.fauna.query.QueryOptions; +import com.fasterxml.jackson.core.JsonFactory; +import com.fasterxml.jackson.core.JsonGenerator; +import com.fauna.client.RequestBuilder; +import com.fauna.query.EventSourceResponse; -import java.time.Duration; -import java.util.Optional; +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.nio.charset.StandardCharsets; public class FeedRequest { - private final String token; - private final String cursor; - private final Long startTs; - private final Integer pageSize; - private final Duration timeout; - - public FeedRequest(final String token, final String cursor, final Long startTs, final Integer pageSize, final Duration timeout) { - this.token = token; - this.cursor = cursor; - this.startTs = startTs; - this.pageSize = pageSize; - this.timeout = timeout; - } - - public String getToken() { - return token; - } - - public Optional getCursor() { - return Optional.ofNullable(cursor); - } - - public Optional getStartTs() { - return Optional.ofNullable(startTs); - } - - public Optional getPageSize() { - return Optional.ofNullable(pageSize); - } - - public Optional getTimeout() { - return Optional.ofNullable(timeout); - } - - public static class Builder { - public final String token; - public String cursor; - public Long startTs; - public Integer pageSize; - public Duration timeout = QueryOptions.DEFAULT_TIMEOUT; - - public Builder(final String token) { - this.token = token; - } - - public Builder cursor(final String cursor) { - this.cursor = cursor; - return this; + private final EventSource source; + private final FeedOptions options; + + public FeedRequest(EventSource source, FeedOptions options) { + this.source = source; + this.options = options; + if (source == null) { + throw new IllegalArgumentException("EventSource cannot be null."); } - - public Builder startTs(final Long startTs) { - this.startTs = startTs; - return this; + if (options == null) { + throw new IllegalArgumentException("FeedOptions cannot be null."); } + } - public Builder pageSize(final int pageSize) { - this.pageSize = pageSize; - return this; + public String serialize() throws IOException { + // Use JsonGenerator directly rather than UTF8FaunaGenerator because this is not FQL. For example, + // start_ts is a JSON numeric/integer, not a tagged '@long'. + ByteArrayOutputStream requestBytes = new ByteArrayOutputStream(); + JsonGenerator gen = new JsonFactory().createGenerator(requestBytes); + gen.writeStartObject(); + gen.writeStringField(RequestBuilder.FieldNames.TOKEN, source.getToken()); + // Cannot use ifPresent(val -> ...) because gen.write methods can throw an IOException. + if (options.getCursor().isPresent()) { + gen.writeStringField(RequestBuilder.FieldNames.CURSOR, options.getCursor().get()); } - - public Builder timeout(final Duration timeout) { - this.timeout = timeout; - return this; + if (options.getStartTs().isPresent()) { + gen.writeNumberField(RequestBuilder.FieldNames.START_TS, options.getStartTs().get()); } - - public FeedRequest build() { - return new FeedRequest(token, cursor, startTs, pageSize, timeout); + if (options.getPageSize().isPresent()) { + gen.writeNumberField(RequestBuilder.FieldNames.PAGE_SIZE, options.getPageSize().get()); } + gen.writeEndObject(); + gen.flush(); + return requestBytes.toString(StandardCharsets.UTF_8); } - public static Builder builder(String token) { - return new Builder(token); + public static FeedRequest fromResponse(EventSourceResponse resp, FeedOptions options) { + return new FeedRequest(EventSource.fromToken(resp.getToken()), options); } + } diff --git a/src/main/java/com/fauna/query/StreamTokenResponse.java b/src/main/java/com/fauna/query/EventSourceResponse.java similarity index 74% rename from src/main/java/com/fauna/query/StreamTokenResponse.java rename to src/main/java/com/fauna/query/EventSourceResponse.java index cecf30ae..4605a4a4 100644 --- a/src/main/java/com/fauna/query/StreamTokenResponse.java +++ b/src/main/java/com/fauna/query/EventSourceResponse.java @@ -3,14 +3,14 @@ import java.util.Objects; -public class StreamTokenResponse { +public class EventSourceResponse { private String token; - public StreamTokenResponse(String token) { + public EventSourceResponse(String token) { this.token = token; } - public StreamTokenResponse() {} + public EventSourceResponse() {} public String getToken() { return this.token; @@ -24,7 +24,7 @@ public boolean equals(Object o) { if (getClass() != o.getClass()) return false; - StreamTokenResponse c = (StreamTokenResponse) o; + EventSourceResponse c = (EventSourceResponse) o; return Objects.equals(token, c.token); } diff --git a/src/main/java/com/fauna/stream/StreamRequest.java b/src/main/java/com/fauna/stream/StreamRequest.java index 5d3a7754..10c3e2c5 100644 --- a/src/main/java/com/fauna/stream/StreamRequest.java +++ b/src/main/java/com/fauna/stream/StreamRequest.java @@ -1,6 +1,6 @@ package com.fauna.stream; -import com.fauna.query.StreamTokenResponse; +import com.fauna.query.EventSourceResponse; import java.time.Duration; import java.util.Optional; @@ -27,7 +27,7 @@ public class StreamRequest { } } - public static StreamRequest fromTokenResponse(StreamTokenResponse tokenResponse) { + public static StreamRequest fromTokenResponse(EventSourceResponse tokenResponse) { return new StreamRequest(tokenResponse.getToken(), null, null, null); } diff --git a/src/test/java/com/fauna/client/DefaultsTest.java b/src/test/java/com/fauna/client/DefaultsTest.java index 507832dd..dc51f884 100644 --- a/src/test/java/com/fauna/client/DefaultsTest.java +++ b/src/test/java/com/fauna/client/DefaultsTest.java @@ -1,7 +1,8 @@ package com.fauna.client; import com.fauna.codec.DefaultCodecProvider; -import com.fauna.feed.FeedRequest; +import com.fauna.feed.EventSource; +import com.fauna.feed.FeedOptions; import com.fauna.query.QueryOptions; import com.fauna.stream.StreamRequest; import org.junit.jupiter.api.Test; @@ -22,6 +23,7 @@ public class DefaultsTest { public static RequestBuilder queryRequestBuilder = client.getRequestBuilder(); public static RequestBuilder streamRequestBuilder = client.getStreamRequestBuilder(); public static RequestBuilder feedRequestBuilder = client.getFeedRequestBuilder(); + private static EventSource source = EventSource.fromToken("token"); @Test public void testClientDefaults() { @@ -57,7 +59,7 @@ public void testTimeoutDefaults() { assertEquals("5000", queryRequest.headers().firstValue(RequestBuilder.Headers.QUERY_TIMEOUT_MS).orElseThrow()); // Feeds follow the same defaults as queries. - HttpRequest feedRequest = feedRequestBuilder.buildFeedRequest(FeedRequest.builder("token").build()); + HttpRequest feedRequest = feedRequestBuilder.buildFeedRequest(source, FeedOptions.DEFAULT); assertEquals("5000", feedRequest.headers().firstValue(RequestBuilder.Headers.QUERY_TIMEOUT_MS).orElseThrow()); assertEquals(Duration.ofSeconds(10), feedRequest.timeout().orElseThrow()); @@ -76,7 +78,7 @@ public void testNullQueryTimeouts() { assertTrue(queryRequest.timeout().isEmpty()); assertTrue(queryRequest.headers().firstValue(RequestBuilder.Headers.QUERY_TIMEOUT_MS).isEmpty()); - HttpRequest feedRequest = feedRequestBuilder.buildFeedRequest(FeedRequest.builder("token").timeout(null).build()); + HttpRequest feedRequest = feedRequestBuilder.buildFeedRequest(source, FeedOptions.builder().timeout(null).build()); assertTrue(feedRequest.timeout().isEmpty()); assertTrue(feedRequest.headers().firstValue(RequestBuilder.Headers.QUERY_TIMEOUT_MS).isEmpty()); } @@ -90,7 +92,7 @@ public void testOverridingTimeouts() { @Test public void testFeedDefaults() { RequestBuilder builder = Fauna.client().getFeedRequestBuilder(); - HttpRequest req = builder.buildFeedRequest(FeedRequest.builder("token").build()); + HttpRequest req = builder.buildFeedRequest(source, FeedOptions.DEFAULT); assertEquals(Duration.ofSeconds(10), req.timeout().orElseThrow()); // Unlike query, the default timeout for feeds is not set. assertEquals("POST", req.method()); assertEquals("https://db.fauna.com/feed/1", req.uri().toString()); diff --git a/src/test/java/com/fauna/client/FeedIteratorTest.java b/src/test/java/com/fauna/client/FeedIteratorTest.java index 8be56048..5b5fbdb2 100644 --- a/src/test/java/com/fauna/client/FeedIteratorTest.java +++ b/src/test/java/com/fauna/client/FeedIteratorTest.java @@ -5,8 +5,9 @@ import com.fauna.codec.Codec; import com.fauna.codec.DefaultCodecProvider; import com.fauna.exception.InvalidRequestException; -import com.fauna.feed.FeedRequest; -import com.fauna.feed.FeedSuccess; +import com.fauna.feed.EventSource; +import com.fauna.feed.FeedOptions; +import com.fauna.feed.FeedPage; import com.fauna.response.ErrorInfo; import com.fauna.response.QueryFailure; import com.fauna.response.QueryResponse; @@ -30,18 +31,20 @@ import static org.junit.jupiter.api.Assertions.assertThrows; import static org.junit.jupiter.api.Assertions.assertTrue; import static org.mockito.ArgumentMatchers.any; -import static org.mockito.ArgumentMatchers.eq; +import static org.mockito.ArgumentMatchers.argThat; import static org.mockito.Mockito.when; @ExtendWith(MockitoExtension.class) public class FeedIteratorTest { private static final ObjectMapper MAPPER = new ObjectMapper(); + private static final EventSource source = EventSource.fromToken("token"); + private static final String CURSOR_0 = "cursor0"; @Mock private FaunaClient client; - private CompletableFuture> successFuture(boolean after, int num) throws IOException { + private CompletableFuture> successFuture(boolean after, int num) throws IOException { List> events = new ArrayList<>(); Codec codec = DefaultCodecProvider.SINGLETON.get(String.class); events.add(new StreamEvent<>(StreamEvent.EventType.ADD, @@ -51,10 +54,10 @@ private CompletableFuture> successFuture(boolean after, int "cursor0", System.currentTimeMillis() - 5, num + "-b", null, null)); - return CompletableFuture.supplyAsync(() -> FeedSuccess.builder(codec).events(events).cursor("cursor0").hasNext(after).build()); + return CompletableFuture.supplyAsync(() -> FeedPage.builder(codec).events(events).cursor("cursor0").hasNext(after).build()); } - private CompletableFuture> failureFuture() throws IOException { + private CompletableFuture> failureFuture() throws IOException { ObjectNode root = MAPPER.createObjectNode(); ObjectNode error = root.putObject("error"); error.put("code", "invalid_query"); @@ -66,9 +69,9 @@ private CompletableFuture> failureFuture() throws IOExceptio @Test public void test_single_page() throws IOException { - FeedRequest req = FeedRequest.builder("token").pageSize(8).build(); - when(client.asyncFeed(req, String.class)).thenReturn(successFuture(false, 0)); - FeedIterator feedIterator = new FeedIterator<>(client, req, String.class); + FeedOptions options = FeedOptions.builder().pageSize(8).build(); + when(client.feedPage(source, options, String.class)).thenReturn(successFuture(false, 0)); + FeedIterator feedIterator = new FeedIterator<>(client, source, options, String.class); assertTrue(feedIterator.hasNext()); assertEquals(List.of("0-a", "0-b"), feedIterator.next().getEvents().stream().map(e -> e.getData().get()).collect(Collectors.toList())); assertFalse(feedIterator.hasNext()); @@ -77,9 +80,8 @@ public void test_single_page() throws IOException { @Test public void test_single_page_without_calling_hasNext() throws IOException { - FeedRequest req = FeedRequest.builder("token").build(); - when(client.asyncFeed(req, String.class)).thenReturn(successFuture(false, 0)); - FeedIterator feedIterator = new FeedIterator<>(client, req, String.class); + when(client.feedPage(source, FeedOptions.DEFAULT, String.class)).thenReturn(successFuture(false, 0)); + FeedIterator feedIterator = new FeedIterator<>(client, source, FeedOptions.DEFAULT, String.class); // No call to hasNext here. assertEquals(List.of("0-a", "0-b"), feedIterator.next().getEvents().stream().map(e -> e.getData().get()).collect(Collectors.toList())); assertFalse(feedIterator.hasNext()); @@ -88,9 +90,9 @@ public void test_single_page_without_calling_hasNext() throws IOException { @Test public void test_multiple_pages() throws IOException { - FeedRequest req = FeedRequest.builder("token").build(); - when(client.asyncFeed(any(FeedRequest.class), eq(String.class))).thenReturn(successFuture(true, 0), successFuture(false, 1)); - FeedIterator feedIterator = new FeedIterator<>(client, req, String.class); + when(client.feedPage(argThat(source::equals), argThat(FeedOptions.DEFAULT::equals), any(Class.class))).thenReturn(successFuture(true, 0)); + when(client.feedPage(argThat(source::equals), argThat(opts -> opts.getCursor().orElse("").equals(CURSOR_0)), any(Class.class))).thenReturn(successFuture(false, 1)); + FeedIterator feedIterator = new FeedIterator<>(client, source, FeedOptions.DEFAULT, String.class); assertTrue(feedIterator.hasNext()); assertEquals(List.of("0-a", "0-b"), feedIterator.next().getEvents().stream().map(e -> e.getData().get()).collect(Collectors.toList())); @@ -103,9 +105,8 @@ public void test_multiple_pages() throws IOException { @Test public void test_multiple_pages_async() throws IOException, ExecutionException, InterruptedException { - FeedRequest req = FeedRequest.builder("token").build(); - when(client.asyncFeed(any(FeedRequest.class), eq(String.class))).thenReturn(successFuture(true, 0), successFuture(false, 1)); - FeedIterator feedIterator = new FeedIterator<>(client, req, String.class); + when(client.feedPage(argThat(source::equals), any(), any(Class.class))).thenReturn(successFuture(true, 0), successFuture(false, 1)); + FeedIterator feedIterator = new FeedIterator<>(client, source, FeedOptions.DEFAULT, String.class); boolean hasNext = feedIterator.hasNext(); List products = new ArrayList<>(); @@ -119,9 +120,9 @@ public void test_multiple_pages_async() throws IOException, ExecutionException, @Test public void test_flatten() throws IOException { - when(client.asyncFeed(any(FeedRequest.class), eq(String.class))).thenReturn(successFuture(true, 0), successFuture(false, 1)); - FeedRequest req = FeedRequest.builder("token").build(); - FeedIterator feedIterator = new FeedIterator<>(client, req, String.class); + when(client.feedPage(argThat(source::equals), argThat(FeedOptions.DEFAULT::equals), any(Class.class))).thenReturn(successFuture(true, 0)); + when(client.feedPage(argThat(source::equals), argThat(opts -> opts.getCursor().orElse("").equals(CURSOR_0)), any(Class.class))).thenReturn(successFuture(false, 1)); + FeedIterator feedIterator = new FeedIterator<>(client, source, FeedOptions.DEFAULT, String.class); Iterator> iter = feedIterator.flatten(); List products = new ArrayList<>(); iter.forEachRemaining(event -> products.add(event.getData().orElseThrow())); @@ -131,9 +132,8 @@ public void test_flatten() throws IOException { @Test public void test_error_thrown() throws IOException { - FeedRequest req = FeedRequest.builder("token").build(); - when(client.asyncFeed(req, String.class)).thenReturn(failureFuture()); - FeedIterator feedIterator = new FeedIterator<>(client, req, String.class); + when(client.feedPage(source, FeedOptions.DEFAULT, String.class)).thenReturn(failureFuture()); + FeedIterator feedIterator = new FeedIterator<>(client, source, FeedOptions.DEFAULT, String.class); // We could return the wrapped completion exception here. assertTrue(feedIterator.hasNext()); diff --git a/src/test/java/com/fauna/codec/codecs/DynamicCodecTest.java b/src/test/java/com/fauna/codec/codecs/DynamicCodecTest.java index f5c7e3ad..022c651b 100644 --- a/src/test/java/com/fauna/codec/codecs/DynamicCodecTest.java +++ b/src/test/java/com/fauna/codec/codecs/DynamicCodecTest.java @@ -4,10 +4,8 @@ import com.fauna.codec.Codec; import com.fauna.codec.DefaultCodecProvider; import com.fauna.codec.FaunaType; -import com.fauna.codec.Helpers; -import com.fauna.exception.ClientException; import com.fauna.exception.NullDocumentException; -import com.fauna.query.StreamTokenResponse; +import com.fauna.query.EventSourceResponse; import com.fauna.types.Module; import com.fauna.types.*; import org.junit.jupiter.api.Test; @@ -16,7 +14,6 @@ import org.junit.jupiter.params.provider.MethodSource; import java.io.IOException; -import java.text.MessageFormat; import java.time.Instant; import java.util.Arrays; import java.util.Map; @@ -67,7 +64,7 @@ public static Stream testCases() { Arguments.of(TestType.Decode, DYNAMIC_CODEC, NAMED_DOCUMENT_WIRE, NAMED_DOCUMENT, null), Arguments.of(TestType.Decode, DYNAMIC_CODEC, NAMED_DOCUMENT_REF_WIRE, NAMED_DOCUMENT_REF, null), Arguments.of(TestType.Decode, DYNAMIC_CODEC, NULL_DOC_WIRE, null, NULL_DOC_EXCEPTION), - Arguments.of(TestType.Decode, DYNAMIC_CODEC, "{\"@stream\":\"token\"}", new StreamTokenResponse("token"), null), + Arguments.of(TestType.Decode, DYNAMIC_CODEC, "{\"@stream\":\"token\"}", new EventSourceResponse("token"), null), Arguments.of(TestType.Decode, DYNAMIC_CODEC, "{\"@bytes\": \"RmF1bmE=\"}", new byte[]{70, 97, 117, 110, 97}, null) ); } diff --git a/src/test/java/com/fauna/e2e/E2EFeedsTest.java b/src/test/java/com/fauna/e2e/E2EFeedsTest.java index f499e68e..8f1c69ef 100644 --- a/src/test/java/com/fauna/e2e/E2EFeedsTest.java +++ b/src/test/java/com/fauna/e2e/E2EFeedsTest.java @@ -5,7 +5,11 @@ import com.fauna.client.FaunaConfig; import com.fauna.client.FeedIterator; import com.fauna.e2e.beans.Product; -import com.fauna.feed.FeedSuccess; +import com.fauna.feed.EventSource; +import com.fauna.feed.FeedOptions; +import com.fauna.feed.FeedPage; +import com.fauna.query.EventSourceResponse; +import com.fauna.response.QuerySuccess; import com.fauna.response.StreamEvent; import org.junit.jupiter.api.BeforeAll; import org.junit.jupiter.api.Test; @@ -13,6 +17,7 @@ import java.util.ArrayList; import java.util.Iterator; import java.util.List; +import java.util.concurrent.CompletableFuture; import java.util.logging.ConsoleHandler; import java.util.logging.Handler; import java.util.logging.Level; @@ -41,7 +46,8 @@ public static void setup() { @Test public void feedOfAll() { - FeedIterator iter = client.feed(fql("Product.all().eventSource()"), productCollectionTs, Product.class); + FeedOptions options = FeedOptions.builder().startTs(productCollectionTs).build(); + FeedIterator iter = client.feed(fql("Product.all().eventSource()"), options, Product.class); List>> pages = new ArrayList<>(); iter.forEachRemaining(page -> pages.add(page.getEvents())); assertEquals(4, pages.size()); @@ -49,13 +55,49 @@ public void feedOfAll() { assertEquals(50, products.size()); } + @Test + public void manualFeed() { + // Use the feeds API with complete (i.e. manual) control of the calls made to Fauna. + QuerySuccess sourceQuery = client.query(fql("Product.all().eventSource()"), EventSourceResponse.class); + EventSource source = EventSource.fromResponse(sourceQuery.getData()); + List> productUpdates = new ArrayList<>(); + FeedOptions initialOptions = FeedOptions.builder().startTs(productCollectionTs).pageSize(2).build(); + CompletableFuture> pageFuture = client.feedPage(source, initialOptions, Product.class); + int pageCount = 0; + String lastPageCursor = null; + + while(pageFuture != null) { + // Handle page + FeedPage latestPage = pageFuture.join(); + lastPageCursor = latestPage.getCursor(); + productUpdates.addAll(latestPage.getEvents()); + pageCount++; + + // Get next page (if it's not null) + FeedOptions nextPageOptions = initialOptions.nextPage(latestPage); + // You can also inspect next + if (latestPage.hasNext()) { + pageFuture = client.feedPage(source, nextPageOptions, Product.class); + } else { + pageFuture = null; + } + } + assertEquals(50, productUpdates.size()); + assertEquals(25, pageCount); + // Because there is no filtering, these cursors are the same. + // If we filtered events, then the page cursor could be different from the cursor of the last element. + assertEquals(lastPageCursor, productUpdates.get(productUpdates.size()-1).getCursor()); + + } + @Test public void feedError() { - FeedIterator iter = client.feed(fql("Product.all().eventSource()"), 0L, Product.class); - FeedSuccess pageOne = iter.next(); + FeedOptions options = FeedOptions.builder().startTs(0L).build(); + FeedIterator iter = client.feed(fql("Product.all().eventSource()"), options, Product.class); + FeedPage pageOne = iter.next(); assertFalse(pageOne.hasNext()); assertEquals(1, pageOne.getEvents().size()); - StreamEvent errorEvent = pageOne.getEvents().get(0); + StreamEvent errorEvent = pageOne.getEvents().get(0); assertEquals(ERROR, errorEvent.getType()); assertEquals("invalid_stream_start_time", errorEvent.getError().getCode()); assertTrue(errorEvent.getError().getMessage().contains("is too far in the past")); @@ -63,7 +105,8 @@ public void feedError() { @Test public void feedFlattened() { - FeedIterator iter = client.feed(fql("Product.all().eventSource()"), productCollectionTs, Product.class); + FeedOptions options = FeedOptions.builder().startTs(productCollectionTs).build(); + FeedIterator iter = client.feed(fql("Product.all().eventSource()"), options, Product.class); Iterator> productIter = iter.flatten(); List> products = new ArrayList<>(); // Java iterators not being iterable (or useable in a for-each loop) is annoying. diff --git a/src/test/java/com/fauna/e2e/E2EStreamingTest.java b/src/test/java/com/fauna/e2e/E2EStreamingTest.java index 2ffd4ac3..63429219 100644 --- a/src/test/java/com/fauna/e2e/E2EStreamingTest.java +++ b/src/test/java/com/fauna/e2e/E2EStreamingTest.java @@ -5,7 +5,7 @@ import com.fauna.client.FaunaStream; import com.fauna.e2e.beans.Product; import com.fauna.exception.ClientException; -import com.fauna.query.StreamTokenResponse; +import com.fauna.query.EventSourceResponse; import com.fauna.query.builder.Query; import com.fauna.response.QuerySuccess; import com.fauna.response.StreamEvent; @@ -144,7 +144,7 @@ public void query_streamOfProduct() throws InterruptedException { public void handleStreamError() throws InterruptedException { // It would be nice to have another test that generates a stream with normal events, and then an error // event, but this at least tests some of the functionality. - QuerySuccess queryResp = client.query(fql("Product.all().toStream()"), StreamTokenResponse.class); + QuerySuccess queryResp = client.query(fql("Product.all().toStream()"), EventSourceResponse.class); StreamRequest request = StreamRequest.builder((queryResp.getData().getToken())).cursor("invalid_cursor").build(); FaunaStream stream = client.stream(request, Product.class); InventorySubscriber inventory = new InventorySubscriber(); @@ -158,7 +158,7 @@ public void handleStreamError() throws InterruptedException { @Test public void handleStreamTimeout() { - QuerySuccess queryResp = client.query(fql("Product.all().toStream()"), StreamTokenResponse.class); + QuerySuccess queryResp = client.query(fql("Product.all().toStream()"), EventSourceResponse.class); StreamRequest request = StreamRequest.builder((queryResp.getData().getToken())).timeout(Duration.ofMillis(1)).build(); ClientException exc = assertThrows(ClientException.class, () -> client.stream(request, Product.class)); assertEquals(ExecutionException.class, exc.getCause().getClass());