Skip to content

Commit

Permalink
refactor existsTable
Browse files Browse the repository at this point in the history
  • Loading branch information
DenisCarriere committed Oct 23, 2023
1 parent 638f3dc commit 7ea2407
Showing 1 changed file with 22 additions and 20 deletions.
42 changes: 22 additions & 20 deletions src/clickhouse/handleSinkRequest.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,10 +6,11 @@ import { Clock, Manifest, PayloadBody } from "../schemas.js";
import { client, config } from "../config.js";
const { setTimeout } = require("timers/promises");

// TO-DO: move these to a separate file `src/clickhouse/stores.ts`
// Module-level, in-memory stores. Their contents live only as long as this process.
// NOTE(review): presumably the three sets below record module hashes / block ids that
// have already been handled — confirm against the handlers that use them.
const knownModuleHashes = new Set<string>();
const knownBlockId = new Set<string>();
const knownBlockIdFinal = new Set<string>();
// Table name -> result of the EXISTS check; read and written by existsTable().
const existingTables = new Map<string, boolean>();
// Table name -> result of the EXISTS check; read and written by existsTable().
const knownTables = new Map<string, boolean>();
// Promise queue whose concurrency limit is taken from `config.queueConcurrency`.
const queue = new PQueue({ concurrency: config.queueConcurrency });

export async function handleSinkRequest({ data, ...metadata }: PayloadBody) {
Expand Down Expand Up @@ -51,7 +52,6 @@ function handleModuleHashes(queue: PQueue, manifest: Manifest) {
}
}


// Final Block Index
function handleFinalBlocks(queue: PQueue, manifest: Manifest, clock: Clock) {
const block_id = clock.id;
Expand Down Expand Up @@ -99,8 +99,6 @@ async function handleEntityChange(
change: EntityChange,
metadata: { clock: Clock; manifest: Manifest }
) {
// TO-DO: existsTable needs to be refactored to use `client.query()`
// Or else should be removed entirely
const tableExists = await existsTable(change.entity);

let values = getValuesInEntityChange(change);
Expand Down Expand Up @@ -142,21 +140,25 @@ function insertEntityChange(

return queue.add(() => client.insert({ values, table, format: "JSONStringsEachRow" }));
}

// TO-DO: this function won't work in a serverless function environment or running with multiple replicas
// Cannot depend on memory to know if table exists or not
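// In-memory cache of table names (table name => does the table exist?)
//   cached as true  => return true without querying
//   cached as false => return false without querying
//   not cached yet  => run EXISTS, cache the result, then return it
// TO-DO: when a TABLE schema is updated, update knownTables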
/**
 * Reports whether `table` exists in ClickHouse, caching the answer in memory.
 *
 * The first call for a given table name runs an `EXISTS` query through
 * `client.query()`. The outcome is stored in `knownTables`, so later calls for
 * the same name return the cached value without querying again.
 *
 * NOTE(review): the table name is concatenated into the query text as-is —
 * confirm that callers only pass trusted identifiers.
 *
 * @param table - Name of the table to look up.
 * @returns `true` when the first row of the EXISTS response has `result === 1`,
 *          otherwise `false` (including when the response has no rows).
 */
async function existsTable(table: string): Promise<boolean> {
  // Return cached value if known (reduces number of EXISTS queries).
  // One `get` plus an `undefined` check keeps the return type a plain boolean,
  // so a cached `false` is returned as `false` and no `!` assertion is needed.
  const cached = knownTables.get(table);
  if (cached !== undefined) return cached;

  // Check if table EXISTS
  const response = await client.query({
    query: "EXISTS " + table,
    format: "JSONEachRow",
  });

  // Handle EXISTS response: a missing first row counts as "does not exist".
  const data = await response.json<Array<{ result: 0 | 1 }>>();
  const exists = data[0]?.result === 1;
  knownTables.set(table, exists);
  logger.info(`EXISTS [${table}=${exists}]`);
  return exists;
}

0 comments on commit 7ea2407

Please sign in to comment.