Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Allow lazy messages for monitor service #2583

Merged
merged 2 commits into from
Nov 6, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions packages/node-core/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html).

## [Unreleased]
### Added
- lazy loading for monitor service (#2583)

## [14.1.7] - 2024-10-30
### Changed
Expand Down
4 changes: 2 additions & 2 deletions packages/node-core/src/indexer/indexer.manager.ts
Original file line number Diff line number Diff line change
Expand Up @@ -191,7 +191,7 @@ export abstract class BaseIndexerManager<

const parsedData = await this.prepareFilteredData(kind, data, ds);

monitorWrite(`- Handler: ${handler.handler}, args:${handledStringify(data)}`);
monitorWrite(() => `- Handler: ${handler.handler}, args:${handledStringify(data)}`);
this.nodeConfig.profiler
? await profilerWrap(
vm.securedExec.bind(vm),
Expand All @@ -210,7 +210,7 @@ export abstract class BaseIndexerManager<
for (const handler of handlers) {
// eslint-disable-next-line @typescript-eslint/no-non-null-assertion
vm = vm! ?? (await getVM(ds));
monitorWrite(`- Handler: ${handler.handler}, args:${handledStringify(data)}`);
monitorWrite(() => `- Handler: ${handler.handler}, args:${handledStringify(data)}`);
await this.transformAndExecuteCustomDs(ds, vm, handler, data);
}
}
Expand Down
10 changes: 5 additions & 5 deletions packages/node-core/src/indexer/monitor.service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ interface FileStat {
}

export interface MonitorServiceInterface {
write(blockData: string): void;
write(blockData: string | (() => string)): void;
createBlockFork(blockHeight: number): void;
createBlockStart(blockHeight: number): void;
}
Expand Down Expand Up @@ -212,16 +212,16 @@ export class MonitorService implements MonitorServiceInterface {

/**
* Write block record data to file
* @param blockData
* @param blockData string or function that returns a string, this allows lazy interpolation for any heavy stringification
*/
write(blockData: string): void {
write(blockData: string | (() => string)): void {
if (this.monitorFileSize <= 0) {
return;
}
this.checkAndSwitchFile();
const escapedBlockData = blockData.replace(/\n/g, '\\n');
const escapedBlockData = (typeof blockData === 'string' ? blockData : blockData()).replace(/\n/g, '\\n');
fs.appendFileSync(this.getFilePath(this.currentFile), `${escapedBlockData}\n`);
this.currentFileSize += Buffer.byteLength(blockData) + 1; // + 1 for the new line
this.currentFileSize += Buffer.byteLength(escapedBlockData) + 1; // + 1 for the new line
this.currentFileLastLine += 1;
}

Expand Down
20 changes: 11 additions & 9 deletions packages/node-core/src/indexer/store/store.ts
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ export class Store implements IStore {
async get<T extends Entity>(entity: string, id: string): Promise<T | undefined> {
try {
const raw = await this.#storeCache.getModel<T>(entity).get(id);
monitorWrite(`-- [Store][get] Entity ${entity} ID ${id}, data: ${handledStringify(raw)}`);
monitorWrite(() => `-- [Store][get] Entity ${entity} ID ${id}, data: ${handledStringify(raw)}`);
return EntityClass.create<T>(entity, raw, this);
} catch (e) {
throw new Error(`Failed to get Entity ${entity} with id ${id}: ${e}`);
Expand All @@ -60,7 +60,7 @@ export class Store implements IStore {
this.#queryLimitCheck('getByField', entity, options);

const raw = await this.#storeCache.getModel<T>(entity).getByField(field, value, options);
monitorWrite(`-- [Store][getByField] Entity ${entity}, data: ${handledStringify(raw)}`);
monitorWrite(() => `-- [Store][getByField] Entity ${entity}, data: ${handledStringify(raw)}`);
return raw.map((v) => EntityClass.create<T>(entity, v, this)) as T[];
} catch (e) {
throw new Error(`Failed to getByField Entity ${entity} with field ${String(field)}: ${e}`);
Expand All @@ -84,7 +84,7 @@ export class Store implements IStore {
this.#queryLimitCheck('getByFields', entity, options);

const raw = await this.#storeCache.getModel<T>(entity).getByFields(filter, options);
monitorWrite(`-- [Store][getByFields] Entity ${entity}, data: ${handledStringify(raw)}`);
monitorWrite(() => `-- [Store][getByFields] Entity ${entity}, data: ${handledStringify(raw)}`);
return raw.map((v) => EntityClass.create<T>(entity, v, this)) as T[];
} catch (e) {
throw new Error(`Failed to getByFields Entity ${entity}: ${e}`);
Expand All @@ -96,7 +96,7 @@ export class Store implements IStore {
const indexed = this.#context.isIndexedHistorical(entity, field as string);
assert(indexed, `to query by field ${String(field)}, a unique index must be created on model ${entity}`);
const raw = await this.#storeCache.getModel<T>(entity).getOneByField(field, value);
monitorWrite(`-- [Store][getOneByField] Entity ${entity}, data: ${handledStringify(raw)}`);
monitorWrite(() => `-- [Store][getOneByField] Entity ${entity}, data: ${handledStringify(raw)}`);
return EntityClass.create<T>(entity, raw, this);
} catch (e) {
throw new Error(`Failed to getOneByField Entity ${entity} with field ${String(field)}: ${e}`);
Expand All @@ -108,7 +108,7 @@ export class Store implements IStore {
try {
this.#storeCache.getModel(entity).set(_id, data, this.#context.blockHeight);
monitorWrite(
`-- [Store][set] Entity ${entity}, height: ${this.#context.blockHeight}, data: ${handledStringify(data)}`
() => `-- [Store][set] Entity ${entity}, height: ${this.#context.blockHeight}, data: ${handledStringify(data)}`
);
this.#context.operationStack?.put(OperationType.Set, entity, data);
} catch (e) {
Expand All @@ -123,7 +123,8 @@ export class Store implements IStore {
this.#context.operationStack?.put(OperationType.Set, entity, item);
}
monitorWrite(
`-- [Store][bulkCreate] Entity ${entity}, height: ${this.#context.blockHeight}, data: ${handledStringify(data)}`
() =>
`-- [Store][bulkCreate] Entity ${entity}, height: ${this.#context.blockHeight}, data: ${handledStringify(data)}`
);
} catch (e) {
throw new Error(`Failed to bulkCreate Entity ${entity}: ${e}`);
Expand All @@ -138,7 +139,8 @@ export class Store implements IStore {
this.#context.operationStack?.put(OperationType.Set, entity, item);
}
monitorWrite(
`-- [Store][bulkUpdate] Entity ${entity}, height: ${this.#context.blockHeight}, data: ${handledStringify(data)}`
() =>
`-- [Store][bulkUpdate] Entity ${entity}, height: ${this.#context.blockHeight}, data: ${handledStringify(data)}`
);
} catch (e) {
throw new Error(`Failed to bulkCreate Entity ${entity}: ${e}`);
Expand All @@ -149,7 +151,7 @@ export class Store implements IStore {
try {
this.#storeCache.getModel(entity).remove(id, this.#context.blockHeight);
this.#context.operationStack?.put(OperationType.Remove, entity, id);
monitorWrite(`-- [Store][remove] Entity ${entity}, height: ${this.#context.blockHeight}, id: ${id}`);
monitorWrite(() => `-- [Store][remove] Entity ${entity}, height: ${this.#context.blockHeight}, id: ${id}`);
} catch (e) {
throw new Error(`Failed to remove Entity ${entity} with id ${id}: ${e}`);
}
Expand All @@ -163,7 +165,7 @@ export class Store implements IStore {
this.#context.operationStack?.put(OperationType.Remove, entity, id);
}
monitorWrite(
`-- [Store][remove] Entity ${entity}, height: ${this.#context.blockHeight}, ids: ${handledStringify(ids)}`
() => `-- [Store][remove] Entity ${entity}, height: ${this.#context.blockHeight}, ids: ${handledStringify(ids)}`
);
} catch (e) {
throw new Error(`Failed to bulkRemove Entity ${entity}: ${e}`);
Expand Down
15 changes: 5 additions & 10 deletions packages/node-core/src/process.ts
Original file line number Diff line number Diff line change
Expand Up @@ -17,20 +17,15 @@ export function exitWithError(error: Error | string, logger?: Pino.Logger, code
process.exit(code);
}

export function monitorWrite(blockData: string): void {
if (monitorService) {
monitorService.write(blockData);
}
// Function argument is to allow for lazy evauluation only if monitor service is enabled
export function monitorWrite(blockData: string | (() => string)): void {
monitorService?.write(blockData);
}

export function monitorCreateBlockStart(blockNumber: number): void {
if (monitorService) {
monitorService.createBlockStart(blockNumber);
}
monitorService?.createBlockStart(blockNumber);
}

export function monitorCreateBlockFork(blockNumber: number): void {
if (monitorService) {
monitorService.createBlockFork(blockNumber);
}
monitorService?.createBlockFork(blockNumber);
}
Loading