From e10ae028e934a4d1230993a8d6d3ffae6957fa14 Mon Sep 17 00:00:00 2001 From: Nate Anderson Date: Thu, 31 Aug 2023 22:32:14 -0700 Subject: [PATCH] feat: add topics Add the initial topics implementation. The entry point is TopicClient, which contains the publish and subscribe methods. Subscribe returns an IAsyncEnumerable that can be iterated over to read from the topic. --- src/Momento.Sdk/ITopicClient.cs | 65 +++++ src/Momento.Sdk/Internal/LoggingUtils.cs | 69 +++++ src/Momento.Sdk/Internal/ScsTopicClient.cs | 183 ++++++++++++++ src/Momento.Sdk/Internal/TopicGrpcManager.cs | 152 +++++++++++ src/Momento.Sdk/Momento.Sdk.csproj | 3 +- src/Momento.Sdk/Responses/TopicMessage.cs | 93 +++++++ .../Responses/TopicPublishResponse.cs | 62 +++++ .../Responses/TopicSubscribeResponse.cs | 149 +++++++++++ src/Momento.Sdk/TopicClient.cs | 81 ++++++ .../Integration/Momento.Sdk.Tests/Fixtures.cs | 28 +++ .../Momento.Sdk.Tests/TopicTest.cs | 236 ++++++++++++++++++ 11 files changed, 1120 insertions(+), 1 deletion(-) create mode 100644 src/Momento.Sdk/ITopicClient.cs create mode 100644 src/Momento.Sdk/Internal/ScsTopicClient.cs create mode 100644 src/Momento.Sdk/Internal/TopicGrpcManager.cs create mode 100644 src/Momento.Sdk/Responses/TopicMessage.cs create mode 100644 src/Momento.Sdk/Responses/TopicPublishResponse.cs create mode 100644 src/Momento.Sdk/Responses/TopicSubscribeResponse.cs create mode 100644 src/Momento.Sdk/TopicClient.cs create mode 100644 tests/Integration/Momento.Sdk.Tests/TopicTest.cs diff --git a/src/Momento.Sdk/ITopicClient.cs b/src/Momento.Sdk/ITopicClient.cs new file mode 100644 index 00000000..f3793c02 --- /dev/null +++ b/src/Momento.Sdk/ITopicClient.cs @@ -0,0 +1,65 @@ +using System; +using System.Threading.Tasks; +using Momento.Sdk.Responses; + +namespace Momento.Sdk; + +/// +/// Minimum viable functionality of a topic client. +/// +public interface ITopicClient : IDisposable +{ + /// + /// Publish a value to a topic in a cache. + /// + /// Name of the cache containing the topic. + /// Name of the topic. + /// The value to be published. + /// + /// Task object representing the result of the publish operation. The + /// response object is resolved to a type-safe object of one of + /// the following subtypes: + /// + /// TopicPublishResponse.Success + /// TopicPublishResponse.Error + /// + /// Pattern matching can be used to operate on the appropriate subtype. + /// For example: + /// + /// if (response is TopicPublishResponse.Error errorResponse) + /// { + /// // handle error as appropriate + /// } + /// + /// + public Task PublishAsync(string cacheName, string topicName, byte[] value); + + /// + public Task PublishAsync(string cacheName, string topicName, string value); + + /// + /// Subscribe to a topic. The returned value can be used to iterate over newly published messages on the topic. + /// + /// Name of the cache containing the topic. + /// Name of the topic. + /// The sequence number of the last message. + /// If provided, the client will attempt to start the stream from that sequence number. + /// + /// Task object representing the result of the subscribe operation. The + /// response object is resolved to a type-safe object of one of + /// the following subtypes: + /// + /// TopicSubscribeResponse.Subscription + /// TopicSubscribeResponse.Error + /// + /// Pattern matching can be used to operate on the appropriate subtype. + /// For example: + /// + /// if (response is TopicSubscribeResponse.Error errorResponse) + /// { + /// // handle error as appropriate + /// } + /// + /// + public Task SubscribeAsync(string cacheName, string topicName, ulong? resumeAtSequenceNumber = null); +} \ No newline at end of file diff --git a/src/Momento.Sdk/Internal/LoggingUtils.cs b/src/Momento.Sdk/Internal/LoggingUtils.cs index 58f11e53..2eba477f 100644 --- a/src/Momento.Sdk/Internal/LoggingUtils.cs +++ b/src/Momento.Sdk/Internal/LoggingUtils.cs @@ -379,6 +379,75 @@ public static TSuccess LogTraceCollectionRequestSuccess(this ILogger _ } return success; } + + /// + /// Logs a message at TRACE level that indicates that a topic request is about to be executed. + /// + /// + /// + /// + /// + public static void LogTraceExecutingTopicRequest(this ILogger logger, string requestType, string cacheName, string topicName) + { + if (logger.IsEnabled(LogLevel.Trace)) + { + logger.LogTrace("Executing '{}' request: cacheName: {}; topicName: {}", requestType, cacheName, topicName); + } + } + + /// + /// Logs a message at TRACE level that indicates that a topic request resulted in an error. + /// + /// + /// + /// + /// + /// + /// + /// + public static TError LogTraceTopicRequestError(this ILogger logger, string requestType, string cacheName, string topicName, TError error) + { + if (logger.IsEnabled(LogLevel.Trace)) + { + logger.LogTrace("An error occurred while executing a '{}' request: cacheName: {}; topicName: {}; error: {}", requestType, cacheName, topicName, error); + } + return error; + } + + /// + /// /// Logs a message at TRACE level that indicates that a topic request resulted in a success. + /// + /// + /// + /// + /// + /// + /// + /// + public static TSuccess LogTraceTopicRequestSuccess(this ILogger logger, string requestType, string cacheName, string topicName, TSuccess success) + { + + if (logger.IsEnabled(LogLevel.Trace)) + { + logger.LogTrace("Successfully executed '{}' request: cacheName: {}; topicName: {}; success: {}", requestType, cacheName, topicName, success); + } + return success; + } + + /// + /// Logs a message at TRACE level that indicates that a topic message was received. + /// + /// + /// + /// + /// + public static void LogTraceTopicMessageReceived(this ILogger logger, string messageType, string cacheName, string topicName) + { + if (logger.IsEnabled(LogLevel.Trace)) + { + logger.LogTrace("Received '{}' message on: cacheName: {}; topicName: {}", messageType, cacheName, topicName); + } + } private static string ReadableByteString(ByteString? input) { diff --git a/src/Momento.Sdk/Internal/ScsTopicClient.cs b/src/Momento.Sdk/Internal/ScsTopicClient.cs new file mode 100644 index 00000000..0deaf528 --- /dev/null +++ b/src/Momento.Sdk/Internal/ScsTopicClient.cs @@ -0,0 +1,183 @@ +using System; +using System.Threading; +using System.Threading.Tasks; +using Grpc.Core; +using Microsoft.Extensions.Logging; +using Momento.Protos.CacheClient.Pubsub; +using Momento.Sdk.Config; +using Momento.Sdk.Exceptions; +using Momento.Sdk.Internal.ExtensionMethods; +using Momento.Sdk.Responses; + +namespace Momento.Sdk.Internal; + +public class ScsTopicClientBase : IDisposable +{ + protected readonly TopicGrpcManager grpcManager; + private readonly TimeSpan dataClientOperationTimeout; + private readonly ILogger _logger; + + protected readonly CacheExceptionMapper _exceptionMapper; + + public ScsTopicClientBase(IConfiguration config, string authToken, string endpoint) + { + this.grpcManager = new TopicGrpcManager(config, authToken, endpoint); + this.dataClientOperationTimeout = config.TransportStrategy.GrpcConfig.Deadline; + this._logger = config.LoggerFactory.CreateLogger(); + this._exceptionMapper = new CacheExceptionMapper(config.LoggerFactory); + } + + protected Metadata MetadataWithCache(string cacheName) + { + return new Metadata() { { "cache", cacheName } }; + } + + protected DateTime CalculateDeadline() + { + return DateTime.UtcNow.Add(dataClientOperationTimeout); + } + + public void Dispose() + { + this.grpcManager.Dispose(); + } +} + +internal sealed class ScsTopicClient : ScsTopicClientBase +{ + private readonly ILogger _logger; + + public ScsTopicClient(IConfiguration config, string authToken, string endpoint) + : base(config, authToken, endpoint) + { + this._logger = config.LoggerFactory.CreateLogger(); + } + + public async Task Publish(string cacheName, string topicName, byte[] value) + { + var topicValue = new _TopicValue + { + Binary = value.ToByteString() + }; + return await SendPublish(cacheName, topicName, topicValue); + } + + public async Task Publish(string cacheName, string topicName, string value) + { + var topicValue = new _TopicValue + { + Text = value + }; + return await SendPublish(cacheName, topicName, topicValue); + } + + public async Task Subscribe(string cacheName, string topicName, + ulong? resumeAtTopicSequenceNumber = null) + { + return await SendSubscribe(cacheName, topicName, resumeAtTopicSequenceNumber); + } + + private const string RequestTypeTopicPublish = "TOPIC_PUBLISH"; + + private async Task SendPublish(string cacheName, string topicName, _TopicValue value) + { + _PublishRequest request = new _PublishRequest + { + CacheName = cacheName, + Topic = topicName, + Value = value + }; + + try + { + _logger.LogTraceExecutingTopicRequest(RequestTypeTopicPublish, cacheName, topicName); + await grpcManager.Client.publish(request, new CallOptions(deadline: CalculateDeadline())); + } + catch (Exception e) + { + return _logger.LogTraceTopicRequestError(RequestTypeTopicPublish, cacheName, topicName, + new TopicPublishResponse.Error(_exceptionMapper.Convert(e))); + } + + return _logger.LogTraceTopicRequestSuccess(RequestTypeTopicPublish, cacheName, topicName, + new TopicPublishResponse.Success()); + } + + private async Task SendSubscribe(string cacheName, string topicName, + ulong? resumeAtTopicSequenceNumber) + { + var request = new _SubscriptionRequest + { + CacheName = cacheName, + Topic = topicName + }; + if (resumeAtTopicSequenceNumber != null) + { + request.ResumeAtTopicSequenceNumber = resumeAtTopicSequenceNumber.Value; + } + + AsyncServerStreamingCall<_SubscriptionItem> subscription; + try + { + _logger.LogTraceExecutingTopicRequest(RequestTypeTopicPublish, cacheName, topicName); + subscription = grpcManager.Client.subscribe(request, new CallOptions()); + } + catch (Exception e) + { + return _logger.LogTraceTopicRequestError(RequestTypeTopicPublish, cacheName, topicName, + new TopicSubscribeResponse.Error(_exceptionMapper.Convert(e))); + } + + var response = new TopicSubscribeResponse.Subscription( + token => MoveNextAsync(subscription, token, cacheName, topicName), + () => subscription.Dispose()); + return _logger.LogTraceTopicRequestSuccess(RequestTypeTopicPublish, cacheName, topicName, + response); + } + + private async ValueTask MoveNextAsync(AsyncServerStreamingCall<_SubscriptionItem> subscription, + CancellationToken cancellationToken, string cacheName, string topicName) + { + if (cancellationToken.IsCancellationRequested) + { + return null; + } + + try + { + while (await subscription.ResponseStream.MoveNext(cancellationToken).ConfigureAwait(false)) + { + var message = subscription.ResponseStream.Current; + + switch (message.KindCase) + { + case _SubscriptionItem.KindOneofCase.Item: + _logger.LogTraceTopicMessageReceived("item", cacheName, topicName); + return new TopicMessage.Item(message.Item); + case _SubscriptionItem.KindOneofCase.Discontinuity: + _logger.LogTraceTopicMessageReceived("discontinuity", cacheName, topicName); + break; + case _SubscriptionItem.KindOneofCase.Heartbeat: + _logger.LogTraceTopicMessageReceived("heartbeat", cacheName, topicName); + break; + case _SubscriptionItem.KindOneofCase.None: + _logger.LogTraceTopicMessageReceived("none", cacheName, topicName); + break; + default: + _logger.LogTraceTopicMessageReceived("unknown", cacheName, topicName); + break; + } + } + } + catch (OperationCanceledException) + { + return null; + } + catch (Exception e) + { + return new TopicMessage.Error(_exceptionMapper.Convert(e)); + } + + return null; + } +} \ No newline at end of file diff --git a/src/Momento.Sdk/Internal/TopicGrpcManager.cs b/src/Momento.Sdk/Internal/TopicGrpcManager.cs new file mode 100644 index 00000000..527f4f12 --- /dev/null +++ b/src/Momento.Sdk/Internal/TopicGrpcManager.cs @@ -0,0 +1,152 @@ +using System; +using System.Collections.Generic; +using System.Linq; +using System.Threading.Tasks; +using Grpc.Core; +using Grpc.Net.Client; +#if USE_GRPC_WEB +using System.Net.Http; +using Grpc.Net.Client.Web; +#endif +using Microsoft.Extensions.Logging; +using Momento.Protos.CacheClient; +using Momento.Protos.CacheClient.Pubsub; +using Momento.Protos.CachePing; +using Momento.Sdk.Config; +using Momento.Sdk.Config.Middleware; +using Momento.Sdk.Config.Retry; +using Momento.Sdk.Internal.Middleware; +using static System.Reflection.Assembly; + +namespace Momento.Sdk.Internal; + +public interface IPubsubClient +{ + public Task<_Empty> publish(_PublishRequest request, CallOptions callOptions); + public AsyncServerStreamingCall<_SubscriptionItem> subscribe(_SubscriptionRequest request, CallOptions callOptions); +} + +public class PubsubClientWithMiddleware : IPubsubClient +{ + private readonly Pubsub.PubsubClient _generatedClient; + private readonly IList _middlewares; + private readonly IList> _headers; + + public PubsubClientWithMiddleware(Pubsub.PubsubClient generatedClient, IList middlewares, + IList> headers) + { + _generatedClient = generatedClient; + _middlewares = middlewares; + _headers = headers; + } + + public async Task<_Empty> publish(_PublishRequest request, CallOptions callOptions) + { + var wrapped = await _middlewares.WrapRequest(request, callOptions, (r, o) => _generatedClient.PublishAsync(r, o)); + return await wrapped.ResponseAsync; + } + + public AsyncServerStreamingCall<_SubscriptionItem> subscribe(_SubscriptionRequest request, CallOptions callOptions) + { + // Middleware is not currently compatible with gRPC streaming calls, + // so we manually add the headers to ensure the call has the auth token. + var callHeaders = callOptions.Headers ?? new Metadata(); + if (callOptions.Headers == null) + { + callOptions = callOptions.WithHeaders(new Metadata()); + } + + foreach (var header in _headers) + { + callHeaders.Add(header.Item1, header.Item2); + } + + return _generatedClient.Subscribe(request, callOptions.WithHeaders(callHeaders)); + } +} + +public class TopicGrpcManager : IDisposable +{ + private readonly GrpcChannel channel; + + public readonly IPubsubClient Client; + +#if USE_GRPC_WEB + private static readonly string Moniker = "dotnet-web"; +#else + private static readonly string Moniker = "dotnet"; +#endif + private readonly string version = + $"{Moniker}:{GetAssembly(typeof(Responses.CacheGetResponse)).GetName().Version.ToString()}"; + + // Some System.Environment.Version remarks to be aware of + // https://learn.microsoft.com/en-us/dotnet/api/system.environment.version?view=netstandard-2.0#remarks + private readonly string runtimeVersion = $"{Moniker}:{Environment.Version}"; + private readonly ILogger _logger; + + internal TopicGrpcManager(IConfiguration config, string authToken, string endpoint) + { +#if USE_GRPC_WEB + // Note: all web SDK requests are routed to a `web.` subdomain to allow us flexibility on the server + endpoint = $"web.{endpoint}"; +#endif + var uri = $"https://{endpoint}"; + var channelOptions = config.TransportStrategy.GrpcConfig.GrpcChannelOptions; + if (channelOptions.LoggerFactory == null) + { + channelOptions.LoggerFactory = config.LoggerFactory; + } + + channelOptions.Credentials = ChannelCredentials.SecureSsl; +#if USE_GRPC_WEB + channelOptions.HttpHandler = new GrpcWebHandler(new HttpClientHandler()); +#endif + + channel = GrpcChannel.ForAddress(uri, channelOptions); + var headerTuples = new List> + { + new(Header.AuthorizationKey, authToken), new(Header.AgentKey, version), + new(Header.RuntimeVersionKey, runtimeVersion) + }; + var headers = headerTuples.Select(tuple => new Header(name: tuple.Item1, value: tuple.Item2)).ToList(); + + _logger = config.LoggerFactory.CreateLogger(); + + var invoker = channel.CreateCallInvoker(); + + var middlewares = config.Middlewares.Concat( + new List { + new RetryMiddleware(config.LoggerFactory, config.RetryStrategy), + new HeaderMiddleware(config.LoggerFactory, headers), + new MaxConcurrentRequestsMiddleware(config.LoggerFactory, config.TransportStrategy.MaxConcurrentRequests) + } + ).ToList(); + + var client = new Pubsub.PubsubClient(invoker); + + if (config.TransportStrategy.EagerConnectionTimeout != null) + { + var eagerConnectionTimeout = config.TransportStrategy.EagerConnectionTimeout.Value; + _logger.LogDebug("TransportStrategy EagerConnection is enabled; attempting to connect to server"); + var pingClient = new Ping.PingClient(channel); + try + { + pingClient.Ping(new _PingRequest(), + new CallOptions(deadline: DateTime.UtcNow.Add(eagerConnectionTimeout))); + } + catch (RpcException) + { + _logger.LogWarning( + "Failed to eagerly connect to the server; continuing with execution in case failure is recoverable later."); + } + } + + Client = new PubsubClientWithMiddleware(client, middlewares, headerTuples); + } + + public void Dispose() + { + channel.Dispose(); + GC.SuppressFinalize(this); + } +} \ No newline at end of file diff --git a/src/Momento.Sdk/Momento.Sdk.csproj b/src/Momento.Sdk/Momento.Sdk.csproj index 28e4ca1b..73310590 100644 --- a/src/Momento.Sdk/Momento.Sdk.csproj +++ b/src/Momento.Sdk/Momento.Sdk.csproj @@ -54,7 +54,8 @@ - + + diff --git a/src/Momento.Sdk/Responses/TopicMessage.cs b/src/Momento.Sdk/Responses/TopicMessage.cs new file mode 100644 index 00000000..6d4f43bc --- /dev/null +++ b/src/Momento.Sdk/Responses/TopicMessage.cs @@ -0,0 +1,93 @@ +using Momento.Protos.CacheClient.Pubsub; +using Momento.Sdk.Exceptions; + +namespace Momento.Sdk.Responses; + +/// +/// Parent type for a topic message. The message is resolved to a type-safe +/// object of one of the following subtypes: +/// +/// TopicMessage.Item +/// TopicMessage.Error +/// +/// Pattern matching can be used to operate on the appropriate subtype. +/// For example: +/// +/// if (message is TopicMessage.Item item) +/// { +/// return item.ValueString(); +/// } +/// else if (message is TopicMessage.Error error) +/// { +/// // handle error as appropriate +/// } +/// else +/// { +/// // handle unexpected response +/// } +/// +/// +public abstract class TopicMessage +{ + /// + /// A topic message containing a value. If the value is a string, ValueString + /// will return it. If the value is binary, ValueByteArray will return it. + /// + public class Item : TopicMessage + { + private readonly _TopicValue _value; + + /// + /// Constructs an Item from an internal _TopicItem + /// + /// Containing the binary or string value. + public Item(_TopicItem topicItem) + { + _value = topicItem.Value; + TopicSequenceNumber = topicItem.TopicSequenceNumber; + } + + /// + /// The number of this message in the topic sequence. + /// + public ulong TopicSequenceNumber { get; } + + /// + /// The binary value of this message, if present. + /// + public byte[] ValueByteArray => _value.Binary.ToByteArray(); + + /// + /// The string value of this message, if present. + /// + public string ValueString => _value.Text; + } + + /// + public class Error : TopicMessage, IError + { + private readonly SdkException _error; + + /// + public Error(SdkException error) + { + _error = error; + } + + /// + public SdkException InnerException => _error; + + /// + public MomentoErrorCode ErrorCode => _error.ErrorCode; + + /// + public string Message => $"{_error.MessageWrapper}: {_error.Message}"; + + /// + public override string ToString() + { + return $"{base.ToString()}: {this.Message}"; + } + + } +} \ No newline at end of file diff --git a/src/Momento.Sdk/Responses/TopicPublishResponse.cs b/src/Momento.Sdk/Responses/TopicPublishResponse.cs new file mode 100644 index 00000000..2f0a89a7 --- /dev/null +++ b/src/Momento.Sdk/Responses/TopicPublishResponse.cs @@ -0,0 +1,62 @@ +using Momento.Sdk.Exceptions; + +namespace Momento.Sdk.Responses; + +/// +/// Parent response type for a topic publish request. The +/// response object is resolved to a type-safe object of one of +/// the following subtypes: +/// +/// TopicPublishResponse.Success +/// TopicPublishResponse.Error +/// +/// Pattern matching can be used to operate on the appropriate subtype. +/// For example: +/// +/// if (response is TopicPublishResponse.Success successResponse) +/// { +/// // handle success as appropriate +/// } +/// else if (response is TopicPublishResponse.Error errorResponse) +/// { +/// // handle error as appropriate +/// } +/// else +/// { +/// // handle unexpected response +/// } +/// +/// +public abstract class TopicPublishResponse +{ + /// + public class Success : TopicPublishResponse {} + + /// + public class Error : TopicPublishResponse, IError + { + private readonly SdkException _error; + + /// + public Error(SdkException error) + { + _error = error; + } + + /// + public SdkException InnerException => _error; + + /// + public MomentoErrorCode ErrorCode => _error.ErrorCode; + + /// + public string Message => $"{_error.MessageWrapper}: {_error.Message}"; + + /// + public override string ToString() + { + return $"{base.ToString()}: {this.Message}"; + } + + } +} \ No newline at end of file diff --git a/src/Momento.Sdk/Responses/TopicSubscribeResponse.cs b/src/Momento.Sdk/Responses/TopicSubscribeResponse.cs new file mode 100644 index 00000000..c9de66c7 --- /dev/null +++ b/src/Momento.Sdk/Responses/TopicSubscribeResponse.cs @@ -0,0 +1,149 @@ +using System; +using System.Collections.Generic; +using System.Threading; +using System.Threading.Tasks; +using Momento.Sdk.Exceptions; + +namespace Momento.Sdk.Responses; + +/// +/// Parent response type for a topic subscribe request. The +/// response object is resolved to a type-safe object of one of +/// the following subtypes: +/// +/// TopicSubscribeResponse.Subscription +/// TopicSubscribeResponse.Error +/// +/// Pattern matching can be used to operate on the appropriate subtype. +/// For example: +/// +/// if (response is TopicSubscribeResponse.Subscription subscription) +/// { +/// await foreach (var item in subscription.WithCancellation(ct)) +/// { +/// // iterate through the messages +/// } +/// } +/// else if (response is TopicSubscribeResponse.Error error) +/// { +/// // handle error as appropriate +/// } +/// else +/// { +/// // handle unexpected response +/// } +/// +/// +public abstract class TopicSubscribeResponse +{ + /// + /// A subscription to a Momento topic. As an IAsyncEnumerable, it can be iterated over to read messages from the + /// topic. The iterator will return a TopicMessage representing a message or error from the stream, or null if the + /// stream is closed. + /// + public class Subscription : TopicSubscribeResponse, IDisposable, IAsyncEnumerable + { + private readonly Func> _moveNextFunction; + private readonly Action _disposalAction; + + /// + /// Constructs a Subscription with a wrapped topic iterator and an action to dispose of it. + /// + public Subscription(Func> moveNextFunction, Action disposalAction) + { + _moveNextFunction = moveNextFunction; + _disposalAction = disposalAction; + } + + /// + /// Gets the enumerator for this topic. This subscription represents a single view on a topic, so multiple + /// enumerators will interfere with each other. + /// + public IAsyncEnumerator GetAsyncEnumerator(CancellationToken cancellationToken = default) + { + return new TopicMessageEnumerator(_moveNextFunction, _disposalAction, cancellationToken); + } + + + /// + /// Unsubscribe from this topic. + /// + public void Dispose() + { + _disposalAction.Invoke(); + } + } + + private class TopicMessageEnumerator : IAsyncEnumerator + { + private readonly Func> _moveNextFunction; + private readonly Action _disposalAction; + private readonly CancellationToken _cancellationToken; + + public TopicMessageEnumerator(Func> moveNextFunction, Action disposalAction, CancellationToken cancellationToken) + { + _moveNextFunction = moveNextFunction; + _disposalAction = disposalAction; + _cancellationToken = cancellationToken; + } + + public TopicMessage? Current { get; private set; } + + public async ValueTask MoveNextAsync() + { + if (_cancellationToken.IsCancellationRequested) + { + Current = null; + return false; + } + + var nextMessage = await _moveNextFunction.Invoke(_cancellationToken); + switch (nextMessage) + { + case TopicMessage.Item: + Current = nextMessage; + return true; + case TopicMessage.Error: + Current = nextMessage; + return true; + default: + Current = null; + return false; + } + } + + public ValueTask DisposeAsync() + { + _disposalAction.Invoke(); + + return new ValueTask(); + } + } + + /// + public class Error : TopicSubscribeResponse, IError + { + private readonly SdkException _error; + + /// + public Error(SdkException error) + { + _error = error; + } + + /// + public SdkException InnerException => _error; + + /// + public MomentoErrorCode ErrorCode => _error.ErrorCode; + + /// + public string Message => $"{_error.MessageWrapper}: {_error.Message}"; + + /// + public override string ToString() + { + return $"{base.ToString()}: {this.Message}"; + } + } +} \ No newline at end of file diff --git a/src/Momento.Sdk/TopicClient.cs b/src/Momento.Sdk/TopicClient.cs new file mode 100644 index 00000000..deebe281 --- /dev/null +++ b/src/Momento.Sdk/TopicClient.cs @@ -0,0 +1,81 @@ +using System; +using System.Threading.Tasks; +using Momento.Sdk.Auth; +using Momento.Sdk.Config; +using Momento.Sdk.Exceptions; +using Momento.Sdk.Internal; +using Momento.Sdk.Responses; + +namespace Momento.Sdk; + +/// +/// Client to perform operations against Momento topics. +/// +public class TopicClient : ITopicClient +{ + private readonly ScsTopicClient scsTopicClient; + + + /// + /// Client to perform operations against Momento topics. + /// + /// Configuration to use for the transport, retries, middlewares. See for out-of-the-box configuration choices, eg + /// Momento auth provider. + public TopicClient(IConfiguration config, ICredentialProvider authProvider) + { + scsTopicClient = new ScsTopicClient(config, authProvider.AuthToken, authProvider.CacheEndpoint); + } + + /// + public async Task PublishAsync(string cacheName, string topicName, byte[] value) + { + try + { + Utils.ArgumentNotNull(cacheName, nameof(cacheName)); + Utils.ArgumentNotNull(topicName, nameof(topicName)); + Utils.ArgumentNotNull(value, nameof(value)); + } + catch (Exception e) + { + return new TopicPublishResponse.Error(new InvalidArgumentException(e.Message)); + } + return await scsTopicClient.Publish(cacheName, topicName, value); + } + + /// + public async Task PublishAsync(string cacheName, string topicName, string value) + { + try + { + Utils.ArgumentNotNull(cacheName, nameof(cacheName)); + Utils.ArgumentNotNull(topicName, nameof(topicName)); + Utils.ArgumentNotNull(value, nameof(value)); + } + catch (Exception e) + { + return new TopicPublishResponse.Error(new InvalidArgumentException(e.Message)); + } + return await scsTopicClient.Publish(cacheName, topicName, value); + } + + /// + public async Task SubscribeAsync(string cacheName, string topicName, ulong? resumeAtSequenceNumber = null) + { + try + { + Utils.ArgumentNotNull(cacheName, nameof(cacheName)); + Utils.ArgumentNotNull(topicName, nameof(topicName)); + } + catch (Exception e) + { + return new TopicSubscribeResponse.Error(new InvalidArgumentException(e.Message)); + } + return await scsTopicClient.Subscribe(cacheName, topicName, resumeAtSequenceNumber); + } + + /// + public void Dispose() + { + scsTopicClient.Dispose(); + } +} \ No newline at end of file diff --git a/tests/Integration/Momento.Sdk.Tests/Fixtures.cs b/tests/Integration/Momento.Sdk.Tests/Fixtures.cs index 3b4d8de5..fb0dba63 100644 --- a/tests/Integration/Momento.Sdk.Tests/Fixtures.cs +++ b/tests/Integration/Momento.Sdk.Tests/Fixtures.cs @@ -44,6 +44,34 @@ public void Dispose() } } +public class TopicClientFixture : IDisposable +{ + public ITopicClient Client { get; private set; } + public ICredentialProvider AuthProvider { get; private set; } + + public TopicClientFixture() + { + AuthProvider = new EnvMomentoTokenProvider("TEST_AUTH_TOKEN"); + Client = new TopicClient(Configurations.Laptop.Latest(LoggerFactory.Create(builder => + { + builder.AddSimpleConsole(options => + { + options.IncludeScopes = true; + options.SingleLine = true; + options.TimestampFormat = "hh:mm:ss "; + }); + builder.AddFilter("Grpc.Net.Client", LogLevel.Error); + builder.SetMinimumLevel(LogLevel.Information); + })), + AuthProvider); + } + + public void Dispose() + { + Client.Dispose(); + } +} + /// /// Register the fixture in xUnit. /// diff --git a/tests/Integration/Momento.Sdk.Tests/TopicTest.cs b/tests/Integration/Momento.Sdk.Tests/TopicTest.cs new file mode 100644 index 00000000..c4af2f03 --- /dev/null +++ b/tests/Integration/Momento.Sdk.Tests/TopicTest.cs @@ -0,0 +1,236 @@ +using System.Collections.Generic; +using System.Linq; +using System.Threading; +using System.Threading.Tasks; +using Xunit.Abstractions; + +// ReSharper disable once CheckNamespace +namespace Momento.Sdk.Tests; + +public class TopicTest : IClassFixture, IClassFixture +{ + private readonly ITestOutputHelper testOutputHelper; + private readonly string cacheName; + private readonly ITopicClient topicClient; + + public TopicTest(CacheClientFixture cacheFixture, TopicClientFixture topicFixture, + ITestOutputHelper testOutputHelper) + { + this.testOutputHelper = testOutputHelper; + topicClient = topicFixture.Client; + cacheName = cacheFixture.CacheName; + } + + [Theory] + [InlineData(null, "topic")] + [InlineData("cache", null)] + public async Task PublishAsync_NullChecksByteArray_IsError(string badCacheName, string badTopicName) + { + var response = await topicClient.PublishAsync(badCacheName, badTopicName, Array.Empty()); + Assert.True(response is TopicPublishResponse.Error, $"Unexpected response: {response}"); + Assert.Equal(MomentoErrorCode.INVALID_ARGUMENT_ERROR, ((TopicPublishResponse.Error)response).ErrorCode); + } + + [Fact] + public async Task PublishAsync_PublishNullByteArray_IsError() + { + var response = await topicClient.PublishAsync(cacheName, "topic", (byte[])null!); + Assert.True(response is TopicPublishResponse.Error, $"Unexpected response: {response}"); + Assert.Equal(MomentoErrorCode.INVALID_ARGUMENT_ERROR, ((TopicPublishResponse.Error)response).ErrorCode); + } + + [Fact] + public async Task PublishAsync_BadCacheNameByteArray_IsError() + { + var response = await topicClient.PublishAsync("fake-" + cacheName, "topic", Array.Empty()); + Assert.True(response is TopicPublishResponse.Error, $"Unexpected response: {response}"); + Assert.Equal(MomentoErrorCode.NOT_FOUND_ERROR, ((TopicPublishResponse.Error)response).ErrorCode); + } + + [Theory] + [InlineData(null, "topic")] + [InlineData("cache", null)] + public async Task PublishAsync_NullChecksString_IsError(string badCacheName, string badTopicName) + { + var response = await topicClient.PublishAsync(badCacheName, badTopicName, "value"); + Assert.True(response is TopicPublishResponse.Error, $"Unexpected response: {response}"); + Assert.Equal(MomentoErrorCode.INVALID_ARGUMENT_ERROR, ((TopicPublishResponse.Error)response).ErrorCode); + } + + [Fact] + public async Task PublishAsync_PublishNullString_IsError() + { + var response = await topicClient.PublishAsync(cacheName, "topic", (string)null!); + Assert.True(response is TopicPublishResponse.Error, $"Unexpected response: {response}"); + Assert.Equal(MomentoErrorCode.INVALID_ARGUMENT_ERROR, ((TopicPublishResponse.Error)response).ErrorCode); + } + + [Fact] + public async Task PublishAsync_BadCacheNameString_IsError() + { + var response = await topicClient.PublishAsync("fake-" + cacheName, "topic", "value"); + Assert.True(response is TopicPublishResponse.Error, $"Unexpected response: {response}"); + Assert.Equal(MomentoErrorCode.NOT_FOUND_ERROR, ((TopicPublishResponse.Error)response).ErrorCode); + } + + [Theory] + [InlineData(null, "topic")] + [InlineData("cache", null)] + public async Task SubscribeAsync_NullChecks_IsError(string badCacheName, string badTopicName) + { + var response = await topicClient.SubscribeAsync(badCacheName, badTopicName); + Assert.True(response is TopicSubscribeResponse.Error, $"Unexpected response: {response}"); + Assert.Equal(MomentoErrorCode.INVALID_ARGUMENT_ERROR, ((TopicSubscribeResponse.Error)response).ErrorCode); + } + + [Fact(Timeout = 5000)] + public async Task PublishAndSubscribe_ByteArray_Succeeds() + { + const string topicName = "topic_bytes"; + var skipMessage = new byte[] { 0x00 }; + var valuesToSend = new List + { + new byte[] { 0x01 }, + new byte[] { 0x02 }, + new byte[] { 0x03 }, + new byte[] { 0x04 }, + new byte[] { 0x05 } + }; + + var subscribeResponse = await topicClient.SubscribeAsync(cacheName, topicName); + Assert.True(subscribeResponse is TopicSubscribeResponse.Subscription, + $"Unexpected response: {subscribeResponse}"); + var subscription = (TopicSubscribeResponse.Subscription)subscribeResponse; + + var testTask = Task.Run(async () => + { + var messageCount = 0; + await foreach (var message in subscription) + { + Assert.NotNull(message); + Assert.True(message is TopicMessage.Item, $"Unexpected message: {message}"); + if (((TopicMessage.Item)message).ValueByteArray!.SequenceEqual(skipMessage)) continue; + + Assert.Equal(valuesToSend[messageCount], ((TopicMessage.Item)message).ValueByteArray); + + messageCount++; + if (messageCount == valuesToSend.Count) + { + break; + } + } + + subscription.Dispose(); + return messageCount; + }); + + // Send a few values to skip over to ensure the subscription received the messages under test + for (var i = 0; i < 5; i++) + { + await topicClient.PublishAsync(cacheName, topicName, skipMessage); + Thread.Sleep(100); + } + + foreach (var value in valuesToSend) + { + var publishResponse = await topicClient.PublishAsync(cacheName, topicName, value); + Assert.True(publishResponse is TopicPublishResponse.Success, $"Unexpected response: {publishResponse}"); + Thread.Sleep(100); + } + foreach (var value in valuesToSend) + { + var publishResponse = await topicClient.PublishAsync(cacheName, topicName, value); + Assert.True(publishResponse is TopicPublishResponse.Success, $"Unexpected response: {publishResponse}"); + Thread.Sleep(100); + } + + Assert.Equal(valuesToSend.Count, await testTask); + } + + [Fact(Timeout = 5000)] + public async Task PublishAndSubscribe_String_Succeeds() + { + const string topicName = "topic_string"; + var skipMessage = "skip"; + var valuesToSend = new List + { + "one", + "two", + "three", + "four", + "five" + }; + + var subscribeResponse = await topicClient.SubscribeAsync(cacheName, topicName); + Assert.True(subscribeResponse is TopicSubscribeResponse.Subscription, + $"Unexpected response: {subscribeResponse}"); + var subscription = (TopicSubscribeResponse.Subscription)subscribeResponse; + + var testTask = Task.Run(async () => + { + var messageCount = 0; + await foreach (var message in subscription) + { + Assert.NotNull(message); + Assert.True(message is TopicMessage.Item, $"Unexpected message: {message}"); + if (((TopicMessage.Item)message).ValueString!.Equals(skipMessage)) continue; + + Assert.Equal(valuesToSend[messageCount], ((TopicMessage.Item)message).ValueString); + + messageCount++; + if (messageCount == valuesToSend.Count) + { + break; + } + } + + subscription.Dispose(); + return messageCount; + }); + + // Send a few values to skip over to ensure the subscription received the messages under test + for (var i = 0; i < 5; i++) + { + await topicClient.PublishAsync(cacheName, topicName, skipMessage); + Thread.Sleep(100); + } + + foreach (var value in valuesToSend) + { + var publishResponse = await topicClient.PublishAsync(cacheName, topicName, value); + Assert.True(publishResponse is TopicPublishResponse.Success, $"Unexpected response: {publishResponse}"); + Thread.Sleep(100); + } + + Assert.Equal(valuesToSend.Count, await testTask); + } + + [Fact(Timeout = 5000)] + public async Task Subscribe_EnumerateClosed_Succeeds() + { + const string topicName = "topic_closed"; + const string messageValue = "value"; + + using var cts = new CancellationTokenSource(); + + var subscribeResponse = await topicClient.SubscribeAsync(cacheName, topicName); + Assert.True(subscribeResponse is TopicSubscribeResponse.Subscription, + $"Unexpected response: {subscribeResponse}"); + var subscription = ((TopicSubscribeResponse.Subscription)subscribeResponse).WithCancellation(cts.Token); + testOutputHelper.WriteLine("subscribed"); + + var enumerator = subscription.GetAsyncEnumerator(); + Assert.Null(enumerator.Current); + testOutputHelper.WriteLine("enumerator gotten"); + + var publishResponse = await topicClient.PublishAsync(cacheName, topicName, messageValue); + Assert.True(publishResponse is TopicPublishResponse.Success, $"Unexpected response: {publishResponse}"); + testOutputHelper.WriteLine("message published"); + + cts.Cancel(); + + Assert.False(await enumerator.MoveNextAsync()); + + Assert.Null(enumerator.Current); + } +} \ No newline at end of file