Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix auto queue timeout, also poi migrate #2081

Merged
merged 4 commits into from
Oct 9, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions packages/node-core/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,10 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0

## [Unreleased]

### Fixed
- Fixed Poi migration performance issue with `sqlIterator`
- Fixed AutoQueue timeout issue, align setting with nodeConfig.

## [5.0.3] - 2023-10-03
### Fixed
- Fix reindex service without poi feature (2062)
Expand Down
12 changes: 10 additions & 2 deletions packages/node-core/src/indexer/dictionary.service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -263,7 +263,12 @@ export class DictionaryService {
query: gql(query),
variables,
}),
this.nodeConfig.dictionaryTimeout
this.nodeConfig.dictionaryTimeout,
`Dictionary query timeout in ${
this.nodeConfig.dictionaryTimeout
} seconds. Please increase --dictionary-timeout. ${
this.nodeConfig.debug ? `\n GraphQL: ${query}, \n Variables: ${variables}` : ''
}`
);
const blockHeightSet = new Set<number>();
const entityEndBlock: {[entity: string]: number} = {};
Expand Down Expand Up @@ -386,7 +391,10 @@ export class DictionaryService {
this.client.query({
query: gql(query),
}),
this.nodeConfig.dictionaryTimeout
this.nodeConfig.dictionaryTimeout,
`Dictionary metadata query timeout in ${
this.nodeConfig.dictionaryTimeout
} seconds. Please increase --dictionary-timeout. ${this.nodeConfig.debug ? `\n GraphQL: ${query}` : ''}`
);
const _metadata = resp.data._metadata;

Expand Down
27 changes: 20 additions & 7 deletions packages/node-core/src/indexer/poi/poi.service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import {Op, QueryTypes, Transaction} from '@subql/x-sequelize';
import {NodeConfig} from '../../configure';
import {PoiEvent} from '../../events';
import {getLogger} from '../../logger';
import {sqlIterator} from '../../utils';
import {ProofOfIndex, SyncedProofOfIndex} from '../entities/Poi.entity';
import {StoreCacheService} from '../storeCache';
import {CachePoiModel} from '../storeCache/cachePoi';
Expand Down Expand Up @@ -134,34 +135,46 @@ export class PoiService implements OnApplicationShutdown {
queries.push(`ALTER TABLE ${tableName} ALTER COLUMN "chainBlockHash" DROP NOT NULL;`);
// keep existing chainBlockHash
queries.push(
`CREATE UNIQUE INDEX IF NOT EXISTS "poi_chainBlockHash" ON ${tableName} ("hash") WHERE "hash" IS NOT NULL`
sqlIterator(
tableName,
`CREATE UNIQUE INDEX IF NOT EXISTS "poi_chainBlockHash" ON ${tableName} ("hash") WHERE "hash" IS NOT NULL`
)
);
}
if (!checkResult.hash_nullable) {
queries.push(`ALTER TABLE ${tableName} ALTER COLUMN "hash" DROP NOT NULL;`);
queries.push(`UPDATE ${tableName} SET hash = NULL;`);
queries.push(
`CREATE UNIQUE INDEX IF NOT EXISTS "poi_hash" ON ${tableName} ("hash") WHERE "hash" IS NOT NULL`
sqlIterator(
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Does iterators make sense with indexes?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, it would create index for any existing records. iterators make it much efficient and smooth. We did same for change index in dictionary project on host service.

tableName,
`CREATE UNIQUE INDEX IF NOT EXISTS "poi_hash" ON ${tableName} ("hash") WHERE "hash" IS NOT NULL`
)
);
}
if (!checkResult.parent_nullable) {
queries.push(`ALTER TABLE ${tableName} ALTER COLUMN "parentHash" DROP NOT NULL;`);
queries.push(`UPDATE ${tableName} SET "parentHash" = NULL;`);
queries.push(
`CREATE UNIQUE INDEX IF NOT EXISTS "poi_parent_hash" ON ${tableName} ("parentHash") WHERE "parentHash" IS NOT NULL`
sqlIterator(
tableName,
`CREATE UNIQUE INDEX IF NOT EXISTS "poi_parent_hash" ON ${tableName} ("parentHash") WHERE "parentHash" IS NOT NULL`
)
);
}
}

if (queries.length) {
const tx = await this.poiRepo.model.sequelize?.transaction();
if (!tx) {
throw new Error(`Create transaction for poi migration got undefined!`);
}
for (const query of queries) {
try {
await this.poiRepo?.model.sequelize?.query(query, {type: QueryTypes.SELECT});
await this.poiRepo?.model.sequelize?.query(query, {type: QueryTypes.SELECT, transaction: tx});
} catch (e) {
logger.error(`Migration poi failed with query: ${query}`);
throw e;
}
}
await tx.commit();
logger.info(`Successful migrate Poi`);
if (checkResult?.mmr_exists) {
logger.info(`If file based mmr were used previously, it can be clean up mannually`);
Expand All @@ -173,7 +186,7 @@ export class PoiService implements OnApplicationShutdown {
// Before migration `latestSyncedPoiHeight` haven't been record in Db meta
// we try to find the first height from current poi table. and set for once
const genesisPoi = await this.poiRepo.getFirst();
if (genesisPoi && (genesisPoi.hash === null || genesisPoi.parentHash === null)) {
if (genesisPoi) {
this.createGenesisPoi(genesisPoi);
}

Expand Down
6 changes: 5 additions & 1 deletion packages/node-core/src/indexer/sandbox.ts
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,11 @@ export class Sandbox extends NodeVM {
}

async runTimeout<T = unknown>(duration: number): Promise<T> {
return timeout(this.run(this.script), duration);
return timeout(
this.run(this.script),
duration,
`Sandbox execution timeout in ${duration} seconds. Please increase --timeout`
);
}

protected async convertStack(stackTrace: string | undefined): Promise<string | undefined> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ export abstract class BaseCacheService implements BeforeApplicationShutdown {
abstract get flushableRecords(): number;

async beforeApplicationShutdown(): Promise<void> {
await timeout(this.flushCache(true), 60);
await timeout(this.flushCache(true), 60, 'Before shutdown flush cache timeout');
this.logger.info(`Force flush cache successful!`);
}
}
2 changes: 1 addition & 1 deletion packages/node-core/src/indexer/worker/worker.service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ export abstract class BaseWorkerService<
private projectUpgradeService: IProjectUpgradeService,
nodeConfig: NodeConfig
) {
this.queue = new AutoQueue(undefined, nodeConfig.batchSize);
this.queue = new AutoQueue(undefined, nodeConfig.batchSize, nodeConfig.timeout);
}

async fetchBlock(height: number, extra: E): Promise<R | undefined> {
Expand Down
10 changes: 7 additions & 3 deletions packages/node-core/src/utils/autoQueue.ts
Original file line number Diff line number Diff line change
Expand Up @@ -114,9 +114,9 @@ export class AutoQueue<T> implements IQueue {
/**
* @param {number} capacity - The size limit of the queue, if undefined there is no limit
* @param {number} [concurrency=1] - The number of parallel tasks that can be processed at any one time.
* @param {number} [taskTimeoutSec=60] - A timeout for tasks to complete in. Units are seconds.
* @param {number} [taskTimeoutSec=900] - A timeout for tasks to complete in. Units are seconds. Align with nodeConfig process timeout.
* */
constructor(capacity?: number, public concurrency = 1, private taskTimeoutSec = 60) {
constructor(capacity?: number, public concurrency = 1, private taskTimeoutSec = 900) {
this.queue = new Queue<Action<T>>(capacity);
}

Expand Down Expand Up @@ -191,7 +191,11 @@ export class AutoQueue<T> implements IQueue {

this.pendingPromise = true;

const p = timeout(Promise.resolve(action.task()), this.taskTimeoutSec)
const p = timeout(
Promise.resolve(action.task()),
this.taskTimeoutSec,
`Auto queue process task timeout in ${this.taskTimeoutSec} seconds. Please increase --timeout`
)
.then((result) => {
this.outOfOrderTasks[action.index] = {action, result};
})
Expand Down
4 changes: 2 additions & 2 deletions packages/node-core/src/utils/promise.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,9 +8,9 @@ export async function delay(sec: number): Promise<void> {
});
}

export async function timeout<T>(promise: Promise<T>, sec: number): Promise<T> {
export async function timeout<T>(promise: Promise<T>, sec: number, errMsg = 'timeout'): Promise<T> {
// so we can have a more comprehensive error stack
const err = new Error('timeout');
const err = new Error(errMsg);
let timeout: NodeJS.Timeout;
return Promise.race([
promise.then(
Expand Down
27 changes: 27 additions & 0 deletions packages/node-core/src/utils/sync-helper.ts
Original file line number Diff line number Diff line change
Expand Up @@ -200,3 +200,30 @@ export function enumNameToHash(enumName: string): string {
export function getExistedIndexesQuery(schema: string): string {
return `SELECT indexname FROM pg_indexes WHERE schemaname = '${schema}'`;
}

// SQL improvement
const DEFAULT_SQL_EXE_BATCH = 2000;

/**
* Improve SQL which could potentially increase DB IO significantly,
* this executes it by batch size, and in ASC id order
**/
export const sqlIterator = (tableName: string, sql: string, batch: number = DEFAULT_SQL_EXE_BATCH) => {
return `
DO $$
DECLARE
start_id INT;
end_id INT;
batch_size INT := ${batch};
current_id INT;
BEGIN
SELECT MIN(id), MAX(id) INTO start_id, end_id FROM ${tableName};

IF start_id IS NOT NULL AND end_id IS NOT NULL THEN
FOR current_id IN start_id..end_id BY batch_size LOOP
${sql};
END LOOP;
END IF;
END $$
`;
};
5 changes: 5 additions & 0 deletions packages/node/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,11 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0

## [Unreleased]

### Fixed
- Sync with node-core.
- Fixed Poi migration performance issue.
- Fixed AutoQueue timeout issue.

## [3.0.4] - 2023-10-03
### Changed
- Version bump with `types-core` 0.1.1
Expand Down
Loading