diff --git a/.gitignore b/.gitignore index 4dec3de..82d41a2 100644 --- a/.gitignore +++ b/.gitignore @@ -4,6 +4,5 @@ dist/ substreams-sink-clickhouse .env replay.log -*.sql package-lock.json .DS_Store \ No newline at end of file diff --git a/src/clickhouse/createDatabase.ts b/src/clickhouse/createDatabase.ts index faa654a..f954737 100644 --- a/src/clickhouse/createDatabase.ts +++ b/src/clickhouse/createDatabase.ts @@ -2,8 +2,7 @@ import { client } from "../config.js"; import { logger } from "../logger.js"; export async function createDatabase(database: string) { - logger.info(`Creating database '${database}'`); - if (!database) throw new Error("The database name must be specified"); + if (!database) throw new Error("[database] is required"); await client.exec({query: `CREATE DATABASE IF NOT EXISTS "${database}"`}); - logger.info(`Database [${database}] created`); + logger.info(`CREATE DATABASE [${database}]`); } \ No newline at end of file diff --git a/src/clickhouse/handleSinkRequest.ts b/src/clickhouse/handleSinkRequest.ts index 97021d9..ed8b8e2 100644 --- a/src/clickhouse/handleSinkRequest.ts +++ b/src/clickhouse/handleSinkRequest.ts @@ -15,28 +15,32 @@ const { setTimeout } = require("timers/promises"); const knownModuleHashes = new Set(); const knownBlockId = new Set(); +const knownBlockIdFinal = new Set(); const existingTables = new Map(); const queue = new PQueue({ concurrency: config.queueConcurrency }); export async function handleSinkRequest({ data, ...metadata }: PayloadBody) { - handleManifest(queue, metadata.manifest); - handleClock(queue, metadata.manifest, metadata.clock); + const { manifest, clock } = metadata; + // Indexes + handleModuleHashes(queue, manifest); + handleBlocks(queue, manifest, clock); + handleFinalBlocks(queue, manifest, clock); + + // EntityChanges for (const change of data.entityChanges) { handleEntityChange(queue, change, metadata); } + // Prevent queue from growing too large queue_size?.set(queue.size); if (queue.size > config.queueLimit) await setTimeout(1000); - // TO-DO: Logging can be improved - logger.info( - `handleSinkRequest | entityChanges=${data.entityChanges.length},queue.size=${queue.size}` - ); + logger.info(`handleSinkRequest | entityChanges=${data.entityChanges.length},queue.size=${queue.size}`); return new Response("OK"); } -// Manifest index -function handleManifest(queue: PQueue, manifest: Manifest) { +// Module Hashes index +function handleModuleHashes(queue: PQueue, manifest: Manifest) { const { moduleHash, type, moduleName, chain } = manifest; if (!knownModuleHashes.has(moduleHash)) { queue.add(() => @@ -47,7 +51,7 @@ function handleManifest(queue: PQueue, manifest: Manifest) { type, module_name: moduleName, }, - table: "manifest", + table: "module_hashes", format: "JSONEachRow", }) ); @@ -55,16 +59,33 @@ function handleManifest(queue: PQueue, manifest: Manifest) { } } + +// Final Block Index +function handleFinalBlocks(queue: PQueue, manifest: Manifest, clock: Clock) { + const block_id = clock.id; + const finalBlockOnly = manifest.finalBlockOnly === "true"; + if (!finalBlockOnly) return; // Only insert final blocks + + if (!knownBlockIdFinal.has(block_id)) { + queue.add(() => + client.insert({ + values: { block_id }, + table: "final_blocks", + format: "JSONEachRow", + }) + ); + knownBlockIdFinal.add(block_id); + } +} + // Block Index -function handleClock(queue: PQueue, manifest: Manifest, clock: Clock) { +function handleBlocks(queue: PQueue, manifest: Manifest, clock: Clock) { const block_id = clock.id; const block_number = clock.number; const timestamp = Number(new Date(clock.timestamp)); - const finalBlockOnly = manifest.finalBlockOnly === "true"; const chain = manifest.chain; - const block_key = `${block_id}-${finalBlockOnly}`; - if (!knownBlockId.has(block_key)) { + if (!knownBlockId.has(block_id)) { queue.add(() => client.insert({ values: { @@ -72,13 +93,12 @@ function handleClock(queue: PQueue, manifest: Manifest, clock: Clock) { block_number, chain, timestamp, - final_block: finalBlockOnly, }, - table: "block", + table: "blocks", format: "JSONEachRow", }) ); - knownBlockId.add(block_key); + knownBlockId.add(block_id); } } diff --git a/src/clickhouse/table-initialization.ts b/src/clickhouse/table-initialization.ts index aa925ba..c15b5fe 100644 --- a/src/clickhouse/table-initialization.ts +++ b/src/clickhouse/table-initialization.ts @@ -2,47 +2,13 @@ import { client } from "../config.js"; import { logger } from "../logger.js"; import { TableInitSchema } from "../schemas.js"; import { splitSchemaByTableCreation } from "./table-utils.js"; - -const queries = [ - ` -CREATE TABLE IF NOT EXISTS manifest ( - module_hash FixedString(40), - module_name String(), - chain LowCardinality(String), - type String(), -) -ENGINE = ReplacingMergeTree -ORDER BY (module_hash); -`, - ` -CREATE TABLE IF NOT EXISTS block ( - block_id FixedString(64), - block_number UInt32(), - chain LowCardinality(String), - timestamp DateTime64(3, 'UTC'), - final_block Bool, -) -ENGINE = ReplacingMergeTree -PRIMARY KEY (block_id) -ORDER BY (block_id, block_number, timestamp); -`, - `CREATE TABLE IF NOT EXISTS unparsed_json ( - raw_data JSON, - source LowCardinality(String), - id String, - block_id FixedString(64), - module_hash FixedString(40), - chain LowCardinality(String) - ) - ENGINE = MergeTree - ORDER BY (source, chain, module_hash, block_id)`, -]; +import tables from "./tables/index.js"; export function initializeManifest(): Promise { - logger.info("Initializing 'manifest' table."); - logger.info("Initializing 'clock' table."); - logger.info("Initializing 'unparsed_json' table."); - return Promise.all(queries.map((query) => client.command({ query }))); + return Promise.all(tables.map(([table, query]) => { + logger.info(`CREATE TABLE [${table}]`); + return client.command({ query }) + })); } const metadataQueries = (tableName: string) => [ diff --git a/src/clickhouse/tables/blocks.sql b/src/clickhouse/tables/blocks.sql new file mode 100644 index 0000000..fe8fa8b --- /dev/null +++ b/src/clickhouse/tables/blocks.sql @@ -0,0 +1,9 @@ +CREATE TABLE IF NOT EXISTS blocks ( + block_id FixedString(64), + block_number UInt32(), + chain LowCardinality(String), + timestamp DateTime64(3, 'UTC'), +) +ENGINE = ReplacingMergeTree +PRIMARY KEY (block_id) +ORDER BY (block_id, block_number, chain, timestamp); \ No newline at end of file diff --git a/src/clickhouse/tables/final_blocks.sql b/src/clickhouse/tables/final_blocks.sql new file mode 100644 index 0000000..534a99e --- /dev/null +++ b/src/clickhouse/tables/final_blocks.sql @@ -0,0 +1,5 @@ +CREATE TABLE IF NOT EXISTS final_blocks ( + block_id FixedString(64), +) +ENGINE = ReplacingMergeTree +ORDER BY (block_id); \ No newline at end of file diff --git a/src/clickhouse/tables/index.ts b/src/clickhouse/tables/index.ts new file mode 100644 index 0000000..a70f2eb --- /dev/null +++ b/src/clickhouse/tables/index.ts @@ -0,0 +1,16 @@ +import blocks_sql from "./blocks.sql"; +import final_blocks_sql from "./final_blocks.sql"; +import module_hashes_sql from "./module_hashes.sql"; +import unparsed_json_sql from "./unparsed_json.sql"; + +export const blocks = await Bun.file(blocks_sql).text() +export const final_blocks = await Bun.file(final_blocks_sql).text() +export const module_hashes = await Bun.file(module_hashes_sql).text() +export const unparsed_json = await Bun.file(unparsed_json_sql).text() + +export default [ + ["blocks", blocks], + ["final_blocks", final_blocks], + ["module_hashes", module_hashes], + ["unparsed_json", unparsed_json], +] \ No newline at end of file diff --git a/src/clickhouse/tables/module_hashes.sql b/src/clickhouse/tables/module_hashes.sql new file mode 100644 index 0000000..00d43cb --- /dev/null +++ b/src/clickhouse/tables/module_hashes.sql @@ -0,0 +1,8 @@ +CREATE TABLE IF NOT EXISTS module_hashes ( + module_hash FixedString(40), + module_name String(), + chain LowCardinality(String), + type String(), +) +ENGINE = ReplacingMergeTree +ORDER BY (module_hash); \ No newline at end of file diff --git a/src/clickhouse/tables/unparsed_json.sql b/src/clickhouse/tables/unparsed_json.sql new file mode 100644 index 0000000..ad71ca8 --- /dev/null +++ b/src/clickhouse/tables/unparsed_json.sql @@ -0,0 +1,10 @@ +CREATE TABLE IF NOT EXISTS unparsed_json ( + raw_data JSON, + source LowCardinality(String), + id String, + block_id FixedString(64), + module_hash FixedString(40), + chain LowCardinality(String) +) +ENGINE = MergeTree +ORDER BY (source, chain, module_hash, block_id) \ No newline at end of file diff --git a/src/types.d.ts b/src/types.d.ts index 621ae51..1e62f10 100644 --- a/src/types.d.ts +++ b/src/types.d.ts @@ -7,3 +7,8 @@ declare module "*.html" { const content: string; export default content; } + +declare module "*.sql" { + const content: string; + export default content; +}