Skip to content

Commit

Permalink
implement filebase buffer
Browse files Browse the repository at this point in the history
  • Loading branch information
DenisCarriere committed Feb 19, 2024
1 parent 8bf82c5 commit 4fbd58c
Show file tree
Hide file tree
Showing 9 changed files with 124 additions and 73 deletions.
3 changes: 2 additions & 1 deletion .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -8,4 +8,5 @@ bun.lockdb
.DS_Store
*.sqlite
*.db
*.pem
*.pem
*.txt
13 changes: 9 additions & 4 deletions index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -12,9 +12,16 @@ import { logger } from "./src/logger.js";
import init from "./src/fetch/init.js";
import { show_databases, show_tables } from "./src/clickhouse/stores.js";
import "./src/exitHandler.js"
import * as buffer from "./src/buffer.js"

if (config.verbose) logger.enable();

// initizalize before starting the server
await show_tables();
await show_databases();
await init();
await buffer.flush(true);

const app = Bun.serve({
hostname: config.hostname,
port: config.port,
Expand All @@ -28,12 +35,10 @@ const app = Bun.serve({
},
});

// logging
logger.info('[app]\t', `${name} v${version}`);
logger.info('[app]\t', `Sink Server listening on http://${app.hostname}:${app.port}`);
logger.info('[app]\t', `Clickhouse DB ${config.host} (${config.database})`);
for ( const publicKey of publicKeys ) {
logger.info('[app]\t', `Webhook Ed25519 public key (${publicKey})`);
}
await show_tables();
await show_databases();
await init();
}
68 changes: 68 additions & 0 deletions src/buffer.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
import readline from "readline";
import fs from "fs";
import { client } from "./clickhouse/createClient.js";
import { logger } from "./logger.js";
import * as store from "./clickhouse/stores.js";

// Shape of a single row to insert: column name -> value.
export type Values = Record<string, unknown>;
// In-memory buffer: table name -> pending rows.
// NOTE(review): this alias shadows Node's global `Buffer` type inside this module.
export type Buffer = Map<string, Values[]>;

// On-disk write-ahead file: one JSON object per line ({table, values}).
const path = "buffer.txt";
// NOTE(review): utf-16le roughly doubles file size versus utf-8 for JSON text —
// confirm this encoding choice is intentional.
const encoding = "utf-16le";
// Append-only stream shared by insert()/bulkInsert(); flush() clears the file separately.
const writer = fs.createWriteStream(path, {flags: "a", encoding});

// Number of per-table batches inserted into ClickHouse since startup.
export let inserts = 0;

export function bulkInsert(rows: {table: string, values: Values}[]) {
writer.write(rows.map(row => JSON.stringify(row)).join("\n") + "\n");
}

/**
 * Append a single row destined for `table` to the on-disk buffer.
 * Throws (via store.check_table) when the table is not loaded.
 */
export function insert(table: string, values: Values) {
    store.check_table(table);
    const line = JSON.stringify({ table, values });
    writer.write(`${line}\n`);
}

/**
 * Load the on-disk buffer file into a Map of table -> pending rows.
 *
 * @returns an empty Map when the buffer file does not exist.
 * @throws (rejects) when the stream errors or a line is not valid JSON.
 */
export async function read(): Promise<Buffer> {
    const buffer: Buffer = new Map();
    if ( !fs.existsSync(path) ) return buffer;
    const input = fs.createReadStream(path, {encoding});
    const rl = readline.createInterface({ input });

    return new Promise((resolve, reject) => {
        rl.on("line", (line) => {
            // Tolerate blank lines (e.g. a stray separator) instead of crashing.
            if (line.trim().length === 0) return;
            try {
                const {table, values} = JSON.parse(line);
                const rows = buffer.get(table);
                if (rows) {
                    rows.push(values);
                } else {
                    buffer.set(table, [values]);
                }
            } catch (err) {
                // Bug fix: an exception thrown inside this handler was previously
                // uncaught (crashing the process); surface it as a rejection.
                rl.close();
                reject(new Error(`[buffer::read] corrupt buffer line: ${line}`));
            }
        });
        rl.on("close", () => {
            return resolve(buffer);
        });
        // Bug fix: readline.Interface does not emit "error" — the underlying
        // stream does, so listen there for unreadable-file failures.
        input.on("error", (err) => {
            return reject(err);
        });
    });
}

/**
 * Drain the on-disk buffer into ClickHouse, one insert per table.
 *
 * At-least-once semantics: if an insert fails mid-loop this rejects and the
 * file is NOT truncated, so tables already inserted in this pass may be
 * re-inserted on the next flush.
 *
 * @param verbose - when true, log one line per table inserted.
 */
export async function flush(verbose = false) {
    if ( !fs.existsSync(path) ) return;
    const buffer = await read();
    for ( const [table, values] of buffer.entries() ) {
        await client.insert({table, values, format: "JSONEachRow"});
        if ( verbose ) logger.info('[buffer::flush]', `\tinserted ${values.length} rows into ${table}`);
        buffer.delete(table);
        inserts++;
    }
    // Clear the buffer file in place. Bug fix: the previous
    // fs.createWriteStream(path, {encoding}) opened a second stream that was
    // never closed (fd leak) just to truncate the file.
    // NOTE(review): rows appended between read() and this truncate are lost —
    // confirm writers cannot run concurrently with flush().
    fs.truncateSync(path, 0);
}

/** Total number of pending rows across every table in the buffer. */
export function count(buffer: Buffer) {
    let total = 0;
    for (const rows of buffer.values()) {
        total += rows.length;
    }
    return total;
}
8 changes: 4 additions & 4 deletions src/clickhouse/createDatabase.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,10 @@ import { client } from "./createClient.js";
import { databases } from "./stores.js";

export async function createDatabase(database: string) {
if (!database) {
throw new Error("[database] is required")
}
if ( databases?.has(database) ) return {};
// if (!database) {
// throw new Error("[database] is required")
// }
// if ( databases?.has(database) ) return {};
const query = `CREATE DATABASE IF NOT EXISTS "${database}"`;
logger.info('[clickhouse::createDatabase]\t', `CREATE DATABASE [${database}]`);
return {query, ...await client.exec({ query })};
Expand Down
79 changes: 20 additions & 59 deletions src/clickhouse/handleSinkRequest.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,55 +5,25 @@ import { EntityChange } from "@substreams/sink-entity-changes/zod";
import { Clock, Manifest } from "substreams-sink-webhook/schemas";
import * as prometheus from "../prometheus.js";
import { PayloadBody } from "../schemas.js";
import { client } from "./createClient.js";
import * as store from "./stores.js";
import logUpdate from 'log-update';
import { logger } from "../logger.js";
import { now } from "../utils.js";
import * as buffer from "../buffer.js"

type Metadata = { clock: Clock; manifest: Manifest; cursor: string };

const buffer = new Map<string, Record<string, unknown>[]>()

function now() {
return Math.floor(new Date().getTime() / 1000);
}

let entities = 0;
let blocks = 0;
let inserts = 0;
let start = now();
let lastUpdate = now();

function bufferCount() {
let count = 0
for ( const value of buffer.values() ) {
count += value.length;
};
return count;
}

// TO-DO - use Prometheus metrics as input to this function
function logProgress() {
const delta = now() - start
const blockRate = Math.round(blocks / delta);
const entitiesRate = Math.round(entities / delta);
const insertsRate = Math.round(inserts / delta);
const count = bufferCount();
const insertsRate = Math.round(buffer.inserts / delta);
blocks++;
logUpdate(`[clickhouse::handleSinkRequest] blocks=${blocks} [${blockRate}/s] entities=${entities} [${entitiesRate}/s] inserts=${inserts} [${insertsRate}/s] buffer=${count}`);
}

export async function flushBuffer(verbose = false) {
// clear buffer every 1 second
if ( lastUpdate != now() ) {
for ( const [table, values] of buffer.entries() ) {
await client.insert({table, values, format: "JSONEachRow"})
if ( verbose ) logger.info('[handleSinkRequest]', `\tinserted ${values.length} rows into ${table}`);
buffer.delete(table);
inserts++;
}
lastUpdate = now();
}
logUpdate(`[clickhouse::handleSinkRequest] blocks=${blocks} [${blockRate}/s] entities=${entities} [${entitiesRate}/s] inserts=${buffer.inserts} [${insertsRate}/s]`);
}

// ~200-500 blocks per second
Expand All @@ -70,7 +40,11 @@ export async function handleSinkRequest({ data, ...metadata }: PayloadBody) {
insertModuleHashes(metadata);
insertBlocks(metadata);

await flushBuffer();
// clear buffer every 1 second
if ( lastUpdate != now() ) {
await buffer.flush();
lastUpdate = now();
}

// logging
prometheus.sink_requests.inc({
Expand All @@ -93,7 +67,7 @@ function insertModuleHashes(metadata: Metadata) {
latest_block_id: metadata.clock.id,
latest_timestamp: Number(new Date(metadata.clock.timestamp)),
};
insertToBuffer("module_hashes", values);
buffer.insert("module_hashes", values);
}

function insertBlocks(metadata: Metadata) {
Expand All @@ -104,38 +78,29 @@ function insertBlocks(metadata: Metadata) {
timestamp: Number(new Date(metadata.clock.timestamp)),
block_id: metadata.clock.id,
};
insertToBuffer("blocks", values);
}

function insertToBuffer(table: string, values: Record<string, unknown>) {
// throw error if tables are not loaded
if (!store.tables) throw new Error("no tables are loaded");
if (!store.tables.has(table)) {
throw new Error(`table ${table} does not exist (call HTTP PUT "/sql/schema" to create table schemas)`);
}

// append values to buffer (used for in-memory Clickhouse DB batching)
if ( !buffer.has(table) ) {
buffer.set(table, [values]);
} else {
buffer.get(table)?.push(values);
}
buffer.insert("blocks", values);
}

function handleEntityChanges(entityChanges: EntityChange[], metadata: Metadata) {
const rows = [];
for (const change of entityChanges) {
const values = getValuesInEntityChange(change);
const id = change.id; // primary key
insertEntityChange(change.entity, values, change.operation, { ...metadata, id });
const row = insertEntityChange(change.entity, values, change.operation, { ...metadata, id });
rows.push(row);
}
buffer.bulkInsert(rows);
}

function handleDatabaseChanges(tableChanges: TableChange[], metadata: Metadata) {
const rows = [];
for (const change of tableChanges) {
const values = getValuesInTableChange(change);
const id = ""; // database changes do not have a primary key
insertEntityChange(change.table, values, change.operation, { ...metadata, id });
const row = insertEntityChange(change.table, values, change.operation, { ...metadata, id });
rows.push(row);
}
buffer.bulkInsert(rows);
}

function insertEntityChange(
Expand All @@ -151,7 +116,6 @@ function insertEntityChange(
values["module_hash"] = metadata.manifest.moduleHash;
values["timestamp"] = Number(new Date(metadata.clock.timestamp)); // Block timestamp
values["operation"] = operation;
insertToBuffer(table, values);
entities++;

// log
Expand All @@ -160,8 +124,5 @@ function insertEntityChange(
module_hash: metadata.manifest.moduleHash,
operation,
});
return { table, values };
}

function timeout(ms: number) {
return new Promise(resolve => setTimeout(resolve, ms));
}
7 changes: 7 additions & 0 deletions src/clickhouse/stores.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,13 @@ export let tables: Set<string> | null = null;
export let databases: Set<string> | null = null;
export let paused = false;

/**
 * Assert that `table` exists in the loaded table set.
 * Throws when no tables have been loaded yet, or when the table is unknown.
 */
export function check_table(table: string) {
  if (!tables) {
    throw new Error("no tables are loaded");
  }
  if (tables.has(table)) return;
  throw new Error(`table ${table} does not exist (call HTTP PUT "/sql/schema" to create table schemas)`);
}

export function pause(value: boolean) {
paused = value;
logger.info('[store::pause]', `\tpaused=${paused}`);
Expand Down
6 changes: 4 additions & 2 deletions src/clickhouse/table-initialization.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,15 +2,16 @@ import { logger } from "../logger.js";
import { client } from "./createClient.js";
import { augmentCreateTableStatement, getTableName, isCreateTableStatement } from "./table-utils.js";
import { sqlTables } from "../../sql/tables/tables.js";
import { tables } from "./stores.js";
import { show_tables, tables } from "./stores.js";

export async function initializeDefaultTables() {
const results = [];
for ( const [ table, query ] of sqlTables ) {
if ( tables?.has(table) ) continue;
// if ( tables?.has(table) ) continue;
logger.info('[clickhouse::initializeDefaultTables]\t', `CREATE TABLE [${table}]`);
results.push({table, query, ...await client.exec({ query })});
}
await show_tables()
return results;
}

Expand Down Expand Up @@ -60,5 +61,6 @@ export async function executeCreateStatements(statements: string[]) {
}
if ( executedStatements.length == 0 ) throw new Error("No statements executed");
logger.info('[clickhouse::executeCreateStatements]', "Complete.");
await show_tables();
return executedStatements;
}
7 changes: 4 additions & 3 deletions src/exitHandler.ts
Original file line number Diff line number Diff line change
@@ -1,11 +1,12 @@
import { flushBuffer } from "./clickhouse/handleSinkRequest.js";
// import { flushBuffer } from "./clickhouse/handleSinkRequest.js";
import { pause } from "./clickhouse/stores.js";
import { logger } from "./logger.js";
import * as buffer from "./buffer.js";

export function exitHandler() {
export async function exitHandler() {
logger.info('[app]\t', `Server shutting down...`);
pause(true);
flushBuffer();
await buffer.flush(true);
process.exit();
}

Expand Down
6 changes: 6 additions & 0 deletions src/utils.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
/**
 * Resolve after `ms` milliseconds (promise wrapper around setTimeout).
 *
 * @param ms - delay in milliseconds.
 * @returns a promise that resolves with no value once the delay elapses.
 *          (Typed Promise<void> instead of the inferred Promise<unknown>.)
 */
export function timeout(ms: number): Promise<void> {
  return new Promise(resolve => setTimeout(resolve, ms));
}
/** Current Unix time in whole seconds. */
export function now() {
  return Math.floor(Date.now() / 1000);
}

0 comments on commit 4fbd58c

Please sign in to comment.