From 52889df9dffcbe195797e40c1364732f6c9d6cd4 Mon Sep 17 00:00:00 2001 From: JQQQ Date: Mon, 9 Oct 2023 12:52:31 +1300 Subject: [PATCH 1/4] migrate poi in one tx, also fix auto queue timeout --- .../src/indexer/dictionary.service.ts | 12 +++++- .../node-core/src/indexer/poi/poi.service.ts | 38 ++++++++++++++----- packages/node-core/src/indexer/sandbox.ts | 6 ++- .../indexer/storeCache/baseCache.service.ts | 2 +- .../src/indexer/worker/worker.service.ts | 2 +- packages/node-core/src/utils/autoQueue.ts | 10 +++-- packages/node-core/src/utils/promise.ts | 4 +- packages/node-core/src/utils/sync-helper.ts | 27 +++++++++++++ 8 files changed, 81 insertions(+), 20 deletions(-) diff --git a/packages/node-core/src/indexer/dictionary.service.ts b/packages/node-core/src/indexer/dictionary.service.ts index 2f42d7ae52..963fc4a914 100644 --- a/packages/node-core/src/indexer/dictionary.service.ts +++ b/packages/node-core/src/indexer/dictionary.service.ts @@ -263,7 +263,12 @@ export class DictionaryService { query: gql(query), variables, }), - this.nodeConfig.dictionaryTimeout + this.nodeConfig.dictionaryTimeout, + `Dictionary query timeout in ${ + this.nodeConfig.dictionaryTimeout + } seconds. Please increase --dictionary-timeout. ${ + this.nodeConfig.debug ? `\n GraphQL: ${query}, \n Variables: ${variables}` : '' + }` ); const blockHeightSet = new Set(); const entityEndBlock: {[entity: string]: number} = {}; @@ -386,7 +391,10 @@ export class DictionaryService { this.client.query({ query: gql(query), }), - this.nodeConfig.dictionaryTimeout + this.nodeConfig.dictionaryTimeout, + `Dictionary metadata query timeout in ${ + this.nodeConfig.dictionaryTimeout + } seconds. Please increase --dictionary-timeout. ${this.nodeConfig.debug ? `\n GraphQL: ${query}` : ''}` ); const _metadata = resp.data._metadata; diff --git a/packages/node-core/src/indexer/poi/poi.service.ts b/packages/node-core/src/indexer/poi/poi.service.ts index c663bc8458..3656584d93 100644 --- a/packages/node-core/src/indexer/poi/poi.service.ts +++ b/packages/node-core/src/indexer/poi/poi.service.ts @@ -9,6 +9,7 @@ import {Op, QueryTypes, Transaction} from '@subql/x-sequelize'; import {NodeConfig} from '../../configure'; import {PoiEvent} from '../../events'; import {getLogger} from '../../logger'; +import {sqlIterator} from '../../utils'; import {ProofOfIndex, SyncedProofOfIndex} from '../entities/Poi.entity'; import {StoreCacheService} from '../storeCache'; import {CachePoiModel} from '../storeCache/cachePoi'; @@ -134,34 +135,46 @@ export class PoiService implements OnApplicationShutdown { queries.push(`ALTER TABLE ${tableName} ALTER COLUMN "chainBlockHash" DROP NOT NULL;`); // keep existing chainBlockHash queries.push( - `CREATE UNIQUE INDEX IF NOT EXISTS "poi_chainBlockHash" ON ${tableName} ("hash") WHERE "hash" IS NOT NULL` + sqlIterator( + tableName, + `CREATE UNIQUE INDEX IF NOT EXISTS "poi_chainBlockHash" ON ${tableName} ("hash") WHERE "hash" IS NOT NULL` + ) ); } if (!checkResult.hash_nullable) { queries.push(`ALTER TABLE ${tableName} ALTER COLUMN "hash" DROP NOT NULL;`); - queries.push(`UPDATE ${tableName} SET hash = NULL;`); queries.push( - `CREATE UNIQUE INDEX IF NOT EXISTS "poi_hash" ON ${tableName} ("hash") WHERE "hash" IS NOT NULL` + sqlIterator( + tableName, + `CREATE UNIQUE INDEX IF NOT EXISTS "poi_hash" ON ${tableName} ("hash") WHERE "hash" IS NOT NULL` + ) ); } if (!checkResult.parent_nullable) { queries.push(`ALTER TABLE ${tableName} ALTER COLUMN "parentHash" DROP NOT NULL;`); - queries.push(`UPDATE ${tableName} SET "parentHash" = NULL;`); queries.push( - `CREATE UNIQUE INDEX IF NOT EXISTS "poi_parent_hash" ON ${tableName} ("parentHash") WHERE "parentHash" IS NOT NULL` + sqlIterator( + tableName, + `CREATE UNIQUE INDEX IF NOT EXISTS "poi_parent_hash" ON ${tableName} ("parentHash") WHERE "parentHash" IS NOT NULL` + ) ); } } if (queries.length) { + const tx = await this.poiRepo.model.sequelize?.transaction(); + if (!tx) { + throw new Error(`Create transaction for poi migration got undefined!`); + } for (const query of queries) { try { - await this.poiRepo?.model.sequelize?.query(query, {type: QueryTypes.SELECT}); + await this.poiRepo?.model.sequelize?.query(query, {type: QueryTypes.SELECT, transaction: tx}); } catch (e) { logger.error(`Migration poi failed with query: ${query}`); throw e; } } + await tx.commit(); logger.info(`Successful migrate Poi`); if (checkResult?.mmr_exists) { logger.info(`If file based mmr were used previously, it can be clean up mannually`); @@ -173,7 +186,10 @@ export class PoiService implements OnApplicationShutdown { // Before migration `latestSyncedPoiHeight` haven't been record in Db meta // we try to find the first height from current poi table. and set for once const genesisPoi = await this.poiRepo.getFirst(); - if (genesisPoi && (genesisPoi.hash === null || genesisPoi.parentHash === null)) { + // if (genesisPoi && (genesisPoi.hash === null || genesisPoi.parentHash === null)) { + // this.createGenesisPoi(genesisPoi); + // } + if (genesisPoi) { this.createGenesisPoi(genesisPoi); } @@ -303,9 +319,11 @@ export class PoiService implements OnApplicationShutdown { this.setLatestSyncedPoi(syncedPoiBlock); } if (appendedBlocks.length) { - if (this.nodeConfig.debug) { - syncingMsg(appendedBlocks[0].id, appendedBlocks[appendedBlocks.length - 1].id, appendedBlocks.length); - } + syncingMsg(appendedBlocks[0].id, appendedBlocks[appendedBlocks.length - 1].id, appendedBlocks.length); + + // if (this.nodeConfig.debug) { + // syncingMsg(appendedBlocks[0].id, appendedBlocks[appendedBlocks.length - 1].id, appendedBlocks.length); + // } this.poiRepo?.bulkUpsert(appendedBlocks); } } diff --git a/packages/node-core/src/indexer/sandbox.ts b/packages/node-core/src/indexer/sandbox.ts index 39f34d8307..694b6235b5 100644 --- a/packages/node-core/src/indexer/sandbox.ts +++ b/packages/node-core/src/indexer/sandbox.ts @@ -69,7 +69,11 @@ export class Sandbox extends NodeVM { } async runTimeout(duration: number): Promise { - return timeout(this.run(this.script), duration); + return timeout( + this.run(this.script), + duration, + `Sandbox execution timeout in ${duration} seconds. Please increase --timeout` + ); } protected async convertStack(stackTrace: string | undefined): Promise { diff --git a/packages/node-core/src/indexer/storeCache/baseCache.service.ts b/packages/node-core/src/indexer/storeCache/baseCache.service.ts index 172ce6e123..93ec58f088 100644 --- a/packages/node-core/src/indexer/storeCache/baseCache.service.ts +++ b/packages/node-core/src/indexer/storeCache/baseCache.service.ts @@ -51,7 +51,7 @@ export abstract class BaseCacheService implements BeforeApplicationShutdown { abstract get flushableRecords(): number; async beforeApplicationShutdown(): Promise { - await timeout(this.flushCache(true), 60); + await timeout(this.flushCache(true), 60, 'Before shutdown flush cache timeout'); this.logger.info(`Force flush cache successful!`); } } diff --git a/packages/node-core/src/indexer/worker/worker.service.ts b/packages/node-core/src/indexer/worker/worker.service.ts index 230fcb462d..beb97d6f09 100644 --- a/packages/node-core/src/indexer/worker/worker.service.ts +++ b/packages/node-core/src/indexer/worker/worker.service.ts @@ -40,7 +40,7 @@ export abstract class BaseWorkerService< private projectUpgradeService: IProjectUpgradeService, nodeConfig: NodeConfig ) { - this.queue = new AutoQueue(undefined, nodeConfig.batchSize); + this.queue = new AutoQueue(undefined, nodeConfig.batchSize, nodeConfig.timeout); } async fetchBlock(height: number, extra: E): Promise { diff --git a/packages/node-core/src/utils/autoQueue.ts b/packages/node-core/src/utils/autoQueue.ts index e67fd6ef00..a20d7ff42e 100644 --- a/packages/node-core/src/utils/autoQueue.ts +++ b/packages/node-core/src/utils/autoQueue.ts @@ -114,9 +114,9 @@ export class AutoQueue implements IQueue { /** * @param {number} capacity - The size limit of the queue, if undefined there is no limit * @param {number} [concurrency=1] - The number of parallel tasks that can be processed at any one time. - * @param {number} [taskTimeoutSec=60] - A timeout for tasks to complete in. Units are seconds. + * @param {number} [taskTimeoutSec=900] - A timeout for tasks to complete in. Units are seconds. Align with nodeConfig process timeout. * */ - constructor(capacity?: number, public concurrency = 1, private taskTimeoutSec = 60) { + constructor(capacity?: number, public concurrency = 1, private taskTimeoutSec = 900) { this.queue = new Queue>(capacity); } @@ -191,7 +191,11 @@ export class AutoQueue implements IQueue { this.pendingPromise = true; - const p = timeout(Promise.resolve(action.task()), this.taskTimeoutSec) + const p = timeout( + Promise.resolve(action.task()), + this.taskTimeoutSec, + `Auto queue process task timeout in ${this.taskTimeoutSec} seconds. Please increase --timeout` + ) .then((result) => { this.outOfOrderTasks[action.index] = {action, result}; }) diff --git a/packages/node-core/src/utils/promise.ts b/packages/node-core/src/utils/promise.ts index 0b7794f22e..baf9afe2a3 100644 --- a/packages/node-core/src/utils/promise.ts +++ b/packages/node-core/src/utils/promise.ts @@ -8,9 +8,9 @@ export async function delay(sec: number): Promise { }); } -export async function timeout(promise: Promise, sec: number): Promise { +export async function timeout(promise: Promise, sec: number, errMsg = 'timeout'): Promise { // so we can have a more comprehensive error stack - const err = new Error('timeout'); + const err = new Error(errMsg); let timeout: NodeJS.Timeout; return Promise.race([ promise.then( diff --git a/packages/node-core/src/utils/sync-helper.ts b/packages/node-core/src/utils/sync-helper.ts index c726bdc334..9e6c69a217 100644 --- a/packages/node-core/src/utils/sync-helper.ts +++ b/packages/node-core/src/utils/sync-helper.ts @@ -200,3 +200,30 @@ export function enumNameToHash(enumName: string): string { export function getExistedIndexesQuery(schema: string): string { return `SELECT indexname FROM pg_indexes WHERE schemaname = '${schema}'`; } + +// SQL improvement +const DEFAULT_SQL_EXE_BATCH = 2000; + +/** + * Improve SQL which could potentially increase DB IO significantly, + * this executes it by batch size, and in ASC id order + **/ +export const sqlIterator = (tableName: string, sql: string, batch: number = DEFAULT_SQL_EXE_BATCH) => { + return ` + DO $$ + DECLARE + start_id INT; + end_id INT; + batch_size INT := ${batch}; + current_id INT; + BEGIN + SELECT MIN(id), MAX(id) INTO start_id, end_id FROM ${tableName}; + + IF start_id IS NOT NULL AND end_id IS NOT NULL THEN + FOR current_id IN start_id..end_id BY batch_size LOOP + ${sql}; + END LOOP; + END IF; + END $$ + `; +}; From 75854d1d8c407353c6d29b0724e158b04d97658f Mon Sep 17 00:00:00 2001 From: JQQQ Date: Tue, 10 Oct 2023 12:13:28 +1300 Subject: [PATCH 2/4] tidy up --- packages/node-core/src/indexer/poi/poi.service.ts | 3 --- 1 file changed, 3 deletions(-) diff --git a/packages/node-core/src/indexer/poi/poi.service.ts b/packages/node-core/src/indexer/poi/poi.service.ts index 3656584d93..2935c89ead 100644 --- a/packages/node-core/src/indexer/poi/poi.service.ts +++ b/packages/node-core/src/indexer/poi/poi.service.ts @@ -186,9 +186,6 @@ export class PoiService implements OnApplicationShutdown { // Before migration `latestSyncedPoiHeight` haven't been record in Db meta // we try to find the first height from current poi table. and set for once const genesisPoi = await this.poiRepo.getFirst(); - // if (genesisPoi && (genesisPoi.hash === null || genesisPoi.parentHash === null)) { - // this.createGenesisPoi(genesisPoi); - // } if (genesisPoi) { this.createGenesisPoi(genesisPoi); } From f29973bc4f246f9d1b91373ffbb7b46aa9217d90 Mon Sep 17 00:00:00 2001 From: JQQQ Date: Tue, 10 Oct 2023 12:17:57 +1300 Subject: [PATCH 3/4] changelog --- packages/node-core/CHANGELOG.md | 4 ++++ packages/node/CHANGELOG.md | 5 +++++ 2 files changed, 9 insertions(+) diff --git a/packages/node-core/CHANGELOG.md b/packages/node-core/CHANGELOG.md index 9ccbde984d..0211521ba1 100644 --- a/packages/node-core/CHANGELOG.md +++ b/packages/node-core/CHANGELOG.md @@ -6,6 +6,10 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ## [Unreleased] +### Fixed +- Fixed Poi migration performance issue with `sqlIterator` +- Fixed AutoQueue timeout issue, align setting with nodeConfig. + ## [5.0.3] - 2023-10-03 ### Fixed - Fix reindex service without poi feature (2062) diff --git a/packages/node/CHANGELOG.md b/packages/node/CHANGELOG.md index ee039269da..d0e05ebfc5 100644 --- a/packages/node/CHANGELOG.md +++ b/packages/node/CHANGELOG.md @@ -6,6 +6,11 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ## [Unreleased] +### Fixed +- Sync with node-core. + - Fixed Poi migration performance issue. + - Fixed AutoQueue timeout issue. + ## [3.0.4] - 2023-10-03 ### Changed - Version bump with `types-core` 0.1.1 From 5854c89312aee19d89555e82806e12241d8827c4 Mon Sep 17 00:00:00 2001 From: JQQQ Date: Tue, 10 Oct 2023 12:20:19 +1300 Subject: [PATCH 4/4] tidy up --- packages/node-core/src/indexer/poi/poi.service.ts | 8 +++----- 1 file changed, 3 insertions(+), 5 deletions(-) diff --git a/packages/node-core/src/indexer/poi/poi.service.ts b/packages/node-core/src/indexer/poi/poi.service.ts index 2935c89ead..fd3f90a9f1 100644 --- a/packages/node-core/src/indexer/poi/poi.service.ts +++ b/packages/node-core/src/indexer/poi/poi.service.ts @@ -316,11 +316,9 @@ export class PoiService implements OnApplicationShutdown { this.setLatestSyncedPoi(syncedPoiBlock); } if (appendedBlocks.length) { - syncingMsg(appendedBlocks[0].id, appendedBlocks[appendedBlocks.length - 1].id, appendedBlocks.length); - - // if (this.nodeConfig.debug) { - // syncingMsg(appendedBlocks[0].id, appendedBlocks[appendedBlocks.length - 1].id, appendedBlocks.length); - // } + if (this.nodeConfig.debug) { + syncingMsg(appendedBlocks[0].id, appendedBlocks[appendedBlocks.length - 1].id, appendedBlocks.length); + } this.poiRepo?.bulkUpsert(appendedBlocks); } }