Skip to content

Commit

Permalink
(#263) Add RetryLater enricher
Browse files Browse the repository at this point in the history
  • Loading branch information
pardahlman committed Nov 19, 2017
1 parent 7a8ec04 commit 35ecde7
Show file tree
Hide file tree
Showing 23 changed files with 493 additions and 95 deletions.
9 changes: 8 additions & 1 deletion RawRabbit.sln
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@

Microsoft Visual Studio Solution File, Format Version 12.00
# Visual Studio 15
VisualStudioVersion = 15.0.26730.16
VisualStudioVersion = 15.0.27004.2006
MinimumVisualStudioVersion = 10.0.40219.1
Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "src", "src", "{7FCF8D3B-BA55-4C47-AC60-5CEF75418BEB}"
EndProject
Expand Down Expand Up @@ -72,6 +72,8 @@ Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "RawRabbit.Enrichers.Protobu
EndProject
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "RawRabbit.Enrichers.Polly.Tests", "test\RawRabbit.Enrichers.Polly.Tests\RawRabbit.Enrichers.Polly.Tests.csproj", "{4B4C5936-D61E-4FD8-AEB7-154CEAF84E15}"
EndProject
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "RawRabbit.Enrichers.RetryLater", "src\RawRabbit.Enrichers.RetryLater\RawRabbit.Enrichers.RetryLater.csproj", "{E1816B3D-9C4B-4D08-9537-ACB6806AF690}"
EndProject
Global
GlobalSection(SolutionConfigurationPlatforms) = preSolution
Debug|Any CPU = Debug|Any CPU
Expand Down Expand Up @@ -194,6 +196,10 @@ Global
{4B4C5936-D61E-4FD8-AEB7-154CEAF84E15}.Debug|Any CPU.Build.0 = Debug|Any CPU
{4B4C5936-D61E-4FD8-AEB7-154CEAF84E15}.Release|Any CPU.ActiveCfg = Release|Any CPU
{4B4C5936-D61E-4FD8-AEB7-154CEAF84E15}.Release|Any CPU.Build.0 = Release|Any CPU
{E1816B3D-9C4B-4D08-9537-ACB6806AF690}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
{E1816B3D-9C4B-4D08-9537-ACB6806AF690}.Debug|Any CPU.Build.0 = Debug|Any CPU
{E1816B3D-9C4B-4D08-9537-ACB6806AF690}.Release|Any CPU.ActiveCfg = Release|Any CPU
{E1816B3D-9C4B-4D08-9537-ACB6806AF690}.Release|Any CPU.Build.0 = Release|Any CPU
EndGlobalSection
GlobalSection(SolutionProperties) = preSolution
HideSolutionNode = FALSE
Expand Down Expand Up @@ -228,6 +234,7 @@ Global
{CF308330-735E-411D-BBA9-0018DD079AF1} = {7FCF8D3B-BA55-4C47-AC60-5CEF75418BEB}
{8D45F8AC-B65F-4A2B-9153-8A7F3D423575} = {7FCF8D3B-BA55-4C47-AC60-5CEF75418BEB}
{4B4C5936-D61E-4FD8-AEB7-154CEAF84E15} = {2F91E22A-AEBA-4BEF-9A03-C8232830F697}
{E1816B3D-9C4B-4D08-9537-ACB6806AF690} = {7FCF8D3B-BA55-4C47-AC60-5CEF75418BEB}
EndGlobalSection
GlobalSection(ExtensibilityGlobals) = postSolution
SolutionGuid = {6EC93B92-1319-44D3-A596-9FBD9BD23050}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ namespace RawRabbit
public static class SubscribeMessageContextExtension
{
public static readonly Action<IPipeBuilder> ConsumePipe = consume => consume
.Use<StageMarkerMiddleware>(StageMarkerOptions.For(MessageContextSubscibeStage.MessageRecieved))
.Use<StageMarkerMiddleware>(StageMarkerOptions.For(StageMarker.MessageRecieved))
.Use<HeaderDeserializationMiddleware>(new HeaderDeserializationOptions
{
HeaderKeyFunc = c => PropertyHeaders.Context,
Expand Down
19 changes: 19 additions & 0 deletions src/RawRabbit.Enrichers.RetryLater/Common/Retry.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
using System;

namespace RawRabbit.Common
{
public class Retry : Acknowledgement
{
public TimeSpan Span { get; set; }

public Retry(TimeSpan span)
{
Span = span;
}

public static Retry In(TimeSpan span)
{
return new Retry(span);
}
}
}
8 changes: 8 additions & 0 deletions src/RawRabbit.Enrichers.RetryLater/Common/RetryHeaders.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
namespace RawRabbit.Common
{
public class RetryHeaders
{
public const string NumberOfRetries = "x-number-of-retries";
public const string OriginalDelivered = "x-original-delivered";
}
}
10 changes: 10 additions & 0 deletions src/RawRabbit.Enrichers.RetryLater/Common/RetryInformation.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
using System;

namespace RawRabbit.Common
{
public class RetryInformation
{
public int NumberOfRetries { get; set; }
public DateTime OriginalDelivered { get; set; }
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
using System;
using System.Collections.Generic;
using RabbitMQ.Client.Events;

namespace RawRabbit.Common
{
public interface IRetryInformationHeaderUpdater
{
void AddOrUpdate(BasicDeliverEventArgs args);
void AddOrUpdate(BasicDeliverEventArgs args, RetryInformation retryInfo);
}

public class RetryInformationHeaderUpdater : IRetryInformationHeaderUpdater
{
public void AddOrUpdate(BasicDeliverEventArgs args)
{
TryAddOriginalDelivered(args, DateTime.UtcNow);
AddOrUpdateNumberOfRetries(args);
}

public void AddOrUpdate(BasicDeliverEventArgs args, RetryInformation retryInfo)
{
TryAddOriginalDelivered(args, retryInfo.OriginalDelivered);
AddOrUpdateNumberOfRetries(args);
}

private void AddOrUpdateNumberOfRetries(BasicDeliverEventArgs args)
{
var currentRetry = 0;
if (args.BasicProperties.Headers.ContainsKey(RetryHeaders.NumberOfRetries))
{
var valueStr = GetHeaderString(args.BasicProperties.Headers, RetryHeaders.NumberOfRetries);
currentRetry = int.Parse(valueStr);
args.BasicProperties.Headers.Remove(RetryHeaders.NumberOfRetries);
}
var nextRetry = (++currentRetry).ToString();
args.BasicProperties.Headers.Add(RetryHeaders.NumberOfRetries, nextRetry);
}

private static void TryAddOriginalDelivered(BasicDeliverEventArgs args, DateTime originalDelivered)
{
if (args.BasicProperties.Headers.ContainsKey(RetryHeaders.OriginalDelivered))
{
return;
}
args.BasicProperties.Headers.Add(RetryHeaders.OriginalDelivered, originalDelivered.ToString("u"));
}

private static string GetHeaderString(IDictionary<string, object> headers, string key)
{
if (headers == null)
{
return null;
}
if (!headers.ContainsKey(key))
{
return null;
}
if (!(headers[key] is byte[] headerBytes))
{
return null;
}

var headerStr = System.Text.Encoding.UTF8.GetString(headerBytes);
return headerStr;
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
using System;
using System.Collections.Generic;
using RabbitMQ.Client.Events;

namespace RawRabbit.Common
{
public interface IRetryInformationProvider
{
RetryInformation Get(BasicDeliverEventArgs args);
}

public class RetryInformationProvider : IRetryInformationProvider
{
public RetryInformation Get(BasicDeliverEventArgs args)
{
return new RetryInformation
{
NumberOfRetries = ExtractNumberOfRetries(args),
OriginalDelivered = ExtractOriginalDelivered(args)
};
}

private DateTime ExtractOriginalDelivered(BasicDeliverEventArgs args)
{
var headerValue = GetHeaderString(args.BasicProperties.Headers, RetryHeaders.OriginalDelivered);
return DateTime.TryParse(headerValue, out var originalSent) ? originalSent : DateTime.UtcNow;
}

private int ExtractNumberOfRetries(BasicDeliverEventArgs args)
{
var headerValue = GetHeaderString(args.BasicProperties.Headers, RetryHeaders.NumberOfRetries);
return int.TryParse(headerValue, out var noOfRetries) ? noOfRetries : 0;
}

private static string GetHeaderString(IDictionary<string, object> headers, string key)
{
if (headers == null)
{
return null;
}
if (!headers.ContainsKey(key))
{
return null;
}
if (!(headers[key] is byte[] headerBytes))
{
return null;
}

var headerStr = System.Text.Encoding.UTF8.GetString(headerBytes);
return headerStr;
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
using RawRabbit.Pipe;

namespace RawRabbit.Common
{
public static class RetryLaterPipeContextExtensions
{
private const string RetryInformationKey = "RetryInformation";

internal static IPipeContext AddRetryInformation(this IPipeContext context, RetryInformation retryInformation)
{
context.Properties.TryAdd(RetryInformationKey, retryInformation);
return context;
}

public static RetryInformation GetRetryInformation(this IPipeContext context)
{
return context.Get<RetryInformation>(RetryInformationKey);
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
using System;
using System.Threading;
using System.Threading.Tasks;
using RabbitMQ.Client.Events;
using RawRabbit.Common;
using RawRabbit.Pipe;
using RawRabbit.Pipe.Middleware;

namespace RawRabbit.Middleware
{
public class RetryInformationExtractionOptions
{
public Func<IPipeContext, BasicDeliverEventArgs> DeliveryArgsFunc { get; set; }
}

public class RetryInformationExtractionMiddleware : StagedMiddleware
{
private readonly IRetryInformationProvider _retryProvider;
protected Func<IPipeContext, BasicDeliverEventArgs> DeliveryArgsFunc;
public override string StageMarker => Pipe.StageMarker.MessageRecieved;

public RetryInformationExtractionMiddleware(IRetryInformationProvider retryProvider, RetryInformationExtractionOptions options = null)
{
_retryProvider = retryProvider;
DeliveryArgsFunc = options?.DeliveryArgsFunc ?? (context => context.GetDeliveryEventArgs());
}

public override Task InvokeAsync(IPipeContext context, CancellationToken token = default(CancellationToken))
{
var retryInfo = GetRetryInformation(context);
AddToPipeContext(context, retryInfo);
return Next.InvokeAsync(context, token);
}

protected virtual BasicDeliverEventArgs GetDeliveryEventArgs(IPipeContext context)
{
return DeliveryArgsFunc?.Invoke(context);
}

protected virtual RetryInformation GetRetryInformation(IPipeContext context)
{
var devlieryArgs = GetDeliveryEventArgs(context);
return _retryProvider.Get(devlieryArgs);
}

protected virtual void AddToPipeContext(IPipeContext context, RetryInformation retryInfo)
{
context.AddRetryInformation(retryInfo);
}
}
}
Loading

0 comments on commit 35ecde7

Please sign in to comment.