From a3da1489a85e56361713a9927acc99d0854d4156 Mon Sep 17 00:00:00 2001 From: Denis Carriere Date: Sat, 17 Feb 2024 21:38:31 -0500 Subject: [PATCH] refactor clickhouse sink --- .env.example | 17 +- README.md | 13 +- index.ts | 12 +- package-lock.json | 235 +++++++++++++++++- package.json | 7 +- {src/fetch => sql}/blocks.sql | 0 {src/fetch => sql}/blocks.ts | 4 +- {src/fetch => sql}/cluster.sql | 0 {src/fetch => sql}/cluster.ts | 4 +- sql/cursor.sql | 4 + sql/cursor.ts | 34 +++ {src/clickhouse => sql}/tables/blocks.sql | 9 +- .../tables/module_hashes.sql | 2 +- {src/clickhouse => sql}/tables/tables.ts | 0 src/auth/argon2.bench.ts | 17 -- src/auth/argon2.test.ts | 44 ---- src/auth/argon2.ts | 23 -- src/auth/bearer.ts | 21 -- src/clickhouse/createClient.ts | 38 ++- src/clickhouse/createDatabase.ts | 15 +- src/clickhouse/handleSinkRequest.ts | 225 ++++++++--------- src/clickhouse/ping.ts | 13 +- src/clickhouse/query.ts | 26 ++ src/clickhouse/stores.ts | 101 ++++---- src/clickhouse/table-initialization.ts | 75 +++--- src/clickhouse/tables/blocks_mv.sql | 22 -- .../tables/deleted_entity_changes.sql | 11 - src/clickhouse/tables/final_blocks.sql | 5 - src/clickhouse/tables/unparsed_json.sql | 9 - src/config.ts | 15 -- src/fetch/DELETE.ts | 8 +- src/fetch/GET.ts | 10 +- src/fetch/POST.ts | 10 +- src/fetch/PUT.ts | 21 +- src/fetch/cursor.ts | 60 ----- src/fetch/hash.ts | 8 - src/fetch/health.ts | 16 +- src/fetch/init.ts | 28 +-- src/fetch/openapi.ts | 10 +- src/fetch/pause.ts | 4 +- src/fetch/query.ts | 16 -- src/fetch/schema.ts | 15 +- src/logger.ts | 6 +- src/prometheus.ts | 5 +- src/resume.ts | 19 -- src/schemas.spec.ts | 12 - src/schemas.ts | 10 +- src/sqlite/sqlite.ts | 122 --------- src/sqlite/table.sql | 18 -- 49 files changed, 569 insertions(+), 830 deletions(-) rename {src/fetch => sql}/blocks.sql (100%) rename {src/fetch => sql}/blocks.ts (84%) rename {src/fetch => sql}/cluster.sql (100%) rename {src/fetch => sql}/cluster.ts (72%) create mode 100644 sql/cursor.sql create mode 100644 sql/cursor.ts rename {src/clickhouse => sql}/tables/blocks.sql (58%) rename {src/clickhouse => sql}/tables/module_hashes.sql (100%) rename {src/clickhouse => sql}/tables/tables.ts (100%) delete mode 100644 src/auth/argon2.bench.ts delete mode 100644 src/auth/argon2.test.ts delete mode 100644 src/auth/argon2.ts delete mode 100644 src/auth/bearer.ts create mode 100644 src/clickhouse/query.ts delete mode 100644 src/clickhouse/tables/blocks_mv.sql delete mode 100644 src/clickhouse/tables/deleted_entity_changes.sql delete mode 100644 src/clickhouse/tables/final_blocks.sql delete mode 100644 src/clickhouse/tables/unparsed_json.sql delete mode 100644 src/fetch/cursor.ts delete mode 100644 src/fetch/hash.ts delete mode 100644 src/fetch/query.ts delete mode 100644 src/resume.ts delete mode 100644 src/sqlite/sqlite.ts delete mode 100644 src/sqlite/table.sql diff --git a/.env.example b/.env.example index 60e98f6..7ec7082 100644 --- a/.env.example +++ b/.env.example @@ -7,20 +7,7 @@ PASSWORD= # Webhook Authentication (Optional) PUBLIC_KEY=... # ed25519 public key provided by https://github.com/pinax-network/substreams-sink-webhook -# HTTP Server (Optional) +# Clickhouse Sink (Optional) PORT=3000 HOSTNAME=0.0.0.0 - -# Clickhouse Sink Authentication (Optional) -# PUT endpoints are protected (uses HTTP Basic authentication) -AUTH_KEY=... 
- -# Clickhouse Sink (Optional) -MAX_BUFFER_SIZE=1000 -INSERTION_DELAY=2000 -WAIT_FOR_INSERT=0 -ASYNC_INSERT=1 -BUFFER=buffer.db -ALLOW_UNPARSED=false -VERBOSE=true -RESUME=true \ No newline at end of file +VERBOSE=true \ No newline at end of file diff --git a/README.md b/README.md index e3b64e3..22d7fd5 100644 --- a/README.md +++ b/README.md @@ -91,24 +91,13 @@ Options: -p, --port HTTP port on which to attach the sink (default: "3000", env: PORT) -v, --verbose Enable verbose logging (choices: "true", "false", default: "true", env: VERBOSE) --hostname Server listen on HTTP hostname (default: "0.0.0.0", env: HOSTNAME) - --public-key Comma separated list of public keys to validate messages (env: PUBLIC_KEY) - --auth-key Auth key to validate requests (env: AUTH_KEY) + --public-keys Comma separated list of public keys to validate messages (env: PUBLIC_KEYS) --host Database HTTP hostname (default: "http://localhost:8123", env: HOST) --username Database user (default: "default", env: USERNAME) --password Password associated with the specified username (default: "", env: PASSWORD) --database The database to use inside ClickHouse (default: "default", env: DATABASE) - --async-insert https://clickhouse.com/docs/en/operations/settings/settings#async-insert (choices: - "0", "1", default: 1, env: ASYNC_INSERT) - --wait-for-async-insert https://clickhouse.com/docs/en/operations/settings/settings#wait-for-async-insert - (choices: "0", "1", default: 0, env: WAIT_FOR_INSERT) - --max-buffer-size Maximum insertion batch size (default: 10000, env: MAX_BUFFER_SIZE) - --insertion-delay Delay between batch insertions (in ms) (default: 2000, env: INSERTION_DELAY) --allow-unparsed Enable storage in 'unparsed_json' table (choices: "true", "false", default: false, env: ALLOW_UNPARSED) - --resume Save the cached data from the previous process into ClickHouse (choices: "true", - "false", default: true, env: RESUME) - --buffer SQLite database to use as an insertion buffer. Use ':memory:' to make it volatile. 
- (default: "buffer.db", env: BUFFER) -h, --help display help for command ``` diff --git a/index.ts b/index.ts index e81274b..97ec8d2 100644 --- a/index.ts +++ b/index.ts @@ -10,6 +10,7 @@ import PUT from "./src/fetch/PUT.js"; import { NotFound } from "./src/fetch/cors.js"; import { logger } from "./src/logger.js"; import init from "./src/fetch/init.js"; +import { show_tables } from "./src/clickhouse/stores.js"; if (config.verbose) logger.enable(); @@ -26,8 +27,9 @@ const app = Bun.serve({ }, }); -logger.info('[app]', `${name} v${version}`); -logger.info('[app]', `Server listening on http://${app.hostname}:${app.port}`); -logger.info('[app]', `Clickhouse Server ${config.host} (${config.database})`); -if (config.publicKey) logger.info('[app]', `Webhook Ed25519 Public Key: ${config.publicKey}`); -init(); \ No newline at end of file +logger.info('[app]\t', `${name} v${version}`); +logger.info('[app]\t', `Server listening on http://${app.hostname}:${app.port}`); +logger.info('[app]\t', `Clickhouse Server ${config.host} (${config.database})`); +if (config.publicKey) logger.info('[app]\t', `Webhook Ed25519 Public Key: ${config.publicKey}`); +await init(); +await show_tables(); \ No newline at end of file diff --git a/package-lock.json b/package-lock.json index 08eb39c..8b15b6c 100644 --- a/package-lock.json +++ b/package-lock.json @@ -1,20 +1,21 @@ { "name": "substreams-sink-clickhouse", - "version": "0.2.1", + "version": "0.2.2", "lockfileVersion": 3, "requires": true, "packages": { "": { "name": "substreams-sink-clickhouse", - "version": "0.2.1", + "version": "0.2.2", "license": "MIT", "dependencies": { - "@clickhouse/client-web": "latest", - "@substreams/sink-database-changes": "^0.3.0", - "@substreams/sink-entity-changes": "v0.3.4", + "@clickhouse/client-web": "^0.2.9", + "@substreams/sink-database-changes": "^0.3.5", + "@substreams/sink-entity-changes": "^0.3.5", "commander": "latest", "dotenv": "latest", "graphql": "latest", + "log-update": "^6.0.0", "openapi3-ts": "latest", "p-queue": "latest", "prom-client": "latest", @@ -292,9 +293,9 @@ } }, "node_modules/@substreams/sink-database-changes": { - "version": "0.3.2", - "resolved": "https://registry.npmjs.org/@substreams/sink-database-changes/-/sink-database-changes-0.3.2.tgz", - "integrity": "sha512-9NOp1C54QOsunEbAf2fe3xGr0MtOf1P2b/M36Nf8t4no/NKUIDeCYLApzZR8/9Ra/mci6XXT/4P+Z1Pj5hU3Dg==", + "version": "0.3.5", + "resolved": "https://registry.npmjs.org/@substreams/sink-database-changes/-/sink-database-changes-0.3.5.tgz", + "integrity": "sha512-VAKonNqT7Y4vJVrtrOcejakPgla/pEhcBfRPVR/ztPErQIWamV8y4zXo6Fxc+AxhQokjy/+4ADugp3zPZzzGFw==", "dependencies": { "@bufbuild/protobuf": "latest", "@sinclair/typebox": "latest", @@ -302,9 +303,9 @@ } }, "node_modules/@substreams/sink-entity-changes": { - "version": "0.3.4", - "resolved": "https://registry.npmjs.org/@substreams/sink-entity-changes/-/sink-entity-changes-0.3.4.tgz", - "integrity": "sha512-sSIv6bk2/WNEl60a1HatjYypfZ6NpQ7j8sKo0rcQ7eYzyDC2/sPmcAoBqDoLaVFmGt9pCj+EGFK+ouxogmEJnA==", + "version": "0.3.5", + "resolved": "https://registry.npmjs.org/@substreams/sink-entity-changes/-/sink-entity-changes-0.3.5.tgz", + "integrity": "sha512-M/x5UBebXU1wX6l8jZ6GU6Kb3xlhKYmo9IzoH9dvkiH/fg2c7sHjCRu0CzahlibHnoP8kG1SkDTr6wSBBnAoQw==", "dependencies": { "@bufbuild/protobuf": "latest", "@sinclair/typebox": "latest", @@ -338,6 +339,42 @@ "@types/node": "*" } }, + "node_modules/ansi-escapes": { + "version": "6.2.0", + "resolved": "https://registry.npmjs.org/ansi-escapes/-/ansi-escapes-6.2.0.tgz", + "integrity": 
"sha512-kzRaCqXnpzWs+3z5ABPQiVke+iq0KXkHo8xiWV4RPTi5Yli0l97BEQuhXV1s7+aSU/fu1kUuxgS4MsQ0fRuygw==", + "dependencies": { + "type-fest": "^3.0.0" + }, + "engines": { + "node": ">=14.16" + }, + "funding": { + "url": "https://github.com/sponsors/sindresorhus" + } + }, + "node_modules/ansi-regex": { + "version": "6.0.1", + "resolved": "https://registry.npmjs.org/ansi-regex/-/ansi-regex-6.0.1.tgz", + "integrity": "sha512-n5M855fKb2SsfMIiFFoVrABHJC8QtHwVx+mHWP3QcEqBHYienj5dHSgjbxtC0WEZXYt4wcD6zrQElDPhFuZgfA==", + "engines": { + "node": ">=12" + }, + "funding": { + "url": "https://github.com/chalk/ansi-regex?sponsor=1" + } + }, + "node_modules/ansi-styles": { + "version": "6.2.1", + "resolved": "https://registry.npmjs.org/ansi-styles/-/ansi-styles-6.2.1.tgz", + "integrity": "sha512-bN798gFfQX+viw3R7yrGWRqnrN2oRkEkUjjl4JNn4E8GxxbjtG3FbrEIIY3l8/hrwUwIeCZvi4QuOTP4MErVug==", + "engines": { + "node": ">=12" + }, + "funding": { + "url": "https://github.com/chalk/ansi-styles?sponsor=1" + } + }, "node_modules/bintrees": { "version": "1.0.2", "resolved": "https://registry.npmjs.org/bintrees/-/bintrees-1.0.2.tgz", @@ -353,6 +390,20 @@ "@types/ws": "~8.5.10" } }, + "node_modules/cli-cursor": { + "version": "4.0.0", + "resolved": "https://registry.npmjs.org/cli-cursor/-/cli-cursor-4.0.0.tgz", + "integrity": "sha512-VGtlMu3x/4DOtIUwEkRezxUZ2lBacNJCHash0N0WeZDBS+7Ux1dm3XWAgWYxLJFMMdOeXMHXorshEFhbMSGelg==", + "dependencies": { + "restore-cursor": "^4.0.0" + }, + "engines": { + "node": "^12.20.0 || ^14.13.1 || >=16.0.0" + }, + "funding": { + "url": "https://github.com/sponsors/sindresorhus" + } + }, "node_modules/commander": { "version": "12.0.0", "resolved": "https://registry.npmjs.org/commander/-/commander-12.0.0.tgz", @@ -378,6 +429,11 @@ "integrity": "sha512-qarp/WBRty4VM//ZEKDY8Bu3Tolr1afEIwFVZtU3eMOtBZpRIQkLVQMTOjUTt3qeJAAIHfYfEk7GiKg6ABCWlg==", "peer": true }, + "node_modules/emoji-regex": { + "version": "10.3.0", + "resolved": "https://registry.npmjs.org/emoji-regex/-/emoji-regex-10.3.0.tgz", + "integrity": "sha512-QpLs9D9v9kArv4lfDEgg1X/gN5XLnf/A6l9cs8SPZLRZR3ZkY9+kwIQTxm+fsSej5UMYGE8fdoaZVIBlqG0XTw==" + }, "node_modules/eventemitter3": { "version": "5.0.1", "resolved": "https://registry.npmjs.org/eventemitter3/-/eventemitter3-5.0.1.tgz", @@ -405,6 +461,17 @@ "node": ">=8.0.0" } }, + "node_modules/get-east-asian-width": { + "version": "1.2.0", + "resolved": "https://registry.npmjs.org/get-east-asian-width/-/get-east-asian-width-1.2.0.tgz", + "integrity": "sha512-2nk+7SIVb14QrgXFHcm84tD4bKQz0RxPuMT8Ag5KPOq7J5fEmAg0UbXdTOSHqNuHSU28k55qnceesxXRZGzKWA==", + "engines": { + "node": ">=18" + }, + "funding": { + "url": "https://github.com/sponsors/sindresorhus" + } + }, "node_modules/graphql": { "version": "16.8.1", "resolved": "https://registry.npmjs.org/graphql/-/graphql-16.8.1.tgz", @@ -413,12 +480,66 @@ "node": "^12.22.0 || ^14.16.0 || ^16.0.0 || >=17.0.0" } }, + "node_modules/is-fullwidth-code-point": { + "version": "5.0.0", + "resolved": "https://registry.npmjs.org/is-fullwidth-code-point/-/is-fullwidth-code-point-5.0.0.tgz", + "integrity": "sha512-OVa3u9kkBbw7b8Xw5F9P+D/T9X+Z4+JruYVNapTjPYZYUznQ5YfWeFkOj606XYYW8yugTfC8Pj0hYqvi4ryAhA==", + "dependencies": { + "get-east-asian-width": "^1.0.0" + }, + "engines": { + "node": ">=18" + }, + "funding": { + "url": "https://github.com/sponsors/sindresorhus" + } + }, + "node_modules/log-update": { + "version": "6.0.0", + "resolved": "https://registry.npmjs.org/log-update/-/log-update-6.0.0.tgz", + "integrity": 
"sha512-niTvB4gqvtof056rRIrTZvjNYE4rCUzO6X/X+kYjd7WFxXeJ0NwEFnRxX6ehkvv3jTwrXnNdtAak5XYZuIyPFw==", + "dependencies": { + "ansi-escapes": "^6.2.0", + "cli-cursor": "^4.0.0", + "slice-ansi": "^7.0.0", + "strip-ansi": "^7.1.0", + "wrap-ansi": "^9.0.0" + }, + "engines": { + "node": ">=18" + }, + "funding": { + "url": "https://github.com/sponsors/sindresorhus" + } + }, + "node_modules/mimic-fn": { + "version": "2.1.0", + "resolved": "https://registry.npmjs.org/mimic-fn/-/mimic-fn-2.1.0.tgz", + "integrity": "sha512-OqbOk5oEQeAZ8WXWydlu9HJjz9WVdEIvamMCcXmuqUYjTknH/sqsWvhQ3vgwKFRR1HpjvNBKQ37nbJgYzGqGcg==", + "engines": { + "node": ">=6" + } + }, "node_modules/mitata": { "version": "0.1.10", "resolved": "https://registry.npmjs.org/mitata/-/mitata-0.1.10.tgz", "integrity": "sha512-XeD7Bv4aiNyerNCmD1UIeUOjw6dHnCdZNuCeM9HQd8g+QRUcgX1lPV570LVRsYAAz66lto2XTAu6P8a6FQQ2fg==", "dev": true }, + "node_modules/onetime": { + "version": "5.1.2", + "resolved": "https://registry.npmjs.org/onetime/-/onetime-5.1.2.tgz", + "integrity": "sha512-kbpaSSGJTWdAY5KPVeMOKXSrPtr8C8C7wodJbcsd51jRnmD+GZu8Y0VoU6Dm5Z4vWr0Ig/1NKuWRKf7j5aaYSg==", + "dependencies": { + "mimic-fn": "^2.1.0" + }, + "engines": { + "node": ">=6" + }, + "funding": { + "url": "https://github.com/sponsors/sindresorhus" + } + }, "node_modules/openapi3-ts": { "version": "4.2.1", "resolved": "https://registry.npmjs.org/openapi3-ts/-/openapi3-ts-4.2.1.tgz", @@ -481,6 +602,71 @@ ], "peer": true }, + "node_modules/restore-cursor": { + "version": "4.0.0", + "resolved": "https://registry.npmjs.org/restore-cursor/-/restore-cursor-4.0.0.tgz", + "integrity": "sha512-I9fPXU9geO9bHOt9pHHOhOkYerIMsmVaWB0rA2AI9ERh/+x/i7MV5HKBNrg+ljO5eoPVgCcnFuRjJ9uH6I/3eg==", + "dependencies": { + "onetime": "^5.1.0", + "signal-exit": "^3.0.2" + }, + "engines": { + "node": "^12.20.0 || ^14.13.1 || >=16.0.0" + }, + "funding": { + "url": "https://github.com/sponsors/sindresorhus" + } + }, + "node_modules/signal-exit": { + "version": "3.0.7", + "resolved": "https://registry.npmjs.org/signal-exit/-/signal-exit-3.0.7.tgz", + "integrity": "sha512-wnD2ZE+l+SPC/uoS0vXeE9L1+0wuaMqKlfz9AMUo38JsyLSBWSFcHR1Rri62LZc12vLr1gb3jl7iwQhgwpAbGQ==" + }, + "node_modules/slice-ansi": { + "version": "7.1.0", + "resolved": "https://registry.npmjs.org/slice-ansi/-/slice-ansi-7.1.0.tgz", + "integrity": "sha512-bSiSngZ/jWeX93BqeIAbImyTbEihizcwNjFoRUIY/T1wWQsfsm2Vw1agPKylXvQTU7iASGdHhyqRlqQzfz+Htg==", + "dependencies": { + "ansi-styles": "^6.2.1", + "is-fullwidth-code-point": "^5.0.0" + }, + "engines": { + "node": ">=18" + }, + "funding": { + "url": "https://github.com/chalk/slice-ansi?sponsor=1" + } + }, + "node_modules/string-width": { + "version": "7.1.0", + "resolved": "https://registry.npmjs.org/string-width/-/string-width-7.1.0.tgz", + "integrity": "sha512-SEIJCWiX7Kg4c129n48aDRwLbFb2LJmXXFrWBG4NGaRtMQ3myKPKbwrD1BKqQn74oCoNMBVrfDEr5M9YxCsrkw==", + "dependencies": { + "emoji-regex": "^10.3.0", + "get-east-asian-width": "^1.0.0", + "strip-ansi": "^7.1.0" + }, + "engines": { + "node": ">=18" + }, + "funding": { + "url": "https://github.com/sponsors/sindresorhus" + } + }, + "node_modules/strip-ansi": { + "version": "7.1.0", + "resolved": "https://registry.npmjs.org/strip-ansi/-/strip-ansi-7.1.0.tgz", + "integrity": "sha512-iq6eVVI64nQQTRYq2KtEg2d2uU7LElhTJwsH4YzIHZshxlgZms/wIc4VoDQTlG/IvVIrBKG06CrZnp0qv7hkcQ==", + "dependencies": { + "ansi-regex": "^6.0.1" + }, + "engines": { + "node": ">=12" + }, + "funding": { + "url": "https://github.com/chalk/strip-ansi?sponsor=1" + } + }, "node_modules/substreams-sink": { 
"version": "0.14.1", "resolved": "https://registry.npmjs.org/substreams-sink/-/substreams-sink-0.14.1.tgz", @@ -531,6 +717,17 @@ "url": "https://github.com/fullstack-build/tslog?sponsor=1" } }, + "node_modules/type-fest": { + "version": "3.13.1", + "resolved": "https://registry.npmjs.org/type-fest/-/type-fest-3.13.1.tgz", + "integrity": "sha512-tLq3bSNx+xSpwvAJnzrK0Ep5CLNWjvFTOp71URMaAEWBfRb9nnJiBoUe0tF8bI4ZFO3omgBR6NvnbzVUT3Ly4g==", + "engines": { + "node": ">=14.16" + }, + "funding": { + "url": "https://github.com/sponsors/sindresorhus" + } + }, "node_modules/typescript": { "version": "5.3.3", "resolved": "https://registry.npmjs.org/typescript/-/typescript-5.3.3.tgz", @@ -561,6 +758,22 @@ "integrity": "sha512-JlCMO+ehdEIKqlFxk6IfVoAUVmgz7cU7zD/h9XZ0qzeosSHmUJVOzSQvvYSYWXkFXC+IfLKSIffhv0sVZup6pA==", "dev": true }, + "node_modules/wrap-ansi": { + "version": "9.0.0", + "resolved": "https://registry.npmjs.org/wrap-ansi/-/wrap-ansi-9.0.0.tgz", + "integrity": "sha512-G8ura3S+3Z2G+mkgNRq8dqaFZAuxfsxpBB8OCTGRTCtp+l/v9nbFNmCUP1BZMts3G1142MsZfn6eeUKrr4PD1Q==", + "dependencies": { + "ansi-styles": "^6.2.1", + "string-width": "^7.0.0", + "strip-ansi": "^7.1.0" + }, + "engines": { + "node": ">=18" + }, + "funding": { + "url": "https://github.com/chalk/wrap-ansi?sponsor=1" + } + }, "node_modules/yaml": { "version": "2.3.4", "resolved": "https://registry.npmjs.org/yaml/-/yaml-2.3.4.tgz", diff --git a/package.json b/package.json index 5e37ee3..8304dc3 100644 --- a/package.json +++ b/package.json @@ -26,12 +26,13 @@ "build": "bun build --compile ./index.ts --outfile substreams-sink-clickhouse" }, "dependencies": { - "@clickhouse/client-web": "latest", - "@substreams/sink-database-changes": "^0.3.0", - "@substreams/sink-entity-changes": "v0.3.4", + "@clickhouse/client-web": "^0.2.9", + "@substreams/sink-database-changes": "^0.3.5", + "@substreams/sink-entity-changes": "^0.3.5", "commander": "latest", "dotenv": "latest", "graphql": "latest", + "log-update": "^6.0.0", "openapi3-ts": "latest", "p-queue": "latest", "prom-client": "latest", diff --git a/src/fetch/blocks.sql b/sql/blocks.sql similarity index 100% rename from src/fetch/blocks.sql rename to sql/blocks.sql diff --git a/src/fetch/blocks.ts b/sql/blocks.ts similarity index 84% rename from src/fetch/blocks.ts rename to sql/blocks.ts index 889d091..ad6f64d 100644 --- a/src/fetch/blocks.ts +++ b/sql/blocks.ts @@ -1,5 +1,5 @@ import { z } from "zod"; -import client from "../clickhouse/createClient.js"; +import { readOnlyClient } from "../src/clickhouse/createClient.js"; export const BlockResponseSchema = z.object({ chain: z.string(), @@ -25,7 +25,7 @@ export function getChain(req: Request, required = true) { export async function blocks(req: Request) { const query = await Bun.file(import.meta.dirname + "/blocks.sql").text() const chain = getChain(req, false); - const response = await client.query({ query, format: "JSONEachRow" }); + const response = await readOnlyClient.query({ query, format: "JSONEachRow" }); let data = await response.json() as BlockResponseSchema[]; if ( chain ) data = data.filter((row) => row.chain === chain); return data; diff --git a/src/fetch/cluster.sql b/sql/cluster.sql similarity index 100% rename from src/fetch/cluster.sql rename to sql/cluster.sql diff --git a/src/fetch/cluster.ts b/sql/cluster.ts similarity index 72% rename from src/fetch/cluster.ts rename to sql/cluster.ts index f36c49c..156dbe5 100644 --- a/src/fetch/cluster.ts +++ b/sql/cluster.ts @@ -1,5 +1,5 @@ import { z } from "zod"; -import client from 
"../clickhouse/createClient.js"; +import { readOnlyClient } from "../src/clickhouse/createClient.js"; export const ClusterSchema = z.object({ count: z.number(), @@ -13,6 +13,6 @@ export type ClusterSchema = z.infer; export async function cluster() { const query = await Bun.file(import.meta.dirname + "/cluster.sql").text() - const response = await client.query({ query, format: "JSONEachRow" }); + const response = await readOnlyClient.query({ query, format: "JSONEachRow" }); return response.json(); } diff --git a/sql/cursor.sql b/sql/cursor.sql new file mode 100644 index 0000000..b8ef3f7 --- /dev/null +++ b/sql/cursor.sql @@ -0,0 +1,4 @@ +SELECT latest_cursor +FROM module_hashes +WHERE chain = {chain: String} AND module_hash = {module_hash: String} +LIMIT 1 \ No newline at end of file diff --git a/sql/cursor.ts b/sql/cursor.ts new file mode 100644 index 0000000..faab252 --- /dev/null +++ b/sql/cursor.ts @@ -0,0 +1,34 @@ +import { readOnlyClient } from "../src/clickhouse/createClient.js"; +import * as store from "../src/clickhouse/stores.js"; + +export async function findLatestCursor(req: Request) { + const { module_hash, chain} = await paramsLatestCursor(req); + const query = await Bun.file(import.meta.dirname + "/cursor.sql").text() + const response = await readOnlyClient.query({ query, format: "JSONEachRow" }); + const data = await response.json>(); + + if (data.length === 1) return data[0].latest_cursor; + + throw new Error(`Bad request: no cursor found for '${module_hash}' on '${chain}'.`); +} + +async function paramsLatestCursor(req: Request) { + const url = new URL(req.url); + const chain = url.searchParams.get("chain"); + const module_hash = url.searchParams.get("module_hash"); + + if (!chain) throw new Error("Missing parameter: chain"); + if (!module_hash) throw new Error("Missing parameter: module_hash"); + + if (!(await store.query_chains()).includes(chain)) { + store.reset(); + throw new Error("Invalid parameter: chain=" + chain); + } + + if (!(await store.query_module_hashes()).includes(module_hash)) { + store.reset(); + throw new Error("Invalid parameter: module_hash=" + module_hash); + } + + return { chain, module_hash }; +} diff --git a/src/clickhouse/tables/blocks.sql b/sql/tables/blocks.sql similarity index 58% rename from src/clickhouse/tables/blocks.sql rename to sql/tables/blocks.sql index 2bdb7d5..be44a1e 100644 --- a/src/clickhouse/tables/blocks.sql +++ b/sql/tables/blocks.sql @@ -1,10 +1,9 @@ --- blocks -- CREATE TABLE IF NOT EXISTS blocks ( - block_id FixedString(64), - block_number UInt32(), chain LowCardinality(String), + module_hash FixedString(40), + block_number UInt32(), timestamp DateTime64(3, 'UTC'), + block_id FixedString(64) ) ENGINE = ReplacingMergeTree -PRIMARY KEY (block_id) -ORDER BY (block_id, chain, block_number, timestamp); +ORDER BY (chain, module_hash, block_number, timestamp); diff --git a/src/clickhouse/tables/module_hashes.sql b/sql/tables/module_hashes.sql similarity index 100% rename from src/clickhouse/tables/module_hashes.sql rename to sql/tables/module_hashes.sql index 9e7e54a..3580a52 100644 --- a/src/clickhouse/tables/module_hashes.sql +++ b/sql/tables/module_hashes.sql @@ -1,7 +1,7 @@ CREATE TABLE IF NOT EXISTS module_hashes ( + chain LowCardinality(String), module_hash FixedString(40), module_name String, - chain LowCardinality(String), type String, latest_cursor String, latest_block_number UInt32, diff --git a/src/clickhouse/tables/tables.ts b/sql/tables/tables.ts similarity index 100% rename from src/clickhouse/tables/tables.ts rename 
to sql/tables/tables.ts diff --git a/src/auth/argon2.bench.ts b/src/auth/argon2.bench.ts deleted file mode 100644 index 1254e23..0000000 --- a/src/auth/argon2.bench.ts +++ /dev/null @@ -1,17 +0,0 @@ -import { run, bench, group, baseline } from 'mitata'; - -const passwordHash = "$argon2id$v=19$m=65536,t=2,p=1$53yGw9x/71TwPK/jEX056kYMTLq+DIFAkCg2wIo+N7A$VGxk8EPwP8sLib1NDoo9YNh1eKLNCr2sy3uZywh5ayk"; - -group('argon2', () => { - baseline('baseline', () => {}); - bench('Bun.password.verifySync', () => Bun.password.verifySync("password", passwordHash)); -}); - -await run({ - avg: true, // enable/disable avg column (default: true) - json: false, // enable/disable json output (default: false) - colors: true, // enable/disable colors (default: true) - min_max: true, // enable/disable min/max column (default: true) - collect: false, // enable/disable collecting returned values into an array during the benchmark (default: false) - percentiles: false, // enable/disable percentiles column (default: true) -}); \ No newline at end of file diff --git a/src/auth/argon2.test.ts b/src/auth/argon2.test.ts deleted file mode 100644 index 866e8d3..0000000 --- a/src/auth/argon2.test.ts +++ /dev/null @@ -1,44 +0,0 @@ -import { afterEach, beforeEach, describe, expect, test } from "bun:test"; -import { config } from "../config.js"; -import * as argon2 from "./argon2.js"; - -const passwordHash = - "$argon2id$v=19$m=65536,t=2,p=1$53yGw9x/71TwPK/jEX056kYMTLq+DIFAkCg2wIo+N7A$VGxk8EPwP8sLib1NDoo9YNh1eKLNCr2sy3uZywh5ayk"; - -describe("argon2", () => { - let authKey: string | undefined = ""; - - beforeEach(() => (authKey = config.authKey)); - afterEach(() => (config["authKey"] = authKey)); - - test("it should skip auth check when no auth-key is passed in", () => { - config["authKey"] = ""; - const request = new Request("http://localhost", { - headers: { Authorization: "Bearer auth-key" }, - }); - - const response = argon2.beforeHandle(request); - expect(response.success).toBeTrue(); - }); - - test("it should return 'unauthorized' on invalid password", () => { - config["authKey"] = passwordHash; - const request = new Request("http://localhost", { headers: { Authorization: "Bearer pwd" } }); - - const response = argon2.beforeHandle(request); - expect(response.success).toBeFalse(); - if (!response.success) { - expect(response.error.status).toBe(401); - } - }); - - test("it should allow valid passwords", () => { - config["authKey"] = passwordHash; - const request = new Request("http://localhost", { - headers: { Authorization: "Bearer password" }, - }); - - const response = argon2.beforeHandle(request); - expect(response.success).toBeTrue(); - }); -}); diff --git a/src/auth/argon2.ts b/src/auth/argon2.ts deleted file mode 100644 index 65a4dda..0000000 --- a/src/auth/argon2.ts +++ /dev/null @@ -1,23 +0,0 @@ -// https://bun.sh/guides/util/hash-a-password -import { config } from "../config.js"; -import { logger } from "../logger.js"; -import { Err, Ok, Result } from "../result.js"; -import { InvalidAuthRequest, NoAuthorization, Unauthorized, getBearer } from "./bearer.js"; - -export function beforeHandle(request: Request): Result { - if (!config.authKey) return Ok(); - - const password = getBearer(request); - if (!password) return Err(NoAuthorization); - - try { - if (!Bun.password.verifySync(password, config.authKey)) { - return Err(Unauthorized); - } - } catch (e) { - logger.error(e); - return Err(InvalidAuthRequest); - } - - return Ok(); -} diff --git a/src/auth/bearer.ts b/src/auth/bearer.ts deleted file mode 100644 index 
68594d4..0000000 --- a/src/auth/bearer.ts +++ /dev/null @@ -1,21 +0,0 @@ -import { toText } from "../fetch/cors.js"; - -// https://developer.mozilla.org/en-US/docs/Web/HTTP/Headers/WWW-Authenticate#directives_ -const TOKEN_STRINGS = "[A-Za-z0-9._~+/-]+=*"; -const PREFIX = "Bearer"; -const REALM = "sign"; - -export const NoAuthorization = toText("Unauthorized", 400, new Headers({ "WWW-Authenticate": `${PREFIX} realm="${REALM}"` })); -export const Unauthorized = toText("Unauthorized", 401, new Headers({ "WWW-Authenticate": `${PREFIX} error="invalid_token"` })); -export const InvalidAuthRequest = toText("Bad Request", 400, new Headers({ "WWW-Authenticate": `${PREFIX} error="invalid_request"` })); - -export function getBearerToken(headerToken: string) { - const regexp = new RegExp(`^${PREFIX} (${TOKEN_STRINGS}) *$`); - const match = regexp.exec(headerToken); - return match ? match[1] : null; -} - -export function getBearer(request: Request) { - const headerToken = request.headers.get("Authorization"); - return headerToken ? getBearerToken(headerToken) : null; -} diff --git a/src/clickhouse/createClient.ts b/src/clickhouse/createClient.ts index 09dbe25..45fae55 100644 --- a/src/clickhouse/createClient.ts +++ b/src/clickhouse/createClient.ts @@ -1,29 +1,19 @@ -import { createClient as createClientWeb } from "@clickhouse/client-web"; +import * as clickhouse from "@clickhouse/client-web"; import { APP_NAME, config } from "../config.js"; -import { ping } from "./ping.js"; function createClient(readonly = false) { - const client = createClientWeb({ - ...config, - clickhouse_settings: { - wait_for_async_insert: config.waitForAsyncInsert, // 0 - async_insert: config.asyncInsert, // 1 - readonly: readonly ? "1" : "0", - }, - application: APP_NAME, - }); - - // These overrides should not be required but the @clickhouse/client-web instance - // does not work well with Bun's implementation of Node streams. - // https://github.com/oven-sh/bun/issues/5470 - client.command = client.exec; - client.ping = ping; - - return client; + return clickhouse.createClient({ + host: config.host, + password: config.password, + database: config.database, + clickhouse_settings: { + // wait_for_async_insert: 0, // 0 + async_insert: 0, // 1 + readonly: readonly ? 
"1" : "0", + }, + application: APP_NAME, + }) } -const client = createClient(); -const readOnlyClient = createClient(true); - -export default client; -export { readOnlyClient }; +export const client = createClient(false); +export const readOnlyClient = createClient(true); \ No newline at end of file diff --git a/src/clickhouse/createDatabase.ts b/src/clickhouse/createDatabase.ts index a786d6f..0d8bb37 100644 --- a/src/clickhouse/createDatabase.ts +++ b/src/clickhouse/createDatabase.ts @@ -1,14 +1,11 @@ import { logger } from "../logger.js"; -import { Err, Ok, Result } from "../result.js"; -import client from "./createClient.js"; +import { client } from "./createClient.js"; -export async function createDatabase(database: string): Promise { +export async function createDatabase(database: string) { if (!database) { - return Err(new Error("[database] is required")); + throw new Error("[database] is required") } - - await client.exec({ query: `CREATE DATABASE IF NOT EXISTS "${database}"` }); - logger.info('[createDatabase]', `CREATE DATABASE [${database}]`); - - return Ok(); + logger.info('[createDatabase]\t', `CREATE DATABASE [${database}]`); + const query = `CREATE DATABASE IF NOT EXISTS "${database}"`; + return {query, ...await client.exec({ query })}; } diff --git a/src/clickhouse/handleSinkRequest.ts b/src/clickhouse/handleSinkRequest.ts index 3f57bf9..1d31c93 100644 --- a/src/clickhouse/handleSinkRequest.ts +++ b/src/clickhouse/handleSinkRequest.ts @@ -2,169 +2,139 @@ import { getValuesInTableChange } from "@substreams/sink-database-changes"; import { TableChange } from "@substreams/sink-database-changes/zod"; import { getValuesInEntityChange } from "@substreams/sink-entity-changes"; import { EntityChange } from "@substreams/sink-entity-changes/zod"; -import PQueue from "p-queue"; import { Clock, Manifest } from "substreams-sink-webhook/schemas"; -import { config } from "../config.js"; -import { logger } from "../logger.js"; import * as prometheus from "../prometheus.js"; import { PayloadBody } from "../schemas.js"; -import { sqlite } from "../sqlite/sqlite.js"; -import client from "./createClient.js"; -import { store } from "./stores.js"; +import { client } from "./createClient.js"; +import * as store from "./stores.js"; +import logUpdate from 'log-update'; type Metadata = { clock: Clock; manifest: Manifest; cursor: string }; -let bufferedItems = 0; -let timeLimitReached = true; -const clickhouseQueue = new PQueue({ concurrency: 2 }); +const buffer = new Map[]>() -export async function handleSinkRequest({ data, ...metadata }: PayloadBody) { - prometheus.sink_requests.inc({ - chain: metadata.manifest.chain, - module_hash: metadata.manifest.moduleHash, - }); - bufferedItems++; +function now() { + return Math.floor(new Date().getTime() / 1000); +} + +let success = 0; +let start = now(); +let lastUpdate = now(); + +function bufferCount() { + let count = 0 + for ( const value of buffer.values() ) { + count += value.length; + }; + return count; +} + +// TO-DO - use Prometheus metrics as input to this function +function logProgress() { + const delta = now() - start + const rate = Math.round(success / delta); + const count = bufferCount(); + success++; + logUpdate(`[handleSinkRequest]: ${success} total [${rate} b/s] buffer size: ${count}`); +} + +async function flushBuffer() { + // clear buffer every 1 second + if ( lastUpdate != now() ) { + for ( const [table, values] of buffer.entries() ) { + await client.insert({table, values, format: "JSONEachRow"}) + buffer.delete(table); + } + lastUpdate = 
now(); + } +} +// ~200-500 blocks per second +export async function handleSinkRequest({ data, ...metadata }: PayloadBody) { // Different handler if `graph_out` or `db_out` is emitting data. // Handles no incoming data as well. if ("entityChanges" in data && data.entityChanges.length > 0) { handleEntityChanges(data.entityChanges, metadata); } else if ("tableChanges" in data && data.tableChanges.length > 0) { handleDatabaseChanges(data.tableChanges, metadata); - } else { - handleNoChange(metadata); - } - - if (batchSizeLimitReached()) { - // Wait for the next insertion window - await clickhouseQueue.onIdle(); } - if (timeLimitReached) { - // If the previous batch is not fully inserted, wait for it to be. - await clickhouseQueue.onIdle(); - bufferedItems = 0; + // insert metadata + insertModuleHashes(metadata); + insertBlocks(metadata); - // Plan the next insertion in `config.insertionDelay` ms - timeLimitReached = false; - clickhouseQueue - .add(() => new Promise((resolve) => setTimeout(resolve, config.insertionDelay))) - .then(() => (timeLimitReached = true)); + await flushBuffer(); - // Start an async job to insert every record stored in the current batch. - // This job will be awaited before starting the next batch. - clickhouseQueue.add(saveKnownEntityChanges); - } + // logging + prometheus.sink_requests.inc({ + chain: metadata.manifest.chain, + module_hash: metadata.manifest.moduleHash, + }); + logProgress(); return new Response("OK"); } -export function saveKnownEntityChanges() { - return sqlite.commitBuffer(async (blocks, finalBlocks, moduleHashes, entityChanges) => { - if (moduleHashes.length > 0) { - await client.insert({ values: moduleHashes, table: "module_hashes", format: "JSONEachRow" }); - } - - if (finalBlocks.length > 0) { - await client.insert({ values: finalBlocks, table: "final_blocks", format: "JSONEachRow" }); - } - - if (blocks.length > 0) { - await client.insert({ values: blocks, table: "blocks", format: "JSONEachRow" }); - } +function insertModuleHashes(metadata: Metadata) { + const values = { + chain: metadata.manifest.chain, + module_hash: metadata.manifest.moduleHash, + module_name: metadata.manifest.moduleName, + type: metadata.manifest.type, + latest_cursor: metadata.cursor, + latest_block_number: metadata.clock.number, + latest_block_id: metadata.clock.id, + latest_timestamp: Number(new Date(metadata.clock.timestamp)), + }; + insertToBuffer("module_hashes", values); +} - for (const [table, values] of Object.entries(entityChanges)) { - if (values.length > 0) { - // This check ensures that old stale data coming from SQLite - // is not inserted after the ClickHouse schema was modified. 
- if (await store.existsTable(table)) { - await client.insert({ table, values, format: "JSONEachRow" }); - } else { - logger.info('[saveKnownEntityChanges]', `Skipped (${values.length}) records assigned to table '${table}' because it does not exist.`); - } - } - } - }); +function insertBlocks(metadata: Metadata) { + const values = { + chain: metadata.manifest.chain, + module_hash: metadata.manifest.moduleHash, + block_number: metadata.clock.number, + timestamp: Number(new Date(metadata.clock.timestamp)), + block_id: metadata.clock.id, + }; + insertToBuffer("blocks", values); } -function batchSizeLimitReached() { - return bufferedItems >= config.maxBufferSize; +function insertToBuffer(table: string, values: Record) { + // throw error if tables are not loaded + if (!store.tables) throw new Error("no tables are loaded"); + if (!store.tables.has(table)) { + throw new Error(`table ${table} does not exist (call HTTP PUT "/sql/schema" to create table schemas)`); + } + + // append values to buffer (used for in-memory Clickhouse DB batching) + if ( !buffer.has(table) ) { + buffer.set(table, [values]); + } else { + buffer.get(table)?.push(values); + } } function handleEntityChanges(entityChanges: EntityChange[], metadata: Metadata) { - logger.info('[handleEntityChanges]', `entityChanges=${entityChanges.length}`); for (const change of entityChanges) { const values = getValuesInEntityChange(change); - handleChange(change.entity, values, change.operation, { ...metadata, id: change.id }); + const id = change.id; // primary key + insertEntityChange(change.entity, values, change.operation, { ...metadata, id }); } } function handleDatabaseChanges(tableChanges: TableChange[], metadata: Metadata) { - logger.info('[handleDatabaseChanges]', `tableChanges=${tableChanges.length}`); for (const change of tableChanges) { const values = getValuesInTableChange(change); - handleChange(change.table, values, change.operation, { ...metadata, id: "" }); - } -} - -function handleNoChange(metadata: Metadata) { - const { clock, manifest, cursor } = metadata; - sqlite.insert("", "", clock, manifest, cursor); -} - -async function handleChange( - table: string, - values: Record, - operation: EntityChange["operation"] | TableChange["operation"], - metadata: Metadata & { id: string } -) { - const tableExists = await store.existsTable(table); - const data = JSON.stringify(values); - const clock = JSON.stringify(metadata.clock); - const manifest = JSON.stringify(metadata.manifest); - const environment = { chain: metadata.manifest.chain, module_hash: metadata.manifest.moduleHash }; - - if (!tableExists) { - if (!config.allowUnparsed) { - throw new Error(`could not find table '${table}'. Did you mean to store unparsed data?`); - } - - values = { raw_data: data, source: table }; - table = "unparsed_json"; - } - - logger.info('[handleChange]', [table, operation, metadata.id, clock, manifest, data].join(" | ")); - - switch (operation) { - case "OPERATION_CREATE": - prometheus.entity_changes_inserted.inc(environment); - return insertEntityChange(table, values, metadata); - - // Updates are inserted as new rows in ClickHouse. This allows for the full history. - // If the user wants to override old data, they can specify it in their schema - // by using a ReplacingMergeTree. - case "OPERATION_UPDATE": - prometheus.entity_changes_updated.inc(environment); - return insertEntityChange(table, values, metadata); - - // Deleted entity changes are not actually removed from the database. 
- // They are stored in the 'deleted_entity_changes' table with their timestamp. - // Again, this allows to keep the full history while also providing the required information - // to correctly filter out unwanted data if necessary. - case "OPERATION_DELETE": - prometheus.entity_changes_deleted.inc(environment); - return insertEntityChange("deleted_entity_changes", { source: table }, metadata); - - default: - prometheus.entity_changes_unsupported.inc(); - logger.error('[handleChange]', "unsupported operation found in entityChanges: " + operation.toString()); - return Promise.resolve(); + const id = ""; // database changes do not have a primary key + insertEntityChange(change.table, values, change.operation, { ...metadata, id }); } } function insertEntityChange( table: string, values: Record, + operation: EntityChange["operation"] | TableChange["operation"], metadata: { id: string; clock: Clock; manifest: Manifest; cursor: string } ) { values["id"] = metadata.id; // Entity ID @@ -173,6 +143,17 @@ function insertEntityChange( values["block_number"] = metadata.clock.number; // Block Number values["module_hash"] = metadata.manifest.moduleHash; values["timestamp"] = Number(new Date(metadata.clock.timestamp)); // Block timestamp + values["operation"] = operation; + insertToBuffer(table, values); - sqlite.insert(JSON.stringify(values), table, metadata.clock, metadata.manifest, metadata.cursor); + // log + prometheus.entity_changes.inc({ + chain: metadata.manifest.chain, + module_hash: metadata.manifest.moduleHash, + operation, + }); } + +function timeout(ms: number) { + return new Promise(resolve => setTimeout(resolve, ms)); +} \ No newline at end of file diff --git a/src/clickhouse/ping.ts b/src/clickhouse/ping.ts index bc2f3eb..69709f6 100644 --- a/src/clickhouse/ping.ts +++ b/src/clickhouse/ping.ts @@ -1,11 +1,6 @@ -import { Ok, Result, UnknownErr } from "../result.js"; -import client from "./createClient.js"; +import { readOnlyClient } from "./createClient.js"; -export async function ping(): Promise { - try { - await client.exec({ query: "SELECT 1" }); - return Ok(); - } catch (err) { - return UnknownErr(err); - } +export async function ping() { + const query = "SELECT 1"; + return {query, ... 
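Note: the refactor above replaces the SQLite insertion buffer with an in-memory Map that is flushed at most once per second. A minimal, self-contained sketch of that buffer-and-flush pattern, assuming only @clickhouse/client-web (the endpoint is illustrative, not from the patch):

import { createClient } from "@clickhouse/client-web";

const client = createClient({ host: "http://localhost:8123" }); // illustrative endpoint
const buffer = new Map<string, Record<string, unknown>[]>();
let lastFlush = Math.floor(Date.now() / 1000);

// Accumulate rows per destination table; nothing is sent until flush().
function push(table: string, row: Record<string, unknown>) {
  const rows = buffer.get(table);
  if (rows) rows.push(row);
  else buffer.set(table, [row]);
}

// Flush at most once per wall-clock second: one JSONEachRow insert per table.
async function flush() {
  const nowSec = Math.floor(Date.now() / 1000);
  if (nowSec === lastFlush) return;
  for (const [table, values] of buffer) {
    await client.insert({ table, values, format: "JSONEachRow" });
    buffer.delete(table);
  }
  lastFlush = nowSec;
}

Batching this way trades up to one second of buffered (unflushed) data for far fewer ClickHouse insert round-trips.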
diff --git a/src/clickhouse/ping.ts b/src/clickhouse/ping.ts
index bc2f3eb..69709f6 100644
--- a/src/clickhouse/ping.ts
+++ b/src/clickhouse/ping.ts
@@ -1,11 +1,6 @@
-import { Ok, Result, UnknownErr } from "../result.js";
-import client from "./createClient.js";
+import { readOnlyClient } from "./createClient.js";

-export async function ping(): Promise<Result> {
-  try {
-    await client.exec({ query: "SELECT 1" });
-    return Ok();
-  } catch (err) {
-    return UnknownErr(err);
-  }
+export async function ping() {
+  const query = "SELECT 1";
+  return {query, ...await readOnlyClient.exec({ query })};
 }
diff --git a/src/clickhouse/query.ts b/src/clickhouse/query.ts
new file mode 100644
index 0000000..565954b
--- /dev/null
+++ b/src/clickhouse/query.ts
@@ -0,0 +1,26 @@
+import { QueryParams } from "@clickhouse/client-web";
+import { readOnlyClient } from "./createClient.js";
+
+interface QueryResponseMeta {
+  name: string;
+  type: string;
+}
+
+interface QueryResponseStatistics {
+  elapsed: number;
+  rows_read: number;
+  bytes_read: number;
+}
+
+export interface QueryResponse<T> {
+  meta: QueryResponseMeta[],
+  data: T[],
+  rows: number,
+  statistics: QueryResponseStatistics,
+}
+
+export async function query<T>(params: QueryParams) {
+  const response = await readOnlyClient.query(params);
+  const json = await response.json();
+  return json as QueryResponse<T>;
+}
\ No newline at end of file
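For reference, a hedged usage sketch of the query<T>() helper added above (the table and row type are illustrative); with the client's default JSON format, the parsed body carries meta, data, rows and statistics:

import { query } from "./src/clickhouse/query.js";

interface BlockRow {
  chain: string;
  block_number: number;
}

// Typed read: result.data is BlockRow[]; statistics comes from ClickHouse itself.
const result = await query<BlockRow>({ query: "SELECT chain, block_number FROM blocks LIMIT 5" });
console.log(result.rows, result.statistics.elapsed);
for (const row of result.data) console.log(row.chain, row.block_number);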
diff --git a/src/clickhouse/stores.ts b/src/clickhouse/stores.ts
index 65d10ef..382006d 100644
--- a/src/clickhouse/stores.ts
+++ b/src/clickhouse/stores.ts
@@ -1,69 +1,52 @@ import { logger } from "../logger.js";
 import { readOnlyClient } from "./createClient.js";

-class ClickhouseStore {
-  public paused = false;
-
-  private chainsPromise: Promise<string[]> | null = null;
-  private moduleHashesPromises: Promise<string[]> | null = null;
-
-  private knownTables = new Map<string, boolean>();
-
-  public get chains() {
-    if (!this.chainsPromise) {
-      this.chainsPromise = readOnlyClient
-        .query({ query: "SELECT DISTINCT chain FROM module_hashes", format: "JSONEachRow" })
-        .then((response) => response.json<Array<{ chain: string }>>())
-        .then((chains) => chains.map(({ chain }) => chain))
-        .catch(() => []);
-    }
-
-    return this.chainsPromise;
-  }
-
-  public get moduleHashes() {
-    if (!this.moduleHashesPromises) {
-      this.moduleHashesPromises = readOnlyClient
-        .query({ query: "SELECT DISTINCT module_hash from module_hashes", format: "JSONEachRow" })
-        .then((response) => response.json<Array<{ module_hash: string }>>())
-        .then((moduleHashes) => moduleHashes.map(({ module_hash }) => module_hash))
-        .catch(() => []);
-    }
-
-    return this.moduleHashesPromises;
-  }
+export let chains: string[] | null = null;
+export let module_hashes: string[] | null = null;
+export let tables: Set<string> | null = null;
+export let paused = false;
+
+export function pause(value: boolean) {
+  paused = value;
+  logger.info('[pause]', `Paused=${paused}`);
+}

-  // in memory TABLE name cache
-  // if true => true
-  // if false => false
-  // if undefined => check EXISTS if true or false
-  public async existsTable(table: string) {
-    // Return cached value if known (reduces number of EXISTS queries)
-    if (this.knownTables.has(table)) {
-      return this.knownTables.get(table);
-    }
+export async function query_chains() {
+  if (chains) return chains;
+  chains = await readOnlyClient
+    .query({ query: "SELECT DISTINCT chain FROM module_hashes", format: "JSONEachRow" })
+    .then((response) => response.json<Array<{ chain: string }>>())
+    .then((chains) => chains.map(({ chain }) => chain))
+    .catch(() => []);

-    // Check if table EXISTS
-    const response = await readOnlyClient.query({
-      query: "EXISTS " + table,
-      format: "JSONEachRow",
-    });
+  return chains;
+}

-    // handle EXISTS response
-    const data = await response.json<Array<{ result: number }>>();
-    const exists = data[0]?.result === 1;
-    this.knownTables.set(table, exists);
+export async function query_module_hashes() {
+  if (module_hashes) return module_hashes;
+  module_hashes = await readOnlyClient
+    .query({ query: "SELECT DISTINCT module_hash from module_hashes", format: "JSONEachRow" })
+    .then((response) => response.json<Array<{ module_hash: string }>>())
+    .then((moduleHashes) => moduleHashes.map(({ module_hash }) => module_hash))
+    .catch(() => []);

-    logger.info('[existsTable]', `EXISTS [${table}=${exists}]`);
-    return exists;
-  }
+  return module_hashes;
+}

-  public reset() {
-    this.chainsPromise = null;
-    this.moduleHashesPromises = null;
-    this.knownTables.clear();
-    logger.info('[reset]', "Cache has been cleared");
-  }
+export function reset() {
+  chains = null;
+  module_hashes = null;
+  tables = null;
+  logger.info('[reset]', "Cache has been cleared");
 }

-export const store = new ClickhouseStore();
+export async function show_tables() {
+  const response = await readOnlyClient.query({
+    query: "SHOW TABLES",
+    format: "JSONEachRow",
+  });
+  const data = await response.json<{name: string}[]>();
+  tables = new Set(data.map(({ name }) => name));
+  logger.info('[show_tables]', `Loaded ${tables.size} tables (${[...tables].join(", ")})`);
+  return tables;
+}
diff --git a/src/clickhouse/table-initialization.ts b/src/clickhouse/table-initialization.ts
index 4dec1cb..f290da8 100644
--- a/src/clickhouse/table-initialization.ts
+++ b/src/clickhouse/table-initialization.ts
@@ -1,25 +1,16 @@
 import { logger } from "../logger.js";
 import { Err, Ok, Result } from "../result.js";
-import client from "./createClient.js";
+import { client } from "./createClient.js";
 import { augmentCreateTableStatement, getTableName, isCreateTableStatement } from "./table-utils.js";
-import tables from "./tables/tables.js";
+import tables from "../../sql/tables/tables.js";

-export async function initializeDefaultTables(): Promise<Result> {
-  const promiseResults = await Promise.allSettled(
-    tables.map(([table, query]) => {
-      logger.info('[initializeDefaultTables]', `CREATE TABLE [${table}]`);
-      return client.command({ query });
-    })
-  );
-
-  const rejectePromises = promiseResults.filter((promise) => promise.status === "rejected") as PromiseRejectedResult[];
-  const reasons = rejectePromises.map((promise) => promise.reason);
-
-  if (reasons.length > 0) {
-    return Err(new Error(reasons.join(" | ")));
+export async function initializeDefaultTables() {
+  const results = [];
+  for ( const [ table, query ] of tables ) {
+    logger.info('[initializeDefaultTables]\t', `CREATE TABLE [${table}]`);
+    results.push({table, query, ...await client.exec({ query })});
   }
-
-  return Ok();
+  return results;
 }

 const extraColumns = [
@@ -29,44 +20,44 @@
   "block_id FixedString(64)",
   "module_hash FixedString(40)",
   "timestamp DateTime64(3, 'UTC')",
+  "operation LowCardinality(String)",
 ];

-const alterations = (tableName: string) => {
+const add_indexes = (tableName: string) => {
   return [
-    `ALTER TABLE ${tableName} ADD INDEX timestamp_index timestamp TYPE minmax`,
-    `ALTER TABLE ${tableName} ADD INDEX block_number_index block_number TYPE minmax`,
-    `ALTER TABLE ${tableName} ADD INDEX chain_index chain TYPE minmax`,
+    `ALTER TABLE ${tableName} ADD INDEX IF NOT EXISTS timestamp_index timestamp TYPE minmax`,
+    `ALTER TABLE ${tableName} ADD INDEX IF NOT EXISTS block_number_index block_number TYPE minmax`,
+    `ALTER TABLE ${tableName} ADD INDEX IF NOT EXISTS chain_index chain TYPE minmax`,
   ];
 };

-export async function executeCreateStatements(statements: string[]): Promise<Result<string[]>> {
+export async function executeCreateStatements(statements: string[]) {
   const executedStatements = [];
   logger.info('[executeCreateStatements]', `Executing ${statements.length} statement(s)`);

-  try {
-    for (const statement of statements) {
-      const tableName = getTableName(statement);
-      logger.info('[executeCreateStatements]', `Executing '${tableName}'`);
+  for (const statement of statements) {
+ const tableName = getTableName(statement); + logger.info('[executeCreateStatements]', `Executing '${tableName}'`); - if (!isCreateTableStatement(statement)) { - executedStatements.push(statement); - await client.command({ query: statement }); - continue; - } + // ignore non-create statements + if (!isCreateTableStatement(statement)) { + executedStatements.push(statement); + await client.exec({ query: statement }); + continue; + } - const augmentedStatement = augmentCreateTableStatement(statement, extraColumns); - executedStatements.push(augmentedStatement); + // ADD FIELDS + const augmentedStatement = augmentCreateTableStatement(statement, extraColumns); + executedStatements.push(augmentedStatement); + await client.exec({ query: augmentedStatement }); - await client.command({ query: augmentedStatement }); - for (const alteration of alterations(tableName)) { - await client.command({ query: alteration }); - } + // ADD INDEX + for (const add_index of add_indexes(tableName)) { + executedStatements.push(add_index); + await client.exec({ query: add_index }); } - } catch (err) { - logger.error('[executeCreateStatements]', "Could not execute the statements", "Request: " + executedStatements, err); - return Err(new Error(JSON.stringify(err))); } - + if ( executedStatements.length == 0 ) throw new Error("No statements executed"); logger.info('[executeCreateStatements]', "Complete."); - return Ok(executedStatements); + return executedStatements; } diff --git a/src/clickhouse/tables/blocks_mv.sql b/src/clickhouse/tables/blocks_mv.sql deleted file mode 100644 index 9d1cac7..0000000 --- a/src/clickhouse/tables/blocks_mv.sql +++ /dev/null @@ -1,22 +0,0 @@ --- view -- -CREATE MATERIALIZED VIEW IF NOT EXISTS blocks_mv -ENGINE = MergeTree -ORDER BY (chain, timestamp, block_number) -AS SELECT - block_id, - block_number, - chain, - timestamp -FROM blocks; - --- DROP TABLE IF EXISTS blocks_mv; - --- OPTIMIZE TABLE blocks_mv FINAL; - --- -- insert -- --- INSERT INTO blocks_mv SELECT --- block_id, --- block_number, --- chain, --- timestamp --- FROM blocks; \ No newline at end of file diff --git a/src/clickhouse/tables/deleted_entity_changes.sql b/src/clickhouse/tables/deleted_entity_changes.sql deleted file mode 100644 index e602511..0000000 --- a/src/clickhouse/tables/deleted_entity_changes.sql +++ /dev/null @@ -1,11 +0,0 @@ -CREATE TABLE IF NOT EXISTS deleted_entity_changes ( - id String, - chain LowCardinality(String), - source LowCardinality(String), - block_id FixedString(64), - block_number UInt32, - module_hash FixedString(40), - timestamp DateTime64(3, 'UTC'), -) -ENGINE = ReplacingMergeTree -ORDER BY (source, chain, block_number, timestamp, block_id); \ No newline at end of file diff --git a/src/clickhouse/tables/final_blocks.sql b/src/clickhouse/tables/final_blocks.sql deleted file mode 100644 index 534a99e..0000000 --- a/src/clickhouse/tables/final_blocks.sql +++ /dev/null @@ -1,5 +0,0 @@ -CREATE TABLE IF NOT EXISTS final_blocks ( - block_id FixedString(64), -) -ENGINE = ReplacingMergeTree -ORDER BY (block_id); \ No newline at end of file diff --git a/src/clickhouse/tables/unparsed_json.sql b/src/clickhouse/tables/unparsed_json.sql deleted file mode 100644 index 751602c..0000000 --- a/src/clickhouse/tables/unparsed_json.sql +++ /dev/null @@ -1,9 +0,0 @@ -CREATE TABLE IF NOT EXISTS unparsed_json ( - raw_data String, - source LowCardinality(String), - id String, - block_id FixedString(64), - module_hash FixedString(40), - chain LowCardinality(String) -) -ENGINE = Null; \ No newline at end of file diff 
--git a/src/config.ts b/src/config.ts index f880a83..b6f5057 100644 --- a/src/config.ts +++ b/src/config.ts @@ -12,13 +12,6 @@ export const DEFAULT_HOST = "http://localhost:8123"; export const DEFAULT_DATABASE = "default"; export const DEFAULT_USERNAME = "default"; export const DEFAULT_PASSWORD = ""; -export const DEFAULT_ASYNC_INSERT = 1; -export const DEFAULT_WAIT_FOR_ASYNC_INSERT = 0; -export const DEFAULT_MAX_BUFFER_SIZE = 10_000; -export const DEFAULT_INSERTION_DELAY = 2000; -export const DEFAULT_ALLOW_UNPARSED = false; -export const DEFAULT_RESUME = true; -export const DEFAULT_BUFFER = "buffer.db"; export const APP_NAME = name; export const opts = program @@ -30,18 +23,10 @@ export const opts = program .addOption(new Option("-v, --verbose ", "Enable verbose logging").choices(["true", "false"]).env("VERBOSE").default(DEFAULT_VERBOSE)) .addOption(new Option("--hostname ", "Server listen on HTTP hostname").env("HOSTNAME").default(DEFAULT_HOSTNAME)) .addOption(new Option("--public-key ", "Comma separated list of public keys to validate messages").env("PUBLIC_KEY")) - .addOption(new Option("--auth-key ", "Auth key to validate requests").env("AUTH_KEY")) .addOption(new Option("--host ", "Database HTTP hostname").env("HOST").default(DEFAULT_HOST)) .addOption(new Option("--username ", "Database user").env("USERNAME").default(DEFAULT_USERNAME)) .addOption(new Option("--password ", "Password associated with the specified username").env("PASSWORD").default(DEFAULT_PASSWORD)) .addOption(new Option("--database ", "The database to use inside ClickHouse").env("DATABASE").default(DEFAULT_DATABASE)) - .addOption(new Option("--async-insert ", "https://clickhouse.com/docs/en/operations/settings/settings#async-insert").choices(["0", "1"]).env("ASYNC_INSERT").default(DEFAULT_ASYNC_INSERT)) - .addOption(new Option("--wait-for-async-insert ", "https://clickhouse.com/docs/en/operations/settings/settings#wait-for-async-insert").choices(["0", "1"]).env("WAIT_FOR_INSERT").default(DEFAULT_WAIT_FOR_ASYNC_INSERT)) - .addOption(new Option("--max-buffer-size ", "Maximum insertion batch size").env("MAX_BUFFER_SIZE").default(DEFAULT_MAX_BUFFER_SIZE)) - .addOption(new Option("--insertion-delay ", "Delay between batch insertions (in ms)").env("INSERTION_DELAY").default(DEFAULT_INSERTION_DELAY)) - .addOption(new Option("--allow-unparsed ", "Enable storage in 'unparsed_json' table").choices(["true", "false"]).env("ALLOW_UNPARSED").default(DEFAULT_ALLOW_UNPARSED)) - .addOption(new Option("--resume ", "Save the cached data from the previous process into ClickHouse").choices(["true", "false"]).env("RESUME").default(DEFAULT_RESUME)) - .addOption(new Option("--buffer ", "SQLite database to use as an insertion buffer. 
Use ':memory:' to make it volatile.").env("BUFFER").default(DEFAULT_BUFFER))
   .parse()
   .opts();
diff --git a/src/fetch/DELETE.ts b/src/fetch/DELETE.ts
index 929caab..a523aeb 100644
--- a/src/fetch/DELETE.ts
+++ b/src/fetch/DELETE.ts
@@ -1,15 +1,9 @@
-import * as argon2 from "../auth/argon2.js";
-import { store } from "../clickhouse/stores.js";
+import * as store from "../clickhouse/stores.js";
 import { NotFound } from "./cors.js";

 export default async function (req: Request): Promise<Response> {
   const { pathname } = new URL(req.url);

-  const authResult = argon2.beforeHandle(req);
-  if (!authResult.success) {
-    return authResult.error;
-  }
-
   if (pathname === "/caches") {
     store.reset();
     return new Response();
diff --git a/src/fetch/GET.ts b/src/fetch/GET.ts
index 7b0489c..a3d04e8 100644
--- a/src/fetch/GET.ts
+++ b/src/fetch/GET.ts
@@ -2,14 +2,14 @@ import { file } from "bun";
 import swaggerFavicon from "../../swagger/favicon.png";
 import swaggerHtml from "../../swagger/index.html";
 import { metrics } from "../prometheus.js";
-import { blocks } from "./blocks.js";
+import { blocks } from "../../sql/blocks.js";
 import { NotFound, toFile, toJSON, toText } from "./cors.js";
-import { findLatestCursor } from "./cursor.js";
+import { findLatestCursor } from "../../sql/cursor.js";
 import health from "./health.js";
 import { openapi } from "./openapi.js";
-import { cluster } from "./cluster.js";
+import { cluster } from "../../sql/cluster.js";

-export default async function (req: Request) {
+export default async function (req: Request): Promise<Response> {
   const { pathname } = new URL(req.url);

   try {
@@ -17,7 +17,7 @@
     if (pathname === "/favicon.png") return toFile(file(swaggerFavicon));

     // queries
-    if (pathname === "/cursor/latest") return findLatestCursor(req);
+    if (pathname === "/cursor/latest") return toText(await findLatestCursor(req));

     // health
     if (pathname === "/blocks") return toJSON(await blocks(req));
diff --git a/src/fetch/POST.ts b/src/fetch/POST.ts
index 1c36f1b..69d8bdd 100644
--- a/src/fetch/POST.ts
+++ b/src/fetch/POST.ts
@@ -1,21 +1,13 @@
 import { handleSinkRequest } from "../clickhouse/handleSinkRequest.js";
-import { store } from "../clickhouse/stores.js";
+import * as store from "../clickhouse/stores.js";
 import { config } from "../config.js";
 import { logger } from "../logger.js";
 import * as prometheus from "../prometheus.js";
 import { BodySchema } from "../schemas.js";
 import { signatureEd25519 } from "../webhook/signatureEd25519.js";
 import { toText } from "./cors.js";
-import hash from "./hash.js";
-import { query } from "./query.js";

 export default async function (req: Request) {
-  const { pathname } = new URL(req.url);
-
-  // queries
-  if (pathname === "/query") return query(req);
-  if (pathname === "/hash") return hash(req);
-
   if (store.paused) {
     return toText("sink is paused", 400);
   }
diff --git a/src/fetch/PUT.ts b/src/fetch/PUT.ts
index 04d95ba..82711ae 100644
--- a/src/fetch/PUT.ts
+++ b/src/fetch/PUT.ts
@@ -1,5 +1,4 @@
-import * as argon2 from "../auth/argon2.js";
-import { NotFound } from "./cors.js";
+import { NotFound, toText } from "./cors.js";
 import init from "./init.js";
 import { handlePause } from "./pause.js";
 import { handleSchemaRequest } from "./schema.js";
@@ -7,16 +6,16 @@ export default async function (req: Request): Promise<Response> {
   const { pathname } = new URL(req.url);

-  const authResult = argon2.beforeHandle(req);
-  if (!authResult.success) {
-    return authResult.error;
+  try {
diff --git a/src/fetch/POST.ts b/src/fetch/POST.ts
index 1c36f1b..69d8bdd 100644
--- a/src/fetch/POST.ts
+++ b/src/fetch/POST.ts
@@ -1,21 +1,13 @@
 import { handleSinkRequest } from "../clickhouse/handleSinkRequest.js";
-import { store } from "../clickhouse/stores.js";
+import * as store from "../clickhouse/stores.js";
 import { config } from "../config.js";
 import { logger } from "../logger.js";
 import * as prometheus from "../prometheus.js";
 import { BodySchema } from "../schemas.js";
 import { signatureEd25519 } from "../webhook/signatureEd25519.js";
 import { toText } from "./cors.js";
-import hash from "./hash.js";
-import { query } from "./query.js";
 
 export default async function (req: Request) {
-  const { pathname } = new URL(req.url);
-
-  // queries
-  if (pathname === "/query") return query(req);
-  if (pathname === "/hash") return hash(req);
-
   if (store.paused) {
     return toText("sink is paused", 400);
   }
diff --git a/src/fetch/PUT.ts b/src/fetch/PUT.ts
index 04d95ba..82711ae 100644
--- a/src/fetch/PUT.ts
+++ b/src/fetch/PUT.ts
@@ -1,5 +1,4 @@
-import * as argon2 from "../auth/argon2.js";
-import { NotFound } from "./cors.js";
+import { NotFound, toText } from "./cors.js";
 import init from "./init.js";
 import { handlePause } from "./pause.js";
 import { handleSchemaRequest } from "./schema.js";
@@ -7,16 +6,16 @@ import { handleSchemaRequest } from "./schema.js";
 export default async function (req: Request): Promise<Response> {
   const { pathname } = new URL(req.url);
 
-  const authResult = argon2.beforeHandle(req);
-  if (!authResult.success) {
-    return authResult.error;
+  try {
+    if (pathname === "/init") return await init();
+    if (pathname === "/schema/sql") return handleSchemaRequest(req, "sql");
+    if (pathname === "/schema/graphql") return handleSchemaRequest(req, "graphql");
+    if (pathname === "/pause") return handlePause(true);
+    if (pathname === "/unpause") return handlePause(false);
+  } catch (e) {
+    console.error(e);
+    return toText(String(e), 500);
   }
 
-  if (pathname === "/init") return init();
-  if (pathname === "/schema/sql") return handleSchemaRequest(req, "sql");
-  if (pathname === "/schema/graphql") return handleSchemaRequest(req, "graphql");
-  if (pathname === "/pause") return handlePause(true);
-  if (pathname === "/unpause") return handlePause(false);
-
   return NotFound;
 }
diff --git a/src/fetch/cursor.ts b/src/fetch/cursor.ts
deleted file mode 100644
index 08ba8fa..0000000
--- a/src/fetch/cursor.ts
+++ /dev/null
@@ -1,60 +0,0 @@
-import { readOnlyClient } from "../clickhouse/createClient.js";
-import { store } from "../clickhouse/stores.js";
-import { logger } from "../logger.js";
-import { Err, Ok, Result } from "../result.js";
-import { BadRequest, toText } from "./cors.js";
-
-export async function findLatestCursor(req: Request): Promise<Response> {
-  const parametersResult = await verifyParameters(req);
-  if (!parametersResult.success) {
-    return parametersResult.error;
-  }
-
-  try {
-    const { moduleHash, chain } = parametersResult.payload;
-
-    const query = `
-    SELECT latest_cursor
-    FROM module_hashes
-    WHERE chain = '${chain}' AND module_hash = '${moduleHash}'
-    LIMIT 1`;
-
-    const response = await readOnlyClient.query({ query, format: "JSONEachRow" });
-    const data = await response.json<Array<{ latest_cursor: string }>>();
-
-    if (data.length === 1) {
-      return toText(data[0].latest_cursor);
-    }
-
-    return toText(`Bad request: no cursor found for '${moduleHash}' on '${chain}'.`, 400);
-  } catch (err) {
-    logger.error('[findLatestCursor]', err);
-  }
-  return BadRequest;
-}
-
-async function verifyParameters(req: Request): Promise<Result<{ chain: string; moduleHash: string }>> {
-  const url = new URL(req.url);
-  const chain = url.searchParams.get("chain");
-  const moduleHash = url.searchParams.get("module_hash");
-
-  if (!chain) {
-    return Err(toText("Missing parameter: chain", 400));
-  }
-
-  if (!moduleHash) {
-    return Err(toText("Missing parameter: module_hash", 400));
-  }
-
-  if (!(await store.chains).includes(chain)) {
-    store.reset();
-    return Err(toText("Invalid parameter: chain=" + chain, 400));
-  }
-
-  if (!(await store.moduleHashes).includes(moduleHash)) {
-    store.reset();
-    return Err(toText("Invalid parameter: moduleHash=" + moduleHash, 400));
-  }
-
-  return Ok({ chain, moduleHash });
-}
diff --git a/src/fetch/hash.ts b/src/fetch/hash.ts
deleted file mode 100644
index 5eb81e6..0000000
--- a/src/fetch/hash.ts
+++ /dev/null
@@ -1,8 +0,0 @@
-import { toText } from "./cors.js";
-
-export default async function hash(req: Request): Promise<Response> {
-  const password = await req.text();
-  const hash = Bun.password.hashSync(password);
-  const escapedHash = hash.replaceAll("$", "\\$");
-  return toText(escapedHash);
-}
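One easily missed detail in the PUT.ts hunk: the rewritten routes use `return await init()` rather than `return init()`. Inside a try block the difference matters, because a bare `return` hands the still-pending promise back to the caller and the local catch never observes the rejection. A self-contained illustration:

```ts
// Why `return await` matters inside try/catch.
async function failing(): Promise<string> {
  throw new Error("boom");
}

async function withoutAwait(): Promise<string> {
  try {
    return failing(); // promise escapes the try block before settling
  } catch {
    return "caught"; // unreachable
  }
}

async function withAwait(): Promise<string> {
  try {
    return await failing(); // rejection is observed here, inside the try
  } catch {
    return "caught"; // reached
  }
}

console.log(await withoutAwait().catch(() => "escaped")); // "escaped"
console.log(await withAwait());                           // "caught"
```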
toText("OK") : BadRequest; } - const pingResult = await ping(); - timestamp = now(); + // success + try { + await ping(); + timestamp = now(); + cachedHealthValue = true; + return toText("OK"); - if (!pingResult.success) { + // failure + } catch (e) { cachedHealthValue = false; return BadRequest; } - - cachedHealthValue = true; - return toText("OK"); } diff --git a/src/fetch/init.ts b/src/fetch/init.ts index 4a027d6..78df3d6 100644 --- a/src/fetch/init.ts +++ b/src/fetch/init.ts @@ -2,24 +2,12 @@ import { createDatabase } from "../clickhouse/createDatabase.js"; import { ping } from "../clickhouse/ping.js"; import { initializeDefaultTables } from "../clickhouse/table-initialization.js"; import { config } from "../config.js"; -import { logger } from "../logger.js"; -import { BadRequest, toText } from "./cors.js"; - -export default async function () { - const initializationSteps = [ - { step: ping, failureMessage: "Ping request failed" }, - { step: () => createDatabase(config.database), failureMessage: "Create database failed" }, - { step: initializeDefaultTables, failureMessage: "Initialize default tables failed" }, - ]; - - for (const { step, failureMessage } of initializationSteps) { - const result = await step(); - - if (!result.success) { - logger.error('[init]', `${failureMessage} | ${result.error}`); - return BadRequest; - } - } - - return toText("OK"); +import { toJSON } from "./cors.js"; + +export default async function init() { + return toJSON({ + ping: await ping(), + createDatabase: await createDatabase(config.database), + initializeDefaultTables: await initializeDefaultTables(), + }); } diff --git a/src/fetch/openapi.ts b/src/fetch/openapi.ts index 27e7cc1..4d41d54 100644 --- a/src/fetch/openapi.ts +++ b/src/fetch/openapi.ts @@ -4,10 +4,10 @@ import { LicenseObject } from "openapi3-ts/oas30"; import { OpenApiBuilder, ParameterObject, ResponsesObject, SchemaObject } from "openapi3-ts/oas31"; import { z } from "zod"; import * as ztjs from "zod-to-json-schema"; -import { store } from "../clickhouse/stores.js"; +import * as store from "../clickhouse/stores.js"; import { BodySchema } from "../schemas.js"; -import { BlockResponseSchema } from "./blocks.js"; -import { ClusterSchema } from "./cluster.js"; +import { BlockResponseSchema } from "../../sql/blocks.js"; +import { ClusterSchema } from "../../sql/cluster.js"; const zodToJsonSchema = (...params: Parameters<(typeof ztjs)["zodToJsonSchema"]>) => ztjs.zodToJsonSchema(...params) as SchemaObject; @@ -42,7 +42,7 @@ async function paramChain(required = true): Promise { name: "chain", in: "query", required, - schema: { enum: await store.chains }, + schema: { enum: await store.query_chains() }, }; } @@ -51,7 +51,7 @@ async function paramModuleHash(required = true): Promise { name: "module_hash", in: "query", required: true, - schema: { enum: await store.moduleHashes }, + schema: { enum: await store.query_module_hashes() }, }; } diff --git a/src/fetch/pause.ts b/src/fetch/pause.ts index 91e9960..f7638be 100644 --- a/src/fetch/pause.ts +++ b/src/fetch/pause.ts @@ -1,9 +1,9 @@ -import { store } from "../clickhouse/stores.js"; +import * as store from "../clickhouse/stores.js"; import { logger } from "../logger.js"; import { toText } from "./cors.js"; export function handlePause(targetValue: boolean): Response { - store.paused = targetValue; + store.pause(targetValue) logger.info('[handlePause]', "Sink is now paused: " + store.paused); return toText("OK"); } diff --git a/src/fetch/query.ts b/src/fetch/query.ts deleted file mode 100644 index 
diff --git a/src/fetch/query.ts b/src/fetch/query.ts
deleted file mode 100644
index 9a73c8a..0000000
--- a/src/fetch/query.ts
+++ /dev/null
@@ -1,16 +0,0 @@
-import { readOnlyClient } from "../clickhouse/createClient.js";
-import { logger } from "../logger.js";
-import { BadRequest, toJSON } from "./cors.js";
-
-export async function query(req: Request): Promise<Response> {
-  try {
-    const query = await req.text();
-    const result = await readOnlyClient.query({ query, format: "JSONEachRow" });
-    const data = await result.json();
-
-    return toJSON(data);
-  } catch (err) {
-    logger.error('[query]', err);
-    return BadRequest;
-  }
-}
diff --git a/src/fetch/schema.ts b/src/fetch/schema.ts
index 2d0bd87..f9b5578 100644
--- a/src/fetch/schema.ts
+++ b/src/fetch/schema.ts
@@ -1,4 +1,4 @@
-import { store } from "../clickhouse/stores.js";
+import * as store from "../clickhouse/stores.js";
 import { executeCreateStatements } from "../clickhouse/table-initialization.js";
 import { splitCreateStatement } from "../clickhouse/table-utils.js";
 import { ClickhouseTableBuilder } from "../graphql/builders/clickhouse-table-builder.js";
@@ -25,13 +25,14 @@ export async function handleSchemaRequest(req: Request, type: "sql" | "graphql")
 
   logger.info('[handleSchemaRequest]', `Found ${statements.length} statement(s)`);
 
-  const executedSchemas = await executeCreateStatements(statements);
-  if (!executedSchemas.success) {
-    return toText("Could not execute the statements", 500);
+  try {
+    const executedSchemas = await executeCreateStatements(statements);
+    store.reset();
+    return toText(executedSchemas.join("\n"));
+  } catch (e) {
+    logger.error('[handleSchemaRequest]', e);
+    return toText(String(e), 500);
   }
-
-  store.reset();
-  return toJSON({ status: "OK", schema: executedSchemas.payload.join("\n") });
 }
 
 // This looks for a table schema in the request object.
diff --git a/src/logger.ts b/src/logger.ts
index 1fb175c..e116ee2 100644
--- a/src/logger.ts
+++ b/src/logger.ts
@@ -3,7 +3,9 @@ import { name } from "../package.json" assert { type: "json" };
 
 class SinkLogger extends Logger {
   constructor() {
-    super();
+    super({
+      prettyLogTemplate: "{{yyyy}}.{{mm}}.{{dd}} {{hh}}:{{MM}}:{{ss}}\t{{logLevelName}}\t{{name}}\t"
+    });
     this.disable();
     this.settings.name = name;
   }
@@ -30,3 +32,5 @@
 }
 
 export const logger = new SinkLogger();
+
+logger.settings
diff --git a/src/prometheus.ts b/src/prometheus.ts
index 2a2bbb9..0f28785 100644
--- a/src/prometheus.ts
+++ b/src/prometheus.ts
@@ -26,7 +26,4 @@ export const requests = registerCounter("requests", "Total requests")!;
 export const request_errors = registerCounter("request_errors", "Total failed requests")!;
 export const sink_requests = registerCounter("sink_requests", "Total sink requests", ["chain", "module_hash"])!;
 
-export const entity_changes_inserted = registerCounter("entity_changes_inserted", "Total inserted entity changes", ["chain", "module_hash"])!;
-export const entity_changes_updated = registerCounter("entity_changes_updated", "Total updated entity changes", ["chain", "module_hash"])!;
-export const entity_changes_deleted = registerCounter("entity_changes_deleted", "Total deleted entity changes", ["chain", "module_hash"])!;
-export const entity_changes_unsupported = registerCounter("entity_changes_unsupported", "Total unsupported entity changes", ["chain", "module_hash"])!;
+export const entity_changes = registerCounter("entity_changes", "Total Entity Changes", ["chain", "module_hash", "operation"])!;
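The prometheus.ts change above folds four per-operation counters into one entity_changes counter with an operation label, so dashboards can `sum by (operation)` instead of joining four metric names. A sketch of the equivalent using the prom-client package directly (the project's own registerCounter helper is not shown in this patch, and the label values here are illustrative):

```ts
import { Counter } from "prom-client";

// One counter with an "operation" label replaces four per-operation counters.
const entityChanges = new Counter({
  name: "entity_changes",
  help: "Total Entity Changes",
  labelNames: ["chain", "module_hash", "operation"] as const,
});

// Before: entity_changes_inserted.inc({ chain, module_hash });
// After: the operation rides along as a label value.
entityChanges.inc({ chain: "ethereum", module_hash: "0xabc", operation: "inserted" });
entityChanges.inc({ chain: "ethereum", module_hash: "0xabc", operation: "deleted" });
```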
@@ -1,19 +0,0 @@
-import { saveKnownEntityChanges } from "./clickhouse/handleSinkRequest.js";
-import { ping } from "./clickhouse/ping.js";
-import { logger } from "./logger.js";
-
-export async function resume() {
-  const pingResult = await ping();
-  if (!pingResult.success) {
-    logger.error("[resume]", "Error: " + pingResult.error.message);
-    return;
-  }
-
-  logger.info("[resume]", "writing unsinked data to ClickHouse...");
-  const saveResult = await saveKnownEntityChanges();
-  if (!saveResult.success) {
-    logger.error("[resume]", "Error: " + saveResult.error.message);
-    return;
-  }
-  logger.info("[resume]", "completed");
-}
diff --git a/src/schemas.spec.ts b/src/schemas.spec.ts
index 8860eb4..87fb9b2 100644
--- a/src/schemas.spec.ts
+++ b/src/schemas.spec.ts
@@ -16,13 +16,6 @@ const config = ConfigSchema.parse({
   username: "default",
   password: "",
   verbose: "true",
-  waitForAsyncInsert: "0",
-  asyncInsert: "1",
-  maxBufferSize: "10000",
-  insertionDelay: "2000",
-  allowUnparsed: true,
-  resume: true,
-  buffer: "buffer.sqlite",
 });
 
 describe("ConfigSchema", () => {
@@ -33,9 +26,4 @@
   test("username", () => expect(config.username).toBe("default"));
   test("publicKey", () => expect(config.publicKey).toEqual(["a3cb7366ee8ca77225b4d41772e270e4e831d171d1de71d91707c42e7ba82cc9"]));
 
-  test("waitForAsyncInsert", () => expect(config.waitForAsyncInsert).toBe(0));
-  test("asyncInsert", () => expect(config.asyncInsert).toBe(1));
-  test("allowUnparsed", () => expect(config.allowUnparsed).toBeTrue());
-  test("resume", () => expect(config.resume).toBeTrue());
-  test("buffer", () => expect(config.buffer).toBe("buffer.sqlite"));
 });
diff --git a/src/schemas.ts b/src/schemas.ts
index df9e35b..a6f6c58 100644
--- a/src/schemas.ts
+++ b/src/schemas.ts
@@ -14,8 +14,7 @@ export const ConfigSchema = z.object({
   publicKey: z.optional(
     z.string()
       .transform((str) => str.split(","))
-      .refine((publicKeys) => publicKeys.filter((key) => key.length > 0).length > 0, "No primary key has been set")),
-  authKey: z.optional(z.string().transform((str) => str.replaceAll("\\$", "$"))),
+      .refine((keys) => keys.filter((key) => key.length > 0).length > 0, "No primary key has been set")),
   port: positiveNumber,
   verbose: boolean,
   host: z.string(),
@@ -23,13 +22,6 @@
   database: z.string(),
   username: z.string(),
   password: z.string(),
-  asyncInsert: oneOrZero,
-  waitForAsyncInsert: oneOrZero,
-  maxBufferSize: positiveNumber,
-  insertionDelay: positiveNumber,
-  allowUnparsed: boolean,
-  resume: boolean,
-  buffer: z.string(),
 });
 export type ConfigSchema = z.infer<typeof ConfigSchema>;
 export const TableInitSchema = z.string();
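The publicKey rule that survives in schemas.ts is a compact Zod idiom worth calling out: transform the comma-separated env value into an array, then refine to reject inputs with no non-empty keys. Standalone:

```ts
import { z } from "zod";

// Comma-separated env var -> string[] with at least one non-empty key.
const PublicKeys = z.optional(
  z.string()
    .transform((str) => str.split(","))
    .refine((keys) => keys.filter((key) => key.length > 0).length > 0, "No primary key has been set"),
);

PublicKeys.parse("a3cb73,9f00ee"); // ["a3cb73", "9f00ee"]
PublicKeys.parse(undefined);       // undefined (the whole field is optional)
PublicKeys.parse(",");             // throws ZodError: No primary key has been set
```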
diff --git a/src/sqlite/sqlite.ts b/src/sqlite/sqlite.ts
deleted file mode 100644
index d1b33ea..0000000
--- a/src/sqlite/sqlite.ts
+++ /dev/null
@@ -1,122 +0,0 @@
-import { file } from "bun";
-import Database, { Statement } from "bun:sqlite";
-import { Clock, Manifest } from "substreams-sink-webhook/schemas";
-import { config } from "../config.js";
-import { Ok, Result, UnknownErr } from "../result.js";
-import tableSQL from "./table.sql";
-
-const selectSQL = {
-  blocks: "SELECT block_id, block_number, chain, timestamp FROM data_buffer WHERE batch_number <= ?;",
-  finalBlocks: "SELECT block_id FROM data_buffer WHERE batch_number <= ? AND is_final = 1;",
-  moduleHashes: `SELECT module_hash, module_name, chain, type, cursor AS latest_cursor, block_number AS latest_block_number, block_id AS latest_block_id, timestamp AS latest_timestamp
-    FROM data_buffer WHERE batch_number <= ?;`,
-  sources: "SELECT DISTINCT source FROM data_buffer WHERE batch_number <= ?;",
-  entityChanges: "SELECT entity_changes FROM data_buffer WHERE batch_number <= ? AND source = ?",
-};
-
-const deleteSQL = `DELETE FROM data_buffer WHERE batch_number < ?;`;
-const insertSQL = `
-INSERT INTO data_buffer (
-  batch_number,
-  entity_changes, source,
-  chain, block_id, block_number, is_final,
-  module_hash, module_name, type,
-  timestamp, cursor
-) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?);`;
-
-const tableSchema = await file(tableSQL).text();
-
-class SQLite {
-  private db: Database;
-  private batchNumber;
-
-  private selectBlocksStatement: Statement;
-  private selectFinalBlocksStatement: Statement;
-  private selectModuleHashesStatement: Statement;
-  private selectSourcesStatement: Statement<{ source: string }, [number]>;
-  private selecEntityChangesStatement: Statement<{ entity_changes: string }, [number, string]>;
-
-  private deleteStatement: Statement;
-  private insertStatement: Statement;
-
-  public constructor() {
-    this.db = new Database(config.buffer);
-    this.db.run("PRAGMA synchronous = OFF;");
-    this.db.run("PRAGMA journal_mode = MEMORY;");
-
-    this.db.run(tableSchema);
-    this.batchNumber = this.initialBatchNumber;
-
-    this.selectBlocksStatement = this.db.prepare(selectSQL.blocks);
-    this.selectFinalBlocksStatement = this.db.prepare(selectSQL.finalBlocks);
-    this.selectModuleHashesStatement = this.db.prepare(selectSQL.moduleHashes);
-    this.selectSourcesStatement = this.db.prepare(selectSQL.sources);
-    this.selecEntityChangesStatement = this.db.prepare(selectSQL.entityChanges);
-
-    this.deleteStatement = this.db.prepare(deleteSQL);
-    this.insertStatement = this.db.prepare(insertSQL);
-  }
-
-  public insert(entityChanges: string, source: string, clock: Clock, manifest: Manifest, cursor: string) {
-    const { chain, finalBlockOnly, moduleHash, moduleName, type } = manifest;
-    const { id: blockId, number: blockNumber, timestamp: timestampStr } = clock;
-
-    const isFinal = finalBlockOnly ? 1 : 0;
-    const timestamp = Number(new Date(timestampStr));
-
-    const args = [source, chain, blockId, blockNumber, isFinal, moduleHash, moduleName, type, timestamp, cursor];
-    this.insertStatement.run(this.batchNumber, entityChanges, ...args);
-  }
-
-  public async commitBuffer(
-    onData: (
-      blocks: unknown[],
-      finalBlocks: unknown[],
-      moduleHashes: unknown[],
-      entityChanges: Record<string, Array<unknown>>
-    ) => Promise<void>
-  ): Promise<Result> {
-    try {
-      this.batchNumber++;
-
-      const blocks = this.selectBlocksStatement.all(this.batchNumber);
-      const finalBlocks = this.selectFinalBlocksStatement.all(this.batchNumber);
-      const moduleHashes = this.selectModuleHashesStatement.all(this.batchNumber);
-      const entityChanges: Record<string, Array<unknown>> = {};
-
-      const sources = this.selectSourcesStatement.all(this.batchNumber);
-      for (const { source } of sources) {
-        if (source.length > 0) {
-          entityChanges[source] = this.selecEntityChangesStatement
-            .all(this.batchNumber, source)
-            .map((response) => JSON.parse(response.entity_changes));
-        }
-      }
-
-      await onData(blocks, finalBlocks, moduleHashes, entityChanges);
-      this.deleteStatement.run(this.batchNumber);
-    } catch (err) {
-      return UnknownErr(err);
-    }
-
-    return Ok();
-  }
-
-  private get initialBatchNumber() {
-    try {
-      const sql = `SELECT MAX(batch_number) AS batch_number
-        FROM (
-          SELECT batch_number FROM data_buffer
-          UNION ALL
-          SELECT 0 AS batch_number
-        )`;
-
-      const response = this.db.query<{ batch_number: number }, any>(sql).get();
-      return response!.batch_number + 1;
-    } catch {
-      return 0;
-    }
-  }
-}
-
-export const sqlite = new SQLite();
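For readers tracking what this refactor deletes: the SQLite buffer stamped each row with a monotonically increasing batch number, and commitBuffer bumped the counter, flushed the older batches, and removed rows only after the flush succeeded, which gives at-least-once delivery across crashes (a failure between flush and delete re-sends rather than loses data). The core of that pattern, reduced to an in-memory sketch (the array stands in for the data_buffer table, sink for the ClickHouse insert):

```ts
// Batch-number buffering in miniature.
type Row = { batch: number; payload: string };

class BatchBuffer {
  private rows: Row[] = [];
  private batch = 0;

  insert(payload: string): void {
    this.rows.push({ batch: this.batch, payload });
  }

  async commit(sink: (payloads: string[]) => Promise<void>): Promise<void> {
    this.batch++; // new writes now land in the next batch
    const flushed = this.rows.filter((row) => row.batch < this.batch);
    await sink(flushed.map((row) => row.payload));                  // flush first...
    this.rows = this.rows.filter((row) => row.batch >= this.batch); // ...prune only after success
  }
}
```

If sink throws, the rows stay buffered and the next commit retries them, trading duplicate inserts for durability.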
diff --git a/src/sqlite/table.sql b/src/sqlite/table.sql
deleted file mode 100644
index 54a2ea9..0000000
--- a/src/sqlite/table.sql
+++ /dev/null
@@ -1,18 +0,0 @@
-CREATE TABLE IF NOT EXISTS data_buffer (
-    batch_number INTEGER,
-
-    entity_changes TEXT,
-    source TEXT,
-
-    chain TEXT,
-    block_id TEXT,
-    block_number INTEGER,
-    is_final INTEGER,
-
-    module_hash TEXT,
-    module_name TEXT,
-    type TEXT,
-
-    timestamp INTEGER,
-    cursor TEXT
-);
\ No newline at end of file