-
Notifications
You must be signed in to change notification settings - Fork 38
/
Copy pathspEventOutboxDequeue.sql
65 lines (58 loc) · 1.9 KB
/
spEventOutboxDequeue.sql
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
CREATE PROCEDURE [Outbox].[spEventOutboxDequeue]
@MaxDequeueSize INT = 10, -- Maximum number of events to dequeue.
@PartitionKey NVARCHAR(127) NULL = NULL, -- Partition key; null indicates all.
@Destination NVARCHAR(127) NULL = NULL -- Destination (queue or topic); null indicates all.
AS
BEGIN
/*
* This is automatically generated; any changes will be lost.
*/
SET NOCOUNT ON;
BEGIN TRY
-- Wrap in a transaction.
BEGIN TRANSACTION
-- Dequeued outbox resultant identifier.
DECLARE @dequeuedId TABLE([EventOutboxId] BIGINT);
-- Dequeue event -> ROWLOCK+UPDLOCK maintain singular access for ordering and concurrency
WITH cte([EventOutboxId], [PartitionKey], [Destination], [DequeuedDate]) AS
(
SELECT TOP(@MaxDequeueSize) [EventOutboxId], [PartitionKey], [Destination], [DequeuedDate]
FROM [Outbox].[EventOutbox] WITH (ROWLOCK, UPDLOCK)
WHERE [DequeuedDate] IS NULL
AND (@PartitionKey IS NULL OR [PartitionKey] = @PartitionKey)
AND (@Destination IS NULL OR [Destination] = @Destination)
ORDER BY [EventOutboxId]
)
UPDATE Cte
SET [DequeuedDate] = SYSUTCDATETIME()
OUTPUT deleted.EventOutboxId INTO @dequeuedId;
-- Get the dequeued event outbox data.
SELECT
[EventOutboxDataId] as [EventOutboxId],
[EventId],
[Destination],
[Subject],
[Action],
[Type],
[Source],
[Timestamp],
[CorrelationId],
[Key],
[TenantId],
[PartitionKey],
[ETag],
[Attributes],
[Data]
FROM [Outbox].[EventOutboxData]
WHERE [EventOutboxDataId] IN (SELECT [EventOutboxId] FROM @dequeuedId)
-- Commit the transaction.
COMMIT TRANSACTION
RETURN 0
END TRY
BEGIN CATCH
-- Rollback transaction and rethrow error.
IF @@TRANCOUNT > 0
ROLLBACK TRANSACTION;
THROW;
END CATCH
END