From c11564aa5e8332b1489d156f1abb27f03e941570 Mon Sep 17 00:00:00 2001 From: Julien Rousseau Date: Tue, 24 Oct 2023 00:19:23 -0400 Subject: [PATCH] small refactor --- index.ts | 2 +- src/clickhouse/handleSinkRequest.ts | 35 +++++++++-------------------- src/prometheus.ts | 14 ++++++------ 3 files changed, 18 insertions(+), 33 deletions(-) diff --git a/index.ts b/index.ts index 325083f..1680e83 100644 --- a/index.ts +++ b/index.ts @@ -11,7 +11,7 @@ if (config.verbose) logger.enable(); const app = Bun.serve({ hostname: config.hostname, port: config.port, - fetch: function fetch(req: Request) { + fetch(req: Request) { if (req.method == "GET") return GET(req); if (req.method == "POST") return POST(req); if (req.method == "PUT") return PUT(req); diff --git a/src/clickhouse/handleSinkRequest.ts b/src/clickhouse/handleSinkRequest.ts index c98017c..8eecb01 100644 --- a/src/clickhouse/handleSinkRequest.ts +++ b/src/clickhouse/handleSinkRequest.ts @@ -3,13 +3,7 @@ import PQueue from "p-queue"; import { client, config } from "../config.js"; import { getValuesInEntityChange } from "../entity-changes.js"; import { logger } from "../logger.js"; -import { - entity_chages_unsupported, - entity_changes_deleted, - entity_changes_inserted, - entity_changes_updated, - queue_size, -} from "../prometheus.js"; +import * as prometheus from "../prometheus.js"; import { Clock, Manifest, PayloadBody } from "../schemas.js"; const { setTimeout } = require("timers/promises"); @@ -21,7 +15,7 @@ const knownTables = new Map(); const queue = new PQueue({ concurrency: config.queueConcurrency }); export async function handleSinkRequest({ data, ...metadata }: PayloadBody) { - const { manifest, clock } = metadata; + const { manifest, clock } = metadata; // Indexes handleModuleHashes(queue, manifest); handleBlocks(queue, manifest, clock); @@ -33,7 +27,7 @@ export async function handleSinkRequest({ data, ...metadata }: PayloadBody) { } // Prevent queue from growing too large - queue_size?.set(queue.size); + prometheus.queue_size.set(queue.size); if (queue.size > config.queueLimit) await setTimeout(1000); logger.info(`handleSinkRequest | entityChanges=${data.entityChanges.length},queue.size=${queue.size}`); @@ -102,11 +96,7 @@ function handleBlocks(queue: PQueue, manifest: Manifest, clock: Clock) { } } -async function handleEntityChange( - queue: PQueue, - change: EntityChange, - metadata: { clock: Clock; manifest: Manifest } -) { +async function handleEntityChange(queue: PQueue, change: EntityChange, metadata: { clock: Clock; manifest: Manifest }) { const tableExists = await existsTable(change.entity); let values = getValuesInEntityChange(change); @@ -129,31 +119,26 @@ async function handleEntityChange( return deleteEntityChange(); default: - entity_chages_unsupported?.inc(); + prometheus.entity_chages_unsupported.inc(); logger.error("unsupported operation found in entityChanges: " + change.operation.toString()); return Promise.resolve(); } } -function insertEntityChange( - queue: PQueue, - table: string, - values: Record, - metadata: { id: string; clock: Clock; manifest: Manifest } -) { +function insertEntityChange(queue: PQueue, table: string, values: Record, metadata: { id: string; clock: Clock; manifest: Manifest }) { // EntityChange values["id"] = metadata.id; // Entity ID values["block_id"] = metadata.clock.id; // Block Index values["module_hash"] = metadata.manifest.moduleHash; // ModuleHash Index values["chain"] = metadata.manifest.chain; // Chain Index - entity_changes_inserted?.inc(); + prometheus.entity_changes_inserted.inc(); return queue.add(() => client.insert({ values, table, format: "JSONStringsEachRow" })); } // TODO: implement function function updateEntityChange(): Promise { - entity_changes_updated?.inc(); + prometheus.entity_changes_updated.inc(); return Promise.resolve(); // return client.update(); @@ -161,7 +146,7 @@ function updateEntityChange(): Promise { // TODO: implement function function deleteEntityChange(): Promise { - entity_changes_deleted?.inc(); + prometheus.entity_changes_deleted.inc(); return Promise.resolve(); // return client.delete({ values, table: change.entity }); @@ -173,7 +158,7 @@ function deleteEntityChange(): Promise { // if undefined => check EXISTS if true or false async function existsTable(table: string) { // Return cached value if known (reduces number of EXISTS queries) - if ( knownTables.has(table) ) return knownTables.get(table); + if (knownTables.has(table)) return knownTables.get(table); // Check if table EXISTS const response = await client.query({ diff --git a/src/prometheus.ts b/src/prometheus.ts index ecd4477..eaf5b10 100644 --- a/src/prometheus.ts +++ b/src/prometheus.ts @@ -18,11 +18,11 @@ export function registerGauge(name: string, help: string) { // TO-DO: Add Prometheus metrics // https://github.com/pinax-network/substreams-sink-clickhouse/issues/26 -export const sink_requests = registerCounter("sink_requests", "Total requests"); -export const sink_request_errors = registerCounter("sink_request_errors", "Total failed requests"); -export const queue_size = registerGauge("queue_size", "Amount of promises being processed"); +export const sink_requests = registerCounter("sink_requests", "Total requests")!; +export const sink_request_errors = registerCounter("sink_request_errors", "Total failed requests")!; +export const queue_size = registerGauge("queue_size", "Amount of promises being processed")!; -export const entity_changes_inserted = registerCounter("entity_changes_inserted", "Total inserted entity changes"); -export const entity_changes_updated = registerCounter("entity_changes_updated", "Total updated entity changes"); -export const entity_changes_deleted = registerCounter("entity_changes_deleted", "Total deleted entity changes"); -export const entity_chages_unsupported = registerCounter("entity_changes_unsupported", "Total unsupported entity changes") \ No newline at end of file +export const entity_changes_inserted = registerCounter("entity_changes_inserted", "Total inserted entity changes")!; +export const entity_changes_updated = registerCounter("entity_changes_updated", "Total updated entity changes")!; +export const entity_changes_deleted = registerCounter("entity_changes_deleted", "Total deleted entity changes")!; +export const entity_chages_unsupported = registerCounter("entity_changes_unsupported", "Total unsupported entity changes")! \ No newline at end of file