diff --git a/src/RawRabbit/Pipe/Extensions/BasicConsumeExtension.cs b/src/RawRabbit/Pipe/Extensions/BasicConsumeExtension.cs index 40c58442..ae2b4fc6 100644 --- a/src/RawRabbit/Pipe/Extensions/BasicConsumeExtension.cs +++ b/src/RawRabbit/Pipe/Extensions/BasicConsumeExtension.cs @@ -11,7 +11,7 @@ namespace RawRabbit.Pipe.Extensions public static class BasicConsumeExtension { public static Task BasicConsumeAsync(this IBusClient busClient, Func> consumeFunc, - Action cfg) + Action context) { Func genericFunc = args => consumeFunc(args[0] as BasicDeliverEventArgs); @@ -23,7 +23,7 @@ public static Task BasicConsumeAsync(this IBusClient busClient, Func() .Use(new QueueBindOptions { - ExchangeNameFunc = context => context.GetConsumeConfiguration()?.ExchangeName + ExchangeNameFunc = ctx => ctx.GetConsumeConfiguration()?.ExchangeName }) .Use() .Use(new ConsumeOptions @@ -31,14 +31,14 @@ public static Task BasicConsumeAsync(this IBusClient busClient, Func p .Use(new HandlerInvokationOptions { - HandlerArgsFunc = context => new object[] {context.GetDeliveryEventArgs()}, + HandlerArgsFunc = ctx => new object[] { ctx.GetDeliveryEventArgs()}, }) .Use() }), - context => + ctx => { - context.Properties.Add(PipeKey.MessageHandler, genericFunc); - context.Properties.Add(PipeKey.ConfigurationAction, cfg); + ctx.Properties.Add(PipeKey.MessageHandler, genericFunc); + context?.Invoke(ctx); } ); } diff --git a/test/RawRabbit.IntegrationTests/Features/GlobalExecutionIdTests.cs b/test/RawRabbit.IntegrationTests/Features/GlobalExecutionIdTests.cs index 33ee2c83..f49bba63 100644 --- a/test/RawRabbit.IntegrationTests/Features/GlobalExecutionIdTests.cs +++ b/test/RawRabbit.IntegrationTests/Features/GlobalExecutionIdTests.cs @@ -5,6 +5,7 @@ using RabbitMQ.Client.Events; using RawRabbit.Common; using RawRabbit.IntegrationTests.TestMessages; +using RawRabbit.Pipe; using RawRabbit.Pipe.Extensions; using Xunit; @@ -32,17 +33,19 @@ public async Task Should_Forward_On_Pub_Sub() await secondSubscriber.SubscribeAsync(message => secondSubscriber.PublishAsync(new ThirdMessage())); await thridSubscriber.SubscribeAsync(message => Task.FromResult(0)); await consumer.BasicConsumeAsync(args => - { - var tsc = taskCompletionSources.First(t => !t.Task.IsCompleted); - tsc.TrySetResult(args); - return Task.FromResult(new Ack()); - }, cfg => cfg - .Consume(c => c - .OnExchange("rawrabbit.integrationtests.testmessages") - .WithRoutingKey("#")) - .FromDeclaredQueue(q => q - .WithName("take_all") - .WithAutoDelete()) + { + var tsc = taskCompletionSources.First(t => !t.Task.IsCompleted); + tsc.TrySetResult(args); + return Task.FromResult(new Ack()); + }, ctx => ctx + .UseConsumerConfiguration(cfg => cfg + .Consume(c => c + .OnExchange("rawrabbit.integrationtests.testmessages") + .WithRoutingKey("#")) + .FromDeclaredQueue(q => q + .WithName("take_all") + .WithAutoDelete()) + ) ); /* Test */ @@ -91,14 +94,15 @@ await consumer.BasicConsumeAsync(args => var tsc = taskCompletionSources.First(t => !t.Task.IsCompleted); tsc.TrySetResult(args); return Task.FromResult(new Ack()); - }, cfg => cfg - .Consume(c => c - .OnExchange("rawrabbit.integrationtests.testmessages") - .WithRoutingKey("#")) - .FromDeclaredQueue(q => q - .WithName("take_all") - .WithAutoDelete()) - ); + }, ctx => ctx + .UseConsumerConfiguration(cfg => cfg + .Consume(c => c + .OnExchange("rawrabbit.integrationtests.testmessages") + .WithRoutingKey("#")) + .FromDeclaredQueue(q => q + .WithName("take_all") + .WithAutoDelete()) + )); /* Test */ await requester.RequestAsync();