-
Notifications
You must be signed in to change notification settings - Fork 30
/
Copy pathinsert_file_stream_ndjson.ts
49 lines (43 loc) · 1.23 KB
/
insert_file_stream_ndjson.ts
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
import type { Row } from '@clickhouse/client'
import { createClient } from '@clickhouse/client'
import Fs from 'fs'
import { cwd } from 'node:process'
import Path from 'path'
import split from 'split2'
/**
 * Example: stream a newline-delimited JSON file from disk into a ClickHouse
 * table, then read the table back and print every row.
 *
 * Steps:
 *  1. (Re)create the target table.
 *  2. Read the file line by line, parse each line as JSON and pass the
 *     resulting object stream to `client.insert`.
 *  3. Select everything back as a stream and log each row.
 *
 * The client is closed in a `finally` block so it is released even when one
 * of the steps above fails.
 */
void (async () => {
  const client = createClient()
  try {
    const tableName = 'insert_file_stream_ndjson'
    await client.command({
      query: `DROP TABLE IF EXISTS ${tableName}`,
    })
    await client.command({
      query: `
CREATE TABLE ${tableName} (id UInt64)
ENGINE MergeTree()
ORDER BY (id)
`,
    })

    // contains id as numbers in JSONCompactEachRow format ["0"]\n["0"]\n...
    // see also: NDJSON format
    // NOTE: the path is resolved against the current working directory, so
    // the script has to be started from the directory that contains `node/`.
    const filename = Path.resolve(cwd(), './node/resources/data.ndjson')
    const source = Fs.createReadStream(filename)
    // split2 emits one chunk per line; the mapper turns each line into a
    // parsed JSON value, so the insert receives an object stream.
    const fileStream = source.pipe(
      split((row: string): unknown => JSON.parse(row)),
    )
    // `pipe()` does not forward errors from the source to the destination.
    // Without this handler a read failure (e.g. a missing file) would surface
    // as an unhandled 'error' event on `source` instead of failing the stream
    // that is handed to the insert.
    source.on('error', (err) => {
      fileStream.destroy(err)
    })

    await client.insert({
      table: tableName,
      values: fileStream,
      format: 'JSONCompactEachRow',
    })

    const rs = await client.query({
      query: `SELECT * from ${tableName}`,
      format: 'JSONEachRow',
    })
    for await (const rows of rs.stream()) {
      // or just `rows.text()` / `rows.json()`
      // to consume the entire response at once
      rows.forEach((row: Row) => {
        console.log(row.json())
      })
    }
  } finally {
    // Always release the client, including on failure paths.
    await client.close()
  }
})()