Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: use ulong and topics and plumb through resumeAt subscription arguments #589

Merged
merged 5 commits into from
Nov 12, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
47 changes: 15 additions & 32 deletions src/Momento.Sdk/Internal/ScsTopicClient.cs
Original file line number Diff line number Diff line change
Expand Up @@ -118,25 +118,12 @@ private async Task<TopicPublishResponse> SendPublish(string cacheName, string to
private async Task<TopicSubscribeResponse> SendSubscribe(string cacheName, string topicName,
ulong? resumeAtTopicSequenceNumber, ulong? resumeAtTopicSequencePage)
{
var request = new _SubscriptionRequest
{
CacheName = cacheName,
Topic = topicName
};
if (resumeAtTopicSequenceNumber != null)
{
request.ResumeAtTopicSequenceNumber = resumeAtTopicSequenceNumber.Value;
}
if (resumeAtTopicSequencePage != null)
{
request.SequencePage = resumeAtTopicSequencePage.Value;
}

SubscriptionWrapper subscriptionWrapper;
try
{
_logger.LogTraceExecutingTopicRequest(RequestTypeTopicSubscribe, cacheName, topicName);
subscriptionWrapper = new SubscriptionWrapper(grpcManager, cacheName, topicName, _exceptionMapper, _logger);
subscriptionWrapper = new SubscriptionWrapper(grpcManager, cacheName, topicName,
resumeAtTopicSequenceNumber, resumeAtTopicSequencePage, _exceptionMapper, _logger);
await subscriptionWrapper.Subscribe();
}
catch (Exception e)
Expand All @@ -161,16 +148,19 @@ private class SubscriptionWrapper : IDisposable
private readonly ILogger _logger;

private AsyncServerStreamingCall<_SubscriptionItem>? _subscription;
private ulong? _lastSequenceNumber;
private ulong? _lastSequencePage;
private ulong _lastSequenceNumber;
private ulong _lastSequencePage;
private bool _subscribed;

public SubscriptionWrapper(TopicGrpcManager grpcManager, string cacheName,
string topicName, CacheExceptionMapper exceptionMapper, ILogger logger)
string topicName, ulong? resumeAtTopicSequenceNumber, ulong? resumeAtTopicSequencePage,
CacheExceptionMapper exceptionMapper, ILogger logger)
{
_grpcManager = grpcManager;
_cacheName = cacheName;
_topicName = topicName;
_lastSequenceNumber = resumeAtTopicSequenceNumber ?? 0;
_lastSequencePage = resumeAtTopicSequencePage ?? 0;
_exceptionMapper = exceptionMapper;
_logger = logger;
}
Expand All @@ -183,15 +173,8 @@ public async Task Subscribe()
Topic = _topicName
};

if (_lastSequenceNumber != null)
{
request.ResumeAtTopicSequenceNumber = _lastSequenceNumber.Value;
}

if (_lastSequencePage != null)
{
request.SequencePage = _lastSequencePage.Value;
}
request.ResumeAtTopicSequenceNumber = _lastSequenceNumber;
request.SequencePage = _lastSequencePage;

_logger.LogTraceExecutingTopicRequest(RequestTypeTopicSubscribe, _cacheName, _topicName);
var subscription = _grpcManager.Client.subscribe(request, new CallOptions());
Expand Down Expand Up @@ -264,10 +247,10 @@ public async Task Subscribe()
{
case _TopicValue.KindOneofCase.Text:
_logger.LogTraceTopicMessageReceived("text", _cacheName, _topicName);
return new TopicMessage.Text(message.Item.Value, checked((long)_lastSequenceNumber), checked((long)_lastSequencePage), message.Item.PublisherId == "" ? null : message.Item.PublisherId);
return new TopicMessage.Text(message.Item.Value, _lastSequenceNumber, _lastSequencePage, message.Item.PublisherId == "" ? null : message.Item.PublisherId);
case _TopicValue.KindOneofCase.Binary:
_logger.LogTraceTopicMessageReceived("binary", _cacheName, _topicName);
return new TopicMessage.Binary(message.Item.Value, checked((long)_lastSequenceNumber), checked((long)_lastSequencePage), message.Item.PublisherId == "" ? null : message.Item.PublisherId);
return new TopicMessage.Binary(message.Item.Value, _lastSequenceNumber, _lastSequencePage, message.Item.PublisherId == "" ? null : message.Item.PublisherId);
case _TopicValue.KindOneofCase.None:
default:
_logger.LogTraceTopicMessageReceived("unknown", _cacheName, _topicName);
Expand All @@ -280,9 +263,9 @@ public async Task Subscribe()
message.Discontinuity.LastTopicSequence, message.Discontinuity.NewTopicSequence, message.Discontinuity.NewSequencePage);
_lastSequenceNumber = message.Discontinuity.NewTopicSequence;
_lastSequencePage = message.Discontinuity.NewSequencePage;
return new TopicSystemEvent.Discontinuity(checked((long)message.Discontinuity.LastTopicSequence),
checked((long)message.Discontinuity.NewTopicSequence),
checked((long)message.Discontinuity.NewSequencePage));
return new TopicSystemEvent.Discontinuity(message.Discontinuity.LastTopicSequence,
message.Discontinuity.NewTopicSequence,
message.Discontinuity.NewSequencePage);
case _SubscriptionItem.KindOneofCase.Heartbeat:
_logger.LogTraceTopicMessageReceived("heartbeat", _cacheName, _topicName);
return new TopicSystemEvent.Heartbeat();
Expand Down
12 changes: 6 additions & 6 deletions src/Momento.Sdk/Responses/Topic/TopicMessage.cs
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ public class Text : TopicMessage
/// <summary>
/// A topic message containing a text value.
/// </summary>
public Text(_TopicValue topicValue, long topicSequenceNumber, long topicSequencePage, string? tokenId = null)
public Text(_TopicValue topicValue, ulong topicSequenceNumber, ulong topicSequencePage, string? tokenId = null)
{
Value = topicValue.Text;
TopicSequenceNumber = topicSequenceNumber;
Expand All @@ -58,12 +58,12 @@ public Text(_TopicValue topicValue, long topicSequenceNumber, long topicSequence
/// <summary>
/// The sequence number of this message.
/// </summary>
public long TopicSequenceNumber { get; }
public ulong TopicSequenceNumber { get; }

/// <summary>
/// The sequence page of this message.
/// </summary>
public long TopicSequencePage { get; }
public ulong TopicSequencePage { get; }

/// <summary>
/// The TokenId that was used to publish the message, or null if the token did not have an id.
Expand All @@ -86,7 +86,7 @@ public class Binary : TopicMessage
/// <summary>
/// A topic message containing a binary value.
/// </summary>
public Binary(_TopicValue topicValue, long topicSequenceNumber, long topicSequencePage, string? tokenId = null)
public Binary(_TopicValue topicValue, ulong topicSequenceNumber, ulong topicSequencePage, string? tokenId = null)
{
Value = topicValue.Binary.ToByteArray();
TopicSequenceNumber = topicSequenceNumber;
Expand All @@ -103,12 +103,12 @@ public Binary(_TopicValue topicValue, long topicSequenceNumber, long topicSequen
/// <summary>
/// The sequence number of this message.
/// </summary>
public long TopicSequenceNumber { get; }
public ulong TopicSequenceNumber { get; }

/// <summary>
/// The sequence page of this message.
/// </summary>
public long TopicSequencePage { get; }
public ulong TopicSequencePage { get; }

/// <summary>
/// The TokenId that was used to publish the message, or null if the token did not have an id.
Expand Down
8 changes: 4 additions & 4 deletions src/Momento.Sdk/Responses/Topic/TopicSystemEvent.cs
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ public class Discontinuity : TopicSystemEvent
/// <param name="lastKnownSequenceNumber">The last known sequence number before the discontinuity.</param>
/// <param name="sequenceNumber">The sequence number of the discontinuity.</param>
/// <param name="sequencePage">The sequence page of the discontinuity.</param>
public Discontinuity(long lastKnownSequenceNumber, long sequenceNumber, long sequencePage)
public Discontinuity(ulong lastKnownSequenceNumber, ulong sequenceNumber, ulong sequencePage)
{
LastKnownSequenceNumber = lastKnownSequenceNumber;
SequenceNumber = sequenceNumber;
Expand All @@ -43,17 +43,17 @@ public Discontinuity(long lastKnownSequenceNumber, long sequenceNumber, long seq
/// <summary>
/// The last known sequence number before the discontinuity.
/// </summary>
public long LastKnownSequenceNumber { get; }
public ulong LastKnownSequenceNumber { get; }

/// <summary>
/// The sequence number of the discontinuity.
/// </summary>
public long SequenceNumber { get; }
public ulong SequenceNumber { get; }

/// <summary>
/// The sequence page of the discontinuity.
/// </summary>
public long SequencePage { get; }
public ulong SequencePage { get; }

/// <inheritdoc/>
public override string ToString()
Expand Down
4 changes: 2 additions & 2 deletions tests/Integration/Momento.Sdk.Tests/Topics/TopicTest.cs
Original file line number Diff line number Diff line change
Expand Up @@ -145,7 +145,7 @@ public async Task PublishAndSubscribe_String_Succeeds()
{
var textMessage = (TopicMessage.Text)consumedMessages[i];
Assert.Equal(textMessage.Value, valuesToSend[i]);
Assert.Equal(textMessage.TopicSequenceNumber, i + 1);
Assert.Equal(textMessage.TopicSequenceNumber, checked((ulong)(i + 1)));
}
}

Expand Down Expand Up @@ -183,7 +183,7 @@ public async Task PublishAndSubscribe_AllEventsString_Succeeds()
{
case TopicMessage.Text textMessage:
Assert.Equal(textMessage.Value, valuesToSend[messageCount]);
Assert.Equal(textMessage.TopicSequenceNumber, messageCount + 1);
Assert.Equal(textMessage.TopicSequenceNumber, checked((ulong)(messageCount + 1)));
messageCount++;
break;
case TopicSystemEvent.Heartbeat:
Expand Down
Loading