Skip to content

Commit

Permalink
Merge pull request #133 from cnblogs/add-event-queue
Browse files Browse the repository at this point in the history
feat: add event buffer
  • Loading branch information
ikesnowy authored Aug 8, 2023
2 parents 54ec918 + 41bcb76 commit a26969c
Show file tree
Hide file tree
Showing 32 changed files with 526 additions and 89 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -111,17 +111,17 @@ public CqrsInjector AddRemoteQueryCache<TRemote>(Action<CacheableRequestOptions>
/// Use default implementation of <see cref="IFileProvider"/> that accesses file system directly.
/// </summary>
/// <returns></returns>
public CqrsInjector UseDefaultFileProvider()
public CqrsInjector AddDefaultFileProvider()
{
return UseFileProvider<DefaultFileProvider>();
return AddFileProvider<DefaultFileProvider>();
}

/// <summary>
/// Use given implementation of <see cref="IFileProvider"/>.
/// </summary>
/// <typeparam name="TProvider">The implementation type.</typeparam>
/// <returns></returns>
public CqrsInjector UseFileProvider<TProvider>()
public CqrsInjector AddFileProvider<TProvider>()
where TProvider : class, IFileProvider
{
Services.AddScoped<IFileProvider, TProvider>();
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
namespace Cnblogs.Architecture.Ddd.EventBus.Abstractions;

/// <summary>
/// The integration event stored in buffer.
/// </summary>
/// <param name="Name">The event name.</param>
/// <param name="Event">The event data.</param>
public record BufferedIntegrationEvent(string Name, IntegrationEvent Event);
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,6 @@
</Description>
</PropertyGroup>
<ItemGroup>
<PackageReference Include="MediatR" Version="12.1.1" />
<ProjectReference Include="..\Cnblogs.Architecture.Ddd.Cqrs.DependencyInjection\Cnblogs.Architecture.Ddd.Cqrs.DependencyInjection.csproj" />
</ItemGroup>
</Project>
Original file line number Diff line number Diff line change
@@ -1,61 +1,44 @@
using Cnblogs.Architecture.Ddd.EventBus.Abstractions;
using Dapr.Client;
using MediatR;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Options;

namespace Cnblogs.Architecture.Ddd.EventBus.Dapr;
namespace Cnblogs.Architecture.Ddd.EventBus.Abstractions;

/// <summary>
/// Dapr EventBus 实现。
/// Default implementation for <see cref="IEventBus"/>
/// </summary>
public class DaprEventBus : IEventBus
public class DefaultEventBus : IEventBus
{
private readonly DaprClient _daprClient;
private readonly DaprOptions _daprOptions;
private readonly IEventBuffer _eventBuffer;
private readonly IMediator _mediator;
private readonly ILogger<DaprEventBus> _logger;
private readonly ILogger<DefaultEventBus> _logger;

/// <summary>
/// 创建一个 DaprEventBus
/// Create a <see cref="DefaultEventBus"/> instance.
/// </summary>
/// <param name="daprOptions"><see cref="DaprOptions"/></param>
/// <param name="daprClient"><see cref="DaprClient"/></param>
/// <param name="logger">日志记录器。</param>
/// <param name="mediator"><see cref="IMediator"/></param>
public DaprEventBus(
IOptions<DaprOptions> daprOptions,
DaprClient daprClient,
IMediator mediator,
ILogger<DaprEventBus> logger)
/// <param name="eventBuffer">The underlying event buffer.</param>
/// <param name="mediator">The IMediator.</param>
/// <param name="logger">The logger.</param>
public DefaultEventBus(IEventBuffer eventBuffer, IMediator mediator, ILogger<DefaultEventBus> logger)
{
_daprClient = daprClient;
_eventBuffer = eventBuffer;
_logger = logger;
_mediator = mediator;
_daprOptions = daprOptions.Value;
}

/// <inheritdoc />
public async Task PublishAsync<TEvent>(TEvent @event)
public Task PublishAsync<TEvent>(TEvent @event)
where TEvent : IntegrationEvent
{
await PublishAsync(typeof(TEvent).Name, @event);
return PublishAsync(typeof(TEvent).Name, @event);
}

/// <inheritdoc />
public async Task PublishAsync<TEvent>(string eventName, TEvent @event)
public Task PublishAsync<TEvent>(string eventName, TEvent @event)
where TEvent : IntegrationEvent
{
_logger.LogInformation(
"Publishing IntegrationEvent, Name: {EventName}, Body: {Event}, TraceId: {TraceId}",
eventName,
@event,
@event.TraceId ?? @event.Id);
@event.TraceId = TraceId;
await _daprClient.PublishEventAsync(
DaprOptions.PubSubName,
DaprUtils.GetDaprTopicName(_daprOptions.AppName, eventName),
@event);
_eventBuffer.Add(eventName, @event);
return Task.CompletedTask;
}

/// <inheritdoc />
Expand Down Expand Up @@ -97,4 +80,4 @@ public Task ReceiveAsync<TEvent>(TEvent receivedEvent)

/// <inheritdoc />
public Guid? TraceId { get; set; }
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
using Microsoft.Extensions.DependencyInjection;

namespace Cnblogs.Architecture.Ddd.EventBus.Abstractions;

/// <summary>
/// Options for event bus.
/// </summary>
public class EventBusOptions
{
/// <summary>
/// The service collection for
/// </summary>
public IServiceCollection? Services { get; set; }

/// <summary>
/// Interval for publish integration event.
/// </summary>
public int Interval { get; set; } = 1;
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
using Microsoft.Extensions.DependencyInjection;

namespace Cnblogs.Architecture.Ddd.EventBus.Abstractions;

/// <summary>
/// Builder for <see cref="EventBusOptions"/>.
/// </summary>
public class EventBusOptionsBuilder
{
/// <summary>
/// Create a <see cref="EventBusOptionsBuilder"/>.
/// </summary>
/// <param name="services"></param>
public EventBusOptionsBuilder(IServiceCollection services)
{
Services = services;
}

/// <summary>
/// Internal service collection.
/// </summary>
public IServiceCollection Services { get; }

/// <summary>
/// The interval in milliseconds for checking pending integration events.
/// </summary>
public int Interval { get; set; } = 1000;

internal Action<EventBusOptions> GetConfiguration()
{
return o =>
{
o.Interval = Interval;
};
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
using System.Reflection;
using Cnblogs.Architecture.Ddd.Cqrs.DependencyInjection;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.DependencyInjection.Extensions;

namespace Cnblogs.Architecture.Ddd.EventBus.Abstractions;

/// <summary>
/// Extension methods for injecting <see cref="IEventBus"/> to service collection.
/// </summary>
public static class EventBusServiceInjector
{
/// <summary>
/// Add event bus for integration event support.
/// </summary>
/// <param name="services">The services.</param>
/// <param name="configuration">Extra configurations for event bus.</param>
/// <param name="handlerAssemblies">The assemblies for handlers.</param>
/// <returns><see cref="IServiceCollection"/>.</returns>
public static IServiceCollection AddEventBus(
this IServiceCollection services,
Action<EventBusOptionsBuilder>? configuration = null,
params Assembly[] handlerAssemblies)
{
services.TryAddSingleton<IEventBuffer, InMemoryEventBuffer>();
services.TryAddScoped<IEventBus, DefaultEventBus>();
services.AddHostedService<PublishIntegrationEventHostedService>();
var builder = new EventBusOptionsBuilder(services);
configuration?.Invoke(builder);
services.Configure(builder.GetConfiguration());
if (handlerAssemblies.Length > 0)
{
services.AddMediatR(cfg => cfg.RegisterServicesFromAssemblies(handlerAssemblies));
}

return services;
}

/// <summary>
/// Add event bus for integration event support.
/// </summary>
/// <param name="cqrsInjector">The <see cref="CqrsInjector"/>.</param>
/// <param name="configuration">The configuration.</param>
/// <param name="handlerAssemblies">The assemblies for handlers.</param>
/// <returns></returns>
public static CqrsInjector AddEventBus(
this CqrsInjector cqrsInjector,
Action<EventBusOptionsBuilder>? configuration = null,
params Assembly[] handlerAssemblies)
{
cqrsInjector.Services.AddEventBus(configuration, handlerAssemblies);
return cqrsInjector;
}
}
33 changes: 33 additions & 0 deletions src/Cnblogs.Architecture.Ddd.EventBus.Abstractions/IEventBuffer.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
namespace Cnblogs.Architecture.Ddd.EventBus.Abstractions;

/// <summary>
/// Buffer for integration events.
/// </summary>
public interface IEventBuffer
{
/// <summary>
/// Number of pending events.
/// </summary>
int Count { get; }

/// <summary>
/// Add an event to buffer.
/// </summary>
/// <param name="name">The name of integration event.</param>
/// <param name="event">The event.</param>
/// <typeparam name="TEvent">The type of integration event.</typeparam>
void Add<TEvent>(string name, TEvent @event)
where TEvent : IntegrationEvent;

/// <summary>
/// Get an integration event without removing it.
/// </summary>
/// <returns>The integration event, <c>null</c> will be returned if buffer is empty.</returns>
BufferedIntegrationEvent? Peek();

/// <summary>
/// Get an integration event and remove it.
/// </summary>
/// <returns>The integration event, <c>null</c> will be returned if buffer is empty.</returns>
BufferedIntegrationEvent? Pop();
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
namespace Cnblogs.Architecture.Ddd.EventBus.Abstractions;

/// <summary>
/// Provider contract for event bus.
/// </summary>
public interface IEventBusProvider
{
/// <summary>
/// Emit an integration event.
/// </summary>
/// <param name="eventName">The name of the event.</param>
/// <param name="event">The event body.</param>
/// <returns></returns>
Task PublishAsync(string eventName, IntegrationEvent @event);
}
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,6 @@ namespace Cnblogs.Architecture.Ddd.EventBus.Abstractions;
/// <summary>
/// The empty interface as a generic type constraint
/// </summary>
public interface IEventBusHandler
public interface IEventBusRequestHandler
{
}
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ namespace Cnblogs.Architecture.Ddd.EventBus.Abstractions;
/// 集成事件处理器。
/// </summary>
/// <typeparam name="TEvent">集成事件。</typeparam>
public interface IIntegrationEventHandler<TEvent> : INotificationHandler<TEvent>, IEventBusHandler
public interface IIntegrationEventHandler<TEvent> : INotificationHandler<TEvent>, IEventBusRequestHandler
where TEvent : IntegrationEvent
{
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
using System.Collections.Concurrent;

namespace Cnblogs.Architecture.Ddd.EventBus.Abstractions;

/// <summary>
/// Implementation of <see cref="IEventBuffer"/> using <see cref="ConcurrentQueue{T}"/>.
/// </summary>
public class InMemoryEventBuffer : IEventBuffer
{
private readonly ConcurrentQueue<BufferedIntegrationEvent> _queue = new();

/// <inheritdoc />
public int Count => _queue.Count;

/// <inheritdoc />
public void Add<TEvent>(string name, TEvent @event)
where TEvent : IntegrationEvent
{
_queue.Enqueue(new BufferedIntegrationEvent(name, @event));
}

/// <inheritdoc />
public BufferedIntegrationEvent? Peek()
{
return _queue.TryPeek(out var @event) ? @event : null;
}

/// <inheritdoc />
public BufferedIntegrationEvent? Pop()
{
return _queue.TryDequeue(out var @event) ? @event : null;
}
}
Loading

0 comments on commit a26969c

Please sign in to comment.