diff --git a/src/main/java/com/fauna/event/StreamEvent.java b/src/main/java/com/fauna/event/FaunaEvent.java similarity index 87% rename from src/main/java/com/fauna/event/StreamEvent.java rename to src/main/java/com/fauna/event/FaunaEvent.java index 1e095ad0..7398dbf3 100644 --- a/src/main/java/com/fauna/event/StreamEvent.java +++ b/src/main/java/com/fauna/event/FaunaEvent.java @@ -21,7 +21,7 @@ import static com.fauna.constants.ResponseFields.CURSOR_FIELD_NAME; import static com.fauna.constants.ResponseFields.STREAM_TYPE_FIELD_NAME; -public class StreamEvent { +public class FaunaEvent { public enum EventType { STATUS, ADD, UPDATE, REMOVE, ERROR } @@ -34,7 +34,7 @@ public enum EventType { private final ErrorInfo error; - public StreamEvent(EventType type, String cursor, Long txn_ts, E data, QueryStats stats, ErrorInfo error) { + public FaunaEvent(EventType type, String cursor, Long txn_ts, E data, QueryStats stats, ErrorInfo error) { this.type = type; this.cursor = cursor; this.txn_ts = txn_ts; @@ -46,7 +46,7 @@ public StreamEvent(EventType type, String cursor, Long txn_ts, E data, QueryStat public static class Builder { private final Codec dataCodec; String cursor = null; - StreamEvent.EventType eventType = null; + FaunaEvent.EventType eventType = null; QueryStats stats = null; E data = null; Long txn_ts = null; @@ -61,7 +61,7 @@ public Builder cursor(String cursor) { return this; } - public Builder eventType(StreamEvent.EventType eventType) { + public Builder eventType(FaunaEvent.EventType eventType) { this.eventType = eventType; return this; } @@ -93,8 +93,8 @@ public Builder error(ErrorInfo error) { return this; } - public StreamEvent build() { - return new StreamEvent<>(eventType, cursor, txn_ts, data, stats, errorInfo); + public FaunaEvent build() { + return new FaunaEvent<>(eventType, cursor, txn_ts, data, stats, errorInfo); } } @@ -124,11 +124,11 @@ static Builder parseField(Builder builder, JsonParser parser) throws I } - private static StreamEvent.EventType parseEventType(JsonParser parser) throws IOException { + private static FaunaEvent.EventType parseEventType(JsonParser parser) throws IOException { if (parser.nextToken() == VALUE_STRING) { String typeString = parser.getText().toUpperCase(); try { - return StreamEvent.EventType.valueOf(typeString); + return FaunaEvent.EventType.valueOf(typeString); } catch (IllegalArgumentException e) { throw new ClientResponseException("Invalid event type: " + typeString, e); } @@ -137,9 +137,9 @@ private static StreamEvent.EventType parseEventType(JsonParser parser) throws IO } } - public static StreamEvent parse(JsonParser parser, Codec dataCodec) throws IOException { + public static FaunaEvent parse(JsonParser parser, Codec dataCodec) throws IOException { if (parser.currentToken() == START_OBJECT || parser.nextToken() == START_OBJECT) { - Builder builder = StreamEvent.builder(dataCodec); + Builder builder = FaunaEvent.builder(dataCodec); while (parser.nextToken() == FIELD_NAME) { builder = parseField(builder, parser); } @@ -149,7 +149,7 @@ public static StreamEvent parse(JsonParser parser, Codec dataCodec) th } } - public StreamEvent.EventType getType() { + public FaunaEvent.EventType getType() { return type; } diff --git a/src/main/java/com/fauna/event/FaunaStream.java b/src/main/java/com/fauna/event/FaunaStream.java index 506be41a..149bb409 100644 --- a/src/main/java/com/fauna/event/FaunaStream.java +++ b/src/main/java/com/fauna/event/FaunaStream.java @@ -12,18 +12,17 @@ import java.nio.ByteBuffer; import java.text.MessageFormat; import java.util.List; -import java.util.Optional; import java.util.concurrent.Flow.Processor; import java.util.concurrent.Flow.Subscriber; import java.util.concurrent.Flow.Subscription; import java.util.concurrent.SubmissionPublisher; -public class FaunaStream extends SubmissionPublisher> implements Processor, StreamEvent> { +public class FaunaStream extends SubmissionPublisher> implements Processor, FaunaEvent> { private static final JsonFactory JSON_FACTORY = new JsonFactory(); private final Codec dataCodec; private Subscription subscription; - private Subscriber> eventSubscriber; + private Subscriber> eventSubscriber; private MultiByteBufferInputStream buffer = null; private final StatsCollector statsCollector; @@ -33,7 +32,7 @@ public FaunaStream(Class elementClass, StatsCollector statsCollector) { } @Override - public void subscribe(Subscriber> subscriber) { + public void subscribe(Subscriber> subscriber) { if (this.eventSubscriber == null) { this.eventSubscriber = subscriber; super.subscribe(subscriber); @@ -61,11 +60,11 @@ public void onNext(List buffers) { } try { JsonParser parser = JSON_FACTORY.createParser(buffer); - StreamEvent event = StreamEvent.parse(parser, dataCodec); + FaunaEvent event = FaunaEvent.parse(parser, dataCodec); statsCollector.add(event.getStats()); - if (event.getType() == StreamEvent.EventType.ERROR) { + if (event.getType() == FaunaEvent.EventType.ERROR) { ErrorInfo error = event.getError(); this.onComplete(); this.close(); diff --git a/src/main/java/com/fauna/event/FeedIterator.java b/src/main/java/com/fauna/event/FeedIterator.java index 573919c2..5d3de51f 100644 --- a/src/main/java/com/fauna/event/FeedIterator.java +++ b/src/main/java/com/fauna/event/FeedIterator.java @@ -87,17 +87,17 @@ public FeedPage next() { * Return an iterator that iterates directly over the items that make up the page contents. * @return An iterator of E. */ - public Iterator> flatten() { + public Iterator> flatten() { return new Iterator<>() { private final FeedIterator feedIterator = FeedIterator.this; - private Iterator> thisPage = feedIterator.hasNext() ? feedIterator.next().getEvents().iterator() : null; + private Iterator> thisPage = feedIterator.hasNext() ? feedIterator.next().getEvents().iterator() : null; @Override public boolean hasNext() { return thisPage != null && (thisPage.hasNext() || feedIterator.hasNext()); } @Override - public StreamEvent next() { + public FaunaEvent next() { if (thisPage == null) { throw new NoSuchElementException(); } diff --git a/src/main/java/com/fauna/event/FeedPage.java b/src/main/java/com/fauna/event/FeedPage.java index e7ae3567..75565203 100644 --- a/src/main/java/com/fauna/event/FeedPage.java +++ b/src/main/java/com/fauna/event/FeedPage.java @@ -22,13 +22,13 @@ import static com.fauna.constants.ResponseFields.CURSOR_FIELD_NAME; public class FeedPage { - private final List> events; + private final List> events; private final String cursor; private final boolean hasNext; private final QueryStats stats; private static final JsonFactory JSON_FACTORY = new JsonFactory(); - public FeedPage(final List> events, + public FeedPage(final List> events, final String cursor, final boolean hasNext, final QueryStats stats) { @@ -44,7 +44,7 @@ public FeedPage(final List> events, } } - public List> getEvents() { + public List> getEvents() { return events; } @@ -62,7 +62,7 @@ public QueryStats getStats() { public static class Builder { private final Codec elementCodec; - public List> events; + public List> events; public String cursor = ""; public Boolean hasNext = false; public QueryStats stats = null; @@ -71,7 +71,7 @@ public Builder(Codec elementCodec) { this.elementCodec = elementCodec; } - public Builder events(List> events) { + public Builder events(List> events) { this.events = events; return this; } @@ -93,9 +93,9 @@ public Builder stats(QueryStats stats) { public Builder parseEvents(JsonParser parser) throws IOException { if (parser.nextToken() == START_ARRAY) { - List> events = new ArrayList<>(); + List> events = new ArrayList<>(); while (parser.nextToken() != END_ARRAY) { - events.add(StreamEvent.parse(parser, elementCodec)); + events.add(FaunaEvent.parse(parser, elementCodec)); } this.events = events; } else { diff --git a/src/test/java/com/fauna/client/FeedIteratorTest.java b/src/test/java/com/fauna/client/FeedIteratorTest.java index 2e73fdf8..48902f5b 100644 --- a/src/test/java/com/fauna/client/FeedIteratorTest.java +++ b/src/test/java/com/fauna/client/FeedIteratorTest.java @@ -12,7 +12,7 @@ import com.fauna.response.ErrorInfo; import com.fauna.response.QueryFailure; import com.fauna.response.QueryResponse; -import com.fauna.event.StreamEvent; +import com.fauna.event.FaunaEvent; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.extension.ExtendWith; import org.mockito.Mock; @@ -46,12 +46,12 @@ public class FeedIteratorTest { private FaunaClient client; private CompletableFuture> successFuture(boolean after, int num) throws IOException { - List> events = new ArrayList<>(); + List> events = new ArrayList<>(); Codec codec = DefaultCodecProvider.SINGLETON.get(String.class); - events.add(new StreamEvent<>(StreamEvent.EventType.ADD, + events.add(new FaunaEvent<>(FaunaEvent.EventType.ADD, "cursor0", System.currentTimeMillis() - 10, num + "-a", null, null)); - events.add(new StreamEvent<>(StreamEvent.EventType.ADD, + events.add(new FaunaEvent<>(FaunaEvent.EventType.ADD, "cursor0", System.currentTimeMillis() - 5, num + "-b", null, null)); @@ -124,7 +124,7 @@ public void test_flatten() throws IOException { when(client.poll(argThat(source::equals), argThat(FeedOptions.DEFAULT::equals), any(Class.class))).thenReturn(successFuture(true, 0)); when(client.poll(argThat(source::equals), argThat(opts -> opts.getCursor().orElse("").equals(CURSOR_0)), any(Class.class))).thenReturn(successFuture(false, 1)); FeedIterator feedIterator = new FeedIterator<>(client, source, FeedOptions.DEFAULT, String.class); - Iterator> iter = feedIterator.flatten(); + Iterator> iter = feedIterator.flatten(); List products = new ArrayList<>(); iter.forEachRemaining(event -> products.add(event.getData().orElseThrow())); assertEquals(4, products.size()); diff --git a/src/test/java/com/fauna/e2e/E2EFeedsTest.java b/src/test/java/com/fauna/e2e/E2EFeedsTest.java index a6cc7bb5..90e2a513 100644 --- a/src/test/java/com/fauna/e2e/E2EFeedsTest.java +++ b/src/test/java/com/fauna/e2e/E2EFeedsTest.java @@ -10,7 +10,7 @@ import com.fauna.event.FeedPage; import com.fauna.event.EventSourceResponse; import com.fauna.response.QuerySuccess; -import com.fauna.event.StreamEvent; +import com.fauna.event.FaunaEvent; import org.junit.jupiter.api.BeforeAll; import org.junit.jupiter.api.Test; @@ -25,7 +25,7 @@ import static com.fauna.client.FaunaConfig.FaunaEndpoint.LOCAL; import static com.fauna.query.builder.Query.fql; -import static com.fauna.event.StreamEvent.EventType.ERROR; +import static com.fauna.event.FaunaEvent.EventType.ERROR; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertFalse; import static org.junit.jupiter.api.Assertions.assertTrue; @@ -48,10 +48,10 @@ public static void setup() { public void feedOfAll() { FeedOptions options = FeedOptions.builder().startTs(productCollectionTs).build(); FeedIterator iter = client.feed(fql("Product.all().eventSource()"), options, Product.class); - List>> pages = new ArrayList<>(); + List>> pages = new ArrayList<>(); iter.forEachRemaining(page -> pages.add(page.getEvents())); assertEquals(4, pages.size()); - List> products = pages.stream().flatMap(p -> p.stream()).collect(Collectors.toList()); + List> products = pages.stream().flatMap(p -> p.stream()).collect(Collectors.toList()); assertEquals(50, products.size()); } @@ -60,7 +60,7 @@ public void manualFeed() { // Use the feeds API with complete (i.e. manual) control of the calls made to Fauna. QuerySuccess sourceQuery = client.query(fql("Product.all().eventSource()"), EventSourceResponse.class); EventSource source = EventSource.fromResponse(sourceQuery.getData()); - List> productUpdates = new ArrayList<>(); + List> productUpdates = new ArrayList<>(); FeedOptions initialOptions = FeedOptions.builder().startTs(productCollectionTs).pageSize(2).build(); CompletableFuture> pageFuture = client.poll(source, initialOptions, Product.class); int pageCount = 0; @@ -97,7 +97,7 @@ public void feedError() { FeedPage pageOne = iter.next(); assertFalse(pageOne.hasNext()); assertEquals(1, pageOne.getEvents().size()); - StreamEvent errorEvent = pageOne.getEvents().get(0); + FaunaEvent errorEvent = pageOne.getEvents().get(0); assertEquals(ERROR, errorEvent.getType()); assertEquals("invalid_stream_start_time", errorEvent.getError().getCode()); assertTrue(errorEvent.getError().getMessage().contains("is too far in the past")); @@ -107,10 +107,10 @@ public void feedError() { public void feedFlattened() { FeedOptions options = FeedOptions.builder().startTs(productCollectionTs).build(); FeedIterator iter = client.feed(fql("Product.all().eventSource()"), options, Product.class); - Iterator> productIter = iter.flatten(); - List> products = new ArrayList<>(); + Iterator> productIter = iter.flatten(); + List> products = new ArrayList<>(); // Java iterators not being iterable (or useable in a for-each loop) is annoying. - for (StreamEvent p : (Iterable>) () -> productIter) { + for (FaunaEvent p : (Iterable>) () -> productIter) { products.add(p); } assertEquals(50, products.size()); diff --git a/src/test/java/com/fauna/e2e/E2EStreamingTest.java b/src/test/java/com/fauna/e2e/E2EStreamingTest.java index 3f556994..1a99b6ec 100644 --- a/src/test/java/com/fauna/e2e/E2EStreamingTest.java +++ b/src/test/java/com/fauna/e2e/E2EStreamingTest.java @@ -9,7 +9,7 @@ import com.fauna.event.EventSourceResponse; import com.fauna.query.builder.Query; import com.fauna.response.QuerySuccess; -import com.fauna.event.StreamEvent; +import com.fauna.event.FaunaEvent; import com.fauna.event.StreamRequest; import org.junit.jupiter.api.BeforeAll; import org.junit.jupiter.api.Disabled; @@ -59,7 +59,7 @@ public static Query createProduct() { } - static class InventorySubscriber implements Flow.Subscriber> { + static class InventorySubscriber implements Flow.Subscriber> { private final AtomicLong timestamp = new AtomicLong(0); private String cursor = null; private final AtomicInteger events = new AtomicInteger(0); @@ -73,7 +73,7 @@ public void onSubscribe(Flow.Subscription subscription) { } @Override - public void onNext(StreamEvent event) { + public void onNext(FaunaEvent event) { System.out.println(MessageFormat.format("Event {0}, {1}", event.getCursor(), event.getTimestamp().orElse(-1L))); events.addAndGet(1); event.getData().ifPresent(product -> inventory.put(product.getName(), product.getQuantity()));