diff --git a/.editorconfig b/.editorconfig new file mode 100644 index 00000000..9ab7418a --- /dev/null +++ b/.editorconfig @@ -0,0 +1,31 @@ +# editorconfig.org + +# top-most EditorConfig file +root = true + +# Default settings: +# A newline ending every file +# Use 4 spaces as indentation +[*] +trim_trailing_whitespace = true +insert_final_newline = true +indent_style = tab +indent_size = 4 + +# Xml project files +[*.{csproj,vcxproj,vcxproj.filters,proj,nativeproj,locproj}] +indent_size = 2 + +# Xml files +[*.{xml,stylecop,resx,ruleset}] +indent_size = 2 + +# Xml config files +[*.{props,targets,config,nuspec}] +indent_size = 2 + +# Shell scripts +[*.sh] +end_of_line = lf +[*.{cmd, bat}] +end_of_line = crlf diff --git a/.gitignore b/.gitignore index 28100584..c962a5d7 100644 --- a/.gitignore +++ b/.gitignore @@ -13,6 +13,9 @@ docs/*build*/ #VS Code .vscode/** +#Rider +.idea/** + # User-specific files (MonoDevelop/Xamarin Studio) *.userprefs **ncrunch** diff --git a/README.md b/README.md index 61054691..38042b77 100644 --- a/README.md +++ b/README.md @@ -1,6 +1,7 @@ # RawRabbit [![Build Status](https://img.shields.io/appveyor/ci/pardahlman/rawrabbit.svg?style=flat-square)](https://ci.appveyor.com/project/pardahlman/rawrabbit) [![Documentation Status](https://readthedocs.org/projects/rawrabbit/badge/?version=latest&style=flat-square)](http://rawrabbit.readthedocs.org/) [![NuGet](https://img.shields.io/nuget/v/RawRabbit.svg?style=flat-square)](https://www.nuget.org/packages/RawRabbit) [![GitHub release](https://img.shields.io/github/release/pardahlman/rawrabbit.svg?style=flat-square)](https://github.com/pardahlman/rawrabbit/releases/latest) +[![Slack Status](https://rawrabbit.herokuapp.com/badge.svg)](https://rawrabbit.herokuapp.com) ## Quick introduction `RawRabbit` is a modern .NET client for communication over [RabbitMq](http://rabbitmq.com/). It is written for [`.NET Core`](http://dot.net) and uses Microsoft’s new frameworks for [logging](https://github.com/aspnet/Logging), [configuration](https://github.com/aspnet/Configuration) and [dependecy injection](https://github.com/aspnet/DependencyInjection). Full documentation available at [`rawrabbit.readthedocs.org`](http://rawrabbit.readthedocs.org/). @@ -17,8 +18,8 @@ client.SubscribeAsync(async (msg, context) => await client.PublishAsync(new BasicMessage { Prop = "Hello, world!"}); ``` -### Request/Respond -`RawRabbits` request/respond (`RPC`) implementation uses the [direct reply-to feature](https://www.rabbitmq.com/direct-reply-to.html) for better performance and lower resource allocation. +### Request/Response +`RawRabbits` request/response (`RPC`) implementation uses the [direct reply-to feature](https://www.rabbitmq.com/direct-reply-to.html) for better performance and lower resource allocation. ```csharp var client = BusClientFactory.CreateDefault(); client.RespondAsync(async (request, context) => diff --git a/RELEASENOTES.md b/RELEASENOTES.md index 02703f39..c7eebd62 100644 --- a/RELEASENOTES.md +++ b/RELEASENOTES.md @@ -1,3 +1,21 @@ +# 1.10.3 + + - [#163](https://github.com/pardahlman/RawRabbit/pull/163) - Added queue assume initialized and tests contributed by Cemre Mengu ([cemremengu](https://github.com/cemremengu)) + - [#162](https://github.com/pardahlman/RawRabbit/pull/162) - Converted tabs to 4 spaces contributed by Cemre Mengu ([cemremengu](https://github.com/cemremengu)) + - [#161](https://github.com/pardahlman/RawRabbit/pull/161) - added editorconfig for uniform editing contributed by Cemre Mengu ([cemremengu](https://github.com/cemremengu)) + - [#160](https://github.com/pardahlman/RawRabbit/issues/160) - Add"AssumeInitialized" functionality for queues + - [#159](https://github.com/pardahlman/RawRabbit/pull/159) - Added default broker connection values for RawRabbitConfig class contributed by Cemre Mengu ([cemremengu](https://github.com/cemremengu)) + - [#150](https://github.com/pardahlman/RawRabbit/issues/150) - StackOverflowException occures when subscribeMethod throws an exception using dotnet core + - [#143](https://github.com/pardahlman/RawRabbit/issues/143) - Failure Recovery Issue with PublishAsync + - [#142](https://github.com/pardahlman/RawRabbit/issues/142) - Failure Recovery + - [#140](https://github.com/pardahlman/RawRabbit/pull/140) - (#129) Expose Mandatory Option For Publish contributed by Richard Tasker ([ritasker](https://github.com/ritasker)) + - [#136](https://github.com/pardahlman/RawRabbit/issues/136) - Newtonsoft.Json.JsonSerializationException: Error getting value from 'ScopeId' on 'System.Net.IPAddress'. + - [#132](https://github.com/pardahlman/RawRabbit/issues/132) - Default connection timeout + - [#129](https://github.com/pardahlman/RawRabbit/issues/129) - Expose mandatory option for publish + - [#116](https://github.com/pardahlman/RawRabbit/issues/116) - Unable to publish message to default error exchange. + +Commits: 4b6e57e351...52573f4164 + # 1.10.2 - [#117](https://github.com/pardahlman/RawRabbit/issues/117) - Propegate Topology Exception for Consumers @@ -6,7 +24,6 @@ Commits: af0bda979e...331e174a02 - # 1.10.1 - [#108](https://github.com/pardahlman/RawRabbit/issues/108) - Upgrade to RabbitMQ.Client 4.1.0 diff --git a/docs/Getting-started.md b/docs/Getting-started.md index a7e2a281..0fe4c684 100644 --- a/docs/Getting-started.md +++ b/docs/Getting-started.md @@ -12,8 +12,8 @@ Install the latest version of [`RawRabbit`](https://www.nuget.org/packages/RawRa ``` The `vNext` package contains the convenience class `BusClientFactory` that can be used to create a default instance of the `RawRabbit` client. It makes life easier, but is not necesary. -## Creating instanse -Depending on the scenario, there are a few different ways to instansiate the `RawRabbit` client. The methods described below all have optional arguments for registering specific subdependeices. +## Creating instance +Depending on the scenario, there are a few different ways to instantiate the `RawRabbit` client. The methods described below all have optional arguments for registering specific subdependeices. ### vNext Application wire-up If the application is bootstrapped from a `vNext` application, the dependecies and client can be registed by using the `AddRawRabbit` extension for `IServiceCollection` @@ -52,7 +52,7 @@ kernel.RegisterRawRabbit("guest:guest@localhost:5672/"); ``` ## Broker connection -As soon as the client is instansiated, it will try to connect to the broker. By default `RawRabbit` will try to connect to `localhost`. Configuration can be provided in different ways. +As soon as the client is instantiated, it will try to connect to the broker. By default `RawRabbit` will try to connect to `localhost`. Configuration can be provided in different ways. ### Configuration object The main configuration object for `RawRabbit` is `RawRabbitConfiguration`. @@ -109,4 +109,4 @@ var bulk = client.GetMessages(cfg => cfg .GetAll() .WithNoAck() )); -``` \ No newline at end of file +``` diff --git a/sample/RawRabbit.Messages.Sample/project.json b/sample/RawRabbit.Messages.Sample/project.json index 1d6031b6..f06c651b 100644 --- a/sample/RawRabbit.Messages.Sample/project.json +++ b/sample/RawRabbit.Messages.Sample/project.json @@ -7,6 +7,6 @@ "frameworks": { "netstandard1.5": {}, - "net451": {}, + "net451": {} } } diff --git a/src/RawRabbit.Attributes/project.json b/src/RawRabbit.Attributes/project.json index 1eff72c9..36c1ac46 100644 --- a/src/RawRabbit.Attributes/project.json +++ b/src/RawRabbit.Attributes/project.json @@ -1,6 +1,6 @@ { "title": "RawRabbit.Attributes", - "version": "1.10.2-*", + "version": "1.10.3-*", "authors": [ "pardahlman" ], "description": "Configure messages and topology with attributes.", "packOptions": { diff --git a/src/RawRabbit.DependencyInjection.Autofac/project.json b/src/RawRabbit.DependencyInjection.Autofac/project.json index 3c9a7c2a..1983357e 100644 --- a/src/RawRabbit.DependencyInjection.Autofac/project.json +++ b/src/RawRabbit.DependencyInjection.Autofac/project.json @@ -1,6 +1,6 @@ { "title": "RawRabbit.DependencyInjection.Autofac", - "version": "1.10.2-*", + "version": "1.10.3-*", "authors": [ "par.dahlman" ], "description": "Wire up RawRabbit with Autfac!", diff --git a/src/RawRabbit.DependencyInjection.Ninject/project.json b/src/RawRabbit.DependencyInjection.Ninject/project.json index 069c1932..a892c4b3 100644 --- a/src/RawRabbit.DependencyInjection.Ninject/project.json +++ b/src/RawRabbit.DependencyInjection.Ninject/project.json @@ -1,6 +1,6 @@ { "title": "RawRabbit.DependencyInjection.Ninject", - "version": "1.10.2-*", + "version": "1.10.3-*", "authors": [ "par.dahlman", "Joshua Barron" ], "description": "Wire up RawRabbit with Ninject!", diff --git a/src/RawRabbit.Extensions/project.json b/src/RawRabbit.Extensions/project.json index 37f70b3f..20637d5b 100644 --- a/src/RawRabbit.Extensions/project.json +++ b/src/RawRabbit.Extensions/project.json @@ -1,6 +1,6 @@ { "title": "RawRabbit.Extensions", - "version": "1.10.2-*", + "version": "1.10.3-*", "description": "Make the most of RawRabbit with these extensions", "authors": [ "par.dahlman" ], "packOptions": { diff --git a/src/RawRabbit.Logging.Log4Net/project.json b/src/RawRabbit.Logging.Log4Net/project.json index 169073d7..fcff4fe5 100644 --- a/src/RawRabbit.Logging.Log4Net/project.json +++ b/src/RawRabbit.Logging.Log4Net/project.json @@ -1,6 +1,6 @@ { "title": "RawRabbit.Logging.Log4Net", - "version": "1.10.2-*", + "version": "1.10.3-*", "authors": [ "par.dahlman" ], "description": "Write RawRabbit's logs with Log4Net", diff --git a/src/RawRabbit.Logging.NLog/project.json b/src/RawRabbit.Logging.NLog/project.json index de9d5d74..a8022ce0 100644 --- a/src/RawRabbit.Logging.NLog/project.json +++ b/src/RawRabbit.Logging.NLog/project.json @@ -1,6 +1,6 @@ { "title": "RawRabbit.Logging.NLog", - "version": "1.10.2-*", + "version": "1.10.3-*", "authors": [ "par.dahlman" ], "description": "Write RawRabbit's logs with NLog", diff --git a/src/RawRabbit.Logging.Serilog/project.json b/src/RawRabbit.Logging.Serilog/project.json index a55a715a..ca787614 100644 --- a/src/RawRabbit.Logging.Serilog/project.json +++ b/src/RawRabbit.Logging.Serilog/project.json @@ -1,6 +1,6 @@ { "title": "RawRabbit.Logging.Serilog", - "version": "1.10.2-*", + "version": "1.10.3-*", "authors": [ "par.dahlman" ], "description": "Write RawRabbit's logs with Serilog", diff --git a/src/RawRabbit.vNext/project.json b/src/RawRabbit.vNext/project.json index 7bd1857c..a20dbc8d 100644 --- a/src/RawRabbit.vNext/project.json +++ b/src/RawRabbit.vNext/project.json @@ -1,5 +1,5 @@ { - "version": "1.10.2-*", + "version": "1.10.3-*", "authors": [ "pardahlman", "enrique-avalon" ], "description": "Take advantage of vNext for your RawRabbit setup. Extension for IServiceCollection, IConfiguration support and much more!", "packOptions": { diff --git a/src/RawRabbit/Channel/ChannelFactory.cs b/src/RawRabbit/Channel/ChannelFactory.cs index ba951b19..9dddec5f 100644 --- a/src/RawRabbit/Channel/ChannelFactory.cs +++ b/src/RawRabbit/Channel/ChannelFactory.cs @@ -8,6 +8,7 @@ using RabbitMQ.Client.Exceptions; using RawRabbit.Channel.Abstraction; using RawRabbit.Configuration; +using RawRabbit.Exceptions; using RawRabbit.Logging; namespace RawRabbit.Channel @@ -15,7 +16,7 @@ namespace RawRabbit.Channel public class ChannelFactory : IChannelFactory { private readonly ConcurrentQueue> _requestQueue; - private readonly ILogger _logger = LogManager.GetLogger(); + private readonly ILogger _logger = LogManager.GetLogger(); internal readonly ChannelFactoryConfiguration _channelConfig; private readonly IConnectionFactory _connectionFactory; private readonly RawRabbitConfiguration _config; @@ -28,23 +29,46 @@ public class ChannelFactory : IChannelFactory private bool _processingRequests; public ChannelFactory(IConnectionFactory connectionFactory, RawRabbitConfiguration config, ChannelFactoryConfiguration channelConfig) + { + _connectionFactory = connectionFactory; + _config = config; + _channelConfig = channelConfig; + _requestQueue = new ConcurrentQueue>(); + _channels = new LinkedList(); + + ConnectToBroker(); + Initialize(); + } + + protected virtual void ConnectToBroker() { try { - _connection = connectionFactory.CreateConnection(config.Hostnames); + _connection = _connectionFactory.CreateConnection(_config.Hostnames); + SetupConnectionRecovery(_connection); } catch (BrokerUnreachableException e) { _logger.LogError("Unable to connect to broker", e); throw e.InnerException; } - _connectionFactory = connectionFactory; - _config = config; - _channelConfig = channelConfig; - _requestQueue = new ConcurrentQueue>(); - _channels = new LinkedList(); + } - Initialize(); + protected virtual void SetupConnectionRecovery(IConnection connection = null) + { + connection = connection ?? _connection; + var recoverable = connection as IRecoverable; + if (recoverable == null) + { + _logger.LogInformation("Connection is not Recoverable. Failed connection will cause unhandled exception to be thrown."); + return; + } + _logger.LogDebug("Setting up Connection Recovery"); + recoverable.Recovery += (sender, args) => + { + _logger.LogInformation($"Connection has been recovered. Starting channel processing."); + EnsureRequestsAreHandled(); + }; } internal virtual void Initialize() @@ -132,7 +156,18 @@ public Task GetChannelAsync() { var tcs = new TaskCompletionSource(); _requestQueue.Enqueue(tcs); - EnsureRequestsAreHandled(); + if (_connection.IsOpen) + { + EnsureRequestsAreHandled(); + } + else + { + var recoverable = _connection as IRecoverable; + if (recoverable == null) + { + throw new ChannelAvailabilityException("Unable to retrieve chanel. Connection to broker is closed and not recoverable."); + } + } return tcs.Task; } @@ -203,7 +238,8 @@ private void EnsureRequestsAreHandled() var isRecoverable = _channels.Any(c => c is IRecoverable); if (!isRecoverable) { - throw new Exception("Unable to retreive channel. All existing channels are closed and none of them are recoverable."); + _processingRequests = false; + throw new ChannelAvailabilityException("Unable to retreive channel. All existing channels are closed and none of them are recoverable."); } _logger.LogInformation("Unable to find an open channel. Requeue TaskCompletionSource for future process and abort execution."); diff --git a/src/RawRabbit/Common/BaseBusClient.cs b/src/RawRabbit/Common/BaseBusClient.cs index 8f98c8d7..1b9e33b3 100644 --- a/src/RawRabbit/Common/BaseBusClient.cs +++ b/src/RawRabbit/Common/BaseBusClient.cs @@ -44,7 +44,7 @@ public ISubscription SubscribeAsync(Func subscribeM public Task PublishAsync(T message = default(T), Guid globalMessageId = new Guid(), Action configuration = null) { var config = _configEval.GetConfiguration(configuration); - _logger.LogDebug($"Publishing message '{typeof(T).Name}' on exchange '{config.Exchange.ExchangeName}' with routing key {config.RoutingKey}."); + _logger.LogDebug($"Initiating publish for message '{typeof(T).Name}' on exchange '{config.Exchange.ExchangeName}' with routing key {config.RoutingKey}."); return _publisher.PublishAsync(message, globalMessageId, config); } diff --git a/src/RawRabbit/Common/TopologyProvider.cs b/src/RawRabbit/Common/TopologyProvider.cs index 0f7914af..1284085c 100644 --- a/src/RawRabbit/Common/TopologyProvider.cs +++ b/src/RawRabbit/Common/TopologyProvider.cs @@ -133,7 +133,7 @@ public bool IsInitialized(ExchangeConfiguration exchange) public bool IsInitialized(QueueConfiguration queue) { - return queue.IsDirectReplyTo() || _initQueues.Contains(queue.FullQueueName); + return queue.IsDirectReplyTo() || queue.AssumeInitialized || _initQueues.Contains(queue.FullQueueName); } private void BindQueueToExchange(ScheduledBindQueueTask bind) diff --git a/src/RawRabbit/Configuration/Publish/IPublishConfigurationBuilder.cs b/src/RawRabbit/Configuration/Publish/IPublishConfigurationBuilder.cs index fa25d693..51097e65 100644 --- a/src/RawRabbit/Configuration/Publish/IPublishConfigurationBuilder.cs +++ b/src/RawRabbit/Configuration/Publish/IPublishConfigurationBuilder.cs @@ -1,5 +1,6 @@ using System; using RabbitMQ.Client; +using RabbitMQ.Client.Events; using RawRabbit.Configuration.Exchange; namespace RawRabbit.Configuration.Publish @@ -9,5 +10,6 @@ public interface IPublishConfigurationBuilder IPublishConfigurationBuilder WithExchange(Action exchange); IPublishConfigurationBuilder WithRoutingKey(string routingKey); IPublishConfigurationBuilder WithProperties(Action properties); + IPublishConfigurationBuilder WithMandatoryDelivery(EventHandler basicReturn); } } \ No newline at end of file diff --git a/src/RawRabbit/Configuration/Publish/PublishConfiguration.cs b/src/RawRabbit/Configuration/Publish/PublishConfiguration.cs index fdc26a27..520e4a83 100644 --- a/src/RawRabbit/Configuration/Publish/PublishConfiguration.cs +++ b/src/RawRabbit/Configuration/Publish/PublishConfiguration.cs @@ -1,5 +1,6 @@ using System; using RabbitMQ.Client; +using RabbitMQ.Client.Events; using RawRabbit.Configuration.Exchange; namespace RawRabbit.Configuration.Publish @@ -9,5 +10,6 @@ public class PublishConfiguration public ExchangeConfiguration Exchange { get; set; } public string RoutingKey { get; set; } public Action PropertyModifier { get; set; } + public EventHandler BasicReturn { get; set; } } } diff --git a/src/RawRabbit/Configuration/Publish/PublishConfigurationBuilder.cs b/src/RawRabbit/Configuration/Publish/PublishConfigurationBuilder.cs index 46361698..e700fff1 100644 --- a/src/RawRabbit/Configuration/Publish/PublishConfigurationBuilder.cs +++ b/src/RawRabbit/Configuration/Publish/PublishConfigurationBuilder.cs @@ -1,5 +1,6 @@ using System; using RabbitMQ.Client; +using RabbitMQ.Client.Events; using RawRabbit.Configuration.Exchange; using RawRabbit.Configuration.Request; @@ -11,12 +12,14 @@ public class PublishConfigurationBuilder : IPublishConfigurationBuilder private string _routingKey; private Action _properties; private const string _oneOrMoreWords = "#"; + private EventHandler _basicReturn; public PublishConfiguration Configuration => new PublishConfiguration { Exchange = _exchange.Configuration, RoutingKey = _routingKey, - PropertyModifier = _properties ?? (b => {}) + PropertyModifier = _properties ?? (b => {}), + BasicReturn = _basicReturn }; public PublishConfigurationBuilder(ExchangeConfiguration defaultExchange = null, string routingKey =null) @@ -48,5 +51,11 @@ public IPublishConfigurationBuilder WithProperties(Action prop _properties = properties; return this; } + + public IPublishConfigurationBuilder WithMandatoryDelivery(EventHandler basicReturn) + { + _basicReturn = basicReturn; + return this; + } } } \ No newline at end of file diff --git a/src/RawRabbit/Configuration/Queue/IQueueConfigurationBuilder.cs b/src/RawRabbit/Configuration/Queue/IQueueConfigurationBuilder.cs index df3b75ce..0fee69ae 100644 --- a/src/RawRabbit/Configuration/Queue/IQueueConfigurationBuilder.cs +++ b/src/RawRabbit/Configuration/Queue/IQueueConfigurationBuilder.cs @@ -7,6 +7,6 @@ public interface IQueueConfigurationBuilder IQueueConfigurationBuilder WithDurability(bool durable = true); IQueueConfigurationBuilder WithExclusivity(bool exclusive = true); IQueueConfigurationBuilder WithArgument(string key, object value); - - } + IQueueConfigurationBuilder AssumeInitialized(bool asumption = true); + } } diff --git a/src/RawRabbit/Configuration/Queue/QueueConfiguration.cs b/src/RawRabbit/Configuration/Queue/QueueConfiguration.cs index 08a2a98e..0adeafbe 100644 --- a/src/RawRabbit/Configuration/Queue/QueueConfiguration.cs +++ b/src/RawRabbit/Configuration/Queue/QueueConfiguration.cs @@ -24,6 +24,7 @@ public string FullQueueName public bool Exclusive { get; set; } public bool AutoDelete { get; set; } public Dictionary Arguments { get; set; } + public bool AssumeInitialized { get; set; } public QueueConfiguration() { diff --git a/src/RawRabbit/Configuration/Queue/QueueConfigurationBuilder.cs b/src/RawRabbit/Configuration/Queue/QueueConfigurationBuilder.cs index 9b7aa026..426673f6 100644 --- a/src/RawRabbit/Configuration/Queue/QueueConfigurationBuilder.cs +++ b/src/RawRabbit/Configuration/Queue/QueueConfigurationBuilder.cs @@ -2,7 +2,7 @@ { public class QueueConfigurationBuilder : IQueueConfigurationBuilder { - public QueueConfiguration Configuration { get;} + public QueueConfiguration Configuration { get; } public QueueConfigurationBuilder(QueueConfiguration initialQueue = null) { @@ -44,5 +44,11 @@ public IQueueConfigurationBuilder WithArgument(string key, object value) Configuration.Arguments.Add(key, value); return this; } + + public IQueueConfigurationBuilder AssumeInitialized(bool asumption = true) + { + Configuration.AssumeInitialized = asumption; + return this; + } } } diff --git a/src/RawRabbit/Configuration/RawRabbitConfiguration.cs b/src/RawRabbit/Configuration/RawRabbitConfiguration.cs index 94383bad..c6400610 100644 --- a/src/RawRabbit/Configuration/RawRabbitConfiguration.cs +++ b/src/RawRabbit/Configuration/RawRabbitConfiguration.cs @@ -71,7 +71,7 @@ public class RawRabbitConfiguration /// public SslOption Ssl { get; set; } - public string VirtualHost { get; set; } + public string VirtualHost { get; set; } public string Username { get; set; } public string Password { get; set; } public int Port { get; set; } @@ -103,6 +103,11 @@ public RawRabbitConfiguration() AutoDelete = false, Durable = true }; + VirtualHost = "/"; + Username = "guest"; + Password = "guest"; + Port = 5672; + Hostnames = new List {"localhost"}; } public static RawRabbitConfiguration Local => new RawRabbitConfiguration diff --git a/src/RawRabbit/ErrorHandling/DefaultStrategy.cs b/src/RawRabbit/ErrorHandling/DefaultStrategy.cs index 6508950c..5c5cef6e 100644 --- a/src/RawRabbit/ErrorHandling/DefaultStrategy.cs +++ b/src/RawRabbit/ErrorHandling/DefaultStrategy.cs @@ -39,7 +39,7 @@ public virtual Task OnResponseHandlerExceptionAsync(IRawConsumer rawConsumer, IC { _logger.LogError($"An unhandled exception was thrown in the response handler for message '{args.BasicProperties.MessageId}'.", exception); var innerException = UnwrapInnerException(exception); - var exceptionInfo = new MessageHandlerExceptionInformation + var exceptionInfo = new ExceptionInformation { Message = $"An unhandled exception was thrown when consuming a message\n MessageId: {args.BasicProperties.MessageId}\n Queue: '{cfg.Queue.FullQueueName}'\n Exchange: '{cfg.Exchange.ExchangeName}'\nSee inner exception for more details.", ExceptionType = innerException.GetType().FullName, @@ -50,7 +50,7 @@ public virtual Task OnResponseHandlerExceptionAsync(IRawConsumer rawConsumer, IC rawConsumer.Model.BasicPublish( exchange: string.Empty, routingKey: args.BasicProperties?.ReplyTo ?? string.Empty, - basicProperties: _propertiesProvider.GetProperties(p => + basicProperties: _propertiesProvider.GetProperties(p => { p.CorrelationId = args.BasicProperties?.CorrelationId ?? string.Empty; p.Headers.Add(PropertyHeaders.ExceptionHeader, _messageExceptionName); @@ -82,7 +82,7 @@ public virtual Task OnResponseRecievedAsync(BasicDeliverEventArgs args, TaskComp if (containsException) { _logger.LogInformation($"Message '{args.BasicProperties.MessageId}' withh CorrelationId '{args.BasicProperties.CorrelationId}' contains exception. Deserialize and re-throw."); - var exceptionInfo = _serializer.Deserialize(args.Body); + var exceptionInfo = _serializer.Deserialize(args.Body); var exception = new MessageHandlerException(exceptionInfo.Message) { InnerExceptionType = exceptionInfo.ExceptionType, @@ -133,9 +133,16 @@ public virtual async Task OnSubscriberExceptionAsync(IRawConsumer consumer, Subs await _topologyProvider.DeclareExchangeAsync(_errorExchangeCfg); var channel = await _channelFactory.GetChannelAsync(); var msg = _serializer.Deserialize(args); + var actualException = UnwrapInnerException(exception); var errorMsg = new HandlerExceptionMessage { - Exception = exception, + Exception = new ExceptionInformation + { + ExceptionType = actualException.GetType().FullName, + InnerMessage = actualException.InnerException?.Message, + Message = actualException.Message, + StackTrace = actualException.StackTrace + }, Time = DateTime.Now, Host = Environment.MachineName, Message = msg, diff --git a/src/RawRabbit/ErrorHandling/MessageHandlerExceptionMessage.cs b/src/RawRabbit/ErrorHandling/MessageHandlerExceptionMessage.cs index 65ffc291..bb0fefc3 100644 --- a/src/RawRabbit/ErrorHandling/MessageHandlerExceptionMessage.cs +++ b/src/RawRabbit/ErrorHandling/MessageHandlerExceptionMessage.cs @@ -1,4 +1,5 @@ using System; +using RawRabbit.Exceptions; namespace RawRabbit.ErrorHandling { @@ -7,6 +8,6 @@ public class HandlerExceptionMessage public string Host { get; set; } public DateTime Time { get; set; } public object Message { get; set; } - public Exception Exception { get; set; } + public ExceptionInformation Exception { get; set; } } } diff --git a/src/RawRabbit/Exceptions/ChannelAvailabilityException.cs b/src/RawRabbit/Exceptions/ChannelAvailabilityException.cs new file mode 100644 index 00000000..e48dc03e --- /dev/null +++ b/src/RawRabbit/Exceptions/ChannelAvailabilityException.cs @@ -0,0 +1,10 @@ +using System; + +namespace RawRabbit.Exceptions +{ + public class ChannelAvailabilityException : Exception + { + public ChannelAvailabilityException(string message) : base(message) + { } + } +} diff --git a/src/RawRabbit/Exceptions/MessageHandlerExceptionInformation.cs b/src/RawRabbit/Exceptions/ExceptionInformation.cs similarity index 86% rename from src/RawRabbit/Exceptions/MessageHandlerExceptionInformation.cs rename to src/RawRabbit/Exceptions/ExceptionInformation.cs index 31409f6c..6eb66f9b 100644 --- a/src/RawRabbit/Exceptions/MessageHandlerExceptionInformation.cs +++ b/src/RawRabbit/Exceptions/ExceptionInformation.cs @@ -3,7 +3,7 @@ /// /// Holds information about exception thrown in a remote message handler. /// - public class MessageHandlerExceptionInformation + public class ExceptionInformation { public string Message { get; set; } public string ExceptionType { get; set; } diff --git a/src/RawRabbit/Operations/Publisher.cs b/src/RawRabbit/Operations/Publisher.cs index b5cee8d9..5e2601fb 100644 --- a/src/RawRabbit/Operations/Publisher.cs +++ b/src/RawRabbit/Operations/Publisher.cs @@ -1,6 +1,5 @@ using System; using System.Threading.Tasks; -using RabbitMQ.Client; using RawRabbit.Channel.Abstraction; using RawRabbit.Common; using RawRabbit.Configuration; @@ -57,6 +56,9 @@ public Task PublishAsync(TMessage message, Guid globalMessageId, Publi { throw t.Exception; } + + channelTask.Result.BasicReturn += config.BasicReturn; + lock (_publishLock) { var ackTask = _acknowledger.GetAckTask(channelTask.Result); @@ -64,9 +66,13 @@ public Task PublishAsync(TMessage message, Guid globalMessageId, Publi exchange: config.Exchange.ExchangeName, routingKey: _config.RouteWithGlobalId ? $"{config.RoutingKey}.{globalMessageId}" : config.RoutingKey, basicProperties: props, - body: _serializer.Serialize(message) - ); - return ackTask; + body: _serializer.Serialize(message), + mandatory: (config.BasicReturn != null) + ); + return ackTask + .ContinueWith(a => { + channelTask.Result.BasicReturn -= config.BasicReturn; + }); } }) .Unwrap(); diff --git a/src/RawRabbit/project.json b/src/RawRabbit/project.json index 12290480..75b2737f 100644 --- a/src/RawRabbit/project.json +++ b/src/RawRabbit/project.json @@ -1,6 +1,6 @@ { "title": "RawRabbit", - "version": "1.10.2-*", + "version": "1.10.3-*", "authors": [ "pardahlman", "enrique-avalon" ], "description": "A modern framework for communication over RabbitMq.", "packOptions": { diff --git a/test/RawRabbit.IntegrationTests/SimpleUse/PublishAndSubscribeTests.cs b/test/RawRabbit.IntegrationTests/SimpleUse/PublishAndSubscribeTests.cs index 12449b9e..3297b63d 100644 --- a/test/RawRabbit.IntegrationTests/SimpleUse/PublishAndSubscribeTests.cs +++ b/test/RawRabbit.IntegrationTests/SimpleUse/PublishAndSubscribeTests.cs @@ -10,12 +10,12 @@ using RawRabbit.Configuration; using RawRabbit.Exceptions; using RawRabbit.IntegrationTests.TestMessages; -using RawRabbit.vNext; using Xunit; using ExchangeType = RawRabbit.Configuration.Exchange.ExchangeType; namespace RawRabbit.IntegrationTests.SimpleUse { + public class PublishAndSubscribeTests : IntegrationTestBase { @@ -26,7 +26,7 @@ public async Task Should_Be_Able_To_Subscribe_Without_Any_Additional_Config() using (var publisher = TestClientFactory.CreateNormal()) using (var subscriber = TestClientFactory.CreateNormal()) { - var message = new BasicMessage { Prop = "Hello, world!" }; + var message = new BasicMessage {Prop = "Hello, world!"}; var recievedTcs = new TaskCompletionSource(); subscriber.SubscribeAsync((msg, info) => @@ -251,8 +251,10 @@ await subscriber.PublishAsync(new BasicMessage .WithPrefetchCount(1)); /* Test */ - await publisher.PublishAsync(new BasicMessage { Prop = "I will be delivered" }); - await publisher.PublishAsync(new BasicMessage { Prop = "Someone will pass me in the queue" }, configuration: cfg => cfg.WithProperties(p => p.Priority = 0)); + await publisher.PublishAsync(new BasicMessage {Prop = "I will be delivered"}); + await + publisher.PublishAsync(new BasicMessage {Prop = "Someone will pass me in the queue"}, + configuration: cfg => cfg.WithProperties(p => p.Priority = 0)); queueBuilt.SetResult(true); await priorityTcs.Task; @@ -268,8 +270,8 @@ public async Task Should_Stop_Subscribe_When_Subscription_Is_Disposed() using (var publisher = TestClientFactory.CreateNormal()) using (var subscriber = TestClientFactory.CreateNormal()) { - var firstMessage = new BasicMessage { Prop = "Value" }; - var secondMessage = new BasicMessage { Prop = "AnotherValue" }; + var firstMessage = new BasicMessage {Prop = "Value"}; + var secondMessage = new BasicMessage {Prop = "AnotherValue"}; var firstRecievedTcs = new TaskCompletionSource(); var secondRecievedTcs = new TaskCompletionSource(); var recievedCount = 0; @@ -309,7 +311,9 @@ public async Task Should_Stop_Subscribe_When_Subscription_Is_Disposed() public async Task Should_Be_Able_To_Subscibe_To_Pure_Json_Message() { var conventions = new NamingConventions(); - using (var client = TestClientFactory.CreateNormal(ioc => ioc.AddSingleton(c => conventions))) + using ( + var client = + TestClientFactory.CreateNormal(ioc => ioc.AddSingleton(c => conventions))) { /* Setup */ var tcs = new TaskCompletionSource(); @@ -319,7 +323,7 @@ public async Task Should_Be_Able_To_Subscibe_To_Pure_Json_Message() return Task.FromResult(true); }); var uniqueValue = Guid.NewGuid().ToString(); - var jsonMsg = JsonConvert.SerializeObject(new BasicMessage { Prop = uniqueValue }); + var jsonMsg = JsonConvert.SerializeObject(new BasicMessage {Prop = uniqueValue}); /* Test */ TestChannel.BasicPublish( @@ -349,7 +353,7 @@ public async void Should_Be_Able_To_Publish_Dynamic_Objects() }); /* Test */ - client.PublishAsync(new DynamicMessage { Body = new { IsDynamic = true } }); + client.PublishAsync(new DynamicMessage {Body = new {IsDynamic = true}}); await tcs.Task; /* Assert */ @@ -360,7 +364,7 @@ public async void Should_Be_Able_To_Publish_Dynamic_Objects() [Fact] public async Task Should_Be_Able_To_Publish_Message_After_Failed_Publish() { - using(var firstClient = TestClientFactory.CreateNormal()) + using (var firstClient = TestClientFactory.CreateNormal()) using (var secondClient = TestClientFactory.CreateNormal()) { /* Setup */ @@ -389,6 +393,103 @@ public async Task Should_Be_Able_To_Publish_Message_After_Failed_Publish() Assert.True(true); } } + + [Fact] + public async Task Should_Run_Basic_Return_When_The_Manatory_Set_After_Failed_Publish() + { + /* Setup */ + using (var client = TestClientFactory.CreateNormal()) + { + var tcs = new TaskCompletionSource(); + /* Test */ + await client.PublishAsync(new SimpleMessage(), + configuration: cfg => cfg + .WithMandatoryDelivery((obj, evt) => + { + tcs.SetResult(true); + })); + + /* Assert */ + Assert.True(tcs.Task.Result); + } + } + + [Fact] + public async Task Should_Be_Able_To_Subscribe_Initialized_Queue_And_Exchange() + { + using (var client = TestClientFactory.CreateNormal()) + { + + /* Setup */ + const string exchangeName = "initialized.exchange"; + const string queueName = "initialized.queue"; + + try + { + TestChannel.ExchangeDeclare(exchangeName, RabbitMQ.Client.ExchangeType.Fanout); + TestChannel.QueueDeclare(queueName, true, false, false); + + var tcs = new TaskCompletionSource(); + client.SubscribeAsync((message, context) => + { + tcs.TrySetResult(message); + return Task.FromResult(true); + }, cfg => cfg + .WithQueue(q => q + .AssumeInitialized() + .WithName(queueName)) + .WithSubscriberId(string.Empty) + .WithExchange(e => e + .AssumeInitialized() + .WithName(exchangeName))); + + /* Test */ + await client.PublishAsync(new DynamicMessage { Body = new { IsDynamic = true } }, + configuration: cfg => cfg + .WithExchange(e => e + .AssumeInitialized() + .WithName(exchangeName))); + await tcs.Task; + + /* Assert */ + Assert.True(tcs.Task.Result.Body.IsDynamic); + } + finally + { + /* Teardown */ + TestChannel.QueueDelete(queueName); + TestChannel.ExchangeDelete(exchangeName); + } + } + } + + [Fact] + public void Should_Throw_When_Subscribed_Not_Initialized_Queue() + { + using (var client = TestClientFactory.CreateNormal()) + { + + /* Setup */ + const string queueName = "not.initialized.queue"; + var tcs = new TaskCompletionSource(); + + + /* Test */ + + Assert.Throws(() => + + client.SubscribeAsync((message, context) => + { + tcs.TrySetResult(message); + return Task.FromResult(true); + }, cfg => cfg + .WithQueue(q => q + .AssumeInitialized() + .WithName(queueName)) + )); + + Assert.False(tcs.Task.IsCompleted); + } + } } } -