diff --git a/.env.example b/.env.example index e57246a..8012d9a 100644 --- a/.env.example +++ b/.env.example @@ -1,7 +1,7 @@ # Webhook SECRET_KEY=... PUBLIC_KEY=... -URL=http://127.0.0.1:3000 +WEBHOOK_URL=http://127.0.0.1:3000 # Substreams endpoint SUBSTREAMS_API_TOKEN=... diff --git a/README.md b/README.md index bf8fff7..d564a7f 100644 --- a/README.md +++ b/README.md @@ -33,7 +33,7 @@ docker run -it --rm --env-file .env substreams-sink-webhook run # Webhook SECRET_KEY=... PUBLIC_KEY=... -URL=http://127.0.0.1:3000 +WEBHOOK_URL=http://127.0.0.1:3000 # Substreams endpoint SUBSTREAMS_API_TOKEN=... @@ -60,48 +60,33 @@ DISABLE_PING=true ``` $ substreams-sink-webhook --help -Usage: substreams-sink-webhook [options] [command] +Usage: substreams-sink-webhook run [options] Substreams Sink Webhook Options: - -v, --version version for substreams-sink-webhook -e --substreams-endpoint Substreams gRPC endpoint to stream data from --manifest URL of Substreams package --spkg Substreams package (ex: eosio.token) --module_name Name of the output module (declared in the manifest) --chain Substreams supported chain (ex: eth) - -s --start-block Start block to stream from (defaults to -1, which means - the initialBlock of the first module you are streaming) + -s --start-block Start block to stream from (defaults to -1, which means the initialBlock of the first module you are streaming) -t --stop-block Stop block to end stream at, inclusively --substreams-api-token API token for the substream endpoint - --substreams-api-token-envvar Environnement variable name of the API token for the - substream endpoint (default: "SUBSTREAMS_API_TOKEN") - --delay-before-start [OPERATOR] Amount of time in milliseconds (ms) to wait - before starting any internal processes, can be used to - perform to maintenance on the pod before actually letting - it starts (default: "0") - --cursor-file cursor lock file (default: "cursor.lock") - --production-mode Enable Production Mode, with high-speed parallel - processing (default: false) - --verbose Enable verbose logging (default: true) - --metrics-listen-address The process will listen on this address for Prometheus - metrics requests (default: "localhost") - --metrics-listen-port The process will listen on this port for Prometheus - metrics requests (default: "9102") - --metrics-disabled If set, will not send metrics to Prometheus (default: - false) - -p, --params Set a params for parameterizable modules. Can be - specified multiple times. (ex: -p module1=valA -p - module2=valX&valY) (default: []) + --substreams-api-token-envvar Environnement variable name of the API token for the substream endpoint (ex: SUBSTREAMS_API_TOKEN) + --delay-before-start [OPERATOR] Amount of time in milliseconds (ms) to wait before starting any internal processes, can be used to perform to maintenance on the pod before actually letting it starts + --cursor-file cursor lock file (ex: cursor.lock) + --disable-production-mode Disable production mode (production mode enables high-speed parallel processing) + --verbose Enable verbose logging + --prometheus-hostname Hostname for Prometheus metrics (ex: 127.0.0.1) + --prometheus-port Port for Prometheus metrics (ex: 9102) + --prometheus-disabled If set, will not send metrics to Prometheus + -p, --params Set a params for parameterizable modules. Can be specified multiple times. (ex: -p module1=valA -p module2=valX&valY) --url Webhook URL to send POST. - --private-key Private key to sign POST data payload (ex: "PVT_K1_...") + --secret-key TweetNaCl Secret-key to sign POST data payload + --concurrency Concurrency of requests (default: "1") + --disable-ping Disable ping on init -h, --help display help for command - -Commands: - completion Generate the autocompletion script for the specified - shell - help Display help for command ``` ## Features diff --git a/bin/cli.ts b/bin/cli.ts index c06681e..53822e8 100755 --- a/bin/cli.ts +++ b/bin/cli.ts @@ -4,6 +4,16 @@ import { action } from "../index.js"; import pkg from "../package.json" assert { type: "json" }; import * as sink from "../externals/substreams-sink.js"; import { keyPair } from "../src/signMessage.js"; +import { ping } from "../src/ping.js"; +import { WEBHOOK_URL } from "../src/config.js"; +import { logger } from "../src/logger.js"; + +export interface WebhookRunOptions extends sink.RunOptions { + url: string; + secretKey: string; + concurrency: string; + disablePing: boolean; +} // Run Webhook Sink const program = sink.program(pkg); @@ -21,4 +31,16 @@ program.command("keypair") console.log(`Public Key: ${publicKey}`); console.log(`Secret Key: ${secretKey}`); }) + +program.command("ping") + .description("Ping Webhook URL") + .option("--url ", "Webhook URL to send POST.") + .option("--secret-key ", 'TweetNaCl Secret-key to sign POST data payload') + .action(async (options) => { + logger.settings.type = "pretty"; + const url = options.url ?? WEBHOOK_URL; + const secretKey = options.secretKey ?? process.env.SECRET_KEY; + const response = await ping(url, secretKey); + console.log(response); + }) program.parse(); diff --git a/examples/http.ts b/examples/http.ts index 2ffc042..6840efa 100644 --- a/examples/http.ts +++ b/examples/http.ts @@ -18,7 +18,6 @@ export default { if (!signature) return new Response("missing required signature in headers", { status: 400 }); if (!body) return new Response("missing body", { status: 400 }); - console.log({signature, timestamp, body}) // validate signature using public key const isVerified = nacl.sign.detached.verify( Buffer.from(timestamp + body), diff --git a/examples/ping-isVerified.http b/examples/ping-isVerified.http new file mode 100644 index 0000000..e6fc799 --- /dev/null +++ b/examples/ping-isVerified.http @@ -0,0 +1,6 @@ +POST http://localhost:3000 HTTP/1.1 +content-type: application/json +x-signature-ed25519: ce31903c09e8f059df392aeccb5c5be2fd6fc317be17149eba60c6c7dc420c328490f316379a28b59bdf2506772ddbed35abf951ce7c84121279de27161e9b06 +x-signature-timestamp: 1686871414 + +{"message":"PING"} \ No newline at end of file diff --git a/examples/ping-not-isVerified.http b/examples/ping-not-isVerified.http new file mode 100644 index 0000000..1e802aa --- /dev/null +++ b/examples/ping-not-isVerified.http @@ -0,0 +1,6 @@ +POST http://localhost:3000 HTTP/1.1 +content-type: application/json +x-signature-ed25519: 32c4f322a21ac05e7c9b7374bb702ccd834e56aeebe8320048440833f2e18358014a5790302fbe3ead8c956cdf2b05c9181b787c55c3e40dc6dbc3ab2cfe730f +x-signature-timestamp: 1686871505 + +{"message":"PING"} \ No newline at end of file diff --git a/externals/substreams-sink.ts b/externals/substreams-sink.ts index 4110add..8700bba 100644 --- a/externals/substreams-sink.ts +++ b/externals/substreams-sink.ts @@ -10,7 +10,7 @@ export interface RunOptions { substreamsApiToken?: string; delayBeforeStart?: string; cursorFile?: string; - productionMode?: boolean; + disableProductionMode?: boolean; verbose?: boolean; prometheusHostname?: string; prometheusPort?: number; @@ -57,13 +57,13 @@ export function option(program: Command, pkg: Package) { .option("-s --start-block ", "Start block to stream from (defaults to -1, which means the initialBlock of the first module you are streaming)") .option("-t --stop-block ", "Stop block to end stream at, inclusively") .option("--substreams-api-token ", "API token for the substream endpoint") - .option("--substreams-api-token-envvar ", "Environnement variable name of the API token for the substream endpoint", DEFAULT_SUBSTREAMS_API_TOKEN_ENV) - .option("--delay-before-start ", "[OPERATOR] Amount of time in milliseconds (ms) to wait before starting any internal processes, can be used to perform to maintenance on the pod before actually letting it starts", "0") + .option("--substreams-api-token-envvar ", `Environnement variable name of the API token for the substream endpoint (ex: ${DEFAULT_SUBSTREAMS_API_TOKEN_ENV})`) + .option("--delay-before-start ", "[OPERATOR] Amount of time in milliseconds (ms) to wait before starting any internal processes, can be used to perform to maintenance on the pod before actually letting it starts") .option("--cursor-file ", "cursor lock file (ex: cursor.lock)") - .option("--production-mode", "Enable Production Mode, with high-speed parallel processing", DEFAULT_PRODUCTION_MODE) + .option("--disable-production-mode", "Disable production mode (production mode enables high-speed parallel processing)") .option("--verbose", "Enable verbose logging") - .option(`--prometheus-hostname ", "Hostname for Prometheus metrics (ex: ${DEFAULT_PROMETHEUS_HOSTNAME})`) - .option(`--prometheus-port ", "Port for Prometheus metrics (ex: ${DEFAULT_PROMETHEUS_PORT})`) + .option("--prometheus-hostname ", `Hostname for Prometheus metrics (ex: ${DEFAULT_PROMETHEUS_HOSTNAME})`) + .option("--prometheus-port ", `Port for Prometheus metrics (ex: ${DEFAULT_PROMETHEUS_PORT})`) .option("--prometheus-disabled", "If set, will not send metrics to Prometheus") - .option("-p, --params ", "Set a params for parameterizable modules. Can be specified multiple times. (ex: -p module1=valA -p module2=valX&valY)", []); + .option("-p, --params ", "Set a params for parameterizable modules. Can be specified multiple times. (ex: -p module1=valA -p module2=valX&valY)"); } diff --git a/index.ts b/index.ts index 61d7560..80e9269 100644 --- a/index.ts +++ b/index.ts @@ -3,7 +3,6 @@ import fs from "node:fs"; // import { nanoid } from "nanoid"; import { BlockEmitter, createDefaultTransport } from "@substreams/node"; import { createModuleHash, createRegistry, createRequest, fetchSubstream, getModuleOrThrow } from "@substreams/core"; -import { type RunOptions } from "./externals/substreams-sink.js"; import { getSubstreamsEndpoint } from "./src/getSubstreamsEndpoint.js"; import { postWebhook } from "./src/postWebhook.js"; import { signMessage } from "./src/signMessage.js"; @@ -13,16 +12,10 @@ import { queue } from "./src/queue.js"; import { ping } from "./src/ping.js"; import * as metrics from "./externals/prometheus.js"; import { applyParams } from "./externals/applyParams.js"; -import { CONCURRENCY, PROMETHEUS_DISABLED, PROMETHEUS_HOSTNAME, PROMETHEUS_PORT, SECRET_KEY } from "./src/config.js"; +import { CONCURRENCY, CURSOR_FILE, PROMETHEUS_DISABLED, PROMETHEUS_HOSTNAME, PROMETHEUS_PORT, SECRET_KEY, WEBHOOK_URL, DISABLE_PING, SUBSTREAMS_API_TOKEN, SUBSTREAMS_API_TOKEN_ENVVAR } from "./src/config.js"; +import type { WebhookRunOptions } from "./bin/cli.js"; -export interface ActionOptions extends RunOptions { - url: string; - secretKey: string; - concurrency: string; - disablePing: boolean; -} - -export async function action(options: ActionOptions) { +export async function action(options: WebhookRunOptions) { // verbose const verbose = options.verbose ?? JSON.parse(process.env.VERBOSE ?? "false"); if (verbose) { @@ -39,11 +32,11 @@ export async function action(options: ActionOptions) { queue.concurrency = concurrency; // Cursor file - const cursorFile = options.cursorFile ?? process.env.CURSOR_FILE ?? "cursor.lock"; + const cursorFile = options.cursorFile ?? CURSOR_FILE ?? "cursor.lock"; const startCursor = fs.existsSync(cursorFile) ? fs.readFileSync(cursorFile, "utf-8") : ""; // required CLI or environment variables - const url = options.url ?? process.env.URL; + const url = options.url ?? WEBHOOK_URL; if (!url) throw new Error("Missing required --url"); // Private Key to sign messages @@ -51,7 +44,7 @@ export async function action(options: ActionOptions) { if (!secretKey) throw new Error("Missing required --private-key"); // Ping URL to check if it's valid - const disablePing = options.disablePing ?? JSON.parse(process.env.DISABLE_PING ?? "false"); + const disablePing = options.disablePing ?? DISABLE_PING if ( !disablePing ) { if (!await ping(url, secretKey) ) { logger.error("exiting from invalid PING response"); @@ -61,7 +54,8 @@ export async function action(options: ActionOptions) { // auth API token // https://app.streamingfast.io/ - const token = options.substreamsApiToken ?? process.env[options.substreamsApiTokenEnvvar || ""] ?? process.env.SUBSTREAMS_API_TOKEN; + const substreamsApiTokenEnvvar = options.substreamsApiTokenEnvvar ?? SUBSTREAMS_API_TOKEN_ENVVAR; + const token = options.substreamsApiToken ?? SUBSTREAMS_API_TOKEN ?? process.env[substreamsApiTokenEnvvar || ""]; if (!token) throw new Error("SUBSTREAMS_API_TOKEN is require"); let baseUrl = options.substreamsEndpoint ?? process.env.SUBSTREAMS_ENDPOINT; const chain = options.chain ?? process.env.CHAIN; @@ -84,7 +78,7 @@ export async function action(options: ActionOptions) { // Apply params const params = []; - if ( options.params.length ) params.push(...options.params) + if ( options.params?.length ) params.push(...options.params) if ( process.env.PARAM) params.push(process.env.PARAM) if ( params.length ) applyParams(params, substreamPackage.modules.modules); logger.info("params", params); diff --git a/package-lock.json b/package-lock.json index 598e547..0669857 100644 --- a/package-lock.json +++ b/package-lock.json @@ -1,12 +1,12 @@ { "name": "substreams-sink-webhook", - "version": "0.1.2", + "version": "0.2.0", "lockfileVersion": 3, "requires": true, "packages": { "": { "name": "substreams-sink-webhook", - "version": "0.1.2", + "version": "0.2.0", "dependencies": { "@substreams/core": "^0.1.10", "@substreams/node": "^0.1.2", @@ -21,6 +21,7 @@ "devDependencies": { "@tsconfig/recommended": "latest", "@types/node": "latest", + "bun-types": "^0.6.9", "tsx": "latest", "typescript": "latest" } @@ -481,6 +482,12 @@ "integrity": "sha512-E+XQCRwSbaaiChtv6k6Dwgc+bx+Bs6vuKJHHl5kox/BaKbhiXzqQOwK4cO22yElGp2OCmjwVhT3HmxgyPGnJfQ==", "dev": true }, + "node_modules/bun-types": { + "version": "0.6.9", + "resolved": "https://registry.npmjs.org/bun-types/-/bun-types-0.6.9.tgz", + "integrity": "sha512-qsxwgWWl6qBrHhj+c8QKte+Qyn8MwQweCp7gR51dfw6G43z+PmNk+ipx3xJj/EIU8i9QPyikriZ/tOeEoYPKyw==", + "dev": true + }, "node_modules/commander": { "version": "10.0.1", "resolved": "https://registry.npmjs.org/commander/-/commander-10.0.1.tgz", diff --git a/package.json b/package.json index 76fcb3b..baf302a 100644 --- a/package.json +++ b/package.json @@ -6,6 +6,7 @@ "type": "module", "scripts": { "build": "tsc", + "test": "bun test", "server": "bun ./examples/http.ts 3000", "start": "tsx ./bin/cli.ts run", "keypair": "tsx ./bin/cli.ts keypair" @@ -24,7 +25,8 @@ "devDependencies": { "@tsconfig/recommended": "latest", "@types/node": "latest", - "typescript": "latest", - "tsx": "latest" + "bun-types": "^0.6.9", + "tsx": "latest", + "typescript": "latest" } } diff --git a/src/config.ts b/src/config.ts index 1c1d688..1fb8a66 100644 --- a/src/config.ts +++ b/src/config.ts @@ -6,7 +6,10 @@ export const DEFAULT_PROMETHEUS_PORT = 9102; export const DEFAULT_PROMETHEUS_HOSTNAME = DEFAULT_HOSTNAME; export const DEFAULT_PROMETHEUS_DISABLED = false; export const DEFAULT_VERBOSE = false; +export const DEFAULT_DISABLE_PING = false; export const DEFAULT_CONCURRENCY = 1; +export const DEFAULT_DISABLE_PRODUCTION_MODE = false; +export const DEFAULT_DELAY_BEFORE_START = 0; // optional export const HOSTNAME = process.env.HOSTNAME ?? DEFAULT_HOSTNAME; @@ -14,5 +17,12 @@ export const PROMETHEUS_PORT = parseInt(process.env.PROMETHEUS_PORT ?? String(DE export const PROMETHEUS_HOSTNAME = process.env.PROMETHEUS_HOSTNAME ?? HOSTNAME; export const PROMETHEUS_DISABLED = JSON.parse(process.env.PROMETHEUS_DISABLED ?? String(DEFAULT_PROMETHEUS_DISABLED)) as boolean; export const VERBOSE = JSON.parse(process.env.VERBOSE ?? String(DEFAULT_VERBOSE)) as boolean; +export const DISABLE_PRODUCTION_MODE = JSON.parse(process.env.DISABLE_PRODUCTION_MODE ?? String(DEFAULT_DISABLE_PRODUCTION_MODE)) as boolean; +export const DISABLE_PING = JSON.parse(process.env.DISABLE_PING ?? String(DEFAULT_DISABLE_PING)) as boolean; +export const CONCURRENCY = parseInt(process.env.CONCURRENCY ?? String(DEFAULT_CONCURRENCY)); +export const DELAY_BEFORE_START = parseInt(process.env.DELAY_BEFORE_START ?? String(DEFAULT_DELAY_BEFORE_START)); export const SECRET_KEY = process.env.SECRET_KEY; -export const CONCURRENCY = parseInt(process.env.CONCURRENCY ?? String(DEFAULT_CONCURRENCY)); \ No newline at end of file +export const WEBHOOK_URL = process.env.WEBHOOK_URL ?? process.env.URL; +export const CURSOR_FILE = process.env.CURSOR_FILE; +export const SUBSTREAMS_API_TOKEN = process.env.SUBSTREAMS_API_TOKEN; +export const SUBSTREAMS_API_TOKEN_ENVVAR = process.env.SUBSTREAMS_API_TOKEN_ENVVAR; diff --git a/src/ping.ts b/src/ping.ts index bb04fbc..f0c3ae4 100644 --- a/src/ping.ts +++ b/src/ping.ts @@ -1,16 +1,17 @@ -import { generateSecretKey, signMessage } from "./signMessage.js"; +import { keyPair, signMessage } from "./signMessage.js"; import { postWebhook } from "./postWebhook.js"; import { logger } from "./logger.js"; export async function ping(url: string, secretKey: string) { const body = JSON.stringify({message: "PING"}); const timestamp = Math.floor(Date.now().valueOf() / 1000); - const signature = signMessage(body, timestamp, secretKey); - const invalidSignature = signMessage(body, timestamp, generateSecretKey()); + const signature = signMessage(timestamp, body, secretKey); + const invalidSecretKey = keyPair().secretKey; + const invalidSignature = signMessage(timestamp, body, invalidSecretKey); // send valid signature (must respond with 200) try { - logger.info("PING", {url, isVerified: true}); + logger.info("PING valid request", {url, timestamp, signature, body, isVerified: true}); await postWebhook(url, body, signature, timestamp, {maximumAttempts: 0}); } catch (e) { logger.error("error PING valid response"); @@ -18,7 +19,7 @@ export async function ping(url: string, secretKey: string) { } // send invalid signature (must NOT respond with 200) try { - logger.info("PING", {url, isVerified: false}); + logger.info("PING invalid request", {url, timestamp, invalidSignature, body, invalidSecretKey, isVerified: false}); await postWebhook(url, body, invalidSignature, timestamp, {maximumAttempts: 0}); logger.error("error PING invalid response"); return false; diff --git a/src/signMessage.spec.ts b/src/signMessage.spec.ts new file mode 100644 index 0000000..30b58ee --- /dev/null +++ b/src/signMessage.spec.ts @@ -0,0 +1,15 @@ +import assert from "node:assert" +import { test } from "bun:test"; +import { verify } from "./signMessage.js"; + +test("signMessage", () => { + const publicKey = "a3cb7366ee8ca77225b4d41772e270e4e831d171d1de71d91707c42e7ba82cc9"; + const invalidPublicKey = "36657c7498f2ff2e9a520dcfbdad4e7c1e5354a75623165e28f6577a45a9eec3"; + const body = '{"message":"PING"}'; + const sig = "c66a5e1741110b7509d167db723b5a833a0ff4d823ac723037642168ee4843ae1b83c0063e51e5ad69029c97b4b7badf80005f196c0230af9de1bfbf7700a001" + const timestamp = 1686865337 + const msg = Buffer.from(body + timestamp); + + assert.equal(verify(msg, sig, publicKey), true); + assert.equal(verify(msg, sig, invalidPublicKey), false); +}); \ No newline at end of file diff --git a/src/signMessage.ts b/src/signMessage.ts index 60d87db..26ca15d 100644 --- a/src/signMessage.ts +++ b/src/signMessage.ts @@ -1,6 +1,6 @@ import nacl from "tweetnacl"; -export function signMessage(body: string, timestamp: number, secretKey: string) { +export function signMessage(timestamp: number, body: string, secretKey: string) { const msg = Buffer.from(timestamp + body); const signed = nacl.sign.detached(msg, Buffer.from(secretKey, "hex")); return Buffer.from(signed).toString("hex"); @@ -14,10 +14,18 @@ export function keyPair() { }; } -export function generateSecretKey() { - return keyPair().secretKey; +export function fromSecretKey(secretKey: string) { + const from = nacl.sign.keyPair.fromSecretKey(Buffer.from(secretKey, "hex")) + return { + secretKey: Buffer.from(from.secretKey).toString("hex"), + publicKey: Buffer.from(from.publicKey).toString("hex"), + }; } -export function generatePublicKey(secretKey: string) { - return nacl.sign.keyPair.fromSecretKey(Buffer.from(secretKey, "hex")) +export function verify(msg: Buffer, sig: string, publicKey: string) { + return nacl.sign.detached.verify( + msg, + Buffer.from(sig, "hex"), + Buffer.from(publicKey, "hex") + ); } diff --git a/tsconfig.json b/tsconfig.json index 8a1b0b8..785fdc8 100644 --- a/tsconfig.json +++ b/tsconfig.json @@ -10,6 +10,7 @@ "declarationMap": true, "sourceMap": true, "verbatimModuleSyntax": true, + "types": ["bun-types"] }, "include": [ "index.ts",