Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: add topics #468

Merged
merged 27 commits into from
Sep 6, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
27 commits
Select commit Hold shift + click to select a range
e10ae02
feat: add topics
nand4011 Sep 1, 2023
4f4cb06
split message types
nand4011 Sep 1, 2023
5e1f623
Return false from the iterator when the stream is cancelled.
nand4011 Sep 1, 2023
f4623d1
Add print statements to try and identify hanging test.
nand4011 Sep 1, 2023
b728deb
Only enable topics for .NET Standard 2.0 or greater
nand4011 Sep 1, 2023
117257e
Comment out tests for troubleshooting and replace sleep with delay
nand4011 Sep 1, 2023
fc66489
Manually synchronize the test and add more print statements
nand4011 Sep 1, 2023
be310b9
Temporarily remove subscribing from test.
nand4011 Sep 2, 2023
541b0e2
Make the test as simple as possible.
nand4011 Sep 2, 2023
68b1cf3
Modify the test to only create a task that waits.
nand4011 Sep 2, 2023
34180b8
Modify the test to publish messages.
nand4011 Sep 2, 2023
8382d43
Remake the test to be explicitly threaded
nand4011 Sep 2, 2023
9ad31e5
Remake publish/subscribe tests to be explicitly threaded.
nand4011 Sep 2, 2023
264d46f
Make message production thread in tests.
nand4011 Sep 2, 2023
d7f8738
re-add the single test filter
nand4011 Sep 2, 2023
eedc1ad
Troubleshoot test threading.
nand4011 Sep 5, 2023
f710074
Remove test filter from build.yaml
nand4011 Sep 5, 2023
8252472
add and use a separate topic configuration.
nand4011 Sep 5, 2023
020c364
detailed logging
kvcache Sep 5, 2023
a8044e5
switch to normal verbosity, & cancel even if something goes wrong
kvcache Sep 5, 2023
f0e603c
xunit uses a different way to set up verbosity
kvcache Sep 5, 2023
b56bfb4
naming, threading, test case coverage
kvcache Sep 5, 2023
9182598
Temporarily disable one of the topic tests.
nand4011 Sep 5, 2023
0180c5b
leave continuation associated with original async context
kvcache Sep 5, 2023
9e49d9f
disable test parallelism
nand4011 Sep 5, 2023
0b81e15
Add ValueString and ValueByteArray methods to TopicMessage.
nand4011 Sep 5, 2023
052d9cd
Multiple pr fixes
nand4011 Sep 6, 2023
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 4 additions & 2 deletions .github/workflows/build.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,8 @@ jobs:
include:
- os: ubuntu-latest
target-framework: net6.0
- os: windows-latest
target-framework: net6.0
- os: windows-latest
target-framework: net461
runs-on: ${{ matrix.os }}
Expand Down Expand Up @@ -48,10 +50,10 @@ jobs:
run: dotnet build

- name: Unit Test
run: dotnet test -f ${{ matrix.target-framework }} tests/Unit/Momento.Sdk.Tests
run: dotnet test --logger "console;verbosity=detailed" -f ${{ matrix.target-framework }} tests/Unit/Momento.Sdk.Tests

- name: Integration Test
run: dotnet test -f ${{ matrix.target-framework }} tests/Integration/Momento.Sdk.Tests
run: dotnet test --logger "console;verbosity=detailed" -f ${{ matrix.target-framework }} tests/Integration/Momento.Sdk.Tests

build_examples:
runs-on: ubuntu-latest
Expand Down
23 changes: 23 additions & 0 deletions src/Momento.Sdk/Config/ITopicConfiguration.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
using Microsoft.Extensions.Logging;
using Momento.Sdk.Config.Transport;

namespace Momento.Sdk.Config;


/// <summary>
/// Contract for Topic SDK configurables.
/// </summary>
public interface ITopicConfiguration
{
/// <inheritdoc cref="Microsoft.Extensions.Logging.ILoggerFactory" />
public ILoggerFactory LoggerFactory { get; }
/// <inheritdoc cref="Momento.Sdk.Config.Transport.ITransportStrategy" />
public ITopicTransportStrategy TransportStrategy { get; }

/// <summary>
/// Creates a new instance of the Configuration object, updated to use the specified transport strategy.
/// </summary>
/// <param name="transportStrategy">This is responsible for configuring network tunables.</param>
/// <returns>Configuration object with custom transport strategy provided</returns>
public ITopicConfiguration WithTransportStrategy(ITopicTransportStrategy transportStrategy);
}
50 changes: 50 additions & 0 deletions src/Momento.Sdk/Config/TopicConfiguration.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
using Microsoft.Extensions.Logging;
using Momento.Sdk.Config.Transport;

namespace Momento.Sdk.Config;

/// <inheritdoc cref="Momento.Sdk.Config.ITopicConfiguration" />
public class TopicConfiguration : ITopicConfiguration
{
/// <inheritdoc />
public ILoggerFactory LoggerFactory { get; }

/// <inheritdoc />
public ITopicTransportStrategy TransportStrategy { get; }

/// <summary>
/// Create a new instance of a Topic Configuration object with provided arguments: <see cref="Momento.Sdk.Config.ITopicConfiguration.TransportStrategy"/>, and <see cref="Momento.Sdk.Config.ITopicConfiguration.LoggerFactory"/>
/// </summary>
/// <param name="transportStrategy">This is responsible for configuring network tunables.</param>
/// <param name="loggerFactory">This is responsible for configuring logging.</param>
public TopicConfiguration(ILoggerFactory loggerFactory, ITopicTransportStrategy transportStrategy)
{
LoggerFactory = loggerFactory;
TransportStrategy = transportStrategy;
}

/// <inheritdoc />
public ITopicConfiguration WithTransportStrategy(ITopicTransportStrategy transportStrategy)
{
return new TopicConfiguration(LoggerFactory, transportStrategy);
}

/// <inheritdoc />
public override bool Equals(object obj)
{
if ((obj == null) || !this.GetType().Equals(obj.GetType()))
{
return false;
}

var other = (Configuration)obj;
return TransportStrategy.Equals(other.TransportStrategy) &&
LoggerFactory.Equals(other.LoggerFactory);
}

/// <inheritdoc />
public override int GetHashCode()
{
return base.GetHashCode();
}
}
73 changes: 73 additions & 0 deletions src/Momento.Sdk/Config/TopicConfigurations.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,73 @@
using System;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Logging.Abstractions;
using Momento.Sdk.Config.Transport;

namespace Momento.Sdk.Config;

/// <summary>
/// Provide pre-built topic configurations.
/// </summary>
public class TopicConfigurations
{
/// <summary>
/// Laptop config provides defaults suitable for a medium-to-high-latency environment. Permissive timeouts, retries, and
/// relaxed latency and throughput targets.
/// </summary>
public class Laptop : TopicConfiguration
{
private Laptop(ILoggerFactory loggerFactory, ITopicTransportStrategy transportStrategy) : base(loggerFactory,
transportStrategy)
{
}

/// <summary>
/// Provides the latest recommended configuration for a Laptop environment.
/// </summary>
/// <remark>
/// This configuration may change in future releases to take advantage of
/// improvements we identify for default configurations.
/// </remark>
/// <param name="loggerFactory"></param>
/// <returns></returns>
public static ITopicConfiguration latest(ILoggerFactory? loggerFactory = null)
{
var finalLoggerFactory = loggerFactory ?? NullLoggerFactory.Instance;
ITopicTransportStrategy transportStrategy = new StaticTopicTransportStrategy(
loggerFactory: finalLoggerFactory,
grpcConfig: new StaticGrpcConfiguration(deadline: TimeSpan.FromMilliseconds(15000))
);
return new Laptop(finalLoggerFactory, transportStrategy);
}
}

/// <summary>
/// Mobile config provides defaults suitable for a medium-to-high-latency mobile environment.
/// </summary>
public class Mobile : TopicConfiguration
{
private Mobile(ILoggerFactory loggerFactory, ITopicTransportStrategy transportStrategy) : base(loggerFactory,
transportStrategy)
{
}

/// <summary>
/// Provides the latest recommended configuration for a Mobile environment.
/// </summary>
/// <remark>
/// This configuration may change in future releases to take advantage of
/// improvements we identify for default configurations.
/// </remark>
/// <param name="loggerFactory"></param>
/// <returns></returns>
public static ITopicConfiguration latest(ILoggerFactory? loggerFactory = null)
{
var finalLoggerFactory = loggerFactory ?? NullLoggerFactory.Instance;
ITopicTransportStrategy transportStrategy = new StaticTopicTransportStrategy(
loggerFactory: finalLoggerFactory,
grpcConfig: new StaticGrpcConfiguration(deadline: TimeSpan.FromMilliseconds(15000))
);
return new Mobile(finalLoggerFactory, transportStrategy);
}
}
}
29 changes: 29 additions & 0 deletions src/Momento.Sdk/Config/Transport/ITopicTransportStrategy.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
using System;

namespace Momento.Sdk.Config.Transport;

/// <summary>
/// This is responsible for configuring network tunables for topics.
/// </summary>
public interface ITopicTransportStrategy
{
/// <summary>
/// Configures the low-level gRPC settings for the Momento Topic client's communication
/// with the Momento server.
/// </summary>
public IGrpcConfiguration GrpcConfig { get; }

/// <summary>
/// Copy constructor to update the gRPC configuration
/// </summary>
/// <param name="grpcConfig"></param>
/// <returns>A new ITransportStrategy with the specified grpcConfig</returns>
public ITopicTransportStrategy WithGrpcConfig(IGrpcConfiguration grpcConfig);

/// <summary>
/// Copy constructor to update the client timeout for publishing
/// </summary>
/// <param name="clientTimeout"></param>
/// <returns>A new ITransportStrategy with the specified client timeout</returns>
public ITopicTransportStrategy WithClientTimeout(TimeSpan clientTimeout);
}
64 changes: 64 additions & 0 deletions src/Momento.Sdk/Config/Transport/StaticTopicTransportStrategy.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
using System;
using Microsoft.Extensions.Logging;

namespace Momento.Sdk.Config.Transport;

/// <summary>
/// The simplest way to configure the transport layer for the Momento Topic client.
/// Provides static values for the gRPC configuration.
/// </summary>
public class StaticTopicTransportStrategy : ITopicTransportStrategy
{
private readonly ILoggerFactory _loggerFactory;

/// <inheritdoc />
public IGrpcConfiguration GrpcConfig { get; }

/// <summary>
///
/// </summary>
/// <param name="loggerFactory"></param>
/// <param name="grpcConfig">Configures how Momento Topic client interacts with the Momento service via gRPC</param>
public StaticTopicTransportStrategy(ILoggerFactory loggerFactory, IGrpcConfiguration grpcConfig)
{
_loggerFactory = loggerFactory;
GrpcConfig = grpcConfig;
}

/// <inheritdoc/>
public ITopicTransportStrategy WithGrpcConfig(IGrpcConfiguration grpcConfig)
{
return new StaticTopicTransportStrategy(_loggerFactory, grpcConfig);
}

/// <inheritdoc/>
public ITopicTransportStrategy WithClientTimeout(TimeSpan clientTimeout)
{
return new StaticTopicTransportStrategy(_loggerFactory, GrpcConfig.WithDeadline(clientTimeout));
}

/// <summary>
/// Test equality by value.
/// </summary>
/// <param name="obj"></param>
/// <returns></returns>
public override bool Equals(object obj)
{
if ((obj == null) || !this.GetType().Equals(obj.GetType()))
{
return false;
}

var other = (StaticTransportStrategy)obj;
return GrpcConfig.Equals(other.GrpcConfig);
}

/// <summary>
/// Trivial hash code implementation.
/// </summary>
/// <returns></returns>
public override int GetHashCode()
{
return base.GetHashCode();
}
}
69 changes: 69 additions & 0 deletions src/Momento.Sdk/ITopicClient.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
using System;
#if NETSTANDARD2_0_OR_GREATER


using System.Threading.Tasks;
using Momento.Sdk.Responses;

namespace Momento.Sdk;

/// <summary>
/// Minimum viable functionality of a topic client.
/// </summary>
public interface ITopicClient : IDisposable
{
/// <summary>
/// Publish a value to a topic in a cache.
/// </summary>
/// <param name="cacheName">Name of the cache containing the topic.</param>
/// <param name="topicName">Name of the topic.</param>
/// <param name="value">The value to be published.</param>
/// <returns>
/// Task object representing the result of the publish operation. The
/// response object is resolved to a type-safe object of one of
/// the following subtypes:
/// <list type="bullet">
/// <item><description>TopicPublishResponse.Success</description></item>
/// <item><description>TopicPublishResponse.Error</description></item>
/// </list>
/// Pattern matching can be used to operate on the appropriate subtype.
/// For example:
/// <code>
/// if (response is TopicPublishResponse.Error errorResponse)
/// {
/// // handle error as appropriate
/// }
/// </code>
/// </returns>
public Task<TopicPublishResponse> PublishAsync(string cacheName, string topicName, byte[] value);

/// <inheritdoc cref="PublishAsync(string, string, byte[])"/>
public Task<TopicPublishResponse> PublishAsync(string cacheName, string topicName, string value);

/// <summary>
/// Subscribe to a topic. The returned value can be used to iterate over newly published messages on the topic.
/// </summary>
/// <param name="cacheName">Name of the cache containing the topic.</param>
/// <param name="topicName">Name of the topic.</param>
/// <param name="resumeAtSequenceNumber">The sequence number of the last message.
/// If provided, the client will attempt to start the stream from that sequence number.</param>
/// <returns>
/// Task object representing the result of the subscribe operation. The
/// response object is resolved to a type-safe object of one of
/// the following subtypes:
/// <list type="bullet">
/// <item><description>TopicSubscribeResponse.Subscription</description></item>
/// <item><description>TopicSubscribeResponse.Error</description></item>
/// </list>
/// Pattern matching can be used to operate on the appropriate subtype.
/// For example:
/// <code>
/// if (response is TopicSubscribeResponse.Error errorResponse)
/// {
/// // handle error as appropriate
/// }
/// </code>
/// </returns>
public Task<TopicSubscribeResponse> SubscribeAsync(string cacheName, string topicName, ulong? resumeAtSequenceNumber = null);
}
#endif
Loading
Loading