diff --git a/package.json b/package.json index 306dd1b..b145850 100644 --- a/package.json +++ b/package.json @@ -1,6 +1,6 @@ { "name": "substreams-sink-clickhouse", - "version": "0.3.7", + "version": "0.3.8", "description": "Substreams Clickhouse Sink", "type": "module", "homepage": "https://github.com/pinax-network/substreams-sink-clickhouse", diff --git a/src/buffer.ts b/src/buffer.ts index c7a7851..90d8b72 100644 --- a/src/buffer.ts +++ b/src/buffer.ts @@ -31,27 +31,18 @@ export function insert(table: string, values: Values): Promise { export async function read(): Promise { const buffer: Buffer = new Map(); if ( !fs.existsSync(path) ) return buffer; - const input = fs.createReadStream(path, {encoding}); - const rl = readline.createInterface({ input }); - - return new Promise((resolve, reject) => { - rl.on("line", (line) => { - const {table, values} = JSON.parse(line); - if (buffer.has(table)) { - buffer.get(table)?.push(values); - } else { - buffer.set(table, [values]); - } - }); - rl.on("close", () => { - input.close(); - rl.close(); - return resolve(buffer); - }); - rl.on("error", (err) => { - return reject(err); - }); - }); + const file = Bun.file(path) + const text = await file.text(); + const lines = text.split("\n").map((line) => line.trim()).filter((line) => line.length > 0); + for ( const line of lines ) { + const {table, values} = JSON.parse(line); + if (buffer.has(table)) { + buffer.get(table)?.push(values); + } else { + buffer.set(table, [values]); + } + } + return buffer; } export async function flush(verbose = false): Promise { @@ -59,7 +50,6 @@ export async function flush(verbose = false): Promise { writer = fs.createWriteStream(path, {flags: "w", encoding}); return; } - await close(); const buffer = await read(); for ( const [table, values] of buffer.entries() ) { await client.insert({table, values, format: "JSONEachRow"}) @@ -70,19 +60,6 @@ export async function flush(verbose = false): Promise { writer = fs.createWriteStream(path, {flags: "w", encoding}); } -export function close(): Promise { - return new Promise((resolve, reject) => { - if ( !writer ) return resolve(); - if ( writer.destroyed ) return resolve(); - if ( writer.closed ) return resolve(); - - writer.close((err) => { - if (err) return reject(err); - return resolve(); - }); - }); -} - export function count(buffer: Buffer) { let count = 0 for ( const value of buffer.values() ) { @@ -90,3 +67,4 @@ export function count(buffer: Buffer) { }; return count; } +