
Commit

add exit handler
DenisCarriere committed Feb 18, 2024
1 parent a3da148 commit 510f0e5
Showing 7 changed files with 62 additions and 46 deletions.
1 change: 1 addition & 0 deletions index.ts
@@ -11,6 +11,7 @@ import { NotFound } from "./src/fetch/cors.js";
import { logger } from "./src/logger.js";
import init from "./src/fetch/init.js";
import { show_tables } from "./src/clickhouse/stores.js";
import "./src/exitHandler.js"

if (config.verbose) logger.enable();

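The bare import loads src/exitHandler.js purely for its side effects: evaluating the module registers the process signal handlers defined in the new file further down.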
6 changes: 4 additions & 2 deletions sql/blocks.sql
@@ -1,10 +1,12 @@
SELECT
chain,
+module_hash,
COUNT() AS count,
COUNTDISTINCT(block_number) AS count_distinct,
-MAX(block_number) AS max,
MIN(block_number) AS min,
+MAX(block_number) AS max,
max - min + 1 AS delta,
delta - count_distinct AS missing
FROM blocks
-GROUP BY chain
+WHERE chain = {chain: String}
+GROUP BY (chain, module_hash)
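The summary is now computed per (chain, module_hash) pair, with the chain bound as a query parameter rather than filtered in application code afterwards. The delta and missing columns expose gaps: for example, a row with min = 100, max = 110 and count_distinct = 9 gives delta = 11 and missing = 2, meaning two block numbers in that range were never recorded.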
23 changes: 19 additions & 4 deletions sql/blocks.ts
@@ -3,6 +3,7 @@ import { readOnlyClient } from "../src/clickhouse/createClient.js";

export const BlockResponseSchema = z.object({
chain: z.string(),
+module_hash: z.string(),
count: z.number(),
distinctCount: z.number(),
min: z.number(),
@@ -22,11 +23,25 @@ export function getChain(req: Request, required = true) {
return chain;
}

+export function getModuleHash(req: Request, required = true) {
+const url = new URL(req.url);
+const module_hash = url.searchParams.get("module_hash");
+
+if (required && !module_hash) {
+throw Error("Missing parameter: module_hash");
+}
+return module_hash;
+}

export async function blocks(req: Request) {
-const query = await Bun.file(import.meta.dirname + "/blocks.sql").text()
-const chain = getChain(req, false);
-const response = await readOnlyClient.query({ query, format: "JSONEachRow" });
+let query = await Bun.file(import.meta.dirname + "/blocks.sql").text()
+const chain = getChain(req, true);
+const module_hash = getModuleHash(req, false);
+const response = await readOnlyClient.query({ query_params: {chain}, query, format: "JSONEachRow" });
let data = await response.json() as BlockResponseSchema[];
-if ( chain ) data = data.filter((row) => row.chain === chain);

+// optional filter by param
+// if ( chain ) data = data.filter((row) => row.chain === chain);
+if ( module_hash ) data = data.filter((row) => row.module_hash === module_hash);
return data;
}
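For context, a minimal sketch of calling the reworked endpoint; the base URL, chain and module_hash values are placeholders rather than anything defined in this repository:

// Hypothetical values: substitute the sink's real address, a chain it
// indexes, and a module_hash reported by the API.
const base = "http://localhost:3000";
const params = new URLSearchParams({ chain: "wax", module_hash: "abc123" });

// chain is now required (getChain(req, true)); module_hash stays optional
// and is applied as a post-query filter on the returned rows.
const rows = await fetch(`${base}/blocks?${params}`).then((res) => res.json());
console.log(rows); // e.g. [{ chain, module_hash, count, min, max, ... }]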
6 changes: 4 additions & 2 deletions src/clickhouse/handleSinkRequest.ts
@@ -8,6 +8,7 @@ import { PayloadBody } from "../schemas.js";
import { client } from "./createClient.js";
import * as store from "./stores.js";
import logUpdate from 'log-update';
+import { logger } from "../logger.js";

type Metadata = { clock: Clock; manifest: Manifest; cursor: string };

@@ -35,14 +36,15 @@ function logProgress() {
const rate = Math.round(success / delta);
const count = bufferCount();
success++;
-logUpdate(`[handleSinkRequest]: ${success} total [${rate} b/s] buffer size: ${count}`);
+logUpdate('[handleSinkRequest]', `\t${success} total [${rate} b/s] buffer size: ${count}`);
}

-async function flushBuffer() {
+export async function flushBuffer(verbose = false) {
// clear buffer every 1 second
if ( lastUpdate != now() ) {
for ( const [table, values] of buffer.entries() ) {
await client.insert({table, values, format: "JSONEachRow"})
+if ( verbose ) logger.info('[handleSinkRequest]', `\tinserted ${values.length} rows into ${table}`);
buffer.delete(table);
}
lastUpdate = now();
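flushBuffer is now exported so that other modules, such as the new exit handler below, can trigger a flush directly; the optional verbose flag, if a caller passes true, adds one log line per inserted table alongside the usual single-line logUpdate progress output.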
7 changes: 5 additions & 2 deletions src/clickhouse/stores.ts
@@ -8,7 +8,7 @@ export let paused = false;

export function pause(value: boolean) {
paused = value;
-logger.info('[pause]', `Paused=${paused}`);
+logger.info('[store::pause]', `\tPaused=${paused}`);
}

export async function query_chains() {
@@ -18,6 +18,7 @@ export async function query_chains() {
.then((response) => response.json<Array<{ chain: string }>>())
.then((chains) => chains.map(({ chain }) => chain))
.catch(() => []);
+logger.info('[store:query_chains]', `Total chains: ${chains.length} (${chains.join(", ")})`);

return chains;
}
@@ -29,6 +30,7 @@ export async function query_module_hashes() {
.then((response) => response.json<Array<{ module_hash: string }>>())
.then((moduleHashes) => moduleHashes.map(({ module_hash }) => module_hash))
.catch(() => []);
+logger.info('[store:query_module_hashes]', `Total module_hashes: ${module_hashes.length}`);

return module_hashes;
}
@@ -47,6 +49,7 @@ export async function show_tables() {
});
const data = await response.json<{name: string}[]>();
tables = new Set(data.map(({ name }) => name));
-logger.info('[show_tables]', `Loaded ${tables.size} tables (${[...tables].join(", ")})`);
+logger.info('[store::show_tables]', `Loaded ${tables.size} tables (${[...tables].join(", ")})`);

return tables;
}
23 changes: 23 additions & 0 deletions src/exitHandler.ts
@@ -0,0 +1,23 @@
+import { flushBuffer } from "./clickhouse/handleSinkRequest.js";
+import { pause } from "./clickhouse/stores.js";
+import { logger } from "./logger.js";
+
+export function exitHandler() {
+logger.info('[app]\t', `Server shutting down...`);
+pause(true);
+flushBuffer();
+process.exit();
+}
+
+// do something when app is closing
+process.on('beforeExit', exitHandler);
+
+// catches ctrl+c event
+process.on('SIGINT', exitHandler);
+
+// catches "kill pid" (for example: nodemon restart)
+process.on('SIGUSR1', exitHandler);
+process.on('SIGUSR2', exitHandler);
+
+// catches uncaught exceptions
+process.on('uncaughtException', exitHandler);
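The handler sets the shared paused flag, kicks off a flush of whatever is still buffered, and exits. For illustration only, here is a sketch of the same sequence with the flush awaited before exiting, an assumption layered on top of this commit (the committed handler calls flushBuffer without await):

import { flushBuffer } from "./clickhouse/handleSinkRequest.js";
import { pause } from "./clickhouse/stores.js";
import { logger } from "./logger.js";

export async function exitHandler() {
  logger.info('[app]\t', `Server shutting down...`);
  pause(true);          // signal the sink to pause (see stores.ts above)
  await flushBuffer();  // let pending inserts reach ClickHouse before exiting
  process.exit();
}

// registered the same way, e.g.
process.on('SIGINT', exitHandler);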
42 changes: 6 additions & 36 deletions src/fetch/openapi.ts
@@ -50,7 +50,7 @@ async function paramModuleHash(required = true): Promise<ParameterObject> {
return {
name: "module_hash",
in: "query",
-required: true,
+required,
schema: { enum: await store.query_module_hashes() },
};
}
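With the hard-coded true replaced by the required argument, call sites can make the parameter optional; the /blocks path below relies on this by passing await paramModuleHash(false).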
@@ -164,23 +164,6 @@ export async function openapi() {
responses: PUT_RESPONSES,
},
})
.addPath("/hash", {
post: {
tags: [TAGS.USAGE],
summary: "Generate a hash for a specified password",
requestBody: {
required: true,
description: "The password to hash",
content: { "text/plain": { schema: { type: "string" } } },
},
responses: {
200: {
description: "Success",
content: { "text/plain": { schema: { type: "string" } } },
},
},
},
})
.addPath("/pause", {
put: {
tags: [TAGS.MAINTENANCE],
@@ -221,7 +204,7 @@
})
.addPath("/cursor/latest", {
get: {
-tags: [TAGS.QUERIES],
+tags: [TAGS.USAGE],
summary: "Finds the latest cursor for a given chain and table",
parameters: [
await paramChain(true),
@@ -248,13 +231,14 @@
.addPath("/blocks", {
get: {
tags: [TAGS.HEALTH],
summary: "Gives a summary of known blocks, including min, max and unique block numbers",
summary: "Gives a summary of known blocks for particular module hashes",
parameters: [
-await paramChain(false)
+await paramChain(true),
+await paramModuleHash(false),
],
responses: {
200: {
description: "Block number summary",
description: "Module hash block summary",
content: {
"application/json": {
schema: zodToJsonSchema(BlockResponseSchema),
@@ -282,20 +266,6 @@
},
},
})
.addPath("/query", {
post: {
tags: [TAGS.QUERIES],
summary: "Execute queries against the database in read-only mode",
requestBody: {
content: {
"text/plain": {
schema: { type: "string", examples: ["SELECT COUNT() FROM blocks"] },
},
},
},
responses: { 200: { description: "query result", content: { "application/json": {} } } },
},
})
.addPath("/health", {
get: {
tags: [TAGS.HEALTH],
