
Commit

Merge pull request #20 from pinax-network/fix/write-row
improve writeRow logic
DenisCarriere authored Mar 7, 2024
2 parents d61662a + fda328f commit 1b8f7cb
Showing 12 changed files with 201 additions and 45 deletions.
3 changes: 2 additions & 1 deletion .env.example
@@ -2,4 +2,5 @@ SUBSTREAMS_API_KEY=<your-api-key>
MANIFEST=https://github.com/streamingfast/substreams-eth-block-meta/releases/download/v0.5.1/substreams-eth-block-meta-v0.5.1.spkg
MODULE_NAME=graph_out
SUBSTREAMS_ENDPOINT=eth.substreams.pinax.network:443
SCHEMA=schema.example.sql
DELIMITER=","
105 changes: 92 additions & 13 deletions README.md
@@ -21,13 +21,19 @@ CREATE TABLE block_meta

**Reserved field names** that can be used to expand the schema; the sink fills these automatically when the module does not already set them (see the sketch after the list):

- `id` (String)
- `block_number` (UInt64)
- `block`
- `block_num`
- `block_id` (String)
- `cursor` (String)
- `timestamp` (DateTime)
- `seconds` (Int64)
- `nanos` (Int32)
- `nanoseconds`
- `milliseconds` (Int64)
- `millis`
- `operation` (String)
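
A minimal sketch of the fill behavior, with hypothetical columns and values (the real logic lives in `src/applyReservedFields.ts`, added in this commit):

```ts
// Hypothetical example: column order comes from schema.sql.
const columns = ["id", "block_num", "timestamp", "hash"];

// The module only set "hash"; reserved names are filled when absent.
const values: Record<string, unknown> = {
  hash: "tJWh1+ZmMVKuknCNpIQzN7lYFGAVooAvQZOkEARGmMk=",
};
values["id"] ??= "day:last:20150730";
values["block_num"] ??= 2;
values["timestamp"] ??= "2015-07-30 15:26:57";

// Values are ordered by column before being written as one CSV row.
const row = columns.map((column) => values[column] ?? null);
console.log(row.join(",")); // day:last:20150730,2,2015-07-30 15:26:57,tJWh1+ZmMVKuknCNpIQzN7lYFGAVooAvQZOkEARGmMk=
```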

### Get Substreams API Key

@@ -45,6 +51,7 @@ SUBSTREAMS_ENDPOINT=eth.substreams.pinax.network:443
SCHEMA=schema.example.sql
FINAL_BLOCKS_ONLY=true
START_BLOCK=2
DELIMITER=","
```
**CLI** with `.env` file
```bash
@@ -85,8 +92,6 @@ eth.substreams.pinax.network-3b180e1d2390afef1f22651581304e04245ba001-graph_out.
```bash
$ substreams-sink-csv --help

Substreams Sink CSV

Options:
@@ -100,13 +105,15 @@ Options:
--substreams-api-key <string> API key for the Substream endpoint (env: SUBSTREAMS_API_KEY)
--delay-before-start <int> Delay (ms) before starting Substreams (default: 0, env: DELAY_BEFORE_START)
--cursor <string> Cursor to stream from. Leave blank for no cursor
--production-mode <boolean> Enable production mode, allows cached Substreams data if available (choices: "true", "false", default: false, env: PRODUCTION_MODE)
--final-blocks-only <boolean> Only process blocks that have passed finality, to prevent any reorg and undo signal by staying further away from the chain HEAD (choices: "true", "false", default: false, env: FINAL_BLOCKS_ONLY)
--inactivity-seconds <int> If set, the sink will stop when inactive for over a certain amount of seconds (default: 300, env: INACTIVITY_SECONDS)
--headers [string...] Set headers that will be sent on every request (ex: --headers X-HEADER=headerA) (default: {}, env: HEADERS)
--plaintext <boolean> Establish GRPC connection in plaintext (choices: "true", "false", default: false, env: PLAIN_TEXT)
--verbose <boolean> Enable verbose logging (choices: "true", "false", default: false, env: VERBOSE)
--filename <string> CSV filename (default: '<endpoint>-<module_hash>-<module_name>.csv') (env: FILENAME)
--schema <string> SQL table schema for CSV (default: "schema.sql", env: SCHEMA)
--delimiter <string> CSV delimiter (default: ",", env: DELIMITER)
-h, --help display help for command
```
@@ -141,3 +148,75 @@ module.exports = {
```bash
$ pm2 start
```
## Loading CSV Data into ClickHouse
[**Quick Install**](https://clickhouse.com/docs/en/install)
```bash
$ curl https://clickhouse.com/ | sh
```
**Start ClickHouse**
```bash
$ clickhouse server
```
**Connect to ClickHouse**
```bash
$ clickhouse client
```
**Create a ClickHouse table**
> Before importing data, let’s create a table with a relevant structure:
```sql
CREATE TABLE block_meta
(
block_num UInt64,
timestamp DateTime,
id String,
hash String,
parent_hash String
)
ENGINE = ReplacingMergeTree()
ORDER BY block_num;
```
[**Load CSV data into ClickHouse**](https://clickhouse.com/docs/en/integrations/data-formats/csv-tsv)
> To import data from the CSV file to the `block_meta` table, we can pipe our file directly to the clickhouse-client:
```bash
$ clickhouse-client --query="INSERT INTO block_meta FORMAT CSV" < eth.substreams.pinax.network-3b180e1d2390afef1f22651581304e04245ba001-graph_out-block_meta.csv
```
> Note that we use `FORMAT CSV` to let ClickHouse know we’re ingesting CSV formatted data. Alternatively, we can load data from a local file using the `FROM INFILE` clause:
```sql
INSERT INTO block_meta
FROM INFILE 'eth.substreams.pinax.network-3b180e1d2390afef1f22651581304e04245ba001-graph_out-block_meta.csv'
FORMAT CSV
```
**Query the ClickHouse table**
```sql
SELECT * FROM block_meta LIMIT 10;
```
```text
┌─block_num─┬───────────timestamp─┬─id────────────────┬─hash─────────────────────────────────────────┬─parent_hash──────────────────────────────────┐
│ 2 │ 2015-07-30 15:26:57 │ day:last:20150730 │ tJWh1+ZmMVKuknCNpIQzN7lYFGAVooAvQZOkEARGmMk= │ iOltRTe+pNnAXRJUmQezJWHTvzH0Wq5zTNwRnxNAbLY= │
│ 3 │ 2015-07-30 15:27:28 │ day:last:20150730 │ PWEiZgzIJDdvEe6EL4Ot3DUl4t1nVrm88K/6aqiM90E= │ tJWh1+ZmMVKuknCNpIQzN7lYFGAVooAvQZOkEARGmMk= │
│ 4 │ 2015-07-30 15:27:57 │ day:last:20150730 │ I631o74PUjWzaUG8sptiUEJ47Fuc36J3uZK6Sno806I= │ PWEiZgzIJDdvEe6EL4Ot3DUl4t1nVrm88K/6aqiM90E= │
│ 5 │ 2015-07-30 15:28:03 │ day:last:20150730 │ 83xjLTYeCpPwi6KbGixwjZyqPuGdHujSoCYSv/5J8Kk= │ I631o74PUjWzaUG8sptiUEJ47Fuc36J3uZK6Sno806I= │
│ 6 │ 2015-07-30 15:28:27 │ day:last:20150730 │ HxrtjjaUoGdJbCSOYYec2pmwcJod+6zQtpN1DfBrMm4= │ 83xjLTYeCpPwi6KbGixwjZyqPuGdHujSoCYSv/5J8Kk= │
│ 7 │ 2015-07-30 15:28:30 │ day:last:20150730 │ 4MfAtG4Ra4dDVNzm9kuFgb0jkYawPzCpeOPcOGVvcjo= │ HxrtjjaUoGdJbCSOYYec2pmwcJod+6zQtpN1DfBrMm4= │
│ 8 │ 2015-07-30 15:28:32 │ day:last:20150730 │ LOlDQt8Ya6tBZcJoxDq5gtNgyUdPQp/sVWWt/F0fJYs= │ 4MfAtG4Ra4dDVNzm9kuFgb0jkYawPzCpeOPcOGVvcjo= │
│ 9 │ 2015-07-30 15:28:35 │ day:last:20150730 │ mX5Hv0ysUJxid1PAY4WshmZB7G+INzT/eURBEADcV24= │ LOlDQt8Ya6tBZcJoxDq5gtNgyUdPQp/sVWWt/F0fJYs= │
│ 10 │ 2015-07-30 15:28:48 │ day:last:20150730 │ T/SjiyeKtJ93OdOk7U4ScUOGqf33IZLy6PfaeCLxC00= │ mX5Hv0ysUJxid1PAY4WshmZB7G+INzT/eURBEADcV24= │
│ 11 │ 2015-07-30 15:28:56 │ day:last:20150730 │ P151bD78uTCZNht93Q2r/qpZJDlDfByDbkQ8y4HpMkI= │ T/SjiyeKtJ93OdOk7U4ScUOGqf33IZLy6PfaeCLxC00= │
└───────────┴─────────────────────┴───────────────────┴──────────────────────────────────────────────┴──────────────────────────────────────────────┘

10 rows in set. Elapsed: 0.001 sec. Processed 8.19 thousand rows, 1.18 MB (5.51 million rows/s., 793.31 MB/s.)
```
4 changes: 3 additions & 1 deletion bin/cli.mts
@@ -8,6 +8,7 @@ import { version } from "../version.js";
export interface CSVRunOptions extends commander.RunOptions {
  schema: string;
  filename?: string;
  delimiter: string;
}

const name = "substreams-sink-csv";
@@ -18,7 +19,8 @@ const pkg = {name, version, description};
const program = commander.program(pkg);
const command = commander.addRunOptions(program, {metrics: false, http: false});
command.addOption(new Option("--filename <string>", "CSV filename (default: '<endpoint>-<module_hash>-<module_name>.csv')").env("FILENAME"));
command.addOption(new Option("--schema <string>", "SQL Table Schema for CSV").default("schema.sql").env("SCHEMA"));
command.addOption(new Option("--schema <string>", "SQL table schema for CSV").default("schema.sql").env("SCHEMA"));
command.addOption(new Option("--delimiter <string>", "CSV delimiter").default(",").env("DELIMITER"));
command.action(action);

program.parse();
30 changes: 11 additions & 19 deletions index.ts
@@ -9,6 +9,8 @@ import { getModuleHash, isRemotePath } from "./src/getModuleHash.js";
import { parseFilename } from "./src/parseFilename.js";
import { parseClock } from "./src/parseClock.js";
import { parseSchema } from "./src/parseSchema.js";
import { writeRow } from "./src/writeRow.js";
import { applyReservedFields } from "./src/applyReservedFields.js";

export async function action(options: CSVRunOptions ) {
console.log(`[substreams-sink-csv] v${version}`);
@@ -58,7 +60,7 @@ export async function action(options: CSVRunOptions ) {
// log stats
let rows = 0;
let blocks = 0;
let last_block_number = 0;
let last_timestamp = "";
let totalBytesRead = 0;
let totalBytesWritten = 0;
@@ -90,14 +92,14 @@ export async function action(options: CSVRunOptions ) {
emitter.on("clock", (clock) => {
// write block to file
// used to track how many blocks have been processed per module
const { block_number, block_id, seconds, timestamp } = parseClock(clock);
writeRow(clockWriter, [block_number, block_id, seconds, timestamp], options);
});

// Stream Messages
emitter.on("anyMessage", async (data, cursor, clock) => {
const { block_number, timestamp, seconds } = parseClock(clock);
last_block_number = block_number;
last_timestamp = timestamp;
last_seconds = seconds;

@@ -107,26 +109,16 @@ export async function action(options: CSVRunOptions ) {
const table = tables.get(entityChange.entity);
if ( !writer || !table ) throw new Error(`Table not found: ${entityChange.entity}`);
const values = getValuesInEntityChange(entityChange);

applyReservedFields(values, entityChange, cursor, clock);

// order values based on table
const data = table.map((column) => {
const value = values[column] as unknown;
return value;
});

// save CSV row
writeRow(writer, data, options);
rows++;
};

@@ -140,7 +132,7 @@ export async function action(options: CSVRunOptions ) {
if ( last_update != now) {
last_update = now;
const blocksPerSecond = Math.floor(blocks / (last_update - start));
logUpdate(JSON.stringify({last_block_number, last_timestamp, blocks, rows, blocksPerSecond, totalBytesRead, totalBytesWritten, runningJobs}));
}
}

2 changes: 1 addition & 1 deletion package-lock.json

Some generated files are not rendered by default.

2 changes: 1 addition & 1 deletion package.json
@@ -1,5 +1,5 @@
{
"version": "0.2.9",
"version": "0.2.10",
"name": "substreams-sink-csv",
"description": "Substreams Sink CSV",
"type": "module",
29 changes: 29 additions & 0 deletions src/applyReservedFields.ts
@@ -0,0 +1,29 @@
import { Clock } from "@substreams/core/proto";
import { EntityChange } from "@substreams/sink-entity-changes/zod";
import { parseClock, parseTimestamp } from "./parseClock.js";
import { Timestamp } from "@bufbuild/protobuf";

export function applyReservedFields( values: Record<string, unknown>, entityChange: EntityChange, cursor: string, clock: Clock ) {
  const { block_number, block_id, timestamp, seconds, milliseconds, nanos } = parseClock(clock);

  // **Reserved field names** to be used to expand the schema
  if ( !values["id"] ) values["id"] = entityChange.id;
  if ( !values["operation"] ) values["operation"] = entityChange.operation;
  if ( !values["cursor"] ) values["cursor"] = cursor;
  if ( !values["block"] ) values["block"] = block_number;
  if ( !values["block_num"] ) values["block_num"] = block_number;
  if ( !values["block_number"] ) values["block_number"] = block_number;
  if ( !values["block_id"] ) values["block_id"] = block_id;
  if ( !values["seconds"] ) values["seconds"] = seconds;
  if ( !values["milliseconds"] ) values["milliseconds"] = milliseconds;
  if ( !values["millis"] ) values["millis"] = milliseconds;
  if ( !values["nanos"] ) values["nanos"] = nanos;
  if ( !values["nanoseconds"] ) values["nanoseconds"] = nanos;
  if ( !values["timestamp"] ) values["timestamp"] = timestamp;

  // exception: re-parse any module-supplied timestamp into the shared "YYYY-MM-DD hh:mm:ss" format
  if ( values["timestamp"] ) values["timestamp"] = parseTimestamp(Timestamp.fromDate(new Date(values["timestamp"] as string)));
  else values["timestamp"] = timestamp;

  return values;
}
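
A quick usage sketch with hypothetical inputs (the `Clock` constructor follows the generated protobuf API, and the `EntityChange` literal is loosely cast for illustration), showing that a value already set by the module takes precedence:

```ts
import { Clock } from "@substreams/core/proto";
import { Timestamp } from "@bufbuild/protobuf";
import { applyReservedFields } from "./applyReservedFields.js";

// Hypothetical clock for block 2 of the example dataset.
const clock = new Clock({
  id: "0xabc",
  number: 2n,
  timestamp: Timestamp.fromDate(new Date("2015-07-30T15:26:57Z")),
});
const entityChange = { id: "day:last:20150730", operation: "OPERATION_CREATE" } as any;

const values: Record<string, unknown> = { id: "custom-id" }; // already set by the module
applyReservedFields(values, entityChange, "cursor-token", clock);

console.log(values["id"]);        // "custom-id" (module value is kept)
console.log(values["block_num"]); // 2
console.log(values["timestamp"]); // "2015-07-30 15:26:57" (in a UTC environment)
```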
8 changes: 8 additions & 0 deletions src/parseClock.spec.ts
@@ -0,0 +1,8 @@
import { expect, test } from "bun:test";
import { parseTimestamp } from "./parseClock.js";
import { Timestamp } from "@bufbuild/protobuf";

test("parseTimestamp", () => {
// expect(parseTimestamp(timestamp)).toBe("2015-07-30T15:26:57.000Z");
expect(parseTimestamp(Timestamp.fromDate(new Date(1438270017000)))).toBe("2015-07-30 15:26:57");
})
27 changes: 19 additions & 8 deletions src/parseClock.ts
@@ -1,11 +1,22 @@
import { Clock } from "@substreams/core/proto";
import { Timestamp } from "@bufbuild/protobuf";

export function parseClock(clock: Clock) {
  if ( !clock.timestamp ) throw new Error("Clock has no timestamp");
  const seconds = Number(clock.timestamp?.seconds);
  const nanos = Number(clock.timestamp?.nanos);
  const milliseconds = seconds * 1000 + nanos / 1000000;

  return {
    block_number: Number(clock.number),
    block_id: clock.id,
    seconds,
    milliseconds,
    nanos,
    timestamp: parseTimestamp(clock.timestamp)
  }
}

export function parseTimestamp(timestamp: Timestamp) {
  return timestamp.toDate().toISOString().replace("T", " ").split(".")[0]
}
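
For reference, a sketch of the derived fields using a hypothetical clock (constructed via the generated protobuf API, with a half-second `nanos` component to exercise `milliseconds`):

```ts
import { Clock } from "@substreams/core/proto";
import { Timestamp } from "@bufbuild/protobuf";
import { parseClock } from "./parseClock.js";

const clock = new Clock({
  id: "0xdef",
  number: 3n,
  timestamp: new Timestamp({ seconds: 1438270048n, nanos: 500_000_000 }),
});

const parsed = parseClock(clock);
// parsed.block_number === 3
// parsed.seconds      === 1438270048
// parsed.nanos        === 500000000
// parsed.milliseconds === 1438270048500 (seconds * 1000 + nanos / 1e6)
// parsed.timestamp    === "2015-07-30 15:27:28"
```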
14 changes: 14 additions & 0 deletions src/writeRow.spec.ts
@@ -0,0 +1,14 @@
import { expect, test } from "bun:test";
import { formatValue } from "./writeRow.js";

test("formatValue", () => {
const options = {delimiter: ","};
// expect(formatValue("a", options)).toBe("a");
expect(formatValue("'a'", options)).toBe("'a'");
expect(formatValue("foo bar", options)).toBe("foo bar");
expect(formatValue("foo \" bar", options)).toBe("foo \"\" bar");
expect(formatValue(undefined, options)).toBe("");
expect(formatValue(null, options)).toBe("");
expect(formatValue("a,b", options)).toBe("\"a,b\"");
expect(formatValue("a,\"b", options)).toBe("\"a,\"\"b\"");
})
20 changes: 20 additions & 0 deletions src/writeRow.ts
@@ -0,0 +1,20 @@
import fs from "fs";

interface WriteRowOptions {
  delimiter: string;
}

export function writeRow(writer: fs.WriteStream, columns: any[], options: WriteRowOptions): void {
  columns = columns.map(value => formatValue(value, options));
  writer.write(columns.join(options.delimiter) + '\n');
}

export function formatValue(value: string|undefined|null, options: WriteRowOptions): string {
  if (value === undefined || value === null) return "";

  if (typeof value == "string") {
    value = value.replace(/"/g, "\"\"")
    if ( value.includes(options.delimiter) ) value = `"${value}"`; // quote values containing the delimiter
  }
  return value;
}
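
A short usage sketch: the comma cases mirror `writeRow.spec.ts` above, while the semicolon cases are an assumption that follows from the code (a value is only wrapped in quotes when it contains the configured delimiter):

```ts
import { formatValue } from "./writeRow.js";

console.log(formatValue("a,b", { delimiter: "," }));       // "a,b"      (wrapped: contains the delimiter)
console.log(formatValue('foo " bar', { delimiter: "," })); // foo "" bar (quotes doubled, not wrapped)

// With a non-comma delimiter, commas no longer trigger wrapping:
console.log(formatValue("a,b", { delimiter: ";" }));       // a,b
console.log(formatValue("a;b", { delimiter: ";" }));       // "a;b"
```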
2 changes: 1 addition & 1 deletion version.ts
@@ -1 +1 @@
export const version = "0.2.9";
export const version = "0.2.10";
