-
Notifications
You must be signed in to change notification settings - Fork 4
/
Copy pathpg.js
169 lines (148 loc) · 4.61 KB
/
pg.js
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
import { postgres } from './deps.ts'
import remodel from './remodel.js'
/**
 * Connects to the database, runs the migrations and hands back the client.
 *
 * The migrations table is named `<database>_migrations`, where `<database>`
 * is the path of `url` without its leading slash.
 *
 * @param {string} url - Connection URL; also passed to `crennect`.
 * @param {*} migrations - Forwarded untouched to `remodel`.
 * @returns {Promise<*>} The client obtained from `crennect(url)`.
 */
export async function pgInit(url, migrations) {
  const client = await crennect(url)
  const { pathname } = new URL(url)
  const table = `${pathname.slice(1)}_migrations`
  await remodel(client, { migrations, table })
  // End the client when the global scope fires `unload`.
  const closeClient = async () => await client?.end()
  self.addEventListener('unload', closeClient)
  return client
}
/**
 * Registers `handleEvent` as the listener for the `event` channel.
 *
 * @param {{ listen: Function }} pg - Client exposing `listen(channel, fn)`.
 * @param {Function} handleEvent - Passed as-is to `pg.listen`.
 * @returns {Promise<void>} Settles once `pg.listen` has settled.
 */
export async function pgListen(pg, handleEvent) {
  const channel = 'event'
  await pg.listen(channel, handleEvent)
}
/**
 * Stores an event, or only broadcasts it when its kind is ephemeral.
 *
 * Ephemeral kinds (20000 <= kind < 30000) are sent through
 * `pg.notify('event', <json>)` and nothing is written. For every other kind
 * an ordered list of statements is built inside `pg.begin` and returned to
 * it (presumably executed as a single transaction by the client):
 *   1. for replaceable kinds, delete older events of the same kind and pubkey
 *   2. insert the event itself
 *   3. per tag, in tag order: insert the tag row, then any kind-5 deletion,
 *      `expiration` update and (first `d` tag only) parameterized replacement
 *
 * @param {Function} pg - Client with `notify` and `begin`; the callback given
 *   to `begin` receives the tagged-template function used for the statements.
 * @param {object} event - Event with `id`, `pubkey`, `created_at`, `kind`
 *   and `tags` (an array of `[name, ...values]` arrays).
 * @returns {Promise<*>} The result of `pg.notify` for ephemeral events,
 *   otherwise `undefined`.
 */
export async function pgStoreNotify(pg, event) {
  const { id, pubkey, created_at: createdAt, kind, tags } = event
  const kindIn = (from, to) => kind >= from && kind < to

  // nip 16 ephemeral: broadcast only, nothing is stored
  if (kindIn(20000, 30000)) {
    return await pg.notify('event', JSON.stringify(event))
  }

  // nip 26 delegation: second element of the first `delegation` tag, if any
  const delegationTag = tags.find(([name]) => name === 'delegation')
  const delegator = delegationTag?.at(1)

  // nip 1, 3, and 16 replaceable kinds
  const replaceable = kind === 0 || kind === 3 || kindIn(10000, 20000)

  await pg.begin((tx) => {
    const queries = []

    if (replaceable) {
      queries.push(tx`DELETE FROM event WHERE kind = ${kind}
        AND pubkey = ${pubkey} AND created_at < ${createdAt}`)
    }

    // nip 1 event
    queries.push(tx`
      INSERT INTO event (id, pubkey, delegator, created_at, kind, raw)
      VALUES (${id}, ${pubkey}, ${delegator}, ${createdAt}, ${kind},
      ${JSON.stringify(event)})`)

    // nip 33 only looks at the first `d` tag of a parameterized kind
    let awaitingFirstD = kindIn(30000, 40000)

    for (const [name, ...values] of tags) {
      // nip 1 tags
      queries.push(tx`INSERT INTO tag (event_id, tag, values)
        VALUES (${id}, ${name}, ${values})`)

      // nip 9 delete referenced event (same pubkey only)
      const deletesReference = kind === 5 && name === 'e' && values.length
      if (deletesReference) {
        queries.push(tx`DELETE FROM event WHERE id = ${values[0]}
          AND pubkey = ${pubkey}`)
      }

      // nip 40 expiration
      if (name === 'expiration') {
        queries.push(tx`UPDATE event SET expires_at = ${Number(values[0])}
          WHERE id = ${id}`)
      }

      // nip 33 parameterized replacement
      if (awaitingFirstD && name === 'd') {
        awaitingFirstD = false
        queries.push(
          tx`DELETE FROM event
            WHERE kind = ${kind}
            AND pubkey = ${pubkey}
            AND created_at < ${createdAt}
            AND EXISTS (
              SELECT 1
              FROM tag
              WHERE event.id = tag.event_id
              AND tag.tag = 'd'
              AND COALESCE(tag.values[1], '') = COALESCE(${values[0]}, ''))`,
        )
      }
    }

    return queries
  })
}
/**
 * Runs one query per filter and passes the `raw` column of every matching
 * event to `cb`, ordered by `created_at` descending within each filter.
 *
 * Filter keys `ids`, `authors`, `kinds`, `since`, `until` and `limit` are
 * bound directly; every other key is treated as a tag filter whose leading
 * `#` characters are stripped (`{ '#e': [...] }` targets tag `e`).
 *   - `ids` / `authors` match by prefix (`^@`); `authors` is checked against
 *     both `pubkey` and `delegator`.
 *   - A tag filter matches when some tag row of the event has that name and
 *     shares at least one value with the filter (`&&`).
 *   - Events whose `expires_at` is in the past are skipped.
 *
 * An event is returned only if every tag filter is matched. The comparison
 * counts DISTINCT matched filter names: an event may carry several rows for
 * the same tag name (e.g. two `e` tags), and counting joined rows instead
 * would reject such an event or let duplicates stand in for a filter that
 * did not match at all.
 *
 * @param {Function} pg - Client used as a tagged template; the query must
 *   support `.values().cursor(n)`.
 * @param {Iterable<object>} filters - Filters, processed sequentially.
 * @param {Function} cb - Called synchronously with each row's first column;
 *   its return value is ignored (it is not awaited).
 * @returns {Promise<void>}
 */
export async function pgForEachEvent(pg, filters, cb) {
  for (const filter of filters) {
    const { ids, authors, kinds, since, until, limit, ...tags } = filter
    // NOTE(review): jsonb_array_to_text_array is not a PostgreSQL built-in;
    // presumably it is created by the migrations - confirm.
    const cursor = pg`
      WITH filter_tag AS (
        SELECT ltrim(key, '#') AS tag,
          jsonb_array_to_text_array(value) AS values
        FROM jsonb_each(${tags})
      )
      SELECT event.raw
      FROM event
      LEFT JOIN tag ON tag.event_id = event.id
      LEFT JOIN filter_tag
        ON tag.tag = filter_tag.tag AND tag.values && filter_tag.values
      WHERE (${ids}::TEXT[] IS NULL OR event.id ^@ ANY (${ids}::TEXT[]))
        AND (${authors}::TEXT[] IS NULL
          OR event.pubkey ^@ ANY (${authors}::TEXT[])
          OR event.delegator ^@ ANY (${authors}::TEXT[]))
        AND (${kinds}::INTEGER[] IS NULL OR event.kind = ANY (${kinds}::INTEGER[]))
        AND (${since}::INTEGER IS NULL OR event.created_at >= ${since})
        AND (${until}::INTEGER IS NULL OR event.created_at <= ${until})
        AND (event.expires_at IS NULL
          OR event.expires_at > extract(epoch from now()))
      GROUP BY event.id
      HAVING count(DISTINCT filter_tag.tag) = (SELECT count(*) FROM filter_tag)
      ORDER BY event.created_at DESC
      LIMIT ${limit}`.values().cursor(100)

    // Rows arrive in batches of up to 100; `.values()` yields each row as an array.
    for await (const rows of cursor) {
      for (const row of rows) {
        cb(row[0])
      }
    }
  }
}
/**
 * Creates a postgres client for `url` with `transform.undefined` set to
 * `null` (per the postgres.js options, `undefined` values are then sent as
 * NULL - confirm against the version pinned in deps.ts).
 *
 * @param {string} url - Connection URL handed to `postgres`.
 * @returns {*} Whatever `postgres(url, options)` returns.
 */
function connect(url) {
  const options = {
    // debug: console.log,
    transform: { undefined: null },
  }
  return postgres(url, options)
}
/**
 * Connects to `url`, creating the target database first when the server
 * reports that it does not exist (error code `3D000`).
 *
 * The database is created through a temporary connection to the `postgres`
 * database on the same server. If that attempt fails for any reason, the
 * original "does not exist" error is rethrown.
 *
 * @param {string} url - Connection URL; its path (minus the leading slash)
 *   is the database name.
 * @returns {Promise<*>} A client from `connect(url)`.
 * @throws The error of the initial probe query when it is not `3D000`, or
 *   when the database could not be created.
 */
async function crennect(url) {
  try {
    const pgTry = connect(url)
    await pgTry`SELECT 1` // conn doesn't happen until first query
    return pgTry
  } catch (e) {
    // Only "database does not exist" is recoverable here.
    if (e.code !== '3D000') throw e

    try {
      const urlObj = new URL(url)
      const db = urlObj.pathname.slice(1)
      console.log(`db ${db} does not exist, attempting to create ...`)
      urlObj.pathname = '/postgres' // common default
      // Pass the URL as a string: a URL object is taken for an options
      // object, which has no `database` key, so `/postgres` would be ignored.
      const tempPg = postgres(urlObj.href)
      try {
        // Quote the identifier so names with dashes, uppercase letters or
        // quotes create exactly the database the final connection asks for.
        await tempPg.unsafe(`CREATE DATABASE "${db.replaceAll('"', '""')}"`)
      } finally {
        // Always release the temporary connection, even if creation failed.
        await tempPg.end()
      }
      console.log(
        `\x1b[A\x1b[Kdb ${db} does not exist, attempting to create ... created ${db}`,
      )
    } catch {
      // Creation is best-effort: surface the original error to the caller.
      throw e
    }
    return connect(url)
  }
}