From 342708359b3f8cd7cb5e1edd9fa4bfa7e8bf8ce7 Mon Sep 17 00:00:00 2001 From: Darren Cunningham Date: Mon, 4 Nov 2024 11:51:37 -0500 Subject: [PATCH] init feeds --- Fauna.Test/Integration.Tests.cs | 113 +++++++++++++++++++--- Fauna/Client.cs | 34 +++++++ Fauna/Core/FeedEnumberable.cs | 163 ++++++++++++++++++++++++++++++++ Fauna/Core/FeedOptions.cs | 56 +++++++++++ Fauna/Core/ResponseFields.cs | 10 ++ Fauna/IClient.cs | 80 +++++++++++++++- Fauna/Linq/DataContext.cs | 10 ++ Fauna/Types/Event.cs | 23 ++++- Fauna/Types/EventSource.cs | 11 +++ 9 files changed, 478 insertions(+), 22 deletions(-) create mode 100644 Fauna/Core/FeedEnumberable.cs create mode 100644 Fauna/Core/FeedOptions.cs diff --git a/Fauna.Test/Integration.Tests.cs b/Fauna.Test/Integration.Tests.cs index 247cefcf..80138fd0 100644 --- a/Fauna.Test/Integration.Tests.cs +++ b/Fauna.Test/Integration.Tests.cs @@ -1,5 +1,6 @@ using System.Diagnostics.CodeAnalysis; using System.Text; +using Fauna.Core; using Fauna.Exceptions; using Fauna.Mapping; using Fauna.Types; @@ -39,7 +40,7 @@ public void SetUp() } [SetUp] - [Category("Streaming")] + [Category("EventStream"), Category("EventFeed")] public async Task SetUpStreaming() { await Fixtures.StreamingSandboxSetup(_client); @@ -319,9 +320,9 @@ public async Task NullableStatsCollector() Assert.Null(testClient.StatsCollector); } + #region EventStreams - [Test] - [Category("Streaming")] + [Test, Category("EventStream")] public async Task StreamRequestCancel() { var cts = new CancellationTokenSource(); @@ -345,8 +346,7 @@ await _client.EventStreamAsync(FQL($"StreamingSandbox.all().ev Assert.ThrowsAsync(async () => await longRunningTask); } - [Test] - [Category("Streaming")] + [Test, Category("EventStream")] public async Task CanReadEventsFomStream() { var queries = new[] @@ -420,8 +420,7 @@ public async Task CanReadEventsFomStream() await Task.CompletedTask; } - [Test] - [Category("Streaming")] + [Test, Category("EventStream")] public Task StreamThrowsWithBadRequest() { var cts = new CancellationTokenSource(TimeSpan.FromMinutes(1)); // prevent runaway test @@ -443,8 +442,7 @@ public Task StreamThrowsWithBadRequest() return Task.CompletedTask; } - [Test] - [Category("Streaming")] + [Test, Category("EventStream")] public async Task CanResumeStreamWithStreamOptions() { string? token = null; @@ -524,23 +522,110 @@ public async Task CanResumeStreamWithStreamOptions() Assert.Zero(expectedEvents, "stream handler should process all events"); } - [Test] - [Category("Streaming")] + [Test, Category("EventStream")] public async Task CanOpenStreamWithEventSource() { var cts = new CancellationTokenSource(TimeSpan.FromMinutes(1)); // prevent runaway test cts.Token.ThrowIfCancellationRequested(); - EventSource eventSource = await _client.GetEventSourceFromQueryAsync( + EventSource eventSource = _client.QueryAsync( FQL($"StreamingSandbox.all().eventSource()"), queryOptions: null, - cancellationToken: cts.Token - ); + cancel: cts.Token + ).Result.Data; var stream = await _client.EventStreamAsync(eventSource, cts.Token); Assert.IsNotNull(stream); } + #endregion + + #region EventFeeds + + [Test, Category("EventFeed")] + public async Task CanOpenFeedWithQuery() + { + var feed = await _client.EventFeedAsync(FQL($"StreamingSandbox.all().eventSource()")); + Assert.IsNotEmpty(feed.Cursor, "should have a cursor"); + Assert.IsNull(feed.CurrentPage, "should not have loaded a page"); + + await feed.NextAsync(); + + Assert.NotNull(feed.CurrentPage, "should have loaded a page"); + Assert.IsNotEmpty(feed.Cursor, "should have a cursor"); + Assert.IsEmpty(feed.CurrentPage!.Events, "should note have events"); + + await _client.QueryAsync(FQL($"StreamingSandbox.create({{ foo: 'bar' }})")); + + FeedPage? lastPage = null; + await foreach (var page in feed) + { + Assert.IsNotEmpty(page.Cursor, "should have a cursor"); + Assert.NotZero(page.Stats.ReadOps, "should have read ops"); + Assert.AreEqual(1, page.Events.Count, "should have 1 event"); + Assert.AreEqual(EventType.Add, page.Events[0].Type, "should be an add event"); + lastPage = page; + } + + // Get another page, should be empty + await feed.NextAsync(); + + Assert.IsEmpty(feed.CurrentPage!.Events, "should not have any events"); + if (lastPage != null) + { + Assert.AreNotEqual(feed.Cursor, lastPage.Cursor, "should have a different cursor"); + } + } + + [Test, Category("EventFeed")] + public async Task CanOpenFeedWithEventSource() + { + EventSource eventSource = _client.QueryAsync(FQL($"StreamingSandbox.all().eventSource()")).Result.Data; + Assert.NotNull(eventSource); + + var feed = await _client.EventFeedAsync(eventSource); + Assert.IsNotNull(feed); + + await feed.NextAsync(); + + Assert.IsNotEmpty(feed.Cursor, "should have a cursor"); + Assert.IsEmpty(feed.CurrentPage!.Events, "should not have any events"); + } + + [Test, Category("EventFeed")] + public async Task CanUseFeedOptionsPageSize() + { + EventSource eventSource = + _client.QueryAsync(FQL($"StreamingSandbox.all().eventSource()")).Result.Data; + Assert.NotNull(eventSource); + + const int pageSize = 3; + const int start = 5; + const int end = 20; + + // Create Events + await _client.QueryAsync( + FQL($"Set.sequence({start}, {end}).forEach(n => StreamingSandbox.create({{ n: n }}))")); + + var feed = await _client.EventFeedAsync(eventSource, new FeedOptions(pageSize: pageSize)); + Assert.IsNotNull(feed); + + int pages = 0; + await foreach (var page in feed) + { + if (page.HasNext) + { + Assert.AreEqual(pageSize, page.Events.Count); + } + + pages++; + } + + Assert.AreEqual((end - start) / pageSize, pages, "should have the correct number of pages"); + } + + #endregion + [Test] public async Task CollectionAll() { diff --git a/Fauna/Client.cs b/Fauna/Client.cs index 9e09564d..b139019f 100644 --- a/Fauna/Client.cs +++ b/Fauna/Client.cs @@ -16,6 +16,7 @@ public class Client : BaseClient, IDisposable { private const string QueryUriPath = "/query/1"; private const string StreamUriPath = "/stream/1"; + private const string FeedUriPath = "/feed/1"; private readonly Configuration _config; private readonly IConnection _connection; @@ -165,6 +166,39 @@ internal override async IAsyncEnumerator> SubscribeStreamInternal( yield return evt; } } + internal override async IAsyncEnumerator> SubscribeFeedInternal( + Types.EventSource eventSource, + MappingContext ctx, + CancellationToken cancel = default) + { + cancel.ThrowIfCancellationRequested(); + + var finalOptions = QueryOptions.GetFinalQueryOptions(_config.DefaultQueryOptions, null); + var headers = GetRequestHeaders(finalOptions); + + while (!cancel.IsCancellationRequested) + { + var feedData = new MemoryStream(); + eventSource.Serialize(feedData); + + using var httpResponse = await _connection.DoPostAsync( + FeedUriPath, + feedData, + headers, + GetRequestTimeoutWithBuffer(finalOptions.QueryTimeout), + cancel); + string body = await httpResponse.Content.ReadAsStringAsync(cancel); + + var res = FeedPage.From(body, ctx); + eventSource.LastCursor = res.Cursor; + StatsCollector?.Add(res.Stats); + yield return res; + if (!res.HasNext) + { + break; + } + } + } private void Serialize(Stream stream, Query query, MappingContext ctx) { diff --git a/Fauna/Core/FeedEnumberable.cs b/Fauna/Core/FeedEnumberable.cs new file mode 100644 index 00000000..503b1770 --- /dev/null +++ b/Fauna/Core/FeedEnumberable.cs @@ -0,0 +1,163 @@ +using System.Collections; +using System.Text.Json; +using Fauna.Exceptions; +using Fauna.Mapping; +using Fauna.Serialization; +using Fauna.Types; +using static Fauna.Core.ResponseFields; + +namespace Fauna.Core; + +/// +/// +/// +/// +public class FeedPage where T : notnull +{ + /// + /// + /// + public List> Events { get; private init; } = new(); + + /// + /// + /// + public string Cursor { get; private init; } = null!; + + /// + /// + /// + public bool HasNext { get; private init; } + + /// + /// + /// + public QueryStats Stats { get; private init; } + + internal static FeedPage From(string body, MappingContext ctx) + { + var json = JsonSerializer.Deserialize(body); + + var err = GetError(json); + if (err != null) + { + throw new FaunaException(err.Value); + } + + return new FeedPage + { + Cursor = GetCursor(json), + Events = GetEvents(json, ctx), + Stats = GetStats(json), + HasNext = json.TryGetProperty(HasNextFieldName, out var elem) && elem.GetBoolean() + }; + } + + private static List> GetEvents(JsonElement json, MappingContext ctx) + { + if (!json.TryGetProperty(EventsFieldName, out var elem)) + { + return new List>(); + } + + var events = elem.EnumerateArray().Select(e => Event.From(e, ctx)).ToList(); + return events; + } + + private static QueryStats GetStats(JsonElement json) + { + return json.TryGetProperty(StatsFieldName, out var elem) ? elem.Deserialize() : default; + } + + private static string GetCursor(JsonElement json) + { + return json.TryGetProperty(CursorFieldName, out var elem) ? elem.GetString()! : null!; + } + + private static ErrorInfo? GetError(JsonElement json) + { + return json.TryGetProperty(ErrorFieldName, out var elem) ? elem.Deserialize() : null; + } +} + +/// +/// Represents a Fauna Event Feed. +/// +/// Type to map each of the Events to. +public class FeedEnumerable where T : notnull +{ + private readonly BaseClient _client; + private readonly EventSource _eventSource; + private readonly CancellationToken _cancel; + private readonly FeedOptions? _feedOptions; + + /// + /// The current cursor for the Feed. + /// + public string? Cursor => _eventSource.LastCursor; + + /// + /// The last page returned from the Event Feed enumerator. + /// + public FeedPage? CurrentPage { get; private set; } + + internal FeedEnumerable( + BaseClient client, + EventSource eventSource, + FeedOptions? feedOptions = null, + CancellationToken cancel = default) + { + _client = client; + _eventSource = eventSource; + _cancel = cancel; + _feedOptions = feedOptions; + + if (feedOptions?.Cursor is not null) + { + _eventSource.LastCursor = feedOptions.Cursor; + } + + if (feedOptions?.PageSize is > 0) + { + _eventSource.PageSize = feedOptions.PageSize; + } + } + + /// + /// Move to the next page of the Event Feed. + /// + /// + public async Task NextAsync() + { + await using var subscribeFeed = _client.SubscribeFeed( + _eventSource, + _client.MappingCtx, + _cancel); + + bool result = await subscribeFeed.MoveNextAsync(); + if (result) + { + CurrentPage = subscribeFeed.Current; + } + + return result; + } + + /// + /// Returns an enumerator that iterates through the Feed. + /// + /// Event Page Enumerator + public async IAsyncEnumerator> GetAsyncEnumerator() + { + await using var subscribeFeed = _client.SubscribeFeed( + _eventSource, + _client.MappingCtx, + _cancel); + + while (!_cancel.IsCancellationRequested && await subscribeFeed.MoveNextAsync()) + { + CurrentPage = subscribeFeed.Current; + yield return CurrentPage; + } + } +} diff --git a/Fauna/Core/FeedOptions.cs b/Fauna/Core/FeedOptions.cs new file mode 100644 index 00000000..c393353f --- /dev/null +++ b/Fauna/Core/FeedOptions.cs @@ -0,0 +1,56 @@ +namespace Fauna.Core; + +/// +/// Represents the options when subscribing to Fauna Event Feeds. +/// +public class FeedOptions +{ + /// + /// Initializes a new instance of the class with the specified cursor and optional page size. + /// + /// The cursor for the feed. Used to resume the Feed. + /// Optional page size for the feed. + /// + public FeedOptions(string cursor, int? pageSize) + { + Cursor = cursor; + PageSize = pageSize; + } + + /// + /// Initializes a new instance of the class with the specified start timestamp and optional page size. + /// + /// The start timestamp for the feed. Used to resume the Feed. + /// Optional page size for the feed. + /// + public FeedOptions(long startTs, int? pageSize) + { + StartTs = startTs; + PageSize = pageSize; + } + + /// + /// Initializes a new instance of the class with the specified page size. + /// + /// The page size for the feed. + public FeedOptions(int pageSize) + { + PageSize = pageSize; + } + + /// + /// Cursor returned from Fauna + /// + /// + public string? Cursor { get; } + + /// + /// Start timestamp returned for the feed. Used to resume the Feed. + /// + public long? StartTs { get; } + + /// + /// Limit page size for the Feed + /// + public int? PageSize { get; } +} diff --git a/Fauna/Core/ResponseFields.cs b/Fauna/Core/ResponseFields.cs index 8b55ac89..a99bf4fe 100644 --- a/Fauna/Core/ResponseFields.cs +++ b/Fauna/Core/ResponseFields.cs @@ -52,6 +52,16 @@ internal readonly struct ResponseFields /// public const string ErrorFieldName = "error"; + /// + /// Field name for pagination information. + /// + public const string HasNextFieldName = "has_next"; + + /// + /// Field name for array of events. + /// + public const string EventsFieldName = "events"; + #endregion #region "stats" block diff --git a/Fauna/IClient.cs b/Fauna/IClient.cs index df21d205..c5a7b67c 100644 --- a/Fauna/IClient.cs +++ b/Fauna/IClient.cs @@ -593,7 +593,7 @@ public async Task> EventStreamAsync( /// /// /// - /// + /// Which Type to map the Events to. /// public async Task> EventStreamAsync( EventSource eventSource, @@ -604,14 +604,72 @@ public async Task> EventStreamAsync( return new StreamEnumerable(this, eventSource, cancellationToken); } + + /// + /// Opens the event feed with Fauna and returns an enumerator for the events. + /// + /// The type of event data that will be deserialized from the stream. + /// The event source to subscribe to. + /// The mapping context to use for deserializing stream events. + /// The cancellation token for the operation. + /// An async enumerator of stream events. + /// Implementation + internal abstract IAsyncEnumerator> SubscribeFeedInternal( + EventSource eventSource, + MappingContext ctx, + CancellationToken cancel = default) where T : notnull; + + /// + /// Opens the event feed with Fauna and returns an enumerator for the events. + /// + /// + /// The options for the feed. + /// The cancellation token for the operation. + /// Which Type to map the Events to. + /// + public async Task> EventFeedAsync( + EventSource eventSource, + FeedOptions? feedOptions = null, + CancellationToken cancellationToken = default) where T : notnull + { + await Task.CompletedTask; + + return new FeedEnumerable(this, eventSource, feedOptions, cancellationToken); + } + + /// + /// Opens the event feed with Fauna and returns an enumerator for the events. + /// + /// The query to create the stream from Fauna. + /// The options for the query. + /// The options for the feed. + /// The cancellation token for the operation. + /// Which Type to map the Events to. + /// + public async Task> EventFeedAsync( + Query query, + QueryOptions? queryOptions = null, + FeedOptions? feedOptions = null, + CancellationToken cancellationToken = default) where T : notnull + { + if (feedOptions?.Cursor != null) + { + throw new InvalidOperationException("Cannot use Cursor when opening an EventFeed with a Query."); + } + + EventSource eventSource = await GetEventSourceFromQueryAsync(query, queryOptions, cancellationToken); + + return new FeedEnumerable(this, eventSource, feedOptions, cancellationToken); + } + /// /// Retrieves an EventSource from Fauna Query /// /// /// /// - /// - public async Task GetEventSourceFromQueryAsync( + /// EventSource returned from Query + private async Task GetEventSourceFromQueryAsync( Query query, QueryOptions? queryOptions, CancellationToken cancellationToken) @@ -640,5 +698,21 @@ public IAsyncEnumerator> SubscribeStream( return SubscribeStreamInternal(eventSource, ctx, cancel); } + /// + /// Opens an event feed with Fauna and returns an enumerator for the events. + /// + /// Event Data will be deserialized to this type. + /// The stream to subscribe to. + /// Mapping context for stream. + /// The cancellation token. + /// An async enumerator of stream events. + public IAsyncEnumerator> SubscribeFeed( + EventSource eventSource, + MappingContext ctx, + CancellationToken cancel = default) where T : notnull + { + return SubscribeFeedInternal(eventSource, ctx, cancel); + } + #endregion } diff --git a/Fauna/Linq/DataContext.cs b/Fauna/Linq/DataContext.cs index 864571cd..cd33443a 100644 --- a/Fauna/Linq/DataContext.cs +++ b/Fauna/Linq/DataContext.cs @@ -62,6 +62,16 @@ internal override IAsyncEnumerator> SubscribeStreamInternal( return _client.SubscribeStreamInternal(eventSource, ctx, cancel); } + internal override IAsyncEnumerator> SubscribeFeedInternal( + EventSource eventSource, + MappingContext ctx, + CancellationToken cancel = default) + { + CheckInitialization(); + return _client.SubscribeFeedInternal(eventSource, ctx, cancel); + } + + // Schema DSL /// diff --git a/Fauna/Types/Event.cs b/Fauna/Types/Event.cs index d8cf9209..4f35d1a8 100644 --- a/Fauna/Types/Event.cs +++ b/Fauna/Types/Event.cs @@ -58,17 +58,16 @@ public class Event where T : notnull /// public QueryStats Stats { get; private init; } + /// - /// A helper method for converting a JSON string into an event. + /// A helper method for converting a JSON element into an Event. /// - /// The string of raw JSON. + /// JSON Element to convert. /// A mapping context to influence deserialization. /// An instance of . /// Thrown when the event includes a Fauna error. - public static Event From(string body, MappingContext ctx) + public static Event From(JsonElement json, MappingContext ctx) { - var json = JsonSerializer.Deserialize(body); - var err = GetError(json); if (err != null) { @@ -87,6 +86,20 @@ public static Event From(string body, MappingContext ctx) return evt; } + /// + /// A helper method for converting a JSON string into an Event. + /// + /// The string of raw JSON. + /// A mapping context to influence deserialization. + /// An instance of . + /// Thrown when the event includes a Fauna error. + public static Event From(string body, MappingContext ctx) + { + var json = JsonSerializer.Deserialize(body); + + return From(json, ctx); + } + private static long GetTxnTime(JsonElement json) { if (!json.TryGetProperty(LastSeenTxnFieldName, out var elem)) diff --git a/Fauna/Types/EventSource.cs b/Fauna/Types/EventSource.cs index b7f1b0f1..f057808f 100644 --- a/Fauna/Types/EventSource.cs +++ b/Fauna/Types/EventSource.cs @@ -31,6 +31,11 @@ public EventSource(string token) /// public string? LastCursor { get; set; } + /// + /// Set the page size when using Event Feeds. + /// + public int? PageSize { get; set; } + /// /// Serializes the event source to the provided . /// @@ -48,6 +53,12 @@ public void Serialize(Stream stream) { writer.WriteNumber("start_ts", StartTs.Value); } + + if (PageSize is > 0) + { + writer.WriteNumber("page_size", PageSize.Value); + } + writer.WriteEndObject(); writer.Flush(); }