Skip to content

Commit

Permalink
Merge pull request #119 from pinax-network/fix/read-files
Browse files Browse the repository at this point in the history
fix read file
  • Loading branch information
DenisCarriere authored Feb 29, 2024
2 parents 99af0a5 + 0019e69 commit f6c72e9
Show file tree
Hide file tree
Showing 2 changed files with 14 additions and 36 deletions.
2 changes: 1 addition & 1 deletion package.json
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{
"name": "substreams-sink-clickhouse",
"version": "0.3.7",
"version": "0.3.8",
"description": "Substreams Clickhouse Sink",
"type": "module",
"homepage": "https://github.com/pinax-network/substreams-sink-clickhouse",
Expand Down
48 changes: 13 additions & 35 deletions src/buffer.ts
Original file line number Diff line number Diff line change
Expand Up @@ -31,35 +31,25 @@ export function insert(table: string, values: Values): Promise<void> {
export async function read(): Promise<Buffer> {
const buffer: Buffer = new Map();
if ( !fs.existsSync(path) ) return buffer;
const input = fs.createReadStream(path, {encoding});
const rl = readline.createInterface({ input });

return new Promise((resolve, reject) => {
rl.on("line", (line) => {
const {table, values} = JSON.parse(line);
if (buffer.has(table)) {
buffer.get(table)?.push(values);
} else {
buffer.set(table, [values]);
}
});
rl.on("close", () => {
input.close();
rl.close();
return resolve(buffer);
});
rl.on("error", (err) => {
return reject(err);
});
});
const file = Bun.file(path)
const text = await file.text();
const lines = text.split("\n").map((line) => line.trim()).filter((line) => line.length > 0);
for ( const line of lines ) {
const {table, values} = JSON.parse(line);
if (buffer.has(table)) {
buffer.get(table)?.push(values);
} else {
buffer.set(table, [values]);
}
}
return buffer;
}

export async function flush(verbose = false): Promise<void> {
if ( !fs.existsSync(path) ) {
writer = fs.createWriteStream(path, {flags: "w", encoding});
return;
}
await close();
const buffer = await read();
for ( const [table, values] of buffer.entries() ) {
await client.insert({table, values, format: "JSONEachRow"})
Expand All @@ -70,23 +60,11 @@ export async function flush(verbose = false): Promise<void> {
writer = fs.createWriteStream(path, {flags: "w", encoding});
}

export function close(): Promise<void> {
return new Promise((resolve, reject) => {
if ( !writer ) return resolve();
if ( writer.destroyed ) return resolve();
if ( writer.closed ) return resolve();

writer.close((err) => {
if (err) return reject(err);
return resolve();
});
});
}

export function count(buffer: Buffer) {
let count = 0
for ( const value of buffer.values() ) {
count += value.length;
};
return count;
}

0 comments on commit f6c72e9

Please sign in to comment.