Merge pull request #42 from pinax-network/feature/include-features-in-readme

README update
JulienR1 authored Oct 27, 2023
2 parents a02e805 + d44b215 commit 8c86d07
Showing 5 changed files with 104 additions and 46 deletions.
2 changes: 0 additions & 2 deletions .env.example
@@ -11,10 +11,8 @@ HOST=http://127.0.0.1:8123
DATABASE=default
USERNAME=default
PASSWORD=
CREATE_DB=false

# Sink
QUEUE_LIMIT=10
QUEUE_CONCURRENCY=10
SCHEMA_URL=... # generate SQL schema by providing file (ex: ./schema.sql) or URL path (ex: https://example.com/schema.sql)
VERBOSE=true
139 changes: 104 additions & 35 deletions README.md
@@ -8,7 +8,57 @@

## Features

- TO-DO: See [issues](https://github.com/pinax-network/substreams-sink-clickhouse/issues?q=is%3Aissue+is%3Aclosed)
<details>
<summary><b>Serverless data sinking</b></summary>

By pairing this sink with [substreams-sink-webhook](https://github.com/pinax-network/substreams-sink-webhook), data from any substreams can easily be made available in ClickHouse.

</details>

<details>
<summary><b>Automatic block information</b></summary>

Data for each block is stored alongside every record. The fields and their structure can be found in the [database structure](#database-structure).
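
For example, every user table is augmented with `chain` and `block_id` columns (see [database structure](#database-structure)), so block metadata can be joined back onto the stored records. A minimal sketch, assuming a hypothetical user-defined `contracts` table:

```sql
-- Minimal sketch: `contracts` is an assumed user-defined table, not part of
-- the sink itself. Block metadata lives in the `blocks` dimension table and
-- joins on the `chain` and `block_id` columns added to every record.
SELECT c.address, b.block_number, b.timestamp
FROM contracts AS c
INNER JOIN blocks AS b ON b.chain = c.chain AND b.block_id = c.block_id
WHERE c.chain = 'mainnet'
ORDER BY b.block_number DESC
LIMIT 10;
```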

</details>

<details>
<summary><b>SQL schemas</b></summary>

A SQL schema can be passed in to define the destination table for substreams data. It will be extended as described in the [database structure](#database-structure).

Schemas are set by following the steps in [database initialization](#database-initialization).

</details>

<details>
<summary><b>GraphQL schemas</b></summary>

[TheGraph's GraphQL entity](https://thegraph.com/docs/en/developing/creating-a-subgraph/#defining-entities) schemas can be passed in to define the destination table for substreams data. See [database initialization](#database-initialization).

Entity schemas are converted to SQL according to the following rules before being executed. The available types are defined in [TheGraph's documentation](https://thegraph.com/docs/en/developing/creating-a-subgraph/#graphql-supported-scalars).

| GraphQL data type | ClickHouse equivalent |
| ----------------- | --------------------- |
| `Bytes` | `String` |
| `String` | `String` |
| `Boolean` | `boolean` |
| `Int` | `Int32` |
| `BigInt` | `String` |
| `BigDecimal` | `String` |
| `Float` | `Float64` |
| `ID` | `String` |

</details>

<details>
<summary><b>NO schema</b></summary>

No schema is required to store data in ClickHouse. Everything can be stored in `unparsed_json` (see [database structure](#database-structure)).

The user **must** build custom [views](https://clickhouse.com/docs/en/guides/developer/cascading-materialized-views) to transform the data according to their needs. Further details are available in [ClickHouse's documentation](https://clickhouse.com/docs/en/integrations/data-formats/json#using-materialized-views).
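
As an illustration only, a minimal cascading setup could look like the sketch below. The `raw_data` column name and the `transfers` target table are assumptions for this example, not part of the sink's documented schema:

```sql
-- Hedged sketch: route rows from `unparsed_json` into a typed table through a
-- materialized view. `raw_data` is an assumed column name for the JSON payload;
-- check the actual `unparsed_json` schema before relying on it.
CREATE TABLE IF NOT EXISTS transfers (
    `from`  String,
    `to`    String,
    amount  UInt256
)
ENGINE = MergeTree
ORDER BY (`from`, `to`);

CREATE MATERIALIZED VIEW IF NOT EXISTS transfers_mv TO transfers AS
SELECT
    JSONExtractString(raw_data, 'from')        AS `from`,
    JSONExtractString(raw_data, 'to')          AS `to`,
    JSONExtract(raw_data, 'amount', 'UInt256') AS amount
FROM unparsed_json;
```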

</details>

## Usage

@@ -23,14 +73,12 @@ Options:
-V, --version output the version number
-p, --port <number> HTTP port on which to attach the sink (default: "3000", env: PORT)
-v, --verbose <boolean> Enable verbose logging (choices: "true", "false", default: "pretty", env: VERBOSE)
-s, --schema-url <string> Execute SQL instructions before starting the sink (env: SCHEMA_URL)
--public-key <string> Public key to validate messages (env: PUBLIC_KEY)
--auth-key <string> Auth key to validate requests (env: AUTH_KEY)
--host <string> Database HTTP hostname (default: "http://localhost:8123", env: HOST)
--database <string> The database to use inside ClickHouse (default: "default", env: DATABASE)
--username <string> Database user (default: "default", env: USERNAME)
--password <string> Password associated with the specified username (default: "", env: PASSWORD)
--create-database <boolean> If the specified database does not exist, automatically create it (default: "false", env: CREATE_DATABASE)
--async-insert <number> https://clickhouse.com/docs/en/operations/settings/settings#async-insert (choices: "0", "1", default: 1, env: ASYNC_INSERT)
--wait-for-insert <boolean> https://clickhouse.com/docs/en/operations/settings/settings#wait-for-async-insert (choices: "0", "1", default: 0, env: WAIT_FOR_INSERT)
--queue-limit <number> Insert delay to each response when the pqueue exceeds this value (default: 10, env: QUEUE_LIMIT)
@@ -40,9 +88,10 @@

### Environment variables

These can all be set when starting the sink. See [cli structure](#cli-structure).
These can all be set when starting the sink. See [usage](#usage).

**.env**

```bash
# Authentication
PUBLIC_KEY=... # Ed25519 Public-key provided by https://github.com/pinax-network/substreams-sink-webhook
@@ -57,12 +106,10 @@ HOST=http://127.0.0.1:8123
DATABASE=default
USERNAME=default
PASSWORD=
CREATE_DB=false

# Sink
QUEUE_LIMIT=10
QUEUE_CONCURRENCY=10
SCHEMA_URL=... # generate SQL schema by providing file (ex: ./schema.sql) or URL path (ex: https://example.com/schema.sql)
VERBOSE=true
```

@@ -72,11 +119,13 @@ The `USER_DIMENSION` is generated by the user provided schema and is augmented b

```mermaid
erDiagram
USER_DIMENSION }|--|{ block : " "
USER_DIMENSION }|--|{ manifest : " "
USER_DIMENSION }|--|{ blocks : " "
USER_DIMENSION }|--|{ module_hashes : " "
block }|--|{ unparsed_json : " "
manifest }|--|{ unparsed_json : " "
blocks }|--|{ unparsed_json : " "
module_hashes }|--|{ unparsed_json : " "
blocks }|--|{ final_blocks : " "
USER_DIMENSION {
user_data unknown
@@ -95,64 +144,69 @@
chain LowCardinality(String)
}
block {
blocks {
block_id FixedString(64)
block_number UInt32()
block_number UInt32
chain LowCardinality(String)
timestamp DateTime64(3_UTC)
final_block Bool
}
manifest {
module_hashes {
module_hash FixedString(40)
module_name String()
module_name String
chain LowCardinality(String)
type String()
type String
}
final_blocks {
block_id FixedString(64)
}
```

**Indexes**

| Table | Fields |
| -------------- | ------------------------------------------ |
| USER_DIMENSION | `(chain, module_hash)` `(chain, block_id)` |
| block | `(block_id, block_number, timestamp)` |
| manifest | `module_hash` |
| Table | Fields |
| -------------- | -------------------------------------------- |
| USER_DIMENSION | `(chain, module_hash)` `(chain, block_id)` |
| module_hashes | `module_hash` |
| blocks | `(block_id, block_number, chain, timestamp)` |
| unparsed_json | `(source, chain, module_hash, block_id)` |
| final_blocks | `block_id` |
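
Queries that filter on the leading fields of these indexes stay cheap. A hypothetical lookup, with `contracts` standing in for any user-defined table:

```sql
-- Hypothetical example: filtering on `chain` and `block_id` matches the
-- USER_DIMENSION index listed above. `<BLOCK_ID>` is a 64-character block hash.
SELECT *
FROM contracts
WHERE chain = 'mainnet'
  AND block_id = '<BLOCK_ID>'
LIMIT 100;
```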

### Database initialization

Create a database in ClickHouse. (Optionally, skip this step and use the `default` database.)
Create a database in ClickHouse and set up the dimension tables.

Use `PUT /init` on [http://localhost:3000](http://localhost:3000).

```bash
substreams-sink-clickhouse --create-db --name <DB_NAME>
> curl --location --request PUT 'http://localhost:3000/init' --header 'Authorization: Bearer <AUTH_KEY>'
```
### Schema initialization
_This step can be skipped. If so, the data will be stored as-is in the `unparsed_json` table. It should then be parsed by the user with ClickHouse's tools (eg: `MaterializedView`)_
_This step can be skipped. If so, the data will be stored as-is in the `unparsed_json` table. It should then be parsed by the user with ClickHouse's tools. See this [article](https://clickhouse.com/docs/en/integrations/data-formats/json#using-materialized-views)._

Initializes the database according to a SQL file. See [example file](#example-sql-file).

**CLI**

```
substreams-sink-clickhouse --schema-url <SCHEMA_URL>
```
Initializes the database according to a SQL or GraphQL file. See the [example schema files](#schema-examples).

**Web UI**

Upload a `.sql` file on [http://localhost:3000](http://localhost:3000). (POST request `/schema`, Content-Type: `application/octet-stream`)
Upload a schema file at [http://localhost:3000](http://localhost:3000).

_Use PUT `/schema/sql` or PUT `/schema/graphql` with `Content-Type: application/octet-stream`._

**Curl**

```bash
curl --location --request POST 'http://localhost:3000/schema' --header 'Authorization: Bearer <AUTH_KEY>' --header 'Content-Type: application/json' --data-raw '<SQL_INSTRUCTIONS>'
> curl --location --request PUT 'http://localhost:3000/schema/sql' --header 'Authorization: Bearer <AUTH_KEY>' --header 'Content-Type: application/json' --data-raw '<SQL_INSTRUCTIONS>'
> curl --location --request PUT 'http://localhost:3000/schema/graphql' --header 'Authorization: Bearer <AUTH_KEY>' --header 'Content-Type: application/json' --data-raw '<GRAPHQL_ENTITY>'
```

#### Example SQL file
### Schema examples

<details>
<summary>Click to expand</summary>
<summary><b>Example SQL file</b></summary>

```sql
CREATE TABLE IF NOT EXISTS contracts (
  -- … (rest of the statement elided in this diff)
ORDER BY (address)
```

</details>

<details>
<summary><b>Example GraphQL file</b></summary>

```graphql
type Contracts @entity {
id: ID!
address: String!
name: String
symbol: String
decimals: BigInt
}
```
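
Applying the GraphQL-to-ClickHouse type mapping listed under [features](#features), this entity corresponds roughly to the table below. The engine and ordering key are assumptions for illustration; the DDL the sink actually generates (including the added block columns) may differ:

```sql
-- Rough equivalent of the `Contracts` entity after type mapping
-- (ID → String, BigInt → String). Engine and ORDER BY are assumptions.
CREATE TABLE IF NOT EXISTS Contracts (
    id       String,
    address  String,
    name     String,
    symbol   String,
    decimals String
)
ENGINE = MergeTree
ORDER BY (id);
```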

</details>

### Sink

Serves an endpoint to receive Substreams data from [substreams-sink-webhook](https://github.com/pinax-network/substreams-sink-webhook).
3 changes: 0 additions & 3 deletions src/config.ts
@@ -12,7 +12,6 @@ export const DEFAULT_HOST = "http://localhost:8123";
export const DEFAULT_DATABASE = "default";
export const DEFAULT_USERNAME = "default";
export const DEFAULT_PASSWORD = "";
export const DEFAULT_CREATE_DATABASE = "false";
export const DEFAULT_ASYNC_INSERT = 1;
export const DEFAULT_WAIT_FOR_ASYNC_INSERT = 0;
export const DEFAULT_QUEUE_LIMIT = 10;
@@ -28,14 +27,12 @@ export const opts = program
.addOption(new Option("-p, --port <number>", "HTTP port on which to attach the sink").env("PORT").default(DEFAULT_PORT))
.addOption(new Option("-v, --verbose <boolean>", "Enable verbose logging").choices(["true", "false"]).env("VERBOSE").default(DEFAULT_VERBOSE))
.addOption(new Option("--hostname <string>", "Server listen on HTTP hostname").env("HOSTNAME").default(DEFAULT_HOSTNAME))
.addOption(new Option("-s, --schema-url <string>", "Execute SQL instructions before starting the sink").env("SCHEMA_URL").preset(DEFAULT_SCHEMA_URL))
.addOption(new Option("--public-key <string>", "Public key to validate messages").env("PUBLIC_KEY"))
.addOption(new Option("--auth-key <string>", "Auth key to validate requests").env("AUTH_KEY"))
.addOption(new Option("--host <string>", "Database HTTP hostname").env("HOST").default(DEFAULT_HOST))
.addOption(new Option("--username <string>", "Database user").env("USERNAME").default(DEFAULT_USERNAME))
.addOption(new Option("--password <string>", "Password associated with the specified username").env("PASSWORD").default(DEFAULT_PASSWORD))
.addOption(new Option("--database <string>", "The database to use inside ClickHouse").env("DATABASE").default(DEFAULT_DATABASE))
.addOption(new Option("--create-database <boolean>", "If the specified database does not exist, automatically create it").env("CREATE_DATABASE").default(DEFAULT_CREATE_DATABASE))
.addOption(new Option("--async-insert <number>", "https://clickhouse.com/docs/en/operations/settings/settings#async-insert").choices(["0", "1"]).env("ASYNC_INSERT").default(DEFAULT_ASYNC_INSERT))
.addOption(new Option("--wait-for-async-insert <boolean>", "https://clickhouse.com/docs/en/operations/settings/settings#wait-for-async-insert").choices(["0", "1"]).env("WAIT_FOR_INSERT").default(DEFAULT_WAIT_FOR_ASYNC_INSERT))
.addOption(new Option("--queue-limit <number>","Insert delay to each response when the pqueue exceeds this value").env("QUEUE_LIMIT").default(DEFAULT_QUEUE_LIMIT))
4 changes: 0 additions & 4 deletions src/schemas.spec.ts
@@ -12,11 +12,9 @@ const config = ConfigSchema.parse({
hostname: "0.0.0.0",
publicKey: "a3cb7366ee8ca77225b4d41772e270e4e831d171d1de71d91707c42e7ba82cc9",
host: "http://127.0.0.1:8123",
schemaUrl: "./schema.sql",
database: "default",
username: "default",
password: "",
createDb: "false",
queueLimit: "10",
queueConcurrency: "10",
verbose: "true",
@@ -30,11 +28,9 @@ describe("ConfigSchema", () => {
test("port", () => expect(config.port).toBe(3000));
test("queueLimit", () => expect(config.queueLimit).toBe(10));
test("verbose", () => expect(config.verbose).toBe(true));
test("schemaUrl", () => expect(config.schemaUrl).toBe("./schema.sql"));
test("database", () => expect(config.database).toBe("default"));
test("username", () => expect(config.username).toBe("default"));
test("publicKey", () => expect(config.publicKey).toBe("a3cb7366ee8ca77225b4d41772e270e4e831d171d1de71d91707c42e7ba82cc9"));
test("waitForAsyncInsert", () => expect(config.waitForAsyncInsert).toBe(0));
test("asyncInsert", () => expect(config.asyncInsert).toBe(1));
test("createDatabase", () => expect(config.createDatabase).toBe(false));
});
2 changes: 0 additions & 2 deletions src/schemas.ts
@@ -18,12 +18,10 @@ export const ConfigSchema = z.object({
database: z.string(),
username: z.string(),
password: z.string(),
createDatabase: boolean,
asyncInsert: oneOrZero,
waitForAsyncInsert: oneOrZero,
queueLimit: positiveNumber,
queueConcurrency: positiveNumber,
schemaUrl: z.optional(z.string()),
});
export type ConfigSchema = z.infer<typeof ConfigSchema>;

