diff --git a/src/database/migrations/migrate.ts b/src/database/migrations/migrate.ts index 787df541..c00beed2 100644 --- a/src/database/migrations/migrate.ts +++ b/src/database/migrations/migrate.ts @@ -40,7 +40,7 @@ const backportMigrations = [ export const progressiveMigrations = new ProgressiveMigrations({ maxSize: 200, - interval: 1000 * 60, // 1m + interval: 1000 * 5, // 5s watch: pgQueueEnable, }) diff --git a/src/database/migrations/progressive.ts b/src/database/migrations/progressive.ts index 71517289..50c94f2f 100644 --- a/src/database/migrations/progressive.ts +++ b/src/database/migrations/progressive.ts @@ -19,10 +19,31 @@ export class ProgressiveMigrations { signal.addEventListener('abort', () => { if (this.watchInterval) { clearInterval(this.watchInterval) + this.drain().catch((e) => { + logSchema.error(logger, '[Migrations] Error creating migration jobs', { + type: 'migrations', + error: e, + metadata: JSON.stringify({ + strategy: 'progressive', + }), + }) + }) } }) } + async drain() { + return this.createJobs(this.tenants.length).catch((e) => { + logSchema.error(logger, '[Migrations] Error creating migration jobs', { + type: 'migrations', + error: e, + metadata: JSON.stringify({ + strategy: 'progressive', + }), + }) + }) + } + addTenant(tenant: string) { const tenantIndex = this.tenants.indexOf(tenant) @@ -36,7 +57,7 @@ export class ProgressiveMigrations { return } - this.createJobs().catch((e) => { + this.createJobs(this.options.maxSize).catch((e) => { logSchema.error(logger, '[Migrations] Error creating migration jobs', { type: 'migrations', error: e, @@ -56,7 +77,7 @@ export class ProgressiveMigrations { return } - this.createJobs().catch((e) => { + this.createJobs(this.options.maxSize).catch((e) => { logSchema.error(logger, '[Migrations] Error creating migration jobs', { type: 'migrations', error: e, @@ -68,9 +89,9 @@ export class ProgressiveMigrations { }, this.options.interval) } - protected async createJobs() { + protected async createJobs(maxJobs: number) { this.emittingJobs = true - const tenantsBatch = this.tenants.splice(0, this.options.maxSize) + const tenantsBatch = this.tenants.splice(0, maxJobs) const jobs = await Promise.allSettled( tenantsBatch.map(async (tenant) => { const tenantConfig = await getTenantConfig(tenant) diff --git a/src/http/routes/admin/tenants.ts b/src/http/routes/admin/tenants.ts index 6e5ec345..c797aeba 100644 --- a/src/http/routes/admin/tenants.ts +++ b/src/http/routes/admin/tenants.ts @@ -8,6 +8,7 @@ import { multitenantKnex, lastMigrationName, runMigrationsOnTenant, + progressiveMigrations, } from '../../../database' import { dbSuperUser, storage } from '../../plugins' @@ -179,7 +180,6 @@ export default async function routes(fastify: FastifyInstance) { tracingMode, } = request.body - await runMigrationsOnTenant(databaseUrl, tenantId) await multitenantKnex('tenants').insert({ id: tenantId, anon_key: encrypt(anonKey), @@ -195,6 +195,13 @@ export default async function routes(fastify: FastifyInstance) { migrations_status: TenantMigrationStatus.COMPLETED, tracing_mode: tracingMode, }) + + try { + await runMigrationsOnTenant(databaseUrl, tenantId) + } catch (e) { + progressiveMigrations.addTenant(tenantId) + } + reply.code(201).send() }) @@ -215,9 +222,7 @@ export default async function routes(fastify: FastifyInstance) { tracingMode, } = request.body const { tenantId } = request.params - if (databaseUrl) { - await runMigrationsOnTenant(databaseUrl, tenantId) - } + await multitenantKnex('tenants') .update({ anon_key: anonKey !== undefined ? encrypt(anonKey) : undefined, @@ -238,6 +243,15 @@ export default async function routes(fastify: FastifyInstance) { tracing_mode: tracingMode, }) .where('id', tenantId) + + if (databaseUrl) { + try { + await runMigrationsOnTenant(databaseUrl, tenantId) + } catch (e) { + progressiveMigrations.addTenant(tenantId) + } + } + reply.code(204).send() } ) @@ -256,11 +270,10 @@ export default async function routes(fastify: FastifyInstance) { tracingMode, } = request.body const { tenantId } = request.params - await runMigrationsOnTenant(databaseUrl, tenantId) const tenantInfo: tenantDBInterface & { migrations_version: string - migrations_status: TenantMigrationStatus + migrations_status: TenantMigrationStatus | null tracing_mode?: string } = { id: tenantId, @@ -270,7 +283,7 @@ export default async function routes(fastify: FastifyInstance) { jwks: jwks || null, service_key: encrypt(serviceKey), migrations_version: await lastMigrationName(), - migrations_status: TenantMigrationStatus.COMPLETED, + migrations_status: null, } if (fileSizeLimit) { @@ -294,6 +307,13 @@ export default async function routes(fastify: FastifyInstance) { } await multitenantKnex('tenants').insert(tenantInfo).onConflict('id').merge() + + try { + await runMigrationsOnTenant(databaseUrl, tenantId) + } catch (e) { + progressiveMigrations.addTenant(tenantId) + } + reply.code(204).send() })