Skip to content

Commit

Permalink
added initial metrics
Browse files Browse the repository at this point in the history
  • Loading branch information
JulienR1 committed Oct 23, 2023
1 parent d58cabd commit b17579f
Show file tree
Hide file tree
Showing 3 changed files with 30 additions and 13 deletions.
11 changes: 8 additions & 3 deletions src/clickhouse/handleSinkRequest.ts
Original file line number Diff line number Diff line change
@@ -1,9 +1,10 @@
import { EntityChange } from "@substreams/sink-entity-changes/zod";
import PQueue from "p-queue";
import { client, config } from "../config.js";
import { getValuesInEntityChange } from "../entity-changes.js";
import { logger } from "../logger.js";
import { entity_changes, queue_size } from "../prometheus.js";
import { Clock, Manifest, PayloadBody } from "../schemas.js";
import { client, config } from "../config.js";
const { setTimeout } = require("timers/promises");

const knownModuleHashes = new Set<string>();
Expand All @@ -18,12 +19,15 @@ export async function handleSinkRequest({ data, ...metadata }: PayloadBody) {
handleEntityChange(queue, change, metadata);
}

queue_size?.set(queue.size);
if (queue.size > config.queueLimit) await setTimeout(1000);

// TO-DO: Logging can be improved
logger.info(`handleSinkRequest | entityChanges=${data.entityChanges.length},queue.size=${queue.size}`);
logger.info(
`handleSinkRequest | entityChanges=${data.entityChanges.length},queue.size=${queue.size}`
);
return new Response("OK");
};
}

// Manifest index
function handleManifest(queue: PQueue, manifest: Manifest) {
Expand Down Expand Up @@ -118,6 +122,7 @@ function insertEntityChange(
values["module_hash"] = metadata.manifest.moduleHash; // ModuleHash Index
values["chain"] = metadata.manifest.chain; // Chain Index

entity_changes?.inc();
return queue.add(() => client.insert({ values, table, format: "JSONStringsEachRow" }));
}

Expand Down
27 changes: 18 additions & 9 deletions src/fetch/POST.ts
Original file line number Diff line number Diff line change
@@ -1,18 +1,27 @@
import { BodySchema } from "../schemas.js";
import { handleSinkRequest } from "../clickhouse/handleSinkRequest.js";
import { sink_request_errors, sink_requests } from "../prometheus.js";
import { BodySchema } from "../schemas.js";
import signatureEd25519 from "../webhook/signatureEd25519.js";

export default async function (req: Request) {
// validate Ed25519 signature
const text = await req.text();
const signatureError = await signatureEd25519(req, text);
if ( signatureError ) return signatureError;
// validate Ed25519 signature
const text = await req.text();
const signatureError = await signatureEd25519(req, text);
if (signatureError) return signatureError;

// parse POST body payload
// parse POST body payload
try {
const body = BodySchema.parse(JSON.parse(text));
if ( "message" in body ) {
if (body.message === "PING") return new Response("OK");
return new Response("invalid body", { status: 400 });

if ("message" in body) {
if (body.message === "PING") return new Response("OK");
return new Response("invalid body", { status: 400 });
}

return handleSinkRequest(body);
} catch {
sink_request_errors?.inc();
} finally {
sink_requests?.inc();
}
}
5 changes: 4 additions & 1 deletion src/prometheus.ts
Original file line number Diff line number Diff line change
Expand Up @@ -18,4 +18,7 @@ export function registerGauge(name: string, help: string) {

// TO-DO: Add Prometheus metrics
// https://github.com/pinax-network/substreams-sink-clickhouse/issues/26
export const requests = registerCounter("requests", "Total requests");
export const sink_requests = registerCounter("sink_requests", "Total requests");
export const sink_request_errors = registerCounter("sink_request_errors", "Total failed requests");
export const queue_size = registerGauge("queue_size", "Amount of promises being processed");
export const entity_changes = registerCounter("entity_changes", "Total entity changes");

0 comments on commit b17579f

Please sign in to comment.