Commit

write as promise
DenisCarriere committed Feb 28, 2024
1 parent 5dd17bb · commit 99af0a5
Showing 3 changed files with 22 additions and 14 deletions.
package.json (2 changes: 1 addition & 1 deletion)

@@ -1,6 +1,6 @@
 {
   "name": "substreams-sink-clickhouse",
-  "version": "0.3.6",
+  "version": "0.3.7",
   "description": "Substreams Clickhouse Sink",
   "type": "module",
   "homepage": "https://github.com/pinax-network/substreams-sink-clickhouse",
src/buffer.ts (18 changes: 13 additions & 5 deletions)

@@ -8,19 +8,24 @@ export type Values = Record<string, unknown>;
 export type Buffer = Map<string, Values[]>;
 
 const path = "buffer.txt";
-const encoding = "utf-16le";
+const encoding = "utf-8";
 
 // create a write stream in "append" mode
 let writer = fs.createWriteStream(path, {flags: "a", encoding});
 export let inserts = 0;
 
 export function bulkInsert(rows: {table: string, values: Values}[]) {
-  writer.write(rows.map(row => JSON.stringify(row)).join("\n") + "\n");
+  return Promise.all(rows.map(({table, values}) => insert(table, values)));
 }
 
-export function insert(table: string, values: Values) {
+export function insert(table: string, values: Values): Promise<void> {
   store.check_table(table);
-  writer.write(JSON.stringify({table, values}) + "\n");
+  return new Promise((resolve, reject) => {
+    writer.write(JSON.stringify({table, values}) + "\n", (err) => {
+      if (err) return reject(err);
+      return resolve();
+    });
+  });
 }
 
 export async function read(): Promise<Buffer> {

@@ -50,7 +55,10 @@ export async function read(): Promise<Buffer> {
 }
 
 export async function flush(verbose = false): Promise<void> {
-  if ( !fs.existsSync(path) ) return;
+  if ( !fs.existsSync(path) ) {
+    writer = fs.createWriteStream(path, {flags: "w", encoding});
+    return;
+  }
   await close();
   const buffer = await read();
   for ( const [table, values] of buffer.entries() ) {
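The core of this change is wrapping fs.WriteStream.write in a Promise so callers can await the buffered write and observe stream errors, which the old fire-and-forget call silently dropped. A minimal standalone sketch of the same pattern (the writeLine helper is illustrative, not code from this repo):

import fs from "node:fs";

// Promisified append: resolves once the stream has flushed the chunk,
// rejects if the stream reports a write error.
function writeLine(writer: fs.WriteStream, line: string): Promise<void> {
  return new Promise((resolve, reject) => {
    writer.write(line + "\n", (err) => {
      if (err) return reject(err);
      resolve();
    });
  });
}

// Usage (top-level await works since the package is an ES module):
// a failed write now surfaces at the call site instead of vanishing.
const writer = fs.createWriteStream("buffer.txt", { flags: "a", encoding: "utf-8" });
await writeLine(writer, JSON.stringify({ table: "blocks", values: { block_number: 1 } }));

Note the two side changes in this file: the buffer file's encoding moves from utf-16le to utf-8, and flush() now recreates the write stream when the buffer file is missing, presumably so later inserts write to a fresh file rather than an unlinked one.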
src/clickhouse/handleSinkRequest.ts (16 changes: 8 additions & 8 deletions)

@@ -31,14 +31,14 @@ export async function handleSinkRequest({ data, ...metadata }: PayloadBody) {
   // Different handler if `graph_out` or `db_out` is emitting data.
   // Handles no incoming data as well.
   if ("entityChanges" in data && data.entityChanges.length > 0) {
-    handleEntityChanges(data.entityChanges, metadata);
+    await handleEntityChanges(data.entityChanges, metadata);
   } else if ("tableChanges" in data && data.tableChanges.length > 0) {
-    handleDatabaseChanges(data.tableChanges, metadata);
+    await handleDatabaseChanges(data.tableChanges, metadata);
   }
 
   // insert metadata
-  insertModuleHashes(metadata);
-  insertBlocks(metadata);
+  await insertModuleHashes(metadata);
+  await insertBlocks(metadata);
 
   // clear buffer every 1 second
   if ( lastUpdate != now() ) {

@@ -67,7 +67,7 @@ function insertModuleHashes(metadata: Metadata) {
     latest_block_id: metadata.clock.id,
     latest_timestamp: Number(new Date(metadata.clock.timestamp)),
   };
-  buffer.insert("module_hashes", values);
+  return buffer.insert("module_hashes", values);
 }
 
 function insertBlocks(metadata: Metadata) {

@@ -78,7 +78,7 @@ function insertBlocks(metadata: Metadata) {
     timestamp: Number(new Date(metadata.clock.timestamp)),
     block_id: metadata.clock.id,
   };
-  buffer.insert("blocks", values);
+  return buffer.insert("blocks", values);
 }
 
 function handleEntityChanges(entityChanges: EntityChange[], metadata: Metadata) {

@@ -89,7 +89,7 @@ function handleEntityChanges(entityChanges: EntityChange[], metadata: Metadata)
     const row = insertEntityChange(change.entity, values, change.operation, { ...metadata, id });
     rows.push(row);
   }
-  buffer.bulkInsert(rows);
+  return buffer.bulkInsert(rows);
 }
 
 function handleDatabaseChanges(tableChanges: TableChange[], metadata: Metadata) {

@@ -100,7 +100,7 @@ function handleDatabaseChanges(tableChanges: TableChange[], metadata: Metadata)
     const row = insertEntityChange(change.table, values, change.operation, { ...metadata, id });
     rows.push(row);
   }
-  buffer.bulkInsert(rows);
+  return buffer.bulkInsert(rows);
 }
 
 function insertEntityChange(
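Because buffer.insert now returns Promise<void>, the added return statements and awaits thread that promise all the way up through handleSinkRequest: a failed buffer write rejects the request handler instead of dying as an unhandled rejection, and the awaits also order the work so entity or table changes reach the buffer before the module_hashes and blocks metadata rows. A self-contained sketch of the difference (names are stand-ins, not the repo's code):

// Stand-in for buffer.insert: a write that fails asynchronously.
function insert(table: string, _values: unknown): Promise<void> {
  return Promise.reject(new Error(`write to ${table} failed`));
}

// Before: the promise is dropped; if the write fails, the error becomes
// an unhandled rejection and the caller never learns about it.
// (Shown for contrast only; not called below.)
function insertBlocksBefore(): void {
  insert("blocks", {});
}

// After: returning the promise lets the caller await it and catch errors.
function insertBlocksAfter(): Promise<void> {
  return insert("blocks", {});
}

try {
  await insertBlocksAfter();
} catch (err) {
  console.error("caught:", err); // reachable only because the promise is returned and awaited
}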
