From 4b6e57e351357f66214c265696b4cf2bf20592f7 Mon Sep 17 00:00:00 2001 From: ritasker Date: Wed, 9 Nov 2016 21:06:51 +0000 Subject: [PATCH 01/21] (#129) Expose Mandatory Option For Publish The mandatory option has been exposed to RawRabbit by adding a new property to the PublishConfiguration and a new method, WithMandatoryDelivery, on PublishConfigurationBuilder. This method also allows the user to pass a function to the BasicReturn event handler. Publisher has been updated to use the madatory flag in the BasicPublish method and the BasicReturn event. The Rider IDE user files have also been ignored. --- .gitignore | 3 +++ sample/RawRabbit.Messages.Sample/project.json | 2 +- .../Publish/IPublishConfigurationBuilder.cs | 2 ++ .../Publish/PublishConfiguration.cs | 3 +++ .../Publish/PublishConfigurationBuilder.cs | 14 ++++++++++++- src/RawRabbit/Operations/Publisher.cs | 9 +++++--- .../SimpleUse/PublishAndSubscribeTests.cs | 21 ++++++++++++++++++- 7 files changed, 48 insertions(+), 6 deletions(-) diff --git a/.gitignore b/.gitignore index 28100584..c962a5d7 100644 --- a/.gitignore +++ b/.gitignore @@ -13,6 +13,9 @@ docs/*build*/ #VS Code .vscode/** +#Rider +.idea/** + # User-specific files (MonoDevelop/Xamarin Studio) *.userprefs **ncrunch** diff --git a/sample/RawRabbit.Messages.Sample/project.json b/sample/RawRabbit.Messages.Sample/project.json index 1d6031b6..f06c651b 100644 --- a/sample/RawRabbit.Messages.Sample/project.json +++ b/sample/RawRabbit.Messages.Sample/project.json @@ -7,6 +7,6 @@ "frameworks": { "netstandard1.5": {}, - "net451": {}, + "net451": {} } } diff --git a/src/RawRabbit/Configuration/Publish/IPublishConfigurationBuilder.cs b/src/RawRabbit/Configuration/Publish/IPublishConfigurationBuilder.cs index fa25d693..51097e65 100644 --- a/src/RawRabbit/Configuration/Publish/IPublishConfigurationBuilder.cs +++ b/src/RawRabbit/Configuration/Publish/IPublishConfigurationBuilder.cs @@ -1,5 +1,6 @@ using System; using RabbitMQ.Client; +using RabbitMQ.Client.Events; using RawRabbit.Configuration.Exchange; namespace RawRabbit.Configuration.Publish @@ -9,5 +10,6 @@ public interface IPublishConfigurationBuilder IPublishConfigurationBuilder WithExchange(Action exchange); IPublishConfigurationBuilder WithRoutingKey(string routingKey); IPublishConfigurationBuilder WithProperties(Action properties); + IPublishConfigurationBuilder WithMandatoryDelivery(EventHandler basicReturn); } } \ No newline at end of file diff --git a/src/RawRabbit/Configuration/Publish/PublishConfiguration.cs b/src/RawRabbit/Configuration/Publish/PublishConfiguration.cs index fdc26a27..b22f4946 100644 --- a/src/RawRabbit/Configuration/Publish/PublishConfiguration.cs +++ b/src/RawRabbit/Configuration/Publish/PublishConfiguration.cs @@ -1,5 +1,6 @@ using System; using RabbitMQ.Client; +using RabbitMQ.Client.Events; using RawRabbit.Configuration.Exchange; namespace RawRabbit.Configuration.Publish @@ -9,5 +10,7 @@ public class PublishConfiguration public ExchangeConfiguration Exchange { get; set; } public string RoutingKey { get; set; } public Action PropertyModifier { get; set; } + public bool Mandatory { get; set; } + public EventHandler BasicReturn { get; set; } } } diff --git a/src/RawRabbit/Configuration/Publish/PublishConfigurationBuilder.cs b/src/RawRabbit/Configuration/Publish/PublishConfigurationBuilder.cs index 46361698..b39e179e 100644 --- a/src/RawRabbit/Configuration/Publish/PublishConfigurationBuilder.cs +++ b/src/RawRabbit/Configuration/Publish/PublishConfigurationBuilder.cs @@ -1,5 +1,6 @@ using System; using RabbitMQ.Client; +using RabbitMQ.Client.Events; using RawRabbit.Configuration.Exchange; using RawRabbit.Configuration.Request; @@ -11,12 +12,16 @@ public class PublishConfigurationBuilder : IPublishConfigurationBuilder private string _routingKey; private Action _properties; private const string _oneOrMoreWords = "#"; + private bool _mandatory; + private EventHandler _basicReturn; public PublishConfiguration Configuration => new PublishConfiguration { Exchange = _exchange.Configuration, RoutingKey = _routingKey, - PropertyModifier = _properties ?? (b => {}) + PropertyModifier = _properties ?? (b => {}), + Mandatory = _mandatory, + BasicReturn = _basicReturn }; public PublishConfigurationBuilder(ExchangeConfiguration defaultExchange = null, string routingKey =null) @@ -48,5 +53,12 @@ public IPublishConfigurationBuilder WithProperties(Action prop _properties = properties; return this; } + + public IPublishConfigurationBuilder WithMandatoryDelivery(EventHandler basicReturn) + { + _mandatory = true; + _basicReturn = basicReturn; + return this; + } } } \ No newline at end of file diff --git a/src/RawRabbit/Operations/Publisher.cs b/src/RawRabbit/Operations/Publisher.cs index b5cee8d9..2e8d0bdb 100644 --- a/src/RawRabbit/Operations/Publisher.cs +++ b/src/RawRabbit/Operations/Publisher.cs @@ -1,6 +1,5 @@ using System; using System.Threading.Tasks; -using RabbitMQ.Client; using RawRabbit.Channel.Abstraction; using RawRabbit.Common; using RawRabbit.Configuration; @@ -57,6 +56,9 @@ public Task PublishAsync(TMessage message, Guid globalMessageId, Publi { throw t.Exception; } + + channelTask.Result.BasicReturn += config.BasicReturn; + lock (_publishLock) { var ackTask = _acknowledger.GetAckTask(channelTask.Result); @@ -64,8 +66,9 @@ public Task PublishAsync(TMessage message, Guid globalMessageId, Publi exchange: config.Exchange.ExchangeName, routingKey: _config.RouteWithGlobalId ? $"{config.RoutingKey}.{globalMessageId}" : config.RoutingKey, basicProperties: props, - body: _serializer.Serialize(message) - ); + body: _serializer.Serialize(message), + mandatory: config.Mandatory + ); return ackTask; } }) diff --git a/test/RawRabbit.IntegrationTests/SimpleUse/PublishAndSubscribeTests.cs b/test/RawRabbit.IntegrationTests/SimpleUse/PublishAndSubscribeTests.cs index 12449b9e..5b62c338 100644 --- a/test/RawRabbit.IntegrationTests/SimpleUse/PublishAndSubscribeTests.cs +++ b/test/RawRabbit.IntegrationTests/SimpleUse/PublishAndSubscribeTests.cs @@ -10,7 +10,6 @@ using RawRabbit.Configuration; using RawRabbit.Exceptions; using RawRabbit.IntegrationTests.TestMessages; -using RawRabbit.vNext; using Xunit; using ExchangeType = RawRabbit.Configuration.Exchange.ExchangeType; @@ -389,6 +388,26 @@ public async Task Should_Be_Able_To_Publish_Message_After_Failed_Publish() Assert.True(true); } } + + [Fact] + public async Task Should_Run_Basic_Return_When_The_Manatory_Set_After_Failed_Publish() + { + /* Setup */ + using (var client = TestClientFactory.CreateNormal()) + { + var tcs = new TaskCompletionSource(); + /* Test */ + await client.PublishAsync(new SimpleMessage(), + configuration: cfg => cfg + .WithMandatoryDelivery((obj, evt) => + { + tcs.SetResult(true); + })); + + /* Assert */ + Assert.True(tcs.Task.Result); + } + } } } From 9c49460ef56f161675dbf22f3ce9b0d0e21e970e Mon Sep 17 00:00:00 2001 From: "par.dahlman" Date: Sun, 13 Nov 2016 11:32:55 +0100 Subject: [PATCH 02/21] Correct logging class for ChannelFactory --- src/RawRabbit/Channel/ChannelFactory.cs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/RawRabbit/Channel/ChannelFactory.cs b/src/RawRabbit/Channel/ChannelFactory.cs index ba951b19..8c61f373 100644 --- a/src/RawRabbit/Channel/ChannelFactory.cs +++ b/src/RawRabbit/Channel/ChannelFactory.cs @@ -15,7 +15,7 @@ namespace RawRabbit.Channel public class ChannelFactory : IChannelFactory { private readonly ConcurrentQueue> _requestQueue; - private readonly ILogger _logger = LogManager.GetLogger(); + private readonly ILogger _logger = LogManager.GetLogger(); internal readonly ChannelFactoryConfiguration _channelConfig; private readonly IConnectionFactory _connectionFactory; private readonly RawRabbitConfiguration _config; From fca27b2ec480093451be8f22a712b4cc33ca508e Mon Sep 17 00:00:00 2001 From: "par.dahlman" Date: Sun, 13 Nov 2016 11:35:47 +0100 Subject: [PATCH 03/21] (#132) Move connect to virtual method So that it can be overriden, error handled etc by custom implementations --- src/RawRabbit/Channel/ChannelFactory.cs | 21 +++++++++++++-------- 1 file changed, 13 insertions(+), 8 deletions(-) diff --git a/src/RawRabbit/Channel/ChannelFactory.cs b/src/RawRabbit/Channel/ChannelFactory.cs index 8c61f373..61d489fb 100644 --- a/src/RawRabbit/Channel/ChannelFactory.cs +++ b/src/RawRabbit/Channel/ChannelFactory.cs @@ -28,23 +28,28 @@ public class ChannelFactory : IChannelFactory private bool _processingRequests; public ChannelFactory(IConnectionFactory connectionFactory, RawRabbitConfiguration config, ChannelFactoryConfiguration channelConfig) + { + ConnectToBroker(); + _connectionFactory = connectionFactory; + _config = config; + _channelConfig = channelConfig; + _requestQueue = new ConcurrentQueue>(); + _channels = new LinkedList(); + + Initialize(); + } + + protected virtual void ConnectToBroker() { try { - _connection = connectionFactory.CreateConnection(config.Hostnames); + _connection = _connectionFactory.CreateConnection(_config.Hostnames); } catch (BrokerUnreachableException e) { _logger.LogError("Unable to connect to broker", e); throw e.InnerException; } - _connectionFactory = connectionFactory; - _config = config; - _channelConfig = channelConfig; - _requestQueue = new ConcurrentQueue>(); - _channels = new LinkedList(); - - Initialize(); } internal virtual void Initialize() From fb56cb4ba6eda67e4b60a7d99194cf5c661ffebf Mon Sep 17 00:00:00 2001 From: "par.dahlman" Date: Sun, 13 Nov 2016 14:51:36 +0100 Subject: [PATCH 04/21] (#143) Throw ChannelAvailabilityException --- src/RawRabbit/Channel/ChannelFactory.cs | 3 ++- .../Exceptions/ChannelAvailabilityException.cs | 10 ++++++++++ 2 files changed, 12 insertions(+), 1 deletion(-) create mode 100644 src/RawRabbit/Exceptions/ChannelAvailabilityException.cs diff --git a/src/RawRabbit/Channel/ChannelFactory.cs b/src/RawRabbit/Channel/ChannelFactory.cs index 61d489fb..c45aa7c0 100644 --- a/src/RawRabbit/Channel/ChannelFactory.cs +++ b/src/RawRabbit/Channel/ChannelFactory.cs @@ -8,6 +8,7 @@ using RabbitMQ.Client.Exceptions; using RawRabbit.Channel.Abstraction; using RawRabbit.Configuration; +using RawRabbit.Exceptions; using RawRabbit.Logging; namespace RawRabbit.Channel @@ -208,7 +209,7 @@ private void EnsureRequestsAreHandled() var isRecoverable = _channels.Any(c => c is IRecoverable); if (!isRecoverable) { - throw new Exception("Unable to retreive channel. All existing channels are closed and none of them are recoverable."); + throw new ChannelAvailabilityException("Unable to retreive channel. All existing channels are closed and none of them are recoverable."); } _logger.LogInformation("Unable to find an open channel. Requeue TaskCompletionSource for future process and abort execution."); diff --git a/src/RawRabbit/Exceptions/ChannelAvailabilityException.cs b/src/RawRabbit/Exceptions/ChannelAvailabilityException.cs new file mode 100644 index 00000000..e48dc03e --- /dev/null +++ b/src/RawRabbit/Exceptions/ChannelAvailabilityException.cs @@ -0,0 +1,10 @@ +using System; + +namespace RawRabbit.Exceptions +{ + public class ChannelAvailabilityException : Exception + { + public ChannelAvailabilityException(string message) : base(message) + { } + } +} From 1739c4dfabcf31e42b73bd09becfd3967fe2c5ff Mon Sep 17 00:00:00 2001 From: richard Date: Mon, 14 Nov 2016 20:39:32 +0000 Subject: [PATCH 05/21] Remove Mandatory Proerty And Basic Return Event Handler - Removed the Mandatory property from PublishConfiguration and PublishConfigurationBuilder classes. - Removed the BasicReturn event handler once it has executed. --- .../Configuration/Publish/PublishConfiguration.cs | 1 - .../Configuration/Publish/PublishConfigurationBuilder.cs | 3 --- src/RawRabbit/Operations/Publisher.cs | 7 +++++-- 3 files changed, 5 insertions(+), 6 deletions(-) diff --git a/src/RawRabbit/Configuration/Publish/PublishConfiguration.cs b/src/RawRabbit/Configuration/Publish/PublishConfiguration.cs index b22f4946..520e4a83 100644 --- a/src/RawRabbit/Configuration/Publish/PublishConfiguration.cs +++ b/src/RawRabbit/Configuration/Publish/PublishConfiguration.cs @@ -10,7 +10,6 @@ public class PublishConfiguration public ExchangeConfiguration Exchange { get; set; } public string RoutingKey { get; set; } public Action PropertyModifier { get; set; } - public bool Mandatory { get; set; } public EventHandler BasicReturn { get; set; } } } diff --git a/src/RawRabbit/Configuration/Publish/PublishConfigurationBuilder.cs b/src/RawRabbit/Configuration/Publish/PublishConfigurationBuilder.cs index b39e179e..e700fff1 100644 --- a/src/RawRabbit/Configuration/Publish/PublishConfigurationBuilder.cs +++ b/src/RawRabbit/Configuration/Publish/PublishConfigurationBuilder.cs @@ -12,7 +12,6 @@ public class PublishConfigurationBuilder : IPublishConfigurationBuilder private string _routingKey; private Action _properties; private const string _oneOrMoreWords = "#"; - private bool _mandatory; private EventHandler _basicReturn; public PublishConfiguration Configuration => new PublishConfiguration @@ -20,7 +19,6 @@ public class PublishConfigurationBuilder : IPublishConfigurationBuilder Exchange = _exchange.Configuration, RoutingKey = _routingKey, PropertyModifier = _properties ?? (b => {}), - Mandatory = _mandatory, BasicReturn = _basicReturn }; @@ -56,7 +54,6 @@ public IPublishConfigurationBuilder WithProperties(Action prop public IPublishConfigurationBuilder WithMandatoryDelivery(EventHandler basicReturn) { - _mandatory = true; _basicReturn = basicReturn; return this; } diff --git a/src/RawRabbit/Operations/Publisher.cs b/src/RawRabbit/Operations/Publisher.cs index 2e8d0bdb..5e2601fb 100644 --- a/src/RawRabbit/Operations/Publisher.cs +++ b/src/RawRabbit/Operations/Publisher.cs @@ -67,9 +67,12 @@ public Task PublishAsync(TMessage message, Guid globalMessageId, Publi routingKey: _config.RouteWithGlobalId ? $"{config.RoutingKey}.{globalMessageId}" : config.RoutingKey, basicProperties: props, body: _serializer.Serialize(message), - mandatory: config.Mandatory + mandatory: (config.BasicReturn != null) ); - return ackTask; + return ackTask + .ContinueWith(a => { + channelTask.Result.BasicReturn -= config.BasicReturn; + }); } }) .Unwrap(); From f87868c037a5080ce76b42c525e370a161284bcb Mon Sep 17 00:00:00 2001 From: Cemre Mengu Date: Sun, 20 Nov 2016 12:19:44 +0300 Subject: [PATCH 06/21] Update README.md minor grammar fix --- README.md | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/README.md b/README.md index 61054691..de3c99cc 100644 --- a/README.md +++ b/README.md @@ -17,8 +17,8 @@ client.SubscribeAsync(async (msg, context) => await client.PublishAsync(new BasicMessage { Prop = "Hello, world!"}); ``` -### Request/Respond -`RawRabbits` request/respond (`RPC`) implementation uses the [direct reply-to feature](https://www.rabbitmq.com/direct-reply-to.html) for better performance and lower resource allocation. +### Request/Response +`RawRabbits` request/response (`RPC`) implementation uses the [direct reply-to feature](https://www.rabbitmq.com/direct-reply-to.html) for better performance and lower resource allocation. ```csharp var client = BusClientFactory.CreateDefault(); client.RespondAsync(async (request, context) => From 809ad60a526c2570043bcc982829e756b0198835 Mon Sep 17 00:00:00 2001 From: Cemre Mengu Date: Sun, 20 Nov 2016 12:23:28 +0300 Subject: [PATCH 07/21] Update Getting-started.md minor grammer fix --- docs/Getting-started.md | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/docs/Getting-started.md b/docs/Getting-started.md index a7e2a281..0fe4c684 100644 --- a/docs/Getting-started.md +++ b/docs/Getting-started.md @@ -12,8 +12,8 @@ Install the latest version of [`RawRabbit`](https://www.nuget.org/packages/RawRa ``` The `vNext` package contains the convenience class `BusClientFactory` that can be used to create a default instance of the `RawRabbit` client. It makes life easier, but is not necesary. -## Creating instanse -Depending on the scenario, there are a few different ways to instansiate the `RawRabbit` client. The methods described below all have optional arguments for registering specific subdependeices. +## Creating instance +Depending on the scenario, there are a few different ways to instantiate the `RawRabbit` client. The methods described below all have optional arguments for registering specific subdependeices. ### vNext Application wire-up If the application is bootstrapped from a `vNext` application, the dependecies and client can be registed by using the `AddRawRabbit` extension for `IServiceCollection` @@ -52,7 +52,7 @@ kernel.RegisterRawRabbit("guest:guest@localhost:5672/"); ``` ## Broker connection -As soon as the client is instansiated, it will try to connect to the broker. By default `RawRabbit` will try to connect to `localhost`. Configuration can be provided in different ways. +As soon as the client is instantiated, it will try to connect to the broker. By default `RawRabbit` will try to connect to `localhost`. Configuration can be provided in different ways. ### Configuration object The main configuration object for `RawRabbit` is `RawRabbitConfiguration`. @@ -109,4 +109,4 @@ var bulk = client.GetMessages(cfg => cfg .GetAll() .WithNoAck() )); -``` \ No newline at end of file +``` From c9a82f3a03f098cbb6ba7d0402a09cb9068078ca Mon Sep 17 00:00:00 2001 From: "par.dahlman" Date: Sun, 20 Nov 2016 11:40:03 +0100 Subject: [PATCH 08/21] (#143) Setup Connection Recovery in ChannelFactory In case the connection to the broker is lost, the channel factory should return a task that is not processed. The when the recovery event is fired, the channel requests will be handled. --- src/RawRabbit/Channel/ChannelFactory.cs | 34 +++++++++++++++++++++++-- src/RawRabbit/Common/BaseBusClient.cs | 2 +- 2 files changed, 33 insertions(+), 3 deletions(-) diff --git a/src/RawRabbit/Channel/ChannelFactory.cs b/src/RawRabbit/Channel/ChannelFactory.cs index c45aa7c0..9dddec5f 100644 --- a/src/RawRabbit/Channel/ChannelFactory.cs +++ b/src/RawRabbit/Channel/ChannelFactory.cs @@ -30,13 +30,13 @@ public class ChannelFactory : IChannelFactory public ChannelFactory(IConnectionFactory connectionFactory, RawRabbitConfiguration config, ChannelFactoryConfiguration channelConfig) { - ConnectToBroker(); _connectionFactory = connectionFactory; _config = config; _channelConfig = channelConfig; _requestQueue = new ConcurrentQueue>(); _channels = new LinkedList(); + ConnectToBroker(); Initialize(); } @@ -45,6 +45,7 @@ protected virtual void ConnectToBroker() try { _connection = _connectionFactory.CreateConnection(_config.Hostnames); + SetupConnectionRecovery(_connection); } catch (BrokerUnreachableException e) { @@ -53,6 +54,23 @@ protected virtual void ConnectToBroker() } } + protected virtual void SetupConnectionRecovery(IConnection connection = null) + { + connection = connection ?? _connection; + var recoverable = connection as IRecoverable; + if (recoverable == null) + { + _logger.LogInformation("Connection is not Recoverable. Failed connection will cause unhandled exception to be thrown."); + return; + } + _logger.LogDebug("Setting up Connection Recovery"); + recoverable.Recovery += (sender, args) => + { + _logger.LogInformation($"Connection has been recovered. Starting channel processing."); + EnsureRequestsAreHandled(); + }; + } + internal virtual void Initialize() { _logger.LogDebug($"Initiating {_channelConfig.InitialChannelCount} channels."); @@ -138,7 +156,18 @@ public Task GetChannelAsync() { var tcs = new TaskCompletionSource(); _requestQueue.Enqueue(tcs); - EnsureRequestsAreHandled(); + if (_connection.IsOpen) + { + EnsureRequestsAreHandled(); + } + else + { + var recoverable = _connection as IRecoverable; + if (recoverable == null) + { + throw new ChannelAvailabilityException("Unable to retrieve chanel. Connection to broker is closed and not recoverable."); + } + } return tcs.Task; } @@ -209,6 +238,7 @@ private void EnsureRequestsAreHandled() var isRecoverable = _channels.Any(c => c is IRecoverable); if (!isRecoverable) { + _processingRequests = false; throw new ChannelAvailabilityException("Unable to retreive channel. All existing channels are closed and none of them are recoverable."); } diff --git a/src/RawRabbit/Common/BaseBusClient.cs b/src/RawRabbit/Common/BaseBusClient.cs index 8f98c8d7..1b9e33b3 100644 --- a/src/RawRabbit/Common/BaseBusClient.cs +++ b/src/RawRabbit/Common/BaseBusClient.cs @@ -44,7 +44,7 @@ public ISubscription SubscribeAsync(Func subscribeM public Task PublishAsync(T message = default(T), Guid globalMessageId = new Guid(), Action configuration = null) { var config = _configEval.GetConfiguration(configuration); - _logger.LogDebug($"Publishing message '{typeof(T).Name}' on exchange '{config.Exchange.ExchangeName}' with routing key {config.RoutingKey}."); + _logger.LogDebug($"Initiating publish for message '{typeof(T).Name}' on exchange '{config.Exchange.ExchangeName}' with routing key {config.RoutingKey}."); return _publisher.PublishAsync(message, globalMessageId, config); } From 2cf0a3b88d4aa3d845e83da8708998d01f6d8fd5 Mon Sep 17 00:00:00 2001 From: Cemre Mengu Date: Mon, 21 Nov 2016 14:49:30 +0300 Subject: [PATCH 09/21] Update README.md Added slack auto-invite and status badge --- README.md | 1 + 1 file changed, 1 insertion(+) diff --git a/README.md b/README.md index de3c99cc..38042b77 100644 --- a/README.md +++ b/README.md @@ -1,6 +1,7 @@ # RawRabbit [![Build Status](https://img.shields.io/appveyor/ci/pardahlman/rawrabbit.svg?style=flat-square)](https://ci.appveyor.com/project/pardahlman/rawrabbit) [![Documentation Status](https://readthedocs.org/projects/rawrabbit/badge/?version=latest&style=flat-square)](http://rawrabbit.readthedocs.org/) [![NuGet](https://img.shields.io/nuget/v/RawRabbit.svg?style=flat-square)](https://www.nuget.org/packages/RawRabbit) [![GitHub release](https://img.shields.io/github/release/pardahlman/rawrabbit.svg?style=flat-square)](https://github.com/pardahlman/rawrabbit/releases/latest) +[![Slack Status](https://rawrabbit.herokuapp.com/badge.svg)](https://rawrabbit.herokuapp.com) ## Quick introduction `RawRabbit` is a modern .NET client for communication over [RabbitMq](http://rabbitmq.com/). It is written for [`.NET Core`](http://dot.net) and uses Microsoft’s new frameworks for [logging](https://github.com/aspnet/Logging), [configuration](https://github.com/aspnet/Configuration) and [dependecy injection](https://github.com/aspnet/DependencyInjection). Full documentation available at [`rawrabbit.readthedocs.org`](http://rawrabbit.readthedocs.org/). From dc25fe1be132ec7945dcd5f5f389e49e4518c236 Mon Sep 17 00:00:00 2001 From: "cemre.mengu" Date: Tue, 22 Nov 2016 09:34:27 +0300 Subject: [PATCH 10/21] Added vhost as default value --- src/RawRabbit/Configuration/RawRabbitConfiguration.cs | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/src/RawRabbit/Configuration/RawRabbitConfiguration.cs b/src/RawRabbit/Configuration/RawRabbitConfiguration.cs index 94383bad..07c70065 100644 --- a/src/RawRabbit/Configuration/RawRabbitConfiguration.cs +++ b/src/RawRabbit/Configuration/RawRabbitConfiguration.cs @@ -71,7 +71,7 @@ public class RawRabbitConfiguration /// public SslOption Ssl { get; set; } - public string VirtualHost { get; set; } + public string VirtualHost { get; set; } public string Username { get; set; } public string Password { get; set; } public int Port { get; set; } @@ -103,6 +103,7 @@ public RawRabbitConfiguration() AutoDelete = false, Durable = true }; + VirtualHost = "/"; } public static RawRabbitConfiguration Local => new RawRabbitConfiguration From 1ccf7aa35e35c6e3542a862d2115d6815e0dff6e Mon Sep 17 00:00:00 2001 From: "cemre.mengu" Date: Tue, 22 Nov 2016 09:47:38 +0300 Subject: [PATCH 11/21] Added more default values --- src/RawRabbit/Configuration/RawRabbitConfiguration.cs | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/src/RawRabbit/Configuration/RawRabbitConfiguration.cs b/src/RawRabbit/Configuration/RawRabbitConfiguration.cs index 07c70065..c6400610 100644 --- a/src/RawRabbit/Configuration/RawRabbitConfiguration.cs +++ b/src/RawRabbit/Configuration/RawRabbitConfiguration.cs @@ -104,6 +104,10 @@ public RawRabbitConfiguration() Durable = true }; VirtualHost = "/"; + Username = "guest"; + Password = "guest"; + Port = 5672; + Hostnames = new List {"localhost"}; } public static RawRabbitConfiguration Local => new RawRabbitConfiguration From b1291ab029c014c18fa56bc497061618666cf4c1 Mon Sep 17 00:00:00 2001 From: "cemre.mengu" Date: Tue, 22 Nov 2016 14:25:57 +0300 Subject: [PATCH 12/21] added editorconfig for uniform editing --- .editorconfig | 31 +++++++++++++++++++++++++++++++ 1 file changed, 31 insertions(+) create mode 100644 .editorconfig diff --git a/.editorconfig b/.editorconfig new file mode 100644 index 00000000..e63e7786 --- /dev/null +++ b/.editorconfig @@ -0,0 +1,31 @@ +# editorconfig.org + +# top-most EditorConfig file +root = true + +# Default settings: +# A newline ending every file +# Use 4 spaces as indentation +[*] +trim_trailing_whitespace = true +insert_final_newline = true +indent_style = space +indent_size = 4 + +# Xml project files +[*.{csproj,vcxproj,vcxproj.filters,proj,nativeproj,locproj}] +indent_size = 2 + +# Xml files +[*.{xml,stylecop,resx,ruleset}] +indent_size = 2 + +# Xml config files +[*.{props,targets,config,nuspec}] +indent_size = 2 + +# Shell scripts +[*.sh] +end_of_line = lf +[*.{cmd, bat}] +end_of_line = crlf From f6fceb5fde2e98810415b8f947dc42a80291ed38 Mon Sep 17 00:00:00 2001 From: "cemre.mengu" Date: Wed, 23 Nov 2016 21:34:05 +0300 Subject: [PATCH 13/21] Changed indentation to tabs --- .editorconfig | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.editorconfig b/.editorconfig index e63e7786..9ab7418a 100644 --- a/.editorconfig +++ b/.editorconfig @@ -9,7 +9,7 @@ root = true [*] trim_trailing_whitespace = true insert_final_newline = true -indent_style = space +indent_style = tab indent_size = 4 # Xml project files From 10450689300a8c352fd8d840be6894b5c78b6fac Mon Sep 17 00:00:00 2001 From: "cemre.mengu" Date: Wed, 23 Nov 2016 19:11:45 +0300 Subject: [PATCH 14/21] Added queue assume initialized and tests --- src/RawRabbit/Common/TopologyProvider.cs | 2 +- .../Queue/IQueueConfigurationBuilder.cs | 4 +- .../Configuration/Queue/QueueConfiguration.cs | 3 +- .../Queue/QueueConfigurationBuilder.cs | 12 +- .../SimpleUse/PublishAndSubscribeTests.cs | 898 ++++++++++-------- 5 files changed, 519 insertions(+), 400 deletions(-) diff --git a/src/RawRabbit/Common/TopologyProvider.cs b/src/RawRabbit/Common/TopologyProvider.cs index 0f7914af..1284085c 100644 --- a/src/RawRabbit/Common/TopologyProvider.cs +++ b/src/RawRabbit/Common/TopologyProvider.cs @@ -133,7 +133,7 @@ public bool IsInitialized(ExchangeConfiguration exchange) public bool IsInitialized(QueueConfiguration queue) { - return queue.IsDirectReplyTo() || _initQueues.Contains(queue.FullQueueName); + return queue.IsDirectReplyTo() || queue.AssumeInitialized || _initQueues.Contains(queue.FullQueueName); } private void BindQueueToExchange(ScheduledBindQueueTask bind) diff --git a/src/RawRabbit/Configuration/Queue/IQueueConfigurationBuilder.cs b/src/RawRabbit/Configuration/Queue/IQueueConfigurationBuilder.cs index df3b75ce..0fee69ae 100644 --- a/src/RawRabbit/Configuration/Queue/IQueueConfigurationBuilder.cs +++ b/src/RawRabbit/Configuration/Queue/IQueueConfigurationBuilder.cs @@ -7,6 +7,6 @@ public interface IQueueConfigurationBuilder IQueueConfigurationBuilder WithDurability(bool durable = true); IQueueConfigurationBuilder WithExclusivity(bool exclusive = true); IQueueConfigurationBuilder WithArgument(string key, object value); - - } + IQueueConfigurationBuilder AssumeInitialized(bool asumption = true); + } } diff --git a/src/RawRabbit/Configuration/Queue/QueueConfiguration.cs b/src/RawRabbit/Configuration/Queue/QueueConfiguration.cs index 08a2a98e..61a67646 100644 --- a/src/RawRabbit/Configuration/Queue/QueueConfiguration.cs +++ b/src/RawRabbit/Configuration/Queue/QueueConfiguration.cs @@ -8,7 +8,7 @@ public string FullQueueName { get { - var fullQueueName = string.IsNullOrEmpty(NameSuffix) + var fullQueueName = string.IsNullOrEmpty(NameSuffix) || AssumeInitialized ? QueueName : $"{QueueName}_{NameSuffix}"; @@ -24,6 +24,7 @@ public string FullQueueName public bool Exclusive { get; set; } public bool AutoDelete { get; set; } public Dictionary Arguments { get; set; } + public bool AssumeInitialized { get; set; } public QueueConfiguration() { diff --git a/src/RawRabbit/Configuration/Queue/QueueConfigurationBuilder.cs b/src/RawRabbit/Configuration/Queue/QueueConfigurationBuilder.cs index 9b7aa026..477a1fe4 100644 --- a/src/RawRabbit/Configuration/Queue/QueueConfigurationBuilder.cs +++ b/src/RawRabbit/Configuration/Queue/QueueConfigurationBuilder.cs @@ -1,4 +1,6 @@ -namespace RawRabbit.Configuration.Queue +using System; + +namespace RawRabbit.Configuration.Queue { public class QueueConfigurationBuilder : IQueueConfigurationBuilder { @@ -44,5 +46,11 @@ public IQueueConfigurationBuilder WithArgument(string key, object value) Configuration.Arguments.Add(key, value); return this; } - } + + public IQueueConfigurationBuilder AssumeInitialized(bool asumption = true) + { + Configuration.AssumeInitialized = asumption; + return this; + } + } } diff --git a/test/RawRabbit.IntegrationTests/SimpleUse/PublishAndSubscribeTests.cs b/test/RawRabbit.IntegrationTests/SimpleUse/PublishAndSubscribeTests.cs index 5b62c338..8d2c97b6 100644 --- a/test/RawRabbit.IntegrationTests/SimpleUse/PublishAndSubscribeTests.cs +++ b/test/RawRabbit.IntegrationTests/SimpleUse/PublishAndSubscribeTests.cs @@ -15,399 +15,509 @@ namespace RawRabbit.IntegrationTests.SimpleUse { - public class PublishAndSubscribeTests : IntegrationTestBase - { - - [Fact] - public async Task Should_Be_Able_To_Subscribe_Without_Any_Additional_Config() - { - /* Setup */ - using (var publisher = TestClientFactory.CreateNormal()) - using (var subscriber = TestClientFactory.CreateNormal()) - { - var message = new BasicMessage { Prop = "Hello, world!" }; - var recievedTcs = new TaskCompletionSource(); - - subscriber.SubscribeAsync((msg, info) => - { - if (msg.Prop == message.Prop) - { - recievedTcs.SetResult(msg); - } - return Task.FromResult(true); - }); - - /* Test */ - publisher.PublishAsync(message); - await recievedTcs.Task; - - /* Assert */ - Assert.Equal(expected: message.Prop, actual: recievedTcs.Task.Result.Prop); - } - } - - [Fact] - public async Task Should_Be_Able_To_Perform_Multiple_Pub_Subs() - { - /* Setup */ - using (var subscriber = TestClientFactory.CreateNormal()) - using (var publisher = TestClientFactory.CreateNormal()) - { - const int numberOfCalls = 100; - var recived = 0; - var recievedTcs = new TaskCompletionSource(); - subscriber.SubscribeAsync((message, context) => - { - Interlocked.Increment(ref recived); - if (numberOfCalls == recived) - { - recievedTcs.SetResult(true); - } - return Task.FromResult(true); - }); - - /* Test */ - var sw = Stopwatch.StartNew(); - for (int i = 0; i < numberOfCalls; i++) - { - publisher.PublishAsync(); - } - await recievedTcs.Task; - sw.Stop(); - - /* Assert */ - Assert.True(true, $"Completed {numberOfCalls} in {sw.ElapsedMilliseconds} ms."); - } - } - - [Fact] - public void Should_Be_Able_To_Perform_Subscribe_For_Multiple_Types() - { - /* Setup */ - using (var subscriber = TestClientFactory.CreateNormal()) - using (var publisher = TestClientFactory.CreateNormal()) - { - var basicTcs = new TaskCompletionSource(); - var simpleTcs = new TaskCompletionSource(); - subscriber.SubscribeAsync((message, context) => - { - basicTcs.SetResult(message); - return Task.FromResult(true); - }); - subscriber.SubscribeAsync((message, context) => - { - simpleTcs.SetResult(message); - return Task.FromResult(true); - }); - - /* Test */ - publisher.PublishAsync(); - publisher.PublishAsync(); - Task.WaitAll(basicTcs.Task, simpleTcs.Task); - - /* Assert */ - Assert.True(true, "Successfully recieved messages."); - } - } - - [Fact] - public async Task Should_Throw_Publish_Confirm_Exception_If_Server_Doesnt_Respond_Within_Time_Limit() - { - /* Setup */ - var publisher = TestClientFactory.CreateNormal(ioc => ioc.AddSingleton(p => - { - var config = RawRabbitConfiguration.Local; - config.PublishConfirmTimeout = TimeSpan.FromTicks(1); - return config; - })); - using (publisher) - { - /* Test */ - /* Assert */ - try - { - await publisher.PublishAsync(); - } - catch (PublishConfirmException) - { - Assert.True(true); - } - } - } - - [Fact] - public void Should_Be_Able_To_Confirm_Multiple_Messages() - { - /* Setup */ - const int numberOfCalls = 100; - var confirmTasks = new Task[numberOfCalls]; - using (var publisher = TestClientFactory.CreateNormal()) - { - for (int i = 0; i < numberOfCalls; i++) - { - var confirmTask = publisher.PublishAsync(); - confirmTasks[i] = confirmTask; - } - Task.WaitAll(confirmTasks); - Task.Delay(500).Wait(); - - Assert.True(true, "Successfully confirmed all messages."); - } - } - - [Fact] - public void Should_Be_Able_To_Delivery_Message_To_Multiple_Subscribers_On_Same_Host() - { - /* Setup */ - using (var subscriber = TestClientFactory.CreateNormal()) - using (var publisher = TestClientFactory.CreateNormal()) - { - var firstTcs = new TaskCompletionSource(); - var secondTcs = new TaskCompletionSource(); - subscriber.SubscribeAsync((message, context) => - { - firstTcs.SetResult(true); - return Task.FromResult(true); - }); - subscriber.SubscribeAsync((message, context) => - { - secondTcs.SetResult(true); - return Task.FromResult(true); - }); - - /* Test */ - var ackTask = publisher.PublishAsync(); - Task.WaitAll(ackTask, firstTcs.Task, secondTcs.Task); - - /* Assert */ - Assert.True(true, "Published and subscribe sucessfull."); - } - } - - [Fact] - public void Should_Be_Able_To_Deliver_Messages_To_Unique_Subscribers() - { - /* Setup */ - using (var firstSubscriber = TestClientFactory.CreateNormal()) - using (var secondSubscriber = TestClientFactory.CreateNormal()) - using (var publisher = TestClientFactory.CreateNormal()) - { - var firstTcs = new TaskCompletionSource(); - var secondTcs = new TaskCompletionSource(); - firstSubscriber.SubscribeAsync((message, context) => - { - firstTcs.SetResult(true); - return Task.FromResult(true); - }, cfg => cfg.WithSubscriberId("first_subscriber")); - secondSubscriber.SubscribeAsync((message, context) => - { - secondTcs.SetResult(true); - return Task.FromResult(true); - }, cfg => cfg.WithSubscriberId("second_subscriber")); - - /* Test */ - var ackTask = publisher.PublishAsync(); - Task.WaitAll(ackTask, firstTcs.Task, secondTcs.Task); - - /* Assert */ - Assert.True(true, "Published and subscribe sucessfull."); - - } - } - - [Fact] - public async Task Should_Be_Able_To_Use_Priority() - { - /* Setup */ - using (var subscriber = TestClientFactory.CreateNormal()) - using (var publisher = TestClientFactory.CreateNormal()) - { - var prioritySent = false; - var queueBuilt = new TaskCompletionSource(); - var priorityTcs = new TaskCompletionSource(); - subscriber.SubscribeAsync(async (message, context) => - { - await queueBuilt.Task; - if (!prioritySent) - { - await subscriber.PublishAsync(new BasicMessage - { - Prop = "I am important!" - }, configuration: cfg => cfg.WithProperties(p => - { - p.Priority = 3; - })); - prioritySent = true; - } - else - { - priorityTcs.TrySetResult(message); - } - - }, cfg => cfg - .WithQueue(q => q.WithArgument(QueueArgument.MaxPriority, 3)) - .WithSubscriberId("priority") - .WithPrefetchCount(1)); - - /* Test */ - await publisher.PublishAsync(new BasicMessage { Prop = "I will be delivered" }); - await publisher.PublishAsync(new BasicMessage { Prop = "Someone will pass me in the queue" }, configuration: cfg => cfg.WithProperties(p => p.Priority = 0)); - queueBuilt.SetResult(true); - await priorityTcs.Task; - - /* Asset */ - Assert.Equal(expected: "I am important!", actual: priorityTcs.Task.Result.Prop); - } - } - - [Fact] - public async Task Should_Stop_Subscribe_When_Subscription_Is_Disposed() - { - /* Setup */ - using (var publisher = TestClientFactory.CreateNormal()) - using (var subscriber = TestClientFactory.CreateNormal()) - { - var firstMessage = new BasicMessage { Prop = "Value" }; - var secondMessage = new BasicMessage { Prop = "AnotherValue" }; - var firstRecievedTcs = new TaskCompletionSource(); - var secondRecievedTcs = new TaskCompletionSource(); - var recievedCount = 0; - - var subscription = subscriber.SubscribeAsync((message, context) => - { - recievedCount++; - if (!firstRecievedTcs.Task.IsCompleted) - { - firstRecievedTcs.SetResult(message); - } - return Task.FromResult(true); - }, cfg => cfg.WithQueue(q => q.WithAutoDelete(false))); - - /* Test */ - publisher.PublishAsync(firstMessage); - await firstRecievedTcs.Task; - subscription.Dispose(); - var recievedAfterFirstPublish = recievedCount; - publisher.PublishAsync(secondMessage); - await Task.Delay(20); - publisher.SubscribeAsync((message, context) => - { - secondRecievedTcs.SetResult(message); - return Task.FromResult(true); - }, cfg => cfg.WithQueue(q => q.WithAutoDelete(false))); - await secondRecievedTcs.Task; - TestChannel.QueueDelete(subscription.QueueName); - /* Assert */ - Assert.Equal(recievedAfterFirstPublish, recievedCount); - Assert.Equal(firstRecievedTcs.Task.Result.Prop, firstMessage.Prop); - Assert.Equal(secondRecievedTcs.Task.Result.Prop, secondMessage.Prop); - } - } - - [Fact] - public async Task Should_Be_Able_To_Subscibe_To_Pure_Json_Message() - { - var conventions = new NamingConventions(); - using (var client = TestClientFactory.CreateNormal(ioc => ioc.AddSingleton(c => conventions))) - { - /* Setup */ - var tcs = new TaskCompletionSource(); - var subscription = client.SubscribeAsync((message, context) => - { - tcs.TrySetResult(message); - return Task.FromResult(true); - }); - var uniqueValue = Guid.NewGuid().ToString(); - var jsonMsg = JsonConvert.SerializeObject(new BasicMessage { Prop = uniqueValue }); - - /* Test */ - TestChannel.BasicPublish( - conventions.ExchangeNamingConvention(typeof(BasicMessage)), - conventions.QueueNamingConvention(typeof(BasicMessage)), - true, - null, - Encoding.UTF8.GetBytes(jsonMsg)); - await tcs.Task; - - /* Assert */ - Assert.Equal(uniqueValue, tcs.Task.Result.Prop); - } - } - - [Fact] - public async void Should_Be_Able_To_Publish_Dynamic_Objects() - { - using (var client = TestClientFactory.CreateNormal()) - { - /* Setup */ - var tcs = new TaskCompletionSource(); - client.SubscribeAsync((message, context) => - { - tcs.TrySetResult(message); - return Task.FromResult(true); - }); - - /* Test */ - client.PublishAsync(new DynamicMessage { Body = new { IsDynamic = true } }); - await tcs.Task; - - /* Assert */ - Assert.True(tcs.Task.Result.Body.IsDynamic); - } - } - - [Fact] - public async Task Should_Be_Able_To_Publish_Message_After_Failed_Publish() - { - using(var firstClient = TestClientFactory.CreateNormal()) - using (var secondClient = TestClientFactory.CreateNormal()) - { - /* Setup */ - var tcs = new TaskCompletionSource(); - firstClient.SubscribeAsync((message, context) => - { - tcs.TrySetResult(true); - return Task.FromResult(true); - }); - - /* Test */ - try - { - await - secondClient.PublishAsync(new SimpleMessage(), - configuration: cfg => cfg.WithExchange(e => e.WithType(ExchangeType.Direct))); - } - catch (Exception) - { - await Task.Delay(50); - Assert.False(tcs.Task.IsCompleted); - } - secondClient.PublishAsync(new SimpleMessage()); - await tcs.Task; - /* Assert */ - Assert.True(true); - } - } - - [Fact] - public async Task Should_Run_Basic_Return_When_The_Manatory_Set_After_Failed_Publish() - { - /* Setup */ - using (var client = TestClientFactory.CreateNormal()) - { - var tcs = new TaskCompletionSource(); - /* Test */ - await client.PublishAsync(new SimpleMessage(), - configuration: cfg => cfg - .WithMandatoryDelivery((obj, evt) => - { - tcs.SetResult(true); - })); - - /* Assert */ - Assert.True(tcs.Task.Result); - } - } - } + using Context; + using RabbitMQ.Client.Exceptions; + + public class PublishAndSubscribeTests : IntegrationTestBase + { + + [Fact] + public async Task Should_Be_Able_To_Subscribe_Without_Any_Additional_Config() + { + /* Setup */ + using (var publisher = TestClientFactory.CreateNormal()) + using (var subscriber = TestClientFactory.CreateNormal()) + { + var message = new BasicMessage {Prop = "Hello, world!"}; + var recievedTcs = new TaskCompletionSource(); + + subscriber.SubscribeAsync((msg, info) => + { + if (msg.Prop == message.Prop) + { + recievedTcs.SetResult(msg); + } + return Task.FromResult(true); + }); + + /* Test */ + publisher.PublishAsync(message); + await recievedTcs.Task; + + /* Assert */ + Assert.Equal(expected: message.Prop, actual: recievedTcs.Task.Result.Prop); + } + } + + [Fact] + public async Task Should_Be_Able_To_Perform_Multiple_Pub_Subs() + { + /* Setup */ + using (var subscriber = TestClientFactory.CreateNormal()) + using (var publisher = TestClientFactory.CreateNormal()) + { + const int numberOfCalls = 100; + var recived = 0; + var recievedTcs = new TaskCompletionSource(); + subscriber.SubscribeAsync((message, context) => + { + Interlocked.Increment(ref recived); + if (numberOfCalls == recived) + { + recievedTcs.SetResult(true); + } + return Task.FromResult(true); + }); + + /* Test */ + var sw = Stopwatch.StartNew(); + for (int i = 0; i < numberOfCalls; i++) + { + publisher.PublishAsync(); + } + await recievedTcs.Task; + sw.Stop(); + + /* Assert */ + Assert.True(true, $"Completed {numberOfCalls} in {sw.ElapsedMilliseconds} ms."); + } + } + + [Fact] + public void Should_Be_Able_To_Perform_Subscribe_For_Multiple_Types() + { + /* Setup */ + using (var subscriber = TestClientFactory.CreateNormal()) + using (var publisher = TestClientFactory.CreateNormal()) + { + var basicTcs = new TaskCompletionSource(); + var simpleTcs = new TaskCompletionSource(); + subscriber.SubscribeAsync((message, context) => + { + basicTcs.SetResult(message); + return Task.FromResult(true); + }); + subscriber.SubscribeAsync((message, context) => + { + simpleTcs.SetResult(message); + return Task.FromResult(true); + }); + + /* Test */ + publisher.PublishAsync(); + publisher.PublishAsync(); + Task.WaitAll(basicTcs.Task, simpleTcs.Task); + + /* Assert */ + Assert.True(true, "Successfully recieved messages."); + } + } + + [Fact] + public async Task Should_Throw_Publish_Confirm_Exception_If_Server_Doesnt_Respond_Within_Time_Limit() + { + /* Setup */ + var publisher = TestClientFactory.CreateNormal(ioc => ioc.AddSingleton(p => + { + var config = RawRabbitConfiguration.Local; + config.PublishConfirmTimeout = TimeSpan.FromTicks(1); + return config; + })); + using (publisher) + { + /* Test */ + /* Assert */ + try + { + await publisher.PublishAsync(); + } + catch (PublishConfirmException) + { + Assert.True(true); + } + } + } + + [Fact] + public void Should_Be_Able_To_Confirm_Multiple_Messages() + { + /* Setup */ + const int numberOfCalls = 100; + var confirmTasks = new Task[numberOfCalls]; + using (var publisher = TestClientFactory.CreateNormal()) + { + for (int i = 0; i < numberOfCalls; i++) + { + var confirmTask = publisher.PublishAsync(); + confirmTasks[i] = confirmTask; + } + Task.WaitAll(confirmTasks); + Task.Delay(500).Wait(); + + Assert.True(true, "Successfully confirmed all messages."); + } + } + + [Fact] + public void Should_Be_Able_To_Delivery_Message_To_Multiple_Subscribers_On_Same_Host() + { + /* Setup */ + using (var subscriber = TestClientFactory.CreateNormal()) + using (var publisher = TestClientFactory.CreateNormal()) + { + var firstTcs = new TaskCompletionSource(); + var secondTcs = new TaskCompletionSource(); + subscriber.SubscribeAsync((message, context) => + { + firstTcs.SetResult(true); + return Task.FromResult(true); + }); + subscriber.SubscribeAsync((message, context) => + { + secondTcs.SetResult(true); + return Task.FromResult(true); + }); + + /* Test */ + var ackTask = publisher.PublishAsync(); + Task.WaitAll(ackTask, firstTcs.Task, secondTcs.Task); + + /* Assert */ + Assert.True(true, "Published and subscribe sucessfull."); + } + } + + [Fact] + public void Should_Be_Able_To_Deliver_Messages_To_Unique_Subscribers() + { + /* Setup */ + using (var firstSubscriber = TestClientFactory.CreateNormal()) + using (var secondSubscriber = TestClientFactory.CreateNormal()) + using (var publisher = TestClientFactory.CreateNormal()) + { + var firstTcs = new TaskCompletionSource(); + var secondTcs = new TaskCompletionSource(); + firstSubscriber.SubscribeAsync((message, context) => + { + firstTcs.SetResult(true); + return Task.FromResult(true); + }, cfg => cfg.WithSubscriberId("first_subscriber")); + secondSubscriber.SubscribeAsync((message, context) => + { + secondTcs.SetResult(true); + return Task.FromResult(true); + }, cfg => cfg.WithSubscriberId("second_subscriber")); + + /* Test */ + var ackTask = publisher.PublishAsync(); + Task.WaitAll(ackTask, firstTcs.Task, secondTcs.Task); + + /* Assert */ + Assert.True(true, "Published and subscribe sucessfull."); + + } + } + + [Fact] + public async Task Should_Be_Able_To_Use_Priority() + { + /* Setup */ + using (var subscriber = TestClientFactory.CreateNormal()) + using (var publisher = TestClientFactory.CreateNormal()) + { + var prioritySent = false; + var queueBuilt = new TaskCompletionSource(); + var priorityTcs = new TaskCompletionSource(); + subscriber.SubscribeAsync(async (message, context) => + { + await queueBuilt.Task; + if (!prioritySent) + { + await subscriber.PublishAsync(new BasicMessage + { + Prop = "I am important!" + }, configuration: cfg => cfg.WithProperties(p => + { + p.Priority = 3; + })); + prioritySent = true; + } + else + { + priorityTcs.TrySetResult(message); + } + + }, cfg => cfg + .WithQueue(q => q.WithArgument(QueueArgument.MaxPriority, 3)) + .WithSubscriberId("priority") + .WithPrefetchCount(1)); + + /* Test */ + await publisher.PublishAsync(new BasicMessage {Prop = "I will be delivered"}); + await + publisher.PublishAsync(new BasicMessage {Prop = "Someone will pass me in the queue"}, + configuration: cfg => cfg.WithProperties(p => p.Priority = 0)); + queueBuilt.SetResult(true); + await priorityTcs.Task; + + /* Asset */ + Assert.Equal(expected: "I am important!", actual: priorityTcs.Task.Result.Prop); + } + } + + [Fact] + public async Task Should_Stop_Subscribe_When_Subscription_Is_Disposed() + { + /* Setup */ + using (var publisher = TestClientFactory.CreateNormal()) + using (var subscriber = TestClientFactory.CreateNormal()) + { + var firstMessage = new BasicMessage {Prop = "Value"}; + var secondMessage = new BasicMessage {Prop = "AnotherValue"}; + var firstRecievedTcs = new TaskCompletionSource(); + var secondRecievedTcs = new TaskCompletionSource(); + var recievedCount = 0; + + var subscription = subscriber.SubscribeAsync((message, context) => + { + recievedCount++; + if (!firstRecievedTcs.Task.IsCompleted) + { + firstRecievedTcs.SetResult(message); + } + return Task.FromResult(true); + }, cfg => cfg.WithQueue(q => q.WithAutoDelete(false))); + + /* Test */ + publisher.PublishAsync(firstMessage); + await firstRecievedTcs.Task; + subscription.Dispose(); + var recievedAfterFirstPublish = recievedCount; + publisher.PublishAsync(secondMessage); + await Task.Delay(20); + publisher.SubscribeAsync((message, context) => + { + secondRecievedTcs.SetResult(message); + return Task.FromResult(true); + }, cfg => cfg.WithQueue(q => q.WithAutoDelete(false))); + await secondRecievedTcs.Task; + TestChannel.QueueDelete(subscription.QueueName); + /* Assert */ + Assert.Equal(recievedAfterFirstPublish, recievedCount); + Assert.Equal(firstRecievedTcs.Task.Result.Prop, firstMessage.Prop); + Assert.Equal(secondRecievedTcs.Task.Result.Prop, secondMessage.Prop); + } + } + + [Fact] + public async Task Should_Be_Able_To_Subscibe_To_Pure_Json_Message() + { + var conventions = new NamingConventions(); + using ( + var client = + TestClientFactory.CreateNormal(ioc => ioc.AddSingleton(c => conventions))) + { + /* Setup */ + var tcs = new TaskCompletionSource(); + var subscription = client.SubscribeAsync((message, context) => + { + tcs.TrySetResult(message); + return Task.FromResult(true); + }); + var uniqueValue = Guid.NewGuid().ToString(); + var jsonMsg = JsonConvert.SerializeObject(new BasicMessage {Prop = uniqueValue}); + + /* Test */ + TestChannel.BasicPublish( + conventions.ExchangeNamingConvention(typeof(BasicMessage)), + conventions.QueueNamingConvention(typeof(BasicMessage)), + true, + null, + Encoding.UTF8.GetBytes(jsonMsg)); + await tcs.Task; + + /* Assert */ + Assert.Equal(uniqueValue, tcs.Task.Result.Prop); + } + } + + [Fact] + public async void Should_Be_Able_To_Publish_Dynamic_Objects() + { + using (var client = TestClientFactory.CreateNormal()) + { + /* Setup */ + var tcs = new TaskCompletionSource(); + client.SubscribeAsync((message, context) => + { + tcs.TrySetResult(message); + return Task.FromResult(true); + }); + + /* Test */ + client.PublishAsync(new DynamicMessage {Body = new {IsDynamic = true}}); + await tcs.Task; + + /* Assert */ + Assert.True(tcs.Task.Result.Body.IsDynamic); + } + } + + [Fact] + public async Task Should_Be_Able_To_Publish_Message_After_Failed_Publish() + { + using (var firstClient = TestClientFactory.CreateNormal()) + using (var secondClient = TestClientFactory.CreateNormal()) + { + /* Setup */ + var tcs = new TaskCompletionSource(); + firstClient.SubscribeAsync((message, context) => + { + tcs.TrySetResult(true); + return Task.FromResult(true); + }); + + /* Test */ + try + { + await + secondClient.PublishAsync(new SimpleMessage(), + configuration: cfg => cfg.WithExchange(e => e.WithType(ExchangeType.Direct))); + } + catch (Exception) + { + await Task.Delay(50); + Assert.False(tcs.Task.IsCompleted); + } + secondClient.PublishAsync(new SimpleMessage()); + await tcs.Task; + /* Assert */ + Assert.True(true); + } + } + + [Fact] + public async Task Should_Run_Basic_Return_When_The_Manatory_Set_After_Failed_Publish() + { + /* Setup */ + using (var client = TestClientFactory.CreateNormal()) + { + var tcs = new TaskCompletionSource(); + /* Test */ + await client.PublishAsync(new SimpleMessage(), + configuration: cfg => cfg + .WithMandatoryDelivery((obj, evt) => + { + tcs.SetResult(true); + })); + + /* Assert */ + Assert.True(tcs.Task.Result); + } + } + + [Fact] + public async Task Should_Be_Able_To_Subscribe_Initialized_Queue_And_Exchange() + { + using (var client = TestClientFactory.CreateNormal()) + { + + /* Setup */ + const string exchangeName = "initialized.exchange"; + const string queueName = "initialized.queue"; + + try + { + TestChannel.ExchangeDeclare(exchangeName, RabbitMQ.Client.ExchangeType.Fanout); + TestChannel.QueueDeclare(queueName, true, false, false); + + var tcs = new TaskCompletionSource(); + client.SubscribeAsync((message, context) => + { + tcs.TrySetResult(message); + return Task.FromResult(true); + }, cfg => cfg + .WithQueue(q => q + .AssumeInitialized() + .WithName(queueName)) + .WithExchange(e => e + .AssumeInitialized() + .WithName(exchangeName))); + + /* Test */ + await client.PublishAsync(new DynamicMessage { Body = new { IsDynamic = true } }, + configuration: cfg => cfg + .WithExchange( + e => e + .AssumeInitialized() + .WithName(exchangeName))); + await tcs.Task; + + /* Assert */ + Assert.True(tcs.Task.Result.Body.IsDynamic); + } + finally + { + /* Teardown */ + TestChannel.QueueDelete(queueName); + TestChannel.ExchangeDelete(exchangeName); + } + } + } + + [Fact] + public async Task Should_Throw_When_Subscribed_Not_Initialized_Queue() + { + using (var client = TestClientFactory.CreateNormal()) + { + + /* Setup */ + const string exchangeName = "initialized.exchange"; + const string queueName = "not.initialized.queue"; + var tcs = new TaskCompletionSource(); + TestChannel.ExchangeDeclare(exchangeName, RabbitMQ.Client.ExchangeType.Fanout); + + /* Test */ + try + { + client.SubscribeAsync((message, context) => + { + tcs.TrySetResult(message); + return Task.FromResult(true); + }, cfg => cfg + .WithQueue(q => q + .AssumeInitialized() + .WithName(queueName)) + .WithExchange(e => e + .WithName(exchangeName))); + + await client.PublishAsync(new DynamicMessage {Body = new {IsDynamic = true}}, + configuration: cfg => cfg + .WithExchange( + e => e + .AssumeInitialized() + .WithName(exchangeName))); + await tcs.Task; + + /* Assert */ + Assert.True(tcs.Task.Result.Body.IsDynamic); + } + catch (AggregateException) + { + Assert.False(tcs.Task.IsCompleted); + } + catch (OperationInterruptedException) + { + Assert.False(tcs.Task.IsCompleted); + } + finally + { + /* Teardown */ + TestChannel.QueueDelete(queueName); + TestChannel.ExchangeDelete(exchangeName); + } + } + } + } } From 597cb8b845caddde6bf5fede4ac7575c87e9ba8a Mon Sep 17 00:00:00 2001 From: "cemre.mengu" Date: Wed, 23 Nov 2016 21:31:20 +0300 Subject: [PATCH 15/21] cleaned up tests and fixed tab --- .../Configuration/Queue/QueueConfiguration.cs | 2 +- .../SimpleUse/PublishAndSubscribeTests.cs | 52 +++++-------------- 2 files changed, 14 insertions(+), 40 deletions(-) diff --git a/src/RawRabbit/Configuration/Queue/QueueConfiguration.cs b/src/RawRabbit/Configuration/Queue/QueueConfiguration.cs index 61a67646..6ef159ea 100644 --- a/src/RawRabbit/Configuration/Queue/QueueConfiguration.cs +++ b/src/RawRabbit/Configuration/Queue/QueueConfiguration.cs @@ -24,7 +24,7 @@ public string FullQueueName public bool Exclusive { get; set; } public bool AutoDelete { get; set; } public Dictionary Arguments { get; set; } - public bool AssumeInitialized { get; set; } + public bool AssumeInitialized { get; set; } public QueueConfiguration() { diff --git a/test/RawRabbit.IntegrationTests/SimpleUse/PublishAndSubscribeTests.cs b/test/RawRabbit.IntegrationTests/SimpleUse/PublishAndSubscribeTests.cs index 8d2c97b6..aa809c01 100644 --- a/test/RawRabbit.IntegrationTests/SimpleUse/PublishAndSubscribeTests.cs +++ b/test/RawRabbit.IntegrationTests/SimpleUse/PublishAndSubscribeTests.cs @@ -12,11 +12,11 @@ using RawRabbit.IntegrationTests.TestMessages; using Xunit; using ExchangeType = RawRabbit.Configuration.Exchange.ExchangeType; +using RabbitMQ.Client.Exceptions; namespace RawRabbit.IntegrationTests.SimpleUse { - using Context; - using RabbitMQ.Client.Exceptions; + public class PublishAndSubscribeTests : IntegrationTestBase { @@ -447,10 +447,9 @@ public async Task Should_Be_Able_To_Subscribe_Initialized_Queue_And_Exchange() /* Test */ await client.PublishAsync(new DynamicMessage { Body = new { IsDynamic = true } }, configuration: cfg => cfg - .WithExchange( - e => e - .AssumeInitialized() - .WithName(exchangeName))); + .WithExchange(e => e + .AssumeInitialized() + .WithName(exchangeName))); await tcs.Task; /* Assert */ @@ -466,21 +465,21 @@ public async Task Should_Be_Able_To_Subscribe_Initialized_Queue_And_Exchange() } [Fact] - public async Task Should_Throw_When_Subscribed_Not_Initialized_Queue() + public void Should_Throw_When_Subscribed_Not_Initialized_Queue() { using (var client = TestClientFactory.CreateNormal()) { /* Setup */ - const string exchangeName = "initialized.exchange"; const string queueName = "not.initialized.queue"; var tcs = new TaskCompletionSource(); - TestChannel.ExchangeDeclare(exchangeName, RabbitMQ.Client.ExchangeType.Fanout); + /* Test */ - try - { - client.SubscribeAsync((message, context) => + + Assert.Throws(() => + + client.SubscribeAsync((message, context) => { tcs.TrySetResult(message); return Task.FromResult(true); @@ -488,34 +487,9 @@ public async Task Should_Throw_When_Subscribed_Not_Initialized_Queue() .WithQueue(q => q .AssumeInitialized() .WithName(queueName)) - .WithExchange(e => e - .WithName(exchangeName))); - - await client.PublishAsync(new DynamicMessage {Body = new {IsDynamic = true}}, - configuration: cfg => cfg - .WithExchange( - e => e - .AssumeInitialized() - .WithName(exchangeName))); - await tcs.Task; + )); - /* Assert */ - Assert.True(tcs.Task.Result.Body.IsDynamic); - } - catch (AggregateException) - { - Assert.False(tcs.Task.IsCompleted); - } - catch (OperationInterruptedException) - { - Assert.False(tcs.Task.IsCompleted); - } - finally - { - /* Teardown */ - TestChannel.QueueDelete(queueName); - TestChannel.ExchangeDelete(exchangeName); - } + Assert.False(tcs.Task.IsCompleted); } } } From 1fbd0df59b0321e26251390edb0fad4303757d04 Mon Sep 17 00:00:00 2001 From: "cemre.mengu" Date: Thu, 8 Dec 2016 10:31:36 +0300 Subject: [PATCH 16/21] Removed the check from queue config and used no subscriber id, also tabified tests --- .../Configuration/Queue/QueueConfiguration.cs | 2 +- .../SimpleUse/PublishAndSubscribeTests.cs | 954 +++++++++--------- 2 files changed, 477 insertions(+), 479 deletions(-) diff --git a/src/RawRabbit/Configuration/Queue/QueueConfiguration.cs b/src/RawRabbit/Configuration/Queue/QueueConfiguration.cs index 6ef159ea..0adeafbe 100644 --- a/src/RawRabbit/Configuration/Queue/QueueConfiguration.cs +++ b/src/RawRabbit/Configuration/Queue/QueueConfiguration.cs @@ -8,7 +8,7 @@ public string FullQueueName { get { - var fullQueueName = string.IsNullOrEmpty(NameSuffix) || AssumeInitialized + var fullQueueName = string.IsNullOrEmpty(NameSuffix) ? QueueName : $"{QueueName}_{NameSuffix}"; diff --git a/test/RawRabbit.IntegrationTests/SimpleUse/PublishAndSubscribeTests.cs b/test/RawRabbit.IntegrationTests/SimpleUse/PublishAndSubscribeTests.cs index aa809c01..3297b63d 100644 --- a/test/RawRabbit.IntegrationTests/SimpleUse/PublishAndSubscribeTests.cs +++ b/test/RawRabbit.IntegrationTests/SimpleUse/PublishAndSubscribeTests.cs @@ -12,486 +12,484 @@ using RawRabbit.IntegrationTests.TestMessages; using Xunit; using ExchangeType = RawRabbit.Configuration.Exchange.ExchangeType; -using RabbitMQ.Client.Exceptions; namespace RawRabbit.IntegrationTests.SimpleUse { - - public class PublishAndSubscribeTests : IntegrationTestBase - { - - [Fact] - public async Task Should_Be_Able_To_Subscribe_Without_Any_Additional_Config() - { - /* Setup */ - using (var publisher = TestClientFactory.CreateNormal()) - using (var subscriber = TestClientFactory.CreateNormal()) - { - var message = new BasicMessage {Prop = "Hello, world!"}; - var recievedTcs = new TaskCompletionSource(); - - subscriber.SubscribeAsync((msg, info) => - { - if (msg.Prop == message.Prop) - { - recievedTcs.SetResult(msg); - } - return Task.FromResult(true); - }); - - /* Test */ - publisher.PublishAsync(message); - await recievedTcs.Task; - - /* Assert */ - Assert.Equal(expected: message.Prop, actual: recievedTcs.Task.Result.Prop); - } - } - - [Fact] - public async Task Should_Be_Able_To_Perform_Multiple_Pub_Subs() - { - /* Setup */ - using (var subscriber = TestClientFactory.CreateNormal()) - using (var publisher = TestClientFactory.CreateNormal()) - { - const int numberOfCalls = 100; - var recived = 0; - var recievedTcs = new TaskCompletionSource(); - subscriber.SubscribeAsync((message, context) => - { - Interlocked.Increment(ref recived); - if (numberOfCalls == recived) - { - recievedTcs.SetResult(true); - } - return Task.FromResult(true); - }); - - /* Test */ - var sw = Stopwatch.StartNew(); - for (int i = 0; i < numberOfCalls; i++) - { - publisher.PublishAsync(); - } - await recievedTcs.Task; - sw.Stop(); - - /* Assert */ - Assert.True(true, $"Completed {numberOfCalls} in {sw.ElapsedMilliseconds} ms."); - } - } - - [Fact] - public void Should_Be_Able_To_Perform_Subscribe_For_Multiple_Types() - { - /* Setup */ - using (var subscriber = TestClientFactory.CreateNormal()) - using (var publisher = TestClientFactory.CreateNormal()) - { - var basicTcs = new TaskCompletionSource(); - var simpleTcs = new TaskCompletionSource(); - subscriber.SubscribeAsync((message, context) => - { - basicTcs.SetResult(message); - return Task.FromResult(true); - }); - subscriber.SubscribeAsync((message, context) => - { - simpleTcs.SetResult(message); - return Task.FromResult(true); - }); - - /* Test */ - publisher.PublishAsync(); - publisher.PublishAsync(); - Task.WaitAll(basicTcs.Task, simpleTcs.Task); - - /* Assert */ - Assert.True(true, "Successfully recieved messages."); - } - } - - [Fact] - public async Task Should_Throw_Publish_Confirm_Exception_If_Server_Doesnt_Respond_Within_Time_Limit() - { - /* Setup */ - var publisher = TestClientFactory.CreateNormal(ioc => ioc.AddSingleton(p => - { - var config = RawRabbitConfiguration.Local; - config.PublishConfirmTimeout = TimeSpan.FromTicks(1); - return config; - })); - using (publisher) - { - /* Test */ - /* Assert */ - try - { - await publisher.PublishAsync(); - } - catch (PublishConfirmException) - { - Assert.True(true); - } - } - } - - [Fact] - public void Should_Be_Able_To_Confirm_Multiple_Messages() - { - /* Setup */ - const int numberOfCalls = 100; - var confirmTasks = new Task[numberOfCalls]; - using (var publisher = TestClientFactory.CreateNormal()) - { - for (int i = 0; i < numberOfCalls; i++) - { - var confirmTask = publisher.PublishAsync(); - confirmTasks[i] = confirmTask; - } - Task.WaitAll(confirmTasks); - Task.Delay(500).Wait(); - - Assert.True(true, "Successfully confirmed all messages."); - } - } - - [Fact] - public void Should_Be_Able_To_Delivery_Message_To_Multiple_Subscribers_On_Same_Host() - { - /* Setup */ - using (var subscriber = TestClientFactory.CreateNormal()) - using (var publisher = TestClientFactory.CreateNormal()) - { - var firstTcs = new TaskCompletionSource(); - var secondTcs = new TaskCompletionSource(); - subscriber.SubscribeAsync((message, context) => - { - firstTcs.SetResult(true); - return Task.FromResult(true); - }); - subscriber.SubscribeAsync((message, context) => - { - secondTcs.SetResult(true); - return Task.FromResult(true); - }); - - /* Test */ - var ackTask = publisher.PublishAsync(); - Task.WaitAll(ackTask, firstTcs.Task, secondTcs.Task); - - /* Assert */ - Assert.True(true, "Published and subscribe sucessfull."); - } - } - - [Fact] - public void Should_Be_Able_To_Deliver_Messages_To_Unique_Subscribers() - { - /* Setup */ - using (var firstSubscriber = TestClientFactory.CreateNormal()) - using (var secondSubscriber = TestClientFactory.CreateNormal()) - using (var publisher = TestClientFactory.CreateNormal()) - { - var firstTcs = new TaskCompletionSource(); - var secondTcs = new TaskCompletionSource(); - firstSubscriber.SubscribeAsync((message, context) => - { - firstTcs.SetResult(true); - return Task.FromResult(true); - }, cfg => cfg.WithSubscriberId("first_subscriber")); - secondSubscriber.SubscribeAsync((message, context) => - { - secondTcs.SetResult(true); - return Task.FromResult(true); - }, cfg => cfg.WithSubscriberId("second_subscriber")); - - /* Test */ - var ackTask = publisher.PublishAsync(); - Task.WaitAll(ackTask, firstTcs.Task, secondTcs.Task); - - /* Assert */ - Assert.True(true, "Published and subscribe sucessfull."); - - } - } - - [Fact] - public async Task Should_Be_Able_To_Use_Priority() - { - /* Setup */ - using (var subscriber = TestClientFactory.CreateNormal()) - using (var publisher = TestClientFactory.CreateNormal()) - { - var prioritySent = false; - var queueBuilt = new TaskCompletionSource(); - var priorityTcs = new TaskCompletionSource(); - subscriber.SubscribeAsync(async (message, context) => - { - await queueBuilt.Task; - if (!prioritySent) - { - await subscriber.PublishAsync(new BasicMessage - { - Prop = "I am important!" - }, configuration: cfg => cfg.WithProperties(p => - { - p.Priority = 3; - })); - prioritySent = true; - } - else - { - priorityTcs.TrySetResult(message); - } - - }, cfg => cfg - .WithQueue(q => q.WithArgument(QueueArgument.MaxPriority, 3)) - .WithSubscriberId("priority") - .WithPrefetchCount(1)); - - /* Test */ - await publisher.PublishAsync(new BasicMessage {Prop = "I will be delivered"}); - await - publisher.PublishAsync(new BasicMessage {Prop = "Someone will pass me in the queue"}, - configuration: cfg => cfg.WithProperties(p => p.Priority = 0)); - queueBuilt.SetResult(true); - await priorityTcs.Task; - - /* Asset */ - Assert.Equal(expected: "I am important!", actual: priorityTcs.Task.Result.Prop); - } - } - - [Fact] - public async Task Should_Stop_Subscribe_When_Subscription_Is_Disposed() - { - /* Setup */ - using (var publisher = TestClientFactory.CreateNormal()) - using (var subscriber = TestClientFactory.CreateNormal()) - { - var firstMessage = new BasicMessage {Prop = "Value"}; - var secondMessage = new BasicMessage {Prop = "AnotherValue"}; - var firstRecievedTcs = new TaskCompletionSource(); - var secondRecievedTcs = new TaskCompletionSource(); - var recievedCount = 0; - - var subscription = subscriber.SubscribeAsync((message, context) => - { - recievedCount++; - if (!firstRecievedTcs.Task.IsCompleted) - { - firstRecievedTcs.SetResult(message); - } - return Task.FromResult(true); - }, cfg => cfg.WithQueue(q => q.WithAutoDelete(false))); - - /* Test */ - publisher.PublishAsync(firstMessage); - await firstRecievedTcs.Task; - subscription.Dispose(); - var recievedAfterFirstPublish = recievedCount; - publisher.PublishAsync(secondMessage); - await Task.Delay(20); - publisher.SubscribeAsync((message, context) => - { - secondRecievedTcs.SetResult(message); - return Task.FromResult(true); - }, cfg => cfg.WithQueue(q => q.WithAutoDelete(false))); - await secondRecievedTcs.Task; - TestChannel.QueueDelete(subscription.QueueName); - /* Assert */ - Assert.Equal(recievedAfterFirstPublish, recievedCount); - Assert.Equal(firstRecievedTcs.Task.Result.Prop, firstMessage.Prop); - Assert.Equal(secondRecievedTcs.Task.Result.Prop, secondMessage.Prop); - } - } - - [Fact] - public async Task Should_Be_Able_To_Subscibe_To_Pure_Json_Message() - { - var conventions = new NamingConventions(); - using ( - var client = - TestClientFactory.CreateNormal(ioc => ioc.AddSingleton(c => conventions))) - { - /* Setup */ - var tcs = new TaskCompletionSource(); - var subscription = client.SubscribeAsync((message, context) => - { - tcs.TrySetResult(message); - return Task.FromResult(true); - }); - var uniqueValue = Guid.NewGuid().ToString(); - var jsonMsg = JsonConvert.SerializeObject(new BasicMessage {Prop = uniqueValue}); - - /* Test */ - TestChannel.BasicPublish( - conventions.ExchangeNamingConvention(typeof(BasicMessage)), - conventions.QueueNamingConvention(typeof(BasicMessage)), - true, - null, - Encoding.UTF8.GetBytes(jsonMsg)); - await tcs.Task; - - /* Assert */ - Assert.Equal(uniqueValue, tcs.Task.Result.Prop); - } - } - - [Fact] - public async void Should_Be_Able_To_Publish_Dynamic_Objects() - { - using (var client = TestClientFactory.CreateNormal()) - { - /* Setup */ - var tcs = new TaskCompletionSource(); - client.SubscribeAsync((message, context) => - { - tcs.TrySetResult(message); - return Task.FromResult(true); - }); - - /* Test */ - client.PublishAsync(new DynamicMessage {Body = new {IsDynamic = true}}); - await tcs.Task; - - /* Assert */ - Assert.True(tcs.Task.Result.Body.IsDynamic); - } - } - - [Fact] - public async Task Should_Be_Able_To_Publish_Message_After_Failed_Publish() - { - using (var firstClient = TestClientFactory.CreateNormal()) - using (var secondClient = TestClientFactory.CreateNormal()) - { - /* Setup */ - var tcs = new TaskCompletionSource(); - firstClient.SubscribeAsync((message, context) => - { - tcs.TrySetResult(true); - return Task.FromResult(true); - }); - - /* Test */ - try - { - await - secondClient.PublishAsync(new SimpleMessage(), - configuration: cfg => cfg.WithExchange(e => e.WithType(ExchangeType.Direct))); - } - catch (Exception) - { - await Task.Delay(50); - Assert.False(tcs.Task.IsCompleted); - } - secondClient.PublishAsync(new SimpleMessage()); - await tcs.Task; - /* Assert */ - Assert.True(true); - } - } - - [Fact] - public async Task Should_Run_Basic_Return_When_The_Manatory_Set_After_Failed_Publish() - { - /* Setup */ - using (var client = TestClientFactory.CreateNormal()) - { - var tcs = new TaskCompletionSource(); - /* Test */ - await client.PublishAsync(new SimpleMessage(), - configuration: cfg => cfg - .WithMandatoryDelivery((obj, evt) => - { - tcs.SetResult(true); - })); - - /* Assert */ - Assert.True(tcs.Task.Result); - } - } - - [Fact] - public async Task Should_Be_Able_To_Subscribe_Initialized_Queue_And_Exchange() - { - using (var client = TestClientFactory.CreateNormal()) - { - - /* Setup */ - const string exchangeName = "initialized.exchange"; - const string queueName = "initialized.queue"; - - try - { - TestChannel.ExchangeDeclare(exchangeName, RabbitMQ.Client.ExchangeType.Fanout); - TestChannel.QueueDeclare(queueName, true, false, false); - - var tcs = new TaskCompletionSource(); - client.SubscribeAsync((message, context) => - { - tcs.TrySetResult(message); - return Task.FromResult(true); - }, cfg => cfg - .WithQueue(q => q - .AssumeInitialized() - .WithName(queueName)) - .WithExchange(e => e - .AssumeInitialized() - .WithName(exchangeName))); - - /* Test */ - await client.PublishAsync(new DynamicMessage { Body = new { IsDynamic = true } }, - configuration: cfg => cfg - .WithExchange(e => e - .AssumeInitialized() - .WithName(exchangeName))); - await tcs.Task; - - /* Assert */ - Assert.True(tcs.Task.Result.Body.IsDynamic); - } - finally - { - /* Teardown */ - TestChannel.QueueDelete(queueName); - TestChannel.ExchangeDelete(exchangeName); - } - } - } - - [Fact] - public void Should_Throw_When_Subscribed_Not_Initialized_Queue() - { - using (var client = TestClientFactory.CreateNormal()) - { - - /* Setup */ - const string queueName = "not.initialized.queue"; - var tcs = new TaskCompletionSource(); - - - /* Test */ - - Assert.Throws(() => - - client.SubscribeAsync((message, context) => - { - tcs.TrySetResult(message); - return Task.FromResult(true); - }, cfg => cfg - .WithQueue(q => q - .AssumeInitialized() - .WithName(queueName)) - )); - - Assert.False(tcs.Task.IsCompleted); - } - } - } + public class PublishAndSubscribeTests : IntegrationTestBase + { + + [Fact] + public async Task Should_Be_Able_To_Subscribe_Without_Any_Additional_Config() + { + /* Setup */ + using (var publisher = TestClientFactory.CreateNormal()) + using (var subscriber = TestClientFactory.CreateNormal()) + { + var message = new BasicMessage {Prop = "Hello, world!"}; + var recievedTcs = new TaskCompletionSource(); + + subscriber.SubscribeAsync((msg, info) => + { + if (msg.Prop == message.Prop) + { + recievedTcs.SetResult(msg); + } + return Task.FromResult(true); + }); + + /* Test */ + publisher.PublishAsync(message); + await recievedTcs.Task; + + /* Assert */ + Assert.Equal(expected: message.Prop, actual: recievedTcs.Task.Result.Prop); + } + } + + [Fact] + public async Task Should_Be_Able_To_Perform_Multiple_Pub_Subs() + { + /* Setup */ + using (var subscriber = TestClientFactory.CreateNormal()) + using (var publisher = TestClientFactory.CreateNormal()) + { + const int numberOfCalls = 100; + var recived = 0; + var recievedTcs = new TaskCompletionSource(); + subscriber.SubscribeAsync((message, context) => + { + Interlocked.Increment(ref recived); + if (numberOfCalls == recived) + { + recievedTcs.SetResult(true); + } + return Task.FromResult(true); + }); + + /* Test */ + var sw = Stopwatch.StartNew(); + for (int i = 0; i < numberOfCalls; i++) + { + publisher.PublishAsync(); + } + await recievedTcs.Task; + sw.Stop(); + + /* Assert */ + Assert.True(true, $"Completed {numberOfCalls} in {sw.ElapsedMilliseconds} ms."); + } + } + + [Fact] + public void Should_Be_Able_To_Perform_Subscribe_For_Multiple_Types() + { + /* Setup */ + using (var subscriber = TestClientFactory.CreateNormal()) + using (var publisher = TestClientFactory.CreateNormal()) + { + var basicTcs = new TaskCompletionSource(); + var simpleTcs = new TaskCompletionSource(); + subscriber.SubscribeAsync((message, context) => + { + basicTcs.SetResult(message); + return Task.FromResult(true); + }); + subscriber.SubscribeAsync((message, context) => + { + simpleTcs.SetResult(message); + return Task.FromResult(true); + }); + + /* Test */ + publisher.PublishAsync(); + publisher.PublishAsync(); + Task.WaitAll(basicTcs.Task, simpleTcs.Task); + + /* Assert */ + Assert.True(true, "Successfully recieved messages."); + } + } + + [Fact] + public async Task Should_Throw_Publish_Confirm_Exception_If_Server_Doesnt_Respond_Within_Time_Limit() + { + /* Setup */ + var publisher = TestClientFactory.CreateNormal(ioc => ioc.AddSingleton(p => + { + var config = RawRabbitConfiguration.Local; + config.PublishConfirmTimeout = TimeSpan.FromTicks(1); + return config; + })); + using (publisher) + { + /* Test */ + /* Assert */ + try + { + await publisher.PublishAsync(); + } + catch (PublishConfirmException) + { + Assert.True(true); + } + } + } + + [Fact] + public void Should_Be_Able_To_Confirm_Multiple_Messages() + { + /* Setup */ + const int numberOfCalls = 100; + var confirmTasks = new Task[numberOfCalls]; + using (var publisher = TestClientFactory.CreateNormal()) + { + for (int i = 0; i < numberOfCalls; i++) + { + var confirmTask = publisher.PublishAsync(); + confirmTasks[i] = confirmTask; + } + Task.WaitAll(confirmTasks); + Task.Delay(500).Wait(); + + Assert.True(true, "Successfully confirmed all messages."); + } + } + + [Fact] + public void Should_Be_Able_To_Delivery_Message_To_Multiple_Subscribers_On_Same_Host() + { + /* Setup */ + using (var subscriber = TestClientFactory.CreateNormal()) + using (var publisher = TestClientFactory.CreateNormal()) + { + var firstTcs = new TaskCompletionSource(); + var secondTcs = new TaskCompletionSource(); + subscriber.SubscribeAsync((message, context) => + { + firstTcs.SetResult(true); + return Task.FromResult(true); + }); + subscriber.SubscribeAsync((message, context) => + { + secondTcs.SetResult(true); + return Task.FromResult(true); + }); + + /* Test */ + var ackTask = publisher.PublishAsync(); + Task.WaitAll(ackTask, firstTcs.Task, secondTcs.Task); + + /* Assert */ + Assert.True(true, "Published and subscribe sucessfull."); + } + } + + [Fact] + public void Should_Be_Able_To_Deliver_Messages_To_Unique_Subscribers() + { + /* Setup */ + using (var firstSubscriber = TestClientFactory.CreateNormal()) + using (var secondSubscriber = TestClientFactory.CreateNormal()) + using (var publisher = TestClientFactory.CreateNormal()) + { + var firstTcs = new TaskCompletionSource(); + var secondTcs = new TaskCompletionSource(); + firstSubscriber.SubscribeAsync((message, context) => + { + firstTcs.SetResult(true); + return Task.FromResult(true); + }, cfg => cfg.WithSubscriberId("first_subscriber")); + secondSubscriber.SubscribeAsync((message, context) => + { + secondTcs.SetResult(true); + return Task.FromResult(true); + }, cfg => cfg.WithSubscriberId("second_subscriber")); + + /* Test */ + var ackTask = publisher.PublishAsync(); + Task.WaitAll(ackTask, firstTcs.Task, secondTcs.Task); + + /* Assert */ + Assert.True(true, "Published and subscribe sucessfull."); + + } + } + + [Fact] + public async Task Should_Be_Able_To_Use_Priority() + { + /* Setup */ + using (var subscriber = TestClientFactory.CreateNormal()) + using (var publisher = TestClientFactory.CreateNormal()) + { + var prioritySent = false; + var queueBuilt = new TaskCompletionSource(); + var priorityTcs = new TaskCompletionSource(); + subscriber.SubscribeAsync(async (message, context) => + { + await queueBuilt.Task; + if (!prioritySent) + { + await subscriber.PublishAsync(new BasicMessage + { + Prop = "I am important!" + }, configuration: cfg => cfg.WithProperties(p => + { + p.Priority = 3; + })); + prioritySent = true; + } + else + { + priorityTcs.TrySetResult(message); + } + + }, cfg => cfg + .WithQueue(q => q.WithArgument(QueueArgument.MaxPriority, 3)) + .WithSubscriberId("priority") + .WithPrefetchCount(1)); + + /* Test */ + await publisher.PublishAsync(new BasicMessage {Prop = "I will be delivered"}); + await + publisher.PublishAsync(new BasicMessage {Prop = "Someone will pass me in the queue"}, + configuration: cfg => cfg.WithProperties(p => p.Priority = 0)); + queueBuilt.SetResult(true); + await priorityTcs.Task; + + /* Asset */ + Assert.Equal(expected: "I am important!", actual: priorityTcs.Task.Result.Prop); + } + } + + [Fact] + public async Task Should_Stop_Subscribe_When_Subscription_Is_Disposed() + { + /* Setup */ + using (var publisher = TestClientFactory.CreateNormal()) + using (var subscriber = TestClientFactory.CreateNormal()) + { + var firstMessage = new BasicMessage {Prop = "Value"}; + var secondMessage = new BasicMessage {Prop = "AnotherValue"}; + var firstRecievedTcs = new TaskCompletionSource(); + var secondRecievedTcs = new TaskCompletionSource(); + var recievedCount = 0; + + var subscription = subscriber.SubscribeAsync((message, context) => + { + recievedCount++; + if (!firstRecievedTcs.Task.IsCompleted) + { + firstRecievedTcs.SetResult(message); + } + return Task.FromResult(true); + }, cfg => cfg.WithQueue(q => q.WithAutoDelete(false))); + + /* Test */ + publisher.PublishAsync(firstMessage); + await firstRecievedTcs.Task; + subscription.Dispose(); + var recievedAfterFirstPublish = recievedCount; + publisher.PublishAsync(secondMessage); + await Task.Delay(20); + publisher.SubscribeAsync((message, context) => + { + secondRecievedTcs.SetResult(message); + return Task.FromResult(true); + }, cfg => cfg.WithQueue(q => q.WithAutoDelete(false))); + await secondRecievedTcs.Task; + TestChannel.QueueDelete(subscription.QueueName); + /* Assert */ + Assert.Equal(recievedAfterFirstPublish, recievedCount); + Assert.Equal(firstRecievedTcs.Task.Result.Prop, firstMessage.Prop); + Assert.Equal(secondRecievedTcs.Task.Result.Prop, secondMessage.Prop); + } + } + + [Fact] + public async Task Should_Be_Able_To_Subscibe_To_Pure_Json_Message() + { + var conventions = new NamingConventions(); + using ( + var client = + TestClientFactory.CreateNormal(ioc => ioc.AddSingleton(c => conventions))) + { + /* Setup */ + var tcs = new TaskCompletionSource(); + var subscription = client.SubscribeAsync((message, context) => + { + tcs.TrySetResult(message); + return Task.FromResult(true); + }); + var uniqueValue = Guid.NewGuid().ToString(); + var jsonMsg = JsonConvert.SerializeObject(new BasicMessage {Prop = uniqueValue}); + + /* Test */ + TestChannel.BasicPublish( + conventions.ExchangeNamingConvention(typeof(BasicMessage)), + conventions.QueueNamingConvention(typeof(BasicMessage)), + true, + null, + Encoding.UTF8.GetBytes(jsonMsg)); + await tcs.Task; + + /* Assert */ + Assert.Equal(uniqueValue, tcs.Task.Result.Prop); + } + } + + [Fact] + public async void Should_Be_Able_To_Publish_Dynamic_Objects() + { + using (var client = TestClientFactory.CreateNormal()) + { + /* Setup */ + var tcs = new TaskCompletionSource(); + client.SubscribeAsync((message, context) => + { + tcs.TrySetResult(message); + return Task.FromResult(true); + }); + + /* Test */ + client.PublishAsync(new DynamicMessage {Body = new {IsDynamic = true}}); + await tcs.Task; + + /* Assert */ + Assert.True(tcs.Task.Result.Body.IsDynamic); + } + } + + [Fact] + public async Task Should_Be_Able_To_Publish_Message_After_Failed_Publish() + { + using (var firstClient = TestClientFactory.CreateNormal()) + using (var secondClient = TestClientFactory.CreateNormal()) + { + /* Setup */ + var tcs = new TaskCompletionSource(); + firstClient.SubscribeAsync((message, context) => + { + tcs.TrySetResult(true); + return Task.FromResult(true); + }); + + /* Test */ + try + { + await + secondClient.PublishAsync(new SimpleMessage(), + configuration: cfg => cfg.WithExchange(e => e.WithType(ExchangeType.Direct))); + } + catch (Exception) + { + await Task.Delay(50); + Assert.False(tcs.Task.IsCompleted); + } + secondClient.PublishAsync(new SimpleMessage()); + await tcs.Task; + /* Assert */ + Assert.True(true); + } + } + + [Fact] + public async Task Should_Run_Basic_Return_When_The_Manatory_Set_After_Failed_Publish() + { + /* Setup */ + using (var client = TestClientFactory.CreateNormal()) + { + var tcs = new TaskCompletionSource(); + /* Test */ + await client.PublishAsync(new SimpleMessage(), + configuration: cfg => cfg + .WithMandatoryDelivery((obj, evt) => + { + tcs.SetResult(true); + })); + + /* Assert */ + Assert.True(tcs.Task.Result); + } + } + + [Fact] + public async Task Should_Be_Able_To_Subscribe_Initialized_Queue_And_Exchange() + { + using (var client = TestClientFactory.CreateNormal()) + { + + /* Setup */ + const string exchangeName = "initialized.exchange"; + const string queueName = "initialized.queue"; + + try + { + TestChannel.ExchangeDeclare(exchangeName, RabbitMQ.Client.ExchangeType.Fanout); + TestChannel.QueueDeclare(queueName, true, false, false); + + var tcs = new TaskCompletionSource(); + client.SubscribeAsync((message, context) => + { + tcs.TrySetResult(message); + return Task.FromResult(true); + }, cfg => cfg + .WithQueue(q => q + .AssumeInitialized() + .WithName(queueName)) + .WithSubscriberId(string.Empty) + .WithExchange(e => e + .AssumeInitialized() + .WithName(exchangeName))); + + /* Test */ + await client.PublishAsync(new DynamicMessage { Body = new { IsDynamic = true } }, + configuration: cfg => cfg + .WithExchange(e => e + .AssumeInitialized() + .WithName(exchangeName))); + await tcs.Task; + + /* Assert */ + Assert.True(tcs.Task.Result.Body.IsDynamic); + } + finally + { + /* Teardown */ + TestChannel.QueueDelete(queueName); + TestChannel.ExchangeDelete(exchangeName); + } + } + } + + [Fact] + public void Should_Throw_When_Subscribed_Not_Initialized_Queue() + { + using (var client = TestClientFactory.CreateNormal()) + { + + /* Setup */ + const string queueName = "not.initialized.queue"; + var tcs = new TaskCompletionSource(); + + + /* Test */ + + Assert.Throws(() => + + client.SubscribeAsync((message, context) => + { + tcs.TrySetResult(message); + return Task.FromResult(true); + }, cfg => cfg + .WithQueue(q => q + .AssumeInitialized() + .WithName(queueName)) + )); + + Assert.False(tcs.Task.IsCompleted); + } + } + } } - From 4b14c790da7904f8f50a0df389f499542256e52e Mon Sep 17 00:00:00 2001 From: "par.dahlman" Date: Mon, 19 Dec 2016 07:35:02 +0100 Subject: [PATCH 17/21] Minor cleanup --- .../Queue/QueueConfigurationBuilder.cs | 18 ++++++++---------- 1 file changed, 8 insertions(+), 10 deletions(-) diff --git a/src/RawRabbit/Configuration/Queue/QueueConfigurationBuilder.cs b/src/RawRabbit/Configuration/Queue/QueueConfigurationBuilder.cs index 477a1fe4..426673f6 100644 --- a/src/RawRabbit/Configuration/Queue/QueueConfigurationBuilder.cs +++ b/src/RawRabbit/Configuration/Queue/QueueConfigurationBuilder.cs @@ -1,10 +1,8 @@ -using System; - -namespace RawRabbit.Configuration.Queue +namespace RawRabbit.Configuration.Queue { public class QueueConfigurationBuilder : IQueueConfigurationBuilder { - public QueueConfiguration Configuration { get;} + public QueueConfiguration Configuration { get; } public QueueConfigurationBuilder(QueueConfiguration initialQueue = null) { @@ -47,10 +45,10 @@ public IQueueConfigurationBuilder WithArgument(string key, object value) return this; } - public IQueueConfigurationBuilder AssumeInitialized(bool asumption = true) - { - Configuration.AssumeInitialized = asumption; - return this; - } - } + public IQueueConfigurationBuilder AssumeInitialized(bool asumption = true) + { + Configuration.AssumeInitialized = asumption; + return this; + } + } } From ed342b35f57f959a8a41d1fbe3dc01feccfbd199 Mon Sep 17 00:00:00 2001 From: "par.dahlman" Date: Mon, 19 Dec 2016 07:50:08 +0100 Subject: [PATCH 18/21] (#150) Rename ExceptionInformation --- src/RawRabbit/ErrorHandling/DefaultStrategy.cs | 6 +++--- ...ndlerExceptionInformation.cs => ExceptionInformation.cs} | 2 +- 2 files changed, 4 insertions(+), 4 deletions(-) rename src/RawRabbit/Exceptions/{MessageHandlerExceptionInformation.cs => ExceptionInformation.cs} (86%) diff --git a/src/RawRabbit/ErrorHandling/DefaultStrategy.cs b/src/RawRabbit/ErrorHandling/DefaultStrategy.cs index 6508950c..829dfa62 100644 --- a/src/RawRabbit/ErrorHandling/DefaultStrategy.cs +++ b/src/RawRabbit/ErrorHandling/DefaultStrategy.cs @@ -39,7 +39,7 @@ public virtual Task OnResponseHandlerExceptionAsync(IRawConsumer rawConsumer, IC { _logger.LogError($"An unhandled exception was thrown in the response handler for message '{args.BasicProperties.MessageId}'.", exception); var innerException = UnwrapInnerException(exception); - var exceptionInfo = new MessageHandlerExceptionInformation + var exceptionInfo = new ExceptionInformation { Message = $"An unhandled exception was thrown when consuming a message\n MessageId: {args.BasicProperties.MessageId}\n Queue: '{cfg.Queue.FullQueueName}'\n Exchange: '{cfg.Exchange.ExchangeName}'\nSee inner exception for more details.", ExceptionType = innerException.GetType().FullName, @@ -50,7 +50,7 @@ public virtual Task OnResponseHandlerExceptionAsync(IRawConsumer rawConsumer, IC rawConsumer.Model.BasicPublish( exchange: string.Empty, routingKey: args.BasicProperties?.ReplyTo ?? string.Empty, - basicProperties: _propertiesProvider.GetProperties(p => + basicProperties: _propertiesProvider.GetProperties(p => { p.CorrelationId = args.BasicProperties?.CorrelationId ?? string.Empty; p.Headers.Add(PropertyHeaders.ExceptionHeader, _messageExceptionName); @@ -82,7 +82,7 @@ public virtual Task OnResponseRecievedAsync(BasicDeliverEventArgs args, TaskComp if (containsException) { _logger.LogInformation($"Message '{args.BasicProperties.MessageId}' withh CorrelationId '{args.BasicProperties.CorrelationId}' contains exception. Deserialize and re-throw."); - var exceptionInfo = _serializer.Deserialize(args.Body); + var exceptionInfo = _serializer.Deserialize(args.Body); var exception = new MessageHandlerException(exceptionInfo.Message) { InnerExceptionType = exceptionInfo.ExceptionType, diff --git a/src/RawRabbit/Exceptions/MessageHandlerExceptionInformation.cs b/src/RawRabbit/Exceptions/ExceptionInformation.cs similarity index 86% rename from src/RawRabbit/Exceptions/MessageHandlerExceptionInformation.cs rename to src/RawRabbit/Exceptions/ExceptionInformation.cs index 31409f6c..6eb66f9b 100644 --- a/src/RawRabbit/Exceptions/MessageHandlerExceptionInformation.cs +++ b/src/RawRabbit/Exceptions/ExceptionInformation.cs @@ -3,7 +3,7 @@ /// /// Holds information about exception thrown in a remote message handler. /// - public class MessageHandlerExceptionInformation + public class ExceptionInformation { public string Message { get; set; } public string ExceptionType { get; set; } From 4794dbb873af476ac509ab36c8a9337841c96e55 Mon Sep 17 00:00:00 2001 From: "par.dahlman" Date: Mon, 19 Dec 2016 07:50:23 +0100 Subject: [PATCH 19/21] (#150) Use ExceptionInfo instead of Exception --- src/RawRabbit/ErrorHandling/DefaultStrategy.cs | 9 ++++++++- .../ErrorHandling/MessageHandlerExceptionMessage.cs | 3 ++- 2 files changed, 10 insertions(+), 2 deletions(-) diff --git a/src/RawRabbit/ErrorHandling/DefaultStrategy.cs b/src/RawRabbit/ErrorHandling/DefaultStrategy.cs index 829dfa62..5c5cef6e 100644 --- a/src/RawRabbit/ErrorHandling/DefaultStrategy.cs +++ b/src/RawRabbit/ErrorHandling/DefaultStrategy.cs @@ -133,9 +133,16 @@ public virtual async Task OnSubscriberExceptionAsync(IRawConsumer consumer, Subs await _topologyProvider.DeclareExchangeAsync(_errorExchangeCfg); var channel = await _channelFactory.GetChannelAsync(); var msg = _serializer.Deserialize(args); + var actualException = UnwrapInnerException(exception); var errorMsg = new HandlerExceptionMessage { - Exception = exception, + Exception = new ExceptionInformation + { + ExceptionType = actualException.GetType().FullName, + InnerMessage = actualException.InnerException?.Message, + Message = actualException.Message, + StackTrace = actualException.StackTrace + }, Time = DateTime.Now, Host = Environment.MachineName, Message = msg, diff --git a/src/RawRabbit/ErrorHandling/MessageHandlerExceptionMessage.cs b/src/RawRabbit/ErrorHandling/MessageHandlerExceptionMessage.cs index 65ffc291..bb0fefc3 100644 --- a/src/RawRabbit/ErrorHandling/MessageHandlerExceptionMessage.cs +++ b/src/RawRabbit/ErrorHandling/MessageHandlerExceptionMessage.cs @@ -1,4 +1,5 @@ using System; +using RawRabbit.Exceptions; namespace RawRabbit.ErrorHandling { @@ -7,6 +8,6 @@ public class HandlerExceptionMessage public string Host { get; set; } public DateTime Time { get; set; } public object Message { get; set; } - public Exception Exception { get; set; } + public ExceptionInformation Exception { get; set; } } } From 52573f41643cd76eaf53c581bd981ab48f961480 Mon Sep 17 00:00:00 2001 From: "par.dahlman" Date: Mon, 19 Dec 2016 08:01:14 +0100 Subject: [PATCH 20/21] Bump version (1.10.3) --- src/RawRabbit.Attributes/project.json | 2 +- src/RawRabbit.DependencyInjection.Autofac/project.json | 2 +- src/RawRabbit.DependencyInjection.Ninject/project.json | 2 +- src/RawRabbit.Extensions/project.json | 2 +- src/RawRabbit.Logging.Log4Net/project.json | 2 +- src/RawRabbit.Logging.NLog/project.json | 2 +- src/RawRabbit.Logging.Serilog/project.json | 2 +- src/RawRabbit.vNext/project.json | 2 +- src/RawRabbit/project.json | 2 +- 9 files changed, 9 insertions(+), 9 deletions(-) diff --git a/src/RawRabbit.Attributes/project.json b/src/RawRabbit.Attributes/project.json index 1eff72c9..36c1ac46 100644 --- a/src/RawRabbit.Attributes/project.json +++ b/src/RawRabbit.Attributes/project.json @@ -1,6 +1,6 @@ { "title": "RawRabbit.Attributes", - "version": "1.10.2-*", + "version": "1.10.3-*", "authors": [ "pardahlman" ], "description": "Configure messages and topology with attributes.", "packOptions": { diff --git a/src/RawRabbit.DependencyInjection.Autofac/project.json b/src/RawRabbit.DependencyInjection.Autofac/project.json index 3c9a7c2a..1983357e 100644 --- a/src/RawRabbit.DependencyInjection.Autofac/project.json +++ b/src/RawRabbit.DependencyInjection.Autofac/project.json @@ -1,6 +1,6 @@ { "title": "RawRabbit.DependencyInjection.Autofac", - "version": "1.10.2-*", + "version": "1.10.3-*", "authors": [ "par.dahlman" ], "description": "Wire up RawRabbit with Autfac!", diff --git a/src/RawRabbit.DependencyInjection.Ninject/project.json b/src/RawRabbit.DependencyInjection.Ninject/project.json index 069c1932..a892c4b3 100644 --- a/src/RawRabbit.DependencyInjection.Ninject/project.json +++ b/src/RawRabbit.DependencyInjection.Ninject/project.json @@ -1,6 +1,6 @@ { "title": "RawRabbit.DependencyInjection.Ninject", - "version": "1.10.2-*", + "version": "1.10.3-*", "authors": [ "par.dahlman", "Joshua Barron" ], "description": "Wire up RawRabbit with Ninject!", diff --git a/src/RawRabbit.Extensions/project.json b/src/RawRabbit.Extensions/project.json index 37f70b3f..20637d5b 100644 --- a/src/RawRabbit.Extensions/project.json +++ b/src/RawRabbit.Extensions/project.json @@ -1,6 +1,6 @@ { "title": "RawRabbit.Extensions", - "version": "1.10.2-*", + "version": "1.10.3-*", "description": "Make the most of RawRabbit with these extensions", "authors": [ "par.dahlman" ], "packOptions": { diff --git a/src/RawRabbit.Logging.Log4Net/project.json b/src/RawRabbit.Logging.Log4Net/project.json index 169073d7..fcff4fe5 100644 --- a/src/RawRabbit.Logging.Log4Net/project.json +++ b/src/RawRabbit.Logging.Log4Net/project.json @@ -1,6 +1,6 @@ { "title": "RawRabbit.Logging.Log4Net", - "version": "1.10.2-*", + "version": "1.10.3-*", "authors": [ "par.dahlman" ], "description": "Write RawRabbit's logs with Log4Net", diff --git a/src/RawRabbit.Logging.NLog/project.json b/src/RawRabbit.Logging.NLog/project.json index de9d5d74..a8022ce0 100644 --- a/src/RawRabbit.Logging.NLog/project.json +++ b/src/RawRabbit.Logging.NLog/project.json @@ -1,6 +1,6 @@ { "title": "RawRabbit.Logging.NLog", - "version": "1.10.2-*", + "version": "1.10.3-*", "authors": [ "par.dahlman" ], "description": "Write RawRabbit's logs with NLog", diff --git a/src/RawRabbit.Logging.Serilog/project.json b/src/RawRabbit.Logging.Serilog/project.json index a55a715a..ca787614 100644 --- a/src/RawRabbit.Logging.Serilog/project.json +++ b/src/RawRabbit.Logging.Serilog/project.json @@ -1,6 +1,6 @@ { "title": "RawRabbit.Logging.Serilog", - "version": "1.10.2-*", + "version": "1.10.3-*", "authors": [ "par.dahlman" ], "description": "Write RawRabbit's logs with Serilog", diff --git a/src/RawRabbit.vNext/project.json b/src/RawRabbit.vNext/project.json index 7bd1857c..a20dbc8d 100644 --- a/src/RawRabbit.vNext/project.json +++ b/src/RawRabbit.vNext/project.json @@ -1,5 +1,5 @@ { - "version": "1.10.2-*", + "version": "1.10.3-*", "authors": [ "pardahlman", "enrique-avalon" ], "description": "Take advantage of vNext for your RawRabbit setup. Extension for IServiceCollection, IConfiguration support and much more!", "packOptions": { diff --git a/src/RawRabbit/project.json b/src/RawRabbit/project.json index 12290480..75b2737f 100644 --- a/src/RawRabbit/project.json +++ b/src/RawRabbit/project.json @@ -1,6 +1,6 @@ { "title": "RawRabbit", - "version": "1.10.2-*", + "version": "1.10.3-*", "authors": [ "pardahlman", "enrique-avalon" ], "description": "A modern framework for communication over RabbitMq.", "packOptions": { From eccf73df5add77dfb13445fc4b9fb15670c4edbf Mon Sep 17 00:00:00 2001 From: "par.dahlman" Date: Mon, 19 Dec 2016 08:13:40 +0100 Subject: [PATCH 21/21] Update Release Notes --- RELEASENOTES.md | 19 +++++++++++++++++++ 1 file changed, 19 insertions(+) diff --git a/RELEASENOTES.md b/RELEASENOTES.md index 5fef755d..3ded76c3 100644 --- a/RELEASENOTES.md +++ b/RELEASENOTES.md @@ -1,3 +1,22 @@ +# 1.10.3 + + - [#163](https://github.com/pardahlman/RawRabbit/pull/163) - Added queue assume initialized and tests contributed by Cemre Mengu ([cemremengu](https://github.com/cemremengu)) + - [#162](https://github.com/pardahlman/RawRabbit/pull/162) - Converted tabs to 4 spaces contributed by Cemre Mengu ([cemremengu](https://github.com/cemremengu)) + - [#161](https://github.com/pardahlman/RawRabbit/pull/161) - added editorconfig for uniform editing contributed by Cemre Mengu ([cemremengu](https://github.com/cemremengu)) + - [#160](https://github.com/pardahlman/RawRabbit/issues/160) - Add"AssumeInitialized" functionality for queues + - [#159](https://github.com/pardahlman/RawRabbit/pull/159) - Added default broker connection values for RawRabbitConfig class contributed by Cemre Mengu ([cemremengu](https://github.com/cemremengu)) + - [#150](https://github.com/pardahlman/RawRabbit/issues/150) - StackOverflowException occures when subscribeMethod throws an exception using dotnet core + - [#143](https://github.com/pardahlman/RawRabbit/issues/143) - Failure Recovery Issue with PublishAsync + - [#142](https://github.com/pardahlman/RawRabbit/issues/142) - Failure Recovery + - [#140](https://github.com/pardahlman/RawRabbit/pull/140) - (#129) Expose Mandatory Option For Publish contributed by Richard Tasker ([ritasker](https://github.com/ritasker)) + - [#136](https://github.com/pardahlman/RawRabbit/issues/136) - Newtonsoft.Json.JsonSerializationException: Error getting value from 'ScopeId' on 'System.Net.IPAddress'. + - [#132](https://github.com/pardahlman/RawRabbit/issues/132) - Default connection timeout + - [#129](https://github.com/pardahlman/RawRabbit/issues/129) - Expose mandatory option for publish + - [#116](https://github.com/pardahlman/RawRabbit/issues/116) - Unable to publish message to default error exchange. + +Commits: 4b6e57e351...52573f4164 + + # 1.10.1 - [#108](https://github.com/pardahlman/RawRabbit/issues/108) - Upgrade to RabbitMQ.Client 4.1.0