Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[BT-5271-streams-api] Align the Streams API with Event Feeds terminology. #164

Merged
merged 6 commits into from
Nov 6, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
65 changes: 30 additions & 35 deletions src/main/java/com/fauna/client/FaunaClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -6,18 +6,15 @@
import com.fauna.codec.DefaultCodecRegistry;
import com.fauna.event.FaunaStream;
import com.fauna.event.FeedIterator;
import com.fauna.event.StreamOptions;
import com.fauna.exception.ClientException;
import com.fauna.exception.ErrorHandler;
import com.fauna.exception.FaunaException;
import com.fauna.exception.ProtocolException;
import com.fauna.exception.ServiceException;
import com.fauna.event.EventSource;
import com.fauna.event.FeedOptions;
import com.fauna.event.FeedPage;
import com.fauna.query.AfterToken;
import com.fauna.query.QueryOptions;
import com.fauna.response.QueryFailure;
import com.fauna.event.StreamRequest;
import com.fauna.event.EventSourceResponse;
import com.fauna.query.builder.Query;
import com.fauna.response.QueryResponse;
Expand Down Expand Up @@ -143,16 +140,7 @@ private <E> Supplier<CompletableFuture<FeedPage<E>>> makeAsyncFeedRequest(HttpCl
return () -> client.sendAsync(request, HttpResponse.BodyHandlers.ofInputStream()).thenApply(
response -> {
logResponse(response);
if (response.statusCode() >= 400) {
// There are possibly some different error cases to handle for feeds. This seems like
// a comprehensive solution for now. In the future we could rename QueryFailure et. al. to
// something like FaunaFailure, or implement "FeedFailure".
QueryFailure failure = new QueryFailure(response.statusCode(), QueryResponse.builder(codec));
ErrorHandler.handleQueryFailure(response.statusCode(), failure);
// Fall back on ProtocolException.
throw new ProtocolException(response.statusCode(), failure);
}
return FeedPage.parseResponse(response, codec);
return FeedPage.parseResponse(response, codec, statsCollector);
pnwpedro marked this conversation as resolved.
Show resolved Hide resolved
}).whenComplete(this::completeFeedRequest);
}

Expand Down Expand Up @@ -437,13 +425,14 @@ public <E> PageIterator<E> paginate(Query fql, Class<E> elementClass) {
* Send a request to the Fauna stream endpoint, and return a CompletableFuture that completes with the FaunaStream
* publisher.
*
* @param streamRequest The request object including a stream token, and optionally a cursor, or timestamp.
* @param elementClass The expected class &lt;E&gt; of the stream events.
* @param eventSource The Event Source (e.g. token from `.eventSource()`).
* @param streamOptions The Stream Options (including start timestamp, retry strategy).
* @return CompletableFuture A CompletableFuture of FaunaStream<E>.
* @throws FaunaException If the query does not succeed, an exception will be thrown.
*/
public <E> CompletableFuture<FaunaStream<E>> asyncStream(StreamRequest streamRequest, Class<E> elementClass) {
HttpRequest streamReq = getStreamRequestBuilder().buildStreamRequest(streamRequest);
public <E> CompletableFuture<FaunaStream<E>> asyncStream(EventSource eventSource,
StreamOptions streamOptions, Class<E> elementClass) {
HttpRequest streamReq = getStreamRequestBuilder().buildStreamRequest(eventSource, streamOptions);
return getHttpClient().sendAsync(streamReq,
HttpResponse.BodyHandlers.ofPublisher()).thenCompose(response -> {
CompletableFuture<FaunaStream<E>> publisher = new CompletableFuture<>();
Expand All @@ -456,28 +445,33 @@ public <E> CompletableFuture<FaunaStream<E>> asyncStream(StreamRequest streamReq

/**
* Send a request to the Fauna stream endpoint to start a stream, and return a FaunaStream publisher.
* @param streamRequest The request object including a stream token, and optionally a cursor, or timestamp.
* @param elementClass The expected class &lt;E&gt; of the stream events.
*
* @param eventSource The request object including a stream token, and optionally a cursor, or timestamp.
* @param streamOptions
* @param elementClass The expected class &lt;E&gt; of the stream events.
* @return FaunaStream A publisher, implementing Flow.Publisher&lt;StreamEvent&lt;E&gt;&gt; from the Java Flow API.
* @throws FaunaException If the query does not succeed, an exception will be thrown.
*/
public <E> FaunaStream<E> stream(StreamRequest streamRequest, Class<E> elementClass) {
return completeAsync(asyncStream(streamRequest, elementClass), STREAM_SUBSCRIPTION);
public <E> FaunaStream<E> stream(EventSource eventSource, StreamOptions streamOptions, Class<E> elementClass) {
return completeAsync(asyncStream(eventSource, streamOptions, elementClass), STREAM_SUBSCRIPTION);
}

/**
* Start a Fauna stream based on an FQL query, and return a CompletableFuture of the resulting FaunaStream
* publisher. This method sends two requests, one to the query endpoint to get the stream token, and then another
* to the stream endpoint. This method is equivalent to calling the query, then the stream methods on FaunaClient.
*
* @param fql The FQL query to be executed. It must return a stream, e.g. ends in `.toStream()`.
* This method does not take QueryOptions, or StreamOptions as parameters. If you need specify either
* query, or stream options; you can use the asyncQuery/asyncStream methods.
*
* @param fql The FQL query to be executed. It must return an event source, e.g. ends in `.eventSource()`.
* @param elementClass The expected class &lt;E&gt; of the stream events.
* @return FaunaStream A publisher, implementing Flow.Publisher&lt;StreamEvent&lt;E&gt;&gt; from the Java Flow API.
* @throws FaunaException If the query does not succeed, an exception will be thrown.
*/
public <E> CompletableFuture<FaunaStream<E>> asyncStream(Query fql, Class<E> elementClass) {
return this.asyncQuery(fql, EventSourceResponse.class).thenApply(
queryResponse -> this.stream(StreamRequest.fromTokenResponse(queryResponse.getData()), elementClass));
return this.asyncQuery(fql, EventSourceResponse.class).thenApply(queryResponse ->
this.stream(EventSource.fromResponse(queryResponse.getData()), StreamOptions.builder().build(), elementClass));
}

/**
Expand All @@ -486,9 +480,10 @@ public <E> CompletableFuture<FaunaStream<E>> asyncStream(Query fql, Class<E> ele
* the stream token, and then another request to the stream endpoint which return the FaunaStream publisher.
*
* <p>
* Query = fql("Product.all().toStream()");
* QuerySuccess&lt;StreamTokenResponse&gt; tokenResp = client.query(fql, StreamTokenResponse.class);
* FaunaStream&lt;Product&gt; faunaStream = client.stream(new StreamRequest(tokenResp.getData.getToken(), Product.class)
* Query = fql("Product.all().eventSource()");
* QuerySuccess&lt;EventSourceResponse&gt; querySuccess = client.query(fql, EventSourceResponse.class);
* EventSource source = EventSource.fromResponse(querySuccess.getData());
* FaunaStream&lt;Product&gt; faunaStream = client.stream(source, StreamOptions.DEFAULT, Product.class)
*
* @param fql The FQL query to be executed. It must return a stream, e.g. ends in `.toStream()`.
* @param elementClass The expected class &lt;E&gt; of the stream events.
Expand All @@ -504,15 +499,15 @@ public <E> FaunaStream<E> stream(Query fql, Class<E> elementClass) {

/**
* Send a request to the Fauna feed endpoint, and return a CompletableFuture that completes with the feed page.
* @param source An EventSource object with the feed token.
* @param feedOptions The FeedOptions object (default options will be used if null).
* @param elementClass The expected class &lt;E&gt; of the feed events.
* @return FeedSuccess A CompletableFuture that completes with the successful feed response.
* @param <E> The type of the feed events.
* @param eventSource An EventSource object (e.g. token from `.eventSource()`)
* @param feedOptions The FeedOptions object (default options will be used if null).
* @param elementClass The expected class &lt;E&gt; of the feed events.
* @return CompletableFuture A CompletableFuture that completes with a FeedPage&lt;E&gt;.
* @param <E> The type of the feed events.
*/
public <E> CompletableFuture<FeedPage<E>> poll(EventSource source, FeedOptions feedOptions, Class<E> elementClass) {
public <E> CompletableFuture<FeedPage<E>> poll(EventSource eventSource, FeedOptions feedOptions, Class<E> elementClass) {
return new RetryHandler<FeedPage<E>>(getRetryStrategy(), logger).execute(makeAsyncFeedRequest(
getHttpClient(), getFeedRequestBuilder().buildFeedRequest(source, feedOptions != null ? feedOptions : FeedOptions.DEFAULT), codecProvider.get(elementClass)));
getHttpClient(), getFeedRequestBuilder().buildFeedRequest(eventSource, feedOptions != null ? feedOptions : FeedOptions.DEFAULT), codecProvider.get(elementClass)));
}

/**
Expand Down
7 changes: 7 additions & 0 deletions src/main/java/com/fauna/client/PageIterator.java
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,13 @@ public PageIterator(FaunaClient client, Query fql, Class<E> resultClass, QueryOp
this.queryFuture = client.asyncQuery(fql, this.pageClass, options);
}

public PageIterator(FaunaClient client, Page<E> firstPage, Class<E> resultClass, QueryOptions options) {
this.client = client;
this.pageClass = new PageOf<>(resultClass);
this.options = options;
firstPage.getAfter().ifPresentOrElse(this::doPaginatedQuery, this::endPagination);
}

@Override
public boolean hasNext() {
return this.queryFuture != null;
Expand Down
38 changes: 5 additions & 33 deletions src/main/java/com/fauna/client/RequestBuilder.java
Original file line number Diff line number Diff line change
@@ -1,24 +1,21 @@
package com.fauna.client;

import com.fasterxml.jackson.core.JsonFactory;
import com.fasterxml.jackson.core.JsonGenerator;
import com.fauna.codec.Codec;
import com.fauna.codec.CodecProvider;
import com.fauna.env.DriverEnvironment;
import com.fauna.event.StreamOptions;
import com.fauna.event.StreamRequest;
import com.fauna.exception.ClientException;
import com.fauna.event.EventSource;
import com.fauna.event.FeedOptions;
import com.fauna.event.FeedRequest;
import com.fauna.query.QueryOptions;
import com.fauna.event.StreamRequest;
import com.fauna.query.builder.Query;
import com.fauna.codec.UTF8FaunaGenerator;

import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.net.URI;
import java.net.http.HttpRequest;
import java.nio.charset.StandardCharsets;
import java.text.MessageFormat;
import java.time.Duration;
import java.util.logging.Logger;
Expand Down Expand Up @@ -133,36 +130,11 @@ public HttpRequest buildRequest(Query fql, QueryOptions options, CodecProvider p
}
}

/**
* Builds and returns the request body for the Fauna Streams API.
* @param request An Object representing the Stream request.
* @return
* @throws IOException
*/
public String buildStreamRequestBody(StreamRequest request) throws IOException {
// Use JsonGenerator directly rather than UTF8FaunaGenerator because this is not FQL. For example,
// start_ts is a JSON numeric/integer, not a tagged '@long'.
ByteArrayOutputStream requestBytes = new ByteArrayOutputStream();
JsonGenerator gen = new JsonFactory().createGenerator(requestBytes);
gen.writeStartObject();
gen.writeStringField(FieldNames.TOKEN, request.getToken());
// Only one of cursor / start_ts can be present, prefer cursor.
// Cannot use ifPresent(val -> ...) because gen.write methods can throw an IOException.
if (request.getCursor().isPresent()) {
gen.writeStringField(FieldNames.CURSOR, request.getCursor().get());
} else if (request.getStartTs().isPresent()) {
gen.writeNumberField(FieldNames.START_TS, request.getStartTs().get());
}
gen.writeEndObject();
gen.flush();
return requestBytes.toString(StandardCharsets.UTF_8);
}

public HttpRequest buildStreamRequest(StreamRequest request) {
public HttpRequest buildStreamRequest(EventSource eventSource, StreamOptions streamOptions) {
HttpRequest.Builder builder = baseRequestBuilder.copy();
request.getTimeout().ifPresent(builder::timeout);
streamOptions.getTimeout().ifPresent(builder::timeout);
try {
String body = buildStreamRequestBody(request);
String body = new StreamRequest(eventSource, streamOptions).serialize();
HttpRequest req = builder.POST(HttpRequest.BodyPublishers.ofString(body)).build();
logRequest(body, req);
return req;
Expand Down
40 changes: 27 additions & 13 deletions src/main/java/com/fauna/event/FeedPage.java
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,10 @@

import com.fasterxml.jackson.core.JsonFactory;
import com.fasterxml.jackson.core.JsonParser;
import com.fauna.client.StatsCollector;
import com.fauna.codec.Codec;
import com.fauna.exception.ClientResponseException;
import com.fauna.response.QueryResponse;
import com.fauna.response.QueryStats;

import java.io.IOException;
Expand Down Expand Up @@ -62,13 +64,15 @@ public QueryStats getStats() {

public static class Builder<E> {
private final Codec<E> elementCodec;
private final StatsCollector statsCollector;
public List<FaunaEvent<E>> events;
public String cursor = "";
public Boolean hasNext = false;
public QueryStats stats = null;

public Builder(Codec<E> elementCodec) {
public Builder(Codec<E> elementCodec, StatsCollector statsCollector) {
this.elementCodec = elementCodec;
this.statsCollector = statsCollector;
}

public Builder events(List<FaunaEvent<E>> events) {
Expand Down Expand Up @@ -116,32 +120,42 @@ public Builder parseField(JsonParser parser) throws IOException {
case EVENTS_FIELD_NAME:
return parseEvents(parser);
case STATS_FIELD_NAME:
return stats(QueryStats.parseStats(parser));
QueryStats stats = QueryStats.parseStats(parser);
statsCollector.add(stats);
return stats(stats);
case FEED_HAS_NEXT_FIELD_NAME:
return hasNext(parser.nextBooleanValue());
default:
throw new ClientResponseException("Unknown StreamEvent field: " + fieldName);
throw new ClientResponseException("Unknown FeedPage field: " + fieldName);
}
}

}

public static <E> Builder<E> builder(Codec<E> elementCodec) {
return new Builder<>(elementCodec);
public static <E> Builder<E> builder(Codec<E> elementCodec, StatsCollector statsCollector) {
return new Builder<>(elementCodec, statsCollector);
}

public static <E> FeedPage<E> parseResponse(HttpResponse<InputStream> response, Codec<E> elementCodec) {
public static <E> FeedPage<E> parseResponse(HttpResponse<InputStream> response, Codec<E> elementCodec, StatsCollector statsCollector) {
try {
// If you want to inspect the request body before it's parsed, you can uncomment this.
// String body = new BufferedReader(new InputStreamReader(response.body()))
// .lines().collect(Collectors.joining("\n"));
if (response.statusCode() >= 400) {
// It's not ideal to use the QueryResponse parser in the Feed API. But for error cases, the
// error response that the Feed API throws is a terser (i.e. subset) version of QueryFailure, and the
// parser gracefully handles it. A FaunaException will be thrown by parseResponse.
QueryResponse.parseResponse(response, elementCodec, statsCollector);
}
JsonParser parser = JSON_FACTORY.createParser(response.body());
if (parser.nextToken() == START_OBJECT) {
Builder<E> builder = FeedPage.builder(elementCodec);
while (parser.nextToken() == FIELD_NAME) {
builder.parseField(parser);
}
return builder.build();
} else {
if (parser.nextToken() != START_OBJECT) {
throw new ClientResponseException("Invalid event starting with: " + parser.currentToken());
}
Builder<E> builder = FeedPage.builder(elementCodec, statsCollector);
while (parser.nextToken() == FIELD_NAME) {
builder.parseField(parser);
}
return builder.build();
} catch (IOException e) {
throw new ClientResponseException("Error parsing Feed response.", e);
}
Expand Down
33 changes: 30 additions & 3 deletions src/main/java/com/fauna/event/StreamOptions.java
Original file line number Diff line number Diff line change
Expand Up @@ -2,18 +2,29 @@

import com.fauna.client.RetryStrategy;

import java.time.Duration;
import java.util.Optional;

public class StreamOptions {

private final String cursor;
private final RetryStrategy retryStrategy;
private final Long startTimestamp;
private final Boolean statusEvents;
private final Duration timeout;

public static final StreamOptions DEFAULT = StreamOptions.builder().build();

public StreamOptions(Builder builder) {
this.cursor = builder.cursor;
this.retryStrategy = builder.retryStrategy;
this.startTimestamp = builder.startTimestamp;
this.statusEvents = builder.statusEvents;
this.timeout = builder.timeout;
}

public Optional<String> getCursor() {
return Optional.ofNullable(cursor);
}

public Optional<RetryStrategy> getRetryStrategy() {
Expand All @@ -29,27 +40,43 @@ public Optional<Boolean> getStatusEvents() {
return Optional.ofNullable(statusEvents);
}

public Optional<Duration> getTimeout() {
return Optional.ofNullable(timeout);
}


public static class Builder {
public String cursor = null;
public RetryStrategy retryStrategy = null;
public Long startTimestamp = null;
public Boolean statusEvents = null;
public Duration timeout = null;

public Builder withRetryStrategy(RetryStrategy retryStrategy) {
public Builder cursor(String cursor) {
this.cursor = cursor;
return this;
}

public Builder retryStrategy(RetryStrategy retryStrategy) {
this.retryStrategy = retryStrategy;
return this;
}

public Builder withStartTimestamp(long startTimestamp) {
public Builder startTimestamp(long startTimestamp) {
this.startTimestamp = startTimestamp;
return this;
}

public Builder withStatusEvents(Boolean statusEvents) {
public Builder statusEvents(Boolean statusEvents) {
this.statusEvents = statusEvents;
return this;
}

public Builder timeout(Duration timeout) {
this.timeout = timeout;
return this;
}

public StreamOptions build() {
return new StreamOptions(this);
}
Expand Down
Loading
Loading