diff --git a/.env.example b/.env.example index e6aa074..60e98f6 100644 --- a/.env.example +++ b/.env.example @@ -1,23 +1,25 @@ -# Authentication -PUBLIC_KEY=... # ed25519 public key provided by https://github.com/pinax-network/substreams-sink-webhook -AUTH_KEY=... # PUT endpoints are protected (uses HTTP Basic authentication) - -# HTTP Server -PORT=3000 -HOSTNAME=0.0.0.0 - # ClickHouse Database HOST=http://127.0.0.1:8123 USERNAME=default DATABASE=default PASSWORD= -# Sink +# Webhook Authentication (Optional) +PUBLIC_KEY=... # ed25519 public key provided by https://github.com/pinax-network/substreams-sink-webhook + +# HTTP Server (Optional) +PORT=3000 +HOSTNAME=0.0.0.0 + +# Clickhouse Sink Authentication (Optional) +# PUT endpoints are protected (uses HTTP Basic authentication) +AUTH_KEY=... + +# Clickhouse Sink (Optional) MAX_BUFFER_SIZE=1000 INSERTION_DELAY=2000 WAIT_FOR_INSERT=0 ASYNC_INSERT=1 - BUFFER=buffer.db ALLOW_UNPARSED=false VERBOSE=true diff --git a/README.md b/README.md index 4d51e25..e3b64e3 100644 --- a/README.md +++ b/README.md @@ -119,26 +119,28 @@ $ cp .env.example .env ``` ```bash -# Authentication -PUBLIC_KEY=... # ed25519 public key provided by https://github.com/pinax-network/substreams-sink-webhook -AUTH_KEY=... # PUT endpoints are protected (uses HTTP Basic authentication) - -# HTTP Server -PORT=3000 -HOSTNAME=0.0.0.0 - # ClickHouse Database HOST=http://127.0.0.1:8123 USERNAME=default DATABASE=default PASSWORD= -# Sink +# Webhook Authentication (Optional) +PUBLIC_KEY=... # ed25519 public key provided by https://github.com/pinax-network/substreams-sink-webhook + +# HTTP Server (Optional) +PORT=3000 +HOSTNAME=0.0.0.0 + +# Clickhouse Sink Authentication (Optional) +# PUT endpoints are protected (uses HTTP Basic authentication) +AUTH_KEY=... + +# Clickhouse Sink (Optional) MAX_BUFFER_SIZE=1000 INSERTION_DELAY=2000 WAIT_FOR_INSERT=0 ASYNC_INSERT=1 - BUFFER=buffer.db ALLOW_UNPARSED=false VERBOSE=true diff --git a/index.ts b/index.ts index 1878510..5f678e2 100644 --- a/index.ts +++ b/index.ts @@ -1,6 +1,7 @@ #!/usr/bin/env node import { config } from "./src/config.js"; +import { name, version } from "./package.json" assert { type: "json" }; import DELETE from "./src/fetch/DELETE.js"; import GET from "./src/fetch/GET.js"; import OPTIONS from "./src/fetch/OPTIONS.js"; @@ -26,5 +27,8 @@ const app = Bun.serve({ }, }); -logger.info(`Server listening on http://${app.hostname}:${app.port}`); -if (config.authKey) logger.info(`Auth Key: ${config.authKey}`); +logger.info('[app]', `${name} v${version}`); +logger.info('[app]', `Server listening on http://${app.hostname}:${app.port}`); +logger.info('[app]', `Clickhouse Server ${config.host} (${config.database})`); +if (config.authKey) logger.info('[app]', `HTTP Auth Key: ${config.authKey}`); +if (config.publicKey) logger.info('[app]', `Webhook Ed25519 Public Key: ${config.publicKey}`); diff --git a/package.json b/package.json index 8411f9a..5e37ee3 100644 --- a/package.json +++ b/package.json @@ -1,6 +1,6 @@ { "name": "substreams-sink-clickhouse", - "version": "0.2.1", + "version": "0.2.2", "description": "Substreams Clickhouse sink module", "type": "module", "homepage": "https://github.com/pinax-network/substreams-sink-clickhouse", diff --git a/src/clickhouse/createDatabase.ts b/src/clickhouse/createDatabase.ts index b1ef96e..a786d6f 100644 --- a/src/clickhouse/createDatabase.ts +++ b/src/clickhouse/createDatabase.ts @@ -8,7 +8,7 @@ export async function createDatabase(database: string): Promise { } await client.exec({ query: `CREATE DATABASE IF NOT EXISTS "${database}"` }); - logger.info(`CREATE DATABASE [${database}]`); + logger.info('[createDatabase]', `CREATE DATABASE [${database}]`); return Ok(); } diff --git a/src/clickhouse/handleSinkRequest.ts b/src/clickhouse/handleSinkRequest.ts index ab30780..3f57bf9 100644 --- a/src/clickhouse/handleSinkRequest.ts +++ b/src/clickhouse/handleSinkRequest.ts @@ -80,7 +80,7 @@ export function saveKnownEntityChanges() { if (await store.existsTable(table)) { await client.insert({ table, values, format: "JSONEachRow" }); } else { - logger.info(`Skipped (${values.length}) records assigned to table '${table}' because it does not exist.`); + logger.info('[saveKnownEntityChanges]', `Skipped (${values.length}) records assigned to table '${table}' because it does not exist.`); } } } @@ -92,7 +92,7 @@ function batchSizeLimitReached() { } function handleEntityChanges(entityChanges: EntityChange[], metadata: Metadata) { - logger.info(`handleSinkRequest | entityChanges=${entityChanges.length}`); + logger.info('[handleEntityChanges]', `entityChanges=${entityChanges.length}`); for (const change of entityChanges) { const values = getValuesInEntityChange(change); handleChange(change.entity, values, change.operation, { ...metadata, id: change.id }); @@ -100,7 +100,7 @@ function handleEntityChanges(entityChanges: EntityChange[], metadata: Metadata) } function handleDatabaseChanges(tableChanges: TableChange[], metadata: Metadata) { - logger.info(`handleSinkRequest | tableChanges=${tableChanges.length}`); + logger.info('[handleDatabaseChanges]', `tableChanges=${tableChanges.length}`); for (const change of tableChanges) { const values = getValuesInTableChange(change); handleChange(change.table, values, change.operation, { ...metadata, id: "" }); @@ -133,7 +133,7 @@ async function handleChange( table = "unparsed_json"; } - logger.info(["handleChange", table, operation, metadata.id, clock, manifest, data].join(" | ")); + logger.info('[handleChange]', [table, operation, metadata.id, clock, manifest, data].join(" | ")); switch (operation) { case "OPERATION_CREATE": @@ -157,7 +157,7 @@ async function handleChange( default: prometheus.entity_changes_unsupported.inc(); - logger.error("unsupported operation found in entityChanges: " + operation.toString()); + logger.error('[handleChange]', "unsupported operation found in entityChanges: " + operation.toString()); return Promise.resolve(); } } diff --git a/src/clickhouse/stores.ts b/src/clickhouse/stores.ts index 38e9c41..65d10ef 100644 --- a/src/clickhouse/stores.ts +++ b/src/clickhouse/stores.ts @@ -54,7 +54,7 @@ class ClickhouseStore { const exists = data[0]?.result === 1; this.knownTables.set(table, exists); - logger.info(`EXISTS [${table}=${exists}]`); + logger.info('[existsTable]', `EXISTS [${table}=${exists}]`); return exists; } @@ -62,7 +62,7 @@ class ClickhouseStore { this.chainsPromise = null; this.moduleHashesPromises = null; this.knownTables.clear(); - logger.info("Cache has been cleared"); + logger.info('[reset]', "Cache has been cleared"); } } diff --git a/src/clickhouse/table-initialization.ts b/src/clickhouse/table-initialization.ts index 022e7d3..1c80d98 100644 --- a/src/clickhouse/table-initialization.ts +++ b/src/clickhouse/table-initialization.ts @@ -7,7 +7,7 @@ import tables from "./tables/index.js"; export async function initializeDefaultTables(): Promise { const promiseResults = await Promise.allSettled( tables.map(([table, query]) => { - logger.info(`CREATE TABLE [${table}]`); + logger.info('[initializeDefaultTables]', `CREATE TABLE [${table}]`); return client.command({ query }); }) ); @@ -41,12 +41,12 @@ const alterations = (tableName: string) => { export async function executeCreateStatements(statements: string[]): Promise>> { const executedStatements = []; - logger.info(`Executing ${statements.length} statement(s)`); + logger.info('[executeCreateStatements]', `Executing ${statements.length} statement(s)`); try { for (const statement of statements) { const tableName = getTableName(statement); - logger.info(`Executing '${tableName}'`); + logger.info('[executeCreateStatements]', `Executing '${tableName}'`); if (!isCreateTableStatement(statement)) { executedStatements.push(statement); @@ -63,10 +63,10 @@ export async function executeCreateStatements(statements: string[]): Promise { return toJSON(dto); } catch (err) { - logger.error(err); + logger.error('[blocks]', err); return InternalServerError; } } diff --git a/src/fetch/cursor.ts b/src/fetch/cursor.ts index b725f57..08ba8fa 100644 --- a/src/fetch/cursor.ts +++ b/src/fetch/cursor.ts @@ -28,7 +28,7 @@ export async function findLatestCursor(req: Request): Promise { return toText(`Bad request: no cursor found for '${moduleHash}' on '${chain}'.`, 400); } catch (err) { - logger.error(err); + logger.error('[findLatestCursor]', err); } return BadRequest; } diff --git a/src/fetch/init.ts b/src/fetch/init.ts index 03759c6..4a027d6 100644 --- a/src/fetch/init.ts +++ b/src/fetch/init.ts @@ -16,7 +16,7 @@ export default async function () { const result = await step(); if (!result.success) { - logger.error(`/init | ${failureMessage} | ${result.error}`); + logger.error('[init]', `${failureMessage} | ${result.error}`); return BadRequest; } } diff --git a/src/fetch/pause.ts b/src/fetch/pause.ts index bac6366..91e9960 100644 --- a/src/fetch/pause.ts +++ b/src/fetch/pause.ts @@ -4,6 +4,6 @@ import { toText } from "./cors.js"; export function handlePause(targetValue: boolean): Response { store.paused = targetValue; - logger.info("Sink is now paused: " + store.paused); + logger.info('[handlePause]', "Sink is now paused: " + store.paused); return toText("OK"); } diff --git a/src/fetch/query.ts b/src/fetch/query.ts index a6f91b4..9a73c8a 100644 --- a/src/fetch/query.ts +++ b/src/fetch/query.ts @@ -10,7 +10,7 @@ export async function query(req: Request): Promise { return toJSON(data); } catch (err) { - logger.error(err); + logger.error('[query]', err); return BadRequest; } } diff --git a/src/fetch/schema.ts b/src/fetch/schema.ts index c205d3d..2d0bd87 100644 --- a/src/fetch/schema.ts +++ b/src/fetch/schema.ts @@ -23,7 +23,7 @@ export async function handleSchemaRequest(req: Request, type: "sql" | "graphql") statements = TableTranslator.translate(schemaResult.payload, clickhouseBuilder); } - logger.info(`Found ${statements.length} statement(s)`); + logger.info('[handleSchemaRequest]', `Found ${statements.length} statement(s)`); const executedSchemas = await executeCreateStatements(statements); if (!executedSchemas.success) { @@ -62,7 +62,7 @@ async function getSchemaFromRequest(req: Request): Promise str.split(",")) - .refine((publicKeys) => publicKeys.filter((key) => key.length > 0).length > 0, "No primary key has been set"), + .refine((publicKeys) => publicKeys.filter((key) => key.length > 0).length > 0, "No primary key has been set")), authKey: z.optional(z.string().transform((str) => str.replaceAll("\\$", "$"))), port: positiveNumber, verbose: boolean, diff --git a/src/webhook/signatureEd25519.ts b/src/webhook/signatureEd25519.ts index 60e5c77..37a9465 100644 --- a/src/webhook/signatureEd25519.ts +++ b/src/webhook/signatureEd25519.ts @@ -4,6 +4,7 @@ import { Err, Ok, Result } from "../result.js"; import { verify } from "substreams-sink-webhook/auth"; export async function signatureEd25519(req: Request, body: string): Promise> { + if ( !config.publicKey) return Ok(); const signature = req.headers.get("x-signature-ed25519"); const timestamp = Number(req.headers.get("x-signature-timestamp"));