Skip to content

Commit

Permalink
small refactor
Browse files Browse the repository at this point in the history
  • Loading branch information
JulienR1 committed Oct 24, 2023
1 parent fdcdb04 commit c11564a
Show file tree
Hide file tree
Showing 3 changed files with 18 additions and 33 deletions.
2 changes: 1 addition & 1 deletion index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ if (config.verbose) logger.enable();
const app = Bun.serve({
hostname: config.hostname,
port: config.port,
fetch: function fetch(req: Request) {
fetch(req: Request) {
if (req.method == "GET") return GET(req);
if (req.method == "POST") return POST(req);
if (req.method == "PUT") return PUT(req);
Expand Down
35 changes: 10 additions & 25 deletions src/clickhouse/handleSinkRequest.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,13 +3,7 @@ import PQueue from "p-queue";
import { client, config } from "../config.js";
import { getValuesInEntityChange } from "../entity-changes.js";
import { logger } from "../logger.js";
import {
entity_chages_unsupported,
entity_changes_deleted,
entity_changes_inserted,
entity_changes_updated,
queue_size,
} from "../prometheus.js";
import * as prometheus from "../prometheus.js";
import { Clock, Manifest, PayloadBody } from "../schemas.js";
const { setTimeout } = require("timers/promises");

Expand All @@ -21,7 +15,7 @@ const knownTables = new Map<string, boolean>();
const queue = new PQueue({ concurrency: config.queueConcurrency });

export async function handleSinkRequest({ data, ...metadata }: PayloadBody) {
const { manifest, clock } = metadata;
const { manifest, clock } = metadata;
// Indexes
handleModuleHashes(queue, manifest);
handleBlocks(queue, manifest, clock);
Expand All @@ -33,7 +27,7 @@ export async function handleSinkRequest({ data, ...metadata }: PayloadBody) {
}

// Prevent queue from growing too large
queue_size?.set(queue.size);
prometheus.queue_size.set(queue.size);
if (queue.size > config.queueLimit) await setTimeout(1000);

logger.info(`handleSinkRequest | entityChanges=${data.entityChanges.length},queue.size=${queue.size}`);
Expand Down Expand Up @@ -102,11 +96,7 @@ function handleBlocks(queue: PQueue, manifest: Manifest, clock: Clock) {
}
}

async function handleEntityChange(
queue: PQueue,
change: EntityChange,
metadata: { clock: Clock; manifest: Manifest }
) {
async function handleEntityChange(queue: PQueue, change: EntityChange, metadata: { clock: Clock; manifest: Manifest }) {
const tableExists = await existsTable(change.entity);

let values = getValuesInEntityChange(change);
Expand All @@ -129,39 +119,34 @@ async function handleEntityChange(
return deleteEntityChange();

default:
entity_chages_unsupported?.inc();
prometheus.entity_chages_unsupported.inc();
logger.error("unsupported operation found in entityChanges: " + change.operation.toString());
return Promise.resolve();
}
}

function insertEntityChange(
queue: PQueue,
table: string,
values: Record<string, unknown>,
metadata: { id: string; clock: Clock; manifest: Manifest }
) {
function insertEntityChange(queue: PQueue, table: string, values: Record<string, unknown>, metadata: { id: string; clock: Clock; manifest: Manifest }) {
// EntityChange
values["id"] = metadata.id; // Entity ID
values["block_id"] = metadata.clock.id; // Block Index
values["module_hash"] = metadata.manifest.moduleHash; // ModuleHash Index
values["chain"] = metadata.manifest.chain; // Chain Index

entity_changes_inserted?.inc();
prometheus.entity_changes_inserted.inc();
return queue.add(() => client.insert({ values, table, format: "JSONStringsEachRow" }));
}

// TODO: implement function
function updateEntityChange(): Promise<void> {
entity_changes_updated?.inc();
prometheus.entity_changes_updated.inc();
return Promise.resolve();

// return client.update();
}

// TODO: implement function
function deleteEntityChange(): Promise<void> {
entity_changes_deleted?.inc();
prometheus.entity_changes_deleted.inc();
return Promise.resolve();

// return client.delete({ values, table: change.entity });
Expand All @@ -173,7 +158,7 @@ function deleteEntityChange(): Promise<void> {
// if undefined => check EXISTS if true or false
async function existsTable(table: string) {
// Return cached value if known (reduces number of EXISTS queries)
if ( knownTables.has(table) ) return knownTables.get(table);
if (knownTables.has(table)) return knownTables.get(table);

// Check if table EXISTS
const response = await client.query({
Expand Down
14 changes: 7 additions & 7 deletions src/prometheus.ts
Original file line number Diff line number Diff line change
Expand Up @@ -18,11 +18,11 @@ export function registerGauge(name: string, help: string) {

// TO-DO: Add Prometheus metrics
// https://github.com/pinax-network/substreams-sink-clickhouse/issues/26
export const sink_requests = registerCounter("sink_requests", "Total requests");
export const sink_request_errors = registerCounter("sink_request_errors", "Total failed requests");
export const queue_size = registerGauge("queue_size", "Amount of promises being processed");
export const sink_requests = registerCounter("sink_requests", "Total requests")!;
export const sink_request_errors = registerCounter("sink_request_errors", "Total failed requests")!;
export const queue_size = registerGauge("queue_size", "Amount of promises being processed")!;

export const entity_changes_inserted = registerCounter("entity_changes_inserted", "Total inserted entity changes");
export const entity_changes_updated = registerCounter("entity_changes_updated", "Total updated entity changes");
export const entity_changes_deleted = registerCounter("entity_changes_deleted", "Total deleted entity changes");
export const entity_chages_unsupported = registerCounter("entity_changes_unsupported", "Total unsupported entity changes")
export const entity_changes_inserted = registerCounter("entity_changes_inserted", "Total inserted entity changes")!;
export const entity_changes_updated = registerCounter("entity_changes_updated", "Total updated entity changes")!;
export const entity_changes_deleted = registerCounter("entity_changes_deleted", "Total deleted entity changes")!;
export const entity_chages_unsupported = registerCounter("entity_changes_unsupported", "Total unsupported entity changes")!

0 comments on commit c11564a

Please sign in to comment.