Skip to content

Commit

Permalink
Rename StreamEvent -> FaunaEvent as it's now used for Streams and Feeds.
Browse files Browse the repository at this point in the history
  • Loading branch information
David Griffin committed Oct 31, 2024
1 parent 1dd3442 commit 616e32c
Show file tree
Hide file tree
Showing 7 changed files with 43 additions and 44 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@
import static com.fauna.constants.ResponseFields.CURSOR_FIELD_NAME;
import static com.fauna.constants.ResponseFields.STREAM_TYPE_FIELD_NAME;

public class StreamEvent<E> {
public class FaunaEvent<E> {
public enum EventType {
STATUS, ADD, UPDATE, REMOVE, ERROR
}
Expand All @@ -34,7 +34,7 @@ public enum EventType {
private final ErrorInfo error;


public StreamEvent(EventType type, String cursor, Long txn_ts, E data, QueryStats stats, ErrorInfo error) {
public FaunaEvent(EventType type, String cursor, Long txn_ts, E data, QueryStats stats, ErrorInfo error) {
this.type = type;
this.cursor = cursor;
this.txn_ts = txn_ts;
Expand All @@ -46,7 +46,7 @@ public StreamEvent(EventType type, String cursor, Long txn_ts, E data, QueryStat
public static class Builder<E> {
private final Codec<E> dataCodec;
String cursor = null;
StreamEvent.EventType eventType = null;
FaunaEvent.EventType eventType = null;
QueryStats stats = null;
E data = null;
Long txn_ts = null;
Expand All @@ -61,7 +61,7 @@ public Builder<E> cursor(String cursor) {
return this;
}

public Builder<E> eventType(StreamEvent.EventType eventType) {
public Builder<E> eventType(FaunaEvent.EventType eventType) {
this.eventType = eventType;
return this;
}
Expand Down Expand Up @@ -93,8 +93,8 @@ public Builder<E> error(ErrorInfo error) {
return this;
}

public StreamEvent<E> build() {
return new StreamEvent<>(eventType, cursor, txn_ts, data, stats, errorInfo);
public FaunaEvent<E> build() {
return new FaunaEvent<>(eventType, cursor, txn_ts, data, stats, errorInfo);
}

}
Expand Down Expand Up @@ -124,11 +124,11 @@ static <E> Builder<E> parseField(Builder<E> builder, JsonParser parser) throws I

}

private static StreamEvent.EventType parseEventType(JsonParser parser) throws IOException {
private static FaunaEvent.EventType parseEventType(JsonParser parser) throws IOException {
if (parser.nextToken() == VALUE_STRING) {
String typeString = parser.getText().toUpperCase();
try {
return StreamEvent.EventType.valueOf(typeString);
return FaunaEvent.EventType.valueOf(typeString);
} catch (IllegalArgumentException e) {
throw new ClientResponseException("Invalid event type: " + typeString, e);
}
Expand All @@ -137,9 +137,9 @@ private static StreamEvent.EventType parseEventType(JsonParser parser) throws IO
}
}

public static <E> StreamEvent<E> parse(JsonParser parser, Codec<E> dataCodec) throws IOException {
public static <E> FaunaEvent<E> parse(JsonParser parser, Codec<E> dataCodec) throws IOException {
if (parser.currentToken() == START_OBJECT || parser.nextToken() == START_OBJECT) {
Builder<E> builder = StreamEvent.builder(dataCodec);
Builder<E> builder = FaunaEvent.builder(dataCodec);
while (parser.nextToken() == FIELD_NAME) {
builder = parseField(builder, parser);
}
Expand All @@ -149,7 +149,7 @@ public static <E> StreamEvent<E> parse(JsonParser parser, Codec<E> dataCodec) th
}
}

public StreamEvent.EventType getType() {
public FaunaEvent.EventType getType() {
return type;
}

Expand Down
11 changes: 5 additions & 6 deletions src/main/java/com/fauna/event/FaunaStream.java
Original file line number Diff line number Diff line change
Expand Up @@ -12,18 +12,17 @@
import java.nio.ByteBuffer;
import java.text.MessageFormat;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.Flow.Processor;
import java.util.concurrent.Flow.Subscriber;
import java.util.concurrent.Flow.Subscription;
import java.util.concurrent.SubmissionPublisher;


public class FaunaStream<E> extends SubmissionPublisher<StreamEvent<E>> implements Processor<List<ByteBuffer>, StreamEvent<E>> {
public class FaunaStream<E> extends SubmissionPublisher<FaunaEvent<E>> implements Processor<List<ByteBuffer>, FaunaEvent<E>> {
private static final JsonFactory JSON_FACTORY = new JsonFactory();
private final Codec<E> dataCodec;
private Subscription subscription;
private Subscriber<? super StreamEvent<E>> eventSubscriber;
private Subscriber<? super FaunaEvent<E>> eventSubscriber;
private MultiByteBufferInputStream buffer = null;
private final StatsCollector statsCollector;

Expand All @@ -33,7 +32,7 @@ public FaunaStream(Class<E> elementClass, StatsCollector statsCollector) {
}

@Override
public void subscribe(Subscriber<? super StreamEvent<E>> subscriber) {
public void subscribe(Subscriber<? super FaunaEvent<E>> subscriber) {
if (this.eventSubscriber == null) {
this.eventSubscriber = subscriber;
super.subscribe(subscriber);
Expand Down Expand Up @@ -61,11 +60,11 @@ public void onNext(List<ByteBuffer> buffers) {
}
try {
JsonParser parser = JSON_FACTORY.createParser(buffer);
StreamEvent<E> event = StreamEvent.parse(parser, dataCodec);
FaunaEvent<E> event = FaunaEvent.parse(parser, dataCodec);

statsCollector.add(event.getStats());

if (event.getType() == StreamEvent.EventType.ERROR) {
if (event.getType() == FaunaEvent.EventType.ERROR) {
ErrorInfo error = event.getError();
this.onComplete();
this.close();
Expand Down
6 changes: 3 additions & 3 deletions src/main/java/com/fauna/event/FeedIterator.java
Original file line number Diff line number Diff line change
Expand Up @@ -87,17 +87,17 @@ public FeedPage<E> next() {
* Return an iterator that iterates directly over the items that make up the page contents.
* @return An iterator of E.
*/
public Iterator<StreamEvent<E>> flatten() {
public Iterator<FaunaEvent<E>> flatten() {
return new Iterator<>() {
private final FeedIterator<E> feedIterator = FeedIterator.this;
private Iterator<StreamEvent<E>> thisPage = feedIterator.hasNext() ? feedIterator.next().getEvents().iterator() : null;
private Iterator<FaunaEvent<E>> thisPage = feedIterator.hasNext() ? feedIterator.next().getEvents().iterator() : null;
@Override
public boolean hasNext() {
return thisPage != null && (thisPage.hasNext() || feedIterator.hasNext());
}

@Override
public StreamEvent<E> next() {
public FaunaEvent<E> next() {
if (thisPage == null) {
throw new NoSuchElementException();
}
Expand Down
14 changes: 7 additions & 7 deletions src/main/java/com/fauna/event/FeedPage.java
Original file line number Diff line number Diff line change
Expand Up @@ -22,13 +22,13 @@
import static com.fauna.constants.ResponseFields.CURSOR_FIELD_NAME;

public class FeedPage<E> {
private final List<StreamEvent<E>> events;
private final List<FaunaEvent<E>> events;
private final String cursor;
private final boolean hasNext;
private final QueryStats stats;
private static final JsonFactory JSON_FACTORY = new JsonFactory();

public FeedPage(final List<StreamEvent<E>> events,
public FeedPage(final List<FaunaEvent<E>> events,
final String cursor,
final boolean hasNext,
final QueryStats stats) {
Expand All @@ -44,7 +44,7 @@ public FeedPage(final List<StreamEvent<E>> events,
}
}

public List<StreamEvent<E>> getEvents() {
public List<FaunaEvent<E>> getEvents() {
return events;
}

Expand All @@ -62,7 +62,7 @@ public QueryStats getStats() {

public static class Builder<E> {
private final Codec<E> elementCodec;
public List<StreamEvent<E>> events;
public List<FaunaEvent<E>> events;
public String cursor = "";
public Boolean hasNext = false;
public QueryStats stats = null;
Expand All @@ -71,7 +71,7 @@ public Builder(Codec<E> elementCodec) {
this.elementCodec = elementCodec;
}

public Builder events(List<StreamEvent<E>> events) {
public Builder events(List<FaunaEvent<E>> events) {
this.events = events;
return this;
}
Expand All @@ -93,9 +93,9 @@ public Builder stats(QueryStats stats) {

public Builder<E> parseEvents(JsonParser parser) throws IOException {
if (parser.nextToken() == START_ARRAY) {
List<StreamEvent<E>> events = new ArrayList<>();
List<FaunaEvent<E>> events = new ArrayList<>();
while (parser.nextToken() != END_ARRAY) {
events.add(StreamEvent.parse(parser, elementCodec));
events.add(FaunaEvent.parse(parser, elementCodec));
}
this.events = events;
} else {
Expand Down
10 changes: 5 additions & 5 deletions src/test/java/com/fauna/client/FeedIteratorTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
import com.fauna.response.ErrorInfo;
import com.fauna.response.QueryFailure;
import com.fauna.response.QueryResponse;
import com.fauna.event.StreamEvent;
import com.fauna.event.FaunaEvent;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import org.mockito.Mock;
Expand Down Expand Up @@ -46,12 +46,12 @@ public class FeedIteratorTest {
private FaunaClient client;

private CompletableFuture<FeedPage<String>> successFuture(boolean after, int num) throws IOException {
List<StreamEvent<String>> events = new ArrayList<>();
List<FaunaEvent<String>> events = new ArrayList<>();
Codec<String> codec = DefaultCodecProvider.SINGLETON.get(String.class);
events.add(new StreamEvent<>(StreamEvent.EventType.ADD,
events.add(new FaunaEvent<>(FaunaEvent.EventType.ADD,
"cursor0", System.currentTimeMillis() - 10,
num + "-a", null, null));
events.add(new StreamEvent<>(StreamEvent.EventType.ADD,
events.add(new FaunaEvent<>(FaunaEvent.EventType.ADD,
"cursor0", System.currentTimeMillis() - 5,
num + "-b", null, null));

Expand Down Expand Up @@ -124,7 +124,7 @@ public void test_flatten() throws IOException {
when(client.poll(argThat(source::equals), argThat(FeedOptions.DEFAULT::equals), any(Class.class))).thenReturn(successFuture(true, 0));
when(client.poll(argThat(source::equals), argThat(opts -> opts.getCursor().orElse("").equals(CURSOR_0)), any(Class.class))).thenReturn(successFuture(false, 1));
FeedIterator<String> feedIterator = new FeedIterator<>(client, source, FeedOptions.DEFAULT, String.class);
Iterator<StreamEvent<String>> iter = feedIterator.flatten();
Iterator<FaunaEvent<String>> iter = feedIterator.flatten();
List<String> products = new ArrayList<>();
iter.forEachRemaining(event -> products.add(event.getData().orElseThrow()));
assertEquals(4, products.size());
Expand Down
18 changes: 9 additions & 9 deletions src/test/java/com/fauna/e2e/E2EFeedsTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@
import com.fauna.event.FeedPage;
import com.fauna.event.EventSourceResponse;
import com.fauna.response.QuerySuccess;
import com.fauna.event.StreamEvent;
import com.fauna.event.FaunaEvent;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test;

Expand All @@ -25,7 +25,7 @@

import static com.fauna.client.FaunaConfig.FaunaEndpoint.LOCAL;
import static com.fauna.query.builder.Query.fql;
import static com.fauna.event.StreamEvent.EventType.ERROR;
import static com.fauna.event.FaunaEvent.EventType.ERROR;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertTrue;
Expand All @@ -48,10 +48,10 @@ public static void setup() {
public void feedOfAll() {
FeedOptions options = FeedOptions.builder().startTs(productCollectionTs).build();
FeedIterator<Product> iter = client.feed(fql("Product.all().eventSource()"), options, Product.class);
List<List<StreamEvent<Product>>> pages = new ArrayList<>();
List<List<FaunaEvent<Product>>> pages = new ArrayList<>();
iter.forEachRemaining(page -> pages.add(page.getEvents()));
assertEquals(4, pages.size());
List<StreamEvent<Product>> products = pages.stream().flatMap(p -> p.stream()).collect(Collectors.toList());
List<FaunaEvent<Product>> products = pages.stream().flatMap(p -> p.stream()).collect(Collectors.toList());
assertEquals(50, products.size());
}

Expand All @@ -60,7 +60,7 @@ public void manualFeed() {
// Use the feeds API with complete (i.e. manual) control of the calls made to Fauna.
QuerySuccess<EventSourceResponse> sourceQuery = client.query(fql("Product.all().eventSource()"), EventSourceResponse.class);
EventSource source = EventSource.fromResponse(sourceQuery.getData());
List<StreamEvent<Product>> productUpdates = new ArrayList<>();
List<FaunaEvent<Product>> productUpdates = new ArrayList<>();
FeedOptions initialOptions = FeedOptions.builder().startTs(productCollectionTs).pageSize(2).build();
CompletableFuture<FeedPage<Product>> pageFuture = client.poll(source, initialOptions, Product.class);
int pageCount = 0;
Expand Down Expand Up @@ -97,7 +97,7 @@ public void feedError() {
FeedPage<Product> pageOne = iter.next();
assertFalse(pageOne.hasNext());
assertEquals(1, pageOne.getEvents().size());
StreamEvent<Product> errorEvent = pageOne.getEvents().get(0);
FaunaEvent<Product> errorEvent = pageOne.getEvents().get(0);
assertEquals(ERROR, errorEvent.getType());
assertEquals("invalid_stream_start_time", errorEvent.getError().getCode());
assertTrue(errorEvent.getError().getMessage().contains("is too far in the past"));
Expand All @@ -107,10 +107,10 @@ public void feedError() {
public void feedFlattened() {
FeedOptions options = FeedOptions.builder().startTs(productCollectionTs).build();
FeedIterator<Product> iter = client.feed(fql("Product.all().eventSource()"), options, Product.class);
Iterator<StreamEvent<Product>> productIter = iter.flatten();
List<StreamEvent<Product>> products = new ArrayList<>();
Iterator<FaunaEvent<Product>> productIter = iter.flatten();
List<FaunaEvent<Product>> products = new ArrayList<>();
// Java iterators not being iterable (or useable in a for-each loop) is annoying.
for (StreamEvent<Product> p : (Iterable<StreamEvent<Product>>) () -> productIter) {
for (FaunaEvent<Product> p : (Iterable<FaunaEvent<Product>>) () -> productIter) {
products.add(p);
}
assertEquals(50, products.size());
Expand Down
6 changes: 3 additions & 3 deletions src/test/java/com/fauna/e2e/E2EStreamingTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@
import com.fauna.event.EventSourceResponse;
import com.fauna.query.builder.Query;
import com.fauna.response.QuerySuccess;
import com.fauna.event.StreamEvent;
import com.fauna.event.FaunaEvent;
import com.fauna.event.StreamRequest;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Disabled;
Expand Down Expand Up @@ -59,7 +59,7 @@ public static Query createProduct() {
}


static class InventorySubscriber implements Flow.Subscriber<StreamEvent<Product>> {
static class InventorySubscriber implements Flow.Subscriber<FaunaEvent<Product>> {
private final AtomicLong timestamp = new AtomicLong(0);
private String cursor = null;
private final AtomicInteger events = new AtomicInteger(0);
Expand All @@ -73,7 +73,7 @@ public void onSubscribe(Flow.Subscription subscription) {
}

@Override
public void onNext(StreamEvent<Product> event) {
public void onNext(FaunaEvent<Product> event) {
System.out.println(MessageFormat.format("Event {0}, {1}", event.getCursor(), event.getTimestamp().orElse(-1L)));
events.addAndGet(1);
event.getData().ifPresent(product -> inventory.put(product.getName(), product.getQuantity()));
Expand Down

0 comments on commit 616e32c

Please sign in to comment.