Skip to content

Commit

Permalink
init feeds
Browse files Browse the repository at this point in the history
  • Loading branch information
cynicaljoy committed Nov 5, 2024
1 parent ec10eb2 commit 3427083
Show file tree
Hide file tree
Showing 9 changed files with 478 additions and 22 deletions.
113 changes: 99 additions & 14 deletions Fauna.Test/Integration.Tests.cs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
using System.Diagnostics.CodeAnalysis;
using System.Text;
using Fauna.Core;
using Fauna.Exceptions;
using Fauna.Mapping;
using Fauna.Types;
Expand Down Expand Up @@ -39,7 +40,7 @@ public void SetUp()
}

[SetUp]
[Category("Streaming")]
[Category("EventStream"), Category("EventFeed")]
public async Task SetUpStreaming()
{
await Fixtures.StreamingSandboxSetup(_client);
Expand Down Expand Up @@ -319,9 +320,9 @@ public async Task NullableStatsCollector()
Assert.Null(testClient.StatsCollector);
}

#region EventStreams

[Test]
[Category("Streaming")]
[Test, Category("EventStream")]
public async Task StreamRequestCancel()
{
var cts = new CancellationTokenSource();
Expand All @@ -345,8 +346,7 @@ await _client.EventStreamAsync<StreamingSandbox>(FQL($"StreamingSandbox.all().ev
Assert.ThrowsAsync<TaskCanceledException>(async () => await longRunningTask);
}

[Test]
[Category("Streaming")]
[Test, Category("EventStream")]
public async Task CanReadEventsFomStream()
{
var queries = new[]
Expand Down Expand Up @@ -420,8 +420,7 @@ public async Task CanReadEventsFomStream()
await Task.CompletedTask;
}

[Test]
[Category("Streaming")]
[Test, Category("EventStream")]
public Task StreamThrowsWithBadRequest()
{
var cts = new CancellationTokenSource(TimeSpan.FromMinutes(1)); // prevent runaway test
Expand All @@ -443,8 +442,7 @@ public Task StreamThrowsWithBadRequest()
return Task.CompletedTask;
}

[Test]
[Category("Streaming")]
[Test, Category("EventStream")]
public async Task CanResumeStreamWithStreamOptions()
{
string? token = null;
Expand Down Expand Up @@ -524,23 +522,110 @@ public async Task CanResumeStreamWithStreamOptions()
Assert.Zero(expectedEvents, "stream handler should process all events");
}

[Test]
[Category("Streaming")]
[Test, Category("EventStream")]
public async Task CanOpenStreamWithEventSource()
{
var cts = new CancellationTokenSource(TimeSpan.FromMinutes(1)); // prevent runaway test
cts.Token.ThrowIfCancellationRequested();

EventSource eventSource = await _client.GetEventSourceFromQueryAsync(
EventSource eventSource = _client.QueryAsync<EventSource>(
FQL($"StreamingSandbox.all().eventSource()"),
queryOptions: null,
cancellationToken: cts.Token
);
cancel: cts.Token
).Result.Data;

var stream = await _client.EventStreamAsync<StreamingSandbox>(eventSource, cts.Token);
Assert.IsNotNull(stream);
}

#endregion

#region EventFeeds

[Test, Category("EventFeed")]
public async Task CanOpenFeedWithQuery()
{
var feed = await _client.EventFeedAsync<StreamingSandbox>(FQL($"StreamingSandbox.all().eventSource()"));
Assert.IsNotEmpty(feed.Cursor, "should have a cursor");
Assert.IsNull(feed.CurrentPage, "should not have loaded a page");

await feed.NextAsync();

Assert.NotNull(feed.CurrentPage, "should have loaded a page");
Assert.IsNotEmpty(feed.Cursor, "should have a cursor");
Assert.IsEmpty(feed.CurrentPage!.Events, "should note have events");

await _client.QueryAsync(FQL($"StreamingSandbox.create({{ foo: 'bar' }})"));

FeedPage<StreamingSandbox>? lastPage = null;
await foreach (var page in feed)
{
Assert.IsNotEmpty(page.Cursor, "should have a cursor");
Assert.NotZero(page.Stats.ReadOps, "should have read ops");
Assert.AreEqual(1, page.Events.Count, "should have 1 event");
Assert.AreEqual(EventType.Add, page.Events[0].Type, "should be an add event");
lastPage = page;
}

// Get another page, should be empty
await feed.NextAsync();

Assert.IsEmpty(feed.CurrentPage!.Events, "should not have any events");
if (lastPage != null)
{
Assert.AreNotEqual(feed.Cursor, lastPage.Cursor, "should have a different cursor");
}
}

[Test, Category("EventFeed")]
public async Task CanOpenFeedWithEventSource()
{
EventSource eventSource = _client.QueryAsync<EventSource>(FQL($"StreamingSandbox.all().eventSource()")).Result.Data;
Assert.NotNull(eventSource);

var feed = await _client.EventFeedAsync<StreamingSandbox>(eventSource);
Assert.IsNotNull(feed);

await feed.NextAsync();

Assert.IsNotEmpty(feed.Cursor, "should have a cursor");
Assert.IsEmpty(feed.CurrentPage!.Events, "should not have any events");
}

[Test, Category("EventFeed")]
public async Task CanUseFeedOptionsPageSize()
{
EventSource eventSource =
_client.QueryAsync<EventSource>(FQL($"StreamingSandbox.all().eventSource()")).Result.Data;
Assert.NotNull(eventSource);

const int pageSize = 3;
const int start = 5;
const int end = 20;

// Create Events
await _client.QueryAsync(
FQL($"Set.sequence({start}, {end}).forEach(n => StreamingSandbox.create({{ n: n }}))"));

var feed = await _client.EventFeedAsync<StreamingSandbox>(eventSource, new FeedOptions(pageSize: pageSize));
Assert.IsNotNull(feed);

int pages = 0;
await foreach (var page in feed)
{
if (page.HasNext)
{
Assert.AreEqual(pageSize, page.Events.Count);
}

pages++;
}

Assert.AreEqual((end - start) / pageSize, pages, "should have the correct number of pages");
}

#endregion

[Test]
public async Task CollectionAll()
{
Expand Down
34 changes: 34 additions & 0 deletions Fauna/Client.cs
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ public class Client : BaseClient, IDisposable
{
private const string QueryUriPath = "/query/1";
private const string StreamUriPath = "/stream/1";
private const string FeedUriPath = "/feed/1";

private readonly Configuration _config;
private readonly IConnection _connection;
Expand Down Expand Up @@ -165,6 +166,39 @@ internal override async IAsyncEnumerator<Event<T>> SubscribeStreamInternal<T>(
yield return evt;
}
}
internal override async IAsyncEnumerator<FeedPage<T>> SubscribeFeedInternal<T>(
Types.EventSource eventSource,
MappingContext ctx,
CancellationToken cancel = default)
{
cancel.ThrowIfCancellationRequested();

var finalOptions = QueryOptions.GetFinalQueryOptions(_config.DefaultQueryOptions, null);
var headers = GetRequestHeaders(finalOptions);

while (!cancel.IsCancellationRequested)
{
var feedData = new MemoryStream();
eventSource.Serialize(feedData);

using var httpResponse = await _connection.DoPostAsync(
FeedUriPath,
feedData,
headers,
GetRequestTimeoutWithBuffer(finalOptions.QueryTimeout),
cancel);
string body = await httpResponse.Content.ReadAsStringAsync(cancel);

var res = FeedPage<T>.From(body, ctx);
eventSource.LastCursor = res.Cursor;
StatsCollector?.Add(res.Stats);
yield return res;
if (!res.HasNext)
{
break;
}
}
}

private void Serialize(Stream stream, Query query, MappingContext ctx)
{
Expand Down
163 changes: 163 additions & 0 deletions Fauna/Core/FeedEnumberable.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,163 @@
using System.Collections;
using System.Text.Json;
using Fauna.Exceptions;
using Fauna.Mapping;
using Fauna.Serialization;
using Fauna.Types;
using static Fauna.Core.ResponseFields;

namespace Fauna.Core;

/// <summary>
///
/// </summary>
/// <typeparam name="T"></typeparam>
public class FeedPage<T> where T : notnull
{
/// <summary>
///
/// </summary>
public List<Event<T>> Events { get; private init; } = new();

/// <summary>
///
/// </summary>
public string Cursor { get; private init; } = null!;

/// <summary>
///
/// </summary>
public bool HasNext { get; private init; }

/// <summary>
///
/// </summary>
public QueryStats Stats { get; private init; }

internal static FeedPage<T> From(string body, MappingContext ctx)
{
var json = JsonSerializer.Deserialize<JsonElement>(body);

var err = GetError(json);
if (err != null)
{
throw new FaunaException(err.Value);
}

return new FeedPage<T>
{
Cursor = GetCursor(json),
Events = GetEvents(json, ctx),
Stats = GetStats(json),
HasNext = json.TryGetProperty(HasNextFieldName, out var elem) && elem.GetBoolean()
};
}

private static List<Event<T>> GetEvents(JsonElement json, MappingContext ctx)
{
if (!json.TryGetProperty(EventsFieldName, out var elem))
{
return new List<Event<T>>();
}

var events = elem.EnumerateArray().Select(e => Event<T>.From(e, ctx)).ToList();
return events;
}

private static QueryStats GetStats(JsonElement json)
{
return json.TryGetProperty(StatsFieldName, out var elem) ? elem.Deserialize<QueryStats>() : default;
}

private static string GetCursor(JsonElement json)
{
return json.TryGetProperty(CursorFieldName, out var elem) ? elem.GetString()! : null!;
}

private static ErrorInfo? GetError(JsonElement json)
{
return json.TryGetProperty(ErrorFieldName, out var elem) ? elem.Deserialize<ErrorInfo>() : null;
}
}

/// <summary>
/// Represents a Fauna Event Feed.
/// </summary>
/// <typeparam name="T">Type to map each of the Events to.</typeparam>
public class FeedEnumerable<T> where T : notnull
{
private readonly BaseClient _client;
private readonly EventSource _eventSource;
private readonly CancellationToken _cancel;
private readonly FeedOptions? _feedOptions;

/// <summary>
/// The current cursor for the Feed.
/// </summary>
public string? Cursor => _eventSource.LastCursor;

/// <summary>
/// The last page returned from the Event Feed enumerator.
/// </summary>
public FeedPage<T>? CurrentPage { get; private set; }

internal FeedEnumerable(
BaseClient client,
EventSource eventSource,
FeedOptions? feedOptions = null,
CancellationToken cancel = default)
{
_client = client;
_eventSource = eventSource;
_cancel = cancel;
_feedOptions = feedOptions;

if (feedOptions?.Cursor is not null)
{
_eventSource.LastCursor = feedOptions.Cursor;
}

if (feedOptions?.PageSize is > 0)
{
_eventSource.PageSize = feedOptions.PageSize;
}
}

/// <summary>
/// Move to the next page of the Event Feed.
/// </summary>
/// <returns></returns>
public async Task<bool> NextAsync()
{
await using var subscribeFeed = _client.SubscribeFeed<T>(
_eventSource,
_client.MappingCtx,
_cancel);

bool result = await subscribeFeed.MoveNextAsync();
if (result)
{
CurrentPage = subscribeFeed.Current;
}

return result;
}

/// <summary>
/// Returns an enumerator that iterates through the Feed.
/// </summary>
/// <returns>Event Page Enumerator</returns>
public async IAsyncEnumerator<FeedPage<T>> GetAsyncEnumerator()
{
await using var subscribeFeed = _client.SubscribeFeed<T>(
_eventSource,
_client.MappingCtx,
_cancel);

while (!_cancel.IsCancellationRequested && await subscribeFeed.MoveNextAsync())
{
CurrentPage = subscribeFeed.Current;
yield return CurrentPage;
}
}
}
Loading

0 comments on commit 3427083

Please sign in to comment.