diff --git a/Core.ServiceMesh.Abstractions/DeliverPolicy.cs b/Core.ServiceMesh.Abstractions/DeliverPolicy.cs index 62f97dd..e139689 100644 --- a/Core.ServiceMesh.Abstractions/DeliverPolicy.cs +++ b/Core.ServiceMesh.Abstractions/DeliverPolicy.cs @@ -3,24 +3,25 @@ public enum DeliverPolicy { /// - /// Default policy. Start receiving from the earliest available message in the stream. + /// Default policy. Start receiving from the earliest available message in the stream. /// All, /// - /// Start with the last message added to the stream, or the last message matching the consumer's filter subject if defined. + /// Start with the last message added to the stream, or the last message matching the consumer's filter subject if + /// defined. /// Last, /// - /// Start with the latest message for each filtered subject currently in the stream. + /// Start with the latest message for each filtered subject currently in the stream. /// LastPerSubject, /// - /// Start receiving messages created after the consumer was created. + /// Start receiving messages created after the consumer was created. /// - New, + New /*/// /// Start at the first message with the specified sequence number. The consumer must specify OptStartSeq defining the sequence number. diff --git a/Core.ServiceMesh.Abstractions/DurableConsumerAttribute.cs b/Core.ServiceMesh.Abstractions/DurableConsumerAttribute.cs index da86be4..aa6b5a9 100644 --- a/Core.ServiceMesh.Abstractions/DurableConsumerAttribute.cs +++ b/Core.ServiceMesh.Abstractions/DurableConsumerAttribute.cs @@ -4,38 +4,46 @@ public class DurableConsumerAttribute(string name) : Attribute { /// - /// Unique name for the durable consumer + /// Unique name for the durable consumer /// public string Name => name; /// - /// Stream name to subscribe to. Null means container default stream + /// Stream name to subscribe to. Null means container default stream /// public string? Stream { get; init; } /// - /// The maximum number of times a specific message delivery will be attempted. Applies to any message that is re-sent due to acknowledgment policy (i.e., due to a negative acknowledgment or no acknowledgment sent by the client). The default is -1 (redeliver until acknowledged). Messages that have reached the maximum delivery count will stay in the stream. + /// The maximum number of times a specific message delivery will be attempted. Applies to any message that is re-sent + /// due to acknowledgment policy (i.e., due to a negative acknowledgment or no acknowledgment sent by the client). The + /// default is -1 (redeliver until acknowledged). Messages that have reached the maximum delivery count will stay in + /// the stream. /// public long MaxDeliver { get; init; } = -1; /// - /// Defines the maximum number of messages, without acknowledgment, that can be outstanding. Once this limit is reached, message delivery will be suspended. This limit applies across all of the consumer's bound subscriptions. A value of -1 means there can be any number of pending acknowledgments (i.e., no flow control). The default is 1000. + /// Defines the maximum number of messages, without acknowledgment, that can be outstanding. Once this limit is + /// reached, message delivery will be suspended. This limit applies across all of the consumer's bound subscriptions. A + /// value of -1 means there can be any number of pending acknowledgments (i.e., no flow control). The default is 1000. /// public long MaxAckPending { get; init; } = 1000; /// - /// The duration (in seconds) that the server will wait for an acknowledgment for any individual message once it has been delivered to a consumer. If an acknowledgment is not received in time, the message will be redelivered. + /// The duration (in seconds) that the server will wait for an acknowledgment for any individual message once it has + /// been delivered to a consumer. If an acknowledgment is not received in time, the message will be redelivered. /// - public long AckWait { get; set; } = 60*5; + public long AckWait { get; set; } = 60 * 5; /// - /// The point in the stream from which to receive messages - /// Cannot be updated. + /// The point in the stream from which to receive messages + /// Cannot be updated. /// public DeliverPolicy DeliverPolicy { get; init; } = DeliverPolicy.All; /// - /// A sequence of durations (in seconds) controlling the redelivery of messages on nak or acknowledgment timeout. Overrides ackWait. The sequence length must be less than or equal to MaxDelivery. If backoff is not set, a nak will result in immediate redelivery. + /// A sequence of durations (in seconds) controlling the redelivery of messages on nak or acknowledgment timeout. + /// Overrides ackWait. The sequence length must be less than or equal to MaxDelivery. If backoff is not set, a nak will + /// result in immediate redelivery. /// public long[] Backoff { get; init; } = []; } \ No newline at end of file diff --git a/Core.ServiceMesh.Abstractions/IServiceMesh.cs b/Core.ServiceMesh.Abstractions/IServiceMesh.cs index daa11c1..f3b3dea 100644 --- a/Core.ServiceMesh.Abstractions/IServiceMesh.cs +++ b/Core.ServiceMesh.Abstractions/IServiceMesh.cs @@ -1,30 +1,26 @@ -using System.Reflection; - -namespace Core.ServiceMesh.Abstractions; +namespace Core.ServiceMesh.Abstractions; public interface IServiceMesh { /// - /// Create remote proxy for service interface. + /// Create remote proxy for service interface. /// /// service interface /// proxy service implementation T CreateProxy() where T : class; /// - /// Publish message and await confirmation. - /// Requires at least one consumer or timeout. + /// Publish message and await confirmation. + /// Requires at least one consumer or timeout. /// ValueTask PublishAsync(object message, int retry = 3, TimeSpan? retryWait = null, string? id = null); /// - /// Send message and ignore confirmation + /// Send message and ignore confirmation /// ValueTask SendAsync(object message); ValueTask RequestAsync(string subject, object[] args, Type[] generics); ValueTask RequestAsync(string subject, object[] args, Type[] generics); IAsyncEnumerable StreamAsync(string subject, object[] args, Type[] generics); - - } \ No newline at end of file diff --git a/Core.ServiceMesh.Abstractions/ServiceMeshActivity.cs b/Core.ServiceMesh.Abstractions/ServiceMeshActivity.cs index bc6cc8e..9411bbc 100644 --- a/Core.ServiceMesh.Abstractions/ServiceMeshActivity.cs +++ b/Core.ServiceMesh.Abstractions/ServiceMeshActivity.cs @@ -1,4 +1,5 @@ using System.Diagnostics; + namespace Core.ServiceMesh.Abstractions; public static class ServiceMeshActivity diff --git a/Core.ServiceMesh.Abstractions/ServiceMeshAttribute.cs b/Core.ServiceMesh.Abstractions/ServiceMeshAttribute.cs index 605e2a2..d65173a 100644 --- a/Core.ServiceMesh.Abstractions/ServiceMeshAttribute.cs +++ b/Core.ServiceMesh.Abstractions/ServiceMeshAttribute.cs @@ -4,9 +4,9 @@ public class ServiceMeshAttribute(string name) : Attribute { /// - /// Unique service name. - /// May also include versioning information - /// e.g. SampleServiceV2 + /// Unique service name. + /// May also include versioning information + /// e.g. SampleServiceV2 /// public string Name => name; } \ No newline at end of file diff --git a/Core.ServiceMesh.SourceGen.Tests/Core.ServiceMesh.SourceGen.Tests.csproj b/Core.ServiceMesh.SourceGen.Tests/Core.ServiceMesh.SourceGen.Tests.csproj index c7f878b..5cd0287 100644 --- a/Core.ServiceMesh.SourceGen.Tests/Core.ServiceMesh.SourceGen.Tests.csproj +++ b/Core.ServiceMesh.SourceGen.Tests/Core.ServiceMesh.SourceGen.Tests.csproj @@ -14,7 +14,7 @@ all runtime; build; native; contentfiles; analyzers; buildtransitive - + all diff --git a/Core.ServiceMesh.SourceGen.Tests/UnitTest1.cs b/Core.ServiceMesh.SourceGen.Tests/UnitTest1.cs index 10bdfdc..4f0b170 100644 --- a/Core.ServiceMesh.SourceGen.Tests/UnitTest1.cs +++ b/Core.ServiceMesh.SourceGen.Tests/UnitTest1.cs @@ -14,7 +14,7 @@ public Task ProxyInterface() using System.Threading.Tasks; using System.Numerics; using Core.ServiceMesh.Abstractions; - + namespace SampleApp; [ServiceMesh("someservice")] @@ -25,7 +25,7 @@ public interface ISomeService ValueTask C(T a, T b) where T : INumber; IAsyncEnumerable D(string d); } - + [ServiceMesh("someservice")] public class SomeService : ISomeService { @@ -55,7 +55,7 @@ public async IAsyncEnumerable D(string d) public static class TestHelper { - public async static Task VerifySourceGen(string source) + public static async Task VerifySourceGen(string source) { var syntaxTree = CSharpSyntaxTree.ParseText(source); diff --git a/Core.ServiceMesh.SourceGen/Core/ImmutableEquatableArray.cs b/Core.ServiceMesh.SourceGen/Core/ImmutableEquatableArray.cs index 92bc52c..aa7fad8 100644 --- a/Core.ServiceMesh.SourceGen/Core/ImmutableEquatableArray.cs +++ b/Core.ServiceMesh.SourceGen/Core/ImmutableEquatableArray.cs @@ -1,14 +1,19 @@ using System; using System.Collections.Generic; -namespace Core.ServiceMesh.SourceGen.Core +namespace Core.ServiceMesh.SourceGen.Core; + +internal static class ImmutableEquatableArray { - internal static class ImmutableEquatableArray + public static ImmutableEquatableArray Empty() + where T : IEquatable { - public static ImmutableEquatableArray Empty() - where T : IEquatable => ImmutableEquatableArray.Empty; + return ImmutableEquatableArray.Empty; + } - public static ImmutableEquatableArray ToImmutableEquatableArray(this IEnumerable values) - where T : IEquatable => values == null ? Empty() : new(values); + public static ImmutableEquatableArray ToImmutableEquatableArray(this IEnumerable values) + where T : IEquatable + { + return values == null ? Empty() : new ImmutableEquatableArray(values); } } \ No newline at end of file diff --git a/Core.ServiceMesh.SourceGen/Core/ImmutableEquatableArray{T}.cs b/Core.ServiceMesh.SourceGen/Core/ImmutableEquatableArray{T}.cs index c64e212..3c1a0f9 100644 --- a/Core.ServiceMesh.SourceGen/Core/ImmutableEquatableArray{T}.cs +++ b/Core.ServiceMesh.SourceGen/Core/ImmutableEquatableArray{T}.cs @@ -3,76 +3,93 @@ using System.Collections.Generic; using System.Linq; -namespace Core.ServiceMesh.SourceGen.Core +namespace Core.ServiceMesh.SourceGen.Core; + +internal sealed class ImmutableEquatableArray : IEquatable>, IReadOnlyList + where T : IEquatable { - internal sealed class ImmutableEquatableArray : IEquatable>, IReadOnlyList - where T : IEquatable + private readonly T[] _values; + + public ImmutableEquatableArray(T[] values) { - public static ImmutableEquatableArray Empty { get; } = new(Array.Empty()); + _values = values; + } - private readonly T[] _values; - public T this[int index] => _values[index]; - public int Count => _values.Length; + public ImmutableEquatableArray(IEnumerable values) + { + _values = values.ToArray(); + } - public ImmutableEquatableArray(T[] values) => _values = values; + public static ImmutableEquatableArray Empty { get; } = new(Array.Empty()); - public ImmutableEquatableArray(IEnumerable values) => _values = values.ToArray(); + public bool Equals(ImmutableEquatableArray other) + { + return other != null && ((ReadOnlySpan)_values).SequenceEqual(other._values); + } - public bool Equals(ImmutableEquatableArray other) => - other != null && ((ReadOnlySpan)_values).SequenceEqual(other._values); + public T this[int index] => _values[index]; + public int Count => _values.Length; - public override bool Equals(object obj) => obj is ImmutableEquatableArray other && Equals(other); + IEnumerator IEnumerable.GetEnumerator() + { + return ((IEnumerable)_values).GetEnumerator(); + } - public override int GetHashCode() - { - var hash = 0; - foreach (T value in _values) - { - hash = Combine(hash, value.GetHashCode()); - } + IEnumerator IEnumerable.GetEnumerator() + { + return _values.GetEnumerator(); + } - static int Combine(int h1, int h2) - { - // RyuJIT optimizes this to use the ROL instruction - // Related GitHub pull request: https://github.com/dotnet/coreclr/pull/1830 - uint rol5 = (uint)h1 << 5 | (uint)h1 >> 27; - return (int)rol5 + h1 ^ h2; - } + public override bool Equals(object obj) + { + return obj is ImmutableEquatableArray other && Equals(other); + } - return hash; + public override int GetHashCode() + { + var hash = 0; + foreach (var value in _values) hash = Combine(hash, value.GetHashCode()); + + static int Combine(int h1, int h2) + { + // RyuJIT optimizes this to use the ROL instruction + // Related GitHub pull request: https://github.com/dotnet/coreclr/pull/1830 + var rol5 = ((uint)h1 << 5) | ((uint)h1 >> 27); + return ((int)rol5 + h1) ^ h2; } - public Enumerator GetEnumerator() => new(_values); + return hash; + } - IEnumerator IEnumerable.GetEnumerator() => ((IEnumerable)_values).GetEnumerator(); + public Enumerator GetEnumerator() + { + return new Enumerator(_values); + } - IEnumerator IEnumerable.GetEnumerator() => _values.GetEnumerator(); + public struct Enumerator + { + private readonly T[] _values; + private int _index; - public struct Enumerator + internal Enumerator(T[] values) { - private readonly T[] _values; - private int _index; + _values = values; + _index = -1; + } - internal Enumerator(T[] values) - { - _values = values; - _index = -1; - } + public bool MoveNext() + { + var newIndex = _index + 1; - public bool MoveNext() + if ((uint)newIndex < (uint)_values.Length) { - var newIndex = _index + 1; - - if ((uint)newIndex < (uint)_values.Length) - { - _index = newIndex; - return true; - } - - return false; + _index = newIndex; + return true; } - public readonly T Current => _values[_index]; + return false; } + + public readonly T Current => _values[_index]; } } \ No newline at end of file diff --git a/Core.ServiceMesh.SourceGen/Model/MethodDescription.cs b/Core.ServiceMesh.SourceGen/Model/MethodDescription.cs index 88e9045..b2f40b8 100644 --- a/Core.ServiceMesh.SourceGen/Model/MethodDescription.cs +++ b/Core.ServiceMesh.SourceGen/Model/MethodDescription.cs @@ -5,22 +5,23 @@ namespace Core.ServiceMesh.SourceGen.Model; internal readonly record struct MethodDescription { + public readonly ImmutableEquatableArray Constraints; + public readonly ImmutableEquatableArray Generics; public readonly string Name; + public readonly ImmutableEquatableArray ParameterNames; + public readonly ImmutableEquatableArray Parameters; public readonly string Return; public readonly ImmutableEquatableArray ReturnArguments; - public readonly ImmutableEquatableArray Parameters; - public readonly ImmutableEquatableArray ParameterNames; - public readonly ImmutableEquatableArray Generics; - public readonly ImmutableEquatableArray Constraints; - public MethodDescription(string name, string ret, List returnArgs, List parameter, List parameterNames, List generics, List constraints) + public MethodDescription(string name, string ret, List returnArgs, List parameter, + List parameterNames, List generics, List constraints) { Name = name; Return = ret; - ReturnArguments = new(returnArgs); - Parameters = new(parameter); - ParameterNames = new(parameterNames); - Generics = new(generics); - Constraints = new(constraints); + ReturnArguments = new ImmutableEquatableArray(returnArgs); + Parameters = new ImmutableEquatableArray(parameter); + ParameterNames = new ImmutableEquatableArray(parameterNames); + Generics = new ImmutableEquatableArray(generics); + Constraints = new ImmutableEquatableArray(constraints); } } \ No newline at end of file diff --git a/Core.ServiceMesh.SourceGen/Model/ServiceDescription.cs b/Core.ServiceMesh.SourceGen/Model/ServiceDescription.cs index 26247c4..ce7d55e 100644 --- a/Core.ServiceMesh.SourceGen/Model/ServiceDescription.cs +++ b/Core.ServiceMesh.SourceGen/Model/ServiceDescription.cs @@ -6,19 +6,20 @@ namespace Core.ServiceMesh.SourceGen.Model; internal readonly record struct ServiceDescription { public readonly string ClassName; - public readonly string Namespace; public readonly bool IsInterface; - public readonly string ServiceName; public readonly ImmutableEquatableArray Methods; + public readonly string Namespace; + public readonly string ServiceName; public readonly ImmutableEquatableArray Usings; - public ServiceDescription(bool isInterface, string className, string ns, string service, List methods, List usings) + public ServiceDescription(bool isInterface, string className, string ns, string service, + List methods, List usings) { IsInterface = isInterface; ClassName = className; Namespace = ns; ServiceName = service; - Methods = new(methods); - Usings = new(usings); + Methods = new ImmutableEquatableArray(methods); + Usings = new ImmutableEquatableArray(usings); } } \ No newline at end of file diff --git a/Core.ServiceMesh.SourceGen/ServiceMeshGenerator.cs b/Core.ServiceMesh.SourceGen/ServiceMeshGenerator.cs index 8d89daa..b4eac2e 100644 --- a/Core.ServiceMesh.SourceGen/ServiceMeshGenerator.cs +++ b/Core.ServiceMesh.SourceGen/ServiceMeshGenerator.cs @@ -70,26 +70,21 @@ private ServiceDescription Transform(GeneratorAttributeSyntaxContext ctx, Cancel List returnTypeArgs = []; if (x.ReturnType is INamedTypeSymbol namedReturnType) - { foreach (var arg in namedReturnType.TypeArguments) - { returnTypeArgs.Add(arg.ToDisplayString(SymbolDisplayFormat.MinimallyQualifiedFormat)); - } - } - return new MethodDescription(methodName, returnType, returnTypeArgs, parameters, paramNames, generics, constraints); + return new MethodDescription(methodName, returnType, returnTypeArgs, parameters, paramNames, generics, + constraints); }).ToList(), GetUsings(typeSymbol).ToList()); } private static void GenerateCode(SourceProductionContext context, ImmutableArray services) { foreach (var service in services) - { if (service.IsInterface) BuildRemoteProxy(context, service); else BuildTraceProxy(context, service); - } } private static void BuildRemoteProxy(SourceProductionContext context, ServiceDescription service) @@ -107,18 +102,20 @@ private static void BuildRemoteProxy(SourceProductionContext context, ServiceDes builder.AppendLine(); } - builder.AppendLine($"public sealed class {service.ClassName}RemoteProxy(IServiceMesh mesh) : {service.ClassName}"); + builder.AppendLine( + $"public sealed class {service.ClassName}RemoteProxy(IServiceMesh mesh) : {service.ClassName}"); builder.AppendLine("{"); foreach (var m in service.Methods) { builder.AppendLine(); var parameterEx = $"[{string.Join(", ", m.ParameterNames)}]"; - var genericsEx = $"[{string.Join(", ", m.Generics.Select(x=> $"typeof({x})"))}]"; + var genericsEx = $"[{string.Join(", ", m.Generics.Select(x => $"typeof({x})"))}]"; var invoke = $"await mesh.RequestAsync(subject, {parameterEx}, {genericsEx});"; if (m.Return.StartsWith("ValueTask<")) - invoke = $"return await mesh.RequestAsync<{m.ReturnArguments[0]}>(subject, {parameterEx}, {genericsEx});"; + invoke = + $"return await mesh.RequestAsync<{m.ReturnArguments[0]}>(subject, {parameterEx}, {genericsEx});"; if (m.Return.StartsWith("IAsyncEnumerable<")) invoke = $""" @@ -156,7 +153,8 @@ private static void BuildTraceProxy(SourceProductionContext context, ServiceDesc builder.AppendLine(); } - builder.AppendLine($"public sealed class {service.ClassName}TraceProxy({service.ClassName} svc) : I{service.ClassName}"); + builder.AppendLine( + $"public sealed class {service.ClassName}TraceProxy({service.ClassName} svc) : I{service.ClassName}"); builder.AppendLine("{"); foreach (var m in service.Methods) diff --git a/Core.ServiceMesh/Internal/ServiceMeshWorker.cs b/Core.ServiceMesh/Internal/ServiceMeshWorker.cs index b1ccf84..2d8b3b0 100644 --- a/Core.ServiceMesh/Internal/ServiceMeshWorker.cs +++ b/Core.ServiceMesh/Internal/ServiceMeshWorker.cs @@ -2,7 +2,6 @@ using System.Reflection; using System.Threading.Channels; using Core.ServiceMesh.Abstractions; -using Microsoft.AspNetCore.Mvc.Formatters.Xml; using Microsoft.Extensions.DependencyInjection; using Microsoft.Extensions.Hosting; using Microsoft.Extensions.Logging; @@ -12,7 +11,6 @@ using OpenTelemetry; using OpenTelemetry.Context.Propagation; using OpenTelemetry.Trace; -using static System.Runtime.InteropServices.JavaScript.JSType; namespace Core.ServiceMesh.Internal; @@ -34,7 +32,7 @@ public T CreateProxy() where T : class if (remoteProxy == null) throw new InvalidOperationException($"proxy was found for interface {typeof(T).FullName}"); - return (T)Activator.CreateInstance(remoteProxy, args: [this])!; + return (T)Activator.CreateInstance(remoteProxy, this)!; } public async ValueTask PublishAsync(object message, @@ -51,7 +49,8 @@ public async ValueTask PublishAsync(object message, new PropagationContext(Activity.Current.Context, Baggage.Current), headers, (h, key, value) => { h[key] = value; }); - using var activity = ServiceMeshActivity.Source.StartActivity($"PUB {message.GetType().Name}", ActivityKind.Producer, Activity.Current?.Context ?? default); + using var activity = ServiceMeshActivity.Source.StartActivity($"PUB {message.GetType().Name}", + ActivityKind.Producer, Activity.Current?.Context ?? default); //if (activity != null) // activity.DisplayName = subject; @@ -78,7 +77,8 @@ public async ValueTask SendAsync(object message) new PropagationContext(Activity.Current.Context, Baggage.Current), headers, (h, key, value) => { h[key] = value; }); - using var activity = ServiceMeshActivity.Source.StartActivity($"PUB {message.GetType().Name}", ActivityKind.Producer, Activity.Current?.Context ?? default); + using var activity = ServiceMeshActivity.Source.StartActivity($"PUB {message.GetType().Name}", + ActivityKind.Producer, Activity.Current?.Context ?? default); // if (activity != null) // activity.DisplayName = subject; @@ -86,6 +86,121 @@ public async ValueTask SendAsync(object message) await nats.PublishAsync(subject, data, headers); } + public async ValueTask RequestAsync(string subject, object[] args, Type[] generics) + { + subject = ApplyPrefix(subject); + + var call = new ServiceInvocation + { + //Service = attr.Name, + //Method = info.Name, + Signature = args.Select(x => x.GetType().AssemblyQualifiedName!).ToList(), + Arguments = args.Select(x => options.Serialize(x, false)).ToList(), + Generics = generics.Select(x => x.AssemblyQualifiedName!).ToList() + }; + + var body = options.Serialize(call, true); + + var headers = new NatsHeaders(); + + var activityContext = Activity.Current?.Context ?? default; + Propagators.DefaultTextMapPropagator.Inject( + new PropagationContext(activityContext, Baggage.Current), headers, + (h, key, value) => { h[key] = value; }); + + using var activity = + ServiceMeshActivity.Source.StartActivity($"REQ {subject}", ActivityKind.Client, activityContext); + + //if (activity != null) + // activity.DisplayName = $"{call.Service}.{call.Method}"; + + var res = await nats.RequestAsync(subject, + body, replyOpts: new NatsSubOpts + { + Timeout = TimeSpan.FromSeconds(30) + }, headers: headers); + + res.EnsureSuccess(); + + return (T)options.Deserialize(res.Data!, typeof(T), true)!; + } + + public async ValueTask RequestAsync(string subject, object[] args, Type[] generics) + { + subject = ApplyPrefix(subject); + + var call = new ServiceInvocation + { + //Service = attr.Name, + //Method = info.Name, + Signature = args.Select(x => x.GetType().AssemblyQualifiedName!).ToList(), + Arguments = args.Select(x => options.Serialize(x, false)).ToList(), + Generics = generics.Select(x => x.AssemblyQualifiedName!).ToList() + }; + + var body = options.Serialize(call, true); + + var headers = new NatsHeaders(); + + var activityContext = Activity.Current?.Context ?? default; + + Propagators.DefaultTextMapPropagator.Inject( + new PropagationContext(activityContext, Baggage.Current), headers, + (h, key, value) => { h[key] = value; }); + + using var activity = + ServiceMeshActivity.Source.StartActivity($"REQ {subject}", ActivityKind.Client, activityContext); + + //if (activity != null) + // activity.DisplayName = $""; + + var res = await nats.RequestAsync(subject, + body, replyOpts: new NatsSubOpts + { + Timeout = TimeSpan.FromSeconds(30) + }, headers: headers); + + res.EnsureSuccess(); + } + + public async IAsyncEnumerable StreamAsync(string subject, object[] args, Type[] generics) + { + subject = ApplyPrefix(subject); + + var call = new ServiceInvocation + { + //Service = attr.Name, + //Method = info.Name, + Signature = args.Select(x => x.GetType().AssemblyQualifiedName!).ToList(), + Arguments = args.Select(x => options.Serialize(x, false)).ToList(), + Generics = generics.Select(x => x.AssemblyQualifiedName!).ToList() + }; + + var body = options.Serialize(call, true); + + var headers = new NatsHeaders(); + + var activityContext = Activity.Current?.Context ?? default; + Propagators.DefaultTextMapPropagator.Inject( + new PropagationContext(activityContext, Baggage.Current), headers, + (h, key, value) => { h[key] = value; }); + + using var activity = + ServiceMeshActivity.Source.StartActivity($"REQ {subject}", ActivityKind.Client, activityContext); + + var subId = Guid.NewGuid().ToString("N"); + + headers["return-sub-id"] = subId; + await nats.PublishAsync(subject, body, headers); + + await foreach (var msg in nats.SubscribeAsync(subId)) + { + if (msg.Data == null) + yield break; + yield return (T)options.Deserialize(msg.Data!, typeof(T), true)!; + } + } + protected override async Task ExecuteAsync(CancellationToken stoppingToken) { //RemoteDispatchProxy.Worker = this; @@ -124,7 +239,7 @@ protected override async Task ExecuteAsync(CancellationToken stoppingToken) || newcfg.Retention != oldcfg.Retention || newcfg.DenyDelete != oldcfg.DenyDelete || newcfg.DenyPurge != oldcfg.DenyPurge - ) + ) { logger.LogError("stream {0} cannot be updated", stream.Key); continue; @@ -145,10 +260,8 @@ protected override async Task ExecuteAsync(CancellationToken stoppingToken) || newcfg.AllowRollupHdrs != oldcfg.AllowRollupHdrs || newcfg.Republish != oldcfg.Republish || newcfg.Compression != oldcfg.Compression - ) - { + ) await _jetStream.UpdateStreamAsync(newcfg, stoppingToken); - } } catch (Exception) { @@ -225,7 +338,8 @@ await _jetStream.DeleteConsumerAsync(durableConsumer.Stream, durableConsumer.Nam IdleHeartbeat = TimeSpan.FromSeconds(5) }; - var modifiedConsumeOpts = options.ConfigureConsumer(durableConsumer.Name, newConsumerConfig, consumeOpts); + var modifiedConsumeOpts = + options.ConfigureConsumer(durableConsumer.Name, newConsumerConfig, consumeOpts); var con = await _jetStream .CreateOrUpdateConsumerAsync(durableConsumer.Stream, newConsumerConfig, stoppingToken) @@ -235,8 +349,8 @@ await _jetStream.DeleteConsumerAsync(durableConsumer.Stream, durableConsumer.Nam } foreach (var transientConsumer in ServiceMeshExtensions.Consumers.Where(x => !x.IsDurable)) - foreach (var m in transientConsumer.Methods) - tasks.Add(TransientListener(transientConsumer, m.Key, stoppingToken)); + foreach (var m in transientConsumer.Methods) + tasks.Add(TransientListener(transientConsumer, m.Key, stoppingToken)); for (var i = 0; i < options.StreamWorkers; i++) tasks.Add(DurableWorker(stoppingToken)); @@ -274,7 +388,8 @@ private async Task TransientListener( string subject, CancellationToken stoppingToken) { - await foreach (var msg in nats.SubscribeAsync(subject, reg.QueueGroup, cancellationToken: stoppingToken)) + await foreach + (var msg in nats.SubscribeAsync(subject, reg.QueueGroup, cancellationToken: stoppingToken)) await _broadcastChannel!.Writer.WriteAsync((msg, reg), stoppingToken); } @@ -376,13 +491,13 @@ private async Task ServiceWorker(CancellationToken stoppingToken) if (method.ReturnType.GetGenericTypeDefinition() == typeof(IAsyncEnumerable<>)) { var subId = msg.Headers["return-sub-id"].FirstOrDefault(); - + try { var resType = method.ReturnType.GenericTypeArguments[0]; var wrapper = typeof(ServiceMeshWorker).GetMethod(nameof(AsyncEnumerableWrapper), - BindingFlags.NonPublic | BindingFlags.Instance) + BindingFlags.NonPublic | BindingFlags.Instance) .MakeGenericMethod(resType); dynamic stream = method.Invoke(instance, args.ToArray()); @@ -397,7 +512,7 @@ private async Task ServiceWorker(CancellationToken stoppingToken) ["exception"] = ex.Message }; - await nats.PublishAsync(subId,[], headers); + await nats.PublishAsync(subId, [], headers); activity?.SetStatus(ActivityStatusCode.Error); activity?.RecordException(ex); } @@ -407,7 +522,8 @@ private async Task ServiceWorker(CancellationToken stoppingToken) try { if (method.IsGenericMethod) - method = method.MakeGenericMethod(invocation.Generics.Select(options.ResolveType).ToArray()!); + method = method.MakeGenericMethod( + invocation.Generics.Select(options.ResolveType).ToArray()!); dynamic awaitable = method.Invoke(instance, args.ToArray()); await awaitable; @@ -511,116 +627,4 @@ private string ApplyPrefix(string a) return $"{options.Prefix}-{a}"; return a; } - - public async ValueTask RequestAsync(string subject, object[] args, Type[] generics) - { - subject = ApplyPrefix(subject); - - var call = new ServiceInvocation - { - //Service = attr.Name, - //Method = info.Name, - Signature = args.Select(x => x.GetType().AssemblyQualifiedName!).ToList(), - Arguments = args.Select(x => options.Serialize(x, false)).ToList(), - Generics = generics.Select(x => x.AssemblyQualifiedName!).ToList() - }; - - var body = options.Serialize(call, true); - - var headers = new NatsHeaders(); - - var activityContext = Activity.Current?.Context ?? default; - Propagators.DefaultTextMapPropagator.Inject( - new PropagationContext(activityContext, Baggage.Current), headers, - (h, key, value) => { h[key] = value; }); - - using var activity = ServiceMeshActivity.Source.StartActivity($"REQ {subject}", ActivityKind.Client, activityContext); - - //if (activity != null) - // activity.DisplayName = $"{call.Service}.{call.Method}"; - - var res = await nats.RequestAsync(subject, - body, replyOpts: new NatsSubOpts - { - Timeout = TimeSpan.FromSeconds(30) - }, headers: headers); - - res.EnsureSuccess(); - - return (T)options.Deserialize(res.Data!, typeof(T), true)!; - } - - public async ValueTask RequestAsync(string subject, object[] args, Type[] generics) - { - subject = ApplyPrefix(subject); - - var call = new ServiceInvocation - { - //Service = attr.Name, - //Method = info.Name, - Signature = args.Select(x => x.GetType().AssemblyQualifiedName!).ToList(), - Arguments = args.Select(x => options.Serialize(x, false)).ToList(), - Generics = generics.Select(x => x.AssemblyQualifiedName!).ToList() - }; - - var body = options.Serialize(call, true); - - var headers = new NatsHeaders(); - - var activityContext = Activity.Current?.Context ?? default; - - Propagators.DefaultTextMapPropagator.Inject( - new PropagationContext(activityContext, Baggage.Current), headers, - (h, key, value) => { h[key] = value; }); - - using var activity = ServiceMeshActivity.Source.StartActivity($"REQ {subject}", ActivityKind.Client, activityContext); - - //if (activity != null) - // activity.DisplayName = $""; - - var res = await nats.RequestAsync(subject, - body, replyOpts: new NatsSubOpts - { - Timeout = TimeSpan.FromSeconds(30) - }, headers: headers); - - res.EnsureSuccess(); - } - - public async IAsyncEnumerable StreamAsync(string subject, object[] args, Type[] generics) - { - subject = ApplyPrefix(subject); - - var call = new ServiceInvocation - { - //Service = attr.Name, - //Method = info.Name, - Signature = args.Select(x => x.GetType().AssemblyQualifiedName!).ToList(), - Arguments = args.Select(x => options.Serialize(x, false)).ToList(), - Generics = generics.Select(x => x.AssemblyQualifiedName!).ToList() - }; - - var body = options.Serialize(call, true); - - var headers = new NatsHeaders(); - - var activityContext = Activity.Current?.Context ?? default; - Propagators.DefaultTextMapPropagator.Inject( - new PropagationContext(activityContext, Baggage.Current), headers, - (h, key, value) => { h[key] = value; }); - - using var activity = ServiceMeshActivity.Source.StartActivity($"REQ {subject}", ActivityKind.Client, activityContext); - - var subId = Guid.NewGuid().ToString("N"); - - headers["return-sub-id"] = subId; - await nats.PublishAsync(subject, body, headers: headers); - - await foreach (var msg in nats.SubscribeAsync(subId)) - { - if (msg.Data == null) - yield break; - yield return (T)options.Deserialize(msg.Data!, typeof(T), true)!; - } - } } \ No newline at end of file diff --git a/Core.ServiceMesh/ServiceMeshExtensions.cs b/Core.ServiceMesh/ServiceMeshExtensions.cs index 555a4e3..c9a8936 100644 --- a/Core.ServiceMesh/ServiceMeshExtensions.cs +++ b/Core.ServiceMesh/ServiceMeshExtensions.cs @@ -3,7 +3,6 @@ using Core.ServiceMesh.Abstractions; using Core.ServiceMesh.Internal; using Microsoft.AspNetCore.Builder; -using Microsoft.AspNetCore.Mvc; using Microsoft.Extensions.DependencyInjection; using Microsoft.Extensions.Hosting; using NATS.Client.Hosting; @@ -51,7 +50,6 @@ public static IHostApplicationBuilder AddServiceMesh(this IHostApplicationBuilde return y.GetCustomAttribute() != null || y.GetInterfaces().Any(z => z.GetCustomAttribute() != null); }))) - { if (type.IsInterface) { interfaces.Add(type); @@ -91,7 +89,6 @@ public static IHostApplicationBuilder AddServiceMesh(this IHostApplicationBuilde // todo: configurable lifetime builder.Services.Add(new ServiceDescriptor(type, type, ServiceLifetime.Scoped)); } - } if (options.InterfaceMode != ServiceInterfaceMode.None) foreach (var serviceInterface in interfaces) @@ -116,14 +113,17 @@ public static IHostApplicationBuilder AddServiceMesh(this IHostApplicationBuilde } else if (options.InterfaceMode == ServiceInterfaceMode.AutoTrace) { - var traceProxy = serviceInterface.Assembly.GetType(impl.ImplementationType.FullName + "TraceProxy"); + var traceProxy = + serviceInterface.Assembly.GetType(impl.ImplementationType.FullName + "TraceProxy"); if (traceProxy != null) builder.Services.AddSingleton(serviceInterface, traceProxy); } else + { builder.Services.Add(new ServiceDescriptor(serviceInterface, impl.ImplementationType, ServiceLifetime.Scoped)); + } } } diff --git a/SampleApp/Controllers/DevController.cs b/SampleApp/Controllers/DevController.cs index bd0120c..9c4ca20 100644 --- a/SampleApp/Controllers/DevController.cs +++ b/SampleApp/Controllers/DevController.cs @@ -18,7 +18,7 @@ public async Task Publish([FromQuery] string message) [HttpPost("publish-other")] public async Task PublishOther([FromQuery] string message, [FromQuery] int count = 1) { - for (int i = 0; i < count; i++) + for (var i = 0; i < count; i++) await mesh.PublishAsync(new SomeOtherCommand(message)); return Ok(); } diff --git a/SampleApp/Program.cs b/SampleApp/Program.cs index e51effe..2222e5d 100644 --- a/SampleApp/Program.cs +++ b/SampleApp/Program.cs @@ -5,15 +5,9 @@ var builder = WebApplication.CreateBuilder(args); -builder.AddObservability(configureTracing: trace => -{ - trace.AddServiceMeshInstrumentation(); -}); +builder.AddObservability(configureTracing: trace => { trace.AddServiceMeshInstrumentation(); }); -builder.Services.Configure(options => -{ - -}); +builder.Services.Configure(options => { }); builder.AddServiceMesh(options => { @@ -22,10 +16,7 @@ { Url = "nats://localhost:4222" }; - options.ConfigureStream = (name, config) => - { - config.MaxAge = TimeSpan.FromDays(1); - }; + options.ConfigureStream = (name, config) => { config.MaxAge = TimeSpan.FromDays(1); }; options.InterfaceMode = ServiceInterfaceMode.ForceRemote; options.Assemblies = [typeof(ISomeService).Assembly, typeof(SomeService).Assembly]; }); diff --git a/SampleApp/SampleApp.csproj b/SampleApp/SampleApp.csproj index 27ff177..722a3b5 100644 --- a/SampleApp/SampleApp.csproj +++ b/SampleApp/SampleApp.csproj @@ -9,15 +9,13 @@ - + - + diff --git a/SampleAppNuget/Controllers/WeatherForecastController.cs b/SampleAppNuget/Controllers/WeatherForecastController.cs index 5cfb113..1f27b0d 100644 --- a/SampleAppNuget/Controllers/WeatherForecastController.cs +++ b/SampleAppNuget/Controllers/WeatherForecastController.cs @@ -1,33 +1,32 @@ using Microsoft.AspNetCore.Mvc; -namespace SampleAppNuget.Controllers +namespace SampleAppNuget.Controllers; + +[ApiController] +[Route("[controller]")] +public class WeatherForecastController : ControllerBase { - [ApiController] - [Route("[controller]")] - public class WeatherForecastController : ControllerBase + private static readonly string[] Summaries = { - private static readonly string[] Summaries = new[] - { - "Freezing", "Bracing", "Chilly", "Cool", "Mild", "Warm", "Balmy", "Hot", "Sweltering", "Scorching" - }; + "Freezing", "Bracing", "Chilly", "Cool", "Mild", "Warm", "Balmy", "Hot", "Sweltering", "Scorching" + }; - private readonly ILogger _logger; + private readonly ILogger _logger; - public WeatherForecastController(ILogger logger) - { - _logger = logger; - } + public WeatherForecastController(ILogger logger) + { + _logger = logger; + } - [HttpGet(Name = "GetWeatherForecast")] - public IEnumerable Get() - { - return Enumerable.Range(1, 5).Select(index => new WeatherForecast + [HttpGet(Name = "GetWeatherForecast")] + public IEnumerable Get() + { + return Enumerable.Range(1, 5).Select(index => new WeatherForecast { Date = DateOnly.FromDateTime(DateTime.Now.AddDays(index)), TemperatureC = Random.Shared.Next(-20, 55), Summary = Summaries[Random.Shared.Next(Summaries.Length)] }) .ToArray(); - } } -} +} \ No newline at end of file diff --git a/SampleAppNuget/Mesh/IAnotherService.cs b/SampleAppNuget/Mesh/IAnotherService.cs index 7dad076..3c27fb1 100644 --- a/SampleAppNuget/Mesh/IAnotherService.cs +++ b/SampleAppNuget/Mesh/IAnotherService.cs @@ -11,4 +11,4 @@ public interface INugetService ValueTask SampleB(SampleRequest request); ValueTask SampleC(SampleRequest request); -}; \ No newline at end of file +} \ No newline at end of file diff --git a/SampleAppNuget/Program.cs b/SampleAppNuget/Program.cs index 48863a6..8264bac 100644 --- a/SampleAppNuget/Program.cs +++ b/SampleAppNuget/Program.cs @@ -22,4 +22,4 @@ app.MapControllers(); -app.Run(); +app.Run(); \ No newline at end of file diff --git a/SampleAppNuget/WeatherForecast.cs b/SampleAppNuget/WeatherForecast.cs index b0c3e7c..2ef4e30 100644 --- a/SampleAppNuget/WeatherForecast.cs +++ b/SampleAppNuget/WeatherForecast.cs @@ -1,13 +1,12 @@ -namespace SampleAppNuget +namespace SampleAppNuget; + +public class WeatherForecast { - public class WeatherForecast - { - public DateOnly Date { get; set; } + public DateOnly Date { get; set; } - public int TemperatureC { get; set; } + public int TemperatureC { get; set; } - public int TemperatureF => 32 + (int)(TemperatureC / 0.5556); + public int TemperatureF => 32 + (int)(TemperatureC / 0.5556); - public string? Summary { get; set; } - } -} + public string? Summary { get; set; } +} \ No newline at end of file diff --git a/SampleInterfaces/IAnotherService.cs b/SampleInterfaces/IAnotherService.cs index b518f92..e86906a 100644 --- a/SampleInterfaces/IAnotherService.cs +++ b/SampleInterfaces/IAnotherService.cs @@ -10,4 +10,4 @@ public interface IAnotherService ValueTask SampleB(SampleRequest request); ValueTask SampleC(SampleRequest request); -}; \ No newline at end of file +} \ No newline at end of file diff --git a/SampleInterfaces/ISomeService.cs b/SampleInterfaces/ISomeService.cs index 61ab5b3..c3be8b6 100644 --- a/SampleInterfaces/ISomeService.cs +++ b/SampleInterfaces/ISomeService.cs @@ -11,4 +11,4 @@ public interface ISomeService ValueTask GenericAdd(T a, T b) where T : INumber; ValueTask Sample(SampleRequest request); IAsyncEnumerable StreamingResponse(SampleRequest request); -}; \ No newline at end of file +} \ No newline at end of file diff --git a/SampleWorker/Consumers/MultiCommandHandler.cs b/SampleWorker/Consumers/MultiCommandHandler.cs index 1f94b2b..f8ce650 100644 --- a/SampleWorker/Consumers/MultiCommandHandler.cs +++ b/SampleWorker/Consumers/MultiCommandHandler.cs @@ -3,18 +3,18 @@ namespace SampleWorker.Consumers; -[DurableConsumer("MultiCommandHandler", Stream = "default", +[DurableConsumer("MultiCommandHandler", Stream = "default", DeliverPolicy = DeliverPolicy.New, MaxDeliver = 3, Backoff = [30, 600])] -public class MultiCommandHandler(ILogger logger) +public class MultiCommandHandler(ILogger logger) : IConsumer, IConsumer { - public async ValueTask ConsumeAsync(SomeOtherCommand message, CancellationToken token) + public async ValueTask ConsumeAsync(SomeCommand message, CancellationToken token) { logger.LogInformation($"MultiCommandHandler: {message.Name}"); await Task.Delay(TimeSpan.FromSeconds(1)); } - public async ValueTask ConsumeAsync(SomeCommand message, CancellationToken token) + public async ValueTask ConsumeAsync(SomeOtherCommand message, CancellationToken token) { logger.LogInformation($"MultiCommandHandler: {message.Name}"); await Task.Delay(TimeSpan.FromSeconds(1)); diff --git a/SampleWorker/Consumers/SomeCommandHandler4.cs b/SampleWorker/Consumers/SomeCommandHandler4.cs index 077a0bd..a7140e0 100644 --- a/SampleWorker/Consumers/SomeCommandHandler4.cs +++ b/SampleWorker/Consumers/SomeCommandHandler4.cs @@ -3,11 +3,11 @@ namespace SampleWorker.Consumers; -[DurableConsumer("SomeOtherCommandHandler", - Stream = "default", - DeliverPolicy = DeliverPolicy.All, +[DurableConsumer("SomeOtherCommandHandler", + Stream = "default", + DeliverPolicy = DeliverPolicy.All, MaxAckPending = 1, - AckWait = 60*5)] + AckWait = 60 * 5)] public class SomeOtherCommandHandler(ILogger logger) : IConsumer { public async ValueTask ConsumeAsync(SomeOtherCommand message, CancellationToken token) diff --git a/SampleWorker/Program.cs b/SampleWorker/Program.cs index 80873bd..6f3b7b0 100644 --- a/SampleWorker/Program.cs +++ b/SampleWorker/Program.cs @@ -4,15 +4,9 @@ var builder = Host.CreateApplicationBuilder(args); -builder.AddObservability(configureTracing: trace => -{ - trace.AddServiceMeshInstrumentation(); -}); +builder.AddObservability(configureTracing: trace => { trace.AddServiceMeshInstrumentation(); }); -builder.Services.Configure(options => -{ - -}); +builder.Services.Configure(options => { }); builder.AddServiceMesh(options => { @@ -21,13 +15,10 @@ { Url = "nats://localhost:4222" }; - options.ConfigureStream = (name, config) => - { - config.MaxAge = TimeSpan.FromDays(1); - }; + options.ConfigureStream = (name, config) => { config.MaxAge = TimeSpan.FromDays(1); }; options.InterfaceMode = ServiceInterfaceMode.None; options.Assemblies = [typeof(SomeCommandHandler).Assembly]; }); var host = builder.Build(); -host.Run(); +host.Run(); \ No newline at end of file