README update #42

Merged · 7 commits · Oct 27, 2023
2 changes: 0 additions & 2 deletions .env.example
@@ -11,10 +11,8 @@ HOST=http://127.0.0.1:8123
DATABASE=default
USERNAME=default
PASSWORD=
CREATE_DB=false

# Sink
QUEUE_LIMIT=10
QUEUE_CONCURRENCY=10
SCHEMA_URL=... # generate SQL schema by providing file (ex: ./schema.sql) or URL path (ex: https://example.com/schema.sql)
VERBOSE=true
139 changes: 104 additions & 35 deletions README.md
@@ -8,7 +8,57 @@

## Features

- TO-DO: See [issues](https://github.com/pinax-network/substreams-sink-clickhouse/issues?q=is%3Aissue+is%3Aclosed)
<details>
<summary><b>Serverless data sinking</b></summary>

By pairing this sink with [substreams-sink-webhook](https://github.com/pinax-network/substreams-sink-webhook), data from any substream can easily be made available in ClickHouse.

</details>

<details>
<summary><b>Automatic block information</b></summary>

Data for each block is stored alongside every record. The fields and their structure can be found in the [database structure](#database-structure).

</details>

<details>
<summary><b>SQL schemas</b></summary>

A SQL schema can be passed in to define the destination table for substreams data. It will be extended as described in the [database structure](#database-structure).

Schemas are set following the steps in [database initialization](#database-initialization).

</details>

<details>
<summary><b>GraphQL schemas</b></summary>

[TheGraph's GraphQL entity](https://thegraph.com/docs/en/developing/creating-a-subgraph/#defining-entities) schemas can be passed in to define the destination table for substreams data. See [database initialization](#database-initialization).

They are converted to SQL according to the rules below before being executed. The available scalar types are defined [here](https://thegraph.com/docs/en/developing/creating-a-subgraph/#graphql-supported-scalars).

| GraphQL data type | ClickHouse equivalent |
| ----------------- | --------------------- |
| `Bytes` | `String` |
| `String` | `String` |
| `Boolean` | `boolean` |
| `Int` | `Int32` |
| `BigInt` | `String` |
| `BigDecimal` | `String` |
| `Float` | `Float64` |
| `ID` | `String` |

</details>

<details>
<summary><b>NO schema</b></summary>

No schema is required to store data in ClickHouse. Everything can be stored in `unparsed_json` (see [database structure](#database-structure)).

The user **must** build custom [views](https://clickhouse.com/docs/en/guides/developer/cascading-materialized-views) to transform the data according to their needs. Further details are available in [ClickHouse's documentation](https://clickhouse.com/docs/en/integrations/data-formats/json#using-materialized-views).
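
For instance, a materialized view can parse raw rows into a typed table as they arrive. The sketch below is illustrative only: the `contract_events` target table, the JSON paths, and the `raw_data` column name are assumptions, not part of the sink.

```sql
-- Hypothetical destination table for parsed rows (names and types are assumptions).
CREATE TABLE IF NOT EXISTS contract_events (
    address  String,
    name     String,
    chain    LowCardinality(String),
    block_id FixedString(64)
)
ENGINE = MergeTree
ORDER BY (chain, address);

-- Materialized view that parses each row as it is inserted into unparsed_json.
-- Assumes the raw payload lives in a String column named raw_data.
CREATE MATERIALIZED VIEW IF NOT EXISTS contract_events_mv
TO contract_events
AS
SELECT
    JSONExtractString(raw_data, 'address') AS address,
    JSONExtractString(raw_data, 'name')    AS name,
    chain,
    block_id
FROM unparsed_json;
```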

</details>

## Usage

@@ -23,14 +73,12 @@ Options:
-V, --version output the version number
-p, --port <number> HTTP port on which to attach the sink (default: "3000", env: PORT)
-v, --verbose <boolean> Enable verbose logging (choices: "true", "false", default: "pretty", env: VERBOSE)
-s, --schema-url <string> Execute SQL instructions before starting the sink (env: SCHEMA_URL)
--public-key <string> Public key to validate messages (env: PUBLIC_KEY)
--auth-key <string> Auth key to validate requests (env: AUTH_KEY)
--host <string> Database HTTP hostname (default: "http://localhost:8123", env: HOST)
--database <string> The database to use inside ClickHouse (default: "default", env: DATABASE)
--username <string> Database user (default: "default", env: USERNAME)
--password <string> Password associated with the specified username (default: "", env: PASSWORD)
--create-database <boolean> If the specified database does not exist, automatically create it (default: "false", env: CREATE_DATABASE)
--async-insert <number> https://clickhouse.com/docs/en/operations/settings/settings#async-insert (choices: "0", "1", default: 1, env: ASYNC_INSERT)
--wait-for-insert <boolean> https://clickhouse.com/docs/en/operations/settings/settings#wait-for-async-insert (choices: "0", "1", default: 0, env: WAIT_FOR_INSERT)
--queue-limit <number> Insert delay to each response when the pqueue exceeds this value (default: 10, env: QUEUE_LIMIT)
@@ -40,9 +88,10 @@ Options:

### Environment variables

These can all be set when starting the sink. See [cli structure](#cli-structure).
These can all be set when starting the sink. See [usage](#usage).

**.env**

```bash
# Authentication
PUBLIC_KEY=... # Ed25519 Public-key provided by https://github.com/pinax-network/substreams-sink-webhook
@@ -57,12 +106,10 @@ HOST=http://127.0.0.1:8123
DATABASE=default
USERNAME=default
PASSWORD=
CREATE_DB=false

# Sink
QUEUE_LIMIT=10
QUEUE_CONCURRENCY=10
SCHEMA_URL=... # generate SQL schema by providing file (ex: ./schema.sql) or URL path (ex: https://example.com/schema.sql)
VERBOSE=true
```

@@ -72,11 +119,13 @@ The `USER_DIMENSION` is generated by the user provided schema and is augmented b

```mermaid
erDiagram
USER_DIMENSION }|--|{ block : " "
USER_DIMENSION }|--|{ manifest : " "
USER_DIMENSION }|--|{ blocks : " "
USER_DIMENSION }|--|{ module_hashes : " "

block }|--|{ unparsed_json : " "
manifest }|--|{ unparsed_json : " "
blocks }|--|{ unparsed_json : " "
module_hashes }|--|{ unparsed_json : " "

blocks }|--|{ final_blocks : " "

USER_DIMENSION {
user_data unknown
@@ -95,64 +144,69 @@ erDiagram
chain LowCardinality(String)
}

block {
blocks {
block_id FixedString(64)
block_number UInt32()
block_number UInt32
chain LowCardinality(String)
timestamp DateTime64(3_UTC)
final_block Bool
}

manifest {
module_hashes {
module_hash FixedString(40)
module_name String()
module_name String
chain LowCardinality(String)
type String()
type String
}

final_blocks {
block_id FixedString(64)
}
```

**Indexes**

| Table | Fields |
| -------------- | ------------------------------------------ |
| USER_DIMENSION | `(chain, module_hash)` `(chain, block_id)` |
| block | `(block_id, block_number, timestamp)` |
| manifest | `module_hash` |
| Table | Fields |
| -------------- | -------------------------------------------- |
| USER_DIMENSION | `(chain, module_hash)` `(chain, block_id)` |
| module_hashes | `module_hash` |
| blocks | `(block_id, block_number, chain, timestamp)` |
| unparsed_json | `(source, chain, module_hash, block_id)` |
| final_blocks | `block_id` |
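
Because every user table carries `chain` and `block_id`, rows can be joined back to their block metadata. A sketch, assuming a user-defined `contracts` table like the one in the [schema examples](#schema-examples) below:

```sql
-- Latest 10 contracts on one chain, together with the block they were seen in.
-- 'ethereum' is an illustrative chain label; contracts is a hypothetical user table.
SELECT c.address, b.block_number, b.timestamp
FROM contracts AS c
INNER JOIN blocks AS b
    ON b.block_id = c.block_id AND b.chain = c.chain
WHERE c.chain = 'ethereum'
ORDER BY b.block_number DESC
LIMIT 10;
```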

### Database initialization

Create a database in ClickHouse. (Optionally, skip this step and use the `default` database.)
Create a database in ClickHouse and set up the dimension tables.

Use `PUT /init` on [http://localhost:3000](http://localhost:3000).

```bash
substreams-sink-clickhouse --create-db --name <DB_NAME>
> curl --location --request PUT 'http://localhost:3000/init' --header 'Authorization: Bearer <AUTH_KEY>'
```
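
Once `/init` succeeds, the dimension tables described in the [database structure](#database-structure) should exist. A quick sanity check from any ClickHouse client (assuming the `default` database):

```sql
-- Expect blocks, module_hashes, unparsed_json and final_blocks among the results.
SHOW TABLES FROM default;
```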

### Schema initialization

_This step can be skipped. If so, the data will be stored as-is in the `unparsed_json` table. It should then be parsed by the user with ClickHouse's tools (eg: `MaterializedView`)_
_This step can be skipped. If so, the data will be stored as-is in the `unparsed_json` table. It should then be parsed by the user with ClickHouse's tools. See this [article](https://clickhouse.com/docs/en/integrations/data-formats/json#using-materialized-views)._

Initializes the database according to a SQL file. See [example file](#example-sql-file).

**CLI**

```
substreams-sink-clickhouse --schema-url <SCHEMA_URL>
```
Initializes the database according to a SQL or a GraphQL file. See [example schema files](#schema-examples).

**Web UI**

Upload a `.sql` file on [http://localhost:3000](http://localhost:3000). (POST request `/schema`, Content-Type: `application/octet-stream`)
Upload a schema file on [http://localhost:3000](http://localhost:3000).

_Use PUT `/schema/sql` or PUT `/schema/graphql` with `Content-Type: application/octet-stream`._

**Curl**

```bash
curl --location --request POST 'http://localhost:3000/schema' --header 'Authorization: Bearer <AUTH_KEY>' --header 'Content-Type: application/json' --data-raw '<SQL_INSTRUCTIONS>'
> curl --location --request PUT 'http://localhost:3000/schema/sql' --header 'Authorization: Bearer <AUTH_KEY>' --header 'Content-Type: application/octet-stream' --data-raw '<SQL_INSTRUCTIONS>'

> curl --location --request PUT 'http://localhost:3000/schema/graphql' --header 'Authorization: Bearer <AUTH_KEY>' --header 'Content-Type: application/octet-stream' --data-raw '<GRAPHQL_ENTITY>'
```

#### Example SQL file
### Schema examples

<details>
<summary>Click to expand</summary>
<summary><b>Example SQL file</b></summary>

```sql
CREATE TABLE IF NOT EXISTS contracts (
    -- … column definitions collapsed in the diff view …
)
ORDER BY (address)
```

</details>

<details>
<summary><b>Example GraphQL file</b></summary>

```graphql
type Contracts @entity {
id: ID!
address: String!
name: String
symbol: String
decimals: BigInt
}
```
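
Applying the conversion table from the [features](#features) section, the sink would derive a ClickHouse table along these lines. This is a sketch only: the exact engine, ordering key, and nullability the sink generates are assumptions.

```sql
-- Derived per the mapping rules: ID -> String, String -> String, BigInt -> String.
-- Optional entity fields are rendered as Nullable here (assumption).
CREATE TABLE IF NOT EXISTS Contracts (
    id       String,
    address  String,
    name     Nullable(String),
    symbol   Nullable(String),
    decimals Nullable(String)
)
ENGINE = MergeTree
ORDER BY (id);
```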

</details>

### Sink

Serves an endpoint to receive Substreams data from [substreams-sink-webhook](https://github.com/pinax-network/substreams-sink-webhook).
3 changes: 0 additions & 3 deletions src/config.ts
@@ -13,7 +13,6 @@ export const DEFAULT_HOST = "http://localhost:8123";
export const DEFAULT_DATABASE = "default";
export const DEFAULT_USERNAME = "default";
export const DEFAULT_PASSWORD = "";
export const DEFAULT_CREATE_DATABASE = "false";
export const DEFAULT_ASYNC_INSERT = 1;
export const DEFAULT_WAIT_FOR_ASYNC_INSERT = 0;
export const DEFAULT_QUEUE_LIMIT = 10;
@@ -29,14 +28,12 @@ export const opts = program
.addOption(new Option("-p, --port <number>", "HTTP port on which to attach the sink").env("PORT").default(DEFAULT_PORT))
.addOption(new Option("-v, --verbose <boolean>", "Enable verbose logging").choices(["true", "false"]).env("VERBOSE").default(DEFAULT_VERBOSE))
.addOption(new Option("--hostname <string>", "Server listen on HTTP hostname").env("HOSTNAME").default(DEFAULT_HOSTNAME))
.addOption(new Option("-s, --schema-url <string>", "Execute SQL instructions before starting the sink").env("SCHEMA_URL").preset(DEFAULT_SCHEMA_URL))
.addOption(new Option("--public-key <string>", "Public key to validate messages").env("PUBLIC_KEY"))
.addOption(new Option("--auth-key <string>", "Auth key to validate requests").env("AUTH_KEY"))
.addOption(new Option("--host <string>", "Database HTTP hostname").env("HOST").default(DEFAULT_HOST))
.addOption(new Option("--username <string>", "Database user").env("USERNAME").default(DEFAULT_USERNAME))
.addOption(new Option("--password <string>", "Password associated with the specified username").env("PASSWORD").default(DEFAULT_PASSWORD))
.addOption(new Option("--database <string>", "The database to use inside ClickHouse").env("DATABASE").default(DEFAULT_DATABASE))
.addOption(new Option("--create-database <boolean>", "If the specified database does not exist, automatically create it").env("CREATE_DATABASE").default(DEFAULT_CREATE_DATABASE))
.addOption(new Option("--async-insert <number>", "https://clickhouse.com/docs/en/operations/settings/settings#async-insert").choices(["0", "1"]).env("ASYNC_INSERT").default(DEFAULT_ASYNC_INSERT))
.addOption(new Option("--wait-for-async-insert <boolean>", "https://clickhouse.com/docs/en/operations/settings/settings#wait-for-async-insert").choices(["0", "1"]).env("WAIT_FOR_INSERT").default(DEFAULT_WAIT_FOR_ASYNC_INSERT))
.addOption(new Option("--queue-limit <number>","Insert delay to each response when the pqueue exceeds this value").env("QUEUE_LIMIT").default(DEFAULT_QUEUE_LIMIT))
4 changes: 0 additions & 4 deletions src/schemas.spec.ts
@@ -12,11 +12,9 @@ const config = ConfigSchema.parse({
hostname: "0.0.0.0",
publicKey: "a3cb7366ee8ca77225b4d41772e270e4e831d171d1de71d91707c42e7ba82cc9",
host: "http://127.0.0.1:8123",
schemaUrl: "./schema.sql",
database: "default",
username: "default",
password: "",
createDb: "false",
queueLimit: "10",
queueConcurrency: "10",
verbose: "true",
@@ -30,11 +28,9 @@ describe("ConfigSchema", () => {
test("port", () => expect(config.port).toBe(3000));
test("queueLimit", () => expect(config.queueLimit).toBe(10));
test("verbose", () => expect(config.verbose).toBe(true));
test("schemaUrl", () => expect(config.schemaUrl).toBe("./schema.sql"));
test("database", () => expect(config.database).toBe("default"));
test("username", () => expect(config.username).toBe("default"));
test("publicKey", () => expect(config.publicKey).toBe("a3cb7366ee8ca77225b4d41772e270e4e831d171d1de71d91707c42e7ba82cc9"));
test("waitForAsyncInsert", () => expect(config.waitForAsyncInsert).toBe(0));
test("asyncInsert", () => expect(config.asyncInsert).toBe(1));
test("createDatabase", () => expect(config.createDatabase).toBe(false));
});
2 changes: 0 additions & 2 deletions src/schemas.ts
@@ -18,12 +18,10 @@ export const ConfigSchema = z.object({
database: z.string(),
username: z.string(),
password: z.string(),
createDatabase: boolean,
asyncInsert: oneOrZero,
waitForAsyncInsert: oneOrZero,
queueLimit: positiveNumber,
queueConcurrency: positiveNumber,
schemaUrl: z.optional(z.string()),
});
export type ConfigSchema = z.infer<typeof ConfigSchema>;
