From d7042882f4c431885a73f002026176051c66e146 Mon Sep 17 00:00:00 2001 From: Julien Rousseau Date: Mon, 23 Oct 2023 16:06:55 -0400 Subject: [PATCH] added entity_changes type metrics --- src/clickhouse/handleSinkRequest.ts | 35 ++++++++++++++++++++++++----- src/prometheus.ts | 6 ++++- 2 files changed, 34 insertions(+), 7 deletions(-) diff --git a/src/clickhouse/handleSinkRequest.ts b/src/clickhouse/handleSinkRequest.ts index 1e580b1..97021d9 100644 --- a/src/clickhouse/handleSinkRequest.ts +++ b/src/clickhouse/handleSinkRequest.ts @@ -3,7 +3,13 @@ import PQueue from "p-queue"; import { client, config } from "../config.js"; import { getValuesInEntityChange } from "../entity-changes.js"; import { logger } from "../logger.js"; -import { entity_changes, queue_size } from "../prometheus.js"; +import { + entity_chages_unsupported, + entity_changes_deleted, + entity_changes_inserted, + entity_changes_updated, + queue_size, +} from "../prometheus.js"; import { Clock, Manifest, PayloadBody } from "../schemas.js"; const { setTimeout } = require("timers/promises"); @@ -98,13 +104,14 @@ async function handleEntityChange( case "OPERATION_CREATE": return insertEntityChange(queue, table, values, { ...metadata, id: change.id }); - // case "OPERATION_UPDATE": - // return client.update(); + case "OPERATION_UPDATE": + return updateEntityChange(); - // case "OPERATION_DELETE": - // return client.delete({ values, table: change.entity }); + case "OPERATION_DELETE": + return deleteEntityChange(); default: + entity_chages_unsupported?.inc(); logger.error("unsupported operation found in entityChanges: " + change.operation.toString()); return Promise.resolve(); } @@ -122,10 +129,26 @@ function insertEntityChange( values["module_hash"] = metadata.manifest.moduleHash; // ModuleHash Index values["chain"] = metadata.manifest.chain; // Chain Index - entity_changes?.inc(); + entity_changes_inserted?.inc(); return queue.add(() => client.insert({ values, table, format: "JSONStringsEachRow" })); } +// TODO: implement function +function updateEntityChange(): Promise { + entity_changes_updated?.inc(); + return Promise.resolve(); + + // return client.update(); +} + +// TODO: implement function +function deleteEntityChange(): Promise { + entity_changes_deleted?.inc(); + return Promise.resolve(); + + // return client.delete({ values, table: change.entity }); +} + // TO-DO: this function won't work in a serverless function environment or running with multiple replicas // Cannot depend on memory to know if table exists or not async function existsTable(table: string) { diff --git a/src/prometheus.ts b/src/prometheus.ts index 07936de..ecd4477 100644 --- a/src/prometheus.ts +++ b/src/prometheus.ts @@ -21,4 +21,8 @@ export function registerGauge(name: string, help: string) { export const sink_requests = registerCounter("sink_requests", "Total requests"); export const sink_request_errors = registerCounter("sink_request_errors", "Total failed requests"); export const queue_size = registerGauge("queue_size", "Amount of promises being processed"); -export const entity_changes = registerCounter("entity_changes", "Total entity changes"); + +export const entity_changes_inserted = registerCounter("entity_changes_inserted", "Total inserted entity changes"); +export const entity_changes_updated = registerCounter("entity_changes_updated", "Total updated entity changes"); +export const entity_changes_deleted = registerCounter("entity_changes_deleted", "Total deleted entity changes"); +export const entity_chages_unsupported = registerCounter("entity_changes_unsupported", "Total unsupported entity changes") \ No newline at end of file