Skip to content

Commit

Permalink
(#79) Move TaskCompletionSource to own scpoe
Browse files Browse the repository at this point in the history
  • Loading branch information
par.dahlman committed Apr 14, 2016
1 parent ffe8b4a commit 2e00c3f
Showing 1 changed file with 18 additions and 15 deletions.
33 changes: 18 additions & 15 deletions src/RawRabbit/Operations/Requester.cs
Original file line number Diff line number Diff line change
Expand Up @@ -164,28 +164,31 @@ private Task<IRawConsumer> GetOrCreateConsumerAsync(IConsumerConfiguration cfg)
.Unwrap();
}

private void WireUpConsumer(IRawConsumer consumer, IConsumerConfiguration cfg)
private void WireUpConsumer(IRawConsumer consumer, IConsumerConfiguration cfg)
{
ResponseCompletionSource responseTcs = null;
consumer.OnMessageAsync = (o, args) => _errorStrategy.ExecuteAsync(() =>
consumer.OnMessageAsync = (o, args) =>
{
if (_responseDictionary.TryRemove(args.BasicProperties.CorrelationId, out responseTcs))
ResponseCompletionSource responseTcs = null;
return _errorStrategy.ExecuteAsync(() =>
{
_logger.LogDebug($"Recived response with correlationId {args.BasicProperties.CorrelationId}.");
responseTcs.RequestTimer.Dispose();

_errorStrategy.OnResponseRecievedAsync(args, responseTcs);
if (responseTcs.Task.IsFaulted)
if (_responseDictionary.TryRemove(args.BasicProperties.CorrelationId, out responseTcs))
{
_logger.LogDebug($"Recived response with correlationId {args.BasicProperties.CorrelationId}.");
responseTcs.RequestTimer.Dispose();

_errorStrategy.OnResponseRecievedAsync(args, responseTcs);
if (responseTcs.Task.IsFaulted)
{
return _completed;
}
var response = _serializer.Deserialize(args);
responseTcs.TrySetResult(response);
return _completed;
}
var response = _serializer.Deserialize(args);
responseTcs.TrySetResult(response);
_logger.LogWarning($"Unable to find callback for {args.BasicProperties.CorrelationId}.");
return _completed;
}
_logger.LogWarning($"Unable to find callback for {args.BasicProperties.CorrelationId}.");
return _completed;
}, exception => _errorStrategy.OnResponseRecievedException(consumer, cfg, args, responseTcs, exception));
}, exception => _errorStrategy.OnResponseRecievedException(consumer, cfg, args, responseTcs, exception));
};
}

private class ResponseCompletionSource : TaskCompletionSource<object>
Expand Down

0 comments on commit 2e00c3f

Please sign in to comment.