From 374968c177e76ccb05d33f2044991800aba986a2 Mon Sep 17 00:00:00 2001 From: Roman Petriv Date: Sun, 10 Mar 2024 10:05:58 +0200 Subject: [PATCH 1/5] feat: add script migrations --- packages/worker/.env.example | 4 + packages/worker/src/app.module.ts | 4 + packages/worker/src/app.service.ts | 8 ++ packages/worker/src/config.spec.ts | 3 + packages/worker/src/config.ts | 4 + packages/worker/src/entities/index.ts | 1 + .../src/entities/scriptMigration.entity.ts | 35 ++++++ .../1710057320666-AddScriptMigrations.ts | 25 +++++ packages/worker/src/repositories/index.ts | 1 + .../scriptMigration.repository.ts | 16 +++ .../1710057321666-AddAddressTransferType.ts | 101 ++++++++++++++++++ .../worker/src/utils/runScriptMigrations.ts | 88 +++++++++++++++ 12 files changed, 290 insertions(+) create mode 100644 packages/worker/src/entities/scriptMigration.entity.ts create mode 100644 packages/worker/src/migrations/1710057320666-AddScriptMigrations.ts create mode 100644 packages/worker/src/repositories/scriptMigration.repository.ts create mode 100644 packages/worker/src/scriptMigrations/1710057321666-AddAddressTransferType.ts create mode 100644 packages/worker/src/utils/runScriptMigrations.ts diff --git a/packages/worker/.env.example b/packages/worker/.env.example index d06ef87b32..dedd7dbbbd 100644 --- a/packages/worker/.env.example +++ b/packages/worker/.env.example @@ -8,6 +8,10 @@ DATABASE_NAME=block-explorer DATABASE_CONNECTION_IDLE_TIMEOUT_MS=12000 DATABASE_CONNECTION_POOL_SIZE=100 +# Used to execute expensive script migrations to migrate data without downtime +# Do not enable it by default, script migrations are specific to zkSync hosted instanced and have to be executed in certain order and controlled manually +ENABLE_SCRIPT_MIGRATIONS=false + BLOCKCHAIN_RPC_URL=http://localhost:3050 DATA_FETCHER_URL=http://localhost:3040 DATA_FETCHER_REQUEST_TIMEOUT=120000 diff --git a/packages/worker/src/app.module.ts b/packages/worker/src/app.module.ts index bec09a377a..b557fef450 100644 --- a/packages/worker/src/app.module.ts +++ b/packages/worker/src/app.module.ts @@ -31,6 +31,7 @@ import { AddressTransferRepository, LogRepository, BalanceRepository, + ScriptMigrationRepository, } from "./repositories"; import { Batch, @@ -44,6 +45,7 @@ import { Transfer, AddressTransfer, Balance, + ScriptMigration, } from "./entities"; import { typeOrmModuleOptions } from "./typeorm.config"; import { JsonRpcProviderModule } from "./rpcProvider/jsonRpcProvider.module"; @@ -80,6 +82,7 @@ import { DataFetcherService } from "./dataFetcher/dataFetcher.service"; AddressTransfer, Transfer, Balance, + ScriptMigration, ]), EventEmitterModule.forRoot(), JsonRpcProviderModule.forRoot(), @@ -121,6 +124,7 @@ import { DataFetcherService } from "./dataFetcher/dataFetcher.service"; AddressTransferRepository, BalanceRepository, LogRepository, + ScriptMigrationRepository, BlocksRevertService, BatchService, BlockProcessor, diff --git a/packages/worker/src/app.service.ts b/packages/worker/src/app.service.ts index d6ec852c73..66b57322ad 100644 --- a/packages/worker/src/app.service.ts +++ b/packages/worker/src/app.service.ts @@ -3,6 +3,7 @@ import { ConfigService } from "@nestjs/config"; import { OnEvent } from "@nestjs/event-emitter"; import { DataSource } from "typeorm"; import { BLOCKS_REVERT_DETECTED_EVENT } from "./constants"; +import { ScriptMigrationRepository } from "./repositories/scriptMigration.repository"; import { BlocksRevertService } from "./blocksRevert"; import { BlockService } from "./block"; import { BatchService } from 
"./batch"; @@ -10,12 +11,14 @@ import { CounterService } from "./counter"; import { BalancesCleanerService } from "./balance"; import { TokenOffChainDataSaverService } from "./token/tokenOffChainData/tokenOffChainDataSaver.service"; import runMigrations from "./utils/runMigrations"; +import runScriptMigrations from "./utils/runScriptMigrations"; @Injectable() export class AppService implements OnModuleInit, OnModuleDestroy { private readonly logger: Logger; public constructor( + private readonly scriptMigrationRepository: ScriptMigrationRepository, private readonly counterService: CounterService, private readonly batchService: BatchService, private readonly blockService: BlockService, @@ -30,6 +33,11 @@ export class AppService implements OnModuleInit, OnModuleDestroy { public onModuleInit() { runMigrations(this.dataSource, this.logger).then(() => { + const enableScriptMigrations = this.configService.get("scriptMigrations.enabled"); + if (enableScriptMigrations) { + // Run script migrations on background if there are any to run. + runScriptMigrations(this.scriptMigrationRepository, this.dataSource, this.logger); + } this.startWorkers(); }); } diff --git a/packages/worker/src/config.spec.ts b/packages/worker/src/config.spec.ts index f85c3f52d8..47183f2661 100644 --- a/packages/worker/src/config.spec.ts +++ b/packages/worker/src/config.spec.ts @@ -59,6 +59,9 @@ describe("config", () => { collectDbConnectionPoolMetricsInterval: 10000, collectBlocksToProcessMetricInterval: 10000, }, + scriptMigrations: { + enabled: false, + }, }); }); }); diff --git a/packages/worker/src/config.ts b/packages/worker/src/config.ts index b2884f949a..147ad505c7 100644 --- a/packages/worker/src/config.ts +++ b/packages/worker/src/config.ts @@ -21,6 +21,7 @@ export default () => { DISABLE_COUNTERS_PROCESSING, DISABLE_OLD_BALANCES_CLEANER, DISABLE_BLOCKS_REVERT, + ENABLE_SCRIPT_MIGRATIONS, ENABLE_TOKEN_OFFCHAIN_DATA_SAVER, UPDATE_TOKEN_OFFCHAIN_DATA_INTERVAL, SELECTED_TOKEN_OFFCHAIN_DATA_PROVIDER, @@ -78,5 +79,8 @@ export default () => { collectDbConnectionPoolMetricsInterval: parseInt(COLLECT_DB_CONNECTION_POOL_METRICS_INTERVAL, 10) || 10000, collectBlocksToProcessMetricInterval: parseInt(COLLECT_BLOCKS_TO_PROCESS_METRIC_INTERVAL, 10) || 10000, }, + scriptMigrations: { + enabled: ENABLE_SCRIPT_MIGRATIONS === "true", + }, }; }; diff --git a/packages/worker/src/entities/index.ts b/packages/worker/src/entities/index.ts index 0279626960..e952755e70 100644 --- a/packages/worker/src/entities/index.ts +++ b/packages/worker/src/entities/index.ts @@ -13,3 +13,4 @@ export * from "./addressTransfer.entity"; export * from "./balance.entity"; export * from "./counter.entity"; export * from "./counterState.entity"; +export * from "./scriptMigration.entity"; diff --git a/packages/worker/src/entities/scriptMigration.entity.ts b/packages/worker/src/entities/scriptMigration.entity.ts new file mode 100644 index 0000000000..e0fd67071c --- /dev/null +++ b/packages/worker/src/entities/scriptMigration.entity.ts @@ -0,0 +1,35 @@ +import { Entity, Column, PrimaryColumn, Index } from "typeorm"; +import { BaseEntity } from "./base.entity"; + +export interface ScriptMigrationParams { + [key: string]: string; +} + +export enum ScriptMigrationStatus { + NotStarted = "not_started", + Pending = "pending", + Failed = "failed", + Completed = "completed", + Outdated = "outdated", +} + +@Entity({ name: "scriptMigrations" }) +export class ScriptMigration extends BaseEntity { + @PrimaryColumn({ generated: true, type: "bigint" }) + public readonly number: 
number; + + @Index({ unique: true }) + @Column({ type: "varchar" }) + public readonly name: string; + + @Index() + @Column({ type: "bigint" }) + public readonly timestamp: number; + + @Index() + @Column({ type: "enum", enum: ScriptMigrationStatus, default: ScriptMigrationStatus.NotStarted }) + public readonly status: ScriptMigrationStatus; + + @Column({ type: "jsonb", nullable: true }) + public readonly params?: ScriptMigrationParams; +} diff --git a/packages/worker/src/migrations/1710057320666-AddScriptMigrations.ts b/packages/worker/src/migrations/1710057320666-AddScriptMigrations.ts new file mode 100644 index 0000000000..a1a5b61278 --- /dev/null +++ b/packages/worker/src/migrations/1710057320666-AddScriptMigrations.ts @@ -0,0 +1,25 @@ +import { MigrationInterface, QueryRunner } from "typeorm"; + +export class AddScriptMigrations1710057320666 implements MigrationInterface { + name = "AddScriptMigrations1710057320666"; + + public async up(queryRunner: QueryRunner): Promise { + await queryRunner.query( + `CREATE TYPE "public"."scriptMigrations_status_enum" AS ENUM('not_started', 'pending', 'failed', 'completed', 'outdated')` + ); + await queryRunner.query( + `CREATE TABLE "scriptMigrations" ("createdAt" TIMESTAMP NOT NULL DEFAULT now(), "updatedAt" TIMESTAMP NOT NULL DEFAULT now(), "number" BIGSERIAL NOT NULL, "name" character varying NOT NULL, "timestamp" bigint NOT NULL, "status" "public"."scriptMigrations_status_enum" NOT NULL DEFAULT 'not_started', "params" jsonb, CONSTRAINT "PK_def0d458be005c5e9640f69dde8" PRIMARY KEY ("number"))` + ); + await queryRunner.query(`CREATE UNIQUE INDEX "IDX_42f873351064e516bd8dcbec67" ON "scriptMigrations" ("name") `); + await queryRunner.query(`CREATE INDEX "IDX_7dabb5b44d259202dabf3d1621" ON "scriptMigrations" ("timestamp") `); + await queryRunner.query(`CREATE INDEX "IDX_f13cd15275d590b0af3e466907" ON "scriptMigrations" ("status") `); + } + + public async down(queryRunner: QueryRunner): Promise { + await queryRunner.query(`DROP INDEX "public"."IDX_f13cd15275d590b0af3e466907"`); + await queryRunner.query(`DROP INDEX "public"."IDX_7dabb5b44d259202dabf3d1621"`); + await queryRunner.query(`DROP INDEX "public"."IDX_42f873351064e516bd8dcbec67"`); + await queryRunner.query(`DROP TABLE "scriptMigrations"`); + await queryRunner.query(`DROP TYPE "public"."scriptMigrations_status_enum"`); + } +} diff --git a/packages/worker/src/repositories/index.ts b/packages/worker/src/repositories/index.ts index 4cca0ea3e5..b971544b5f 100644 --- a/packages/worker/src/repositories/index.ts +++ b/packages/worker/src/repositories/index.ts @@ -11,3 +11,4 @@ export * from "./log.repository"; export * from "./balance.repository"; export * from "./counter.repository"; export * from "./counterState.repository"; +export * from "./scriptMigration.repository"; diff --git a/packages/worker/src/repositories/scriptMigration.repository.ts b/packages/worker/src/repositories/scriptMigration.repository.ts new file mode 100644 index 0000000000..7cd15f0ed6 --- /dev/null +++ b/packages/worker/src/repositories/scriptMigration.repository.ts @@ -0,0 +1,16 @@ +import { Injectable } from "@nestjs/common"; +import { ScriptMigration } from "../entities"; +import { UnitOfWork } from "../unitOfWork"; +import { BaseRepository } from "./base.repository"; + +@Injectable() +export class ScriptMigrationRepository extends BaseRepository { + public constructor(unitOfWork: UnitOfWork) { + super(ScriptMigration, unitOfWork); + } + + public async update(criteria: any, partialEntity: Partial) { + const 
transactionManager = this.unitOfWork.getTransactionManager(); + await transactionManager.update(ScriptMigration, criteria, partialEntity); + } +} diff --git a/packages/worker/src/scriptMigrations/1710057321666-AddAddressTransferType.ts b/packages/worker/src/scriptMigrations/1710057321666-AddAddressTransferType.ts new file mode 100644 index 0000000000..be950a5464 --- /dev/null +++ b/packages/worker/src/scriptMigrations/1710057321666-AddAddressTransferType.ts @@ -0,0 +1,101 @@ +import { DataSource } from "typeorm"; +import { Logger } from "@nestjs/common"; +import { setTimeout } from "timers/promises"; +import { ScriptMigrationRunner } from "../utils/runScriptMigrations"; +import { ScriptMigrationRepository } from "../repositories/scriptMigration.repository"; +import { ScriptMigrationStatus } from "../entities/scriptMigration.entity"; + +const QUERY_MAX_RETRIES = 5; +const QUERY_RETRY_MIN_INTERVAL_MS = 1000; + +class AddAddressTransferType1710057321666 implements ScriptMigrationRunner { + public readonly name = "AddAddressTransferType1710057321666"; + + public async run(scriptMigrationRepository: ScriptMigrationRepository, dataSource: DataSource, logger: Logger) { + const migration = await scriptMigrationRepository.findOneBy({ + name: this.name, + }); + if (migration.status !== ScriptMigrationStatus.Pending) { + await scriptMigrationRepository.update({ number: migration.number }, { status: ScriptMigrationStatus.Pending }); + } + + const params = migration.params || {}; + const fromTransferNumber = Number(params.fromTransferNumber) || 0; + let toTransferNumber = Number(params.toTransferNumber) || 0; + const updateBatchSize = Number(params.updateBatchSize) || 4000; + const parallelWorkers = Number(params.parallelWorkers) || 50; + + if (!toTransferNumber) { + const lastTransferNumber = await dataSource.query( + `Select "number" from "transfers" order by "number" DESC limit 1;` + ); + toTransferNumber = parseInt(lastTransferNumber[0].number, 10); + } + logger.log( + `Starting migration ${this.name} with params: { fromTransferNumber: ${fromTransferNumber}, toTransferNumber: ${toTransferNumber}, updateBatchSize: ${updateBatchSize}, parallelWorkers: ${parallelWorkers} }` + ); + + let cursor = fromTransferNumber; + while (cursor <= toTransferNumber) { + const tasks = []; + for (let workerIndex = 0; workerIndex < parallelWorkers; workerIndex++) { + const batchStartNumber = cursor + workerIndex * updateBatchSize; + if (batchStartNumber > toTransferNumber) { + break; + } + let batchEndNumber = batchStartNumber + updateBatchSize; + if (batchEndNumber > toTransferNumber) { + batchEndNumber = toTransferNumber + 1; + } + tasks.push(this.updateAddressTransfers(dataSource, logger, batchStartNumber, batchEndNumber)); + } + await Promise.all(tasks); + + logger.log( + `Updated address transfers from ${cursor} to ${ + cursor + parallelWorkers * updateBatchSize + }. 
Time: ${new Date().toJSON()}.` + ); + await scriptMigrationRepository.update( + { number: migration.number }, + { + params: { + fromTransferNumber: (cursor + parallelWorkers * updateBatchSize).toString(), + toTransferNumber: toTransferNumber.toString(), + }, + } + ); + cursor = cursor + parallelWorkers * updateBatchSize; + } + } + + private async updateAddressTransfers( + dataSource: DataSource, + logger: Logger, + from: number, + to: number, + attempt = 0 + ): Promise { + try { + await dataSource.query( + `Update "addressTransfers" + Set "type" = "transfers".type::VARCHAR::"addressTransfers_type_enum" + From "transfers" + WHERE "transfers"."number" = "addressTransfers"."transferNumber" + AND "transfers"."number" >= ${from} + AND "transfers"."number" < ${to} + AND "transfers"."type" != 'transfer'` + ); + } catch (error) { + if (attempt >= QUERY_MAX_RETRIES) { + logger.error(`Failed to update AddressTransfers from ${from} to ${to} after ${QUERY_MAX_RETRIES} retries.`); + throw error; + } + await setTimeout(QUERY_RETRY_MIN_INTERVAL_MS * Math.pow(2, attempt)); + logger.error(`Failed to update AddressTransfers from ${from} to ${to}, retrying...`); + return this.updateAddressTransfers(dataSource, logger, from, to, attempt + 1); + } + } +} + +export default new AddAddressTransferType1710057321666(); diff --git a/packages/worker/src/utils/runScriptMigrations.ts b/packages/worker/src/utils/runScriptMigrations.ts new file mode 100644 index 0000000000..5f969a76c7 --- /dev/null +++ b/packages/worker/src/utils/runScriptMigrations.ts @@ -0,0 +1,88 @@ +import { DataSource, Not, Equal, And } from "typeorm"; +import { Logger } from "@nestjs/common"; +import { readdir } from "fs/promises"; +import * as path from "path"; +import { ScriptMigrationRepository } from "../repositories/scriptMigration.repository"; +import { ScriptMigrationStatus } from "../entities/scriptMigration.entity"; + +const GET_LATEST_MIGRATION_SQL = `SELECT * FROM migrations ORDER BY timestamp DESC limit 1`; + +export interface ScriptMigrationRunner { + name: string; + run: (scriptMigrationRepository: ScriptMigrationRepository, dataSource: DataSource, logger: Logger) => Promise; +} + +export default async (scriptMigrationRepository: ScriptMigrationRepository, dataSource: DataSource, logger: Logger) => { + const latestMigration = (await dataSource.query(GET_LATEST_MIGRATION_SQL))[0]; + + try { + let scriptMigrationFileNames = await readdir(path.join(__dirname, "../scriptMigrations"), "utf-8"); + scriptMigrationFileNames = scriptMigrationFileNames.sort((a, b) => (a > b ? 
1 : -1));
+
+    for (const scriptMigrationFileName of scriptMigrationFileNames) {
+      if (!scriptMigrationFileName.endsWith(".ts")) {
+        continue;
+      }
+      const [timestampPart, namePart] = scriptMigrationFileName.replace(".ts", "").split("-");
+      const scriptMigrationName = `${namePart}${timestampPart}`;
+      const scriptMigrationTimestamp = Number(timestampPart);
+      if (!scriptMigrationTimestamp) {
+        continue;
+      }
+      if (Number(latestMigration.timestamp) > scriptMigrationTimestamp) {
+        // skip script migration if there are already newer regular migrations executed as it might be outdated and impossible to execute
+        continue;
+      }
+      const existingScriptMigration = await scriptMigrationRepository.findOneBy({
+        name: scriptMigrationName,
+      });
+      if (!existingScriptMigration) {
+        await scriptMigrationRepository.add({
+          name: scriptMigrationName,
+          timestamp: scriptMigrationTimestamp,
+        });
+      }
+    }
+  } catch (error) {
+    logger.error({
+      message: "Failed to add script migrations to the DB",
+      stack: error.stack,
+    });
+  }
+
+  try {
+    const scriptMigrationsToRun = await scriptMigrationRepository.find({
+      where: {
+        status: And(Not(Equal(ScriptMigrationStatus.Completed)), Not(Equal(ScriptMigrationStatus.Outdated))),
+      },
+      order: {
+        timestamp: "ASC",
+      },
+    });
+    if (!scriptMigrationsToRun.length) {
+      return;
+    }
+    for (const scriptMigration of scriptMigrationsToRun) {
+      if (Number(latestMigration.timestamp) > scriptMigration.timestamp) {
+        // skip script migration if there are already newer regular migrations executed as it might be outdated and impossible to execute
+        await scriptMigrationRepository.update(
+          { number: scriptMigration.number },
+          { status: ScriptMigrationStatus.Outdated }
+        );
+        continue;
+      }
+      const migrationRunner: ScriptMigrationRunner = await import(`../scriptMigrations/${scriptMigration.name}`);
+
+      logger.log(`Starting script migration ${scriptMigration.name}`);
+      try {
+        await migrationRunner.run(scriptMigrationRepository, dataSource, logger);
+        logger.log(`Script migration ${scriptMigration.name} completed`);
+      } catch (error) {
+        logger.error(`Script migration ${scriptMigration.name} failed`);
+        throw error;
+      }
+    }
+  } catch (error) {
+    logger.error("Failed to execute script migrations");
+  }
+};

From 778cd83ef69f9522397288aa39a50a6f19266bb6 Mon Sep 17 00:00:00 2001
From: Roman Petriv
Date: Tue, 12 Mar 2024 13:20:21 +0200
Subject: [PATCH 2/5] fix: update script migration progress

---
 .../1710057321666-AddAddressTransferType.ts   |  11 +-
 .../worker/src/utils/runScriptMigrations.ts   | 113 +++++++++++-------
 2 files changed, 78 insertions(+), 46 deletions(-)

diff --git a/packages/worker/src/scriptMigrations/1710057321666-AddAddressTransferType.ts b/packages/worker/src/scriptMigrations/1710057321666-AddAddressTransferType.ts
index be950a5464..537db01927 100644
--- a/packages/worker/src/scriptMigrations/1710057321666-AddAddressTransferType.ts
+++ b/packages/worker/src/scriptMigrations/1710057321666-AddAddressTransferType.ts
@@ -12,14 +12,11 @@ class AddAddressTransferType1710057321666 implements ScriptMigrationRunner {
   public readonly name = "AddAddressTransferType1710057321666";
 
   public async run(scriptMigrationRepository: ScriptMigrationRepository, dataSource: DataSource, logger: Logger) {
-    const migration = await scriptMigrationRepository.findOneBy({
+    const scriptMigration = await scriptMigrationRepository.findOneBy({
       name: this.name,
     });
-    if (migration.status !== ScriptMigrationStatus.Pending) {
-      await scriptMigrationRepository.update({ number: migration.number }, { status:
ScriptMigrationStatus.Pending }); - } - const params = migration.params || {}; + const params = scriptMigration.params || {}; const fromTransferNumber = Number(params.fromTransferNumber) || 0; let toTransferNumber = Number(params.toTransferNumber) || 0; const updateBatchSize = Number(params.updateBatchSize) || 4000; @@ -57,11 +54,13 @@ class AddAddressTransferType1710057321666 implements ScriptMigrationRunner { }. Time: ${new Date().toJSON()}.` ); await scriptMigrationRepository.update( - { number: migration.number }, + { number: scriptMigration.number }, { params: { fromTransferNumber: (cursor + parallelWorkers * updateBatchSize).toString(), toTransferNumber: toTransferNumber.toString(), + updateBatchSize: updateBatchSize.toString(), + parallelWorkers: parallelWorkers.toString(), }, } ); diff --git a/packages/worker/src/utils/runScriptMigrations.ts b/packages/worker/src/utils/runScriptMigrations.ts index 5f969a76c7..3d85ed98d8 100644 --- a/packages/worker/src/utils/runScriptMigrations.ts +++ b/packages/worker/src/utils/runScriptMigrations.ts @@ -14,42 +14,16 @@ export interface ScriptMigrationRunner { export default async (scriptMigrationRepository: ScriptMigrationRepository, dataSource: DataSource, logger: Logger) => { const latestMigration = (await dataSource.query(GET_LATEST_MIGRATION_SQL))[0]; + await addScriptMigrationsToDB(scriptMigrationRepository, logger, Number(latestMigration?.timestamp || 0)); + await runScriptMigrations(scriptMigrationRepository, dataSource, logger, Number(latestMigration?.timestamp || 0)); +}; - try { - let scriptMigrationFileNames = await readdir(path.join(__dirname, "../scriptMigrations"), "utf-8"); - scriptMigrationFileNames = scriptMigrationFileNames.sort((a, b) => (a > b ? 1 : -1)); - - for (const scriptMigrationFileName of scriptMigrationFileNames) { - if (!scriptMigrationFileName.endsWith(".ts")) { - continue; - } - const [timestampPart, namePart] = scriptMigrationFileName.replace(".ts", "").split("-"); - const scriptMigrationName = `${namePart}${timestampPart}`; - const scriptMigrationTimestamp = Number(timestampPart); - if (!scriptMigrationTimestamp) { - continue; - } - if (Number(latestMigration.timestamp) > scriptMigrationTimestamp) { - // skip script migration if there are already newer regular migrations executed as it might be outdated and impossible to execute - continue; - } - const existingScriptMigration = await scriptMigrationRepository.findOneBy({ - name: scriptMigrationName, - }); - if (!existingScriptMigration) { - await scriptMigrationRepository.add({ - name: scriptMigrationName, - timestamp: scriptMigrationTimestamp, - }); - } - } - } catch (error) { - logger.error({ - message: "Failed to add script migrations to the DB", - stack: error.stack, - }); - } - +const runScriptMigrations = async ( + scriptMigrationRepository: ScriptMigrationRepository, + dataSource: DataSource, + logger: Logger, + latestDBMigrationTimestamp: number +) => { try { const scriptMigrationsToRun = await scriptMigrationRepository.find({ where: { @@ -59,30 +33,89 @@ export default async (scriptMigrationRepository: ScriptMigrationRepository, data timestamp: "ASC", }, }); + if (!scriptMigrationsToRun.length) { return; } + for (const scriptMigration of scriptMigrationsToRun) { - if (Number(latestMigration.timestamp) > scriptMigration.timestamp) { - // skip script migration if there are already newer regular migrations executed as it might be outdated and impossible to execute + if (latestDBMigrationTimestamp > scriptMigration.timestamp) { + // skip script migration if 
there are already newer regular DB migrations executed as it might be outdated and impossible to execute await scriptMigrationRepository.update( { number: scriptMigration.number }, { status: ScriptMigrationStatus.Outdated } ); continue; } - const migrationRunner: ScriptMigrationRunner = await import(`../scriptMigrations/${scriptMigration.name}`); - logger.log(`Starting script migration ${scriptMigration.name}`); + const migrationRunner: ScriptMigrationRunner = await import(`../scriptMigrations/${scriptMigration.name}`); try { + logger.log(`Starting script migration ${scriptMigration.name}`); + if (scriptMigration.status !== ScriptMigrationStatus.Pending) { + await scriptMigrationRepository.update( + { number: scriptMigration.number }, + { status: ScriptMigrationStatus.Pending } + ); + } await migrationRunner.run(scriptMigrationRepository, dataSource, logger); logger.log(`Script migration ${scriptMigration.name} completed`); + await scriptMigrationRepository.update( + { number: scriptMigration.number }, + { status: ScriptMigrationStatus.Completed } + ); } catch (error) { logger.error(`Script migration ${scriptMigration.name} failed`); + await scriptMigrationRepository.update( + { number: scriptMigration.number }, + { status: ScriptMigrationStatus.Failed } + ); throw error; } } } catch (error) { - logger.error("Failed to execute script migrations"); + logger.error({ + message: "Failed to execute script migrations", + stack: error.stack, + }); + } +}; + +const addScriptMigrationsToDB = async ( + scriptMigrationRepository: ScriptMigrationRepository, + logger: Logger, + latestDBMigrationTimestamp: number +) => { + try { + let scriptMigrationFileNames = await readdir(path.join(__dirname, "../scriptMigrations"), "utf-8"); + scriptMigrationFileNames = scriptMigrationFileNames + .filter((scriptMigrationFileName) => scriptMigrationFileName.endsWith(".ts")) + .sort((a, b) => (a > b ? 
1 : -1));
+
+    for (const scriptMigrationFileName of scriptMigrationFileNames) {
+      const [timestampPart, namePart] = scriptMigrationFileName.replace(".ts", "").split("-");
+      const scriptMigrationName = `${namePart}${timestampPart}`;
+      const scriptMigrationTimestamp = Number(timestampPart);
+      if (!scriptMigrationTimestamp) {
+        continue;
+      }
+      if (latestDBMigrationTimestamp > scriptMigrationTimestamp) {
+        // skip script migration if there are already newer regular DB migrations executed as it might be outdated and impossible to execute
+        continue;
+      }
+      const existingScriptMigration = await scriptMigrationRepository.findOneBy({
+        name: scriptMigrationName,
+      });
+      if (!existingScriptMigration) {
+        await scriptMigrationRepository.add({
+          name: scriptMigrationName,
+          timestamp: scriptMigrationTimestamp,
+        });
+      }
+    }
+  } catch (error) {
+    logger.error({
+      message: "Failed to add script migrations to the DB",
+      stack: error.stack,
+    });
+  }
+};

From 47cdcd0543fa46874fca7c7d39a30612dedcd9e2 Mon Sep 17 00:00:00 2001
From: Roman Petriv
Date: Tue, 12 Mar 2024 13:53:08 +0200
Subject: [PATCH 3/5] fix: add update query to address transfer type migration

---
 packages/worker/.env.example                            | 4 ++--
 .../migrations/1709722093204-AddAddressTransferType.ts  | 8 +++++++-
 .../1710057321666-AddAddressTransferType.ts             | 1 -
 3 files changed, 9 insertions(+), 4 deletions(-)

diff --git a/packages/worker/.env.example b/packages/worker/.env.example
index dedd7dbbbd..ccd7f293b7 100644
--- a/packages/worker/.env.example
+++ b/packages/worker/.env.example
@@ -8,8 +8,8 @@ DATABASE_NAME=block-explorer
 DATABASE_CONNECTION_IDLE_TIMEOUT_MS=12000
 DATABASE_CONNECTION_POOL_SIZE=100
 
-# Used to execute expensive script migrations to migrate data without downtime
-# Do not enable it by default, script migrations are specific to zkSync hosted instanced and have to be executed in certain order and controlled manually
+# Used to execute expensive script migrations in the background to migrate data without downtime.
+# Script migrations are specific to hosted instances and have to be executed within certain timeframes and controlled manually.
ENABLE_SCRIPT_MIGRATIONS=false BLOCKCHAIN_RPC_URL=http://localhost:3050 diff --git a/packages/worker/src/migrations/1709722093204-AddAddressTransferType.ts b/packages/worker/src/migrations/1709722093204-AddAddressTransferType.ts index bb5f79b330..0275a7ee9f 100644 --- a/packages/worker/src/migrations/1709722093204-AddAddressTransferType.ts +++ b/packages/worker/src/migrations/1709722093204-AddAddressTransferType.ts @@ -11,7 +11,13 @@ export class AddAddressTransferType1709722093204 implements MigrationInterface { `ALTER TABLE "addressTransfers" ADD "type" "public"."addressTransfers_type_enum" NOT NULL DEFAULT 'transfer'` ); await queryRunner.query( - `CREATE INDEX "IDX_aa5a147f1f6a4acde1a13de594" ON "addressTransfers" ("address", "type", "timestamp", "logIndex") ` + `CREATE INDEX "IDX_aa5a147f1f6a4acde1a13de594" ON "addressTransfers" ("address", "type", "timestamp", "logIndex" DESC) ` + ); + await queryRunner.query( + `UPDATE "addressTransfers" Set "type" = "transfers".type::VARCHAR::"addressTransfers_type_enum" + FROM "transfers" + WHERE "transfers"."number" = "addressTransfers"."transferNumber" + AND "transfers"."type" != 'transfer'` ); } diff --git a/packages/worker/src/scriptMigrations/1710057321666-AddAddressTransferType.ts b/packages/worker/src/scriptMigrations/1710057321666-AddAddressTransferType.ts index 537db01927..407f852cd0 100644 --- a/packages/worker/src/scriptMigrations/1710057321666-AddAddressTransferType.ts +++ b/packages/worker/src/scriptMigrations/1710057321666-AddAddressTransferType.ts @@ -3,7 +3,6 @@ import { Logger } from "@nestjs/common"; import { setTimeout } from "timers/promises"; import { ScriptMigrationRunner } from "../utils/runScriptMigrations"; import { ScriptMigrationRepository } from "../repositories/scriptMigration.repository"; -import { ScriptMigrationStatus } from "../entities/scriptMigration.entity"; const QUERY_MAX_RETRIES = 5; const QUERY_RETRY_MIN_INTERVAL_MS = 1000; From 39dcebb26aed7e4dd1c47d7eecc6d838c8628bf9 Mon Sep 17 00:00:00 2001 From: Roman Petriv Date: Tue, 12 Mar 2024 13:59:38 +0200 Subject: [PATCH 4/5] fix: remove old migrationScripts --- packages/worker/package.json | 5 +- .../1709722093204-AddAddressTransferType.ts | 106 ------------------ 2 files changed, 2 insertions(+), 109 deletions(-) delete mode 100644 packages/worker/src/migrationScripts/1709722093204-AddAddressTransferType.ts diff --git a/packages/worker/package.json b/packages/worker/package.json index 1b64487f5f..55fba16126 100644 --- a/packages/worker/package.json +++ b/packages/worker/package.json @@ -28,8 +28,7 @@ "migration:generate": "npm run typeorm migration:generate ./src/migrations/$npm_config_name -- -d ./src/typeorm.config.ts", "migration:create": "npm run typeorm migration:create ./src/migrations/$npm_config_name", "migration:run": "npm run typeorm migration:run -- -d ./src/typeorm.config.ts", - "migration:revert": "npm run typeorm migration:revert -- -d ./src/typeorm.config.ts", - "migration-script:run": "node --inspect-brk -r tsconfig-paths/register -r ts-node/register ./src/migrationScripts/{migrationFileName}.ts --runInBand" + "migration:revert": "npm run typeorm migration:revert -- -d ./src/typeorm.config.ts" }, "dependencies": { "@nestjs/axios": "^3.0.0", @@ -109,7 +108,7 @@ "src/logger.ts", "src/typeorm.config.ts", "src/migrations", - "src/migrationScripts" + "src/scriptMigrations" ], "reporters": [ "default", diff --git a/packages/worker/src/migrationScripts/1709722093204-AddAddressTransferType.ts 
b/packages/worker/src/migrationScripts/1709722093204-AddAddressTransferType.ts deleted file mode 100644 index 9b1997be90..0000000000 --- a/packages/worker/src/migrationScripts/1709722093204-AddAddressTransferType.ts +++ /dev/null @@ -1,106 +0,0 @@ -import { config } from "dotenv"; -import { DataSource } from "typeorm"; -import yargs from "yargs"; -import { hideBin } from "yargs/helpers"; -import { setTimeout } from "timers/promises"; -import { typeOrmModuleOptions } from "../typeorm.config"; -import logger from "../logger"; - -config(); - -const QUERY_MAX_RETRIES = 5; -const QUERY_RETRY_MIN_INTERVAL_MS = 1000; - -// eslint-disable-next-line prefer-const -let { fromTransferNumber, toTransferNumber, updateBatchSize, parallelWorkers } = yargs(hideBin(process.argv)) - .options({ - fromTransferNumber: { - default: 0, - type: "number", - }, - toTransferNumber: { - default: 0, - type: "number", - }, - updateBatchSize: { - default: 4000, - type: "number", - }, - parallelWorkers: { - default: 50, - type: "number", - }, - }) - .parseSync(); - -const updateAddressTransfers = async (dataSource: DataSource, from: number, to: number, attempt = 0): Promise => { - try { - await dataSource.query( - `Update "addressTransfers" - Set "type" = "transfers".type::VARCHAR::"addressTransfers_type_enum" - From "transfers" - WHERE "transfers"."number" = "addressTransfers"."transferNumber" - AND "transfers"."number" >= ${from} - AND "transfers"."number" < ${to} - AND "transfers"."type" != 'transfer'` - ); - } catch (error) { - if (attempt >= QUERY_MAX_RETRIES) { - logger.error(`Failed to update AddressTransfers from ${from} to ${to} after ${QUERY_MAX_RETRIES} retries.`); - throw error; - } - await setTimeout(QUERY_RETRY_MIN_INTERVAL_MS * Math.pow(2, attempt)); - logger.error(`Failed to update AddressTransfers from ${from} to ${to}, retrying...`); - return updateAddressTransfers(dataSource, from, to, attempt + 1); - } -}; - -const main = async () => { - const typeOrmCliDataSource = new DataSource(typeOrmModuleOptions); - await typeOrmCliDataSource.initialize(); - - if (!toTransferNumber) { - const lastTransferNumber = await typeOrmCliDataSource.query( - `Select "number" from "transfers" order by "number" DESC limit 1;` - ); - toTransferNumber = parseInt(lastTransferNumber[0].number, 10); - } - logger.log( - `Starting migration with params: { fromTransferNumber: ${fromTransferNumber}, toTransferNumber: ${toTransferNumber}, updateBatchSize: ${updateBatchSize}, parallelWorkers: ${parallelWorkers} }` - ); - - let cursor = fromTransferNumber; - while (cursor <= toTransferNumber) { - const tasks = []; - for (let workerIndex = 0; workerIndex < parallelWorkers; workerIndex++) { - const batchStartNumber = cursor + workerIndex * updateBatchSize; - if (batchStartNumber > toTransferNumber) { - break; - } - let batchEndNumber = batchStartNumber + updateBatchSize; - if (batchEndNumber > toTransferNumber) { - batchEndNumber = toTransferNumber + 1; - } - tasks.push(updateAddressTransfers(typeOrmCliDataSource, batchStartNumber, batchEndNumber)); - } - await Promise.all(tasks); - - logger.log( - `Updated address transfers from ${cursor} to ${ - cursor + parallelWorkers * updateBatchSize - }. 
Time: ${new Date().toJSON()}.` - ); - cursor = cursor + parallelWorkers * updateBatchSize; - } -}; - -main() - .then(() => { - logger.log(`Migration script 1709722093204-AddAddressTransferType executed successfully.`); - process.exit(0); - }) - .catch((error) => { - logger.error(`Migration script 1709722093204-AddAddressTransferType failed.`); - logger.error(error); - process.exit(0); - }); From 68c753d0364cf22f40d88d5180f0128f59d1bf5b Mon Sep 17 00:00:00 2001 From: Roman Petriv Date: Tue, 12 Mar 2024 14:53:25 +0200 Subject: [PATCH 5/5] fix: use dataSource to query script migrations --- packages/worker/package.json | 3 +- .../worker/scripts/run-script-migrations.ts | 9 ++++ packages/worker/src/app.service.ts | 4 +- .../scriptMigration.repository.ts | 5 -- .../1710057321666-AddAddressTransferType.ts | 13 +++-- .../worker/src/utils/runScriptMigrations.ts | 48 +++++++++---------- 6 files changed, 42 insertions(+), 40 deletions(-) create mode 100644 packages/worker/scripts/run-script-migrations.ts diff --git a/packages/worker/package.json b/packages/worker/package.json index 55fba16126..2715d77ff3 100644 --- a/packages/worker/package.json +++ b/packages/worker/package.json @@ -28,7 +28,8 @@ "migration:generate": "npm run typeorm migration:generate ./src/migrations/$npm_config_name -- -d ./src/typeorm.config.ts", "migration:create": "npm run typeorm migration:create ./src/migrations/$npm_config_name", "migration:run": "npm run typeorm migration:run -- -d ./src/typeorm.config.ts", - "migration:revert": "npm run typeorm migration:revert -- -d ./src/typeorm.config.ts" + "migration:revert": "npm run typeorm migration:revert -- -d ./src/typeorm.config.ts", + "script-migrations:run": "ts-node ./scripts/run-script-migrations.ts" }, "dependencies": { "@nestjs/axios": "^3.0.0", diff --git a/packages/worker/scripts/run-script-migrations.ts b/packages/worker/scripts/run-script-migrations.ts new file mode 100644 index 0000000000..debd3c7132 --- /dev/null +++ b/packages/worker/scripts/run-script-migrations.ts @@ -0,0 +1,9 @@ +import { Logger } from "@nestjs/common"; +import typeOrmCliDataSource from "../src/typeorm.config"; +import runScriptMigrations from "../src/utils/runScriptMigrations"; + +(async () => { + const logger = new Logger("ScriptMigrations"); + await runScriptMigrations(typeOrmCliDataSource, logger); + logger.log("DONE"); +})(); diff --git a/packages/worker/src/app.service.ts b/packages/worker/src/app.service.ts index 66b57322ad..a731746fd4 100644 --- a/packages/worker/src/app.service.ts +++ b/packages/worker/src/app.service.ts @@ -3,7 +3,6 @@ import { ConfigService } from "@nestjs/config"; import { OnEvent } from "@nestjs/event-emitter"; import { DataSource } from "typeorm"; import { BLOCKS_REVERT_DETECTED_EVENT } from "./constants"; -import { ScriptMigrationRepository } from "./repositories/scriptMigration.repository"; import { BlocksRevertService } from "./blocksRevert"; import { BlockService } from "./block"; import { BatchService } from "./batch"; @@ -18,7 +17,6 @@ export class AppService implements OnModuleInit, OnModuleDestroy { private readonly logger: Logger; public constructor( - private readonly scriptMigrationRepository: ScriptMigrationRepository, private readonly counterService: CounterService, private readonly batchService: BatchService, private readonly blockService: BlockService, @@ -36,7 +34,7 @@ export class AppService implements OnModuleInit, OnModuleDestroy { const enableScriptMigrations = this.configService.get("scriptMigrations.enabled"); if (enableScriptMigrations) { // 
Run script migrations on background if there are any to run. - runScriptMigrations(this.scriptMigrationRepository, this.dataSource, this.logger); + runScriptMigrations(this.dataSource, this.logger); } this.startWorkers(); }); diff --git a/packages/worker/src/repositories/scriptMigration.repository.ts b/packages/worker/src/repositories/scriptMigration.repository.ts index 7cd15f0ed6..35a08f8079 100644 --- a/packages/worker/src/repositories/scriptMigration.repository.ts +++ b/packages/worker/src/repositories/scriptMigration.repository.ts @@ -8,9 +8,4 @@ export class ScriptMigrationRepository extends BaseRepository { public constructor(unitOfWork: UnitOfWork) { super(ScriptMigration, unitOfWork); } - - public async update(criteria: any, partialEntity: Partial) { - const transactionManager = this.unitOfWork.getTransactionManager(); - await transactionManager.update(ScriptMigration, criteria, partialEntity); - } } diff --git a/packages/worker/src/scriptMigrations/1710057321666-AddAddressTransferType.ts b/packages/worker/src/scriptMigrations/1710057321666-AddAddressTransferType.ts index 407f852cd0..a72902e218 100644 --- a/packages/worker/src/scriptMigrations/1710057321666-AddAddressTransferType.ts +++ b/packages/worker/src/scriptMigrations/1710057321666-AddAddressTransferType.ts @@ -2,7 +2,7 @@ import { DataSource } from "typeorm"; import { Logger } from "@nestjs/common"; import { setTimeout } from "timers/promises"; import { ScriptMigrationRunner } from "../utils/runScriptMigrations"; -import { ScriptMigrationRepository } from "../repositories/scriptMigration.repository"; +import { ScriptMigration } from "../entities"; const QUERY_MAX_RETRIES = 5; const QUERY_RETRY_MIN_INTERVAL_MS = 1000; @@ -10,8 +10,8 @@ const QUERY_RETRY_MIN_INTERVAL_MS = 1000; class AddAddressTransferType1710057321666 implements ScriptMigrationRunner { public readonly name = "AddAddressTransferType1710057321666"; - public async run(scriptMigrationRepository: ScriptMigrationRepository, dataSource: DataSource, logger: Logger) { - const scriptMigration = await scriptMigrationRepository.findOneBy({ + public async run(dataSource: DataSource, logger: Logger) { + const scriptMigration = await dataSource.manager.findOneBy(ScriptMigration, { name: this.name, }); @@ -52,8 +52,11 @@ class AddAddressTransferType1710057321666 implements ScriptMigrationRunner { cursor + parallelWorkers * updateBatchSize }. 
Time: ${new Date().toJSON()}.` ); - await scriptMigrationRepository.update( - { number: scriptMigration.number }, + await dataSource.manager.update( + ScriptMigration, + { + number: scriptMigration.number, + }, { params: { fromTransferNumber: (cursor + parallelWorkers * updateBatchSize).toString(), diff --git a/packages/worker/src/utils/runScriptMigrations.ts b/packages/worker/src/utils/runScriptMigrations.ts index 3d85ed98d8..5e730082e5 100644 --- a/packages/worker/src/utils/runScriptMigrations.ts +++ b/packages/worker/src/utils/runScriptMigrations.ts @@ -2,30 +2,24 @@ import { DataSource, Not, Equal, And } from "typeorm"; import { Logger } from "@nestjs/common"; import { readdir } from "fs/promises"; import * as path from "path"; -import { ScriptMigrationRepository } from "../repositories/scriptMigration.repository"; -import { ScriptMigrationStatus } from "../entities/scriptMigration.entity"; +import { ScriptMigration, ScriptMigrationStatus } from "../entities/scriptMigration.entity"; -const GET_LATEST_MIGRATION_SQL = `SELECT * FROM migrations ORDER BY timestamp DESC limit 1`; +const GET_LATEST_MIGRATION_SQL = `SELECT timestamp FROM migrations ORDER BY timestamp DESC limit 1`; export interface ScriptMigrationRunner { name: string; - run: (scriptMigrationRepository: ScriptMigrationRepository, dataSource: DataSource, logger: Logger) => Promise; + run: (dataSource: DataSource, logger: Logger) => Promise; } -export default async (scriptMigrationRepository: ScriptMigrationRepository, dataSource: DataSource, logger: Logger) => { +export default async (dataSource: DataSource, logger: Logger) => { const latestMigration = (await dataSource.query(GET_LATEST_MIGRATION_SQL))[0]; - await addScriptMigrationsToDB(scriptMigrationRepository, logger, Number(latestMigration?.timestamp || 0)); - await runScriptMigrations(scriptMigrationRepository, dataSource, logger, Number(latestMigration?.timestamp || 0)); + await addScriptMigrationsToDB(dataSource, logger, Number(latestMigration?.timestamp || 0)); + await runScriptMigrations(dataSource, logger, Number(latestMigration?.timestamp || 0)); }; -const runScriptMigrations = async ( - scriptMigrationRepository: ScriptMigrationRepository, - dataSource: DataSource, - logger: Logger, - latestDBMigrationTimestamp: number -) => { +const runScriptMigrations = async (dataSource: DataSource, logger: Logger, latestDBMigrationTimestamp: number) => { try { - const scriptMigrationsToRun = await scriptMigrationRepository.find({ + const scriptMigrationsToRun = await dataSource.manager.find(ScriptMigration, { where: { status: And(Not(Equal(ScriptMigrationStatus.Completed)), Not(Equal(ScriptMigrationStatus.Outdated))), }, @@ -35,13 +29,15 @@ const runScriptMigrations = async ( }); if (!scriptMigrationsToRun.length) { + logger.log(`No script migrations to run`); return; } for (const scriptMigration of scriptMigrationsToRun) { if (latestDBMigrationTimestamp > scriptMigration.timestamp) { // skip script migration if there are already newer regular DB migrations executed as it might be outdated and impossible to execute - await scriptMigrationRepository.update( + await dataSource.manager.update( + ScriptMigration, { number: scriptMigration.number }, { status: ScriptMigrationStatus.Outdated } ); @@ -52,26 +48,30 @@ const runScriptMigrations = async ( try { logger.log(`Starting script migration ${scriptMigration.name}`); if (scriptMigration.status !== ScriptMigrationStatus.Pending) { - await scriptMigrationRepository.update( + await dataSource.manager.update( + ScriptMigration, { 
number: scriptMigration.number }, { status: ScriptMigrationStatus.Pending } ); } - await migrationRunner.run(scriptMigrationRepository, dataSource, logger); + await migrationRunner.run(dataSource, logger); logger.log(`Script migration ${scriptMigration.name} completed`); - await scriptMigrationRepository.update( + await dataSource.manager.update( + ScriptMigration, { number: scriptMigration.number }, { status: ScriptMigrationStatus.Completed } ); } catch (error) { logger.error(`Script migration ${scriptMigration.name} failed`); - await scriptMigrationRepository.update( + await dataSource.manager.update( + ScriptMigration, { number: scriptMigration.number }, { status: ScriptMigrationStatus.Failed } ); throw error; } } + logger.log(`Completed script migrations`); } catch (error) { logger.error({ message: "Failed to execute script migrations", @@ -80,11 +80,7 @@ const runScriptMigrations = async ( } }; -const addScriptMigrationsToDB = async ( - scriptMigrationRepository: ScriptMigrationRepository, - logger: Logger, - latestDBMigrationTimestamp: number -) => { +const addScriptMigrationsToDB = async (dataSource: DataSource, logger: Logger, latestDBMigrationTimestamp: number) => { try { let scriptMigrationFileNames = await readdir(path.join(__dirname, "../scriptMigrations"), "utf-8"); scriptMigrationFileNames = scriptMigrationFileNames @@ -102,11 +98,11 @@ const addScriptMigrationsToDB = async ( // skip script migration if there are already newer regular DB migrations executed as it might be outdated and impossible to execute continue; } - const existingScriptMigration = await scriptMigrationRepository.findOneBy({ + const existingScriptMigration = await dataSource.manager.findOneBy(ScriptMigration, { name: scriptMigrationName, }); if (!existingScriptMigration) { - await scriptMigrationRepository.add({ + await dataSource.manager.insert(ScriptMigration, { name: scriptMigrationName, timestamp: scriptMigrationTimestamp, });