Skip to content

Commit

Permalink
Merge pull request #164 from fauna/BT-5271-streams-api
Browse files Browse the repository at this point in the history
[BT-5271-streams-api] Align the Streams API with Event Feeds terminology.
  • Loading branch information
pnwpedro authored Nov 6, 2024
2 parents adf0495 + 3cb6d20 commit eccb34d
Show file tree
Hide file tree
Showing 16 changed files with 327 additions and 297 deletions.
65 changes: 30 additions & 35 deletions src/main/java/com/fauna/client/FaunaClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -6,18 +6,15 @@
import com.fauna.codec.DefaultCodecRegistry;
import com.fauna.event.FaunaStream;
import com.fauna.event.FeedIterator;
import com.fauna.event.StreamOptions;
import com.fauna.exception.ClientException;
import com.fauna.exception.ErrorHandler;
import com.fauna.exception.FaunaException;
import com.fauna.exception.ProtocolException;
import com.fauna.exception.ServiceException;
import com.fauna.event.EventSource;
import com.fauna.event.FeedOptions;
import com.fauna.event.FeedPage;
import com.fauna.query.AfterToken;
import com.fauna.query.QueryOptions;
import com.fauna.response.QueryFailure;
import com.fauna.event.StreamRequest;
import com.fauna.event.EventSourceResponse;
import com.fauna.query.builder.Query;
import com.fauna.response.QueryResponse;
Expand Down Expand Up @@ -143,16 +140,7 @@ private <E> Supplier<CompletableFuture<FeedPage<E>>> makeAsyncFeedRequest(HttpCl
return () -> client.sendAsync(request, HttpResponse.BodyHandlers.ofInputStream()).thenApply(
response -> {
logResponse(response);
if (response.statusCode() >= 400) {
// There are possibly some different error cases to handle for feeds. This seems like
// a comprehensive solution for now. In the future we could rename QueryFailure et. al. to
// something like FaunaFailure, or implement "FeedFailure".
QueryFailure failure = new QueryFailure(response.statusCode(), QueryResponse.builder(codec));
ErrorHandler.handleQueryFailure(response.statusCode(), failure);
// Fall back on ProtocolException.
throw new ProtocolException(response.statusCode(), failure);
}
return FeedPage.parseResponse(response, codec);
return FeedPage.parseResponse(response, codec, statsCollector);
}).whenComplete(this::completeFeedRequest);
}

Expand Down Expand Up @@ -437,13 +425,14 @@ public <E> PageIterator<E> paginate(Query fql, Class<E> elementClass) {
* Send a request to the Fauna stream endpoint, and return a CompletableFuture that completes with the FaunaStream
* publisher.
*
* @param streamRequest The request object including a stream token, and optionally a cursor, or timestamp.
* @param elementClass The expected class &lt;E&gt; of the stream events.
* @param eventSource The Event Source (e.g. token from `.eventSource()`).
* @param streamOptions The Stream Options (including start timestamp, retry strategy).
* @return CompletableFuture A CompletableFuture of FaunaStream<E>.
* @throws FaunaException If the query does not succeed, an exception will be thrown.
*/
public <E> CompletableFuture<FaunaStream<E>> asyncStream(StreamRequest streamRequest, Class<E> elementClass) {
HttpRequest streamReq = getStreamRequestBuilder().buildStreamRequest(streamRequest);
public <E> CompletableFuture<FaunaStream<E>> asyncStream(EventSource eventSource,
StreamOptions streamOptions, Class<E> elementClass) {
HttpRequest streamReq = getStreamRequestBuilder().buildStreamRequest(eventSource, streamOptions);
return getHttpClient().sendAsync(streamReq,
HttpResponse.BodyHandlers.ofPublisher()).thenCompose(response -> {
CompletableFuture<FaunaStream<E>> publisher = new CompletableFuture<>();
Expand All @@ -456,28 +445,33 @@ public <E> CompletableFuture<FaunaStream<E>> asyncStream(StreamRequest streamReq

/**
* Send a request to the Fauna stream endpoint to start a stream, and return a FaunaStream publisher.
* @param streamRequest The request object including a stream token, and optionally a cursor, or timestamp.
* @param elementClass The expected class &lt;E&gt; of the stream events.
*
* @param eventSource The request object including a stream token, and optionally a cursor, or timestamp.
* @param streamOptions
* @param elementClass The expected class &lt;E&gt; of the stream events.
* @return FaunaStream A publisher, implementing Flow.Publisher&lt;StreamEvent&lt;E&gt;&gt; from the Java Flow API.
* @throws FaunaException If the query does not succeed, an exception will be thrown.
*/
public <E> FaunaStream<E> stream(StreamRequest streamRequest, Class<E> elementClass) {
return completeAsync(asyncStream(streamRequest, elementClass), STREAM_SUBSCRIPTION);
public <E> FaunaStream<E> stream(EventSource eventSource, StreamOptions streamOptions, Class<E> elementClass) {
return completeAsync(asyncStream(eventSource, streamOptions, elementClass), STREAM_SUBSCRIPTION);
}

/**
* Start a Fauna stream based on an FQL query, and return a CompletableFuture of the resulting FaunaStream
* publisher. This method sends two requests, one to the query endpoint to get the stream token, and then another
* to the stream endpoint. This method is equivalent to calling the query, then the stream methods on FaunaClient.
*
* @param fql The FQL query to be executed. It must return a stream, e.g. ends in `.toStream()`.
* This method does not take QueryOptions, or StreamOptions as parameters. If you need specify either
* query, or stream options; you can use the asyncQuery/asyncStream methods.
*
* @param fql The FQL query to be executed. It must return an event source, e.g. ends in `.eventSource()`.
* @param elementClass The expected class &lt;E&gt; of the stream events.
* @return FaunaStream A publisher, implementing Flow.Publisher&lt;StreamEvent&lt;E&gt;&gt; from the Java Flow API.
* @throws FaunaException If the query does not succeed, an exception will be thrown.
*/
public <E> CompletableFuture<FaunaStream<E>> asyncStream(Query fql, Class<E> elementClass) {
return this.asyncQuery(fql, EventSourceResponse.class).thenApply(
queryResponse -> this.stream(StreamRequest.fromTokenResponse(queryResponse.getData()), elementClass));
return this.asyncQuery(fql, EventSourceResponse.class).thenApply(queryResponse ->
this.stream(EventSource.fromResponse(queryResponse.getData()), StreamOptions.builder().build(), elementClass));
}

/**
Expand All @@ -486,9 +480,10 @@ public <E> CompletableFuture<FaunaStream<E>> asyncStream(Query fql, Class<E> ele
* the stream token, and then another request to the stream endpoint which return the FaunaStream publisher.
*
* <p>
* Query = fql("Product.all().toStream()");
* QuerySuccess&lt;StreamTokenResponse&gt; tokenResp = client.query(fql, StreamTokenResponse.class);
* FaunaStream&lt;Product&gt; faunaStream = client.stream(new StreamRequest(tokenResp.getData.getToken(), Product.class)
* Query = fql("Product.all().eventSource()");
* QuerySuccess&lt;EventSourceResponse&gt; querySuccess = client.query(fql, EventSourceResponse.class);
* EventSource source = EventSource.fromResponse(querySuccess.getData());
* FaunaStream&lt;Product&gt; faunaStream = client.stream(source, StreamOptions.DEFAULT, Product.class)
*
* @param fql The FQL query to be executed. It must return a stream, e.g. ends in `.toStream()`.
* @param elementClass The expected class &lt;E&gt; of the stream events.
Expand All @@ -504,15 +499,15 @@ public <E> FaunaStream<E> stream(Query fql, Class<E> elementClass) {

/**
* Send a request to the Fauna feed endpoint, and return a CompletableFuture that completes with the feed page.
* @param source An EventSource object with the feed token.
* @param feedOptions The FeedOptions object (default options will be used if null).
* @param elementClass The expected class &lt;E&gt; of the feed events.
* @return FeedSuccess A CompletableFuture that completes with the successful feed response.
* @param <E> The type of the feed events.
* @param eventSource An EventSource object (e.g. token from `.eventSource()`)
* @param feedOptions The FeedOptions object (default options will be used if null).
* @param elementClass The expected class &lt;E&gt; of the feed events.
* @return CompletableFuture A CompletableFuture that completes with a FeedPage&lt;E&gt;.
* @param <E> The type of the feed events.
*/
public <E> CompletableFuture<FeedPage<E>> poll(EventSource source, FeedOptions feedOptions, Class<E> elementClass) {
public <E> CompletableFuture<FeedPage<E>> poll(EventSource eventSource, FeedOptions feedOptions, Class<E> elementClass) {
return new RetryHandler<FeedPage<E>>(getRetryStrategy(), logger).execute(makeAsyncFeedRequest(
getHttpClient(), getFeedRequestBuilder().buildFeedRequest(source, feedOptions != null ? feedOptions : FeedOptions.DEFAULT), codecProvider.get(elementClass)));
getHttpClient(), getFeedRequestBuilder().buildFeedRequest(eventSource, feedOptions != null ? feedOptions : FeedOptions.DEFAULT), codecProvider.get(elementClass)));
}

/**
Expand Down
7 changes: 7 additions & 0 deletions src/main/java/com/fauna/client/PageIterator.java
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,13 @@ public PageIterator(FaunaClient client, Query fql, Class<E> resultClass, QueryOp
this.queryFuture = client.asyncQuery(fql, this.pageClass, options);
}

public PageIterator(FaunaClient client, Page<E> firstPage, Class<E> resultClass, QueryOptions options) {
this.client = client;
this.pageClass = new PageOf<>(resultClass);
this.options = options;
firstPage.getAfter().ifPresentOrElse(this::doPaginatedQuery, this::endPagination);
}

@Override
public boolean hasNext() {
return this.queryFuture != null;
Expand Down
38 changes: 5 additions & 33 deletions src/main/java/com/fauna/client/RequestBuilder.java
Original file line number Diff line number Diff line change
@@ -1,24 +1,21 @@
package com.fauna.client;

import com.fasterxml.jackson.core.JsonFactory;
import com.fasterxml.jackson.core.JsonGenerator;
import com.fauna.codec.Codec;
import com.fauna.codec.CodecProvider;
import com.fauna.env.DriverEnvironment;
import com.fauna.event.StreamOptions;
import com.fauna.event.StreamRequest;
import com.fauna.exception.ClientException;
import com.fauna.event.EventSource;
import com.fauna.event.FeedOptions;
import com.fauna.event.FeedRequest;
import com.fauna.query.QueryOptions;
import com.fauna.event.StreamRequest;
import com.fauna.query.builder.Query;
import com.fauna.codec.UTF8FaunaGenerator;

import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.net.URI;
import java.net.http.HttpRequest;
import java.nio.charset.StandardCharsets;
import java.text.MessageFormat;
import java.time.Duration;
import java.util.logging.Logger;
Expand Down Expand Up @@ -133,36 +130,11 @@ public HttpRequest buildRequest(Query fql, QueryOptions options, CodecProvider p
}
}

/**
* Builds and returns the request body for the Fauna Streams API.
* @param request An Object representing the Stream request.
* @return
* @throws IOException
*/
public String buildStreamRequestBody(StreamRequest request) throws IOException {
// Use JsonGenerator directly rather than UTF8FaunaGenerator because this is not FQL. For example,
// start_ts is a JSON numeric/integer, not a tagged '@long'.
ByteArrayOutputStream requestBytes = new ByteArrayOutputStream();
JsonGenerator gen = new JsonFactory().createGenerator(requestBytes);
gen.writeStartObject();
gen.writeStringField(FieldNames.TOKEN, request.getToken());
// Only one of cursor / start_ts can be present, prefer cursor.
// Cannot use ifPresent(val -> ...) because gen.write methods can throw an IOException.
if (request.getCursor().isPresent()) {
gen.writeStringField(FieldNames.CURSOR, request.getCursor().get());
} else if (request.getStartTs().isPresent()) {
gen.writeNumberField(FieldNames.START_TS, request.getStartTs().get());
}
gen.writeEndObject();
gen.flush();
return requestBytes.toString(StandardCharsets.UTF_8);
}

public HttpRequest buildStreamRequest(StreamRequest request) {
public HttpRequest buildStreamRequest(EventSource eventSource, StreamOptions streamOptions) {
HttpRequest.Builder builder = baseRequestBuilder.copy();
request.getTimeout().ifPresent(builder::timeout);
streamOptions.getTimeout().ifPresent(builder::timeout);
try {
String body = buildStreamRequestBody(request);
String body = new StreamRequest(eventSource, streamOptions).serialize();
HttpRequest req = builder.POST(HttpRequest.BodyPublishers.ofString(body)).build();
logRequest(body, req);
return req;
Expand Down
40 changes: 27 additions & 13 deletions src/main/java/com/fauna/event/FeedPage.java
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,10 @@

import com.fasterxml.jackson.core.JsonFactory;
import com.fasterxml.jackson.core.JsonParser;
import com.fauna.client.StatsCollector;
import com.fauna.codec.Codec;
import com.fauna.exception.ClientResponseException;
import com.fauna.response.QueryResponse;
import com.fauna.response.QueryStats;

import java.io.IOException;
Expand Down Expand Up @@ -62,13 +64,15 @@ public QueryStats getStats() {

public static class Builder<E> {
private final Codec<E> elementCodec;
private final StatsCollector statsCollector;
public List<FaunaEvent<E>> events;
public String cursor = "";
public Boolean hasNext = false;
public QueryStats stats = null;

public Builder(Codec<E> elementCodec) {
public Builder(Codec<E> elementCodec, StatsCollector statsCollector) {
this.elementCodec = elementCodec;
this.statsCollector = statsCollector;
}

public Builder events(List<FaunaEvent<E>> events) {
Expand Down Expand Up @@ -116,32 +120,42 @@ public Builder parseField(JsonParser parser) throws IOException {
case EVENTS_FIELD_NAME:
return parseEvents(parser);
case STATS_FIELD_NAME:
return stats(QueryStats.parseStats(parser));
QueryStats stats = QueryStats.parseStats(parser);
statsCollector.add(stats);
return stats(stats);
case FEED_HAS_NEXT_FIELD_NAME:
return hasNext(parser.nextBooleanValue());
default:
throw new ClientResponseException("Unknown StreamEvent field: " + fieldName);
throw new ClientResponseException("Unknown FeedPage field: " + fieldName);
}
}

}

public static <E> Builder<E> builder(Codec<E> elementCodec) {
return new Builder<>(elementCodec);
public static <E> Builder<E> builder(Codec<E> elementCodec, StatsCollector statsCollector) {
return new Builder<>(elementCodec, statsCollector);
}

public static <E> FeedPage<E> parseResponse(HttpResponse<InputStream> response, Codec<E> elementCodec) {
public static <E> FeedPage<E> parseResponse(HttpResponse<InputStream> response, Codec<E> elementCodec, StatsCollector statsCollector) {
try {
// If you want to inspect the request body before it's parsed, you can uncomment this.
// String body = new BufferedReader(new InputStreamReader(response.body()))
// .lines().collect(Collectors.joining("\n"));
if (response.statusCode() >= 400) {
// It's not ideal to use the QueryResponse parser in the Feed API. But for error cases, the
// error response that the Feed API throws is a terser (i.e. subset) version of QueryFailure, and the
// parser gracefully handles it. A FaunaException will be thrown by parseResponse.
QueryResponse.parseResponse(response, elementCodec, statsCollector);
}
JsonParser parser = JSON_FACTORY.createParser(response.body());
if (parser.nextToken() == START_OBJECT) {
Builder<E> builder = FeedPage.builder(elementCodec);
while (parser.nextToken() == FIELD_NAME) {
builder.parseField(parser);
}
return builder.build();
} else {
if (parser.nextToken() != START_OBJECT) {
throw new ClientResponseException("Invalid event starting with: " + parser.currentToken());
}
Builder<E> builder = FeedPage.builder(elementCodec, statsCollector);
while (parser.nextToken() == FIELD_NAME) {
builder.parseField(parser);
}
return builder.build();
} catch (IOException e) {
throw new ClientResponseException("Error parsing Feed response.", e);
}
Expand Down
33 changes: 30 additions & 3 deletions src/main/java/com/fauna/event/StreamOptions.java
Original file line number Diff line number Diff line change
Expand Up @@ -2,18 +2,29 @@

import com.fauna.client.RetryStrategy;

import java.time.Duration;
import java.util.Optional;

public class StreamOptions {

private final String cursor;
private final RetryStrategy retryStrategy;
private final Long startTimestamp;
private final Boolean statusEvents;
private final Duration timeout;

public static final StreamOptions DEFAULT = StreamOptions.builder().build();

public StreamOptions(Builder builder) {
this.cursor = builder.cursor;
this.retryStrategy = builder.retryStrategy;
this.startTimestamp = builder.startTimestamp;
this.statusEvents = builder.statusEvents;
this.timeout = builder.timeout;
}

public Optional<String> getCursor() {
return Optional.ofNullable(cursor);
}

public Optional<RetryStrategy> getRetryStrategy() {
Expand All @@ -29,27 +40,43 @@ public Optional<Boolean> getStatusEvents() {
return Optional.ofNullable(statusEvents);
}

public Optional<Duration> getTimeout() {
return Optional.ofNullable(timeout);
}


public static class Builder {
public String cursor = null;
public RetryStrategy retryStrategy = null;
public Long startTimestamp = null;
public Boolean statusEvents = null;
public Duration timeout = null;

public Builder withRetryStrategy(RetryStrategy retryStrategy) {
public Builder cursor(String cursor) {
this.cursor = cursor;
return this;
}

public Builder retryStrategy(RetryStrategy retryStrategy) {
this.retryStrategy = retryStrategy;
return this;
}

public Builder withStartTimestamp(long startTimestamp) {
public Builder startTimestamp(long startTimestamp) {
this.startTimestamp = startTimestamp;
return this;
}

public Builder withStatusEvents(Boolean statusEvents) {
public Builder statusEvents(Boolean statusEvents) {
this.statusEvents = statusEvents;
return this;
}

public Builder timeout(Duration timeout) {
this.timeout = timeout;
return this;
}

public StreamOptions build() {
return new StreamOptions(this);
}
Expand Down
Loading

0 comments on commit eccb34d

Please sign in to comment.