diff --git a/src/main/java/com/fauna/client/FaunaClient.java b/src/main/java/com/fauna/client/FaunaClient.java index 12182766..6a0b809e 100644 --- a/src/main/java/com/fauna/client/FaunaClient.java +++ b/src/main/java/com/fauna/client/FaunaClient.java @@ -6,18 +6,15 @@ import com.fauna.codec.DefaultCodecRegistry; import com.fauna.event.FaunaStream; import com.fauna.event.FeedIterator; +import com.fauna.event.StreamOptions; import com.fauna.exception.ClientException; -import com.fauna.exception.ErrorHandler; import com.fauna.exception.FaunaException; -import com.fauna.exception.ProtocolException; import com.fauna.exception.ServiceException; import com.fauna.event.EventSource; import com.fauna.event.FeedOptions; import com.fauna.event.FeedPage; import com.fauna.query.AfterToken; import com.fauna.query.QueryOptions; -import com.fauna.response.QueryFailure; -import com.fauna.event.StreamRequest; import com.fauna.event.EventSourceResponse; import com.fauna.query.builder.Query; import com.fauna.response.QueryResponse; @@ -143,16 +140,7 @@ private Supplier>> makeAsyncFeedRequest(HttpCl return () -> client.sendAsync(request, HttpResponse.BodyHandlers.ofInputStream()).thenApply( response -> { logResponse(response); - if (response.statusCode() >= 400) { - // There are possibly some different error cases to handle for feeds. This seems like - // a comprehensive solution for now. In the future we could rename QueryFailure et. al. to - // something like FaunaFailure, or implement "FeedFailure". - QueryFailure failure = new QueryFailure(response.statusCode(), QueryResponse.builder(codec)); - ErrorHandler.handleQueryFailure(response.statusCode(), failure); - // Fall back on ProtocolException. - throw new ProtocolException(response.statusCode(), failure); - } - return FeedPage.parseResponse(response, codec); + return FeedPage.parseResponse(response, codec, statsCollector); }).whenComplete(this::completeFeedRequest); } @@ -437,13 +425,14 @@ public PageIterator paginate(Query fql, Class elementClass) { * Send a request to the Fauna stream endpoint, and return a CompletableFuture that completes with the FaunaStream * publisher. * - * @param streamRequest The request object including a stream token, and optionally a cursor, or timestamp. - * @param elementClass The expected class <E> of the stream events. + * @param eventSource The Event Source (e.g. token from `.eventSource()`). + * @param streamOptions The Stream Options (including start timestamp, retry strategy). * @return CompletableFuture A CompletableFuture of FaunaStream. * @throws FaunaException If the query does not succeed, an exception will be thrown. */ - public CompletableFuture> asyncStream(StreamRequest streamRequest, Class elementClass) { - HttpRequest streamReq = getStreamRequestBuilder().buildStreamRequest(streamRequest); + public CompletableFuture> asyncStream(EventSource eventSource, + StreamOptions streamOptions, Class elementClass) { + HttpRequest streamReq = getStreamRequestBuilder().buildStreamRequest(eventSource, streamOptions); return getHttpClient().sendAsync(streamReq, HttpResponse.BodyHandlers.ofPublisher()).thenCompose(response -> { CompletableFuture> publisher = new CompletableFuture<>(); @@ -456,13 +445,15 @@ public CompletableFuture> asyncStream(StreamRequest streamReq /** * Send a request to the Fauna stream endpoint to start a stream, and return a FaunaStream publisher. - * @param streamRequest The request object including a stream token, and optionally a cursor, or timestamp. - * @param elementClass The expected class <E> of the stream events. + * + * @param eventSource The request object including a stream token, and optionally a cursor, or timestamp. + * @param streamOptions + * @param elementClass The expected class <E> of the stream events. * @return FaunaStream A publisher, implementing Flow.Publisher<StreamEvent<E>> from the Java Flow API. * @throws FaunaException If the query does not succeed, an exception will be thrown. */ - public FaunaStream stream(StreamRequest streamRequest, Class elementClass) { - return completeAsync(asyncStream(streamRequest, elementClass), STREAM_SUBSCRIPTION); + public FaunaStream stream(EventSource eventSource, StreamOptions streamOptions, Class elementClass) { + return completeAsync(asyncStream(eventSource, streamOptions, elementClass), STREAM_SUBSCRIPTION); } /** @@ -470,14 +461,17 @@ public FaunaStream stream(StreamRequest streamRequest, Class elementCl * publisher. This method sends two requests, one to the query endpoint to get the stream token, and then another * to the stream endpoint. This method is equivalent to calling the query, then the stream methods on FaunaClient. * - * @param fql The FQL query to be executed. It must return a stream, e.g. ends in `.toStream()`. + * This method does not take QueryOptions, or StreamOptions as parameters. If you need specify either + * query, or stream options; you can use the asyncQuery/asyncStream methods. + * + * @param fql The FQL query to be executed. It must return an event source, e.g. ends in `.eventSource()`. * @param elementClass The expected class <E> of the stream events. * @return FaunaStream A publisher, implementing Flow.Publisher<StreamEvent<E>> from the Java Flow API. * @throws FaunaException If the query does not succeed, an exception will be thrown. */ public CompletableFuture> asyncStream(Query fql, Class elementClass) { - return this.asyncQuery(fql, EventSourceResponse.class).thenApply( - queryResponse -> this.stream(StreamRequest.fromTokenResponse(queryResponse.getData()), elementClass)); + return this.asyncQuery(fql, EventSourceResponse.class).thenApply(queryResponse -> + this.stream(EventSource.fromResponse(queryResponse.getData()), StreamOptions.builder().build(), elementClass)); } /** @@ -486,9 +480,10 @@ public CompletableFuture> asyncStream(Query fql, Class ele * the stream token, and then another request to the stream endpoint which return the FaunaStream publisher. * *

- * Query = fql("Product.all().toStream()"); - * QuerySuccess<StreamTokenResponse> tokenResp = client.query(fql, StreamTokenResponse.class); - * FaunaStream<Product> faunaStream = client.stream(new StreamRequest(tokenResp.getData.getToken(), Product.class) + * Query = fql("Product.all().eventSource()"); + * QuerySuccess<EventSourceResponse> querySuccess = client.query(fql, EventSourceResponse.class); + * EventSource source = EventSource.fromResponse(querySuccess.getData()); + * FaunaStream<Product> faunaStream = client.stream(source, StreamOptions.DEFAULT, Product.class) * * @param fql The FQL query to be executed. It must return a stream, e.g. ends in `.toStream()`. * @param elementClass The expected class <E> of the stream events. @@ -504,15 +499,15 @@ public FaunaStream stream(Query fql, Class elementClass) { /** * Send a request to the Fauna feed endpoint, and return a CompletableFuture that completes with the feed page. - * @param source An EventSource object with the feed token. - * @param feedOptions The FeedOptions object (default options will be used if null). - * @param elementClass The expected class <E> of the feed events. - * @return FeedSuccess A CompletableFuture that completes with the successful feed response. - * @param The type of the feed events. + * @param eventSource An EventSource object (e.g. token from `.eventSource()`) + * @param feedOptions The FeedOptions object (default options will be used if null). + * @param elementClass The expected class <E> of the feed events. + * @return CompletableFuture A CompletableFuture that completes with a FeedPage<E>. + * @param The type of the feed events. */ - public CompletableFuture> poll(EventSource source, FeedOptions feedOptions, Class elementClass) { + public CompletableFuture> poll(EventSource eventSource, FeedOptions feedOptions, Class elementClass) { return new RetryHandler>(getRetryStrategy(), logger).execute(makeAsyncFeedRequest( - getHttpClient(), getFeedRequestBuilder().buildFeedRequest(source, feedOptions != null ? feedOptions : FeedOptions.DEFAULT), codecProvider.get(elementClass))); + getHttpClient(), getFeedRequestBuilder().buildFeedRequest(eventSource, feedOptions != null ? feedOptions : FeedOptions.DEFAULT), codecProvider.get(elementClass))); } /** diff --git a/src/main/java/com/fauna/client/PageIterator.java b/src/main/java/com/fauna/client/PageIterator.java index dfe3e5a8..1cce02eb 100644 --- a/src/main/java/com/fauna/client/PageIterator.java +++ b/src/main/java/com/fauna/client/PageIterator.java @@ -44,6 +44,13 @@ public PageIterator(FaunaClient client, Query fql, Class resultClass, QueryOp this.queryFuture = client.asyncQuery(fql, this.pageClass, options); } + public PageIterator(FaunaClient client, Page firstPage, Class resultClass, QueryOptions options) { + this.client = client; + this.pageClass = new PageOf<>(resultClass); + this.options = options; + firstPage.getAfter().ifPresentOrElse(this::doPaginatedQuery, this::endPagination); + } + @Override public boolean hasNext() { return this.queryFuture != null; diff --git a/src/main/java/com/fauna/client/RequestBuilder.java b/src/main/java/com/fauna/client/RequestBuilder.java index ed780385..a8066743 100644 --- a/src/main/java/com/fauna/client/RequestBuilder.java +++ b/src/main/java/com/fauna/client/RequestBuilder.java @@ -1,24 +1,21 @@ package com.fauna.client; -import com.fasterxml.jackson.core.JsonFactory; -import com.fasterxml.jackson.core.JsonGenerator; import com.fauna.codec.Codec; import com.fauna.codec.CodecProvider; import com.fauna.env.DriverEnvironment; +import com.fauna.event.StreamOptions; +import com.fauna.event.StreamRequest; import com.fauna.exception.ClientException; import com.fauna.event.EventSource; import com.fauna.event.FeedOptions; import com.fauna.event.FeedRequest; import com.fauna.query.QueryOptions; -import com.fauna.event.StreamRequest; import com.fauna.query.builder.Query; import com.fauna.codec.UTF8FaunaGenerator; -import java.io.ByteArrayOutputStream; import java.io.IOException; import java.net.URI; import java.net.http.HttpRequest; -import java.nio.charset.StandardCharsets; import java.text.MessageFormat; import java.time.Duration; import java.util.logging.Logger; @@ -133,36 +130,11 @@ public HttpRequest buildRequest(Query fql, QueryOptions options, CodecProvider p } } - /** - * Builds and returns the request body for the Fauna Streams API. - * @param request An Object representing the Stream request. - * @return - * @throws IOException - */ - public String buildStreamRequestBody(StreamRequest request) throws IOException { - // Use JsonGenerator directly rather than UTF8FaunaGenerator because this is not FQL. For example, - // start_ts is a JSON numeric/integer, not a tagged '@long'. - ByteArrayOutputStream requestBytes = new ByteArrayOutputStream(); - JsonGenerator gen = new JsonFactory().createGenerator(requestBytes); - gen.writeStartObject(); - gen.writeStringField(FieldNames.TOKEN, request.getToken()); - // Only one of cursor / start_ts can be present, prefer cursor. - // Cannot use ifPresent(val -> ...) because gen.write methods can throw an IOException. - if (request.getCursor().isPresent()) { - gen.writeStringField(FieldNames.CURSOR, request.getCursor().get()); - } else if (request.getStartTs().isPresent()) { - gen.writeNumberField(FieldNames.START_TS, request.getStartTs().get()); - } - gen.writeEndObject(); - gen.flush(); - return requestBytes.toString(StandardCharsets.UTF_8); - } - - public HttpRequest buildStreamRequest(StreamRequest request) { + public HttpRequest buildStreamRequest(EventSource eventSource, StreamOptions streamOptions) { HttpRequest.Builder builder = baseRequestBuilder.copy(); - request.getTimeout().ifPresent(builder::timeout); + streamOptions.getTimeout().ifPresent(builder::timeout); try { - String body = buildStreamRequestBody(request); + String body = new StreamRequest(eventSource, streamOptions).serialize(); HttpRequest req = builder.POST(HttpRequest.BodyPublishers.ofString(body)).build(); logRequest(body, req); return req; diff --git a/src/main/java/com/fauna/event/FeedPage.java b/src/main/java/com/fauna/event/FeedPage.java index 75565203..1dbf5fce 100644 --- a/src/main/java/com/fauna/event/FeedPage.java +++ b/src/main/java/com/fauna/event/FeedPage.java @@ -2,8 +2,10 @@ import com.fasterxml.jackson.core.JsonFactory; import com.fasterxml.jackson.core.JsonParser; +import com.fauna.client.StatsCollector; import com.fauna.codec.Codec; import com.fauna.exception.ClientResponseException; +import com.fauna.response.QueryResponse; import com.fauna.response.QueryStats; import java.io.IOException; @@ -62,13 +64,15 @@ public QueryStats getStats() { public static class Builder { private final Codec elementCodec; + private final StatsCollector statsCollector; public List> events; public String cursor = ""; public Boolean hasNext = false; public QueryStats stats = null; - public Builder(Codec elementCodec) { + public Builder(Codec elementCodec, StatsCollector statsCollector) { this.elementCodec = elementCodec; + this.statsCollector = statsCollector; } public Builder events(List> events) { @@ -116,32 +120,42 @@ public Builder parseField(JsonParser parser) throws IOException { case EVENTS_FIELD_NAME: return parseEvents(parser); case STATS_FIELD_NAME: - return stats(QueryStats.parseStats(parser)); + QueryStats stats = QueryStats.parseStats(parser); + statsCollector.add(stats); + return stats(stats); case FEED_HAS_NEXT_FIELD_NAME: return hasNext(parser.nextBooleanValue()); default: - throw new ClientResponseException("Unknown StreamEvent field: " + fieldName); + throw new ClientResponseException("Unknown FeedPage field: " + fieldName); } } } - public static Builder builder(Codec elementCodec) { - return new Builder<>(elementCodec); + public static Builder builder(Codec elementCodec, StatsCollector statsCollector) { + return new Builder<>(elementCodec, statsCollector); } - public static FeedPage parseResponse(HttpResponse response, Codec elementCodec) { + public static FeedPage parseResponse(HttpResponse response, Codec elementCodec, StatsCollector statsCollector) { try { + // If you want to inspect the request body before it's parsed, you can uncomment this. + // String body = new BufferedReader(new InputStreamReader(response.body())) + // .lines().collect(Collectors.joining("\n")); + if (response.statusCode() >= 400) { + // It's not ideal to use the QueryResponse parser in the Feed API. But for error cases, the + // error response that the Feed API throws is a terser (i.e. subset) version of QueryFailure, and the + // parser gracefully handles it. A FaunaException will be thrown by parseResponse. + QueryResponse.parseResponse(response, elementCodec, statsCollector); + } JsonParser parser = JSON_FACTORY.createParser(response.body()); - if (parser.nextToken() == START_OBJECT) { - Builder builder = FeedPage.builder(elementCodec); - while (parser.nextToken() == FIELD_NAME) { - builder.parseField(parser); - } - return builder.build(); - } else { + if (parser.nextToken() != START_OBJECT) { throw new ClientResponseException("Invalid event starting with: " + parser.currentToken()); } + Builder builder = FeedPage.builder(elementCodec, statsCollector); + while (parser.nextToken() == FIELD_NAME) { + builder.parseField(parser); + } + return builder.build(); } catch (IOException e) { throw new ClientResponseException("Error parsing Feed response.", e); } diff --git a/src/main/java/com/fauna/event/StreamOptions.java b/src/main/java/com/fauna/event/StreamOptions.java index 5a03527e..1bd1d85a 100644 --- a/src/main/java/com/fauna/event/StreamOptions.java +++ b/src/main/java/com/fauna/event/StreamOptions.java @@ -2,18 +2,29 @@ import com.fauna.client.RetryStrategy; +import java.time.Duration; import java.util.Optional; public class StreamOptions { + private final String cursor; private final RetryStrategy retryStrategy; private final Long startTimestamp; private final Boolean statusEvents; + private final Duration timeout; + + public static final StreamOptions DEFAULT = StreamOptions.builder().build(); public StreamOptions(Builder builder) { + this.cursor = builder.cursor; this.retryStrategy = builder.retryStrategy; this.startTimestamp = builder.startTimestamp; this.statusEvents = builder.statusEvents; + this.timeout = builder.timeout; + } + + public Optional getCursor() { + return Optional.ofNullable(cursor); } public Optional getRetryStrategy() { @@ -29,27 +40,43 @@ public Optional getStatusEvents() { return Optional.ofNullable(statusEvents); } + public Optional getTimeout() { + return Optional.ofNullable(timeout); + } + public static class Builder { + public String cursor = null; public RetryStrategy retryStrategy = null; public Long startTimestamp = null; public Boolean statusEvents = null; + public Duration timeout = null; - public Builder withRetryStrategy(RetryStrategy retryStrategy) { + public Builder cursor(String cursor) { + this.cursor = cursor; + return this; + } + + public Builder retryStrategy(RetryStrategy retryStrategy) { this.retryStrategy = retryStrategy; return this; } - public Builder withStartTimestamp(long startTimestamp) { + public Builder startTimestamp(long startTimestamp) { this.startTimestamp = startTimestamp; return this; } - public Builder withStatusEvents(Boolean statusEvents) { + public Builder statusEvents(Boolean statusEvents) { this.statusEvents = statusEvents; return this; } + public Builder timeout(Duration timeout) { + this.timeout = timeout; + return this; + } + public StreamOptions build() { return new StreamOptions(this); } diff --git a/src/main/java/com/fauna/event/StreamRequest.java b/src/main/java/com/fauna/event/StreamRequest.java index 83134196..41272e73 100644 --- a/src/main/java/com/fauna/event/StreamRequest.java +++ b/src/main/java/com/fauna/event/StreamRequest.java @@ -1,5 +1,13 @@ package com.fauna.event; +import com.fasterxml.jackson.core.JsonFactory; +import com.fasterxml.jackson.core.JsonGenerator; +import com.fauna.client.RequestBuilder; +import jdk.jfr.Event; + +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.nio.charset.StandardCharsets; import java.time.Duration; import java.util.Optional; @@ -7,133 +15,37 @@ * This class defines the request body expected by the fauna /stream endpoint. */ public class StreamRequest { - private final String token; - private final String cursor; - private final Long startTs; - private final Duration timeout; - - StreamRequest(String token, String cursor, Long startTs, Duration timeout) { - this.token = token; - this.cursor = cursor; - this.startTs = startTs; - this.timeout = timeout; - if (token == null || token.isEmpty()) { - throw new IllegalArgumentException("token cannot be null or empty"); + private final EventSource source; + private final StreamOptions options; + + public StreamRequest(EventSource eventSource, StreamOptions streamOptions) { + this.source = eventSource; + this.options = streamOptions; + if (source == null) { + throw new IllegalArgumentException("Event source cannot be null."); } - if (cursor != null && startTs != null) { - throw new IllegalArgumentException("Only one of cursor, or start_ts can be set."); + if (options == null) { + throw new IllegalArgumentException("Stream options cannot be null."); } } - public static StreamRequest fromTokenResponse(EventSourceResponse tokenResponse) { - return new StreamRequest(tokenResponse.getToken(), null, null, null); - } - - public static class Builder { - final String token; - String cursor = null; - Long startTs = null; - Duration timeout = null; - - /** - * Return a new StreamRequest.Builder instance with the given token. - * @param token A Fauna Stream token. - */ - public Builder(String token) { - this.token = token; - } - - /** - * Return the current Builder instance with the given cursor. - * @param cursor A Fauna Stream cursor. - * @return The current Builder instance. - * @throws IllegalArgumentException If startTs has already been set. - */ - public Builder cursor(String cursor) { - if (this.startTs != null) { - throw new IllegalArgumentException("only one of cursor, or startTs can be set."); - } - this.cursor = cursor; - return this; - } - - /** - * Return the current Builder instance with the given start timestamp. - * @param startTs A timestamp to start the stream at. - * @return The current Builder instance. - * @throws IllegalArgumentException If startTs has already been set. - */ - public Builder startTs(Long startTs) { - if (this.cursor != null) { - throw new IllegalArgumentException("only one of cursor, or startTs can be set."); - } - this.startTs = startTs; - return this; - } - - /** - * Return the current builder instance with the given timeout. - * This timeout is the HTTP client timeout that is passed to java.net.http.HttpRequest.Builder. - * The Java documentation says that if "the response is not received within the specified timeout then an - * HttpTimeoutException is thrown". For streaming this means that the exception is thrown if the first - * headers/bytes are not recieved within the timeout. - * - * The default value is null if the user does not set this timeout. - * @param timeout A Duration representing the timeout. - * @return The current Builder instance. - */ - public Builder timeout(Duration timeout) { - this.timeout = timeout; - return this; - } - - public StreamRequest build() { - return new StreamRequest(token, cursor, startTs, timeout); + public String serialize() throws IOException { + // Use JsonGenerator directly rather than UTF8FaunaGenerator because this is not FQL. For example, + // start_ts is a JSON numeric/integer, not a tagged '@long'. + ByteArrayOutputStream requestBytes = new ByteArrayOutputStream(); + JsonGenerator gen = new JsonFactory().createGenerator(requestBytes); + gen.writeStartObject(); + gen.writeStringField(RequestBuilder.FieldNames.TOKEN, source.getToken()); + // Only one of cursor / start_ts can be present, prefer cursor. + // Cannot use ifPresent(val -> ...) because gen.write methods can throw an IOException. + if (options.getCursor().isPresent()) { + gen.writeStringField(RequestBuilder.FieldNames.CURSOR, options.getCursor().get()); + } else if (options.getStartTimestamp().isPresent()) { + gen.writeNumberField(RequestBuilder.FieldNames.START_TS, options.getStartTimestamp().get()); } - - } - - /** - * Create a new StreamRequest.Builder instance. - * @param token The Fauna Stream token to use. - * @return A new StreamRequest.Builder instance. - */ - public static Builder builder(String token) { - return new Builder(token); - } - - /** - * Stream token for the event stream to subscribe to. - * @return A String representing the Stream token. - */ - public String getToken() { - return token; + gen.writeEndObject(); + gen.flush(); + return requestBytes.toString(StandardCharsets.UTF_8); } - /** - * Cursor for a previous event. If provided, the stream replays any events that occurred after - * the cursor (exclusive). - * @return The cursor, or Optional.empty() if not provided. - */ - public Optional getCursor() { - return Optional.ofNullable(cursor); - } - - /** - * Stream start time in microseconds since the Unix epoch. This is typically a previous event's txn_ts - * (transaction timestamp). - * @return The stream start time, as a Long, or Optional.empty() if not provided. - */ - public Optional getStartTs() { - return Optional.ofNullable(startTs); - } - - /** - * Stream HTTP request timeout. This timeout is passed to java.net.http.HttpRequest.Builder. The default - * is null/empty. - * @return The timeout Duration, or Optional.empty() if not set. - */ - public Optional getTimeout() { - return Optional.ofNullable(timeout); - } } diff --git a/src/main/java/com/fauna/exception/ServiceException.java b/src/main/java/com/fauna/exception/ServiceException.java index 66dd4593..8f090de4 100644 --- a/src/main/java/com/fauna/exception/ServiceException.java +++ b/src/main/java/com/fauna/exception/ServiceException.java @@ -90,7 +90,7 @@ public Optional getTxnTs() { * * @return the schema version as a long value */ - public long getSchemaVersion() { + public Long getSchemaVersion() { return this.response.getSchemaVersion(); } diff --git a/src/main/java/com/fauna/response/QueryFailure.java b/src/main/java/com/fauna/response/QueryFailure.java index 96de37fd..0143eaed 100644 --- a/src/main/java/com/fauna/response/QueryFailure.java +++ b/src/main/java/com/fauna/response/QueryFailure.java @@ -27,7 +27,6 @@ public String getMessage() { public Optional getAbort(Class clazz) { return errorInfo.getAbort(clazz); - } public String getFullMessage() { diff --git a/src/main/java/com/fauna/types/Page.java b/src/main/java/com/fauna/types/Page.java index 55fc2305..e1da82e0 100644 --- a/src/main/java/com/fauna/types/Page.java +++ b/src/main/java/com/fauna/types/Page.java @@ -11,7 +11,7 @@ * * @param The type of data contained in the page. */ -public class Page{ +public class Page { private final List data; private final String after; diff --git a/src/test/java/com/fauna/beans/InventorySource.java b/src/test/java/com/fauna/beans/InventorySource.java new file mode 100644 index 00000000..e6114d8f --- /dev/null +++ b/src/test/java/com/fauna/beans/InventorySource.java @@ -0,0 +1,10 @@ +package com.fauna.beans; + +import com.fauna.e2e.beans.Product; +import com.fauna.event.EventSourceResponse; +import com.fauna.types.Page; + +public class InventorySource { + public Page firstPage; + public EventSourceResponse eventSource; +} diff --git a/src/test/java/com/fauna/client/DefaultsTest.java b/src/test/java/com/fauna/client/DefaultsTest.java index dedcc7d7..830e8905 100644 --- a/src/test/java/com/fauna/client/DefaultsTest.java +++ b/src/test/java/com/fauna/client/DefaultsTest.java @@ -3,6 +3,7 @@ import com.fauna.codec.DefaultCodecProvider; import com.fauna.event.EventSource; import com.fauna.event.FeedOptions; +import com.fauna.event.StreamOptions; import com.fauna.query.QueryOptions; import com.fauna.event.StreamRequest; import org.junit.jupiter.api.Test; @@ -64,7 +65,7 @@ public void testTimeoutDefaults() { assertEquals(Duration.ofSeconds(10), feedRequest.timeout().orElseThrow()); // We do not want a timeout set for stream requests, because the client may hold the stream open indefinitely. - HttpRequest streamRequest = streamRequestBuilder.buildStreamRequest(StreamRequest.builder("token").build()); + HttpRequest streamRequest = streamRequestBuilder.buildStreamRequest(source, StreamOptions.builder().build()); assertTrue(streamRequest.headers().firstValue(RequestBuilder.Headers.QUERY_TIMEOUT_MS).isEmpty()); assertTrue(streamRequest.timeout().isEmpty()); @@ -85,7 +86,7 @@ public void testNullQueryTimeouts() { @Test public void testOverridingTimeouts() { - HttpRequest streamRequest = streamRequestBuilder.buildStreamRequest(StreamRequest.builder("token").timeout(Duration.ofMinutes(10)).build()); + HttpRequest streamRequest = streamRequestBuilder.buildStreamRequest(source, StreamOptions.builder().timeout(Duration.ofMinutes(10)).build()); assertEquals(Duration.ofMinutes(10), streamRequest.timeout().orElseThrow()); } diff --git a/src/test/java/com/fauna/client/FeedIteratorTest.java b/src/test/java/com/fauna/client/FeedIteratorTest.java index 48902f5b..9fafe57f 100644 --- a/src/test/java/com/fauna/client/FeedIteratorTest.java +++ b/src/test/java/com/fauna/client/FeedIteratorTest.java @@ -55,7 +55,7 @@ private CompletableFuture> successFuture(boolean after, int num "cursor0", System.currentTimeMillis() - 5, num + "-b", null, null)); - return CompletableFuture.supplyAsync(() -> FeedPage.builder(codec).events(events).cursor("cursor0").hasNext(after).build()); + return CompletableFuture.supplyAsync(() -> FeedPage.builder(codec, new StatsCollectorImpl()).events(events).cursor("cursor0").hasNext(after).build()); } private CompletableFuture> failureFuture() throws IOException { @@ -91,8 +91,7 @@ public void test_single_page_without_calling_hasNext() throws IOException { @Test public void test_multiple_pages() throws IOException { - when(client.poll(argThat(source::equals), argThat(FeedOptions.DEFAULT::equals), any(Class.class))).thenReturn(successFuture(true, 0)); - when(client.poll(argThat(source::equals), argThat(opts -> opts.getCursor().orElse("").equals(CURSOR_0)), any(Class.class))).thenReturn(successFuture(false, 1)); + when(client.poll(argThat(source::equals), any(), any(Class.class))).thenReturn(successFuture(true, 0), successFuture(false, 1)); FeedIterator feedIterator = new FeedIterator<>(client, source, FeedOptions.DEFAULT, String.class); assertTrue(feedIterator.hasNext()); diff --git a/src/test/java/com/fauna/client/RequestBuilderTest.java b/src/test/java/com/fauna/client/RequestBuilderTest.java index 2bcc44e7..ab6859ce 100644 --- a/src/test/java/com/fauna/client/RequestBuilderTest.java +++ b/src/test/java/com/fauna/client/RequestBuilderTest.java @@ -4,6 +4,8 @@ import com.fauna.codec.CodecRegistry; import com.fauna.codec.DefaultCodecProvider; import com.fauna.codec.DefaultCodecRegistry; +import com.fauna.event.EventSource; +import com.fauna.event.StreamOptions; import com.fauna.query.QueryOptions; import com.fauna.event.StreamRequest; import org.junit.jupiter.api.Test; @@ -40,6 +42,8 @@ class RequestBuilderTest { .endpoint(FaunaConfig.FaunaEndpoint.LOCAL) .secret("secret").build(); + private static final EventSource SOURCE = EventSource.fromToken("tkn"); + private final RequestBuilder requestBuilder = RequestBuilder.queryRequestBuilder(faunaConfig, Logger.getGlobal()); private final CodecRegistry codecRegistry = new DefaultCodecRegistry(); @@ -88,7 +92,6 @@ void buildRequest_withCustomTimeoutBuffer() { RequestBuilder requestBuilder = RequestBuilder.queryRequestBuilder( FaunaConfig.builder().clientTimeoutBuffer(Duration.ofSeconds(1)).build(), Logger.getGlobal()); - HttpRequest req = requestBuilder.buildRequest(fql("42"), defaultOpts, codecProvider, 1L); assertEquals(Duration.ofSeconds(6), requestBuilder.buildRequest(fql("42"), defaultOpts, codecProvider, 1L).timeout().orElseThrow()); assertEquals(Duration.ofSeconds(16), requestBuilder.buildRequest(fql("42"), timeoutOpts, codecProvider, 1L).timeout().orElseThrow()); } @@ -96,9 +99,9 @@ void buildRequest_withCustomTimeoutBuffer() { @Test void buildStreamRequestBody_shouldOnlyIncludeToken() throws IOException { // Given - StreamRequest request = StreamRequest.builder("tkn").build(); + StreamRequest request = new StreamRequest(SOURCE, StreamOptions.DEFAULT); // When - String body = requestBuilder.buildStreamRequestBody(request); + String body = request.serialize(); // Then assertEquals("{\"token\":\"tkn\"}", body); } @@ -106,21 +109,22 @@ void buildStreamRequestBody_shouldOnlyIncludeToken() throws IOException { @Test void buildStreamRequestBody_shouldIncludeCursor() throws IOException { // Given - StreamRequest request = StreamRequest.builder("tkn").cursor("cur").build(); + HttpRequest req = requestBuilder.buildStreamRequest(SOURCE, StreamOptions.builder().cursor("cur").build()); // When - String body = requestBuilder.buildStreamRequestBody(request); + long contentLength = req.bodyPublisher().orElseThrow().contentLength(); + // Then - assertEquals("{\"token\":\"tkn\",\"cursor\":\"cur\"}", body); + assertEquals("{\"token\":\"tkn\",\"cursor\":\"cur\"}".length(), contentLength); } @Test void buildStreamRequestBody_shouldIncludeTimestamp() throws IOException { // Given - StreamRequest request = StreamRequest.builder("tkn").startTs(Long.MAX_VALUE / 2).build(); + HttpRequest request = requestBuilder.buildStreamRequest(SOURCE, StreamOptions.builder().startTimestamp(Long.MAX_VALUE).build()); // When - String body = requestBuilder.buildStreamRequestBody(request); + long contentLength = request.bodyPublisher().orElseThrow().contentLength(); // Then - assertEquals("{\"token\":\"tkn\",\"start_ts\":4611686018427387903}", body); + assertEquals("{\"token\":\"tkn\",\"start_ts\":4611686018427387903}".length(), contentLength); } @Test diff --git a/src/test/java/com/fauna/e2e/E2EFeedsTest.java b/src/test/java/com/fauna/e2e/E2EFeedsTest.java index 90e2a513..3f0f75b8 100644 --- a/src/test/java/com/fauna/e2e/E2EFeedsTest.java +++ b/src/test/java/com/fauna/e2e/E2EFeedsTest.java @@ -9,6 +9,8 @@ import com.fauna.event.FeedOptions; import com.fauna.event.FeedPage; import com.fauna.event.EventSourceResponse; +import com.fauna.exception.InvalidRequestException; +import com.fauna.response.QueryFailure; import com.fauna.response.QuerySuccess; import com.fauna.event.FaunaEvent; import org.junit.jupiter.api.BeforeAll; @@ -18,6 +20,7 @@ import java.util.Iterator; import java.util.List; import java.util.concurrent.CompletableFuture; +import java.util.concurrent.CompletionException; import java.util.logging.ConsoleHandler; import java.util.logging.Handler; import java.util.logging.Level; @@ -28,7 +31,9 @@ import static com.fauna.event.FaunaEvent.EventType.ERROR; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertThrows; import static org.junit.jupiter.api.Assertions.assertTrue; +import static org.junit.jupiter.api.Assertions.assertNull; public class E2EFeedsTest { @@ -92,8 +97,33 @@ public void manualFeed() { @Test public void feedError() { + // Fauna can throw a HTTP error in some cases. In this case it's bad request to the feed API. Presumably + // some of the others like ThrottlingException, and AuthenticationException can also be thrown. + CompletableFuture> future= client.poll(EventSource.fromToken("badToken"), FeedOptions.DEFAULT, Product.class); + CompletionException ce = assertThrows(CompletionException.class, () -> future.join()); + InvalidRequestException ire = (InvalidRequestException) ce.getCause(); + + assertEquals("invalid_request", ire.getErrorCode()); + assertEquals(400, ire.getStatusCode()); + assertEquals("400 (invalid_request): Invalid request body: invalid event source provided.", ire.getMessage()); + assertTrue(ire.getTxnTs().isEmpty()); + assertNull(ire.getStats()); + assertNull(ire.getSchemaVersion()); + assertNull(ire.getSummary()); + assertNull(ire.getCause()); + assertNull(ire.getQueryTags()); + + QueryFailure failure = ire.getResponse(); + assertTrue(failure.getConstraintFailures().isEmpty()); + assertTrue(failure.getAbort(null).isEmpty()); + } + + @Test + public void feedEventError() { + // Fauna can also return a valid feed page, with HTTP 200, but an "error" event type. FeedOptions options = FeedOptions.builder().startTs(0L).build(); - FeedIterator iter = client.feed(fql("Product.all().eventSource()"), options, Product.class); + QuerySuccess sourceQuery = client.query(fql("Product.all().eventSource()"), EventSourceResponse.class); + FeedIterator iter = client.feed(EventSource.fromResponse(sourceQuery.getData()), options, Product.class); FeedPage pageOne = iter.next(); assertFalse(pageOne.hasNext()); assertEquals(1, pageOne.getEvents().size()); diff --git a/src/test/java/com/fauna/e2e/E2EStreamingTest.java b/src/test/java/com/fauna/e2e/E2EStreamingTest.java index 1a99b6ec..9d7108c1 100644 --- a/src/test/java/com/fauna/e2e/E2EStreamingTest.java +++ b/src/test/java/com/fauna/e2e/E2EStreamingTest.java @@ -1,16 +1,22 @@ package com.fauna.e2e; +import com.fauna.beans.InventorySource; import com.fauna.client.Fauna; import com.fauna.client.FaunaClient; -import com.fauna.client.FaunaConfig; +import com.fauna.client.PageIterator; +import com.fauna.client.QueryStatsSummary; +import com.fauna.event.EventSource; import com.fauna.event.FaunaStream; import com.fauna.e2e.beans.Product; +import com.fauna.event.StreamOptions; import com.fauna.exception.ClientException; import com.fauna.event.EventSourceResponse; +import com.fauna.exception.NullDocumentException; +import com.fauna.query.QueryOptions; import com.fauna.query.builder.Query; import com.fauna.response.QuerySuccess; import com.fauna.event.FaunaEvent; -import com.fauna.event.StreamRequest; +import com.fauna.types.NullableDocument; import org.junit.jupiter.api.BeforeAll; import org.junit.jupiter.api.Disabled; import org.junit.jupiter.api.Test; @@ -19,6 +25,7 @@ import java.text.MessageFormat; import java.time.Duration; import java.util.ArrayList; +import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.Random; @@ -31,6 +38,7 @@ import java.util.stream.Collectors; import java.util.stream.Stream; +import static com.fauna.codec.Generic.nullableDocumentOf; import static com.fauna.query.builder.Query.fql; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertFalse; @@ -58,14 +66,38 @@ public static Query createProduct() { Map.of("name", randomName(5), "quantity", random.nextInt(1, 16))); } + public static void waitForSync(InventorySubscriber watcher) throws InterruptedException { + long start = System.currentTimeMillis(); + int events = watcher.countEvents(); + while (System.currentTimeMillis() < start + 2_000) { + Thread.sleep(10); + int latest = watcher.countEvents(); + if (latest > events) { + events = latest; + } + } + + } + + private static Long doDelete(String name) { + Query query = fql("Product.firstWhere(.name == ${name})!.delete()", Map.of("name", name)); + QuerySuccess> success = client.query(query, nullableDocumentOf(Product.class)); + NullableDocument nullDoc = success.getData(); + assertThrows(NullDocumentException.class, () -> nullDoc.get()); + return success.getLastSeenTxn(); + } static class InventorySubscriber implements Flow.Subscriber> { private final AtomicLong timestamp = new AtomicLong(0); private String cursor = null; private final AtomicInteger events = new AtomicInteger(0); - Map inventory = new ConcurrentHashMap<>(); + private final Map inventory; Flow.Subscription subscription; + public InventorySubscriber(Map inventory) { + this.inventory = inventory; + } + @Override public void onSubscribe(Flow.Subscription subscription) { this.subscription = subscription; @@ -74,9 +106,12 @@ public void onSubscribe(Flow.Subscription subscription) { @Override public void onNext(FaunaEvent event) { - System.out.println(MessageFormat.format("Event {0}, {1}", event.getCursor(), event.getTimestamp().orElse(-1L))); events.addAndGet(1); - event.getData().ifPresent(product -> inventory.put(product.getName(), product.getQuantity())); + if (event.getType().equals(FaunaEvent.EventType.ADD) || event.getType().equals(FaunaEvent.EventType.UPDATE)) { + event.getData().ifPresent(product -> inventory.put(product.getName(), product.getQuantity())); + } else if (event.getType().equals(FaunaEvent.EventType.REMOVE)) { + event.getData().ifPresent(product -> inventory.remove(product.getName())); + } // Fauna delivers events to the client in order, but it's up to the user to keep those events in order. event.getTimestamp().ifPresent(ts -> this.timestamp.updateAndGet(value -> value < ts ? value : ts)); this.cursor = event.getCursor(); @@ -114,57 +149,83 @@ public String status() { @Test public void query_streamOfProduct() throws InterruptedException { - var cfg = FaunaConfig.builder() - .secret("secret") - .endpoint("http://localhost:8443") - .build(); - var streamClient = Fauna.client(cfg); - - FaunaStream stream = streamClient.stream(fql("Product.all().toStream()"), Product.class); - var stats = streamClient.getStatsCollector().readAndReset(); - assertEquals(1, stats.getComputeOps()); - - InventorySubscriber inventory = new InventorySubscriber(); - stream.subscribe(inventory); - assertFalse(stream.isClosed()); + QueryStatsSummary initialStats = client.getStatsCollector().read(); + // Initialize stream outside try-with-resources so we can assert that it's closed at the end of this test;. + FaunaStream stream = client.stream(fql("Product.all().eventSource()"), Product.class); + try (stream) { + InventorySubscriber watcher = new InventorySubscriber(new ConcurrentHashMap<>()); + stream.subscribe(watcher); + assertFalse(stream.isClosed()); + + doDelete("product-2"); + + waitForSync(watcher); + watcher.onComplete(); + assertFalse(stream.isClosed()); + } + stream.close(); + assertTrue(stream.isClosed()); + } + @Test + public void query_trackStateWithStream() throws InterruptedException { + // This test demonstrates querying the current state of a collection, and then tracking the changes from + // that moment on, which guarantees that no updates will be missed. + QueryStatsSummary initialStats = client.getStatsCollector().read(); + QuerySuccess success = client.query(fql("let products = Product.all()\n{ firstPage: products.pageSize(4), eventSource: products.eventSource()}"), InventorySource.class ); + InventorySource inventorySource = success.getData(); + + // First, we process all the products that existed when the query was made. + PageIterator pageIterator = new PageIterator<>(client, inventorySource.firstPage, Product.class, QueryOptions.DEFAULT); + Map inventory = new HashMap<>(); List products = new ArrayList<>(); - products.add(client.query(fql("Product.create({name: 'cheese', quantity: 1})"), Product.class).getData()); - products.add(client.query(fql("Product.create({name: 'bread', quantity: 2})"), Product.class).getData()); - products.add(client.query(fql("Product.create({name: 'wine', quantity: 3})"), Product.class).getData()); - - long start = System.currentTimeMillis(); - int events = inventory.countEvents(); - - while (System.currentTimeMillis() < start + 2_000) { - Thread.sleep(10); - int latest = inventory.countEvents(); - if (latest > events) { - events = latest; - } + pageIterator.flatten().forEachRemaining(product -> { + products.add(product); inventory.put(product.getName(), product.getQuantity()); + }); + + // Now start a stream based on same query, and it's transaction timestamp. + EventSource eventSource = EventSource.fromResponse(inventorySource.eventSource); + StreamOptions streamOptions = StreamOptions.builder().startTimestamp(success.getLastSeenTxn()).build(); + FaunaStream stream = client.stream(eventSource, streamOptions, Product.class); + try (stream) { + InventorySubscriber watcher = new InventorySubscriber(inventory); + stream.subscribe(watcher); + assertFalse(stream.isClosed()); + products.add(client.query(fql("Product.create({name: 'bread', quantity: 2})"), Product.class).getData()); + products.add(client.query(fql("Product.firstWhere(.name == \"product-0\")!.update({quantity: 30})"), Product.class).getData()); + waitForSync(watcher); + int before = watcher.countInventory(); + + client.query(fql("Product.create({name: 'cheese', quantity: 17})"), Product.class).getData(); + waitForSync(watcher); + assertEquals(before + 17, watcher.countInventory()); + + doDelete("cheese"); + waitForSync(watcher); + assertEquals(before, watcher.countInventory()); + + watcher.onComplete(); + Integer total = products.stream().map(Product::getQuantity).reduce(0, Integer::sum); + assertEquals(total, watcher.countInventory()); + assertFalse(stream.isClosed()); } - inventory.onComplete(); - Integer total = products.stream().map(Product::getQuantity).reduce(0, Integer::sum); - - assertEquals(total, inventory.countInventory()); - - assertFalse(stream.isClosed()); stream.close(); assertTrue(stream.isClosed()); - stats = streamClient.getStatsCollector().read(); - assertEquals(11, stats.getReadOps()); - assertEquals(events, stats.getComputeOps()); + QueryStatsSummary finalStats = client.getStatsCollector().read(); + assertTrue(finalStats.getReadOps() > initialStats.getReadOps()); + assertTrue(finalStats.getComputeOps() > initialStats.getComputeOps()); } @Test public void handleStreamError() throws InterruptedException { // It would be nice to have another test that generates a stream with normal events, and then an error // event, but this at least tests some of the functionality. - QuerySuccess queryResp = client.query(fql("Product.all().toStream()"), EventSourceResponse.class); - StreamRequest request = StreamRequest.builder((queryResp.getData().getToken())).cursor("invalid_cursor").build(); - FaunaStream stream = client.stream(request, Product.class); - InventorySubscriber inventory = new InventorySubscriber(); + QuerySuccess queryResp = client.query(fql("Product.all().eventSource()"), EventSourceResponse.class); + EventSource source = EventSource.fromResponse(queryResp.getData()); + StreamOptions options = StreamOptions.builder().cursor("invalid_cursor").build(); + FaunaStream stream = client.stream(source, options, Product.class); + InventorySubscriber inventory = new InventorySubscriber(new ConcurrentHashMap<>()); stream.subscribe(inventory); long start = System.currentTimeMillis(); while (!stream.isClosed() && System.currentTimeMillis() < (start + 5_000)) { @@ -175,21 +236,22 @@ public void handleStreamError() throws InterruptedException { @Test public void handleStreamTimeout() { - QuerySuccess queryResp = client.query(fql("Product.all().toStream()"), EventSourceResponse.class); - StreamRequest request = StreamRequest.builder((queryResp.getData().getToken())).timeout(Duration.ofMillis(1)).build(); - ClientException exc = assertThrows(ClientException.class, () -> client.stream(request, Product.class)); + QuerySuccess queryResp = client.query(fql("Product.all().eventSource()"), EventSourceResponse.class); + EventSource source = EventSource.fromResponse(queryResp.getData()); + StreamOptions options = StreamOptions.builder().timeout(Duration.ofMillis(1)).build(); + ClientException exc = assertThrows(ClientException.class, () -> client.stream(source, options, Product.class)); assertEquals(ExecutionException.class, exc.getCause().getClass()); assertEquals(HttpTimeoutException.class, exc.getCause().getCause().getClass()); - - } @Disabled("Will fix this for GA, I think the other drivers have this bug too.") @Test public void handleLargeEvents() throws InterruptedException { - FaunaStream stream = client.stream(fql("Product.all().toStream()"), Product.class); - InventorySubscriber inventory = new InventorySubscriber(); - stream.subscribe(inventory); + InventorySubscriber inventory; + try (FaunaStream stream = client.stream(fql("Product.all().eventSource()"), Product.class)) { + inventory = new InventorySubscriber(new ConcurrentHashMap<>()); + stream.subscribe(inventory); + } List products = new ArrayList<>(); byte[] image = new byte[20]; @@ -234,8 +296,8 @@ public void handleLargeEvents() throws InterruptedException { @Disabled("This test sometimes causes Fauna to generate an error for getting too far behind.") @Test public void handleManyEvents() throws InterruptedException { - FaunaStream stream = client.stream(fql("Product.all().toStream()"), Product.class); - InventorySubscriber inventory = new InventorySubscriber(); + FaunaStream stream = client.stream(fql("Product.all().eventSource()"), Product.class); + InventorySubscriber inventory = new InventorySubscriber(new ConcurrentHashMap<>()); stream.subscribe(inventory); List> productFutures = new ArrayList<>(); diff --git a/src/test/java/com/fauna/stream/StreamRequestTest.java b/src/test/java/com/fauna/stream/StreamRequestTest.java index 3e7d67d0..99724ddd 100644 --- a/src/test/java/com/fauna/stream/StreamRequestTest.java +++ b/src/test/java/com/fauna/stream/StreamRequestTest.java @@ -2,44 +2,42 @@ import com.fauna.codec.CodecProvider; import com.fauna.codec.DefaultCodecProvider; +import com.fauna.event.EventSource; +import com.fauna.event.StreamOptions; import com.fauna.event.StreamRequest; import org.junit.jupiter.api.Test; +import java.io.IOException; + import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertThrows; -import static org.junit.jupiter.api.Assertions.assertTrue; public class StreamRequestTest { public static final CodecProvider provider = DefaultCodecProvider.SINGLETON; + private final static EventSource SOURCE = new EventSource("abc"); @Test - public void testTokenOnlyRequest() { - StreamRequest req = StreamRequest.builder("abc").build(); - assertEquals("abc", req.getToken()); - assertTrue(req.getCursor().isEmpty()); - assertTrue(req.getStartTs().isEmpty()); + public void testTokenOnlyRequest() throws IOException { + StreamRequest req = new StreamRequest(SOURCE, StreamOptions.DEFAULT); + assertEquals("{\"token\":\"abc\"}", req.serialize()); } @Test - public void testCursorRequest() { - StreamRequest req = StreamRequest.builder("abc").cursor("def").build(); - assertEquals("abc", req.getToken()); - assertEquals("def", req.getCursor().get()); - assertTrue(req.getStartTs().isEmpty()); + public void testCursorRequest() throws IOException { + StreamRequest req = new StreamRequest(SOURCE, StreamOptions.builder().cursor("def").build()); + assertEquals("{\"token\":\"abc\",\"cursor\":\"def\"}", req.serialize()); } @Test - public void testTsRequest() { - StreamRequest req = StreamRequest.builder("abc").startTs(1234L).build(); - assertEquals("abc", req.getToken()); - assertTrue(req.getCursor().isEmpty()); - assertEquals(1234L, req.getStartTs().get()); + public void testTsRequest() throws IOException { + StreamRequest req = new StreamRequest(SOURCE, StreamOptions.builder().startTimestamp(1234L).build()); + assertEquals("{\"token\":\"abc\",\"start_ts\":1234}", req.serialize()); } @Test - public void testCursorAndTsRequest() { - assertThrows(IllegalArgumentException.class, () -> StreamRequest.builder("tkn").startTs(10L).cursor("hello")); - assertThrows(IllegalArgumentException.class, () -> StreamRequest.builder("tkn").cursor("hello").startTs(10L)); + public void testMissingArgsRequest() { + assertThrows(IllegalArgumentException.class, () -> new StreamRequest(SOURCE, null)); + assertThrows(IllegalArgumentException.class, () -> new StreamRequest(null, StreamOptions.DEFAULT)); } }