diff --git a/sql/blocks.sql b/sql/blocks.sql index 6c0e989..04097f3 100644 --- a/sql/blocks.sql +++ b/sql/blocks.sql @@ -6,7 +6,8 @@ SELECT MIN(block_number) AS min, MAX(block_number) AS max, max - min + 1 AS delta, - delta - count_distinct AS missing + delta - count_distinct AS missing, + count - count_distinct AS optimize FROM blocks WHERE chain = {chain: String} GROUP BY (chain, module_hash) \ No newline at end of file diff --git a/sql/blocks.ts b/sql/blocks.ts index 64a1258..553b0b4 100644 --- a/sql/blocks.ts +++ b/sql/blocks.ts @@ -2,15 +2,17 @@ import { z } from "zod"; import { readOnlyClient } from "../src/clickhouse/createClient.js"; export const BlockResponseSchema = z.object({ - chain: z.string(), - module_hash: z.string(), + chain: z.string().default("wax"), + module_hash: z.string().default("0670acd8592c0e2aec694a1dafd065218b26360f"), count: z.number(), distinctCount: z.number(), min: z.number(), max: z.number(), delta: z.number(), missing: z.number(), + optimize: z.number(), }); + export type BlockResponseSchema = z.infer; export function getChain(req: Request, required = true) { diff --git a/src/clickhouse/handleSinkRequest.ts b/src/clickhouse/handleSinkRequest.ts index 0233f9e..e7bb79c 100644 --- a/src/clickhouse/handleSinkRequest.ts +++ b/src/clickhouse/handleSinkRequest.ts @@ -18,7 +18,9 @@ function now() { return Math.floor(new Date().getTime() / 1000); } -let success = 0; +let entities = 0; +let blocks = 0; +let inserts = 0; let start = now(); let lastUpdate = now(); @@ -33,10 +35,12 @@ function bufferCount() { // TO-DO - use Prometheus metrics as input to this function function logProgress() { const delta = now() - start - const rate = Math.round(success / delta); + const blockRate = Math.round(blocks / delta); + const entitiesRate = Math.round(entities / delta); + const insertsRate = Math.round(inserts / delta); const count = bufferCount(); - success++; - logUpdate('[clickhouse::handleSinkRequest]', `\t${success} total [${rate} b/s] buffer size: ${count}`); + blocks++; + logUpdate(`[clickhouse::handleSinkRequest] blocks=${blocks} [${blockRate}/s] entities=${entities} [${entitiesRate}/s] inserts=${inserts} [${insertsRate}/s] buffer=${count}`); } export async function flushBuffer(verbose = false) { @@ -46,6 +50,7 @@ export async function flushBuffer(verbose = false) { await client.insert({table, values, format: "JSONEachRow"}) if ( verbose ) logger.info('[handleSinkRequest]', `\tinserted ${values.length} rows into ${table}`); buffer.delete(table); + inserts++; } lastUpdate = now(); } @@ -147,6 +152,7 @@ function insertEntityChange( values["timestamp"] = Number(new Date(metadata.clock.timestamp)); // Block timestamp values["operation"] = operation; insertToBuffer(table, values); + entities++; // log prometheus.entity_changes.inc({