Skip to content

Commit

Permalink
Merge branch 'main' into feature/prometheus
Browse files Browse the repository at this point in the history
# Conflicts:
#	src/clickhouse/handleSinkRequest.ts
  • Loading branch information
JulienR1 committed Oct 23, 2023
2 parents d704288 + 638f3dc commit d9cb03d
Show file tree
Hide file tree
Showing 10 changed files with 96 additions and 59 deletions.
1 change: 0 additions & 1 deletion .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,5 @@ dist/
substreams-sink-clickhouse
.env
replay.log
*.sql
package-lock.json
.DS_Store
5 changes: 2 additions & 3 deletions src/clickhouse/createDatabase.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,7 @@ import { client } from "../config.js";
import { logger } from "../logger.js";

export async function createDatabase(database: string) {
logger.info(`Creating database '${database}'`);
if (!database) throw new Error("The database name must be specified");
if (!database) throw new Error("[database] is required");
await client.exec({query: `CREATE DATABASE IF NOT EXISTS "${database}"`});
logger.info(`Database [${database}] created`);
logger.info(`CREATE DATABASE [${database}]`);
}
52 changes: 36 additions & 16 deletions src/clickhouse/handleSinkRequest.ts
Original file line number Diff line number Diff line change
Expand Up @@ -15,28 +15,32 @@ const { setTimeout } = require("timers/promises");

const knownModuleHashes = new Set<string>();
const knownBlockId = new Set<string>();
const knownBlockIdFinal = new Set<string>();
const existingTables = new Map<string, boolean>();
const queue = new PQueue({ concurrency: config.queueConcurrency });

export async function handleSinkRequest({ data, ...metadata }: PayloadBody) {
handleManifest(queue, metadata.manifest);
handleClock(queue, metadata.manifest, metadata.clock);
const { manifest, clock } = metadata;
// Indexes
handleModuleHashes(queue, manifest);
handleBlocks(queue, manifest, clock);
handleFinalBlocks(queue, manifest, clock);

// EntityChanges
for (const change of data.entityChanges) {
handleEntityChange(queue, change, metadata);
}

// Prevent queue from growing too large
queue_size?.set(queue.size);
if (queue.size > config.queueLimit) await setTimeout(1000);

// TO-DO: Logging can be improved
logger.info(
`handleSinkRequest | entityChanges=${data.entityChanges.length},queue.size=${queue.size}`
);
logger.info(`handleSinkRequest | entityChanges=${data.entityChanges.length},queue.size=${queue.size}`);
return new Response("OK");
}

// Manifest index
function handleManifest(queue: PQueue, manifest: Manifest) {
// Module Hashes index
function handleModuleHashes(queue: PQueue, manifest: Manifest) {
const { moduleHash, type, moduleName, chain } = manifest;
if (!knownModuleHashes.has(moduleHash)) {
queue.add(() =>
Expand All @@ -47,38 +51,54 @@ function handleManifest(queue: PQueue, manifest: Manifest) {
type,
module_name: moduleName,
},
table: "manifest",
table: "module_hashes",
format: "JSONEachRow",
})
);
knownModuleHashes.add(moduleHash);
}
}


// Final Block Index
function handleFinalBlocks(queue: PQueue, manifest: Manifest, clock: Clock) {
const block_id = clock.id;
const finalBlockOnly = manifest.finalBlockOnly === "true";
if (!finalBlockOnly) return; // Only insert final blocks

if (!knownBlockIdFinal.has(block_id)) {
queue.add(() =>
client.insert({
values: { block_id },
table: "final_blocks",
format: "JSONEachRow",
})
);
knownBlockIdFinal.add(block_id);
}
}

// Block Index
function handleClock(queue: PQueue, manifest: Manifest, clock: Clock) {
function handleBlocks(queue: PQueue, manifest: Manifest, clock: Clock) {
const block_id = clock.id;
const block_number = clock.number;
const timestamp = Number(new Date(clock.timestamp));
const finalBlockOnly = manifest.finalBlockOnly === "true";
const chain = manifest.chain;
const block_key = `${block_id}-${finalBlockOnly}`;

if (!knownBlockId.has(block_key)) {
if (!knownBlockId.has(block_id)) {
queue.add(() =>
client.insert({
values: {
block_id,
block_number,
chain,
timestamp,
final_block: finalBlockOnly,
},
table: "block",
table: "blocks",
format: "JSONEachRow",
})
);
knownBlockId.add(block_key);
knownBlockId.add(block_id);
}
}

Expand Down
44 changes: 5 additions & 39 deletions src/clickhouse/table-initialization.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,47 +2,13 @@ import { client } from "../config.js";
import { logger } from "../logger.js";
import { TableInitSchema } from "../schemas.js";
import { splitSchemaByTableCreation } from "./table-utils.js";

const queries = [
`
CREATE TABLE IF NOT EXISTS manifest (
module_hash FixedString(40),
module_name String(),
chain LowCardinality(String),
type String(),
)
ENGINE = ReplacingMergeTree
ORDER BY (module_hash);
`,
`
CREATE TABLE IF NOT EXISTS block (
block_id FixedString(64),
block_number UInt32(),
chain LowCardinality(String),
timestamp DateTime64(3, 'UTC'),
final_block Bool,
)
ENGINE = ReplacingMergeTree
PRIMARY KEY (block_id)
ORDER BY (block_id, block_number, timestamp);
`,
`CREATE TABLE IF NOT EXISTS unparsed_json (
raw_data JSON,
source LowCardinality(String),
id String,
block_id FixedString(64),
module_hash FixedString(40),
chain LowCardinality(String)
)
ENGINE = MergeTree
ORDER BY (source, chain, module_hash, block_id)`,
];
import tables from "./tables/index.js";

export function initializeManifest(): Promise<unknown> {
logger.info("Initializing 'manifest' table.");
logger.info("Initializing 'clock' table.");
logger.info("Initializing 'unparsed_json' table.");
return Promise.all(queries.map((query) => client.command({ query })));
return Promise.all(tables.map(([table, query]) => {
logger.info(`CREATE TABLE [${table}]`);
return client.command({ query })
}));
}

const metadataQueries = (tableName: string) => [
Expand Down
9 changes: 9 additions & 0 deletions src/clickhouse/tables/blocks.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
CREATE TABLE IF NOT EXISTS blocks (
block_id FixedString(64),
block_number UInt32(),
chain LowCardinality(String),
timestamp DateTime64(3, 'UTC'),
)
ENGINE = ReplacingMergeTree
PRIMARY KEY (block_id)
ORDER BY (block_id, block_number, chain, timestamp);
5 changes: 5 additions & 0 deletions src/clickhouse/tables/final_blocks.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
CREATE TABLE IF NOT EXISTS final_blocks (
block_id FixedString(64),
)
ENGINE = ReplacingMergeTree
ORDER BY (block_id);
16 changes: 16 additions & 0 deletions src/clickhouse/tables/index.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
import blocks_sql from "./blocks.sql";
import final_blocks_sql from "./final_blocks.sql";
import module_hashes_sql from "./module_hashes.sql";
import unparsed_json_sql from "./unparsed_json.sql";

export const blocks = await Bun.file(blocks_sql).text()
export const final_blocks = await Bun.file(final_blocks_sql).text()
export const module_hashes = await Bun.file(module_hashes_sql).text()
export const unparsed_json = await Bun.file(unparsed_json_sql).text()

export default [
["blocks", blocks],
["final_blocks", final_blocks],
["module_hashes", module_hashes],
["unparsed_json", unparsed_json],
]
8 changes: 8 additions & 0 deletions src/clickhouse/tables/module_hashes.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
CREATE TABLE IF NOT EXISTS module_hashes (
module_hash FixedString(40),
module_name String(),
chain LowCardinality(String),
type String(),
)
ENGINE = ReplacingMergeTree
ORDER BY (module_hash);
10 changes: 10 additions & 0 deletions src/clickhouse/tables/unparsed_json.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
CREATE TABLE IF NOT EXISTS unparsed_json (
raw_data JSON,
source LowCardinality(String),
id String,
block_id FixedString(64),
module_hash FixedString(40),
chain LowCardinality(String)
)
ENGINE = MergeTree
ORDER BY (source, chain, module_hash, block_id)
5 changes: 5 additions & 0 deletions src/types.d.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,3 +7,8 @@ declare module "*.html" {
const content: string;
export default content;
}

declare module "*.sql" {
const content: string;
export default content;
}

0 comments on commit d9cb03d

Please sign in to comment.