From 35ecde7c03c500ab91fecbda09d6caaac8f2f4b4 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?P=C3=A4r=20Dahlman?= Date: Sun, 19 Nov 2017 14:30:13 +0100 Subject: [PATCH] (#263) Add RetryLater enricher --- RawRabbit.sln | 9 +- .../SubscribeMessageContextExtension.cs | 2 +- .../Common/Retry.cs | 19 +++ .../Common/RetryHeaders.cs | 8 ++ .../Common/RetryInformation.cs | 10 ++ .../Common/RetryInformationHeaderUpdater.cs | 68 ++++++++++ .../Common/RetryInformationProvider.cs | 54 ++++++++ .../Common/RetryLaterPipeContextExtensions.cs | 20 +++ .../RetryInformationExtractionMiddleware.cs | 51 ++++++++ .../Middleware/RetryLaterMiddleware.cs | 119 ++++++++++++++++++ .../RawRabbit.Enrichers.RetryLater.csproj | 33 +++++ .../RetryLaterPlugin.cs | 22 ++++ .../Acknowledgement/Respond.cs | 8 -- .../Acknowledgement/Retry.cs | 20 --- src/RawRabbit/Common/Acknowledgement.cs | 19 +-- src/RawRabbit/Common/PropertyHeaders.cs | 1 - src/RawRabbit/Logging/LibLog.cs | 12 +- .../Pipe/Middleware/ExplicitAckMiddleware.cs | 36 ------ .../Enrichers/AttributeEnricherTests.cs | 4 - .../Enrichers/RetryLaterEnricherTests.cs | 55 ++++++++ .../AcknowledgementSubscribeTests.cs | 9 +- .../RawRabbit.IntegrationTests.csproj | 1 + .../RawRabbitFactory.cs | 8 +- 23 files changed, 493 insertions(+), 95 deletions(-) create mode 100644 src/RawRabbit.Enrichers.RetryLater/Common/Retry.cs create mode 100644 src/RawRabbit.Enrichers.RetryLater/Common/RetryHeaders.cs create mode 100644 src/RawRabbit.Enrichers.RetryLater/Common/RetryInformation.cs create mode 100644 src/RawRabbit.Enrichers.RetryLater/Common/RetryInformationHeaderUpdater.cs create mode 100644 src/RawRabbit.Enrichers.RetryLater/Common/RetryInformationProvider.cs create mode 100644 src/RawRabbit.Enrichers.RetryLater/Common/RetryLaterPipeContextExtensions.cs create mode 100644 src/RawRabbit.Enrichers.RetryLater/Middleware/RetryInformationExtractionMiddleware.cs create mode 100644 src/RawRabbit.Enrichers.RetryLater/Middleware/RetryLaterMiddleware.cs create mode 100644 src/RawRabbit.Enrichers.RetryLater/RawRabbit.Enrichers.RetryLater.csproj create mode 100644 src/RawRabbit.Enrichers.RetryLater/RetryLaterPlugin.cs delete mode 100644 src/RawRabbit.Operations.Respond/Acknowledgement/Retry.cs create mode 100644 test/RawRabbit.IntegrationTests/Enrichers/RetryLaterEnricherTests.cs diff --git a/RawRabbit.sln b/RawRabbit.sln index 182d004c..c4913e5a 100644 --- a/RawRabbit.sln +++ b/RawRabbit.sln @@ -1,7 +1,7 @@  Microsoft Visual Studio Solution File, Format Version 12.00 # Visual Studio 15 -VisualStudioVersion = 15.0.26730.16 +VisualStudioVersion = 15.0.27004.2006 MinimumVisualStudioVersion = 10.0.40219.1 Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "src", "src", "{7FCF8D3B-BA55-4C47-AC60-5CEF75418BEB}" EndProject @@ -72,6 +72,8 @@ Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "RawRabbit.Enrichers.Protobu EndProject Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "RawRabbit.Enrichers.Polly.Tests", "test\RawRabbit.Enrichers.Polly.Tests\RawRabbit.Enrichers.Polly.Tests.csproj", "{4B4C5936-D61E-4FD8-AEB7-154CEAF84E15}" EndProject +Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "RawRabbit.Enrichers.RetryLater", "src\RawRabbit.Enrichers.RetryLater\RawRabbit.Enrichers.RetryLater.csproj", "{E1816B3D-9C4B-4D08-9537-ACB6806AF690}" +EndProject Global GlobalSection(SolutionConfigurationPlatforms) = preSolution Debug|Any CPU = Debug|Any CPU @@ -194,6 +196,10 @@ Global {4B4C5936-D61E-4FD8-AEB7-154CEAF84E15}.Debug|Any CPU.Build.0 = Debug|Any CPU {4B4C5936-D61E-4FD8-AEB7-154CEAF84E15}.Release|Any CPU.ActiveCfg = Release|Any CPU {4B4C5936-D61E-4FD8-AEB7-154CEAF84E15}.Release|Any CPU.Build.0 = Release|Any CPU + {E1816B3D-9C4B-4D08-9537-ACB6806AF690}.Debug|Any CPU.ActiveCfg = Debug|Any CPU + {E1816B3D-9C4B-4D08-9537-ACB6806AF690}.Debug|Any CPU.Build.0 = Debug|Any CPU + {E1816B3D-9C4B-4D08-9537-ACB6806AF690}.Release|Any CPU.ActiveCfg = Release|Any CPU + {E1816B3D-9C4B-4D08-9537-ACB6806AF690}.Release|Any CPU.Build.0 = Release|Any CPU EndGlobalSection GlobalSection(SolutionProperties) = preSolution HideSolutionNode = FALSE @@ -228,6 +234,7 @@ Global {CF308330-735E-411D-BBA9-0018DD079AF1} = {7FCF8D3B-BA55-4C47-AC60-5CEF75418BEB} {8D45F8AC-B65F-4A2B-9153-8A7F3D423575} = {7FCF8D3B-BA55-4C47-AC60-5CEF75418BEB} {4B4C5936-D61E-4FD8-AEB7-154CEAF84E15} = {2F91E22A-AEBA-4BEF-9A03-C8232830F697} + {E1816B3D-9C4B-4D08-9537-ACB6806AF690} = {7FCF8D3B-BA55-4C47-AC60-5CEF75418BEB} EndGlobalSection GlobalSection(ExtensibilityGlobals) = postSolution SolutionGuid = {6EC93B92-1319-44D3-A596-9FBD9BD23050} diff --git a/src/RawRabbit.Enrichers.MessageContext.Subscribe/SubscribeMessageContextExtension.cs b/src/RawRabbit.Enrichers.MessageContext.Subscribe/SubscribeMessageContextExtension.cs index f63ea3aa..627d8fe1 100644 --- a/src/RawRabbit.Enrichers.MessageContext.Subscribe/SubscribeMessageContextExtension.cs +++ b/src/RawRabbit.Enrichers.MessageContext.Subscribe/SubscribeMessageContextExtension.cs @@ -13,7 +13,7 @@ namespace RawRabbit public static class SubscribeMessageContextExtension { public static readonly Action ConsumePipe = consume => consume - .Use(StageMarkerOptions.For(MessageContextSubscibeStage.MessageRecieved)) + .Use(StageMarkerOptions.For(StageMarker.MessageRecieved)) .Use(new HeaderDeserializationOptions { HeaderKeyFunc = c => PropertyHeaders.Context, diff --git a/src/RawRabbit.Enrichers.RetryLater/Common/Retry.cs b/src/RawRabbit.Enrichers.RetryLater/Common/Retry.cs new file mode 100644 index 00000000..9bbed0ca --- /dev/null +++ b/src/RawRabbit.Enrichers.RetryLater/Common/Retry.cs @@ -0,0 +1,19 @@ +using System; + +namespace RawRabbit.Common +{ + public class Retry : Acknowledgement + { + public TimeSpan Span { get; set; } + + public Retry(TimeSpan span) + { + Span = span; + } + + public static Retry In(TimeSpan span) + { + return new Retry(span); + } + } +} \ No newline at end of file diff --git a/src/RawRabbit.Enrichers.RetryLater/Common/RetryHeaders.cs b/src/RawRabbit.Enrichers.RetryLater/Common/RetryHeaders.cs new file mode 100644 index 00000000..f2e1ee70 --- /dev/null +++ b/src/RawRabbit.Enrichers.RetryLater/Common/RetryHeaders.cs @@ -0,0 +1,8 @@ +namespace RawRabbit.Common +{ + public class RetryHeaders + { + public const string NumberOfRetries = "x-number-of-retries"; + public const string OriginalDelivered = "x-original-delivered"; + } +} diff --git a/src/RawRabbit.Enrichers.RetryLater/Common/RetryInformation.cs b/src/RawRabbit.Enrichers.RetryLater/Common/RetryInformation.cs new file mode 100644 index 00000000..aafc992e --- /dev/null +++ b/src/RawRabbit.Enrichers.RetryLater/Common/RetryInformation.cs @@ -0,0 +1,10 @@ +using System; + +namespace RawRabbit.Common +{ + public class RetryInformation + { + public int NumberOfRetries { get; set; } + public DateTime OriginalDelivered { get; set; } + } +} diff --git a/src/RawRabbit.Enrichers.RetryLater/Common/RetryInformationHeaderUpdater.cs b/src/RawRabbit.Enrichers.RetryLater/Common/RetryInformationHeaderUpdater.cs new file mode 100644 index 00000000..f9265f39 --- /dev/null +++ b/src/RawRabbit.Enrichers.RetryLater/Common/RetryInformationHeaderUpdater.cs @@ -0,0 +1,68 @@ +using System; +using System.Collections.Generic; +using RabbitMQ.Client.Events; + +namespace RawRabbit.Common +{ + public interface IRetryInformationHeaderUpdater + { + void AddOrUpdate(BasicDeliverEventArgs args); + void AddOrUpdate(BasicDeliverEventArgs args, RetryInformation retryInfo); + } + + public class RetryInformationHeaderUpdater : IRetryInformationHeaderUpdater + { + public void AddOrUpdate(BasicDeliverEventArgs args) + { + TryAddOriginalDelivered(args, DateTime.UtcNow); + AddOrUpdateNumberOfRetries(args); + } + + public void AddOrUpdate(BasicDeliverEventArgs args, RetryInformation retryInfo) + { + TryAddOriginalDelivered(args, retryInfo.OriginalDelivered); + AddOrUpdateNumberOfRetries(args); + } + + private void AddOrUpdateNumberOfRetries(BasicDeliverEventArgs args) + { + var currentRetry = 0; + if (args.BasicProperties.Headers.ContainsKey(RetryHeaders.NumberOfRetries)) + { + var valueStr = GetHeaderString(args.BasicProperties.Headers, RetryHeaders.NumberOfRetries); + currentRetry = int.Parse(valueStr); + args.BasicProperties.Headers.Remove(RetryHeaders.NumberOfRetries); + } + var nextRetry = (++currentRetry).ToString(); + args.BasicProperties.Headers.Add(RetryHeaders.NumberOfRetries, nextRetry); + } + + private static void TryAddOriginalDelivered(BasicDeliverEventArgs args, DateTime originalDelivered) + { + if (args.BasicProperties.Headers.ContainsKey(RetryHeaders.OriginalDelivered)) + { + return; + } + args.BasicProperties.Headers.Add(RetryHeaders.OriginalDelivered, originalDelivered.ToString("u")); + } + + private static string GetHeaderString(IDictionary headers, string key) + { + if (headers == null) + { + return null; + } + if (!headers.ContainsKey(key)) + { + return null; + } + if (!(headers[key] is byte[] headerBytes)) + { + return null; + } + + var headerStr = System.Text.Encoding.UTF8.GetString(headerBytes); + return headerStr; + } + } +} diff --git a/src/RawRabbit.Enrichers.RetryLater/Common/RetryInformationProvider.cs b/src/RawRabbit.Enrichers.RetryLater/Common/RetryInformationProvider.cs new file mode 100644 index 00000000..f930e32c --- /dev/null +++ b/src/RawRabbit.Enrichers.RetryLater/Common/RetryInformationProvider.cs @@ -0,0 +1,54 @@ +using System; +using System.Collections.Generic; +using RabbitMQ.Client.Events; + +namespace RawRabbit.Common +{ + public interface IRetryInformationProvider + { + RetryInformation Get(BasicDeliverEventArgs args); + } + + public class RetryInformationProvider : IRetryInformationProvider + { + public RetryInformation Get(BasicDeliverEventArgs args) + { + return new RetryInformation + { + NumberOfRetries = ExtractNumberOfRetries(args), + OriginalDelivered = ExtractOriginalDelivered(args) + }; + } + + private DateTime ExtractOriginalDelivered(BasicDeliverEventArgs args) + { + var headerValue = GetHeaderString(args.BasicProperties.Headers, RetryHeaders.OriginalDelivered); + return DateTime.TryParse(headerValue, out var originalSent) ? originalSent : DateTime.UtcNow; + } + + private int ExtractNumberOfRetries(BasicDeliverEventArgs args) + { + var headerValue = GetHeaderString(args.BasicProperties.Headers, RetryHeaders.NumberOfRetries); + return int.TryParse(headerValue, out var noOfRetries) ? noOfRetries : 0; + } + + private static string GetHeaderString(IDictionary headers, string key) + { + if (headers == null) + { + return null; + } + if (!headers.ContainsKey(key)) + { + return null; + } + if (!(headers[key] is byte[] headerBytes)) + { + return null; + } + + var headerStr = System.Text.Encoding.UTF8.GetString(headerBytes); + return headerStr; + } + } +} diff --git a/src/RawRabbit.Enrichers.RetryLater/Common/RetryLaterPipeContextExtensions.cs b/src/RawRabbit.Enrichers.RetryLater/Common/RetryLaterPipeContextExtensions.cs new file mode 100644 index 00000000..c00334e7 --- /dev/null +++ b/src/RawRabbit.Enrichers.RetryLater/Common/RetryLaterPipeContextExtensions.cs @@ -0,0 +1,20 @@ +using RawRabbit.Pipe; + +namespace RawRabbit.Common +{ + public static class RetryLaterPipeContextExtensions + { + private const string RetryInformationKey = "RetryInformation"; + + internal static IPipeContext AddRetryInformation(this IPipeContext context, RetryInformation retryInformation) + { + context.Properties.TryAdd(RetryInformationKey, retryInformation); + return context; + } + + public static RetryInformation GetRetryInformation(this IPipeContext context) + { + return context.Get(RetryInformationKey); + } + } +} diff --git a/src/RawRabbit.Enrichers.RetryLater/Middleware/RetryInformationExtractionMiddleware.cs b/src/RawRabbit.Enrichers.RetryLater/Middleware/RetryInformationExtractionMiddleware.cs new file mode 100644 index 00000000..22b3e703 --- /dev/null +++ b/src/RawRabbit.Enrichers.RetryLater/Middleware/RetryInformationExtractionMiddleware.cs @@ -0,0 +1,51 @@ +using System; +using System.Threading; +using System.Threading.Tasks; +using RabbitMQ.Client.Events; +using RawRabbit.Common; +using RawRabbit.Pipe; +using RawRabbit.Pipe.Middleware; + +namespace RawRabbit.Middleware +{ + public class RetryInformationExtractionOptions + { + public Func DeliveryArgsFunc { get; set; } + } + + public class RetryInformationExtractionMiddleware : StagedMiddleware + { + private readonly IRetryInformationProvider _retryProvider; + protected Func DeliveryArgsFunc; + public override string StageMarker => Pipe.StageMarker.MessageRecieved; + + public RetryInformationExtractionMiddleware(IRetryInformationProvider retryProvider, RetryInformationExtractionOptions options = null) + { + _retryProvider = retryProvider; + DeliveryArgsFunc = options?.DeliveryArgsFunc ?? (context => context.GetDeliveryEventArgs()); + } + + public override Task InvokeAsync(IPipeContext context, CancellationToken token = default(CancellationToken)) + { + var retryInfo = GetRetryInformation(context); + AddToPipeContext(context, retryInfo); + return Next.InvokeAsync(context, token); + } + + protected virtual BasicDeliverEventArgs GetDeliveryEventArgs(IPipeContext context) + { + return DeliveryArgsFunc?.Invoke(context); + } + + protected virtual RetryInformation GetRetryInformation(IPipeContext context) + { + var devlieryArgs = GetDeliveryEventArgs(context); + return _retryProvider.Get(devlieryArgs); + } + + protected virtual void AddToPipeContext(IPipeContext context, RetryInformation retryInfo) + { + context.AddRetryInformation(retryInfo); + } + } +} diff --git a/src/RawRabbit.Enrichers.RetryLater/Middleware/RetryLaterMiddleware.cs b/src/RawRabbit.Enrichers.RetryLater/Middleware/RetryLaterMiddleware.cs new file mode 100644 index 00000000..fe6e9f74 --- /dev/null +++ b/src/RawRabbit.Enrichers.RetryLater/Middleware/RetryLaterMiddleware.cs @@ -0,0 +1,119 @@ +using System; +using System.Collections.Generic; +using System.Threading; +using System.Threading.Tasks; +using RabbitMQ.Client.Events; +using RawRabbit.Channel.Abstraction; +using RawRabbit.Common; +using RawRabbit.Configuration.Exchange; +using RawRabbit.Configuration.Queue; +using RawRabbit.Logging; +using RawRabbit.Pipe; +using RawRabbit.Pipe.Middleware; +using ExchangeType = RabbitMQ.Client.ExchangeType; + +namespace RawRabbit.Middleware +{ + public class RetryLaterOptions + { + public Func AcknowledgementFunc { get; set; } + public Func DeliveryArgsFunc { get; set; } + } + + public class RetryLaterMiddleware : StagedMiddleware + { + private readonly ILog _logger = LogProvider.For(); + protected readonly ITopologyProvider TopologyProvider; + protected readonly INamingConventions Conventions; + protected readonly IChannelFactory ChannelFactory; + private readonly IRetryInformationHeaderUpdater _headerUpdater; + protected Func AcknowledgementFunc; + protected Func DeliveryArgsFunc; + + public override string StageMarker => Pipe.StageMarker.HandlerInvoked; + + public RetryLaterMiddleware(ITopologyProvider topology, INamingConventions conventions, IChannelFactory channelFactory, IRetryInformationHeaderUpdater headerUpdater, RetryLaterOptions options = null) + { + TopologyProvider = topology; + Conventions = conventions; + ChannelFactory = channelFactory; + _headerUpdater = headerUpdater; + AcknowledgementFunc = options?.AcknowledgementFunc ?? (context => context.GetMessageAcknowledgement()); + DeliveryArgsFunc = options?.DeliveryArgsFunc ?? (context => context.GetDeliveryEventArgs()); + } + + public override async Task InvokeAsync(IPipeContext context, CancellationToken token = default(CancellationToken)) + { + var ack = GetMessageAcknowledgement(context); + if (!(ack is Retry retryAck)) + { + await Next.InvokeAsync(context, token); + return; + } + + var deadLeterExchangeName = GetDeadLetterExchangeName(retryAck.Span); + await TopologyProvider.DeclareExchangeAsync(new ExchangeDeclaration + { + Name = deadLeterExchangeName, + Durable = true, + ExchangeType = ExchangeType.Direct + }); + + var deliveryArgs = GetDeliveryEventArgs(context); + _logger.Info("Message is marked for Retry. Will be published on exchange {exchangeName} with routing key {routingKey} in {retryIn}", deliveryArgs.Exchange, deliveryArgs.RoutingKey, retryAck.Span); + UpdateRetryHeaders(deliveryArgs, context); + var deadLetterQueueName = GetDeadLetterQueueName(deliveryArgs.Exchange, retryAck.Span); + await TopologyProvider.DeclareQueueAsync(new QueueDeclaration + { + Name = deadLetterQueueName, + Durable = true, + Arguments = new Dictionary + { + {QueueArgument.DeadLetterExchange, deliveryArgs.Exchange}, + {QueueArgument.Expires, Convert.ToInt32(retryAck.Span.Add(TimeSpan.FromSeconds(1)).TotalMilliseconds)}, + {QueueArgument.MessageTtl, Convert.ToInt32(retryAck.Span.TotalMilliseconds)} + } + }); + await TopologyProvider.BindQueueAsync(deadLetterQueueName, deadLeterExchangeName, deliveryArgs.RoutingKey); + using (var publishChannel = await ChannelFactory.CreateChannelAsync(token)) + { + publishChannel.BasicPublish(deadLeterExchangeName, deliveryArgs.RoutingKey, false, deliveryArgs.BasicProperties, deliveryArgs.Body); + } + await TopologyProvider.UnbindQueueAsync(deadLetterQueueName, deadLeterExchangeName, deliveryArgs.RoutingKey); + + context.Properties.AddOrReplace(PipeKey.MessageAcknowledgement, new Ack()); + await Next.InvokeAsync(context, token); + } + + private string GetDeadLetterQueueName(string originalExchangeName, TimeSpan retryAckSpan) + { + return Conventions.RetryLaterQueueNameConvetion(originalExchangeName, retryAckSpan); + } + + protected virtual Acknowledgement GetMessageAcknowledgement(IPipeContext context) + { + return AcknowledgementFunc?.Invoke(context); + } + + protected virtual BasicDeliverEventArgs GetDeliveryEventArgs(IPipeContext context) + { + return DeliveryArgsFunc?.Invoke(context); + } + + protected virtual string GetDeadLetterExchangeName(TimeSpan retryIn) + { + return Conventions.RetryLaterExchangeConvention(retryIn); + } + + protected virtual TimeSpan GetRetryTimeSpan(IPipeContext context) + { + return (GetMessageAcknowledgement(context) as Retry)?.Span ?? new TimeSpan(-1); + } + + protected virtual void UpdateRetryHeaders(BasicDeliverEventArgs args, IPipeContext context) + { + var retryInfo = context.GetRetryInformation(); + _headerUpdater.AddOrUpdate(args, retryInfo); + } + } +} diff --git a/src/RawRabbit.Enrichers.RetryLater/RawRabbit.Enrichers.RetryLater.csproj b/src/RawRabbit.Enrichers.RetryLater/RawRabbit.Enrichers.RetryLater.csproj new file mode 100644 index 00000000..5687158b --- /dev/null +++ b/src/RawRabbit.Enrichers.RetryLater/RawRabbit.Enrichers.RetryLater.csproj @@ -0,0 +1,33 @@ + + + + Enricher RawRabbit with a Retry Later capabilities + RawRabbit.Enrichers.RetryLater + 2.0.0 + pardahlman + netstandard1.5;net451 + RawRabbit.Enrichers.RetryLater + RawRabbit.Enrichers.RetryLater + rabbitmq;rawrabbit;enrichers;retry-later + http://pardahlman.se/raw/icon.png + https://github.com/pardahlman/RawRabbit + false + false + false + RawRabbit + + + + TRACE;DEBUG;NETSTANDARD1_5 + + + + + + + + + + + + diff --git a/src/RawRabbit.Enrichers.RetryLater/RetryLaterPlugin.cs b/src/RawRabbit.Enrichers.RetryLater/RetryLaterPlugin.cs new file mode 100644 index 00000000..2bf42abf --- /dev/null +++ b/src/RawRabbit.Enrichers.RetryLater/RetryLaterPlugin.cs @@ -0,0 +1,22 @@ +using RawRabbit.Common; +using RawRabbit.Instantiation; +using RawRabbit.Middleware; + +namespace RawRabbit +{ + public static class RetryLaterPlugin + { + public static IClientBuilder UseRetryLater(this IClientBuilder builder) + { + builder.Register( + pipe => pipe + .Use() + .Use(), + ioc => ioc + .AddSingleton() + .AddSingleton() + ); + return builder; + } + } +} diff --git a/src/RawRabbit.Operations.Respond/Acknowledgement/Respond.cs b/src/RawRabbit.Operations.Respond/Acknowledgement/Respond.cs index cff4491a..90540e34 100644 --- a/src/RawRabbit.Operations.Respond/Acknowledgement/Respond.cs +++ b/src/RawRabbit.Operations.Respond/Acknowledgement/Respond.cs @@ -18,13 +18,5 @@ public static Reject Reject(bool requeue = true) { return new Reject(requeue); } - - public static Retry Retry(TimeSpan span) - { - return new Retry - { - Span = span - }; - } } } diff --git a/src/RawRabbit.Operations.Respond/Acknowledgement/Retry.cs b/src/RawRabbit.Operations.Respond/Acknowledgement/Retry.cs deleted file mode 100644 index f6c93ef1..00000000 --- a/src/RawRabbit.Operations.Respond/Acknowledgement/Retry.cs +++ /dev/null @@ -1,20 +0,0 @@ -using System; -using RawRabbit.Common; - -namespace RawRabbit.Operations.Respond.Acknowledgement -{ - public class Retry : TypedAcknowlegement - { - public TimeSpan Span { get; set; } - - public static Retry In(TimeSpan span) - { - return new Retry { Span = span }; - } - - public override Common.Acknowledgement AsUntyped() - { - return new Retry(Span); - } - } -} diff --git a/src/RawRabbit/Common/Acknowledgement.cs b/src/RawRabbit/Common/Acknowledgement.cs index 2ac0b292..0dfc3df9 100644 --- a/src/RawRabbit/Common/Acknowledgement.cs +++ b/src/RawRabbit/Common/Acknowledgement.cs @@ -1,6 +1,4 @@ -using System; - -namespace RawRabbit.Common +namespace RawRabbit.Common { public abstract class Acknowledgement { } @@ -26,19 +24,4 @@ public Reject(bool requeue = true) Requeue = requeue; } } - - public class Retry : Acknowledgement - { - public TimeSpan Span { get; set; } - - public Retry(TimeSpan span) - { - Span = span; - } - - public static Retry In(TimeSpan span) - { - return new Retry(span); - } - } } diff --git a/src/RawRabbit/Common/PropertyHeaders.cs b/src/RawRabbit/Common/PropertyHeaders.cs index 9498ffe7..9a976785 100644 --- a/src/RawRabbit/Common/PropertyHeaders.cs +++ b/src/RawRabbit/Common/PropertyHeaders.cs @@ -7,7 +7,6 @@ public class PropertyHeaders public static readonly string Host = "host"; public static readonly string ApproximateRetry = "approx_retry"; public static readonly string Death = "x-death"; - public static readonly string RetryCount = "retry_count"; public static readonly string Context = "message_context"; public static readonly string ExceptionHeader = "exception"; public static readonly string ExceptionType = "exception_type"; diff --git a/src/RawRabbit/Logging/LibLog.cs b/src/RawRabbit/Logging/LibLog.cs index 6d7e845b..56f5e531 100644 --- a/src/RawRabbit/Logging/LibLog.cs +++ b/src/RawRabbit/Logging/LibLog.cs @@ -39,7 +39,17 @@ #pragma warning disable 1591 using System.Diagnostics.CodeAnalysis; - +using System.Runtime.CompilerServices; + +[assembly: InternalsVisibleTo("RawRabbit.Enrichers.RetryLater")] +[assembly: InternalsVisibleTo("RawRabbit.Operations.Get")] +[assembly: InternalsVisibleTo("RawRabbit.Operations.MessageSequence")] +[assembly: InternalsVisibleTo("RawRabbit.Operations.Publish")] +[assembly: InternalsVisibleTo("RawRabbit.Operations.Request")] +[assembly: InternalsVisibleTo("RawRabbit.Operations.Respond")] +[assembly: InternalsVisibleTo("RawRabbit.Operations.StateMachine")] +[assembly: InternalsVisibleTo("RawRabbit.Operations.Subscribe")] +[assembly: InternalsVisibleTo("RawRabbit.Operations.Tools")] [assembly: SuppressMessage("Microsoft.Design", "CA1020:AvoidNamespacesWithFewTypes", Scope = "namespace", Target = "RawRabbit.Logging")] [assembly: SuppressMessage("Microsoft.Design", "CA1026:DefaultParametersShouldNotBeUsed", Scope = "member", Target = "RawRabbit.Logging.Logger.#Invoke(RawRabbit.Logging.LogLevel,System.Func`1,System.Exception,System.Object[])")] diff --git a/src/RawRabbit/Pipe/Middleware/ExplicitAckMiddleware.cs b/src/RawRabbit/Pipe/Middleware/ExplicitAckMiddleware.cs index 4f8f3b9e..cbd28561 100644 --- a/src/RawRabbit/Pipe/Middleware/ExplicitAckMiddleware.cs +++ b/src/RawRabbit/Pipe/Middleware/ExplicitAckMiddleware.cs @@ -108,11 +108,6 @@ protected virtual async Task AcknowledgeMessageAsync(IPipeConte HandleReject(ack as Reject, channel, deliveryArgs); return ack; } - if (ack is Retry) - { - await HandleRetryAsync(ack as Retry, channel, deliveryArgs); - return ack; - } throw new NotSupportedException($"Unable to handle {ack.GetType()} as an Acknowledgement."); } @@ -132,37 +127,6 @@ protected virtual void HandleReject(Reject reject, IModel channel, BasicDeliverE channel.BasicReject(deliveryArgs.DeliveryTag, reject.Requeue); } - protected virtual async Task HandleRetryAsync(Retry retry, IModel channel, BasicDeliverEventArgs deliveryArgs) - { - channel.BasicAck(deliveryArgs.DeliveryTag, false); - - var deadLeterExchangeName = Conventions.RetryLaterExchangeConvention(retry.Span); - var deadLetterQueueName = Conventions.RetryLaterQueueNameConvetion(deliveryArgs.Exchange, retry.Span); - await Topology.DeclareExchangeAsync(new ExchangeDeclaration - { - Name = deadLeterExchangeName, - Durable = true, - ExchangeType = ExchangeType.Direct - }); - await Topology.DeclareQueueAsync(new QueueDeclaration - { - Name = deadLetterQueueName, - Durable = true, - Arguments = new Dictionary - { - {QueueArgument.DeadLetterExchange, deliveryArgs.Exchange}, - {QueueArgument.Expires, Convert.ToInt32(retry.Span.Add(TimeSpan.FromSeconds(1)).TotalMilliseconds)}, - {QueueArgument.MessageTtl, Convert.ToInt32(retry.Span.TotalMilliseconds)} - } - }); - await Topology.BindQueueAsync(deadLetterQueueName, deadLeterExchangeName, deliveryArgs.RoutingKey); - using (var publishChannel = await ChannelFactory.CreateChannelAsync()) - { - publishChannel.BasicPublish(deadLeterExchangeName, deliveryArgs.RoutingKey, deliveryArgs.BasicProperties, deliveryArgs.Body); - } - await Topology.UnbindQueueAsync(deadLetterQueueName, deadLeterExchangeName, deliveryArgs.RoutingKey); - } - protected virtual bool GetAutoAck(IPipeContext context) { return AutoAckFunc(context); diff --git a/test/RawRabbit.IntegrationTests/Enrichers/AttributeEnricherTests.cs b/test/RawRabbit.IntegrationTests/Enrichers/AttributeEnricherTests.cs index 68276d86..2b1713e5 100644 --- a/test/RawRabbit.IntegrationTests/Enrichers/AttributeEnricherTests.cs +++ b/test/RawRabbit.IntegrationTests/Enrichers/AttributeEnricherTests.cs @@ -1,11 +1,7 @@ using System.Threading.Tasks; using RawRabbit.Configuration.Exchange; using RawRabbit.Enrichers.Attributes; -using RawRabbit.Enrichers.Attributes.Middleware; using RawRabbit.Instantiation; -using RawRabbit.Operations.Request.Core; -using RawRabbit.Operations.Respond.Core; -using RawRabbit.Pipe; using Xunit; namespace RawRabbit.IntegrationTests.Enrichers diff --git a/test/RawRabbit.IntegrationTests/Enrichers/RetryLaterEnricherTests.cs b/test/RawRabbit.IntegrationTests/Enrichers/RetryLaterEnricherTests.cs new file mode 100644 index 00000000..098ee400 --- /dev/null +++ b/test/RawRabbit.IntegrationTests/Enrichers/RetryLaterEnricherTests.cs @@ -0,0 +1,55 @@ +using System; +using System.Threading.Tasks; +using RawRabbit.Common; +using RawRabbit.Enrichers.MessageContext.Subscribe; +using RawRabbit.IntegrationTests.TestMessages; +using Xunit; + +namespace RawRabbit.IntegrationTests.Enrichers +{ + public class RetryLaterEnricherTests + { + [Fact] + public async Task Should_Update_Retry_Information_Correctly() + { + using (var publisher = RawRabbitFactory.CreateTestClient()) + using (var subscriber = RawRabbitFactory.CreateTestClient(p => p.UseRetryLater())) + { + /* Setup */ + var firstTcs = new TaskCompletionSource(); + var secondTcs = new TaskCompletionSource(); + var thirdTcs = new TaskCompletionSource(); + await subscriber.SubscribeAsync(async (message, context) => + { + if (!firstTcs.Task.IsCompleted) + { + firstTcs.TrySetResult(context); + return Retry.In(TimeSpan.FromMilliseconds(200)); + } + if (!secondTcs.Task.IsCompleted) + { + secondTcs.TrySetResult(context); + return Retry.In(TimeSpan.FromMilliseconds(200)); + } + thirdTcs.TrySetResult(context); + return new Ack(); + }, ctx => ctx.UseMessageContext(c => new RetryMessageContext { RetryInfo = c.GetRetryInformation()})); + + /* Test */ + await publisher.PublishAsync(new BasicMessage()); + await thirdTcs.Task; + + /* Assert */ + Assert.Equal(0, firstTcs.Task.Result.RetryInfo.NumberOfRetries); + Assert.Equal(1, secondTcs.Task.Result.RetryInfo.NumberOfRetries); + Assert.Equal(2, thirdTcs.Task.Result.RetryInfo.NumberOfRetries); + Assert.Equal(secondTcs.Task.Result.RetryInfo.OriginalDelivered, thirdTcs.Task.Result.RetryInfo.OriginalDelivered); + } + } + + internal class RetryMessageContext + { + public RetryInformation RetryInfo { get; set; } + } + } +} diff --git a/test/RawRabbit.IntegrationTests/PublishAndSubscribe/AcknowledgementSubscribeTests.cs b/test/RawRabbit.IntegrationTests/PublishAndSubscribe/AcknowledgementSubscribeTests.cs index c0c14f93..373ec79e 100644 --- a/test/RawRabbit.IntegrationTests/PublishAndSubscribe/AcknowledgementSubscribeTests.cs +++ b/test/RawRabbit.IntegrationTests/PublishAndSubscribe/AcknowledgementSubscribeTests.cs @@ -2,6 +2,7 @@ using System.Threading.Tasks; using RawRabbit.Common; using RawRabbit.Enrichers.MessageContext.Context; +using RawRabbit.Instantiation; using RawRabbit.IntegrationTests.TestMessages; using RawRabbit.IntegrationTests.TestMessages.Extras; using Xunit; @@ -277,7 +278,7 @@ await secondSubscriber.SubscribeAsync(async (recie public async Task Should_Be_Able_To_Return_Retry() { using (var publisher = RawRabbitFactory.CreateTestClient()) - using (var firstSubscriber = RawRabbitFactory.CreateTestClient()) + using (var firstSubscriber = RawRabbitFactory.CreateTestClient(new RawRabbitOptions { Plugins = p => p.UseRetryLater()})) using (var secondSubscriber = RawRabbitFactory.CreateTestClient()) { /* Setup */ @@ -308,7 +309,7 @@ await secondSubscriber.SubscribeAsync(async recieved => public async Task Should_Be_Able_To_Return_Retry_From_Subscriber_With_Context() { using (var publisher = RawRabbitFactory.CreateTestClient()) - using (var firstSubscriber = RawRabbitFactory.CreateTestClient()) + using (var firstSubscriber = RawRabbitFactory.CreateTestClient(p => p.UseRetryLater())) using (var secondSubscriber = RawRabbitFactory.CreateTestClient()) { /* Setup */ @@ -339,7 +340,7 @@ await secondSubscriber.SubscribeAsync(async (recie public async Task Should_Be_Able_To_Retry_Multiple_Times() { using (var publisher = RawRabbitFactory.CreateTestClient()) - using (var subscriber = RawRabbitFactory.CreateTestClient()) + using (var subscriber = RawRabbitFactory.CreateTestClient(p => p.UseRetryLater())) { /* Setup */ var firstTsc = new TaskCompletionSource(); @@ -374,7 +375,7 @@ await subscriber.SubscribeAsync(async recieved => public async Task Should_Handle_Concurrent_Retries() { using (var publisher = RawRabbitFactory.CreateTestClient()) - using (var subscriber = RawRabbitFactory.CreateTestClient()) + using (var subscriber = RawRabbitFactory.CreateTestClient(p => p.UseRetryLater())) { /* Setup */ var firstTsc = new TaskCompletionSource(); diff --git a/test/RawRabbit.IntegrationTests/RawRabbit.IntegrationTests.csproj b/test/RawRabbit.IntegrationTests/RawRabbit.IntegrationTests.csproj index 86b5db9b..9822c726 100644 --- a/test/RawRabbit.IntegrationTests/RawRabbit.IntegrationTests.csproj +++ b/test/RawRabbit.IntegrationTests/RawRabbit.IntegrationTests.csproj @@ -13,6 +13,7 @@ + diff --git a/test/RawRabbit.IntegrationTests/RawRabbitFactory.cs b/test/RawRabbit.IntegrationTests/RawRabbitFactory.cs index c19e0f6e..c9a7c5af 100644 --- a/test/RawRabbit.IntegrationTests/RawRabbitFactory.cs +++ b/test/RawRabbit.IntegrationTests/RawRabbitFactory.cs @@ -1,10 +1,16 @@ -using RawRabbit.Configuration; +using System; +using RawRabbit.Configuration; using RawRabbit.Instantiation; namespace RawRabbit.IntegrationTests { public static class RawRabbitFactory { + public static Instantiation.Disposable.BusClient CreateTestClient(Action plugins) + { + return CreateTestClient(new RawRabbitOptions {Plugins = plugins}); + } + public static Instantiation.Disposable.BusClient CreateTestClient(RawRabbitOptions options = null) { return Instantiation.RawRabbitFactory.CreateSingleton(GetTestOptions(options));