Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Implement dynamic block fetching based on block size #2611

Merged
merged 9 commits into from
Dec 4, 2024
Merged
Show file tree
Hide file tree
Changes from 7 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions packages/node-core/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,12 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html).

## [Unreleased]
### Removed
- SmartBatchService as it didn't function as intended (#2611)

### Changed
- Implement new RampQueue to dynamically scale block fetching concurrency, this helps indexing larger blocks (#2611)
- Memoize promises to get finalized and best blocks (#2611)

## [15.0.3] - 2024-11-26
### Fixed
Expand Down
12 changes: 5 additions & 7 deletions packages/node-core/src/db/migration-service/migration.ts
Original file line number Diff line number Diff line change
Expand Up @@ -23,12 +23,10 @@ import {isEqual, uniq} from 'lodash';
import {NodeConfig} from '../../configure/NodeConfig';
import {HistoricalMode, StoreService} from '../../indexer';
import {getLogger} from '../../logger';
import {EnumType, getColumnOption, modelsTypeToModelAttributes} from '../../utils';
import {EnumType, getColumnOption, modelsTypeToModelAttributes, enumNameToHash} from '../../utils';
import {formatAttributes, formatColumnName, modelToTableName} from '../sequelizeUtil';
import * as syncHelper from '../sync-helper';

type RemovedIndexes = Record<string, IndexesOptions[]>;

const logger = getLogger('db-manager');

export class Migration {
Expand Down Expand Up @@ -358,8 +356,8 @@ export class Migration {
// It is difficult for sequelize use replacement, instead we use escape to avoid injection
// UPDATE: this comment got syntax error with cockroach db, disable it for now. Waiting to be fixed.
// See https://github.com/cockroachdb/cockroach/issues/44135
const enumTypeName = syncHelper.enumNameToHash(e.name);
const enumTypeNameDeprecated = `${this.schemaName}_enum_${syncHelper.enumNameToHash(e.name)}`;
const enumTypeName = enumNameToHash(e.name);
const enumTypeNameDeprecated = `${this.schemaName}_enum_${enumNameToHash(e.name)}`;

let type: string | null = null;

Expand Down Expand Up @@ -397,8 +395,8 @@ export class Migration {
}

dropEnum(e: GraphQLEnumsType): void {
const enumTypeName = syncHelper.enumNameToHash(e.name);
const enumTypeNameDeprecated = `${this.schemaName}_enum_${syncHelper.enumNameToHash(e.name)}`;
const enumTypeName = enumNameToHash(e.name);
const enumTypeNameDeprecated = `${this.schemaName}_enum_${enumNameToHash(e.name)}`;

[enumTypeName, enumTypeNameDeprecated].forEach((typeName) => {
if (this.enumTypeMap.has(typeName)) {
Expand Down
4 changes: 0 additions & 4 deletions packages/node-core/src/db/sync-helper.ts
Original file line number Diff line number Diff line change
Expand Up @@ -297,10 +297,6 @@ export function createSchemaTriggerFunction(schema: string): string {
$$ LANGUAGE plpgsql;`;
}

export function enumNameToHash(enumName: string): string {
return blake2AsHex(enumName).substr(2, 10);
}

export function getExistedIndexesQuery(schema: string): string {
return `SELECT indexname FROM pg_indexes WHERE schemaname = '${schema}'`;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@ import {monitorCreateBlockFork, monitorCreateBlockStart, monitorWrite} from '../
import {IQueue, mainThreadOnly} from '../../utils';
import {MonitorServiceInterface} from '../monitor.service';
import {PoiBlock, PoiSyncService} from '../poi';
import {SmartBatchService} from '../smartBatch.service';
import {StoreService} from '../store.service';
import {IStoreModelProvider} from '../storeModelProvider';
import {IPoi} from '../storeModelProvider/poi';
Expand All @@ -32,8 +31,7 @@ export interface IBlockDispatcher<B> {
queueSize: number;
freeSize: number;
latestBufferedHeight: number;
smartBatchSize: number;
minimumHeapLimit: number;
batchSize: number;

// Remove all enqueued blocks, used when a dynamic ds is created
flushQueue(height: number): void;
Expand All @@ -53,8 +51,6 @@ export abstract class BaseBlockDispatcher<Q extends IQueue, DS, B> implements IB
private _onDynamicDsCreated?: (height: number) => void;
private _pendingRewindHeader?: Header;

protected smartBatchService: SmartBatchService;

constructor(
protected nodeConfig: NodeConfig,
protected eventEmitter: EventEmitter2,
Expand All @@ -66,9 +62,7 @@ export abstract class BaseBlockDispatcher<Q extends IQueue, DS, B> implements IB
private storeModelProvider: IStoreModelProvider,
private poiSyncService: PoiSyncService,
protected monitorService?: MonitorServiceInterface
) {
this.smartBatchService = new SmartBatchService(nodeConfig.batchSize);
}
) {}

abstract enqueueBlocks(heights: (IBlock<B> | number)[], latestBufferHeight?: number): void | Promise<void>;

Expand All @@ -87,12 +81,8 @@ export abstract class BaseBlockDispatcher<Q extends IQueue, DS, B> implements IB
return this.queue.freeSpace;
}

get smartBatchSize(): number {
return this.smartBatchService.getSafeBatchSize();
}

get minimumHeapLimit(): number {
return this.smartBatchService.minimumHeapRequired;
get batchSize(): number {
return this.nodeConfig.batchSize;
}

get latestProcessedHeight(): number {
Expand Down
29 changes: 12 additions & 17 deletions packages/node-core/src/indexer/blockDispatcher/block-dispatcher.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
// Copyright 2020-2024 SubQuery Pte Ltd authors & contributors
// SPDX-License-Identifier: GPL-3.0

import {getHeapStatistics} from 'v8';
import {OnApplicationShutdown} from '@nestjs/common';
import {EventEmitter2} from '@nestjs/event-emitter';
import {Interval} from '@nestjs/schedule';
Expand All @@ -12,11 +11,12 @@ import {getBlockHeight, IBlock, PoiSyncService} from '../../indexer';
import {getLogger} from '../../logger';
import {exitWithError, monitorWrite} from '../../process';
import {profilerWrap} from '../../profiler';
import {Queue, AutoQueue, delay, memoryLock, waitForBatchSize, isTaskFlushedError} from '../../utils';
import {Queue, AutoQueue, RampQueue, delay, isTaskFlushedError} from '../../utils';
import {StoreService} from '../store.service';
import {IStoreModelProvider} from '../storeModelProvider';
import {IProjectService, ISubqueryProject} from '../types';
import {BaseBlockDispatcher, ProcessBlockResponse} from './base-block-dispatcher';
// import { RampQueue } from '@subql/node-core/utils/rampQueue';
yoozo marked this conversation as resolved.
Show resolved Hide resolved

const logger = getLogger('BlockDispatcherService');

Expand All @@ -37,6 +37,7 @@ export abstract class BlockDispatcher<B, DS>
private fetching = false;
private isShutdown = false;

protected abstract getBlockSize(block: IBlock<B>): number;
protected abstract indexBlock(block: IBlock<B>): Promise<ProcessBlockResponse>;

constructor(
Expand All @@ -62,7 +63,13 @@ export abstract class BlockDispatcher<B, DS>
poiSyncService
);
this.processQueue = new AutoQueue(nodeConfig.batchSize * 3, 1, nodeConfig.timeout, 'Process');
this.fetchQueue = new AutoQueue(nodeConfig.batchSize * 3, nodeConfig.batchSize, nodeConfig.timeout, 'Fetch');
this.fetchQueue = new RampQueue(
this.getBlockSize.bind(this),
nodeConfig.batchSize,
nodeConfig.batchSize * 3,
nodeConfig.timeout,
'Fetch'
);
if (this.nodeConfig.profiler) {
this.fetchBlocksBatches = profilerWrap(fetchBlocksBatches, 'BlockDispatcher', 'fetchBlocksBatches');
} else {
Expand Down Expand Up @@ -96,17 +103,14 @@ export abstract class BlockDispatcher<B, DS>
this.processQueue.flush();
}

private memoryleft(): number {
return this.smartBatchService.heapMemoryLimit() - getHeapStatistics().used_heap_size;
}

@Interval(10000)
queueStats(stat: 'size' | 'freeSpace' = 'freeSpace'): void {
// NOTE: If the free space of the process queue is low it means that processing is the limiting factor. If it is large then fetching blocks is the limitng factor.
logger.debug(
`QUEUE INFO ${stat}: Block numbers: ${this.queue[stat]}, fetch: ${this.fetchQueue[stat]}, process: ${this.processQueue[stat]}`
);
}

private async fetchBlocksFromQueue(): Promise<void> {
if (this.fetching || this.isShutdown) return;

Expand All @@ -128,23 +132,14 @@ export abstract class BlockDispatcher<B, DS>

// Used to compare before and after as a way to check if queue was flushed
const bufferedHeight = this._latestBufferedHeight;

if (this.memoryleft() < 0) {
//stop fetching until memory is freed
await waitForBatchSize(this.minimumHeapLimit);
}

void this.fetchQueue
.put(async () => {
if (memoryLock.isLocked()) {
await memoryLock.waitForUnlock();
}
if (typeof blockOrNum !== 'number') {
// Type is of block
return blockOrNum;
}
const [block] = await this.fetchBlocksBatches([blockOrNum]);
this.smartBatchService.addToSizeBuffer([block]);

return block;
})
.then(
Expand Down

This file was deleted.

Loading
Loading