To further support the transactional outbox pattern the events need to be dequeued from the database and sent to the final messaging subsystem via a message relay process.
This is achieved by dequeuing a set of events from the database, again within the context of a database transaction, and forwarding the events/messages to the messaging subsystem. On successful completion, the database transaction can be committed; otherwise, on error the dequeue should be rolled back, and the forwarding re-attempted.
This message relay process will result in at least once publishing semantics; i.e. where there was an error and retry the same events/messages may be sent again. It is the responsibility of the end subscriber to handle multiple events/messages; being the requirement for duplicate checking. The EventData.Id
by default is unique and should be used for this purpose.
To achieve in-order publishing the message relay process should execute as a singleton; i.e. only a single (synchronized) process can execute to guarantee in-order sequencing. Within an event-driven architecture the order in which the events/messages are generated is critical, and as such this order must be maintained (at least from a publishing perspective).
The dequeuing and forwarding should occur in a separate process to that in which they are generated; the success or failure of the message relay should not impact the originating operation.
CoreEx provides the EventOutboxHostedService
that enables the message relay processing; in that it hosts the previously generated EventOutboxDequeue
and will execute (DequeueAndSendAsync
) on a configured interval.
The EventOutboxHostedService
inherits from the SynchronizedTimerHostedServiceBase
which requires an IServiceSynchronizer
that is responsible for enabling the synchronized singleton behaviour.
The following synchronizers are provided.
Class | Description |
---|---|
ConcurrentSynchronizer |
Enables concurrency; i.e. there is no synchronization. |
FileLockSynchronizer |
Performs synchronization by taking an exclusive lock on a file (Windows or Linux). |
BlobLeaseSynchronizer |
Performs synchronization by acquiring a lease on an Azure Storage Blob. |
The EventOutboxDequeue
and EventOutboxHostedService
are host agnostic, in that they can be hosted by any of the following.
The CoreEx ServiceBusSender
enables the sending (IEventSender
) of events/messages to Azure Service Bus. As an IEventSender
can send none or more events/messages these will be sent leveraging batches (improves performance and resiliency) as per the Microsoft guidance.
For the purposes of the MyEf.Hr
solution the Service Bus Publishing will be hosted within the MyEf.Hr.Api
host process; and it is assumed it will be deployed to Azure and as such the BlobLeaseSynchronizer
will be leveraged.
The Service Bus Publishing requires the following services registered.
Service | Description |
---|---|
Az.ServiceBusClient |
The AddSingleton() is used to register the Az.ServiceBusClient as a singleton service, leveraging the ServiceBusConnectionString configuration setting. |
IServiceSynchronizer |
The AddSingleton() is used to register the BlobLeaseSynchronizer as a singleton service. An Azure BlobContainerClient instance is required leveraging the StorageConnectionString configuration setting and specifying the storage container name. |
IServiceBusSender |
The AddScoped() is used to register the ServiceBusSender as a scoped service. |
IHostedService |
The AddSqlServerEventOutboxHostedService() registers the EventOutboxHostedService as an IHostedService that runs in the background for the life of the ASP.NET host. During registeration a new EventOutboxDequeue instantiation is defined to set the underlying EventOutboxDequeueFactory property; as a new internally managed scoped instance is required per invocation). |
To introduce, append the following within the API Startup
class after the earlier event service registration.
// Add transactional event outbox dequeue dependencies.
services.AddSingleton(sp => new Az.ServiceBusClient(sp.GetRequiredService<HrSettings>().ServiceBusConnectionString));
services.AddSingleton<IServiceSynchronizer>(sp => new BlobLeaseSynchronizer(new Azure.Storage.Blobs.BlobContainerClient(sp.GetRequiredService<HrSettings>().StorageConnectionString, "event-synchronizer")));
services.AddScoped<IServiceBusSender, ServiceBusSender>();
// Add transactional event outbox dequeue hosted service (_must_ be explicit with the IServiceBusSender as he IEventSender).
services.AddSqlServerEventOutboxHostedService(sp =>
{
return new EventOutboxDequeue(sp.GetRequiredService<IDatabase>(), sp.GetRequiredService<IServiceBusSender>(), sp.GetRequiredService<ILogger<EventOutboxDequeue>>());
});
The latest CoreEx.Azure
NuGet package will need to be added to the MyEf.Hr.Api
project. Additionally, Startup
will require the following using
statements added.
using CoreEx.Azure.ServiceBus;
using CoreEx.Azure.Storage;
using CoreEx.Database;
using CoreEx.Hosting;
using Az = Azure.Messaging.ServiceBus;
The HrSettings
class should have the following properties added.
/// <summary>
/// Gets the Azure service bus connection string.
/// </summary>
public string ServiceBusConnectionString => GetRequiredValue<string>("ConnectionStrings__ServiceBus");
/// <summary>
/// Gets the Azure storage connection string.
/// </summary>
public string StorageConnectionString => GetRequiredValue<string>("ConnectionStrings__Storage");
The corresponding appsettings.json
also needs to be updated to provide the requisite configuration; replace the existing JSON with the following. The '*' within denotes that the configuration settings are accessed internally by CoreEx at runtime and therefore do not need to be specifically defined as HrSettings
properties.
Setting | Description |
---|---|
ConnectionStrings:ServiceBus |
The Azure Service Bus connection string. |
ConnectionStrings:Storage |
The Azure Storage connection string. |
ServiceBusSender:QueueOrTopicName * |
The Azure Service Bus Queue or Topic name depending on requirements. |
EventOutboxHostedService:MaxDequeueSize * |
The maximum number of events to dequeue and send per batch. |
EventOutboxHostedService:Interval * |
The interval (TimeSpan) to poll the queue and send; set to poll every 10 seconds. |
{
"Logging": {
"LogLevel": {
"Default": "Warning"
}
},
"ConnectionStrings": {
"Database": "Data Source=.;Initial Catalog=MyEf.Hr;Integrated Security=True;TrustServerCertificate=true",
"ServiceBus": "add-top-secret-connection-string",
"Storage": "add-top-secret-connection-string"
},
"ServiceBusSender": {
"QueueOrTopicName": "event-stream"
},
"EventOutboxHostedService": {
"MaxDequeueSize": "10",
"Interval": "00:00:10"
}
}
There are no specific provisions for the unit testing of the Service Bus Publishing as it requires a dependent messaging subsystem, being Azure Service Bus.
However, to minimize any impact to the other existing unit tests the EventOutboxHostedService
should be disabled. To disable, add a new appsettings.unittest.json
file to MyEf.Hr.Test
project with the following contents. Go to the file properties and set Copy to Output Directory to Copy if newer.
{
"EventOutboxHostedService": {
"Enabled": false
}
}
To achieve a basic test within a developers machine then the API host should be started. By navigating to the Swagger page this will result in the ASP.NET host starting, which in turn will execute the registered IHostedService
. After a small delay the EventOutboxHostedService
will begin processing and the following should occur.
- Database dequeue - the entries previously enqueued within the
Outbox.EventOutbox
will have theDequeueDate
column updated signalling that the event was dequeued and sent successfully. - Service Bus Message - the corresponding events/messages should appear within the Azure Service Bus (queue or topic).
At this stage we now have our events being published to Azure Service Bus. This essentially concludes the Hr
domain functionality, with respect to enabling the requisite APIs and publishing of corresponding events (where applicable).
Next we will create a new Security domain that will perform a Service Bus Subscribe of the Termination related events and proxy Okta (as the fictitious company's identity solution) automatically Deactivating the Employee's account.