Skip to content

Commit

Permalink
make public key optional
Browse files Browse the repository at this point in the history
  • Loading branch information
DenisCarriere committed Feb 15, 2024
1 parent faf7d32 commit 76c4e92
Show file tree
Hide file tree
Showing 18 changed files with 66 additions and 54 deletions.
22 changes: 12 additions & 10 deletions .env.example
Original file line number Diff line number Diff line change
@@ -1,23 +1,25 @@
# Authentication
PUBLIC_KEY=... # ed25519 public key provided by https://github.com/pinax-network/substreams-sink-webhook
AUTH_KEY=... # PUT endpoints are protected (uses HTTP Basic authentication)

# HTTP Server
PORT=3000
HOSTNAME=0.0.0.0

# ClickHouse Database
HOST=http://127.0.0.1:8123
USERNAME=default
DATABASE=default
PASSWORD=

# Sink
# Webhook Authentication (Optional)
PUBLIC_KEY=... # ed25519 public key provided by https://github.com/pinax-network/substreams-sink-webhook

# HTTP Server (Optional)
PORT=3000
HOSTNAME=0.0.0.0

# Clickhouse Sink Authentication (Optional)
# PUT endpoints are protected (uses HTTP Basic authentication)
AUTH_KEY=...

# Clickhouse Sink (Optional)
MAX_BUFFER_SIZE=1000
INSERTION_DELAY=2000
WAIT_FOR_INSERT=0
ASYNC_INSERT=1

BUFFER=buffer.db
ALLOW_UNPARSED=false
VERBOSE=true
Expand Down
22 changes: 12 additions & 10 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -119,26 +119,28 @@ $ cp .env.example .env
```

```bash
# Authentication
PUBLIC_KEY=... # ed25519 public key provided by https://github.com/pinax-network/substreams-sink-webhook
AUTH_KEY=... # PUT endpoints are protected (uses HTTP Basic authentication)

# HTTP Server
PORT=3000
HOSTNAME=0.0.0.0

# ClickHouse Database
HOST=http://127.0.0.1:8123
USERNAME=default
DATABASE=default
PASSWORD=

# Sink
# Webhook Authentication (Optional)
PUBLIC_KEY=... # ed25519 public key provided by https://github.com/pinax-network/substreams-sink-webhook

# HTTP Server (Optional)
PORT=3000
HOSTNAME=0.0.0.0

# Clickhouse Sink Authentication (Optional)
# PUT endpoints are protected (uses HTTP Basic authentication)
AUTH_KEY=...

# Clickhouse Sink (Optional)
MAX_BUFFER_SIZE=1000
INSERTION_DELAY=2000
WAIT_FOR_INSERT=0
ASYNC_INSERT=1

BUFFER=buffer.db
ALLOW_UNPARSED=false
VERBOSE=true
Expand Down
8 changes: 6 additions & 2 deletions index.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
#!/usr/bin/env node

import { config } from "./src/config.js";
import { name, version } from "./package.json" assert { type: "json" };
import DELETE from "./src/fetch/DELETE.js";
import GET from "./src/fetch/GET.js";
import OPTIONS from "./src/fetch/OPTIONS.js";
Expand All @@ -26,5 +27,8 @@ const app = Bun.serve({
},
});

logger.info(`Server listening on http://${app.hostname}:${app.port}`);
if (config.authKey) logger.info(`Auth Key: ${config.authKey}`);
logger.info('[app]', `${name} v${version}`);
logger.info('[app]', `Server listening on http://${app.hostname}:${app.port}`);
logger.info('[app]', `Clickhouse Server ${config.host} (${config.database})`);
if (config.authKey) logger.info('[app]', `HTTP Auth Key: ${config.authKey}`);
if (config.publicKey) logger.info('[app]', `Webhook Ed25519 Public Key: ${config.publicKey}`);
2 changes: 1 addition & 1 deletion package.json
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{
"name": "substreams-sink-clickhouse",
"version": "0.2.1",
"version": "0.2.2",
"description": "Substreams Clickhouse sink module",
"type": "module",
"homepage": "https://github.com/pinax-network/substreams-sink-clickhouse",
Expand Down
2 changes: 1 addition & 1 deletion src/clickhouse/createDatabase.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ export async function createDatabase(database: string): Promise<Result> {
}

await client.exec({ query: `CREATE DATABASE IF NOT EXISTS "${database}"` });
logger.info(`CREATE DATABASE [${database}]`);
logger.info('[createDatabase]', `CREATE DATABASE [${database}]`);

return Ok();
}
10 changes: 5 additions & 5 deletions src/clickhouse/handleSinkRequest.ts
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,7 @@ export function saveKnownEntityChanges() {
if (await store.existsTable(table)) {
await client.insert({ table, values, format: "JSONEachRow" });
} else {
logger.info(`Skipped (${values.length}) records assigned to table '${table}' because it does not exist.`);
logger.info('[saveKnownEntityChanges]', `Skipped (${values.length}) records assigned to table '${table}' because it does not exist.`);
}
}
}
Expand All @@ -92,15 +92,15 @@ function batchSizeLimitReached() {
}

function handleEntityChanges(entityChanges: EntityChange[], metadata: Metadata) {
logger.info(`handleSinkRequest | entityChanges=${entityChanges.length}`);
logger.info('[handleEntityChanges]', `entityChanges=${entityChanges.length}`);
for (const change of entityChanges) {
const values = getValuesInEntityChange(change);
handleChange(change.entity, values, change.operation, { ...metadata, id: change.id });
}
}

function handleDatabaseChanges(tableChanges: TableChange[], metadata: Metadata) {
logger.info(`handleSinkRequest | tableChanges=${tableChanges.length}`);
logger.info('[handleDatabaseChanges]', `tableChanges=${tableChanges.length}`);
for (const change of tableChanges) {
const values = getValuesInTableChange(change);
handleChange(change.table, values, change.operation, { ...metadata, id: "" });
Expand Down Expand Up @@ -133,7 +133,7 @@ async function handleChange(
table = "unparsed_json";
}

logger.info(["handleChange", table, operation, metadata.id, clock, manifest, data].join(" | "));
logger.info('[handleChange]', [table, operation, metadata.id, clock, manifest, data].join(" | "));

switch (operation) {
case "OPERATION_CREATE":
Expand All @@ -157,7 +157,7 @@ async function handleChange(

default:
prometheus.entity_changes_unsupported.inc();
logger.error("unsupported operation found in entityChanges: " + operation.toString());
logger.error('[handleChange]', "unsupported operation found in entityChanges: " + operation.toString());
return Promise.resolve();
}
}
Expand Down
4 changes: 2 additions & 2 deletions src/clickhouse/stores.ts
Original file line number Diff line number Diff line change
Expand Up @@ -54,15 +54,15 @@ class ClickhouseStore {
const exists = data[0]?.result === 1;
this.knownTables.set(table, exists);

logger.info(`EXISTS [${table}=${exists}]`);
logger.info('[existsTable]', `EXISTS [${table}=${exists}]`);
return exists;
}

public reset() {
this.chainsPromise = null;
this.moduleHashesPromises = null;
this.knownTables.clear();
logger.info("Cache has been cleared");
logger.info('[reset]', "Cache has been cleared");
}
}

Expand Down
10 changes: 5 additions & 5 deletions src/clickhouse/table-initialization.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ import tables from "./tables/index.js";
export async function initializeDefaultTables(): Promise<Result> {
const promiseResults = await Promise.allSettled(
tables.map(([table, query]) => {
logger.info(`CREATE TABLE [${table}]`);
logger.info('[initializeDefaultTables]', `CREATE TABLE [${table}]`);
return client.command({ query });
})
);
Expand Down Expand Up @@ -41,12 +41,12 @@ const alterations = (tableName: string) => {

export async function executeCreateStatements(statements: string[]): Promise<Result<Array<string>>> {
const executedStatements = [];
logger.info(`Executing ${statements.length} statement(s)`);
logger.info('[executeCreateStatements]', `Executing ${statements.length} statement(s)`);

try {
for (const statement of statements) {
const tableName = getTableName(statement);
logger.info(`Executing '${tableName}'`);
logger.info('[executeCreateStatements]', `Executing '${tableName}'`);

if (!isCreateTableStatement(statement)) {
executedStatements.push(statement);
Expand All @@ -63,10 +63,10 @@ export async function executeCreateStatements(statements: string[]): Promise<Res
}
}
} catch (err) {
logger.error("Could not execute the statements", "Request: " + executedStatements, err);
logger.error('[executeCreateStatements]', "Could not execute the statements", "Request: " + executedStatements, err);
return Err(new Error(JSON.stringify(err)));
}

logger.info("Complete.");
logger.info('[executeCreateStatements]', "Complete.");
return Ok(executedStatements);
}
10 changes: 7 additions & 3 deletions src/fetch/POST.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import { handleSinkRequest } from "../clickhouse/handleSinkRequest.js";
import { store } from "../clickhouse/stores.js";
import { config } from "../config.js";
import { logger } from "../logger.js";
import * as prometheus from "../prometheus.js";
import { BodySchema } from "../schemas.js";
Expand All @@ -20,22 +21,25 @@ export default async function (req: Request) {

// validate Ed25519 signature
const text = await req.text();
const signatureResult = await signatureEd25519(req, text);
if (!signatureResult.success) return signatureResult.error;
if ( config.publicKey ) {
const signatureResult = await signatureEd25519(req, text);
if (!signatureResult.success) return signatureResult.error;
}

// parse POST body payload
try {
prometheus.requests.inc();
const body = BodySchema.parse(JSON.parse(text));

if ("message" in body) {
logger.info('[POST]', text);
if (body.message === "PING") return toText("OK");
return toText("invalid body", 400);
}

return handleSinkRequest(body);
} catch (err) {
logger.error(err);
logger.error('[POST]', err);
prometheus.request_errors?.inc();
return toText("invalid request", 400);
}
Expand Down
2 changes: 1 addition & 1 deletion src/fetch/blocks.ts
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ export async function blocks(): Promise<Response> {

return toJSON(dto);
} catch (err) {
logger.error(err);
logger.error('[blocks]', err);
return InternalServerError;
}
}
2 changes: 1 addition & 1 deletion src/fetch/cursor.ts
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ export async function findLatestCursor(req: Request): Promise<Response> {

return toText(`Bad request: no cursor found for '${moduleHash}' on '${chain}'.`, 400);
} catch (err) {
logger.error(err);
logger.error('[findLatestCursor]', err);
}
return BadRequest;
}
Expand Down
2 changes: 1 addition & 1 deletion src/fetch/init.ts
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ export default async function () {
const result = await step();

if (!result.success) {
logger.error(`/init | ${failureMessage} | ${result.error}`);
logger.error('[init]', `${failureMessage} | ${result.error}`);
return BadRequest;
}
}
Expand Down
2 changes: 1 addition & 1 deletion src/fetch/pause.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,6 @@ import { toText } from "./cors.js";

export function handlePause(targetValue: boolean): Response {
store.paused = targetValue;
logger.info("Sink is now paused: " + store.paused);
logger.info('[handlePause]', "Sink is now paused: " + store.paused);
return toText("OK");
}
2 changes: 1 addition & 1 deletion src/fetch/query.ts
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ export async function query(req: Request): Promise<Response> {

return toJSON(data);
} catch (err) {
logger.error(err);
logger.error('[query]', err);
return BadRequest;
}
}
4 changes: 2 additions & 2 deletions src/fetch/schema.ts
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ export async function handleSchemaRequest(req: Request, type: "sql" | "graphql")
statements = TableTranslator.translate(schemaResult.payload, clickhouseBuilder);
}

logger.info(`Found ${statements.length} statement(s)`);
logger.info('[handleSchemaRequest]', `Found ${statements.length} statement(s)`);

const executedSchemas = await executeCreateStatements(statements);
if (!executedSchemas.success) {
Expand Down Expand Up @@ -62,7 +62,7 @@ async function getSchemaFromRequest(req: Request): Promise<Result<string, Respon

return Ok(TableInitSchema.parse(body));
} catch (e) {
logger.error(e);
logger.error('[getSchemaFromRequest]', e);
}

return Err(BadRequest);
Expand Down
9 changes: 4 additions & 5 deletions src/resume.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,16 +5,15 @@ import { logger } from "./logger.js";
export async function resume() {
const pingResult = await ping();
if (!pingResult.success) {
logger.error("Resume failed | Error: " + pingResult.error.message);
logger.error("[resume]", "Error: " + pingResult.error.message);
return;
}

logger.info("Writing unsinked data to ClickHouse");
logger.info("[resume]", "writing unsinked data to ClickHouse...");
const saveResult = await saveKnownEntityChanges();
if (!saveResult.success) {
logger.error("Resume failed | Error: " + saveResult.error.message);
logger.error("[resume]", "Error: " + saveResult.error.message);
return;
}

logger.info("Resume completed.");
logger.info("[resume]", "completed");
}
6 changes: 3 additions & 3 deletions src/schemas.ts
Original file line number Diff line number Diff line change
Expand Up @@ -11,10 +11,10 @@ export const positiveNumber = z.coerce.number().pipe(z.number().positive());
export const oneOrZero = z.coerce.number().pipe(z.literal(0).or(z.literal(1)));

export const ConfigSchema = z.object({
publicKey: z
.string()
publicKey: z.optional(
z.string()
.transform((str) => str.split(","))
.refine((publicKeys) => publicKeys.filter((key) => key.length > 0).length > 0, "No primary key has been set"),
.refine((publicKeys) => publicKeys.filter((key) => key.length > 0).length > 0, "No primary key has been set")),
authKey: z.optional(z.string().transform((str) => str.replaceAll("\\$", "$"))),
port: positiveNumber,
verbose: boolean,
Expand Down
1 change: 1 addition & 0 deletions src/webhook/signatureEd25519.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import { Err, Ok, Result } from "../result.js";
import { verify } from "substreams-sink-webhook/auth";

export async function signatureEd25519(req: Request, body: string): Promise<Result<undefined, Response>> {
if ( !config.publicKey) return Ok();
const signature = req.headers.get("x-signature-ed25519");
const timestamp = Number(req.headers.get("x-signature-timestamp"));

Expand Down

0 comments on commit 76c4e92

Please sign in to comment.