Skip to content

Commit

Permalink
Merge pull request #5 from pinax-network/fix/sql-table
Browse files Browse the repository at this point in the history
fix sql table parsing
  • Loading branch information
DenisCarriere authored Mar 2, 2024
2 parents 7a6f226 + 7d4fa9e commit c69c0ff
Show file tree
Hide file tree
Showing 5 changed files with 129 additions and 16 deletions.
16 changes: 10 additions & 6 deletions index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -10,13 +10,17 @@ import { parseClock } from "./src/parseClock.js";
import { parseSchema } from "./src/parseSchema.js";

export async function action(options: CSVRunOptions ) {
// @substreams/manifest issue
// if manifest is local, add current directory
if (!isRemotePath(options.manifest) && !path.isAbsolute(options.manifest)) {
const currentDir = process.cwd();
options.manifest = path.join(currentDir, options.manifest);
// handle file system manifest
// can be removed when issue resolved
// https://github.com/substreams-js/substreams-js/issues/62
if (!isRemotePath(options.manifest)) {
// if manifest is not absolute, add current directory
if ( !path.isAbsolute(options.manifest)) {
const currentDir = process.cwd();
options.manifest = path.join(currentDir, options.manifest);
}
if ( !fs.existsSync(options.manifest) ) throw new Error(`Manifest file not found: ${options.manifest}`);
}
if ( !fs.existsSync(options.manifest) ) throw new Error(`Manifest file not found: ${options.manifest}`);

// SQL schema
if ( !fs.existsSync(options.schema) ) throw new Error(`Schema file not found: ${options.schema}`);
Expand Down
2 changes: 1 addition & 1 deletion package.json
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
{
"version": "0.2.5",
"version": "0.2.6",
"name": "substreams-sink-csv",
"description": "Substreams Sink CSV",
"type": "module",
Expand Down
8 changes: 8 additions & 0 deletions src/getModuleHash.spec.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
import { expect, test } from "bun:test";
import { isRemotePath } from "./getModuleHash.js";

test("isRemotePath", () => {
expect(isRemotePath("https://github.com/streamingfast/substreams-eth-block-meta/releases/download/v0.5.1/substreams-eth-block-meta-v0.5.1.spkg")).toBe(true);
expect(isRemotePath("substreams-eth-block-meta-v0.5.1.spkg")).toBe(false);
expect(isRemotePath("./substreams-eth-block-meta-v0.5.1.spkg")).toBe(false);
})
91 changes: 90 additions & 1 deletion src/parseSchema.spec.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
import { expect, test } from "bun:test";
import { parseColumn, parseCreateTable, parseSchema } from "./parseSchema.js";
import { parseColumn, parseCreateTable, parseSchema, preParseStatement } from "./parseSchema.js";

test("parseCreateTable", () => {
expect(parseCreateTable("CREATE TABLE block_meta")).toBe("block_meta");
Expand All @@ -23,6 +23,12 @@ test("parseColumn", () => {
expect(parseColumn("CONSTRAINT PK_Person PRIMARY KEY (ID,LastName)")).toBe("");
})

test("preParseStatement", () => {
expect(preParseStatement("parent_hash TEXT, ")).toBe("parent_hash TEXT");
expect(preParseStatement(" \"timestamp\" INTEGER ")).toBe("timestamp INTEGER");
expect(preParseStatement("'timestamp' INTEGER")).toBe("timestamp INTEGER");
})

test("parseSchema::factory_pair_created", () => {
const sql = `
CREATE TABLE factory_pair_created (
Expand Down Expand Up @@ -63,3 +69,86 @@ test("parseSchema::block_meta", () => {
const tables = parseSchema(sql);
expect(tables).toEqual(new Map([["block_meta", ["id", "at", "number", "hash", "parent_hash", "timestamp"]]]));
});

test("parseSchema::transfers", () => {
const sql = `
-- Table for transfers --
CREATE TABLE IF NOT EXISTS transfers (
-- trace information
trx_id String,
action_index UInt32,
-- contract & scope --
contract FixedString(12),
action String,
symcode String,
-- data payload --
from FixedString(12),
to FixedString(12),
quantity String,
memo String,
-- extras --
precision UInt32,
amount Int64,
value Float64,
)
ENGINE = ReplacingMergeTree()
-- primary key = trx_id + action_index --
PRIMARY KEY (id)
ORDER BY (id);
-- Table for accounts --
CREATE TABLE IF NOT EXISTS accounts (
-- trace information --
trx_id String,
action_index UInt32,
-- contract & scope --
contract FixedString(12),
symcode String,
-- data payload --
account FixedString(12),
balance String,
balance_delta Int64,
-- extras --
precision UInt32,
amount Int64,
value Float64,
)
ENGINE = ReplacingMergeTree()
-- primary key = trx_id + action_index --
PRIMARY KEY (id)
ORDER BY (id);
`
const tables = parseSchema(sql);
expect(tables).toEqual(new Map([[
"transfers",
[
"trx_id",
"action_index",
"contract",
"action",
"symcode",
"from",
"to",
"quantity",
"memo",
"precision",
"amount",
"value",
]],[
"accounts",
[
"trx_id",
"action_index",
"contract",
"symcode",
"account",
"balance",
"balance_delta",
"precision",
"amount",
"value",
]]]));
});
28 changes: 20 additions & 8 deletions src/parseSchema.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,18 +2,19 @@ export function parseSchema(sql: string) {
const tables = new Map<string, string[]>(); // <table, columns>
const statements = sql.split(";")

// should return `block_meta` as table and `id, at, number, hash, parent_hash, timestamp` as columns
for (const statement of statements) {
for (let statement of statements) {
statement = preParseStatement(statement);
const lines = statement.trim().split("\n");
const table = parseCreateTable(lines[0]);
// console.log(table, lines);
if ( !table ) continue;
const columns = new Set<string>();
let table = '';
for ( const line of lines) {
const parsedTable = parseCreateTable(line);
if ( parsedTable ) table = parsedTable;
if ( !table ) continue;
const column = parseColumn(line);
if (column) columns.add(column);
}
tables.set(table, Array.from(columns));
if ( table ) tables.set(table, Array.from(columns));
}
return tables;
}
Expand All @@ -24,6 +25,7 @@ export function parseSchema(sql: string) {
// create table block meta
// CREATE TABLE IF NOT EXISTS block_meta
export function parseCreateTable(statement: string) {
statement = preParseStatement(statement);
const match = statement.match(/^CREATE TABLE/i);
if (match) {
statement = statement.replace("(", "").trim();
Expand All @@ -37,16 +39,26 @@ export function parseCreateTable(statement: string) {
// parent_hash TEXT,
// timestamp INTEGER
export function parseColumn(statement: string) {
statement = statement.trim().replace(/[,;]/g, ''); // remove trailing comma or semicolon
statement = statement.replace(/[\"\']/g, ''); // remove quotes
statement = preParseStatement(statement);
if ( statement.match(/^CREATE TABLE/i) ) return '' // ignore table name
if ( statement.match(/^PRIMARY KEY/i) ) return '' // ignore primary key as valid column
if ( statement.match(/^\)/) ) return '' // ignore closing parenthesis
if ( statement.match(/^\s*$/) ) return '' // ignore empty lines
if ( statement.match(/^CONSTRAINT/i) ) return '' // ignore constraints
if ( statement.match(/^ENGINE/i) ) return '' // ignore engine
if ( statement.match(/^ORDER BY/i) ) return '' // ignore engine
if ( statement.match(/^--/i) ) return '' // ignore comments
const words = statement.split(" ");
if ( words.length > 1) {
return words[0].trim();
}
return '';
}

// remove trailing comma or semicolon
// remove quotes
export function preParseStatement(statement: string) {
return statement
.replace(/[,;\'\"]/g, '')
.trim()
}

0 comments on commit c69c0ff

Please sign in to comment.