Skip to content

Commit

Permalink
(#224) Use unique key for consumer cache
Browse files Browse the repository at this point in the history
The cache only looked at the routing key and did not
verify that the queue name matched the cache
  • Loading branch information
pardahlman committed May 7, 2017
1 parent e6a0dfa commit d2032f2
Showing 1 changed file with 9 additions and 6 deletions.
15 changes: 9 additions & 6 deletions src/RawRabbit/Consumer/ConsumerFactory.cs
Original file line number Diff line number Diff line change
Expand Up @@ -13,21 +13,19 @@ namespace RawRabbit.Consumer
public class ConsumerFactory : IConsumerFactory
{
private readonly IChannelFactory _channelFactory;
private readonly ConcurrentDictionary<string, Lazy<Task<IBasicConsumer>>> NoAckConsumers;
private readonly ConcurrentDictionary<string, Lazy<Task<IBasicConsumer>>> AckConsumers;
private readonly ConcurrentDictionary<string, Lazy<Task<IBasicConsumer>>> _consumerCache;
private readonly ILogger _logger = LogManager.GetLogger<ConsumerFactory>();

public ConsumerFactory(IChannelFactory channelFactory)
{
NoAckConsumers = new ConcurrentDictionary<string, Lazy<Task<IBasicConsumer>>>();
AckConsumers = new ConcurrentDictionary<string, Lazy<Task<IBasicConsumer>>>();
_consumerCache = new ConcurrentDictionary<string, Lazy<Task<IBasicConsumer>>>();
_channelFactory = channelFactory;
}

public Task<IBasicConsumer> GetConsumerAsync(ConsumeConfiguration cfg, IModel channel = null, CancellationToken token = default(CancellationToken))
{
var cache = cfg.NoAck ? NoAckConsumers : AckConsumers;
var lazyConsumerTask = cache.GetOrAdd(cfg.RoutingKey, routingKey =>
var consumerKey = CreateConsumerKey(cfg);
var lazyConsumerTask = _consumerCache.GetOrAdd(consumerKey, routingKey =>
{
return new Lazy<Task<IBasicConsumer>>(async () =>
{
Expand Down Expand Up @@ -95,6 +93,11 @@ protected virtual void CheckPropertyValues(ConsumeConfiguration cfg)
_logger.LogInformation("Creating a dedicated channel for consumer.");
return _channelFactory.CreateChannelAsync(token);
}

protected string CreateConsumerKey(ConsumeConfiguration cfg)
{
return $"{cfg.QueueName}:{cfg.RoutingKey}:{cfg.NoAck}";
}
}

public static class ConsumerExtensions
Expand Down

0 comments on commit d2032f2

Please sign in to comment.