diff --git a/.env.example b/.env.example
index 3e3d10d..f14d187 100644
--- a/.env.example
+++ b/.env.example
@@ -11,10 +11,8 @@ HOST=http://127.0.0.1:8123
 DATABASE=default
 USERNAME=default
 PASSWORD=
-CREATE_DB=false
 
 # Sink
 QUEUE_LIMIT=10
 QUEUE_CONCURRENCY=10
-SCHEMA_URL=... # generate SQL schema by providing file (ex: ./schema.sql) or URL path (ex: https://example.com/schema.sql)
 VERBOSE=true
\ No newline at end of file
diff --git a/README.md b/README.md
index e11b8b8..c0e3dd0 100644
--- a/README.md
+++ b/README.md
@@ -8,7 +8,57 @@
 ## Features
 
-- TO-DO: See [issues](https://github.com/pinax-network/substreams-sink-clickhouse/issues?q=is%3Aissue+is%3Aclosed)
+<details>
+<summary>Serverless data sinking</summary>
+
+By using this sink with [substreams-sink-webhook](https://github.com/pinax-network/substreams-sink-webhook), data from any substreams can easily be made available in ClickHouse.
+
+</details>
+
+<details>
+<summary>Automatic block information</summary>
+
+Data for each block is stored alongside every record. The fields and their structure can be found in the [database structure](#database-structure).
+
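+For example, user rows can be joined back to their block metadata (a sketch: `contracts` stands in for a hypothetical user-defined table):
+
+```sql
+-- Illustrative query only; `contracts` is a placeholder for a table created from a user schema.
+SELECT c.address, b.block_number, b.timestamp
+FROM contracts AS c
+JOIN blocks AS b ON b.block_id = c.block_id AND b.chain = c.chain;
+```
+
+</details>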
+
+<details>
+<summary>SQL schemas</summary>
+
+A SQL schema can be passed in to define the destination table for substreams data. It will be extended as described in the [database structure](#database-structure).
+
+Schemas can be set up by following the steps in [schema initialization](#schema-initialization).
+
+</details>
+
+<details>
+<summary>GraphQL schemas</summary>
+
+[TheGraph's GraphQL entity](https://thegraph.com/docs/en/developing/creating-a-subgraph/#defining-entities) schemas can be passed in to define the destination table for substreams data. See [schema initialization](#schema-initialization).
+
+They are converted to SQL according to the following rules before being executed. The available types are defined [here](https://thegraph.com/docs/en/developing/creating-a-subgraph/#graphql-supported-scalars).
+
+| GraphQL data type | ClickHouse equivalent |
+| ----------------- | --------------------- |
+| `Bytes`           | `String`              |
+| `String`          | `String`              |
+| `Boolean`         | `boolean`             |
+| `Int`             | `Int32`               |
+| `BigInt`          | `String`              |
+| `BigDecimal`      | `String`              |
+| `Float`           | `Float64`             |
+| `ID`              | `String`              |
+
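+For illustration, an entity declaring `id: ID!` and `decimals: BigInt` might be converted to roughly the following (a sketch; the actual engine, ordering key, and added block columns are decided by the sink):
+
+```sql
+-- Hypothetical output: ID -> String and BigInt -> String, per the mapping table above.
+CREATE TABLE IF NOT EXISTS Contracts (
+  id       String,
+  decimals String
+)
+ENGINE = ReplacingMergeTree
+ORDER BY (id);
+```
+
+</details>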
+
+<details>
+<summary>NO schema</summary>
+
+No schema is required to store data in ClickHouse. Everything can be stored in `unparsed_json` (see [database structure](#database-structure)).
+
+The user **must** build custom [views](https://clickhouse.com/docs/en/guides/developer/cascading-materialized-views) to transform the data according to their needs. Further details are available in [ClickHouse's documentation](https://clickhouse.com/docs/en/integrations/data-formats/json#using-materialized-views).
+
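+A materialized view along these lines could do the parsing (a sketch: the `raw_data` column name is a placeholder and must be adapted to the actual `unparsed_json` layout):
+
+```sql
+-- Hypothetical example; adapt table, view, and column names to the real payload.
+CREATE TABLE IF NOT EXISTS contracts (
+  address FixedString(40),
+  name    String
+)
+ENGINE = ReplacingMergeTree
+ORDER BY (address);
+
+CREATE MATERIALIZED VIEW IF NOT EXISTS contracts_mv TO contracts AS
+SELECT
+  JSONExtractString(raw_data, 'address') AS address,
+  JSONExtractString(raw_data, 'name')    AS name
+FROM unparsed_json;
+```
+
+</details>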
 
 ## Usage
 
@@ -23,14 +73,12 @@ Options:
   -V, --version                        output the version number
   -p, --port <port>                    HTTP port on which to attach the sink (default: "3000", env: PORT)
   -v, --verbose <verbose>              Enable verbose logging (choices: "true", "false", default: "pretty", env: VERBOSE)
-  -s, --schema-url <schema-url>        Execute SQL instructions before starting the sink (env: SCHEMA_URL)
   --public-key <public-key>            Public key to validate messages (env: PUBLIC_KEY)
   --auth-key <auth-key>                Auth key to validate requests (env: AUTH_KEY)
   --host <host>                        Database HTTP hostname (default: "http://localhost:8123", env: HOST)
   --database <database>                The database to use inside ClickHouse (default: "default", env: DATABASE)
   --username <username>                Database user (default: "default", env: USERNAME)
   --password <password>                Password associated with the specified username (default: "", env: PASSWORD)
-  --create-database <create-database>  If the specified database does not exist, automatically create it (default: "false", env: CREATE_DATABASE)
   --async-insert <async-insert>        https://clickhouse.com/docs/en/operations/settings/settings#async-insert (choices: "0", "1", default: 1, env: ASYNC_INSERT)
   --wait-for-async-insert <wait-for-async-insert>  https://clickhouse.com/docs/en/operations/settings/settings#wait-for-async-insert (choices: "0", "1", default: 0, env: WAIT_FOR_INSERT)
   --queue-limit <queue-limit>          Insert delay to each response when the pqueue exceeds this value (default: 10, env: QUEUE_LIMIT)
@@ -40,9 +88,10 @@ ### Environment variables
 
-These can all be set when starting the sink. See [cli structure](#cli-structure).
+These can all be set when starting the sink. See [usage](#usage).
 
 **.env**
+
 ```bash
 # Authentication
 PUBLIC_KEY=... # Ed25519 Public-key provided by https://github.com/pinax-network/substreams-sink-webhook
@@ -57,12 +106,10 @@ HOST=http://127.0.0.1:8123
 DATABASE=default
 USERNAME=default
 PASSWORD=
-CREATE_DB=false
 
 # Sink
 QUEUE_LIMIT=10
 QUEUE_CONCURRENCY=10
-SCHEMA_URL=... # generate SQL schema by providing file (ex: ./schema.sql) or URL path (ex: https://example.com/schema.sql)
 VERBOSE=true
 ```
 
@@ -72,11 +119,13 @@ The `USER_DIMENSION` is generated by the user provided schema and is augmented by
 
 ```mermaid
 erDiagram
-    USER_DIMENSION }|--|{ block : " "
-    USER_DIMENSION }|--|{ manifest : " "
+    USER_DIMENSION }|--|{ blocks : " "
+    USER_DIMENSION }|--|{ module_hashes : " "
 
-    block }|--|{ unparsed_json : " "
-    manifest }|--|{ unparsed_json : " "
+    blocks }|--|{ unparsed_json : " "
+    module_hashes }|--|{ unparsed_json : " "
+
+    blocks }|--|{ final_blocks : " "
 
     USER_DIMENSION {
         user_data unknown
@@ -95,64 +144,69 @@ chain LowCardinality(String)
     }
 
-    block {
+    blocks {
         block_id FixedString(64)
-        block_number UInt32()
+        block_number UInt32
         chain LowCardinality(String)
         timestamp DateTime64(3_UTC)
-        final_block Bool
     }
 
-    manifest {
+    module_hashes {
         module_hash FixedString(40)
-        module_name String()
+        module_name String
         chain LowCardinality(String)
-        type String()
+        type String
+    }
+
+    final_blocks {
+        block_id FixedString(64)
     }
 ```
 
 **Indexes**
 
-| Table          | Fields                                     |
-| -------------- | ------------------------------------------ |
-| USER_DIMENSION | `(chain, module_hash)` `(chain, block_id)` |
-| block          | `(block_id, block_number, timestamp)`      |
-| manifest       | `module_hash`                              |
+| Table          | Fields                                        |
+| -------------- | --------------------------------------------- |
+| USER_DIMENSION | `(chain, module_hash)` `(chain, block_id)`    |
+| module_hashes  | `module_hash`                                 |
+| blocks         | `(block_id, block_number, chain, timestamp)`  |
+| unparsed_json  | `(source, chain, module_hash, block_id)`      |
+| final_blocks   | `block_id`                                    |
 
 ### Database initialization
 
-Create a database in ClickHouse. (Optionally, skip this step and use the `default` database.)
+Create a database in ClickHouse and set up the dimension tables.
+
+Use `PUT /init` on [http://localhost:3000](http://localhost:3000).
 
 ```bash
-substreams-sink-clickhouse --create-db --name <name>
+> curl --location --request PUT 'http://localhost:3000/init' --header 'Authorization: Bearer <auth-key>'
 ```
 
 ### Schema initialization
 
-_This step can be skipped. If so, the data will be stored as-is in the `unparsed_json` table. It should then be parsed by the user with ClickHouse's tools (eg: `MaterializedView`)_
+_This step can be skipped. If so, the data will be stored as-is in the `unparsed_json` table. It should then be parsed by the user with ClickHouse's tools. See this [article](https://clickhouse.com/docs/en/integrations/data-formats/json#using-materialized-views)._
 
-Initializes the database according to a SQL file. See [example file](#example-sql-file).
-
-**CLI**
-
-```
-substreams-sink-clickhouse --schema-url <schema-url>
-```
+Initializes the database according to a SQL or GraphQL file. See [example schema files](#schema-examples).
 
 **Web UI**
 
-Upload a `.sql` file on [http://localhost:3000](http://localhost:3000). (POST request `/schema`, Content-Type: `application/octet-stream`)
+Upload a schema file on [http://localhost:3000](http://localhost:3000).
+
+_Use PUT `/schema/sql` or PUT `/schema/graphql` with `Content-Type: application/octet-stream`._
 
 **Curl**
 
 ```bash
-curl --location --request POST 'http://localhost:3000/schema' --header 'Authorization: Bearer <auth-key>' --header 'Content-Type: application/json' --data-raw '<sql-schema>'
+> curl --location --request PUT 'http://localhost:3000/schema/sql' --header 'Authorization: Bearer <auth-key>' --header 'Content-Type: application/octet-stream' --data-raw '<sql-schema>'
+
+> curl --location --request PUT 'http://localhost:3000/schema/graphql' --header 'Authorization: Bearer <auth-key>' --header 'Content-Type: application/octet-stream' --data-raw '<graphql-schema>'
 ```
 
-#### Example SQL file
+### Schema examples
 
 <details>
-<summary>Click to expand</summary>
+<summary>Example SQL file</summary>
 
 ```sql
 CREATE TABLE IF NOT EXISTS contracts (
@@ -167,6 +221,21 @@ ORDER BY (address)
 ```
 
 </details>
+
+<details>
+<summary>Example GraphQL file</summary>
+
+```graphql
+type Contracts @entity {
+  id: ID!
+  address: String!
+  name: String
+  symbol: String
+  decimals: BigInt
+}
+```
+
+</details>
 
 ### Sink
 
 Serves an endpoint to receive Substreams data from [substreams-sink-webhook](https://github.com/pinax-network/substreams-sink-webhook).
diff --git a/src/config.ts b/src/config.ts
index 2fd4f3a..b09139a 100644
--- a/src/config.ts
+++ b/src/config.ts
@@ -13,7 +13,6 @@ export const DEFAULT_HOST = "http://localhost:8123";
 export const DEFAULT_DATABASE = "default";
 export const DEFAULT_USERNAME = "default";
 export const DEFAULT_PASSWORD = "";
-export const DEFAULT_CREATE_DATABASE = "false";
 export const DEFAULT_ASYNC_INSERT = 1;
 export const DEFAULT_WAIT_FOR_ASYNC_INSERT = 0;
 export const DEFAULT_QUEUE_LIMIT = 10;
@@ -29,14 +28,12 @@ export const opts = program
   .addOption(new Option("-p, --port <port>", "HTTP port on which to attach the sink").env("PORT").default(DEFAULT_PORT))
   .addOption(new Option("-v, --verbose <verbose>", "Enable verbose logging").choices(["true", "false"]).env("VERBOSE").default(DEFAULT_VERBOSE))
   .addOption(new Option("--hostname <hostname>", "Server listen on HTTP hostname").env("HOSTNAME").default(DEFAULT_HOSTNAME))
-  .addOption(new Option("-s, --schema-url <schema-url>", "Execute SQL instructions before starting the sink").env("SCHEMA_URL").preset(DEFAULT_SCHEMA_URL))
   .addOption(new Option("--public-key <public-key>", "Public key to validate messages").env("PUBLIC_KEY"))
   .addOption(new Option("--auth-key <auth-key>", "Auth key to validate requests").env("AUTH_KEY"))
   .addOption(new Option("--host <host>", "Database HTTP hostname").env("HOST").default(DEFAULT_HOST))
   .addOption(new Option("--username <username>", "Database user").env("USERNAME").default(DEFAULT_USERNAME))
   .addOption(new Option("--password <password>", "Password associated with the specified username").env("PASSWORD").default(DEFAULT_PASSWORD))
   .addOption(new Option("--database <database>", "The database to use inside ClickHouse").env("DATABASE").default(DEFAULT_DATABASE))
-  .addOption(new Option("--create-database <create-database>", "If the specified database does not exist, automatically create it").env("CREATE_DATABASE").default(DEFAULT_CREATE_DATABASE))
   .addOption(new Option("--async-insert <async-insert>", "https://clickhouse.com/docs/en/operations/settings/settings#async-insert").choices(["0", "1"]).env("ASYNC_INSERT").default(DEFAULT_ASYNC_INSERT))
   .addOption(new Option("--wait-for-async-insert <wait-for-async-insert>", "https://clickhouse.com/docs/en/operations/settings/settings#wait-for-async-insert").choices(["0", "1"]).env("WAIT_FOR_INSERT").default(DEFAULT_WAIT_FOR_ASYNC_INSERT))
   .addOption(new Option("--queue-limit <queue-limit>", "Insert delay to each response when the pqueue exceeds this value").env("QUEUE_LIMIT").default(DEFAULT_QUEUE_LIMIT))
diff --git a/src/schemas.spec.ts b/src/schemas.spec.ts
index 9a08604..1b9bf7c 100644
--- a/src/schemas.spec.ts
+++ b/src/schemas.spec.ts
@@ -12,11 +12,9 @@ const config = ConfigSchema.parse({
   hostname: "0.0.0.0",
   publicKey: "a3cb7366ee8ca77225b4d41772e270e4e831d171d1de71d91707c42e7ba82cc9",
   host: "http://127.0.0.1:8123",
-  schemaUrl: "./schema.sql",
   database: "default",
   username: "default",
   password: "",
-  createDb: "false",
   queueLimit: "10",
   queueConcurrency: "10",
   verbose: "true",
@@ -30,11 +28,9 @@ describe("ConfigSchema", () => {
   test("port", () => expect(config.port).toBe(3000));
   test("queueLimit", () => expect(config.queueLimit).toBe(10));
   test("verbose", () => expect(config.verbose).toBe(true));
-  test("schemaUrl", () => expect(config.schemaUrl).toBe("./schema.sql"));
   test("database", () => expect(config.database).toBe("default"));
   test("username", () => expect(config.username).toBe("default"));
   test("publicKey", () =>
     expect(config.publicKey).toBe("a3cb7366ee8ca77225b4d41772e270e4e831d171d1de71d91707c42e7ba82cc9"));
   test("waitForAsyncInsert", () => expect(config.waitForAsyncInsert).toBe(0));
   test("asyncInsert", () => expect(config.asyncInsert).toBe(1));
-  test("createDatabase", () => expect(config.createDatabase).toBe(false));
 });
diff --git a/src/schemas.ts b/src/schemas.ts
index 174aa06..3da789f 100644
--- a/src/schemas.ts
+++ b/src/schemas.ts
@@ -18,12 +18,10 @@ export const ConfigSchema = z.object({
   database: z.string(),
   username: z.string(),
   password: z.string(),
-  createDatabase: boolean,
   asyncInsert: oneOrZero,
   waitForAsyncInsert: oneOrZero,
   queueLimit: positiveNumber,
   queueConcurrency: positiveNumber,
-  schemaUrl: z.optional(z.string()),
 });
 
 export type ConfigSchema = z.infer<typeof ConfigSchema>;