forked from ccorcos/tuple-database
-
Notifications
You must be signed in to change notification settings - Fork 3
/
AsyncReactivityTracker.ts
81 lines (70 loc) · 2.28 KB
/
AsyncReactivityTracker.ts
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
import { maybePromiseAll } from "../../helpers/maybeWaitForPromises"
import { Bounds } from "../../helpers/sortedTupleArray"
import {
KeyValuePair,
ScanStorageArgs,
Tuple,
WriteOps,
} from "../../storage/types"
import { TxId } from "../types"
import { AsyncCallback } from "./asyncTypes"
import * as SortedTupleValue from "../../helpers/sortedTupleValuePairs"
import * as SortedTuple from "../../helpers/sortedTupleArray"
// Registered subscriptions: each callback is the key, mapped to the bounds it was subscribed with.
type Listeners = Map<AsyncCallback, Bounds>
/**
 * Keeps a registry of callbacks with the bounds each one listens on, works
 * out which callbacks a batch of writes affects, and invokes them.
 */
export class AsyncReactivityTracker {
  /** Registered callbacks, keyed by the callback itself. */
  private listeners: Listeners = new Map()

  /**
   * Registers `callback` for `args` by delegating to the module-level
   * `subscribe` helper.
   *
   * @returns Whatever the helper returns (the unsubscribe function).
   */
  subscribe(args: ScanStorageArgs, callback: AsyncCallback) {
    const unsubscribe = subscribe(this.listeners, args, callback)
    return unsubscribe
  }

  /**
   * Computes, for the currently registered listeners, which of `writes`
   * each callback should receive. Does not invoke any callback.
   */
  computeReactivityEmits(writes: WriteOps) {
    const emits = getReactivityEmits(this.listeners, writes)
    return emits
  }

  /**
   * Calls every callback in `emits` with its writes and `txId`.
   *
   * All callbacks are invoked synchronously, in the map's iteration order.
   * A callback that throws synchronously is logged with `console.error`
   * and skipped, so the remaining callbacks still run. The collected return
   * values are handed to `maybePromiseAll`.
   *
   * Because this method is `async`, the caller always receives a Promise,
   * whatever `maybePromiseAll` itself returns.
   */
  async emit(emits: ReactivityEmits, txId: TxId) {
    const results: any[] = []
    emits.forEach((matchedWrites, listener) => {
      try {
        // Only a synchronous throw is caught here; a returned promise is
        // collected as-is and left to maybePromiseAll.
        results.push(listener(matchedWrites, txId))
      } catch (error) {
        console.error(error)
      }
    })
    return maybePromiseAll(results)
  }
}
// Per-callback batch to deliver: the `set` and `remove` entries of a write that fell within that callback's bounds.
type ReactivityEmits = Map<AsyncCallback, Required<WriteOps>>
/**
 * Works out which registered callbacks are affected by `writes`.
 *
 * For every listener, collects the `set` pairs whose key lies within the
 * listener's bounds and the `remove` tuples that lie within them. Listeners
 * with no matching entry of either kind are left out of the result.
 *
 * Each write is tested against the bounds individually; the original author
 * measured this as slightly faster than assuming the writes are sorted and
 * scanning them by bounds.
 *
 * @param listenersDb - Registered callbacks and their bounds.
 * @param writes - Write batch; a missing `set` or `remove` is treated as empty.
 * @returns Map from callback to the `{ set, remove }` entries it should receive.
 */
function getReactivityEmits(listenersDb: Listeners, writes: WriteOps) {
  const pendingSets = writes.set || []
  const pendingRemoves = writes.remove || []
  const result: ReactivityEmits = new Map()

  listenersDb.forEach((bounds, callback) => {
    const set = pendingSets.filter((pair) =>
      SortedTuple.isTupleWithinBounds(pair.key, bounds)
    )
    const remove = pendingRemoves.filter((tuple) =>
      SortedTuple.isTupleWithinBounds(tuple, bounds)
    )

    // Nothing in this batch touches the listener's range: skip it.
    if (set.length === 0 && remove.length === 0) return

    result.set(callback, { set, remove })
  })

  return result
}
/**
 * Registers `callback` in `listenersDb` under the bounds given by `args`.
 *
 * The map is keyed by the callback itself, so subscribing the same callback
 * again replaces the bounds recorded by the earlier call.
 *
 * @param listenersDb - Map of registered callbacks; mutated in place.
 * @param args - Scan arguments stored as the callback's bounds.
 * @param callback - Listener to register.
 * @returns An unsubscribe function. It removes the callback only while the
 * `args` object from this call is still the registered value, so a stale
 * handle from a replaced subscription cannot remove the newer one. Calling
 * it more than once is harmless.
 */
function subscribe(
  listenersDb: Listeners,
  args: ScanStorageArgs,
  callback: AsyncCallback
) {
  listenersDb.set(callback, args)
  return () => {
    // Identity check: if the same callback was re-subscribed with a different
    // args object, this registration has already been replaced and the entry
    // now belongs to the later subscription.
    if (listenersDb.get(callback) === args) {
      listenersDb.delete(callback)
    }
  }
}