diff --git a/README.md b/README.md index c0e3dd0..762d12c 100644 --- a/README.md +++ b/README.md @@ -121,11 +121,13 @@ The `USER_DIMENSION` is generated by the user provided schema and is augmented b erDiagram USER_DIMENSION }|--|{ blocks : " " USER_DIMENSION }|--|{ module_hashes : " " + USER_DIMENSION }|--|{ cursors : " " + + blocks }|--|{ final_blocks : " " blocks }|--|{ unparsed_json : " " module_hashes }|--|{ unparsed_json : " " - - blocks }|--|{ final_blocks : " " + cursors }|--|{ unparsed_json : " " USER_DIMENSION { user_data unknown @@ -161,6 +163,13 @@ erDiagram final_blocks { block_id FixedString(64) } + + cursors { + cursor String + module_hash FixedString(40) + block_id FixedString(64) + chain LowCardinality(String) + } ``` **Indexes** @@ -168,8 +177,9 @@ erDiagram | Table | Fields | | -------------- | -------------------------------------------- | | USER_DIMENSION | `(chain, module_hash)` `(chain, block_id)` | -| module_hashes | `module_hash` | | blocks | `(block_id, block_number, chain, timestamp)` | +| module_hashes | `module_hash` | +| cursors | `(cursor, module_hash, block_id)` | | unparsed_json | `(source, chain, module_hash, block_id)` | | final_blocks | `block_id` | diff --git a/src/clickhouse/handleSinkRequest.ts b/src/clickhouse/handleSinkRequest.ts index ef558a4..5e9d54a 100644 --- a/src/clickhouse/handleSinkRequest.ts +++ b/src/clickhouse/handleSinkRequest.ts @@ -1,11 +1,11 @@ import { EntityChange } from "@substreams/sink-entity-changes/zod"; import PQueue from "p-queue"; -import client from "./createClient.js"; import { config } from "../config.js"; import { getValuesInEntityChange } from "../entity-changes.js"; import { logger } from "../logger.js"; import * as prometheus from "../prometheus.js"; import { Clock, Manifest, PayloadBody } from "../schemas.js"; +import client from "./createClient.js"; const { setTimeout } = require("timers/promises"); // TO-DO: moves these to a separate file `src/clickhouse/stores.ts` @@ -16,11 +16,12 @@ const knownTables = new Map(); const queue = new PQueue({ concurrency: config.queueConcurrency }); export async function handleSinkRequest({ data, ...metadata }: PayloadBody) { - const { manifest, clock } = metadata; + const { manifest, clock, cursor } = metadata; // Indexes handleModuleHashes(queue, manifest); handleBlocks(queue, manifest, clock); handleFinalBlocks(queue, manifest, clock); + handleCursors(queue, manifest, clock, cursor); // EntityChanges for (const change of data.entityChanges) { @@ -97,7 +98,26 @@ function handleBlocks(queue: PQueue, manifest: Manifest, clock: Clock) { } } -async function handleEntityChange(queue: PQueue, change: EntityChange, metadata: { clock: Clock; manifest: Manifest }) { +function handleCursors(queue: PQueue, manifest: Manifest, clock: Clock, cursor: string) { + queue.add(() => + client.insert({ + values: { + cursor, + block_id: clock.id, + chain: manifest.chain, + module_hash: manifest.moduleHash, + }, + table: "cursors", + format: "JSONEachRow", + }) + ); +} + +async function handleEntityChange( + queue: PQueue, + change: EntityChange, + metadata: { clock: Clock; manifest: Manifest } +) { const tableExists = await existsTable(change.entity); let values = getValuesInEntityChange(change); diff --git a/src/clickhouse/table-initialization.ts b/src/clickhouse/table-initialization.ts index bd05900..4563e76 100644 --- a/src/clickhouse/table-initialization.ts +++ b/src/clickhouse/table-initialization.ts @@ -1,5 +1,5 @@ -import client from "./createClient.js"; import { logger } from "../logger.js"; +import client from "./createClient.js"; import { getTableName } from "./table-utils.js"; import tables from "./tables/index.js"; diff --git a/src/clickhouse/tables/cursors.sql b/src/clickhouse/tables/cursors.sql new file mode 100644 index 0000000..ca602c2 --- /dev/null +++ b/src/clickhouse/tables/cursors.sql @@ -0,0 +1,9 @@ +CREATE TABLE IF NOT EXISTS cursors ( + cursor String, + module_hash FixedString(40), + block_id FixedString(64), + chain LowCardinality(String), +) +ENGINE = ReplacingMergeTree +PRIMARY KEY (cursor) +ORDER BY (cursor, module_hash, block_id); \ No newline at end of file diff --git a/src/clickhouse/tables/index.ts b/src/clickhouse/tables/index.ts index a70f2eb..36c3ea1 100644 --- a/src/clickhouse/tables/index.ts +++ b/src/clickhouse/tables/index.ts @@ -1,16 +1,19 @@ import blocks_sql from "./blocks.sql"; +import cursors_sql from "./cursors.sql"; import final_blocks_sql from "./final_blocks.sql"; import module_hashes_sql from "./module_hashes.sql"; import unparsed_json_sql from "./unparsed_json.sql"; -export const blocks = await Bun.file(blocks_sql).text() -export const final_blocks = await Bun.file(final_blocks_sql).text() -export const module_hashes = await Bun.file(module_hashes_sql).text() -export const unparsed_json = await Bun.file(unparsed_json_sql).text() +export const blocks = await Bun.file(blocks_sql).text(); +export const final_blocks = await Bun.file(final_blocks_sql).text(); +export const module_hashes = await Bun.file(module_hashes_sql).text(); +export const unparsed_json = await Bun.file(unparsed_json_sql).text(); +export const cursors = await Bun.file(cursors_sql).text(); export default [ - ["blocks", blocks], - ["final_blocks", final_blocks], - ["module_hashes", module_hashes], - ["unparsed_json", unparsed_json], -] \ No newline at end of file + ["blocks", blocks], + ["final_blocks", final_blocks], + ["module_hashes", module_hashes], + ["unparsed_json", unparsed_json], + ["cursors", cursors], +];