Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: add topics #468

Merged
merged 27 commits into from
Sep 6, 2023
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
Show all changes
27 commits
Select commit Hold shift + click to select a range
e10ae02
feat: add topics
nand4011 Sep 1, 2023
4f4cb06
split message types
nand4011 Sep 1, 2023
5e1f623
Return false from the iterator when the stream is cancelled.
nand4011 Sep 1, 2023
f4623d1
Add print statements to try and identify hanging test.
nand4011 Sep 1, 2023
b728deb
Only enable topics for .NET Standard 2.0 or greater
nand4011 Sep 1, 2023
117257e
Comment out tests for troubleshooting and replace sleep with delay
nand4011 Sep 1, 2023
fc66489
Manually synchronize the test and add more print statements
nand4011 Sep 1, 2023
be310b9
Temporarily remove subscribing from test.
nand4011 Sep 2, 2023
541b0e2
Make the test as simple as possible.
nand4011 Sep 2, 2023
68b1cf3
Modify the test to only create a task that waits.
nand4011 Sep 2, 2023
34180b8
Modify the test to publish messages.
nand4011 Sep 2, 2023
8382d43
Remake the test to be explicitly threaded
nand4011 Sep 2, 2023
9ad31e5
Remake publish/subscribe tests to be explicitly threaded.
nand4011 Sep 2, 2023
264d46f
Make message production thread in tests.
nand4011 Sep 2, 2023
d7f8738
re-add the single test filter
nand4011 Sep 2, 2023
eedc1ad
Troubleshoot test threading.
nand4011 Sep 5, 2023
f710074
Remove test filter from build.yaml
nand4011 Sep 5, 2023
8252472
add and use a separate topic configuration.
nand4011 Sep 5, 2023
020c364
detailed logging
kvcache Sep 5, 2023
a8044e5
switch to normal verbosity, & cancel even if something goes wrong
kvcache Sep 5, 2023
f0e603c
xunit uses a different way to set up verbosity
kvcache Sep 5, 2023
b56bfb4
naming, threading, test case coverage
kvcache Sep 5, 2023
9182598
Temporarily disable one of the topic tests.
nand4011 Sep 5, 2023
0180c5b
leave continuation associated with original async context
kvcache Sep 5, 2023
9e49d9f
disable test parallelism
nand4011 Sep 5, 2023
0b81e15
Add ValueString and ValueByteArray methods to TopicMessage.
nand4011 Sep 5, 2023
052d9cd
Multiple pr fixes
nand4011 Sep 6, 2023
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
65 changes: 65 additions & 0 deletions src/Momento.Sdk/ITopicClient.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
using System;
using System.Threading.Tasks;
using Momento.Sdk.Responses;

namespace Momento.Sdk;

/// <summary>
/// Minimum viable functionality of a topic client.
/// </summary>
public interface ITopicClient : IDisposable
{
/// <summary>
/// Publish a value to a topic in a cache.
/// </summary>
/// <param name="cacheName">Name of the cache containing the topic.</param>
/// <param name="topicName">Name of the topic.</param>
/// <param name="value">The value to be published.</param>
/// <returns>
/// Task object representing the result of the publish operation. The
/// response object is resolved to a type-safe object of one of
/// the following subtypes:
/// <list type="bullet">
/// <item><description>TopicPublishResponse.Success</description></item>
/// <item><description>TopicPublishResponse.Error</description></item>
/// </list>
/// Pattern matching can be used to operate on the appropriate subtype.
/// For example:
/// <code>
/// if (response is TopicPublishResponse.Error errorResponse)
/// {
/// // handle error as appropriate
/// }
/// </code>
/// </returns>
public Task<TopicPublishResponse> PublishAsync(string cacheName, string topicName, byte[] value);

/// <inheritdoc cref="PublishAsync(string, string, byte[])"/>
public Task<TopicPublishResponse> PublishAsync(string cacheName, string topicName, string value);

/// <summary>
/// Subscribe to a topic. The returned value can be used to iterate over newly published messages on the topic.
/// </summary>
/// <param name="cacheName">Name of the cache containing the topic.</param>
/// <param name="topicName">Name of the topic.</param>
/// <param name="resumeAtSequenceNumber">The sequence number of the last message.
/// If provided, the client will attempt to start the stream from that sequence number.</param>
/// <returns>
/// Task object representing the result of the subscribe operation. The
/// response object is resolved to a type-safe object of one of
/// the following subtypes:
/// <list type="bullet">
/// <item><description>TopicSubscribeResponse.Subscription</description></item>
/// <item><description>TopicSubscribeResponse.Error</description></item>
/// </list>
/// Pattern matching can be used to operate on the appropriate subtype.
/// For example:
/// <code>
/// if (response is TopicSubscribeResponse.Error errorResponse)
/// {
/// // handle error as appropriate
/// }
/// </code>
/// </returns>
public Task<TopicSubscribeResponse> SubscribeAsync(string cacheName, string topicName, ulong? resumeAtSequenceNumber = null);
}
69 changes: 69 additions & 0 deletions src/Momento.Sdk/Internal/LoggingUtils.cs
Original file line number Diff line number Diff line change
Expand Up @@ -379,6 +379,75 @@ public static TSuccess LogTraceCollectionRequestSuccess<TSuccess>(this ILogger _
}
return success;
}

/// <summary>
/// Logs a message at TRACE level that indicates that a topic request is about to be executed.
/// </summary>
/// <param name="logger"></param>
/// <param name="requestType"></param>
/// <param name="cacheName"></param>
/// <param name="topicName"></param>
public static void LogTraceExecutingTopicRequest(this ILogger logger, string requestType, string cacheName, string topicName)
{
if (logger.IsEnabled(LogLevel.Trace))
{
logger.LogTrace("Executing '{}' request: cacheName: {}; topicName: {}", requestType, cacheName, topicName);
}
}

/// <summary>
/// Logs a message at TRACE level that indicates that a topic request resulted in an error.
/// </summary>
/// <typeparam name="TError"></typeparam>
/// <param name="logger"></param>
/// <param name="requestType"></param>
/// <param name="cacheName"></param>
/// <param name="topicName"></param>
/// <param name="error"></param>
/// <returns></returns>
public static TError LogTraceTopicRequestError<TError>(this ILogger logger, string requestType, string cacheName, string topicName, TError error)
{
if (logger.IsEnabled(LogLevel.Trace))
{
logger.LogTrace("An error occurred while executing a '{}' request: cacheName: {}; topicName: {}; error: {}", requestType, cacheName, topicName, error);
}
return error;
}

/// <summary>
/// /// Logs a message at TRACE level that indicates that a topic request resulted in a success.
/// </summary>
/// <typeparam name="TSuccess"></typeparam>
/// <param name="logger"></param>
/// <param name="requestType"></param>
/// <param name="cacheName"></param>
/// <param name="topicName"></param>
/// <param name="success"></param>
/// <returns></returns>
public static TSuccess LogTraceTopicRequestSuccess<TSuccess>(this ILogger logger, string requestType, string cacheName, string topicName, TSuccess success)
{

if (logger.IsEnabled(LogLevel.Trace))
{
logger.LogTrace("Successfully executed '{}' request: cacheName: {}; topicName: {}; success: {}", requestType, cacheName, topicName, success);
}
return success;
}

/// <summary>
/// Logs a message at TRACE level that indicates that a topic message was received.
/// </summary>
/// <param name="logger"></param>
/// <param name="messageType"></param>
/// <param name="cacheName"></param>
/// <param name="topicName"></param>
public static void LogTraceTopicMessageReceived(this ILogger logger, string messageType, string cacheName, string topicName)
{
if (logger.IsEnabled(LogLevel.Trace))
{
logger.LogTrace("Received '{}' message on: cacheName: {}; topicName: {}", messageType, cacheName, topicName);
}
}

private static string ReadableByteString(ByteString? input)
{
Expand Down
183 changes: 183 additions & 0 deletions src/Momento.Sdk/Internal/ScsTopicClient.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,183 @@
using System;
using System.Threading;
using System.Threading.Tasks;
using Grpc.Core;
using Microsoft.Extensions.Logging;
using Momento.Protos.CacheClient.Pubsub;
using Momento.Sdk.Config;
using Momento.Sdk.Exceptions;
using Momento.Sdk.Internal.ExtensionMethods;
using Momento.Sdk.Responses;

namespace Momento.Sdk.Internal;

public class ScsTopicClientBase : IDisposable

Check warning on line 14 in src/Momento.Sdk/Internal/ScsTopicClient.cs

View workflow job for this annotation

GitHub Actions / build_csharp (ubuntu-latest, net6.0)

Missing XML comment for publicly visible type or member 'ScsTopicClientBase'

Check warning on line 14 in src/Momento.Sdk/Internal/ScsTopicClient.cs

View workflow job for this annotation

GitHub Actions / build_csharp (windows-latest, net461)

Missing XML comment for publicly visible type or member 'ScsTopicClientBase'
{
protected readonly TopicGrpcManager grpcManager;

Check warning on line 16 in src/Momento.Sdk/Internal/ScsTopicClient.cs

View workflow job for this annotation

GitHub Actions / build_csharp (ubuntu-latest, net6.0)

Missing XML comment for publicly visible type or member 'ScsTopicClientBase.grpcManager'

Check warning on line 16 in src/Momento.Sdk/Internal/ScsTopicClient.cs

View workflow job for this annotation

GitHub Actions / build_csharp (windows-latest, net461)

Missing XML comment for publicly visible type or member 'ScsTopicClientBase.grpcManager'
private readonly TimeSpan dataClientOperationTimeout;
private readonly ILogger _logger;

protected readonly CacheExceptionMapper _exceptionMapper;

Check warning on line 20 in src/Momento.Sdk/Internal/ScsTopicClient.cs

View workflow job for this annotation

GitHub Actions / build_csharp (ubuntu-latest, net6.0)

Missing XML comment for publicly visible type or member 'ScsTopicClientBase._exceptionMapper'

Check warning on line 20 in src/Momento.Sdk/Internal/ScsTopicClient.cs

View workflow job for this annotation

GitHub Actions / build_csharp (windows-latest, net461)

Missing XML comment for publicly visible type or member 'ScsTopicClientBase._exceptionMapper'

public ScsTopicClientBase(IConfiguration config, string authToken, string endpoint)

Check warning on line 22 in src/Momento.Sdk/Internal/ScsTopicClient.cs

View workflow job for this annotation

GitHub Actions / build_csharp (ubuntu-latest, net6.0)

Missing XML comment for publicly visible type or member 'ScsTopicClientBase.ScsTopicClientBase(IConfiguration, string, string)'

Check warning on line 22 in src/Momento.Sdk/Internal/ScsTopicClient.cs

View workflow job for this annotation

GitHub Actions / build_csharp (windows-latest, net461)

Missing XML comment for publicly visible type or member 'ScsTopicClientBase.ScsTopicClientBase(IConfiguration, string, string)'
{
this.grpcManager = new TopicGrpcManager(config, authToken, endpoint);
this.dataClientOperationTimeout = config.TransportStrategy.GrpcConfig.Deadline;
this._logger = config.LoggerFactory.CreateLogger<ScsDataClient>();
this._exceptionMapper = new CacheExceptionMapper(config.LoggerFactory);
}

protected Metadata MetadataWithCache(string cacheName)

Check warning on line 30 in src/Momento.Sdk/Internal/ScsTopicClient.cs

View workflow job for this annotation

GitHub Actions / build_csharp (ubuntu-latest, net6.0)

Missing XML comment for publicly visible type or member 'ScsTopicClientBase.MetadataWithCache(string)'

Check warning on line 30 in src/Momento.Sdk/Internal/ScsTopicClient.cs

View workflow job for this annotation

GitHub Actions / build_csharp (windows-latest, net461)

Missing XML comment for publicly visible type or member 'ScsTopicClientBase.MetadataWithCache(string)'
{
return new Metadata() { { "cache", cacheName } };
}

protected DateTime CalculateDeadline()

Check warning on line 35 in src/Momento.Sdk/Internal/ScsTopicClient.cs

View workflow job for this annotation

GitHub Actions / build_csharp (ubuntu-latest, net6.0)

Missing XML comment for publicly visible type or member 'ScsTopicClientBase.CalculateDeadline()'

Check warning on line 35 in src/Momento.Sdk/Internal/ScsTopicClient.cs

View workflow job for this annotation

GitHub Actions / build_csharp (windows-latest, net461)

Missing XML comment for publicly visible type or member 'ScsTopicClientBase.CalculateDeadline()'
{
return DateTime.UtcNow.Add(dataClientOperationTimeout);
}

public void Dispose()

Check warning on line 40 in src/Momento.Sdk/Internal/ScsTopicClient.cs

View workflow job for this annotation

GitHub Actions / build_csharp (ubuntu-latest, net6.0)

Missing XML comment for publicly visible type or member 'ScsTopicClientBase.Dispose()'

Check warning on line 40 in src/Momento.Sdk/Internal/ScsTopicClient.cs

View workflow job for this annotation

GitHub Actions / build_csharp (windows-latest, net461)

Missing XML comment for publicly visible type or member 'ScsTopicClientBase.Dispose()'
{
this.grpcManager.Dispose();
}
}

internal sealed class ScsTopicClient : ScsTopicClientBase
{
private readonly ILogger _logger;

public ScsTopicClient(IConfiguration config, string authToken, string endpoint)
: base(config, authToken, endpoint)
{
this._logger = config.LoggerFactory.CreateLogger<ScsTopicClient>();
}

public async Task<TopicPublishResponse> Publish(string cacheName, string topicName, byte[] value)
{
var topicValue = new _TopicValue
{
Binary = value.ToByteString()
};
return await SendPublish(cacheName, topicName, topicValue);
}

public async Task<TopicPublishResponse> Publish(string cacheName, string topicName, string value)
{
var topicValue = new _TopicValue
{
Text = value
};
return await SendPublish(cacheName, topicName, topicValue);
}

public async Task<TopicSubscribeResponse> Subscribe(string cacheName, string topicName,
ulong? resumeAtTopicSequenceNumber = null)
{
return await SendSubscribe(cacheName, topicName, resumeAtTopicSequenceNumber);
}

private const string RequestTypeTopicPublish = "TOPIC_PUBLISH";

private async Task<TopicPublishResponse> SendPublish(string cacheName, string topicName, _TopicValue value)
{
_PublishRequest request = new _PublishRequest
{
CacheName = cacheName,
Topic = topicName,
Value = value
};

try
{
_logger.LogTraceExecutingTopicRequest(RequestTypeTopicPublish, cacheName, topicName);
await grpcManager.Client.publish(request, new CallOptions(deadline: CalculateDeadline()));
}
catch (Exception e)
{
return _logger.LogTraceTopicRequestError(RequestTypeTopicPublish, cacheName, topicName,
new TopicPublishResponse.Error(_exceptionMapper.Convert(e)));
}

return _logger.LogTraceTopicRequestSuccess(RequestTypeTopicPublish, cacheName, topicName,
new TopicPublishResponse.Success());
}

private async Task<TopicSubscribeResponse> SendSubscribe(string cacheName, string topicName,

Check warning on line 106 in src/Momento.Sdk/Internal/ScsTopicClient.cs

View workflow job for this annotation

GitHub Actions / build_csharp (ubuntu-latest, net6.0)

This async method lacks 'await' operators and will run synchronously. Consider using the 'await' operator to await non-blocking API calls, or 'await Task.Run(...)' to do CPU-bound work on a background thread.

Check warning on line 106 in src/Momento.Sdk/Internal/ScsTopicClient.cs

View workflow job for this annotation

GitHub Actions / build_csharp (windows-latest, net461)

This async method lacks 'await' operators and will run synchronously. Consider using the 'await' operator to await non-blocking API calls, or 'await Task.Run(...)' to do CPU-bound work on a background thread.
ulong? resumeAtTopicSequenceNumber)
{
var request = new _SubscriptionRequest
{
CacheName = cacheName,
Topic = topicName
};
if (resumeAtTopicSequenceNumber != null)
{
request.ResumeAtTopicSequenceNumber = resumeAtTopicSequenceNumber.Value;
}

AsyncServerStreamingCall<_SubscriptionItem> subscription;
try
{
_logger.LogTraceExecutingTopicRequest(RequestTypeTopicPublish, cacheName, topicName);
subscription = grpcManager.Client.subscribe(request, new CallOptions());
}
catch (Exception e)
{
return _logger.LogTraceTopicRequestError(RequestTypeTopicPublish, cacheName, topicName,
new TopicSubscribeResponse.Error(_exceptionMapper.Convert(e)));
}

var response = new TopicSubscribeResponse.Subscription(
token => MoveNextAsync(subscription, token, cacheName, topicName),
nand4011 marked this conversation as resolved.
Show resolved Hide resolved
() => subscription.Dispose());
return _logger.LogTraceTopicRequestSuccess(RequestTypeTopicPublish, cacheName, topicName,
response);
}

private async ValueTask<TopicMessage?> MoveNextAsync(AsyncServerStreamingCall<_SubscriptionItem> subscription,
CancellationToken cancellationToken, string cacheName, string topicName)
{
if (cancellationToken.IsCancellationRequested)
{
return null;
}

try
{
while (await subscription.ResponseStream.MoveNext(cancellationToken).ConfigureAwait(false))
{
var message = subscription.ResponseStream.Current;

switch (message.KindCase)
{
case _SubscriptionItem.KindOneofCase.Item:
_logger.LogTraceTopicMessageReceived("item", cacheName, topicName);
return new TopicMessage.Item(message.Item);
case _SubscriptionItem.KindOneofCase.Discontinuity:
_logger.LogTraceTopicMessageReceived("discontinuity", cacheName, topicName);
break;
Comment on lines +173 to +175
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this should probably be returning a subscription item so users can be aware of missing messages.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I was going to add Discontinuity as a separate item in a second pass, but I can add it now if if won't add too much complexity to the pr.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yes we need to get this out the door, we will add those later

case _SubscriptionItem.KindOneofCase.Heartbeat:
_logger.LogTraceTopicMessageReceived("heartbeat", cacheName, topicName);
break;
case _SubscriptionItem.KindOneofCase.None:
_logger.LogTraceTopicMessageReceived("none", cacheName, topicName);
break;
default:
_logger.LogTraceTopicMessageReceived("unknown", cacheName, topicName);
break;
}
}
}
catch (OperationCanceledException)
{
return null;
}
catch (Exception e)
{
return new TopicMessage.Error(_exceptionMapper.Convert(e));
}

return null;
}
}
Loading
Loading