Merge pull request #3 from pinax-network/fix/start-block
add logging for session
DenisCarriere authored Mar 1, 2024
2 parents 0b28507 + e7d1dc6 commit e5ee262
Showing 3 changed files with 38 additions and 5 deletions.
1 change: 1 addition & 0 deletions .gitignore
@@ -10,6 +10,7 @@ lerna-debug.log*
*.cursor
*.clock
*.spkg
+*.session

# Diagnostic reports (https://nodejs.org/api/report.html)
report.[0-9]*.[0-9]*.[0-9]*.[0-9]*.json
36 changes: 33 additions & 3 deletions index.ts
@@ -25,7 +25,7 @@ export async function action(options: CSVRunOptions ) {

// Cursor
const moduleHash = await getModuleHash(options);
-const { name, cursorFile, clockFile } = parseFilename(moduleHash, options);
+const { name, cursorFile, clockFile, sessionFile } = parseFilename(moduleHash, options);
const startCursor = fs.existsSync(cursorFile) ? fs.readFileSync(cursorFile, "utf8") : '';

// CSV writer (append)
@@ -45,9 +45,31 @@ export async function action(options: CSVRunOptions ) {
// Block Emitter
const { emitter } = await setup({ ...options, cursor: startCursor });

-// stats
+// log stats
let rows = 0;
let blocks = 0;
+let last_block_num = 0;
+let last_timestamp = "";
+let totalBytesRead = 0;
+let totalBytesWritten = 0;
+let traceId = "";
+let start_block = 0;
+let workers = 0;

+emitter.on("session", (session) => {
+  fs.writeFileSync(sessionFile, JSON.stringify(session, null, 2));
+  traceId = session.traceId;
+  start_block = Number(session.resolvedStartBlock);
+  workers = Number(session.maxParallelWorkers)
+});

+emitter.on("progress", (progress) => {
+  if ( progress.processedBytes ) {
+    totalBytesRead += Number(progress.processedBytes.totalBytesRead);
+    totalBytesWritten += Number(progress.processedBytes.totalBytesWritten);
+  }
+  log();
+});

emitter.on("clock", (clock) => {
// write block to file
@@ -90,9 +112,17 @@ export async function action(options: CSVRunOptions ) {
};

// logging
-logUpdate(`[substreams-sink-csv] block_num=${block_num} timestamp=${timestamp} blocks=${++blocks} rows=${rows}`);
+blocks++;
+log();
});

+function log() {
+  logUpdate(`[substreams-sink-csv]
+trace_id=${traceId} start_block=${start_block} module_hash=${moduleHash} workers=${workers}
+last_block_num=${last_block_num} last_timestamp=${last_timestamp} blocks=${blocks} rows=${rows} bytes_read=${totalBytesRead} bytes_written=${totalBytesWritten}
+`);
+}

fileCursor.onCursor(emitter, cursorFile);
emitter.start();
}
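
Note (not part of the diff): the "session" handler above serializes the whole session object to the new .session file, so it can be read back with plain JSON parsing. A minimal TypeScript sketch, assuming only the fields the handler itself references (traceId, resolvedStartBlock, maxParallelWorkers); the readSession helper is hypothetical and not part of this commit:

import fs from "fs";

// Hypothetical helper: parse the .session file written by the "session" handler
// above and return the fields the sink logs. Any other session fields are not assumed.
export function readSession(sessionFile: string) {
  const session = JSON.parse(fs.readFileSync(sessionFile, "utf8"));
  return {
    traceId: String(session.traceId),
    startBlock: Number(session.resolvedStartBlock),
    maxParallelWorkers: Number(session.maxParallelWorkers),
  };
}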
6 changes: 4 additions & 2 deletions src/parseFilename.ts
@@ -11,12 +11,14 @@ export function parseFilename(moduleHash: string, options: CSVRunOptions) {
if ( !fs.existsSync(dirname) ) fs.mkdirSync(dirname, { recursive: true });
const cursorFile = `${name}.cursor`;
const clockFile = `${name}.clock`;
-return { name, cursorFile, clockFile };
+const sessionFile = `${name}.session`;
+return { name, cursorFile, clockFile, sessionFile };
}
// auto-generate filename (<network>-<moduleHash>-<moduleName>.csv)
const network = options.substreamsEndpoint.split(":")[0];
const name = `${network}-${moduleHash}-${options.moduleName}`
const cursorFile = `${name}.cursor`;
const clockFile = `${name}.clock`;
-return { name, cursorFile, clockFile };
+const sessionFile = `${name}.session`;
+return { name, cursorFile, clockFile, sessionFile };
}
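
Usage illustration (not part of the diff): with the auto-generated branch above, the new .session file sits next to the existing .cursor and .clock files. The endpoint, module hash, and module name below are hypothetical, and the snippet mirrors the naming logic shown above rather than importing it:

// Hypothetical values, for illustration only.
const substreamsEndpoint = "eth.substreams.pinax.network:443";
const moduleHash = "0123abcd";
const moduleName = "map_events";

const network = substreamsEndpoint.split(":")[0];
const name = `${network}-${moduleHash}-${moduleName}`;
console.log(`${name}.cursor`);   // eth.substreams.pinax.network-0123abcd-map_events.cursor
console.log(`${name}.clock`);    // eth.substreams.pinax.network-0123abcd-map_events.clock
console.log(`${name}.session`);  // eth.substreams.pinax.network-0123abcd-map_events.session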
