-
Notifications
You must be signed in to change notification settings - Fork 5
/
asyncProcessor.js
84 lines (75 loc) · 2.82 KB
/
asyncProcessor.js
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
module.exports = class AsyncProcessor {
#dataProcessingHandler
#errorHandler
#processingBudget
#activeWork = new Map()
#unsubscribe = null;
constructor(dataProcessingHandler, processingBudget = 3, errorHandler = (err) => Promise.resolve(console.error(err))) {
this.#dataProcessingHandler = dataProcessingHandler;
this.#processingBudget = processingBudget;
this.#errorHandler = errorHandler;
this.streamHandler = this.streamHandler.bind(this);
this.waitForAllCompletion = this.waitForAllCompletion.bind(this);
this.activeItems = this.activeItems.bind(this);
this._processItem = this._processItem.bind(this);
this._holdForItemCompletion = this._holdForItemCompletion.bind(this);
}
async streamHandler(payloads) {
do {
if (this.#activeWork.size >= this.#processingBudget) {
await this._holdForItemCompletion();
}
if (payloads.length > 0) {
const payload = payloads.pop();
const itemProcessingPromise = this._processItem(payload);
this.#activeWork.set(payload.id, itemProcessingPromise);
}
}
while (payloads.length > 0 || this.#activeWork.size >= this.#processingBudget)
return this.#unsubscribe || (this.#processingBudget - this.#activeWork.size);
}
async waitForAllCompletion(unsubscribe = false) {
if (unsubscribe === true) {
this.#unsubscribe = -1;
}
do {
await this._holdForItemCompletion();
}
while (this.#activeWork.size > 0)
}
activeItems() {
return this.#activeWork.size;
}
async _holdForItemCompletion() {
if (this.#activeWork.size > 0) {
const completedPayloadId = await Promise.race(this.#activeWork.values());
if (this.#activeWork.has(completedPayloadId) === true) {
this.#activeWork.delete(completedPayloadId);
}
else {
await this.#errorHandler(new Error(`Payload(${completedPayloadId}) is missing from active work list.`));
}
}
else {
return Promise.resolve();
}
}
async _processItem(payload) {
try {
let handlerResult = await this.#dataProcessingHandler(payload.channel, payload.id, payload.payload)
if (handlerResult[0] === true && handlerResult[1] === true) {
await payload.markAsRead(true);
}
else if (handlerResult[0] === true && handlerResult[1] === false) {
await payload.markAsRead(false);
}
}
catch (err) {
err.id = payload.id;
await this.#errorHandler(err);
}
finally {
return payload.id;
}
}
}