Update and delete entity changes #87

Merged · 9 commits · Nov 27, 2023
53 changes: 41 additions & 12 deletions README.md
@@ -8,6 +8,19 @@

## Features

<details>
<summary><b><a href="https://crates.io/crates/substreams-entity-change/">Entity changes</a> support</b></summary>

Support for these entity change operations:

- `OPERATION_CREATE`: The received entity changes are inserted directly into ClickHouse according to the provided [schema](#schema-initialization).

- `OPERATION_UPDATE`: By default, updates are treated as new rows, which preserves a full history of every transaction. If previous records must be replaced instead, specify the table engine as `ReplacingMergeTree` (see the sketch below) and refer to this [article](https://clickhouse.com/docs/en/guides/developer/deduplication#using-replacingmergetree-for-upserts).

- `OPERATION_DELETE`: Entity changes are never physically removed from the database; this, too, preserves the full history of every transaction. Instead, the deleted records are inserted into the `deleted_entity_changes` table, which can later be used to filter out deleted data if required.
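
A minimal sketch of a user-provided schema for the upsert case; the `accounts` table and its columns are hypothetical, not part of this sink:

```sql
-- Hypothetical user table: during background merges, ClickHouse keeps only
-- the most recently inserted row for each ORDER BY key.
CREATE TABLE IF NOT EXISTS accounts (
    id      String,
    balance UInt64
)
ENGINE = ReplacingMergeTree
ORDER BY id;
```

Because deduplication happens asynchronously at merge time, reads that must see exactly one row per key should use `SELECT ... FROM accounts FINAL`.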

</details>

<details>
<summary><b>Serverless data sinking</b></summary>

@@ -133,7 +146,7 @@ Options:
--allow-unparsed <boolean> Enable storage in 'unparsed_json' table (default: false, env: ALLOW_UNPARSED)
--transaction-size <number> Number of insert statements in a SQLite transaction (default: 50, env: TRANSACTION_SIZE)
--resume <boolean> Save the cached data from the previous process into ClickHouse (default: true, env: RESUME)
--buffer <string> SQLite database to use as an insertion buffer. Use ':memory:' to make it volatile. (default: buffer.db, env: BUFFER)
-h, --help display help for command
```

@@ -171,14 +184,18 @@ The `USER_DIMENSION` is generated by the user provided schema and is augmented b
```mermaid
erDiagram
USER_DIMENSION }|--|{ blocks : " "
module_hashes }|--|{ USER_DIMENSION : " "
USER_DIMENSION }|--|{ cursors : " "

deleted_entity_changes }|--|{ blocks : " "
module_hashes }|--|{ deleted_entity_changes : " "
deleted_entity_changes }|--|{ cursors : " "

unparsed_json }|--|{ blocks : " "
module_hashes }|--|{ unparsed_json : " "
unparsed_json }|--|{ cursors : " "

blocks }|--|{ final_blocks : " "

USER_DIMENSION {
user_data unknown
@@ -191,6 +208,17 @@ erDiagram
cursor String
}

deleted_entity_changes {
source LowCardinality(String)
id String
chain LowCardinality(String)
block_id FixedString(64)
block_number UInt32
module_hash FixedString(40)
timestamp DateTime64(3_UTC)
cursor String
}

unparsed_json {
raw_data String
source LowCardinality(String)
@@ -232,13 +260,14 @@ erDiagram

**Indexes**

| Table | Fields |
| ---------------------- | ---------------------------------------------------- |
| blocks | `(block_id, block_number, chain, timestamp)` |
| deleted_entity_changes | `(source, block_id, block_number, chain, timestamp)` |
| module_hashes | `module_hash` |
| cursors | `(cursor, module_hash, block_id)` |
| unparsed_json | `(source, chain, module_hash, block_id)` |
| final_blocks | `block_id` |

### Database initialization

71 changes: 16 additions & 55 deletions src/clickhouse/handleSinkRequest.ts
@@ -96,9 +96,7 @@ export function saveKnownEntityChanges() {
if (await store.existsTable(table)) {
await client.insert({ table, values, format: "JSONEachRow" });
} else {
logger.info(`Skipped (${values.length}) records assigned to table '${table}' because it does not exist.`);
}
}
}
@@ -111,22 +109,7 @@ function batchSizeLimitReached() {

function handleNoEntityChange(metadata: { clock: Clock; manifest: Manifest; cursor: string }) {
const { clock, manifest, cursor } = metadata;

sqliteQueue.add(() => sqlite.insert("", "", clock, manifest, cursor));
}

async function handleEntityChange(
@@ -140,6 +123,7 @@ async function handleEntityChange(
const jsonData = JSON.stringify(values);
const clock = JSON.stringify(metadata.clock);
const manifest = JSON.stringify(metadata.manifest);
const environment = { chain: metadata.manifest.chain, module_hash: metadata.manifest.moduleHash };

if (!tableExists) {
if (!config.allowUnparsed) {
@@ -155,13 +139,23 @@

switch (change.operation) {
case "OPERATION_CREATE":
prometheus.entity_changes_inserted.inc(environment);
return insertEntityChange(table, values, { ...metadata, id: change.id });

// Updates are inserted as new rows in ClickHouse. This allows for the full history.
// If the user wants to override old data, they can specify it in their schema
// by using a ReplacingMergeTree.
case "OPERATION_UPDATE":
prometheus.entity_changes_updated.inc(environment);
return insertEntityChange(table, values, { ...metadata, id: change.id });

// Deleted entity changes are not actually removed from the database.
// They are stored in the 'deleted_entity_changes' table with their timestamp.
// Again, this keeps the full history while also providing the required information
// to correctly filter out unwanted data if necessary.
case "OPERATION_DELETE":
prometheus.entity_changes_deleted.inc(environment);
return insertEntityChange("deleted_entity_changes", { source: table }, { ...metadata, id: change.id });

default:
prometheus.entity_changes_unsupported.inc();
@@ -184,39 +178,6 @@ function insertEntityChange(
values["cursor"] = metadata.cursor; // Block cursor for current substreams

sqliteQueue.add(() =>
sqlite.insert(JSON.stringify(values), table, metadata.clock, metadata.manifest, metadata.cursor)
);

}

3 changes: 2 additions & 1 deletion src/clickhouse/stores.ts
@@ -1,6 +1,8 @@
import { logger } from "../logger.js";
import { readOnlyClient } from "./createClient.js";

const hiddenTables = ["blocks", "module_hashes", "cursors", "final_blocks", "unparsed_json", "deleted_entity_changes"];

class ClickhouseStore {
public paused = false;

@@ -23,7 +25,6 @@ class ClickhouseStore {

public get publicTables() {
if (!this.publicTablesPromise) {
this.publicTablesPromise = readOnlyClient
.query({ query: "SHOW TABLES", format: "JSONEachRow" })
.then((response) => response.json<Array<{ name: string }>>())
13 changes: 13 additions & 0 deletions src/clickhouse/tables/deleted_entity_changes.sql
@@ -0,0 +1,13 @@
CREATE TABLE IF NOT EXISTS deleted_entity_changes (
id String,
chain LowCardinality(String),
source LowCardinality(String),
block_id FixedString(64),
block_number UInt32,
module_hash FixedString(40),
timestamp DateTime64(3, 'UTC'),
cursor String,
)
ENGINE = ReplacingMergeTree
PRIMARY KEY (source, block_id)
ORDER BY (source, block_id, block_number, chain, timestamp);
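
For illustration only: once deleted entities are recorded in this table, a downstream query can exclude them with an anti-join. This sketch reuses the hypothetical `accounts` table from the README example above and is not part of this change:

```sql
-- Sketch: read only the rows that have not been recorded as deleted.
SELECT *
FROM accounts
WHERE id NOT IN (
    SELECT id
    FROM deleted_entity_changes
    WHERE source = 'accounts'
);
```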
3 changes: 3 additions & 0 deletions src/clickhouse/tables/index.ts
@@ -1,5 +1,6 @@
import blocks_sql from "./blocks.sql";
import cursors_sql from "./cursors.sql";
import deleted_entity_changes_sql from "./deleted_entity_changes.sql";
import final_blocks_sql from "./final_blocks.sql";
import module_hashes_sql from "./module_hashes.sql";
import unparsed_json_sql from "./unparsed_json.sql";
@@ -9,11 +10,13 @@ export const final_blocks = await Bun.file(final_blocks_sql).text();
export const module_hashes = await Bun.file(module_hashes_sql).text();
export const unparsed_json = await Bun.file(unparsed_json_sql).text();
export const cursors = await Bun.file(cursors_sql).text();
export const deleted_entity_changes = await Bun.file(deleted_entity_changes_sql).text();

export default [
["blocks", blocks],
["final_blocks", final_blocks],
["module_hashes", module_hashes],
["unparsed_json", unparsed_json],
["cursors", cursors],
["deleted_entity_changes", deleted_entity_changes],
];
6 changes: 3 additions & 3 deletions src/prometheus.ts
@@ -27,6 +27,6 @@ export const request_errors = registerCounter("request_errors", "Total failed re
export const sink_requests = registerCounter("sink_requests", "Total sink requests", ["chain", "module_hash"])!;

export const entity_changes_inserted = registerCounter("entity_changes_inserted", "Total inserted entity changes", ["chain", "module_hash"])!;
export const entity_changes_updated = registerCounter("entity_changes_updated", "Total updated entity changes", ["chain", "module_hash"])!;
export const entity_changes_deleted = registerCounter("entity_changes_deleted", "Total deleted entity changes", ["chain", "module_hash"])!;
export const entity_changes_unsupported = registerCounter("entity_changes_unsupported", "Total unsupported entity changes", ["chain", "module_hash"])!;
36 changes: 26 additions & 10 deletions src/sqlite/sqlite.ts
@@ -2,6 +2,7 @@ import { file } from "bun";
import Database, { Statement } from "bun:sqlite";
import { config } from "../config.js";
import { Ok, Result, UnknownErr } from "../result.js";
import { Clock, Manifest } from "../schemas.js";
import tableSQL from "./table.sql";

const selectSQL = {
@@ -65,11 +66,26 @@ class SQLite {
this.db.run("BEGIN TRANSACTION;");
}

public insert(entityChanges: string, source: string, clock: Clock, manifest: Manifest, cursor: string) {
const { chain, finalBlockOnly, moduleHash, moduleName, type } = manifest;
const { id: blockId, number: blockNumber, timestamp: timestampStr } = clock;

const isFinal = finalBlockOnly ? 1 : 0;
const timestamp = Number(new Date(timestampStr));

const args = [source, chain, blockId, blockNumber, isFinal, moduleHash, moduleName, type, timestamp, cursor];
this.insertStatement.run(this.batchNumber, entityChanges, ...args);
}

public async commitBuffer(
onData: (
blocks: unknown[],
cursors: unknown[],
finalBlocks: unknown[],
moduleHashes: unknown[],
entityChanges: Record<string, unknown[]>
) => Promise<void>
): Promise<Result> {
try {
this.batchNumber++;

@@ -82,7 +98,9 @@
const sources = this.selectSourcesStatement.all(this.batchNumber);
for (const { source } of sources) {
if (source.length > 0) {
entityChanges[source] = this.selecEntityChangesStatement
.all(this.batchNumber, source)
.map((response) => JSON.parse(response.entity_changes));
}
}

@@ -97,16 +115,14 @@

private get initialBatchNumber() {
try {
      const sql = `SELECT MAX(batch_number) AS batch_number
        FROM (
            SELECT batch_number FROM data_buffer
            UNION ALL
            SELECT 0 AS batch_number
        )`;

      const response = this.db.query<{ batch_number: number }, any>(sql).get();
return response!.batch_number + 1;
} catch {
return 0;