improve writeRow logic #20

Merged: 2 commits, Mar 7, 2024
3 changes: 2 additions & 1 deletion .env.example
@@ -2,4 +2,5 @@ SUBSTREAMS_API_KEY=<your-api-key>
MANIFEST=https://github.com/streamingfast/substreams-eth-block-meta/releases/download/v0.5.1/substreams-eth-block-meta-v0.5.1.spkg
MODULE_NAME=graph_out
SUBSTREAMS_ENDPOINT=eth.substreams.pinax.network:443
SCHEMA=schema.example.sql
SCHEMA=schema.example.sql
DELIMITER=","
105 changes: 92 additions & 13 deletions README.md
@@ -21,13 +21,19 @@ CREATE TABLE block_meta

**Reserved field names** to be used to expand the schema (see the example schema after this list):

- `id` (TEXT NOT NULL PRIMARY KEY)
- `block_num` (BIGINT)
- `block_id` (TEXT)
- `cursor` (TEXT)
- `timestamp` (TIMESTAMP)
- `seconds` (BIGINT)
- `operation` (TEXT)
- `id` (String)
- `block_number` (UInt64)
- `block`
- `block_num`
- `block_id` (String)
- `cursor` (String)
- `timestamp` (DateTime)
- `seconds` (Int64)
- `nanos` (Int32)
- `nanoseconds`
- `milliseconds` (Int64)
- `millis`
- `operation` (String)
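
A minimal sketch of a schema that combines a module output with the reserved fields; the `hash` column is a hypothetical module output, while the reserved columns (`id`, `block_number`, `timestamp`, `operation`) are filled in automatically by the sink:

```sql
CREATE TABLE block_meta
(
    id           String,
    block_number UInt64,
    timestamp    DateTime,
    operation    String,
    hash         String
);
```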

### Get Substreams API Key

@@ -45,6 +51,7 @@ SUBSTREAMS_ENDPOINT=eth.substreams.pinax.network:443
SCHEMA=schema.example.sql
FINAL_BLOCKS_ONLY=true
START_BLOCK=2
DELIMITER=","
```
**CLI** with `.env` file
```bash
@@ -85,8 +92,6 @@ eth.substreams.pinax.network-3b180e1d2390afef1f22651581304e04245ba001-graph_out.
```bash
$ substreams-sink-csv --help

Usage: substreams-sink-csv run [options]

Substreams Sink CSV

Options:
@@ -100,13 +105,15 @@ Options:
--substreams-api-key <string> API key for the Substreams endpoint (env: SUBSTREAMS_API_KEY)
--delay-before-start <int> Delay (ms) before starting Substreams (default: 0, env: DELAY_BEFORE_START)
--cursor <string> Cursor to stream from. Leave blank for no cursor
--production-mode <boolean> Enable production mode, allows cached Substreams data if available (default: "false", env: PRODUCTION_MODE)
--production-mode <boolean> Enable production mode, allows cached Substreams data if available (choices: "true", "false", default: false, env: PRODUCTION_MODE)
--final-blocks-only <boolean> Only process blocks that have passed finality, preventing reorgs and undo signals by staying further away from the chain HEAD (choices: "true", "false", default: false, env: FINAL_BLOCKS_ONLY)
--inactivity-seconds <int> If set, the sink will stop when inactive for over a certain number of seconds (default: 300, env: INACTIVITY_SECONDS)
--headers [string...] Set headers that will be sent on every request (ex: --headers X-HEADER=headerA) (default: {}, env: HEADERS)
--final-blocks-only <boolean> Only process blocks that have passed finality, preventing reorgs and undo signals by staying further away from the chain HEAD (default: "false", env: FINAL_BLOCKS_ONLY)
--verbose <boolean> Enable verbose logging (default: "false", env: VERBOSE)
--plaintext <boolean> Establish GRPC connection in plaintext (choices: "true", "false", default: false, env: PLAIN_TEXT)
--verbose <boolean> Enable verbose logging (choices: "true", "false", default: false, env: VERBOSE)
--filename <string> CSV filename (default: '<endpoint>-<module_hash>-<module_name>.csv') (env: FILENAME)
--schema <string> SQL Table Schema for CSV (default: "schema.sql", env: SCHEMA)
--schema <string> SQL table schema for CSV (default: "schema.sql", env: SCHEMA)
--delimiter <string> CSV delimiter (default: ",", env: DELIMITER)
-h, --help display help for command
```
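
With the `.env` file above in place, a run only needs to override what differs; for example, a sketch switching the new `--delimiter` flag to a semicolon:

```bash
$ substreams-sink-csv --delimiter ";"
```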

@@ -141,3 +148,75 @@ module.exports = {
```bash
$ pm2 start
```

## Loading CSV Data into ClickHouse

[**Quick Install**](https://clickhouse.com/docs/en/install)
```bash
$ curl https://clickhouse.com/ | sh
```

**Start ClickHouse**
```bash
$ clickhouse server
```

**Connect to ClickHouse**
```bash
$ clickhouse client
```

**Create a ClickHouse table**

> Before importing data, let’s create a table with a relevant structure:
```sql
CREATE TABLE block_meta
(
block_num UInt64,
timestamp DateTime,
id String,
hash String,
parent_hash String
)
ENGINE = ReplacingMergeTree()
ORDER BY block_num;
```
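
As a design note, `ReplacingMergeTree` deduplicates rows that share the same `ORDER BY` key during background merges, so re-running the sink over an overlapping block range will not permanently duplicate rows (duplicates can still show up until a merge runs, or unless the query uses `FINAL`).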

[**Load CSV data into ClickHouse**](https://clickhouse.com/docs/en/integrations/data-formats/csv-tsv)

> To import data from the CSV file to the `block_meta` table, we can pipe our file directly to the clickhouse-client:

```bash
$ clickhouse-client --query="INSERT INTO block_meta FORMAT CSV" < eth.substreams.pinax.network-3b180e1d2390afef1f22651581304e04245ba001-graph_out-block_meta.csv
```

> Note that we use `FORMAT CSV` to let ClickHouse know we’re ingesting CSV-formatted data. Alternatively, we can load data from a local file using the `FROM INFILE` clause:

```sql
INSERT INTO block_meta
FROM INFILE 'eth.substreams.pinax.network-3b180e1d2390afef1f22651581304e04245ba001-graph_out-block_meta.csv'
FORMAT CSV
```
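
If the sink was run with a non-default `DELIMITER`, ClickHouse must be told about it as well; a sketch using the `format_csv_delimiter` setting (the `;` delimiter and the shortened filename are illustrative):

```bash
$ clickhouse-client --format_csv_delimiter=";" --query="INSERT INTO block_meta FORMAT CSV" < block_meta.csv
```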

**Query the ClickHouse table**

```sql
SELECT * FROM block_meta LIMIT 10;
```

```txt
┌─block_num─┬───────────timestamp─┬─id────────────────┬─hash─────────────────────────────────────────┬─parent_hash──────────────────────────────────┐
│ 2 │ 2015-07-30 15:26:57 │ day:last:20150730 │ tJWh1+ZmMVKuknCNpIQzN7lYFGAVooAvQZOkEARGmMk= │ iOltRTe+pNnAXRJUmQezJWHTvzH0Wq5zTNwRnxNAbLY= │
│ 3 │ 2015-07-30 15:27:28 │ day:last:20150730 │ PWEiZgzIJDdvEe6EL4Ot3DUl4t1nVrm88K/6aqiM90E= │ tJWh1+ZmMVKuknCNpIQzN7lYFGAVooAvQZOkEARGmMk= │
│ 4 │ 2015-07-30 15:27:57 │ day:last:20150730 │ I631o74PUjWzaUG8sptiUEJ47Fuc36J3uZK6Sno806I= │ PWEiZgzIJDdvEe6EL4Ot3DUl4t1nVrm88K/6aqiM90E= │
│ 5 │ 2015-07-30 15:28:03 │ day:last:20150730 │ 83xjLTYeCpPwi6KbGixwjZyqPuGdHujSoCYSv/5J8Kk= │ I631o74PUjWzaUG8sptiUEJ47Fuc36J3uZK6Sno806I= │
│ 6 │ 2015-07-30 15:28:27 │ day:last:20150730 │ HxrtjjaUoGdJbCSOYYec2pmwcJod+6zQtpN1DfBrMm4= │ 83xjLTYeCpPwi6KbGixwjZyqPuGdHujSoCYSv/5J8Kk= │
│ 7 │ 2015-07-30 15:28:30 │ day:last:20150730 │ 4MfAtG4Ra4dDVNzm9kuFgb0jkYawPzCpeOPcOGVvcjo= │ HxrtjjaUoGdJbCSOYYec2pmwcJod+6zQtpN1DfBrMm4= │
│ 8 │ 2015-07-30 15:28:32 │ day:last:20150730 │ LOlDQt8Ya6tBZcJoxDq5gtNgyUdPQp/sVWWt/F0fJYs= │ 4MfAtG4Ra4dDVNzm9kuFgb0jkYawPzCpeOPcOGVvcjo= │
│ 9 │ 2015-07-30 15:28:35 │ day:last:20150730 │ mX5Hv0ysUJxid1PAY4WshmZB7G+INzT/eURBEADcV24= │ LOlDQt8Ya6tBZcJoxDq5gtNgyUdPQp/sVWWt/F0fJYs= │
│ 10 │ 2015-07-30 15:28:48 │ day:last:20150730 │ T/SjiyeKtJ93OdOk7U4ScUOGqf33IZLy6PfaeCLxC00= │ mX5Hv0ysUJxid1PAY4WshmZB7G+INzT/eURBEADcV24= │
│ 11 │ 2015-07-30 15:28:56 │ day:last:20150730 │ P151bD78uTCZNht93Q2r/qpZJDlDfByDbkQ8y4HpMkI= │ T/SjiyeKtJ93OdOk7U4ScUOGqf33IZLy6PfaeCLxC00= │
└───────────┴─────────────────────┴───────────────────┴──────────────────────────────────────────────┴──────────────────────────────────────────────┘

10 rows in set. Elapsed: 0.001 sec. Processed 8.19 thousand rows, 1.18 MB (5.51 million rows/s., 793.31 MB/s.)
```
4 changes: 3 additions & 1 deletion bin/cli.mts
@@ -8,6 +8,7 @@ import { version } from "../version.js";
export interface CSVRunOptions extends commander.RunOptions {
schema: string;
filename?: string;
delimiter: string;
}

const name = "substreams-sink-csv";
@@ -18,7 +19,8 @@ const pkg = {name, version, description};
const program = commander.program(pkg);
const command = commander.addRunOptions(program, {metrics: false, http: false});
command.addOption(new Option("--filename <string>", "CSV filename (default: '<endpoint>-<module_hash>-<module_name>.csv')").env("FILENAME"));
command.addOption(new Option("--schema <string>", "SQL Table Schema for CSV").default("schema.sql").env("SCHEMA"));
command.addOption(new Option("--schema <string>", "SQL table schema for CSV").default("schema.sql").env("SCHEMA"));
command.addOption(new Option("--delimiter <string>", "CSV delimiter").default(",").env("DELIMITER"));
command.action(action);

program.parse();
30 changes: 11 additions & 19 deletions index.ts
@@ -9,6 +9,8 @@ import { getModuleHash, isRemotePath } from "./src/getModuleHash.js";
import { parseFilename } from "./src/parseFilename.js";
import { parseClock } from "./src/parseClock.js";
import { parseSchema } from "./src/parseSchema.js";
import { writeRow } from "./src/writeRow.js";
import { applyReservedFields } from "./src/applyReservedFields.js";

export async function action(options: CSVRunOptions ) {
console.log(`[substreams-sink-csv] v${version}`);
@@ -58,7 +60,7 @@ export async function action(options: CSVRunOptions ) {
// log stats
let rows = 0;
let blocks = 0;
let last_block_num = 0;
let last_block_number = 0;
let last_timestamp = "";
let totalBytesRead = 0;
let totalBytesWritten = 0;
@@ -90,14 +92,14 @@
emitter.on("clock", (clock) => {
// write block to file
// used to track how many blocks have been processed per module
const { block_num, block_id, seconds, timestamp } = parseClock(clock);
clockWriter.write([block_num, block_id, seconds, timestamp].join(",") + '\n');
const { block_number, block_id, seconds, timestamp } = parseClock(clock);
writeRow(clockWriter, [block_number, block_id, seconds, timestamp], options);
});

// Stream Messages
emitter.on("anyMessage", async (data, cursor, clock) => {
const { block_num, block_id, timestamp, seconds } = parseClock(clock);
last_block_num = block_num;
const { block_number, timestamp, seconds } = parseClock(clock);
last_block_number = block_number;
last_timestamp = timestamp;
last_seconds = seconds;

@@ -107,26 +109,16 @@
const table = tables.get(entityChange.entity);
if ( !writer || !table ) throw new Error(`Table not found: ${entityChange.entity}`);
const values = getValuesInEntityChange(entityChange);

// **Reserved field names** to be used to expand the schema
values["id"] = entityChange.id;
values["cursor"] = cursor;
values["operation"] = entityChange.operation;
values["block_id"] = block_id;
values["block_num"] = block_num;
values["timestamp"] = timestamp;
values["seconds"] = seconds;
applyReservedFields(values, entityChange, cursor, clock);

// order values based on table
const data = table.map((column) => {
const value = values[column];
if ( value === undefined ) return null;
if ( typeof value == "string" && value.includes(",") ) return `"${value}"`; // escape commas
const value = values[column] as unknown;
return value;
});

// save CSV row
writer.write(data.join(",") + "\n");
writeRow(writer, data, options);
rows++;
};

@@ -140,7 +132,7 @@
if ( last_update != now) {
last_update = now;
const blocksPerSecond = Math.floor(blocks / (last_update - start));
logUpdate(JSON.stringify({last_block_num, last_timestamp, blocks, rows, blocksPerSecond, totalBytesRead, totalBytesWritten, runningJobs}));
logUpdate(JSON.stringify({last_block_number, last_timestamp, blocks, rows, blocksPerSecond, totalBytesRead, totalBytesWritten, runningJobs}));
}
}

2 changes: 1 addition & 1 deletion package-lock.json

2 changes: 1 addition & 1 deletion package.json
@@ -1,5 +1,5 @@
{
"version": "0.2.9",
"version": "0.2.10",
"name": "substreams-sink-csv",
"description": "Substreams Sink CSV",
"type": "module",
29 changes: 29 additions & 0 deletions src/applyReservedFields.ts
@@ -0,0 +1,29 @@
import { Clock } from "@substreams/core/proto";
import { EntityChange } from "@substreams/sink-entity-changes/zod";
import { parseClock, parseTimestamp } from "./parseClock.js";
import { Timestamp } from "@bufbuild/protobuf";

export function applyReservedFields( values: Record<string, unknown>, entityChange: EntityChange, cursor: string, clock: Clock ) {
const { block_number, block_id, timestamp, seconds, milliseconds, nanos } = parseClock(clock);

// **Reserved field names** to be used to expand the schema
if ( !values["id"] ) values["id"] = entityChange.id;
if ( !values["operation"] ) values["operation"] = entityChange.operation;
if ( !values["cursor"] ) values["cursor"] = cursor;
if ( !values["block"] ) values["block"] = block_number;
if ( !values["block_num"] ) values["block_num"] = block_number;
if ( !values["block_number"] ) values["block_number"] = block_number;
if ( !values["block_id"] ) values["block_id"] = block_id;
if ( !values["seconds"] ) values["seconds"] = seconds;
if ( !values["milliseconds"] ) values["milliseconds"] = milliseconds;
if ( !values["millis"] ) values["millis"] = milliseconds;
if ( !values["nanos"] ) values["nanos"] = nanos;
if ( !values["nanoseconds"] ) values["nanoseconds"] = nanos;
if ( !values["timestamp"] ) values["timestamp"] = timestamp;

// normalize the timestamp (module-provided or derived) into the sink's "YYYY-MM-DD HH:MM:SS" format
if ( values["timestamp"] ) values["timestamp"] = parseTimestamp(Timestamp.fromDate(new Date(values["timestamp"] as string)));
else values["timestamp"] = timestamp;

return values;
}
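
A hypothetical sketch of the precedence rule above: values already emitted by the module win, and only the missing reserved fields are derived from the clock and cursor. The `Clock` literal, the `as any` cast, and the cursor string are illustrative, not real stream data:

```ts
import { Clock } from "@substreams/core/proto";
import { Timestamp } from "@bufbuild/protobuf";
import { applyReservedFields } from "./applyReservedFields.js";

const clock = new Clock({
  id: "tJWh1+ZmMVKuknCNpIQzN7lYFGAVooAvQZOkEARGmMk=",
  number: 2n,
  timestamp: Timestamp.fromDate(new Date("2015-07-30T15:26:57Z")),
});

// the module already emitted its own "id", so it is kept as-is
const values: Record<string, unknown> = { id: "day:last:20150730" };
applyReservedFields(values, { id: "ignored", operation: "OPERATION_CREATE" } as any, "cursor-1", clock);
// values.id        -> "day:last:20150730" (module value wins)
// values.block_num -> 2                   (derived from the clock)
// values.timestamp -> "2015-07-30 15:26:57" (in a UTC environment)
```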
8 changes: 8 additions & 0 deletions src/parseClock.spec.ts
@@ -0,0 +1,8 @@
import { expect, test } from "bun:test";
import { parseTimestamp } from "./parseClock.js";
import { Timestamp } from "@bufbuild/protobuf";

test("parseTimestamp", () => {
// expect(parseTimestamp(timestamp)).toBe("2015-07-30T15:26:57.000Z");
expect(parseTimestamp(Timestamp.fromDate(new Date(1438270017000)))).toBe("2015-07-30 15:26:57");
})
27 changes: 19 additions & 8 deletions src/parseClock.ts
@@ -1,11 +1,22 @@
import { Clock } from "@substreams/core/proto";
import { Timestamp } from "@bufbuild/protobuf";

export function parseClock(clock: Clock) {
if ( !clock.timestamp ) throw new Error("Clock has no timestamp");
return {
block_num: Number(clock.number),
block_id: clock.id,
seconds: Number(clock.timestamp?.seconds),
timestamp: clock.timestamp?.toDate().toISOString(),
}
}
if ( !clock.timestamp ) throw new Error("Clock has no timestamp");
const seconds = Number(clock.timestamp?.seconds);
const nanos = Number(clock.timestamp?.nanos);
const milliseconds = seconds * 1000 + nanos / 1000000;

return {
block_number: Number(clock.number),
block_id: clock.id,
seconds,
milliseconds,
nanos,
timestamp: parseTimestamp(clock.timestamp)
}
}

export function parseTimestamp(timestamp: Timestamp) {
return timestamp.toDate().toISOString().replace("T", " ").split(".")[0]
}
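
A worked example of the new derived fields, assuming a clock at 1438270017 seconds and 500000000 nanos:

```ts
// seconds      = 1438270017
// nanos        = 500000000
// milliseconds = 1438270017 * 1000 + 500000000 / 1e6  // = 1438270017500
// timestamp    = "2015-07-30 15:26:57"                // ISO string with "T" replaced and sub-seconds stripped
```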
14 changes: 14 additions & 0 deletions src/writeRow.spec.ts
@@ -0,0 +1,14 @@
import { expect, test } from "bun:test";
import { formatValue } from "./writeRow.js";

test("formatValue", () => {
const options = {delimiter: ","};
// expect(formatValue("a", options)).toBe("a");
expect(formatValue("'a'", options)).toBe("'a'");
expect(formatValue("foo bar", options)).toBe("foo bar");
expect(formatValue("foo \" bar", options)).toBe("foo \"\" bar");
expect(formatValue(undefined, options)).toBe("");
expect(formatValue(null, options)).toBe("");
expect(formatValue("a,b", options)).toBe("\"a,b\"");
expect(formatValue("a,\"b", options)).toBe("\"a,\"\"b\"");
})
20 changes: 20 additions & 0 deletions src/writeRow.ts
@@ -0,0 +1,20 @@
import fs from "fs";

interface WriteRowOptions {
delimiter: string;
}

export function writeRow(writer: fs.WriteStream, columns: any[], options: WriteRowOptions): void {
columns = columns.map(value => formatValue(value, options));
writer.write(columns.join(options.delimiter) + '\n');
}

export function formatValue(value: unknown, options: WriteRowOptions): string {
if (value === undefined || value === null) return "";

if (typeof value == "string") {
value = value.replace(/"/g, "\"\""); // double embedded quotes
if ( value.includes(options.delimiter) ) value = `"${value}"`; // quote values containing the delimiter
return value;
}
return String(value); // numbers and other primitives pass through as text
}
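
A minimal usage sketch of the new helpers (the output path and values are illustrative):

```ts
import fs from "fs";
import { writeRow } from "./writeRow.js";

const writer = fs.createWriteStream("out.csv", { flags: "a" });
writeRow(writer, ["hello, world", 'say "hi"', 42, null], { delimiter: "," });
// appends the line: "hello, world",say ""hi"",42,
writer.end();
```

Note that a value is wrapped in quotes only when it contains the delimiter; embedded double quotes are always doubled, matching the expectations in `writeRow.spec.ts`.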
2 changes: 1 addition & 1 deletion version.ts
@@ -1 +1 @@
export const version = "0.2.9";
export const version = "0.2.10";