-
Notifications
You must be signed in to change notification settings - Fork 38
/
Copy pathspEventOutboxEnqueue.sql
116 lines (101 loc) · 3.4 KB
/
spEventOutboxEnqueue.sql
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
CREATE PROCEDURE [Outbox].[spEventOutboxEnqueue]
@SetEventsAsDequeued AS BIT = 0,
@EventList AS [Outbox].[udtEventOutboxList] READONLY
AS
BEGIN
/*
* This is automatically generated; any changes will be lost.
*/
SET NOCOUNT ON;
BEGIN TRY
-- Wrap in a transaction.
BEGIN TRANSACTION
-- Working variables.
DECLARE @eventOutboxId BIGINT,
@enqueuedDate DATETIME,
@dequeuedDate DATETIME
SET @enqueuedDate = SYSUTCDATETIME()
SET @dequeuedDate = @enqueuedDate
-- Enqueued outbox resultant identifier.
DECLARE @enqueuedId TABLE([EventOutboxId] BIGINT)
-- Cursor output variables.
DECLARE @eventId NVARCHAR(127),
@eventDequeued BIT,
@destination NVARCHAR(127),
@subject NVARCHAR(511),
@action NVARCHAR(255),
@type NVARCHAR(1023),
@source NVARCHAR(1023),
@timestamp DATETIMEOFFSET,
@correlationId NVARCHAR(127),
@key NVARCHAR(1023),
@tenantId NVARCHAR(127),
@partitionKey NVARCHAR(127),
@etag NVARCHAR(127),
@attributes VARBINARY(MAX),
@data VARBINARY(MAX)
-- Declare, open, and fetch first event from cursor.
DECLARE c CURSOR FORWARD_ONLY
FOR SELECT [EventId], [EventDequeued], [Destination], [Subject], [Action], [Type], [Source], [Timestamp], [CorrelationId], [Key], [TenantId], [PartitionKey], [ETag], [Attributes], [Data] FROM @EventList
OPEN c
FETCH NEXT FROM c INTO @eventId, @eventDequeued, @destination, @subject, @action, @type, @source, @timestamp, @correlationId, @key, @tenantId, @partitionKey, @etag, @attributes, @data
-- Iterate the event(s).
WHILE @@FETCH_STATUS = 0
BEGIN
-- Enqueue event into outbox
INSERT INTO [Outbox].[EventOutbox] ([EnqueuedDate], [PartitionKey], [Destination], [DequeuedDate])
OUTPUT inserted.EventOutboxId INTO @enqueuedId
VALUES (@enqueuedDate, @partitionKey, @destination, CASE WHEN @eventDequeued IS NULL OR @eventDequeued = 0 THEN NULL ELSE @dequeuedDate END)
SELECT @eventOutboxId = [EventOutboxId] FROM @enqueuedId
-- Insert corresponding event data.
INSERT INTO [Outbox].[EventOutboxData] (
[EventOutboxDataId],
[EventId],
[Destination],
[Subject],
[Action],
[Type],
[Source],
[Timestamp],
[CorrelationId],
[Key],
[TenantId],
[PartitionKey],
[ETag],
[Attributes],
[Data]
)
VALUES (
@eventOutboxId,
@eventId,
@destination,
@subject,
@action,
@type,
@source,
@timestamp,
@correlationId,
@key,
@tenantId,
@partitionKey,
@etag,
@attributes,
@data
)
-- Fetch the next event from the cursor.
FETCH NEXT FROM c INTO @eventId, @eventDequeued, @destination, @subject, @action, @type, @source, @timestamp, @correlationId, @key, @tenantId, @partitionKey, @etag, @attributes, @data
END
-- Close the cursor.
CLOSE c
DEALLOCATE c
-- Commit the transaction.
COMMIT TRANSACTION
RETURN 0
END TRY
BEGIN CATCH
-- Rollback transaction and rethrow error.
IF @@TRANCOUNT > 0
ROLLBACK TRANSACTION;
THROW;
END CATCH
END