To support the transactional outbox pattern there is the need to have a backing event queue within the database where the event data is persisted transactionally. This will ensure that the events and corresponding data being manipulated are persisted as a single unit of work; before an attempt is made to publish (send) the events to an underlying messaging subsystem. This will guarantee that there will be zero event loss, and that the events will be stored in the sequence in which they are enqueued (honoring order).
To provide generic eventing/messaging interoperability the CoreEx EventData
(inherits from EventDataBase
) provides a messaging subsystem agnostic means to describe the generic characteristics of an event/message.
The pluggable nature of an IEventPublisher
enables the publishing via the Publish
method to internally queue the messages, and when ready perform a SendAsync
to send the one or more published events in an atomic-style operation. The IEventPublisher
is responsible for orchestrating the EventDataFormatter
, IEventSerializer
and IEventSender
to enable the publish and send. The EventPublisher
provides the default implementation.
Within Step 1 the transactional outbox capabilities, both database and .NET, were generated and included into the solution when performing the Event outbox.
There were two tables added to the database Outbox.EventOutbox
and Outbox.EventOutboxData
via the corresponding generated migration scripts; these tables provide the underlying transactional persistence.
The following are the key generated Outbox enqueue artefacts; performing the transactional persistence.
Type | Name | Description |
---|---|---|
Stored procedure | spEventOutboxEnqueue | The stored procedure used to enqueue zero or more events into the database. |
Used-defined table type | udtEventOutboxList | The type used during enqueue as the events collection being passed. By design this is the database representation (column from/to property) of the CoreEx .NET EventData class. |
Class | EventOutboxEnqueue | Provides the IEventSender implementation (inheriting from EventOutboxEnqueueBase) to perform the enqueue using the spEventOutboxEnqueue stored procedure. |
The following are the key generated Outbox dequeue artefacts.
Type | Name | Description |
---|---|---|
Stored procedure | spEventOutboxDequeue | The stored procedure used to dequeue zero or more events from the database. |
Class | EventOutboxDequeue | Provides the dequeue implementation (inheriting from EventOutboxDequeueBase) using the spEventOutboxDequeue stored procedure. This class is then also responsible for sending the dequeued events (via an IEventSender ) to the final messaging subsystem. On successful send, the dequeued events will be committed (within Outbox) as sent, guaranteeing as-least once messaging semantics. |
The generated .NET DataSvc-layer contains the logic to publish and send the corresponding event(s) managing the event enqueue and underlying business data operations within a database transaction.
The following is a code snippet from the EmployeeDataSvc to demonstrate.
- The
_events.PublishValueEvent
publishes the event to an internal queue ready for send; this allows multiple events/messages to be sent transactionally where required. - The
DataSvcInvoker
is ultimately responsible for orchestrating (see InvokerBase) the database transaction and corresponding send/enqueue given theInvokerArgs
configuration. This is controlled by the{ IncludeTransactionScope = true, EventPublisher = _events })
properties.
public Task<Result<Employee>> TerminateAsync(TerminationDetail value, Guid id) => DataSvcInvoker.Current.InvokeAsync(this, _ =>
{
return Result.GoAsync(_data.TerminateAsync(value, id))
.Then(r => _events.PublishValueEvent(r, new Uri($"myef/hr/employee/{r.Id}", riKind.Relative), $"MyEf.Hr.Employee", "Terminated"))
.Then(r => _cache.SetValue(r));
}, new InvokerArgs { IncludeTransactionScope = true, EventPublisher = _events });
The set up of the event publishing is managed using Dependency Injection (DI) configuration within the API Startup
class.
When the overall solution was created using the Beef template, the eventing-based DI configuration placeholder code would have been similar to the following. The use of the AddNullEventPublisher()
will register the NullEventPublisher
which simply swallows/discards on send; i.e. does nothing.
// Add event publishing services.
services.AddNullEventPublisher();
// Add transactional event outbox services.
services.AddScoped<IEventSender>(sp =>
{
var eoe = new EventOutboxEnqueue(sp.GetRequiredService<IDatabase>(), p.GetRequiredService<ILogger<EventOutboxEnqueue>>());
//eoe.SetPrimaryEventSender(/* the primary sender instance; i.e. service bus */); // This is optional.
return eoe;
});
The event interoperability requires the following services registered.
Service | Description |
---|---|
EventDataFormatter |
The AddEventDataFormatter() registers the EventDataFormatter which is responsible for the formatting of the EventData ; i.e. defaulting and updating it to a consistent state ready for serialization and sending. |
IEventSerializer |
The AddCloudEventSerializer() registers the CloudEventSerializer (see Cloud Events) that is responsible for the serialization of the formatted EventData into the resulting EventSendData.Data required by the configured IEventSender . Additionally, the EventDataSerializer enables a basic EventData.Value JSON serialization via the AddEventDataSerializer registration as an alternate option. |
IEventSender |
The AddScoped<IEventSender, EventOutboxEnqueue>() registers the generated EventOutboxEnqueue as the IEventSender . |
IEventPublisher |
The AddEventPublisher registers the EventPublisher responsible for the event publish and send orchestration. |
To introduce, replace the existing placeholder code with the following within the API Startup
class.
// Add event publishing services.
services.AddEventDataFormatter();
services.AddCloudEventSerializer();
services.AddEventPublisher();
// Add transactional event outbox enqueue services.
services.AddScoped<IEventSender, EventOutboxEnqueue>();
Back in Step 3 unit testing of the API surface was introduced. Within these tests there was an ExpectEvent
and ExpectEventValue
that verified that a corresponding event was being published and sent; even though we had configured the API with a NullEventPublisher
.
Hang on! How was an event verified where configured to discard?
The FixtureSetup
leveraged a UnitTestEx TestSetUp capability, being the ExpectedEventsEnabled
property. Where enabled the IEventPublisher
will be automatically replaced at runtime with the ExpectedEventPublisher
that is used by the ExpectEvent
to verify the expected events were sent.
Therefore, no events will be sent to any external eventing/messaging system during unit testing. This has the advantage of decoupling the test execution from the dependent messaging subsystem, minimizing the need for any additional infrastructure to enable the unit tests.
To achieve a basic test within a developers machine then the API should be executed directly. By leveraging the Swagger endpoint, or using a tool such as Postman, an applicable POST/PUT/DELETE operation should be invoked which will result in the selected data update and corresponding event persisted to the database.
To verify, use a database tool, such as Azure Data Studio to query the Outbox.EventOutbox
and Outbox.EventOutboxData
tables. The Outbox.EventOutbox
manages the enqueue and dequeue state via the EnqueuedDate
and DequeueDate
columns. Where the DequeueDate
column is null
then the event is considered queued and ready for dequeue.
To perform a localized test perform a POST to the /employees/00000001-0000-0000-0000-000000000000/terminate
endpoint; this should be passed the following JSON body.
{
"date": "2023-04-20T17:47:31.898Z",
"reason": "RE"
}
At this stage we now have our events being persisted to the Transactional Outbox ready for sending to the final messaging subsystem.
Next we need to perform the dequeue and Service Bus Publish.