Skip to content

Commit

Permalink
Add README draft
Browse files Browse the repository at this point in the history
  • Loading branch information
jrodewig committed Oct 25, 2024
1 parent a591fd4 commit a5e841c
Showing 1 changed file with 194 additions and 0 deletions.
194 changes: 194 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -430,6 +430,200 @@ QueryOptions options = QueryOptions.builder()
QuerySuccess result = client.query(query, String.class, options);
```

## Event Feeds (beta)

The driver supports [Event Feeds](https://docs.fauna.com/fauna/current/learn/cdc/#event-feeds).

### Request an Event Feed

An Event Feed asynchronously polls an [event
source](https://docs.fauna.com/fauna/current/learn/cdc/#create-an-event-source)
for paginated events.

To get an event source, append
[`eventSource()`](https://docs.fauna.com/fauna/current/reference/fql-api/schema-entities/set/eventsource/)
or
[`eventsOn()`](https://docs.fauna.com/fauna/current/reference/fql-api/schema-entities/set/eventson/)
to a [supported Set](https://docs.fauna.com/fauna/current/reference/cdc/#sets).

To get paginated events, pass the event source query or a `FeedRequest` to
`feed()` or `asyncFeed()`.

`feed()` returns an iterator, `FeedIterator<E>`, that emits pages of events.
Similarly, `asyncFeed()` returns `CompletableFuture<FeedIterator<E>>`.

You can use a `forEachRemaining()` loop to process each page of events.
Alternatively, you can iterate through individual events instead of pages
using `flatten()`.

```java
import com.fauna.client.Fauna;
import com.fauna.client.FaunaClient;
import com.fauna.client.FaunaConfig;
import com.fauna.client.FeedIterator;
import com.fauna.response.StreamEvent;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.stream.Collectors;

public class FeedExample {

public static void main(String[] args) {
FaunaConfig config = new FaunaConfig.Builder()
.secret("FAUNA_SECRET")
.build();
FaunaClient client = Fauna.client(config);

// Use `feed()` to create an event feed with a blocking request.
FeedIterator<Product> syncIterator = client.feed(
fql("Product.all().eventsOn(.price, .stock)"),
Product.class // Class for the type returned in events
);

// Gather event pages
List<List<StreamEvent<Product>>> pages = new ArrayList<>();
syncIterator.forEachRemaining(page -> pages.add(page.getEvents()));

// Flatten pages to a single list of events.
List<StreamEvent<Product>> syncEvents = pages.stream()
.flatMap(List::stream)
.collect(Collectors.toList());

System.out.println("Received " + syncEvents.size() + " events from feed().");

// Get an event source.
Query query = fql("Product.all().eventSource() { name, stock }");
QuerySuccess<StreamTokenResponse> eventSourceResponse = client.query(query, StreamTokenResponse.class);
String eventSource = eventSourceResponse.getData().getToken();

// Create a FeedRequest.
FeedRequest feedRequest = FeedRequest.builder(eventSource)
.pageSize(10)
.build();

// Use `asyncFeed()` to get an event feed with a non-blocking request.
CompletableFuture<FeedIterator<Product>> futureFeed = client.asyncFeed(
feedRequest,
Product.class // Class for the type returned in events
);

futureFeed.thenAccept(iterator -> {
// Flatten pages to a single list of events.
List<StreamEvent<Product>> events = iterator.flatten().collect(Collectors.toList());
System.out.println("Received " + events.size() + " events from asyncFeed.");
}).exceptionally(ex -> {
System.err.println("Error initializing async feed: " + ex.getMessage());
return null;
});
}
}
```

If changes occur between the creation of the event source and the request, the
feed replays and emits any related events. In most cases, you’ll get events
after a specific start time or event cursor.

### Get events after a specific start time

When you first poll an event source using an Event Feed, you usually pass
`startTs` argument to `feed()` or `asyncFeed()`.

`startTs` is an integer representing a time in microseconds since the Unix
epoch. The request returns events that occurred after the specified timestamp
(exclusive).

```java
// Get an event source.
Query query = fql("Product.all().eventSource() { name, stock }");
QuerySuccess<StreamTokenResponse> eventSourceResponse = client.query(query, StreamTokenResponse.class);
String eventSource = eventSourceResponse.getData().getToken();

// Calculate the timestamp for 10 minutes ago in microseconds.
long tenMinutesAgo = System.currentTimeMillis() * 1000 - (10 * 60 * 1000 * 1000);

// Create a FeedRequest.
FeedRequest feedRequest = FeedRequest.builder(eventSource)
.startTs(tenMinutesAgo)
.pageSize(10)
.build();

FeedIterator<Product> syncIterator = client.feed(
feedRequest,
Product.class
);

CompletableFuture<FeedIterator<Product>> futureFeed = client.asyncFeed(
feedRequest,
Product.class
);
```

### Get events after a specific event cursor

After the initial request, you usually get subsequent events using the cursor
for the last page or event. To get events after a cursor (exclusive), include
the cursor in a `FeedRequest` passed to `feed()` or `asyncFeed()`:

```java
// Get an event source.
Query query = fql("Product.all().eventSource() { name, stock }");
QuerySuccess<StreamTokenResponse> tokenResponse = client.query(query, StreamTokenResponse.class);
String eventSource = tokenResponse.getData().getToken();

// Create a FeedRequest.
FeedRequest feedRequest = FeedRequest.builder(eventSource)
.cursor("gsGabc456")
.pageSize(10)
.build();

FeedIterator<Product> syncIterator = client.feed(
feedRequest,
Product.class
);

CompletableFuture<FeedIterator<Product>> futureFeed = client.asyncFeed(
feedRequest,
Product.class
);
```

### Error handling

Exceptions can be raised in two different places:

* While fetching a page
* While iterating a page's events

This distinction lets ignore errors originating from event processing.
For example:

```java
try {
FeedIterator<Product> iterator = client.feed(
fql("Product.all().map(.details.toUpperCase()).eventSource()"),
Product.class
);

iterator.forEachRemaining(page -> {
try {
for (StreamEvent<Product> event : page.getEvents()) {
// Event-specific handling.
System.out.println("Event: " + event);
}
} catch (FaunaException e) {
// Handle errors for specific events within the page.
System.err.println("Error processing event: " + e.getMessage());
}
});

} catch (FaunaException e) {
// Additional handling for initialization errors.
System.err.println("Error occurred with event feed initialization: " + e.getMessage());
}
```

## Event Streaming

The driver supports [Event Streaming](https://docs.fauna.com/fauna/current/learn/streaming).
Expand Down

0 comments on commit a5e841c

Please sign in to comment.