Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

make public key optional #106

Merged
merged 1 commit into from
Feb 15, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
22 changes: 12 additions & 10 deletions .env.example
Original file line number Diff line number Diff line change
@@ -1,23 +1,25 @@
# Authentication
PUBLIC_KEY=... # ed25519 public key provided by https://github.com/pinax-network/substreams-sink-webhook
AUTH_KEY=... # PUT endpoints are protected (uses HTTP Basic authentication)

# HTTP Server
PORT=3000
HOSTNAME=0.0.0.0

# ClickHouse Database
HOST=http://127.0.0.1:8123
USERNAME=default
DATABASE=default
PASSWORD=

# Sink
# Webhook Authentication (Optional)
PUBLIC_KEY=... # ed25519 public key provided by https://github.com/pinax-network/substreams-sink-webhook

# HTTP Server (Optional)
PORT=3000
HOSTNAME=0.0.0.0

# Clickhouse Sink Authentication (Optional)
# PUT endpoints are protected (uses HTTP Basic authentication)
AUTH_KEY=...

# Clickhouse Sink (Optional)
MAX_BUFFER_SIZE=1000
INSERTION_DELAY=2000
WAIT_FOR_INSERT=0
ASYNC_INSERT=1

BUFFER=buffer.db
ALLOW_UNPARSED=false
VERBOSE=true
Expand Down
22 changes: 12 additions & 10 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -119,26 +119,28 @@ $ cp .env.example .env
```

```bash
# Authentication
PUBLIC_KEY=... # ed25519 public key provided by https://github.com/pinax-network/substreams-sink-webhook
AUTH_KEY=... # PUT endpoints are protected (uses HTTP Basic authentication)

# HTTP Server
PORT=3000
HOSTNAME=0.0.0.0

# ClickHouse Database
HOST=http://127.0.0.1:8123
USERNAME=default
DATABASE=default
PASSWORD=

# Sink
# Webhook Authentication (Optional)
PUBLIC_KEY=... # ed25519 public key provided by https://github.com/pinax-network/substreams-sink-webhook

# HTTP Server (Optional)
PORT=3000
HOSTNAME=0.0.0.0

# Clickhouse Sink Authentication (Optional)
# PUT endpoints are protected (uses HTTP Basic authentication)
AUTH_KEY=...

# Clickhouse Sink (Optional)
MAX_BUFFER_SIZE=1000
INSERTION_DELAY=2000
WAIT_FOR_INSERT=0
ASYNC_INSERT=1

BUFFER=buffer.db
ALLOW_UNPARSED=false
VERBOSE=true
Expand Down
8 changes: 6 additions & 2 deletions index.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
#!/usr/bin/env node

import { config } from "./src/config.js";
import { name, version } from "./package.json" assert { type: "json" };
import DELETE from "./src/fetch/DELETE.js";
import GET from "./src/fetch/GET.js";
import OPTIONS from "./src/fetch/OPTIONS.js";
Expand All @@ -26,5 +27,8 @@ const app = Bun.serve({
},
});

logger.info(`Server listening on http://${app.hostname}:${app.port}`);
if (config.authKey) logger.info(`Auth Key: ${config.authKey}`);
logger.info('[app]', `${name} v${version}`);
logger.info('[app]', `Server listening on http://${app.hostname}:${app.port}`);
logger.info('[app]', `Clickhouse Server ${config.host} (${config.database})`);
if (config.authKey) logger.info('[app]', `HTTP Auth Key: ${config.authKey}`);
if (config.publicKey) logger.info('[app]', `Webhook Ed25519 Public Key: ${config.publicKey}`);
2 changes: 1 addition & 1 deletion package.json
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{
"name": "substreams-sink-clickhouse",
"version": "0.2.1",
"version": "0.2.2",
"description": "Substreams Clickhouse sink module",
"type": "module",
"homepage": "https://github.com/pinax-network/substreams-sink-clickhouse",
Expand Down
2 changes: 1 addition & 1 deletion src/clickhouse/createDatabase.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ export async function createDatabase(database: string): Promise<Result> {
}

await client.exec({ query: `CREATE DATABASE IF NOT EXISTS "${database}"` });
logger.info(`CREATE DATABASE [${database}]`);
logger.info('[createDatabase]', `CREATE DATABASE [${database}]`);

return Ok();
}
10 changes: 5 additions & 5 deletions src/clickhouse/handleSinkRequest.ts
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,7 @@ export function saveKnownEntityChanges() {
if (await store.existsTable(table)) {
await client.insert({ table, values, format: "JSONEachRow" });
} else {
logger.info(`Skipped (${values.length}) records assigned to table '${table}' because it does not exist.`);
logger.info('[saveKnownEntityChanges]', `Skipped (${values.length}) records assigned to table '${table}' because it does not exist.`);
}
}
}
Expand All @@ -92,15 +92,15 @@ function batchSizeLimitReached() {
}

function handleEntityChanges(entityChanges: EntityChange[], metadata: Metadata) {
logger.info(`handleSinkRequest | entityChanges=${entityChanges.length}`);
logger.info('[handleEntityChanges]', `entityChanges=${entityChanges.length}`);
for (const change of entityChanges) {
const values = getValuesInEntityChange(change);
handleChange(change.entity, values, change.operation, { ...metadata, id: change.id });
}
}

function handleDatabaseChanges(tableChanges: TableChange[], metadata: Metadata) {
logger.info(`handleSinkRequest | tableChanges=${tableChanges.length}`);
logger.info('[handleDatabaseChanges]', `tableChanges=${tableChanges.length}`);
for (const change of tableChanges) {
const values = getValuesInTableChange(change);
handleChange(change.table, values, change.operation, { ...metadata, id: "" });
Expand Down Expand Up @@ -133,7 +133,7 @@ async function handleChange(
table = "unparsed_json";
}

logger.info(["handleChange", table, operation, metadata.id, clock, manifest, data].join(" | "));
logger.info('[handleChange]', [table, operation, metadata.id, clock, manifest, data].join(" | "));

switch (operation) {
case "OPERATION_CREATE":
Expand All @@ -157,7 +157,7 @@ async function handleChange(

default:
prometheus.entity_changes_unsupported.inc();
logger.error("unsupported operation found in entityChanges: " + operation.toString());
logger.error('[handleChange]', "unsupported operation found in entityChanges: " + operation.toString());
return Promise.resolve();
}
}
Expand Down
4 changes: 2 additions & 2 deletions src/clickhouse/stores.ts
Original file line number Diff line number Diff line change
Expand Up @@ -54,15 +54,15 @@ class ClickhouseStore {
const exists = data[0]?.result === 1;
this.knownTables.set(table, exists);

logger.info(`EXISTS [${table}=${exists}]`);
logger.info('[existsTable]', `EXISTS [${table}=${exists}]`);
return exists;
}

public reset() {
this.chainsPromise = null;
this.moduleHashesPromises = null;
this.knownTables.clear();
logger.info("Cache has been cleared");
logger.info('[reset]', "Cache has been cleared");
}
}

Expand Down
10 changes: 5 additions & 5 deletions src/clickhouse/table-initialization.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ import tables from "./tables/index.js";
export async function initializeDefaultTables(): Promise<Result> {
const promiseResults = await Promise.allSettled(
tables.map(([table, query]) => {
logger.info(`CREATE TABLE [${table}]`);
logger.info('[initializeDefaultTables]', `CREATE TABLE [${table}]`);
return client.command({ query });
})
);
Expand Down Expand Up @@ -41,12 +41,12 @@ const alterations = (tableName: string) => {

export async function executeCreateStatements(statements: string[]): Promise<Result<Array<string>>> {
const executedStatements = [];
logger.info(`Executing ${statements.length} statement(s)`);
logger.info('[executeCreateStatements]', `Executing ${statements.length} statement(s)`);

try {
for (const statement of statements) {
const tableName = getTableName(statement);
logger.info(`Executing '${tableName}'`);
logger.info('[executeCreateStatements]', `Executing '${tableName}'`);

if (!isCreateTableStatement(statement)) {
executedStatements.push(statement);
Expand All @@ -63,10 +63,10 @@ export async function executeCreateStatements(statements: string[]): Promise<Res
}
}
} catch (err) {
logger.error("Could not execute the statements", "Request: " + executedStatements, err);
logger.error('[executeCreateStatements]', "Could not execute the statements", "Request: " + executedStatements, err);
return Err(new Error(JSON.stringify(err)));
}

logger.info("Complete.");
logger.info('[executeCreateStatements]', "Complete.");
return Ok(executedStatements);
}
10 changes: 7 additions & 3 deletions src/fetch/POST.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import { handleSinkRequest } from "../clickhouse/handleSinkRequest.js";
import { store } from "../clickhouse/stores.js";
import { config } from "../config.js";
import { logger } from "../logger.js";
import * as prometheus from "../prometheus.js";
import { BodySchema } from "../schemas.js";
Expand All @@ -20,22 +21,25 @@ export default async function (req: Request) {

// validate Ed25519 signature
const text = await req.text();
const signatureResult = await signatureEd25519(req, text);
if (!signatureResult.success) return signatureResult.error;
if ( config.publicKey ) {
const signatureResult = await signatureEd25519(req, text);
if (!signatureResult.success) return signatureResult.error;
}

// parse POST body payload
try {
prometheus.requests.inc();
const body = BodySchema.parse(JSON.parse(text));

if ("message" in body) {
logger.info('[POST]', text);
if (body.message === "PING") return toText("OK");
return toText("invalid body", 400);
}

return handleSinkRequest(body);
} catch (err) {
logger.error(err);
logger.error('[POST]', err);
prometheus.request_errors?.inc();
return toText("invalid request", 400);
}
Expand Down
2 changes: 1 addition & 1 deletion src/fetch/blocks.ts
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ export async function blocks(): Promise<Response> {

return toJSON(dto);
} catch (err) {
logger.error(err);
logger.error('[blocks]', err);
return InternalServerError;
}
}
2 changes: 1 addition & 1 deletion src/fetch/cursor.ts
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ export async function findLatestCursor(req: Request): Promise<Response> {

return toText(`Bad request: no cursor found for '${moduleHash}' on '${chain}'.`, 400);
} catch (err) {
logger.error(err);
logger.error('[findLatestCursor]', err);
}
return BadRequest;
}
Expand Down
2 changes: 1 addition & 1 deletion src/fetch/init.ts
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ export default async function () {
const result = await step();

if (!result.success) {
logger.error(`/init | ${failureMessage} | ${result.error}`);
logger.error('[init]', `${failureMessage} | ${result.error}`);
return BadRequest;
}
}
Expand Down
2 changes: 1 addition & 1 deletion src/fetch/pause.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,6 @@ import { toText } from "./cors.js";

export function handlePause(targetValue: boolean): Response {
store.paused = targetValue;
logger.info("Sink is now paused: " + store.paused);
logger.info('[handlePause]', "Sink is now paused: " + store.paused);
return toText("OK");
}
2 changes: 1 addition & 1 deletion src/fetch/query.ts
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ export async function query(req: Request): Promise<Response> {

return toJSON(data);
} catch (err) {
logger.error(err);
logger.error('[query]', err);
return BadRequest;
}
}
4 changes: 2 additions & 2 deletions src/fetch/schema.ts
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ export async function handleSchemaRequest(req: Request, type: "sql" | "graphql")
statements = TableTranslator.translate(schemaResult.payload, clickhouseBuilder);
}

logger.info(`Found ${statements.length} statement(s)`);
logger.info('[handleSchemaRequest]', `Found ${statements.length} statement(s)`);

const executedSchemas = await executeCreateStatements(statements);
if (!executedSchemas.success) {
Expand Down Expand Up @@ -62,7 +62,7 @@ async function getSchemaFromRequest(req: Request): Promise<Result<string, Respon

return Ok(TableInitSchema.parse(body));
} catch (e) {
logger.error(e);
logger.error('[getSchemaFromRequest]', e);
}

return Err(BadRequest);
Expand Down
9 changes: 4 additions & 5 deletions src/resume.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,16 +5,15 @@ import { logger } from "./logger.js";
export async function resume() {
const pingResult = await ping();
if (!pingResult.success) {
logger.error("Resume failed | Error: " + pingResult.error.message);
logger.error("[resume]", "Error: " + pingResult.error.message);
return;
}

logger.info("Writing unsinked data to ClickHouse");
logger.info("[resume]", "writing unsinked data to ClickHouse...");
const saveResult = await saveKnownEntityChanges();
if (!saveResult.success) {
logger.error("Resume failed | Error: " + saveResult.error.message);
logger.error("[resume]", "Error: " + saveResult.error.message);
return;
}

logger.info("Resume completed.");
logger.info("[resume]", "completed");
}
6 changes: 3 additions & 3 deletions src/schemas.ts
Original file line number Diff line number Diff line change
Expand Up @@ -11,10 +11,10 @@ export const positiveNumber = z.coerce.number().pipe(z.number().positive());
export const oneOrZero = z.coerce.number().pipe(z.literal(0).or(z.literal(1)));

export const ConfigSchema = z.object({
publicKey: z
.string()
publicKey: z.optional(
z.string()
.transform((str) => str.split(","))
.refine((publicKeys) => publicKeys.filter((key) => key.length > 0).length > 0, "No primary key has been set"),
.refine((publicKeys) => publicKeys.filter((key) => key.length > 0).length > 0, "No primary key has been set")),
authKey: z.optional(z.string().transform((str) => str.replaceAll("\\$", "$"))),
port: positiveNumber,
verbose: boolean,
Expand Down
1 change: 1 addition & 0 deletions src/webhook/signatureEd25519.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import { Err, Ok, Result } from "../result.js";
import { verify } from "substreams-sink-webhook/auth";

export async function signatureEd25519(req: Request, body: string): Promise<Result<undefined, Response>> {
if ( !config.publicKey) return Ok();
const signature = req.headers.get("x-signature-ed25519");
const timestamp = Number(req.headers.get("x-signature-timestamp"));

Expand Down
Loading