diff --git a/src/RawRabbit/Operations/Requester.cs b/src/RawRabbit/Operations/Requester.cs index da13f4af..9cec815f 100644 --- a/src/RawRabbit/Operations/Requester.cs +++ b/src/RawRabbit/Operations/Requester.cs @@ -164,28 +164,31 @@ private Task GetOrCreateConsumerAsync(IConsumerConfiguration cfg) .Unwrap(); } - private void WireUpConsumer(IRawConsumer consumer, IConsumerConfiguration cfg) + private void WireUpConsumer(IRawConsumer consumer, IConsumerConfiguration cfg) { - ResponseCompletionSource responseTcs = null; - consumer.OnMessageAsync = (o, args) => _errorStrategy.ExecuteAsync(() => + consumer.OnMessageAsync = (o, args) => { - if (_responseDictionary.TryRemove(args.BasicProperties.CorrelationId, out responseTcs)) + ResponseCompletionSource responseTcs = null; + return _errorStrategy.ExecuteAsync(() => { - _logger.LogDebug($"Recived response with correlationId {args.BasicProperties.CorrelationId}."); - responseTcs.RequestTimer.Dispose(); - - _errorStrategy.OnResponseRecievedAsync(args, responseTcs); - if (responseTcs.Task.IsFaulted) + if (_responseDictionary.TryRemove(args.BasicProperties.CorrelationId, out responseTcs)) { + _logger.LogDebug($"Recived response with correlationId {args.BasicProperties.CorrelationId}."); + responseTcs.RequestTimer.Dispose(); + + _errorStrategy.OnResponseRecievedAsync(args, responseTcs); + if (responseTcs.Task.IsFaulted) + { + return _completed; + } + var response = _serializer.Deserialize(args); + responseTcs.TrySetResult(response); return _completed; } - var response = _serializer.Deserialize(args); - responseTcs.TrySetResult(response); + _logger.LogWarning($"Unable to find callback for {args.BasicProperties.CorrelationId}."); return _completed; - } - _logger.LogWarning($"Unable to find callback for {args.BasicProperties.CorrelationId}."); - return _completed; - }, exception => _errorStrategy.OnResponseRecievedException(consumer, cfg, args, responseTcs, exception)); + }, exception => _errorStrategy.OnResponseRecievedException(consumer, cfg, args, responseTcs, exception)); + }; } private class ResponseCompletionSource : TaskCompletionSource