Skip to content

Commit

Permalink
added entity_changes type metrics
Browse files Browse the repository at this point in the history
  • Loading branch information
JulienR1 committed Oct 23, 2023
1 parent b17579f commit d704288
Show file tree
Hide file tree
Showing 2 changed files with 34 additions and 7 deletions.
35 changes: 29 additions & 6 deletions src/clickhouse/handleSinkRequest.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,13 @@ import PQueue from "p-queue";
import { client, config } from "../config.js";
import { getValuesInEntityChange } from "../entity-changes.js";
import { logger } from "../logger.js";
import { entity_changes, queue_size } from "../prometheus.js";
import {
entity_chages_unsupported,
entity_changes_deleted,
entity_changes_inserted,
entity_changes_updated,
queue_size,
} from "../prometheus.js";
import { Clock, Manifest, PayloadBody } from "../schemas.js";
const { setTimeout } = require("timers/promises");

Expand Down Expand Up @@ -98,13 +104,14 @@ async function handleEntityChange(
case "OPERATION_CREATE":
return insertEntityChange(queue, table, values, { ...metadata, id: change.id });

// case "OPERATION_UPDATE":
// return client.update();
case "OPERATION_UPDATE":
return updateEntityChange();

// case "OPERATION_DELETE":
// return client.delete({ values, table: change.entity });
case "OPERATION_DELETE":
return deleteEntityChange();

default:
entity_chages_unsupported?.inc();
logger.error("unsupported operation found in entityChanges: " + change.operation.toString());
return Promise.resolve();
}
Expand All @@ -122,10 +129,26 @@ function insertEntityChange(
values["module_hash"] = metadata.manifest.moduleHash; // ModuleHash Index
values["chain"] = metadata.manifest.chain; // Chain Index

entity_changes?.inc();
entity_changes_inserted?.inc();
return queue.add(() => client.insert({ values, table, format: "JSONStringsEachRow" }));
}

// TODO: implement function
function updateEntityChange(): Promise<void> {
entity_changes_updated?.inc();
return Promise.resolve();

// return client.update();
}

// TODO: implement function
function deleteEntityChange(): Promise<void> {
entity_changes_deleted?.inc();
return Promise.resolve();

// return client.delete({ values, table: change.entity });
}

// TO-DO: this function won't work in a serverless function environment or running with multiple replicas
// Cannot depend on memory to know if table exists or not
async function existsTable(table: string) {
Expand Down
6 changes: 5 additions & 1 deletion src/prometheus.ts
Original file line number Diff line number Diff line change
Expand Up @@ -21,4 +21,8 @@ export function registerGauge(name: string, help: string) {
export const sink_requests = registerCounter("sink_requests", "Total requests");
export const sink_request_errors = registerCounter("sink_request_errors", "Total failed requests");
export const queue_size = registerGauge("queue_size", "Amount of promises being processed");
export const entity_changes = registerCounter("entity_changes", "Total entity changes");

export const entity_changes_inserted = registerCounter("entity_changes_inserted", "Total inserted entity changes");
export const entity_changes_updated = registerCounter("entity_changes_updated", "Total updated entity changes");
export const entity_changes_deleted = registerCounter("entity_changes_deleted", "Total deleted entity changes");
export const entity_chages_unsupported = registerCounter("entity_changes_unsupported", "Total unsupported entity changes")

0 comments on commit d704288

Please sign in to comment.