Skip to content

Commit

Permalink
event source options
Browse files Browse the repository at this point in the history
  • Loading branch information
cynicaljoy committed Nov 7, 2024
1 parent f1b6436 commit 64ad86c
Show file tree
Hide file tree
Showing 8 changed files with 58 additions and 68 deletions.
3 changes: 2 additions & 1 deletion Fauna.Test/Integration.Tests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -733,7 +733,8 @@ public void ValidateUnwrappedMapOfQueriesError()
[Category("serialization")]
public async Task ValidateBytesAcrossTheWire()
{
byte[] byteArray = { 70, 97, 117, 110, 97 };
// ReSharper disable once UseUtf8StringLiteral
byte[] byteArray = [70, 97, 117, 110, 97];
byte[]? nullArray = null;

var result = await _client.QueryAsync<List<object?>>(FQL($"let x:Bytes = {byteArray}; let y:Bytes|Null = {nullArray}; [x,y]"));
Expand Down
6 changes: 4 additions & 2 deletions Fauna/Client.cs
Original file line number Diff line number Diff line change
Expand Up @@ -161,7 +161,8 @@ internal override async IAsyncEnumerator<Event<T>> SubscribeStreamInternal<T>(
cancel))
{
LastSeenTxn = evt.TxnTime;
eventSource.LastCursor = evt.Cursor;
eventSource.Options.Cursor = evt.Cursor;

StatsCollector?.Add(evt.Stats);
yield return evt;
}
Expand Down Expand Up @@ -190,7 +191,8 @@ internal override async IAsyncEnumerator<FeedPage<T>> SubscribeFeedInternal<T>(
string body = await httpResponse.Content.ReadAsStringAsync(cancel);

var res = FeedPage<T>.From(body, ctx);
eventSource.LastCursor = res.Cursor;
eventSource.Options.Cursor = res.Cursor;

StatsCollector?.Add(res.Stats);
yield return res;
if (!res.HasNext)
Expand Down
3 changes: 2 additions & 1 deletion Fauna/Core/Connection.cs
Original file line number Diff line number Diff line change
Expand Up @@ -108,7 +108,8 @@ public async IAsyncEnumerable<Event<T>> OpenStream<T>(
}

var evt = Event<T>.From(line, ctx);
eventSource.LastCursor = evt.Cursor;
eventSource.Options.Cursor = evt.Cursor;

bc.Add(evt, cancellationToken);
}

Expand Down
11 changes: 2 additions & 9 deletions Fauna/Core/FeedEnumberable.cs
Original file line number Diff line number Diff line change
Expand Up @@ -12,12 +12,11 @@ public class FeedEnumerable<T> where T : notnull
private readonly BaseClient _client;
private readonly EventSource _eventSource;
private readonly CancellationToken _cancel;
private readonly FeedOptions? _feedOptions;

/// <summary>
/// The current cursor for the Feed.
/// Current cursor for the Feed.
/// </summary>
public string? Cursor => _eventSource.LastCursor;
public string? Cursor => CurrentPage?.Cursor;

/// <summary>
/// The latest page returned from the Event Feed enumerator.
Expand All @@ -27,17 +26,11 @@ public class FeedEnumerable<T> where T : notnull
internal FeedEnumerable(
BaseClient client,
EventSource eventSource,
FeedOptions? feedOptions = null,
CancellationToken cancel = default)
{
_client = client;
_eventSource = eventSource;
_cancel = cancel;
_feedOptions = feedOptions;

_eventSource.LastCursor = feedOptions?.Cursor;
_eventSource.StartTs = feedOptions?.StartTs;
_eventSource.PageSize = feedOptions?.PageSize;
}

/// <summary>
Expand Down
20 changes: 3 additions & 17 deletions Fauna/Core/FeedOptions.cs
Original file line number Diff line number Diff line change
@@ -1,9 +1,11 @@
using Fauna.Types;

namespace Fauna.Core;

/// <summary>
/// Represents the options when subscribing to Fauna Event Feeds.
/// </summary>
public class FeedOptions
public class FeedOptions : EventOptions
{
/// <summary>
/// Initializes a new instance of the <see cref="FeedOptions"/> class with the specified cursor and optional page size.
Expand Down Expand Up @@ -37,20 +39,4 @@ public FeedOptions(int pageSize)
{
PageSize = pageSize;
}

/// <summary>
/// Cursor returned from Fauna
/// </summary>
/// <seealso href="https://docs.fauna.com/fauna/current/reference/cdc/#get-events-after-a-specific-cursor"/>
public string? Cursor { get; }

/// <summary>
/// Start timestamp returned for the feed. Used to resume the Feed.
/// </summary>
public long? StartTs { get; }

/// <summary>
/// Limit page size for the Feed
/// </summary>
public int? PageSize { get; }
}
12 changes: 3 additions & 9 deletions Fauna/Core/StreamOptions.cs
Original file line number Diff line number Diff line change
@@ -1,9 +1,11 @@
using Fauna.Types;

namespace Fauna;

/// <summary>
/// Represents the options when subscribing to Fauna Event Streams.
/// </summary>
public class StreamOptions
public class StreamOptions : EventOptions
{
/// <summary>
/// Initializes a new instance of the <see cref="StreamOptions"/> class with the specified token and cursor.
Expand Down Expand Up @@ -32,12 +34,4 @@ public StreamOptions(string token, long startTs)
/// See the <a
/// href="https://docs.fauna.com/fauna/current/reference/cdc/#event-source">Create an event source</a>.
public string? Token { get; }

/// <summary>Cursor from the stream, must be used with the associated Token. Used to resume the stream.</summary>
/// See <a href="https://docs.fauna.com/fauna/current/reference/cdc/#restart-cursor">Restart from an event cursor</a>.
public string? Cursor { get; }

/// <summary>Start timestamp from the stream, must be used with the associated Token. Used to resume the stream.</summary>
/// See <a href="https://docs.fauna.com/fauna/current/reference/cdc/#restart-txn-ts">Restart from a transaction timestamp</a>.
public long? StartTs { get; }
}
9 changes: 6 additions & 3 deletions Fauna/IClient.cs
Original file line number Diff line number Diff line change
Expand Up @@ -582,7 +582,7 @@ public async Task<StreamEnumerable<T>> EventStreamAsync<T>(
CancellationToken cancellationToken = default) where T : notnull
{
EventSource eventSource = streamOptions?.Token != null
? new EventSource(streamOptions.Token) { LastCursor = streamOptions.Cursor, StartTs = streamOptions.StartTs }
? new EventSource(streamOptions.Token) { Options = streamOptions }
: await GetEventSourceFromQueryAsync(query, queryOptions, cancellationToken);

return new StreamEnumerable<T>(this, eventSource, cancellationToken);
Expand Down Expand Up @@ -634,7 +634,9 @@ public async Task<FeedEnumerable<T>> EventFeedAsync<T>(
{
await Task.CompletedTask;

return new FeedEnumerable<T>(this, eventSource, feedOptions, cancellationToken);
if (feedOptions != null) eventSource.Options = feedOptions;

return new FeedEnumerable<T>(this, eventSource, cancellationToken);
}

/// <summary>
Expand All @@ -656,8 +658,9 @@ public async Task<FeedEnumerable<T>> EventFeedAsync<T>(
}

EventSource eventSource = await GetEventSourceFromQueryAsync(query, null, cancellationToken);
if (feedOptions != null) eventSource.Options = feedOptions;

return new FeedEnumerable<T>(this, eventSource, feedOptions, cancellationToken);
return new FeedEnumerable<T>(this, eventSource, cancellationToken);
}

/// <summary>
Expand Down
62 changes: 36 additions & 26 deletions Fauna/Types/EventSource.cs
Original file line number Diff line number Diff line change
Expand Up @@ -7,34 +7,22 @@ namespace Fauna.Types;
/// </summary>
public sealed class EventSource : IEquatable<EventSource>
{
/// <summary>
/// Initializes an <see cref="EventSource"/>.
/// </summary>
/// <param name="token">An event source.</param>
public EventSource(string token)
{
Token = token;
}

/// <summary>
/// Gets the string value of the stream token.
/// </summary>
internal string Token { get; }

/// <summary>
/// The start timestamp of the Event Feed or Event Stream.
/// </summary>
public long? StartTs { get; set; }

/// <summary>
/// The starting cursor for the Event Feed or Event Stream. Typically, this is the last observed cursor.
/// </summary>
public string? LastCursor { get; set; }
internal EventOptions Options { get; set; }

/// <summary>
/// Set the page size when using Event Feeds.
/// Initializes an <see cref="EventSource"/>.
/// </summary>
public int? PageSize { get; set; }
/// <param name="token">An event source.</param>
public EventSource(string token)
{
Token = token;
Options = new EventOptions();
}

/// <summary>
/// Serializes the event source to the provided <see cref="Stream"/>.
Expand All @@ -45,18 +33,18 @@ public void Serialize(Stream stream)
var writer = new Utf8JsonWriter(stream);
writer.WriteStartObject();
writer.WriteString("token", Token);
if (LastCursor != null)
if (Options.Cursor != null)
{
writer.WriteString("cursor", LastCursor);
writer.WriteString("cursor", Options.Cursor);
}
else if (StartTs != null)
else if (Options.StartTs != null)
{
writer.WriteNumber("start_ts", StartTs.Value);
writer.WriteNumber("start_ts", Options.StartTs.Value);
}

if (PageSize is > 0)
if (Options.PageSize is > 0)
{
writer.WriteNumber("page_size", PageSize.Value);
writer.WriteNumber("page_size", Options.PageSize.Value);
}

writer.WriteEndObject();
Expand Down Expand Up @@ -98,3 +86,25 @@ public override int GetHashCode()
return Token.GetHashCode();
}
}

/// <summary>
/// Represents the options for a Fauna EventSource.
/// </summary>
public class EventOptions
{
/// <summary>
/// Cursor returned from Fauna
/// </summary>
/// <seealso href="https://docs.fauna.com/fauna/current/reference/cdc/#get-events-after-a-specific-cursor"/>
public string? Cursor { get; internal set; }

/// <summary>
/// Start timestamp returned for the feed. Used to resume the Feed.
/// </summary>
public long? StartTs { get; protected init; }

/// <summary>
/// Limit page size for the Feed
/// </summary>
public int? PageSize { get; protected init; }
}

0 comments on commit 64ad86c

Please sign in to comment.