Skip to content

Commit

Permalink
add optimize
Browse files Browse the repository at this point in the history
  • Loading branch information
DenisCarriere committed Feb 18, 2024
1 parent 82ed253 commit 8bf82c5
Show file tree
Hide file tree
Showing 3 changed files with 16 additions and 7 deletions.
3 changes: 2 additions & 1 deletion sql/blocks.sql
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,8 @@ SELECT
MIN(block_number) AS min,
MAX(block_number) AS max,
max - min + 1 AS delta,
delta - count_distinct AS missing
delta - count_distinct AS missing,
count - count_distinct AS optimize
FROM blocks
WHERE chain = {chain: String}
GROUP BY (chain, module_hash)
6 changes: 4 additions & 2 deletions sql/blocks.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,15 +2,17 @@ import { z } from "zod";
import { readOnlyClient } from "../src/clickhouse/createClient.js";

export const BlockResponseSchema = z.object({
chain: z.string(),
module_hash: z.string(),
chain: z.string().default("wax"),
module_hash: z.string().default("0670acd8592c0e2aec694a1dafd065218b26360f"),
count: z.number(),
distinctCount: z.number(),
min: z.number(),
max: z.number(),
delta: z.number(),
missing: z.number(),
optimize: z.number(),
});

export type BlockResponseSchema = z.infer<typeof BlockResponseSchema>;

export function getChain(req: Request, required = true) {
Expand Down
14 changes: 10 additions & 4 deletions src/clickhouse/handleSinkRequest.ts
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,9 @@ function now() {
return Math.floor(new Date().getTime() / 1000);
}

let success = 0;
let entities = 0;
let blocks = 0;
let inserts = 0;
let start = now();
let lastUpdate = now();

Expand All @@ -33,10 +35,12 @@ function bufferCount() {
// TO-DO - use Prometheus metrics as input to this function
function logProgress() {
const delta = now() - start
const rate = Math.round(success / delta);
const blockRate = Math.round(blocks / delta);
const entitiesRate = Math.round(entities / delta);
const insertsRate = Math.round(inserts / delta);
const count = bufferCount();
success++;
logUpdate('[clickhouse::handleSinkRequest]', `\t${success} total [${rate} b/s] buffer size: ${count}`);
blocks++;
logUpdate(`[clickhouse::handleSinkRequest] blocks=${blocks} [${blockRate}/s] entities=${entities} [${entitiesRate}/s] inserts=${inserts} [${insertsRate}/s] buffer=${count}`);
}

export async function flushBuffer(verbose = false) {
Expand All @@ -46,6 +50,7 @@ export async function flushBuffer(verbose = false) {
await client.insert({table, values, format: "JSONEachRow"})
if ( verbose ) logger.info('[handleSinkRequest]', `\tinserted ${values.length} rows into ${table}`);
buffer.delete(table);
inserts++;
}
lastUpdate = now();
}
Expand Down Expand Up @@ -147,6 +152,7 @@ function insertEntityChange(
values["timestamp"] = Number(new Date(metadata.clock.timestamp)); // Block timestamp
values["operation"] = operation;
insertToBuffer(table, values);
entities++;

// log
prometheus.entity_changes.inc({
Expand Down

0 comments on commit 8bf82c5

Please sign in to comment.