diff --git a/src/Cnblogs.Architecture.Ddd.Cqrs.DependencyInjection/CqrsInjector.cs b/src/Cnblogs.Architecture.Ddd.Cqrs.DependencyInjection/CqrsInjector.cs index 5b0272a..15da06e 100644 --- a/src/Cnblogs.Architecture.Ddd.Cqrs.DependencyInjection/CqrsInjector.cs +++ b/src/Cnblogs.Architecture.Ddd.Cqrs.DependencyInjection/CqrsInjector.cs @@ -111,9 +111,9 @@ public CqrsInjector AddRemoteQueryCache(Action /// Use default implementation of that accesses file system directly. /// /// - public CqrsInjector UseDefaultFileProvider() + public CqrsInjector AddDefaultFileProvider() { - return UseFileProvider(); + return AddFileProvider(); } /// @@ -121,7 +121,7 @@ public CqrsInjector UseDefaultFileProvider() /// /// The implementation type. /// - public CqrsInjector UseFileProvider() + public CqrsInjector AddFileProvider() where TProvider : class, IFileProvider { Services.AddScoped(); diff --git a/src/Cnblogs.Architecture.Ddd.EventBus.Abstractions/BufferedIntegrationEvent.cs b/src/Cnblogs.Architecture.Ddd.EventBus.Abstractions/BufferedIntegrationEvent.cs new file mode 100644 index 0000000..47c56b7 --- /dev/null +++ b/src/Cnblogs.Architecture.Ddd.EventBus.Abstractions/BufferedIntegrationEvent.cs @@ -0,0 +1,8 @@ +namespace Cnblogs.Architecture.Ddd.EventBus.Abstractions; + +/// +/// The integration event stored in buffer. +/// +/// The event name. +/// The event data. +public record BufferedIntegrationEvent(string Name, IntegrationEvent Event); diff --git a/src/Cnblogs.Architecture.Ddd.EventBus.Abstractions/Cnblogs.Architecture.Ddd.EventBus.Abstractions.csproj b/src/Cnblogs.Architecture.Ddd.EventBus.Abstractions/Cnblogs.Architecture.Ddd.EventBus.Abstractions.csproj index cf45e84..8f83a44 100644 --- a/src/Cnblogs.Architecture.Ddd.EventBus.Abstractions/Cnblogs.Architecture.Ddd.EventBus.Abstractions.csproj +++ b/src/Cnblogs.Architecture.Ddd.EventBus.Abstractions/Cnblogs.Architecture.Ddd.EventBus.Abstractions.csproj @@ -6,6 +6,6 @@ - + diff --git a/src/Cnblogs.Architecture.Ddd.EventBus.Dapr/DaprEventBus.cs b/src/Cnblogs.Architecture.Ddd.EventBus.Abstractions/DefaultEventBus.cs similarity index 51% rename from src/Cnblogs.Architecture.Ddd.EventBus.Dapr/DaprEventBus.cs rename to src/Cnblogs.Architecture.Ddd.EventBus.Abstractions/DefaultEventBus.cs index a107de6..f9277bc 100644 --- a/src/Cnblogs.Architecture.Ddd.EventBus.Dapr/DaprEventBus.cs +++ b/src/Cnblogs.Architecture.Ddd.EventBus.Abstractions/DefaultEventBus.cs @@ -1,61 +1,44 @@ -using Cnblogs.Architecture.Ddd.EventBus.Abstractions; -using Dapr.Client; using MediatR; using Microsoft.Extensions.Logging; -using Microsoft.Extensions.Options; -namespace Cnblogs.Architecture.Ddd.EventBus.Dapr; +namespace Cnblogs.Architecture.Ddd.EventBus.Abstractions; /// -/// Dapr EventBus 实现。 +/// Default implementation for /// -public class DaprEventBus : IEventBus +public class DefaultEventBus : IEventBus { - private readonly DaprClient _daprClient; - private readonly DaprOptions _daprOptions; + private readonly IEventBuffer _eventBuffer; private readonly IMediator _mediator; - private readonly ILogger _logger; + private readonly ILogger _logger; /// - /// 创建一个 DaprEventBus + /// Create a instance. /// - /// - /// - /// 日志记录器。 - /// - public DaprEventBus( - IOptions daprOptions, - DaprClient daprClient, - IMediator mediator, - ILogger logger) + /// The underlying event buffer. + /// The IMediator. + /// The logger. + public DefaultEventBus(IEventBuffer eventBuffer, IMediator mediator, ILogger logger) { - _daprClient = daprClient; + _eventBuffer = eventBuffer; _logger = logger; _mediator = mediator; - _daprOptions = daprOptions.Value; } /// - public async Task PublishAsync(TEvent @event) + public Task PublishAsync(TEvent @event) where TEvent : IntegrationEvent { - await PublishAsync(typeof(TEvent).Name, @event); + return PublishAsync(typeof(TEvent).Name, @event); } /// - public async Task PublishAsync(string eventName, TEvent @event) + public Task PublishAsync(string eventName, TEvent @event) where TEvent : IntegrationEvent { - _logger.LogInformation( - "Publishing IntegrationEvent, Name: {EventName}, Body: {Event}, TraceId: {TraceId}", - eventName, - @event, - @event.TraceId ?? @event.Id); @event.TraceId = TraceId; - await _daprClient.PublishEventAsync( - DaprOptions.PubSubName, - DaprUtils.GetDaprTopicName(_daprOptions.AppName, eventName), - @event); + _eventBuffer.Add(eventName, @event); + return Task.CompletedTask; } /// @@ -97,4 +80,4 @@ public Task ReceiveAsync(TEvent receivedEvent) /// public Guid? TraceId { get; set; } -} \ No newline at end of file +} diff --git a/src/Cnblogs.Architecture.Ddd.EventBus.Abstractions/EventBusOptions.cs b/src/Cnblogs.Architecture.Ddd.EventBus.Abstractions/EventBusOptions.cs new file mode 100644 index 0000000..82b5289 --- /dev/null +++ b/src/Cnblogs.Architecture.Ddd.EventBus.Abstractions/EventBusOptions.cs @@ -0,0 +1,19 @@ +using Microsoft.Extensions.DependencyInjection; + +namespace Cnblogs.Architecture.Ddd.EventBus.Abstractions; + +/// +/// Options for event bus. +/// +public class EventBusOptions +{ + /// + /// The service collection for + /// + public IServiceCollection? Services { get; set; } + + /// + /// Interval for publish integration event. + /// + public int Interval { get; set; } = 1; +} diff --git a/src/Cnblogs.Architecture.Ddd.EventBus.Abstractions/EventBusOptionsBuilder.cs b/src/Cnblogs.Architecture.Ddd.EventBus.Abstractions/EventBusOptionsBuilder.cs new file mode 100644 index 0000000..1c8d549 --- /dev/null +++ b/src/Cnblogs.Architecture.Ddd.EventBus.Abstractions/EventBusOptionsBuilder.cs @@ -0,0 +1,36 @@ +using Microsoft.Extensions.DependencyInjection; + +namespace Cnblogs.Architecture.Ddd.EventBus.Abstractions; + +/// +/// Builder for . +/// +public class EventBusOptionsBuilder +{ + /// + /// Create a . + /// + /// + public EventBusOptionsBuilder(IServiceCollection services) + { + Services = services; + } + + /// + /// Internal service collection. + /// + public IServiceCollection Services { get; } + + /// + /// The interval in milliseconds for checking pending integration events. + /// + public int Interval { get; set; } = 1000; + + internal Action GetConfiguration() + { + return o => + { + o.Interval = Interval; + }; + } +} diff --git a/src/Cnblogs.Architecture.Ddd.EventBus.Abstractions/EventBusServiceInjector.cs b/src/Cnblogs.Architecture.Ddd.EventBus.Abstractions/EventBusServiceInjector.cs new file mode 100644 index 0000000..357cbf0 --- /dev/null +++ b/src/Cnblogs.Architecture.Ddd.EventBus.Abstractions/EventBusServiceInjector.cs @@ -0,0 +1,54 @@ +using System.Reflection; +using Cnblogs.Architecture.Ddd.Cqrs.DependencyInjection; +using Microsoft.Extensions.DependencyInjection; +using Microsoft.Extensions.DependencyInjection.Extensions; + +namespace Cnblogs.Architecture.Ddd.EventBus.Abstractions; + +/// +/// Extension methods for injecting to service collection. +/// +public static class EventBusServiceInjector +{ + /// + /// Add event bus for integration event support. + /// + /// The services. + /// Extra configurations for event bus. + /// The assemblies for handlers. + /// . + public static IServiceCollection AddEventBus( + this IServiceCollection services, + Action? configuration = null, + params Assembly[] handlerAssemblies) + { + services.TryAddSingleton(); + services.TryAddScoped(); + services.AddHostedService(); + var builder = new EventBusOptionsBuilder(services); + configuration?.Invoke(builder); + services.Configure(builder.GetConfiguration()); + if (handlerAssemblies.Length > 0) + { + services.AddMediatR(cfg => cfg.RegisterServicesFromAssemblies(handlerAssemblies)); + } + + return services; + } + + /// + /// Add event bus for integration event support. + /// + /// The . + /// The configuration. + /// The assemblies for handlers. + /// + public static CqrsInjector AddEventBus( + this CqrsInjector cqrsInjector, + Action? configuration = null, + params Assembly[] handlerAssemblies) + { + cqrsInjector.Services.AddEventBus(configuration, handlerAssemblies); + return cqrsInjector; + } +} diff --git a/src/Cnblogs.Architecture.Ddd.EventBus.Abstractions/IEventBuffer.cs b/src/Cnblogs.Architecture.Ddd.EventBus.Abstractions/IEventBuffer.cs new file mode 100644 index 0000000..17970e1 --- /dev/null +++ b/src/Cnblogs.Architecture.Ddd.EventBus.Abstractions/IEventBuffer.cs @@ -0,0 +1,33 @@ +namespace Cnblogs.Architecture.Ddd.EventBus.Abstractions; + +/// +/// Buffer for integration events. +/// +public interface IEventBuffer +{ + /// + /// Number of pending events. + /// + int Count { get; } + + /// + /// Add an event to buffer. + /// + /// The name of integration event. + /// The event. + /// The type of integration event. + void Add(string name, TEvent @event) + where TEvent : IntegrationEvent; + + /// + /// Get an integration event without removing it. + /// + /// The integration event, null will be returned if buffer is empty. + BufferedIntegrationEvent? Peek(); + + /// + /// Get an integration event and remove it. + /// + /// The integration event, null will be returned if buffer is empty. + BufferedIntegrationEvent? Pop(); +} diff --git a/src/Cnblogs.Architecture.Ddd.EventBus.Abstractions/IEventBusProvider.cs b/src/Cnblogs.Architecture.Ddd.EventBus.Abstractions/IEventBusProvider.cs new file mode 100644 index 0000000..527c19e --- /dev/null +++ b/src/Cnblogs.Architecture.Ddd.EventBus.Abstractions/IEventBusProvider.cs @@ -0,0 +1,15 @@ +namespace Cnblogs.Architecture.Ddd.EventBus.Abstractions; + +/// +/// Provider contract for event bus. +/// +public interface IEventBusProvider +{ + /// + /// Emit an integration event. + /// + /// The name of the event. + /// The event body. + /// + Task PublishAsync(string eventName, IntegrationEvent @event); +} diff --git a/src/Cnblogs.Architecture.Ddd.EventBus.Abstractions/IEventBusHandler.cs b/src/Cnblogs.Architecture.Ddd.EventBus.Abstractions/IEventBusRequestHandler.cs similarity index 77% rename from src/Cnblogs.Architecture.Ddd.EventBus.Abstractions/IEventBusHandler.cs rename to src/Cnblogs.Architecture.Ddd.EventBus.Abstractions/IEventBusRequestHandler.cs index 15a10f0..e5dc196 100644 --- a/src/Cnblogs.Architecture.Ddd.EventBus.Abstractions/IEventBusHandler.cs +++ b/src/Cnblogs.Architecture.Ddd.EventBus.Abstractions/IEventBusRequestHandler.cs @@ -3,6 +3,6 @@ namespace Cnblogs.Architecture.Ddd.EventBus.Abstractions; /// /// The empty interface as a generic type constraint /// -public interface IEventBusHandler +public interface IEventBusRequestHandler { } diff --git a/src/Cnblogs.Architecture.Ddd.EventBus.Abstractions/IIntegrationEventHandler.cs b/src/Cnblogs.Architecture.Ddd.EventBus.Abstractions/IIntegrationEventHandler.cs index f533225..3968f00 100644 --- a/src/Cnblogs.Architecture.Ddd.EventBus.Abstractions/IIntegrationEventHandler.cs +++ b/src/Cnblogs.Architecture.Ddd.EventBus.Abstractions/IIntegrationEventHandler.cs @@ -6,7 +6,7 @@ namespace Cnblogs.Architecture.Ddd.EventBus.Abstractions; /// 集成事件处理器。 /// /// 集成事件。 -public interface IIntegrationEventHandler : INotificationHandler, IEventBusHandler +public interface IIntegrationEventHandler : INotificationHandler, IEventBusRequestHandler where TEvent : IntegrationEvent { } diff --git a/src/Cnblogs.Architecture.Ddd.EventBus.Abstractions/InMemoryEventBuffer.cs b/src/Cnblogs.Architecture.Ddd.EventBus.Abstractions/InMemoryEventBuffer.cs new file mode 100644 index 0000000..82b8898 --- /dev/null +++ b/src/Cnblogs.Architecture.Ddd.EventBus.Abstractions/InMemoryEventBuffer.cs @@ -0,0 +1,33 @@ +using System.Collections.Concurrent; + +namespace Cnblogs.Architecture.Ddd.EventBus.Abstractions; + +/// +/// Implementation of using . +/// +public class InMemoryEventBuffer : IEventBuffer +{ + private readonly ConcurrentQueue _queue = new(); + + /// + public int Count => _queue.Count; + + /// + public void Add(string name, TEvent @event) + where TEvent : IntegrationEvent + { + _queue.Enqueue(new BufferedIntegrationEvent(name, @event)); + } + + /// + public BufferedIntegrationEvent? Peek() + { + return _queue.TryPeek(out var @event) ? @event : null; + } + + /// + public BufferedIntegrationEvent? Pop() + { + return _queue.TryDequeue(out var @event) ? @event : null; + } +} diff --git a/src/Cnblogs.Architecture.Ddd.EventBus.Abstractions/PublishIntegrationEventHostedService.cs b/src/Cnblogs.Architecture.Ddd.EventBus.Abstractions/PublishIntegrationEventHostedService.cs new file mode 100644 index 0000000..041c718 --- /dev/null +++ b/src/Cnblogs.Architecture.Ddd.EventBus.Abstractions/PublishIntegrationEventHostedService.cs @@ -0,0 +1,87 @@ +using System.Diagnostics; +using Microsoft.Extensions.DependencyInjection; +using Microsoft.Extensions.Hosting; +using Microsoft.Extensions.Logging; +using Microsoft.Extensions.Options; + +namespace Cnblogs.Architecture.Ddd.EventBus.Abstractions; + +/// +/// The hosted service for publishing integration event at background. +/// +public sealed class PublishIntegrationEventHostedService : BackgroundService +{ + private readonly EventBusOptions _options; + private readonly IServiceProvider _serviceProvider; + private readonly IEventBuffer _eventBuffer; + private readonly ILogger _logger; + + /// + /// Create a . + /// + /// The event bus options. + /// The service provider. + /// The logger. + /// The buffer for integration events. + public PublishIntegrationEventHostedService( + IOptions options, + IServiceProvider serviceProvider, + ILogger logger, + IEventBuffer eventBuffer) + { + _options = options.Value; + _serviceProvider = serviceProvider; + _logger = logger; + _eventBuffer = eventBuffer; + } + + /// + protected override async Task ExecuteAsync(CancellationToken stoppingToken) + { + _logger.LogInformation("Integration event publisher running."); + var watch = new Stopwatch(); + using var timer = new PeriodicTimer(TimeSpan.FromMicroseconds(_options.Interval)); + while (await timer.WaitForNextTickAsync(stoppingToken)) + { + try + { + watch.Restart(); + var beforeCount = _eventBuffer.Count; + await PublishEventAsync(); + watch.Stop(); + var afterCount = _eventBuffer.Count; + _logger.LogInformation( + "Published {PublishedEventCount} events in {Duration} ms, resting count: {RestingEventCount}", + beforeCount - afterCount, + watch.ElapsedMilliseconds, + afterCount); + } + catch (Exception e) + { + _logger.LogError(e, "Publish integration event failed, pending count: {Count}", _eventBuffer.Count); + } + } + } + + private async Task PublishEventAsync() + { + if (_eventBuffer.Count == 0) + { + return; + } + + using var scope = _serviceProvider.CreateScope(); + var provider = scope.ServiceProvider.GetRequiredService(); + while (_eventBuffer.Count > 0) + { + var buffered = _eventBuffer.Peek(); + if (buffered is null) + { + return; + } + + await provider.PublishAsync(buffered.Name, buffered.Event); + _eventBuffer.Pop(); + } + } +} diff --git a/src/Cnblogs.Architecture.Ddd.EventBus.Dapr/Cnblogs.Architecture.Ddd.EventBus.Dapr.csproj b/src/Cnblogs.Architecture.Ddd.EventBus.Dapr/Cnblogs.Architecture.Ddd.EventBus.Dapr.csproj index 8fa239b..d79a384 100644 --- a/src/Cnblogs.Architecture.Ddd.EventBus.Dapr/Cnblogs.Architecture.Ddd.EventBus.Dapr.csproj +++ b/src/Cnblogs.Architecture.Ddd.EventBus.Dapr/Cnblogs.Architecture.Ddd.EventBus.Dapr.csproj @@ -7,7 +7,6 @@ - diff --git a/src/Cnblogs.Architecture.Ddd.EventBus.Dapr/CqrsInjectorExtensions.cs b/src/Cnblogs.Architecture.Ddd.EventBus.Dapr/CqrsInjectorExtensions.cs index 63ce26e..c81873a 100644 --- a/src/Cnblogs.Architecture.Ddd.EventBus.Dapr/CqrsInjectorExtensions.cs +++ b/src/Cnblogs.Architecture.Ddd.EventBus.Dapr/CqrsInjectorExtensions.cs @@ -1,7 +1,6 @@ using System.Reflection; using Cnblogs.Architecture.Ddd.Cqrs.DependencyInjection; using Cnblogs.Architecture.Ddd.EventBus.Abstractions; -using Microsoft.Extensions.DependencyInjection; namespace Cnblogs.Architecture.Ddd.EventBus.Dapr; @@ -16,16 +15,10 @@ public static class CqrsInjectorExtensions /// /// 集成事件所在的 Assembly。 /// + [Obsolete("Use builder.AddCqrs().AddEventBus(o => o.UseDapr(assembly)) instead.", true)] public static CqrsInjector AddDaprEventBus(this CqrsInjector cqrsInjector, Assembly integrationEventAssembly) { - var appName = integrationEventAssembly.GetCustomAttribute(); - if (appName is null) - { - throw new InvalidOperationException( - "No AssemblyAppNameAttribute was found, add attribute to Assembly or specify AppName with AddDaprEventBus(string appName)"); - } - - return cqrsInjector.AddDaprEventBus(appName.Name); + return cqrsInjector.AddEventBus(o => o.UseDapr(integrationEventAssembly)); } /// @@ -34,9 +27,10 @@ public static CqrsInjector AddDaprEventBus(this CqrsInjector cqrsInjector, Assem /// /// 发布事件时使用的 appName。 /// + [Obsolete("Use builder.AddCqrs().AddEventBus(o => o.UseDapr(appName)) instead.", true)] public static CqrsInjector AddDaprEventBus(this CqrsInjector cqrsInjector, string appName) { - cqrsInjector.Services.AddDaprEventBus(appName); + cqrsInjector.AddEventBus(o => o.UseDapr(appName)); return cqrsInjector; } -} \ No newline at end of file +} diff --git a/src/Cnblogs.Architecture.Ddd.EventBus.Dapr/DaprEventBusInjector.cs b/src/Cnblogs.Architecture.Ddd.EventBus.Dapr/DaprEventBusInjector.cs new file mode 100644 index 0000000..cb5d981 --- /dev/null +++ b/src/Cnblogs.Architecture.Ddd.EventBus.Dapr/DaprEventBusInjector.cs @@ -0,0 +1,44 @@ +using System.Reflection; +using Cnblogs.Architecture.Ddd.EventBus.Abstractions; +using Microsoft.Extensions.DependencyInjection; + +namespace Cnblogs.Architecture.Ddd.EventBus.Dapr; + +/// +/// Injector methods for dapr event bus. +/// +public static class DaprEventBusInjector +{ + /// + /// Use dapr as event bus provider. + /// + /// The . + /// The assembly of integration events of current app. + /// + public static EventBusOptionsBuilder UseDapr(this EventBusOptionsBuilder builder, Assembly integrationEventAssembly) + { + var appName = integrationEventAssembly.GetCustomAttribute(); + if (appName is null) + { + throw new InvalidOperationException( + "No AssemblyAppNameAttribute was found, add attribute to Assembly or specify AppName with AddDaprEventBus(string appName)"); + } + + return builder.UseDapr(appName.Name); + } + + /// + /// Use dapr as event bus provider. + /// + /// The . + /// The name of current app. + /// + public static EventBusOptionsBuilder UseDapr(this EventBusOptionsBuilder builder, string appName) + { + var services = builder.Services; + services.Configure(o => o.AppName = appName); + services.AddControllers().AddDapr(); + services.AddScoped(); + return builder; + } +} diff --git a/src/Cnblogs.Architecture.Ddd.EventBus.Dapr/DaprEventBusProvider.cs b/src/Cnblogs.Architecture.Ddd.EventBus.Dapr/DaprEventBusProvider.cs new file mode 100644 index 0000000..dbdc62c --- /dev/null +++ b/src/Cnblogs.Architecture.Ddd.EventBus.Dapr/DaprEventBusProvider.cs @@ -0,0 +1,46 @@ +using Cnblogs.Architecture.Ddd.EventBus.Abstractions; +using Dapr.Client; +using Microsoft.Extensions.Logging; +using Microsoft.Extensions.Options; + +namespace Cnblogs.Architecture.Ddd.EventBus.Dapr; + +/// +/// Implementations for using Dapr. +/// +public class DaprEventBusProvider : IEventBusProvider +{ + private readonly DaprClient _daprClient; + private readonly DaprOptions _daprOptions; + private readonly ILogger _logger; + + /// + /// Create a . + /// + /// The underlying dapr client. + /// The options for dapr. + /// The logger. + public DaprEventBusProvider( + DaprClient daprClient, + IOptions daprOptions, + ILogger logger) + { + _daprClient = daprClient; + _daprOptions = daprOptions.Value; + _logger = logger; + } + + /// + public async Task PublishAsync(string eventName, IntegrationEvent @event) + { + _logger.LogInformation( + "Publishing IntegrationEvent, Name: {EventName}, Body: {Event}, TraceId: {TraceId}", + eventName, + @event, + @event.TraceId ?? @event.Id); + await _daprClient.PublishEventAsync( + DaprOptions.PubSubName, + DaprUtils.GetDaprTopicName(_daprOptions.AppName, eventName), + @event); + } +} diff --git a/src/Cnblogs.Architecture.Ddd.EventBus.Dapr/DaprEventBusServiceCollectionExtensions.cs b/src/Cnblogs.Architecture.Ddd.EventBus.Dapr/DaprEventBusServiceCollectionExtensions.cs index cb8f7fa..7c8f69f 100644 --- a/src/Cnblogs.Architecture.Ddd.EventBus.Dapr/DaprEventBusServiceCollectionExtensions.cs +++ b/src/Cnblogs.Architecture.Ddd.EventBus.Dapr/DaprEventBusServiceCollectionExtensions.cs @@ -19,20 +19,13 @@ public static class DaprEventBusServiceCollectionExtensions /// The app name used when publishing integration events. /// Assemblies to scan by MediatR /// + [Obsolete("use services.AddEventBus(o => o.UseDapr(), assemblies) instead.", true)] public static IServiceCollection AddDaprEventBus( this IServiceCollection services, string appName, params Assembly[] assemblies) { - services.Configure(o => o.AppName = appName); - services.AddControllers().AddDapr(); - services.AddScoped(); - - if (assemblies.Length > 0) - { - services.AddMediatR(cfg => cfg.RegisterServicesFromAssemblies(assemblies)); - } - + services.AddEventBus(o => o.UseDapr(appName), assemblies); return services; } } diff --git a/src/Cnblogs.Architecture.Ddd.EventBus.Dapr/EndPointExtensions.cs b/src/Cnblogs.Architecture.Ddd.EventBus.Dapr/EndPointExtensions.cs index c8830fb..221788d 100644 --- a/src/Cnblogs.Architecture.Ddd.EventBus.Dapr/EndPointExtensions.cs +++ b/src/Cnblogs.Architecture.Ddd.EventBus.Dapr/EndPointExtensions.cs @@ -90,7 +90,7 @@ public static IEndpointRouteBuilder Subscribe(this IEndpointRouteBuilder builder /// The integration event handler that implements ]]> /// public static IEndpointRouteBuilder SubscribeByEventHandler(this IEndpointRouteBuilder builder) - where TEventHandler : IEventBusHandler + where TEventHandler : IEventBusRequestHandler { return builder.SubscribeByEventHandler(typeof(TEventHandler)); } diff --git a/src/Cnblogs.Architecture.Ddd.Infrastructure.FileProviders.AliyunOss/CqrsInjectorExtensions.cs b/src/Cnblogs.Architecture.Ddd.Infrastructure.FileProviders.AliyunOss/CqrsInjectorExtensions.cs index b03c7cb..ebad89e 100644 --- a/src/Cnblogs.Architecture.Ddd.Infrastructure.FileProviders.AliyunOss/CqrsInjectorExtensions.cs +++ b/src/Cnblogs.Architecture.Ddd.Infrastructure.FileProviders.AliyunOss/CqrsInjectorExtensions.cs @@ -24,6 +24,6 @@ public static CqrsInjector UseAliyunOssFileProvider( { injector.Services.AddOssClient(configuration, configurationSectionName); injector.Services.Configure(configuration.GetSection(configurationSectionName)); - return injector.UseFileProvider(); + return injector.AddFileProvider(); } } diff --git a/test/Cnblogs.Architecture.IntegrationTestProject/Application/Commands/CommandHandlers.cs b/test/Cnblogs.Architecture.IntegrationTestProject/Application/Commands/CommandHandlers.cs index abcc139..1496920 100644 --- a/test/Cnblogs.Architecture.IntegrationTestProject/Application/Commands/CommandHandlers.cs +++ b/test/Cnblogs.Architecture.IntegrationTestProject/Application/Commands/CommandHandlers.cs @@ -1,5 +1,7 @@ using Cnblogs.Architecture.Ddd.Cqrs.Abstractions; using Cnblogs.Architecture.IntegrationTestProject.Application.Errors; +using Cnblogs.Architecture.IntegrationTestProject.Domain.Events; +using MediatR; namespace Cnblogs.Architecture.IntegrationTestProject.Application.Commands; @@ -7,27 +9,37 @@ public class CommandHandlers : ICommandHandler, ICommandHandler, ICommandHandler { + private readonly IMediator _mediator; + + public CommandHandlers(IMediator mediator) + { + _mediator = mediator; + } + /// - public Task> Handle(CreateCommand request, CancellationToken cancellationToken) + public async Task> Handle(CreateCommand request, CancellationToken cancellationToken) { - return Task.FromResult(request.NeedError - ? CommandResponse.Fail(TestError.Default) - : CommandResponse.Success()); + await _mediator.Publish(new StringCreatedDomainEvent(request.Data ?? string.Empty), cancellationToken); + return request.NeedError + ? CommandResponse.Fail(TestError.Default) + : CommandResponse.Success(); } /// public Task> Handle(UpdateCommand request, CancellationToken cancellationToken) { - return Task.FromResult(request.NeedExecutionError - ? CommandResponse.Fail(TestError.Default) - : CommandResponse.Success()); + return Task.FromResult( + request.NeedExecutionError + ? CommandResponse.Fail(TestError.Default) + : CommandResponse.Success()); } /// public Task> Handle(DeleteCommand request, CancellationToken cancellationToken) { - return Task.FromResult(request.NeedError - ? CommandResponse.Fail(TestError.Default) - : CommandResponse.Success()); + return Task.FromResult( + request.NeedError + ? CommandResponse.Fail(TestError.Default) + : CommandResponse.Success()); } -} \ No newline at end of file +} diff --git a/test/Cnblogs.Architecture.IntegrationTestProject/Application/Commands/CreateCommand.cs b/test/Cnblogs.Architecture.IntegrationTestProject/Application/Commands/CreateCommand.cs index 9909c70..eeb78ef 100644 --- a/test/Cnblogs.Architecture.IntegrationTestProject/Application/Commands/CreateCommand.cs +++ b/test/Cnblogs.Architecture.IntegrationTestProject/Application/Commands/CreateCommand.cs @@ -3,4 +3,4 @@ namespace Cnblogs.Architecture.IntegrationTestProject.Application.Commands; -public record CreateCommand(bool NeedError, bool ValidateOnly = false) : ICommand; \ No newline at end of file +public record CreateCommand(bool NeedError, string? Data = null, bool ValidateOnly = false) : ICommand; diff --git a/test/Cnblogs.Architecture.IntegrationTestProject/Application/EventHandlers/StringCreatedEventHandler.cs b/test/Cnblogs.Architecture.IntegrationTestProject/Application/EventHandlers/StringCreatedEventHandler.cs new file mode 100644 index 0000000..57ce6cb --- /dev/null +++ b/test/Cnblogs.Architecture.IntegrationTestProject/Application/EventHandlers/StringCreatedEventHandler.cs @@ -0,0 +1,22 @@ +using Cnblogs.Architecture.Ddd.Cqrs.Abstractions; +using Cnblogs.Architecture.Ddd.EventBus.Abstractions; +using Cnblogs.Architecture.IntegrationTestProject.Domain.Events; +using Cnblogs.Architecture.TestIntegrationEvents; + +namespace Cnblogs.Architecture.IntegrationTestProject.Application.EventHandlers; + +public class StringCreatedEventHandler : IDomainEventHandler +{ + private readonly IEventBus _eventBus; + + public StringCreatedEventHandler(IEventBus eventBus) + { + _eventBus = eventBus; + } + + /// + public async Task Handle(StringCreatedDomainEvent notification, CancellationToken cancellationToken) + { + await _eventBus.PublishAsync(new TestIntegrationEvent(Guid.NewGuid(), DateTimeOffset.Now, notification.Data)); + } +} diff --git a/test/Cnblogs.Architecture.IntegrationTestProject/Cnblogs.Architecture.IntegrationTestProject.csproj b/test/Cnblogs.Architecture.IntegrationTestProject/Cnblogs.Architecture.IntegrationTestProject.csproj index 99d0d0c..1b9b076 100644 --- a/test/Cnblogs.Architecture.IntegrationTestProject/Cnblogs.Architecture.IntegrationTestProject.csproj +++ b/test/Cnblogs.Architecture.IntegrationTestProject/Cnblogs.Architecture.IntegrationTestProject.csproj @@ -9,4 +9,7 @@ + + + diff --git a/test/Cnblogs.Architecture.IntegrationTestProject/Domain/Events/StringCreatedDomainEvent.cs b/test/Cnblogs.Architecture.IntegrationTestProject/Domain/Events/StringCreatedDomainEvent.cs new file mode 100644 index 0000000..fa075ca --- /dev/null +++ b/test/Cnblogs.Architecture.IntegrationTestProject/Domain/Events/StringCreatedDomainEvent.cs @@ -0,0 +1,5 @@ +using Cnblogs.Architecture.Ddd.Domain.Abstractions; + +namespace Cnblogs.Architecture.IntegrationTestProject.Domain.Events; + +public record StringCreatedDomainEvent(string Data) : DomainEvent; diff --git a/test/Cnblogs.Architecture.IntegrationTestProject/Payloads/CreatePayload.cs b/test/Cnblogs.Architecture.IntegrationTestProject/Payloads/CreatePayload.cs index 6cabfc4..65b7eac 100644 --- a/test/Cnblogs.Architecture.IntegrationTestProject/Payloads/CreatePayload.cs +++ b/test/Cnblogs.Architecture.IntegrationTestProject/Payloads/CreatePayload.cs @@ -1,3 +1,3 @@ namespace Cnblogs.Architecture.IntegrationTestProject.Payloads; -public record CreatePayload(bool NeedError); \ No newline at end of file +public record CreatePayload(bool NeedError, string? Data); diff --git a/test/Cnblogs.Architecture.IntegrationTestProject/Program.cs b/test/Cnblogs.Architecture.IntegrationTestProject/Program.cs index 8c7f2a8..9adca7b 100644 --- a/test/Cnblogs.Architecture.IntegrationTestProject/Program.cs +++ b/test/Cnblogs.Architecture.IntegrationTestProject/Program.cs @@ -1,5 +1,6 @@ using System.Reflection; using Cnblogs.Architecture.Ddd.Cqrs.AspNetCore; +using Cnblogs.Architecture.Ddd.EventBus.Abstractions; using Cnblogs.Architecture.Ddd.EventBus.Dapr; using Cnblogs.Architecture.IntegrationTestProject; using Cnblogs.Architecture.IntegrationTestProject.Application.Commands; @@ -11,7 +12,7 @@ builder.Services.AddCqrs(Assembly.GetExecutingAssembly(), typeof(TestIntegrationEvent).Assembly) .AddDefaultDateTimeAndRandomProvider() - .AddDaprEventBus(Constants.AppName); + .AddEventBus(o => o.UseDapr(Constants.AppName)); builder.Services.AddControllers().AddCqrsModelBinderProvider(); // Learn more about configuring Swagger/OpenAPI at https://aka.ms/aspnetcore/swashbuckle @@ -38,7 +39,7 @@ v1.MapQuery("apps/{appId}/strings/{stringId:int}/value", MapNullableRouteParameter.Enable, enableHead: true); v1.MapQuery("strings/{id:int}"); v1.MapQuery("strings"); -v1.MapCommand("strings", (CreatePayload payload) => new CreateCommand(payload.NeedError)); +v1.MapCommand("strings", (CreatePayload payload) => new CreateCommand(payload.NeedError, payload.Data)); v1.MapCommand( "strings/{id:int}", (int id, UpdatePayload payload) => new UpdateCommand(id, payload.NeedValidationError, payload.NeedExecutionError)); diff --git a/test/Cnblogs.Architecture.IntegrationTests/DaprTests.cs b/test/Cnblogs.Architecture.IntegrationTests/DaprTests.cs index e480264..a017ad5 100644 --- a/test/Cnblogs.Architecture.IntegrationTests/DaprTests.cs +++ b/test/Cnblogs.Architecture.IntegrationTests/DaprTests.cs @@ -1,4 +1,6 @@ using System.Net; +using Cnblogs.Architecture.Ddd.EventBus.Abstractions; +using Cnblogs.Architecture.Ddd.EventBus.Dapr; using Cnblogs.Architecture.IntegrationTestProject.EventHandlers; using Cnblogs.Architecture.TestIntegrationEvents; using FluentAssertions; @@ -19,10 +21,10 @@ public async Task Dapr_SubscribeEndpoint_OkAsync(SubscribeType subscribeType) { // Arrange var builder = WebApplication.CreateBuilder(); - builder.Services.AddDaprEventBus(nameof(DaprTests)); + builder.Services.AddEventBus(o => o.UseDapr(nameof(DaprTests))); builder.WebHost.UseTestServer(); - using var app = builder.Build(); + await using var app = builder.Build(); _ = subscribeType switch { @@ -51,7 +53,7 @@ public async Task Dapr_SubscribeWithoutAnyAssembly_OkAsync() { // Arrange var builder = WebApplication.CreateBuilder(); - builder.Services.AddDaprEventBus(nameof(DaprTests)); + builder.Services.AddEventBus(o => o.UseDapr(nameof(DaprTests))); builder.WebHost.UseTestServer(); var app = builder.Build(); diff --git a/test/Cnblogs.Architecture.IntegrationTests/IntegrationEventHandlerTests.cs b/test/Cnblogs.Architecture.IntegrationTests/IntegrationEventHandlerTests.cs index a1f6306..f078764 100644 --- a/test/Cnblogs.Architecture.IntegrationTests/IntegrationEventHandlerTests.cs +++ b/test/Cnblogs.Architecture.IntegrationTests/IntegrationEventHandlerTests.cs @@ -1,10 +1,11 @@ using System.Net.Http.Json; +using Cnblogs.Architecture.Ddd.EventBus.Abstractions; +using Cnblogs.Architecture.Ddd.EventBus.Dapr; using Cnblogs.Architecture.IntegrationTestProject.EventHandlers; using Cnblogs.Architecture.TestIntegrationEvents; using FluentAssertions; using Microsoft.AspNetCore.Builder; using Microsoft.AspNetCore.TestHost; -using Microsoft.Extensions.DependencyInjection; using Microsoft.Extensions.Logging; using Serilog; using Serilog.Sinks.InMemory; @@ -29,8 +30,9 @@ public async Task IntegrationEventHandler_TestIntegrationEvent_SuccessAsync() // Arrange var builder = WebApplication.CreateBuilder(); builder.Logging.AddSerilog(logger => logger.WriteTo.InMemory().WriteTo.Console()); - builder.Services - .AddDaprEventBus(nameof(IntegrationEventHandlerTests), typeof(TestIntegrationEventHandler).Assembly); + builder.Services.AddEventBus( + o => o.UseDapr(nameof(IntegrationEventHandlerTests)), + typeof(TestIntegrationEventHandler).Assembly); builder.WebHost.UseTestServer(); var app = builder.Build(); app.Subscribe(); diff --git a/test/Cnblogs.Architecture.IntegrationTests/IntegrationEventPublishTests.cs b/test/Cnblogs.Architecture.IntegrationTests/IntegrationEventPublishTests.cs new file mode 100644 index 0000000..979f39a --- /dev/null +++ b/test/Cnblogs.Architecture.IntegrationTests/IntegrationEventPublishTests.cs @@ -0,0 +1,45 @@ +using System.Net.Http.Json; +using Cnblogs.Architecture.Ddd.EventBus.Abstractions; +using Cnblogs.Architecture.IntegrationTestProject; +using Cnblogs.Architecture.IntegrationTestProject.Payloads; +using Cnblogs.Architecture.TestIntegrationEvents; +using FluentAssertions; +using Microsoft.AspNetCore.Mvc.Testing; +using Microsoft.Extensions.DependencyInjection; +using Microsoft.Extensions.DependencyInjection.Extensions; +using Moq; + +namespace Cnblogs.Architecture.IntegrationTests; + +public class IntegrationEventPublishTests +{ + [Fact] + public async Task EventBus_PublishEvent_SuccessAsync() + { + // Arrange + const string data = "hello"; + var builder = new WebApplicationFactory(); + var eventBusMock = new Mock(); + builder = builder.WithWebHostBuilder( + b => b.ConfigureServices( + services => + { + services.RemoveAll(); + services.AddScoped(_ => eventBusMock.Object); + })); + + // Act + var response = await builder.CreateClient().PostAsJsonAsync( + "/api/v1/strings", + new CreatePayload(false, data)); + var content = await response.Content.ReadAsStringAsync(); + await Task.Delay(1500); + + // Assert + response.Should().BeSuccessful(); + content.Should().BeNullOrEmpty(); + eventBusMock.Verify( + x => x.PublishAsync(It.IsAny(), It.Is(t => t.Message == data)), + Times.Once); + } +} diff --git a/test/Cnblogs.Architecture.TestShared/Cnblogs.Architecture.TestShared.csproj b/test/Cnblogs.Architecture.TestShared/Cnblogs.Architecture.TestShared.csproj index b1f332d..89dca54 100644 --- a/test/Cnblogs.Architecture.TestShared/Cnblogs.Architecture.TestShared.csproj +++ b/test/Cnblogs.Architecture.TestShared/Cnblogs.Architecture.TestShared.csproj @@ -1,6 +1,6 @@ - + \ No newline at end of file diff --git a/test/Cnblogs.Architecture.UnitTests/EventBus/AssemblyAttributeTests.cs b/test/Cnblogs.Architecture.UnitTests/EventBus/AssemblyAttributeTests.cs index 529f855..3ad6a62 100644 --- a/test/Cnblogs.Architecture.UnitTests/EventBus/AssemblyAttributeTests.cs +++ b/test/Cnblogs.Architecture.UnitTests/EventBus/AssemblyAttributeTests.cs @@ -1,7 +1,8 @@ -using Cnblogs.Architecture.TestIntegrationEvents; +using Cnblogs.Architecture.Ddd.EventBus.Abstractions; +using Cnblogs.Architecture.Ddd.EventBus.Dapr; +using Cnblogs.Architecture.TestIntegrationEvents; using FluentAssertions; using Microsoft.AspNetCore.Builder; -using Microsoft.Extensions.DependencyInjection; namespace Cnblogs.Architecture.UnitTests.EventBus; @@ -12,7 +13,7 @@ public void SubscribeByAssemblyMeta_Success() { // Arrange var builder = WebApplication.CreateBuilder(); - builder.Services.AddDaprEventBus(nameof(AssemblyAttributeTests)); + builder.Services.AddEventBus(o => o.UseDapr(nameof(AssemblyAttributeTests))); var app = builder.Build(); // Act @@ -35,4 +36,4 @@ public void SubscribeByAssemblyMeta_Throw() // Assert act.Should().Throw(); } -} \ No newline at end of file +}