diff --git a/index.ts b/index.ts index f98f11b..c98ffad 100644 --- a/index.ts +++ b/index.ts @@ -1,5 +1,6 @@ import fs from "fs"; import path from "path"; +import pkg from "./package.json" assert { type: "json" }; import { setup, fileCursor } from "substreams-sink"; import { CSVRunOptions } from "./bin/cli.js" import { EntityChanges, getValuesInEntityChange } from "@substreams/sink-entity-changes/zod" @@ -10,6 +11,8 @@ import { parseClock } from "./src/parseClock.js"; import { parseSchema } from "./src/parseSchema.js"; export async function action(options: CSVRunOptions ) { + console.log(`[substreams-sink-csv] v${pkg.version}`); + // handle file system manifest // can be removed when issue resolved // https://github.com/substreams-js/substreams-js/issues/62 @@ -29,14 +32,17 @@ export async function action(options: CSVRunOptions ) { // Cursor const moduleHash = await getModuleHash(options); + console.log(JSON.stringify({manifest: options.manifest, moduleName: options.moduleName, moduleHash})); const { name, cursorFile, clockFile, sessionFile } = parseFilename(moduleHash, options); const startCursor = fs.existsSync(cursorFile) ? fs.readFileSync(cursorFile, "utf8") : ''; + console.log(JSON.stringify({name, cursorFile, clockFile, sessionFile})); // CSV writer (append) const clockWriter = fs.createWriteStream(clockFile, {flags: "a"}); const writers: Map = new Map(); for ( const [table, columns] of tables ) { + console.log(JSON.stringify({table, columns})); const filename = `${name}-${table}.csv`; const writer = fs.createWriteStream(filename, {flags: "a"}); if ( !fs.existsSync(filename) ) writer.write(columns.join(",") + "\n"); @@ -57,20 +63,23 @@ export async function action(options: CSVRunOptions ) { let totalBytesRead = 0; let totalBytesWritten = 0; let traceId = ""; - let start_block = 0; - let workers = 0; + let resolvedStartBlock = 0; + let maxParallelWorkers = 0; + let runningJobs = 0; emitter.on("session", (session) => { fs.writeFileSync(sessionFile, JSON.stringify(session, null, 2)); traceId = session.traceId; - start_block = Number(session.resolvedStartBlock); - workers = Number(session.maxParallelWorkers) + resolvedStartBlock = Number(session.resolvedStartBlock); + maxParallelWorkers = Number(session.maxParallelWorkers) + console.log(JSON.stringify({traceId, resolvedStartBlock, maxParallelWorkers})); }); emitter.on("progress", (progress) => { if ( progress.processedBytes ) { totalBytesRead += Number(progress.processedBytes.totalBytesRead); totalBytesWritten += Number(progress.processedBytes.totalBytesWritten); + runningJobs = progress.runningJobs.length; } log(); }); @@ -85,6 +94,8 @@ export async function action(options: CSVRunOptions ) { // Stream Messages emitter.on("anyMessage", async (data, cursor, clock) => { const { block_num, block_id, timestamp, seconds } = parseClock(clock); + last_block_num = block_num; + last_timestamp = timestamp; // block header for ( const entityChange of EntityChanges.parse(data).entityChanges ) { @@ -121,10 +132,7 @@ export async function action(options: CSVRunOptions ) { }); function log() { - logUpdate(`[substreams-sink-csv] -trace_id=${traceId} start_block=${start_block} module_hash=${moduleHash} workers=${workers} -last_block_num=${last_block_num} last_timestamp=${last_timestamp} blocks=${blocks} rows=${rows} bytes_read=${totalBytesRead} bytes_written=${totalBytesWritten} -`); + logUpdate(JSON.stringify({last_block_num, last_timestamp, blocks, rows, totalBytesRead, totalBytesWritten, runningJobs})); } fileCursor.onCursor(emitter, cursorFile); diff --git a/package.json b/package.json index a6caefe..a7b49e3 100644 --- a/package.json +++ b/package.json @@ -1,5 +1,5 @@ { - "version": "0.2.6", + "version": "0.2.7", "name": "substreams-sink-csv", "description": "Substreams Sink CSV", "type": "module",