forked from Sairyss/domain-driven-hexagon
-
Notifications
You must be signed in to change notification settings - Fork 1
/
domain-events.ts
96 lines (84 loc) · 2.77 KB
/
domain-events.ts
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
/* eslint-disable no-param-reassign */
import { AggregateRoot } from '../base-classes/aggregate-root.base';
import { Logger } from '../ports/logger.port';
import { DomainEvent, DomainEventHandler } from '.';
import { final } from '../../../decorators/final.decorator';
import { ID } from '../value-objects/id.value-object';
/**
 * Key under which handlers are registered. In this file it is always a class
 * name: `event.name` when subscribing and `event.constructor.name` when
 * publishing.
 */
type EventName = string;
/**
 * Constructor type of any class whose instances are `DomainEvent`s.
 * The rest parameter is typed `never[]` so that event classes can be passed
 * regardless of what their own constructor parameter list looks like.
 */
export type DomainEventClass = new (...args: never[]) => DomainEvent;
/**
 * Static in-process registry and dispatcher for domain events.
 *
 * Usage flow, as implemented below:
 *  1. Handlers register themselves for an event class via {@link subscribe}.
 *  2. An aggregate that has recorded events is queued via
 *     {@link prepareForPublish}.
 *  3. {@link publishEvents} looks the queued aggregate up by ID, dispatches
 *     each of its events to the registered handlers, then clears the
 *     aggregate's events and removes it from the queue.
 *
 * All state is held in static fields, so it is shared by every caller in the
 * process.
 */
@final
export class DomainEvents {
  /** Handlers registered per event class name. */
  private static subscribers: Map<EventName, DomainEventHandler[]> = new Map();

  /** Aggregates queued by `prepareForPublish` and not yet published. */
  private static aggregates: AggregateRoot<unknown>[] = [];

  /**
   * Registers a handler for an event class.
   *
   * Handlers are keyed by `event.name`; the same handler may be registered
   * more than once and will then be invoked once per registration.
   *
   * @param event - Event class the handler is interested in.
   * @param eventHandler - Handler to invoke when such an event is published.
   */
  public static subscribe<T extends DomainEventHandler>(
    event: DomainEventClass,
    eventHandler: T,
  ): void {
    const eventName: EventName = event.name;
    const handlers = this.subscribers.get(eventName);
    if (handlers) {
      handlers.push(eventHandler);
    } else {
      this.subscribers.set(eventName, [eventHandler]);
    }
  }

  /**
   * Queues an aggregate so that its events can later be published with
   * {@link publishEvents}. An aggregate whose ID is already queued is ignored.
   *
   * @param aggregate - Aggregate to queue.
   */
  public static prepareForPublish(aggregate: AggregateRoot<unknown>): void {
    if (this.findAggregateByID(aggregate.id) === undefined) {
      this.aggregates.push(aggregate);
    }
  }

  /**
   * Publishes all events of the queued aggregate with the given ID.
   *
   * Does nothing when no aggregate with that ID is queued. Otherwise every
   * event is dispatched concurrently; once all handlers have settled
   * successfully the aggregate's events are cleared and the aggregate is
   * removed from the queue. If any handler rejects, the rejection propagates
   * to the caller and the aggregate stays queued with its events intact.
   *
   * @param id - ID of the aggregate whose events should be published.
   * @param logger - Logger used for debug output.
   * @param correlationId - Optional correlation ID assigned to every event
   *   that does not already carry one.
   */
  public static async publishEvents(
    id: ID,
    logger: Logger,
    correlationId?: string,
  ): Promise<void> {
    const aggregate = this.findAggregateByID(id);
    if (!aggregate) {
      return;
    }

    logger.debug(
      `[${aggregate.domainEvents.map(
        event => event.constructor.name,
      )}] published ${aggregate.id.value}`,
    );

    await Promise.all(
      aggregate.domainEvents.map((event: DomainEvent) => {
        // Stamp the correlation ID only on events that do not have one yet.
        if (correlationId && !event.correlationId) {
          event.correlationId = correlationId;
        }
        return this.publish(event, logger);
      }),
    );

    aggregate.clearEvents();
    this.removeAggregateFromPublishList(aggregate);
  }

  /**
   * Returns the first queued aggregate whose ID equals `id`, or `undefined`
   * when none is queued.
   */
  private static findAggregateByID(id: ID): AggregateRoot<unknown> | undefined {
    return this.aggregates.find(aggregate => aggregate.id.equals(id));
  }

  /**
   * Removes the given aggregate from the queue, if it is still present.
   */
  private static removeAggregateFromPublishList(
    aggregate: AggregateRoot<unknown>,
  ): void {
    const index = this.aggregates.findIndex(a => a.equals(aggregate));
    // `findIndex` yields -1 when nothing matches, and `splice(-1, 1)` would
    // then drop the LAST queued aggregate instead of doing nothing. This can
    // happen when an overlapping `publishEvents` call already removed it.
    if (index === -1) {
      return;
    }
    this.aggregates.splice(index, 1);
  }

  /**
   * Dispatches a single event to every handler registered under the event's
   * class name and waits for all of them. Resolves immediately when no
   * handler is registered.
   */
  private static async publish(
    event: DomainEvent,
    logger: Logger,
  ): Promise<void> {
    const eventName: string = event.constructor.name;
    const handlers = this.subscribers.get(eventName);
    if (!handlers) {
      return;
    }

    await Promise.all(
      handlers.map(handler => {
        logger.debug(
          `[${handler.constructor.name}] handling ${event.constructor.name} ${event.aggregateId}`,
        );
        return handler.handle(event);
      }),
    );
  }
}