Improve reindex query #2131

Merged · 5 commits · Oct 31, 2023
7 changes: 3 additions & 4 deletions packages/node-core/CHANGELOG.md
@@ -7,15 +7,14 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
## [Unreleased]

### Fixed
- Improve reindex query to remove/update by batches. (#2131)
- Fix poi reindex beyond genesis height issue. (#2131)
- Fixed modulo block ahead of finalized block issue (#2132)

- Wrong link to docs for testing
### Added
- WorkerInMemoryCacheService from node (#2125)
- New `endBlock` option on datasources (#2064)

### Fixed
- Wrong link to docs for testing

## [6.1.1] - 2023-10-25
### Fixed
- Not handling TaskFlushedErrors in block dispatchers (#2120)
68 changes: 58 additions & 10 deletions packages/node-core/src/indexer/poi/poi.service.ts
@@ -7,6 +7,7 @@ import {Op, QueryTypes, Transaction} from '@subql/x-sequelize';
import {NodeConfig} from '../../configure';
import {getLogger} from '../../logger';
import {sqlIterator} from '../../utils';
import {PoiRepo} from '../entities';
import {StoreCacheService} from '../storeCache';
import {CachePoiModel} from '../storeCache/cachePoi';
import {ISubqueryProject} from '../types';
@@ -114,7 +115,7 @@ export class PoiService implements OnApplicationShutdown {
}
if (!checkResult.parent_nullable) {
queries.push(`ALTER TABLE ${tableName} ALTER COLUMN "parentHash" DROP NOT NULL;`);
queries.push(sqlIterator(tableName, `UPDATE ${tableName} SET "parentHash" = NULL;`));
queries.push(sqlIterator(tableName, `UPDATE ${tableName} SET "parentHash" = NULL`));

queries.push(
sqlIterator(
@@ -151,18 +152,65 @@ }
}

async rewind(targetBlockHeight: number, transaction: Transaction): Promise<void> {
await this.poiRepo.model.destroy({
transaction,
where: {
id: {
[Op.gt]: targetBlockHeight,
},
},
});
await batchDeletePoi(this.poiRepo.model, transaction, targetBlockHeight);
const lastSyncedPoiHeight = await this.storeCache.metadata.find('latestSyncedPoiHeight');

if (lastSyncedPoiHeight !== undefined && lastSyncedPoiHeight > targetBlockHeight) {
this.storeCache.metadata.set('latestSyncedPoiHeight', targetBlockHeight);
const genesisPoi = await this.poiRepo.model.findOne({
order: [['id', 'ASC']],
transaction: transaction,
});
// This indicates the reindex height is less than the genesis poi height,
// and the genesis poi has been removed by `batchDeletePoi`
if (!genesisPoi) {
this.storeCache.metadata.bulkRemove(['latestSyncedPoiHeight']);
} else {
this.storeCache.metadata.set('latestSyncedPoiHeight', targetBlockHeight);
}
}
this.storeCache.metadata.bulkRemove(['lastCreatedPoiHeight']);
}
}

// Remove 10,000 records per batch
async function batchDeletePoi(
model: PoiRepo,
transaction: Transaction,
targetBlockHeight: number,
batchSize = 10000
): Promise<void> {
let completed = false;
while (!completed) {
Collaborator:
Why not use sqlIterator there? It seems expensive to load records before removing them.

This may be useful: static `destroy` returns the number of items destroyed. https://sequelize.org/api/v6/class/src/model.js~model#static-method-destroy

I see the same is happening with other data.

Contributor Author:

sqlIterator applies SQL to the whole table; it is useful for statements like `create index xxx` that apply to all records.

In batchDeletePoi we only need to remove records within a certain block range. Compared with sqlIterator, even if we added where conditions to restrict the block range in the SQL, it would still iterate the complete table. I think this is the better approach.

try {
const recordsToDelete = await model.findAll({
transaction,
limit: batchSize,
where: {
id: {
[Op.gt]: targetBlockHeight,
},
},
});
if (recordsToDelete.length === 0) {
completed = true;
break;
}
logger.debug(`Found Poi recordsToDelete ${recordsToDelete.length}`);
if (recordsToDelete.length) {
await model.destroy({
transaction,
hooks: false,
where: {
id: {
[Op.in]: recordsToDelete.map((record) => record.id),
},
},
});
}
} catch (e) {
throw new Error(`Reindex model Poi failed, please try to reindex again: ${e}`);
}
}
}
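The reviewer's note that static `destroy` returns the number of rows removed suggests a leaner loop that skips the `findAll` round-trip entirely. A minimal sketch under that assumption (`batchDestroy` and the injected `destroyBatch` callback are illustrative names, not part of this PR):

```typescript
// Sketch: loop on a capped delete until it removes fewer rows than the batch
// size. `destroyBatch` stands in for a Sequelize `Model.destroy` call limited
// to `batchSize` rows; it must return the number of rows actually deleted,
// mirroring `Model.destroy`'s return value.
async function batchDestroy(
  destroyBatch: (limit: number) => Promise<number>,
  batchSize = 10000
): Promise<number> {
  let total = 0;
  for (;;) {
    const deleted = await destroyBatch(batchSize);
    total += deleted;
    if (deleted < batchSize) break; // last partial batch: nothing left to remove
  }
  return total;
}

// Usage with an in-memory stand-in for the table:
async function demo(): Promise<number> {
  let rows = 25000;
  return batchDestroy(async (limit) => {
    const deleted = Math.min(rows, limit);
    rows -= deleted;
    return deleted;
  });
}
```

Since Postgres `DELETE` has no native `LIMIT`, the real `destroyBatch` would need a subquery cap such as `DELETE ... WHERE id IN (SELECT id ... LIMIT n)`.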
1 change: 1 addition & 0 deletions packages/node-core/src/indexer/poi/poiModel.ts
@@ -61,6 +61,7 @@ export class PlainPoiModel implements PoiInterface {
}
await this.model.bulkCreate(proofs, {
transaction: tx,
conflictAttributes: ['id'],
updateOnDuplicate: ['hash', 'parentHash'],
});
}
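On Postgres, `bulkCreate` with `updateOnDuplicate` compiles to an `INSERT ... ON CONFLICT ... DO UPDATE`, and `conflictAttributes` pins the conflict target to `id` rather than leaving Sequelize to infer it from unique indexes. A rough sketch of the statement shape this maps to (the helper, table name, and `(...)` placeholders are illustrative, not the exact SQL Sequelize emits):

```typescript
// Illustrative helper: build the ON CONFLICT upsert shape that the
// bulkCreate({conflictAttributes, updateOnDuplicate}) call above maps to.
function upsertSql(table: string, conflictCol: string, updateCols: string[]): string {
  const sets = updateCols.map((c) => `"${c}" = EXCLUDED."${c}"`).join(', ');
  return `INSERT INTO ${table} (...) VALUES (...) ON CONFLICT ("${conflictCol}") DO UPDATE SET ${sets};`;
}
```

For the PoI model this yields `ON CONFLICT ("id") DO UPDATE SET "hash" = EXCLUDED."hash", "parentHash" = EXCLUDED."parentHash"`, i.e. re-inserting an existing height updates its hashes in place instead of raising a unique-constraint error.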
106 changes: 79 additions & 27 deletions packages/node-core/src/indexer/store.service.ts
@@ -714,35 +714,14 @@ group by
if (!this.historical) {
throw new Error('Unable to reindex, historical state not enabled');
}
// This should only be called from the CLI; blockHeight in storeService is never set in that case and is required for the `beforeFind` hook
// The height does not need to change for a rewind during indexing
if (this._blockHeight === undefined) {
this.setBlockHeight(targetBlockHeight);
}
for (const model of Object.values(this.sequelize.models)) {
if ('__block_range' in model.getAttributes()) {
await model.destroy({
transaction,
hooks: false,
where: this.sequelize.where(
this.sequelize.fn('lower', this.sequelize.col('_block_range')),
Op.gt,
targetBlockHeight
),
});
await model.update(
{
__block_range: this.sequelize.fn(
'int8range',
this.sequelize.fn('lower', this.sequelize.col('_block_range')),
null
),
},
{
transaction,
hooks: false,
where: {
__block_range: {
[Op.contains]: targetBlockHeight,
},
},
}
);
await batchDeleteAndThenUpdate(this.sequelize, model, transaction, targetBlockHeight);
}
}
this.metadataModel.set('lastProcessedHeight', targetBlockHeight);
@@ -907,3 +886,76 @@ group by
};
}
}

// Remove 10,000 records per batch
async function batchDeleteAndThenUpdate(
sequelize: Sequelize,
model: ModelStatic<any>,
transaction: Transaction,
targetBlockHeight: number,
batchSize = 10000
): Promise<void> {
let offset = 0;
let completed = false;
while (!completed) {
try {
const [recordsToUpdate, recordsToDelete] = await Promise.all([
model.findAll({
transaction,
limit: batchSize,
attributes: {include: ['_id']},
offset, // Apply an offset: after being updated, a record can still fall within the range, so without it this query would loop endlessly.
where: {
__block_range: {
[Op.contains]: targetBlockHeight,
},
},
}),
model.findAll({
transaction,
limit: batchSize,
attributes: {include: ['_id']},
where: sequelize.where(sequelize.fn('lower', sequelize.col('_block_range')), Op.gt, targetBlockHeight),
}),
]);
if (recordsToDelete.length === 0 && recordsToUpdate.length === 0) {
completed = true;
break;
}
logger.debug(
`Found ${model.name} recordsToDelete ${recordsToDelete.length}, recordsToUpdate ${recordsToUpdate.length}`
);
if (recordsToDelete.length) {
await model.destroy({
transaction,
hooks: false,
where: {
_id: {
[Op.in]: recordsToDelete.map((record) => record.dataValues._id),
},
},
});
}
if (recordsToUpdate.length) {
await model.update(
{
__block_range: sequelize.fn('int8range', sequelize.fn('lower', sequelize.col('_block_range')), null),
},
{
transaction,
hooks: false,
where: {
_id: {
[Op.in]: recordsToUpdate.map((record) => record.dataValues._id),
},
},
}
);
}
offset += batchSize;
} catch (e) {
throw new Error(`Reindex update model ${model.name} failed, please try to reindex again: ${e}`);
}
}
}
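The `offset` in the update branch is load-bearing: a row whose upper bound has been opened to `null` still contains `targetBlockHeight`, so the `Op.contains` filter matches it again on the next pass, and only paging forward by a growing offset prevents refetching the same rows forever. A self-contained simulation of that behaviour (plain objects stand in for rows; no Sequelize involved):

```typescript
// A row's block range, modelled like a half-open int8range [lower, upper).
type Rec = {lower: number; upper: number | null};

// Page through records whose range still contains `target`, opening their
// upper bound. Because updated rows keep matching the contains-filter, the
// offset must grow each pass; returns how many passes were needed.
function batchOpenRanges(records: Rec[], target: number, batchSize: number): number {
  let offset = 0;
  let passes = 0;
  for (;;) {
    // Same predicate each pass: lower <= target < upper (null upper = open-ended).
    const matches = records.filter((r) => r.lower <= target && (r.upper === null || r.upper > target));
    const page = matches.slice(offset, offset + batchSize);
    if (page.length === 0) break;
    for (const r of page) r.upper = null; // "update": open the range
    offset += batchSize;
    passes++;
  }
  return passes;
}
```

The delete branch needs no such offset: deleted rows stop matching their filter, so refetching from offset zero naturally drains the result set.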