From 57460bf749a1e0ad5f0d61709770bbe84f8c7eb2 Mon Sep 17 00:00:00 2001 From: fenos Date: Fri, 21 Jun 2024 11:58:15 +0200 Subject: [PATCH] fix: run migrations after tenant creation --- src/database/migrations/migrate.ts | 2 +- src/database/migrations/progressive.ts | 29 +++++++++++-- src/http/routes/admin/tenants.ts | 58 ++++++++++++++++++++------ 3 files changed, 71 insertions(+), 18 deletions(-) diff --git a/src/database/migrations/migrate.ts b/src/database/migrations/migrate.ts index 787df541..c00beed2 100644 --- a/src/database/migrations/migrate.ts +++ b/src/database/migrations/migrate.ts @@ -40,7 +40,7 @@ const backportMigrations = [ export const progressiveMigrations = new ProgressiveMigrations({ maxSize: 200, - interval: 1000 * 60, // 1m + interval: 1000 * 5, // 5s watch: pgQueueEnable, }) diff --git a/src/database/migrations/progressive.ts b/src/database/migrations/progressive.ts index 71517289..50c94f2f 100644 --- a/src/database/migrations/progressive.ts +++ b/src/database/migrations/progressive.ts @@ -19,10 +19,31 @@ export class ProgressiveMigrations { signal.addEventListener('abort', () => { if (this.watchInterval) { clearInterval(this.watchInterval) + this.drain().catch((e) => { + logSchema.error(logger, '[Migrations] Error creating migration jobs', { + type: 'migrations', + error: e, + metadata: JSON.stringify({ + strategy: 'progressive', + }), + }) + }) } }) } + async drain() { + return this.createJobs(this.tenants.length).catch((e) => { + logSchema.error(logger, '[Migrations] Error creating migration jobs', { + type: 'migrations', + error: e, + metadata: JSON.stringify({ + strategy: 'progressive', + }), + }) + }) + } + addTenant(tenant: string) { const tenantIndex = this.tenants.indexOf(tenant) @@ -36,7 +57,7 @@ export class ProgressiveMigrations { return } - this.createJobs().catch((e) => { + this.createJobs(this.options.maxSize).catch((e) => { logSchema.error(logger, '[Migrations] Error creating migration jobs', { type: 'migrations', error: e, @@ -56,7 +77,7 @@ export class ProgressiveMigrations { return } - this.createJobs().catch((e) => { + this.createJobs(this.options.maxSize).catch((e) => { logSchema.error(logger, '[Migrations] Error creating migration jobs', { type: 'migrations', error: e, @@ -68,9 +89,9 @@ export class ProgressiveMigrations { }, this.options.interval) } - protected async createJobs() { + protected async createJobs(maxJobs: number) { this.emittingJobs = true - const tenantsBatch = this.tenants.splice(0, this.options.maxSize) + const tenantsBatch = this.tenants.splice(0, maxJobs) const jobs = await Promise.allSettled( tenantsBatch.map(async (tenant) => { const tenantConfig = await getTenantConfig(tenant) diff --git a/src/http/routes/admin/tenants.ts b/src/http/routes/admin/tenants.ts index 6e5ec345..adcd677b 100644 --- a/src/http/routes/admin/tenants.ts +++ b/src/http/routes/admin/tenants.ts @@ -8,6 +8,7 @@ import { multitenantKnex, lastMigrationName, runMigrationsOnTenant, + progressiveMigrations, } from '../../../database' import { dbSuperUser, storage } from '../../plugins' @@ -179,7 +180,6 @@ export default async function routes(fastify: FastifyInstance) { tracingMode, } = request.body - await runMigrationsOnTenant(databaseUrl, tenantId) await multitenantKnex('tenants').insert({ id: tenantId, anon_key: encrypt(anonKey), @@ -191,10 +191,23 @@ export default async function routes(fastify: FastifyInstance) { jwks, service_key: encrypt(serviceKey), feature_image_transformation: features?.imageTransformation?.enabled ?? false, - migrations_version: await lastMigrationName(), - migrations_status: TenantMigrationStatus.COMPLETED, + migrations_version: null, + migrations_status: null, tracing_mode: tracingMode, }) + + try { + await runMigrationsOnTenant(databaseUrl, tenantId) + await multitenantKnex('tenants') + .where('id', tenantId) + .update({ + migrations_version: await lastMigrationName(), + migrations_status: TenantMigrationStatus.COMPLETED, + }) + } catch (e) { + progressiveMigrations.addTenant(tenantId) + } + reply.code(201).send() }) @@ -215,9 +228,7 @@ export default async function routes(fastify: FastifyInstance) { tracingMode, } = request.body const { tenantId } = request.params - if (databaseUrl) { - await runMigrationsOnTenant(databaseUrl, tenantId) - } + await multitenantKnex('tenants') .update({ anon_key: anonKey !== undefined ? encrypt(anonKey) : undefined, @@ -233,11 +244,24 @@ export default async function routes(fastify: FastifyInstance) { jwks, service_key: serviceKey !== undefined ? encrypt(serviceKey) : undefined, feature_image_transformation: features?.imageTransformation?.enabled, - migrations_version: databaseUrl ? await lastMigrationName() : undefined, - migrations_status: databaseUrl ? TenantMigrationStatus.COMPLETED : undefined, tracing_mode: tracingMode, }) .where('id', tenantId) + + if (databaseUrl) { + try { + await runMigrationsOnTenant(databaseUrl, tenantId) + await multitenantKnex('tenants') + .where('id', tenantId) + .update({ + migrations_version: await lastMigrationName(), + migrations_status: TenantMigrationStatus.COMPLETED, + }) + } catch (e) { + progressiveMigrations.addTenant(tenantId) + } + } + reply.code(204).send() } ) @@ -256,11 +280,8 @@ export default async function routes(fastify: FastifyInstance) { tracingMode, } = request.body const { tenantId } = request.params - await runMigrationsOnTenant(databaseUrl, tenantId) const tenantInfo: tenantDBInterface & { - migrations_version: string - migrations_status: TenantMigrationStatus tracing_mode?: string } = { id: tenantId, @@ -269,8 +290,6 @@ export default async function routes(fastify: FastifyInstance) { jwt_secret: encrypt(jwtSecret), jwks: jwks || null, service_key: encrypt(serviceKey), - migrations_version: await lastMigrationName(), - migrations_status: TenantMigrationStatus.COMPLETED, } if (fileSizeLimit) { @@ -294,6 +313,19 @@ export default async function routes(fastify: FastifyInstance) { } await multitenantKnex('tenants').insert(tenantInfo).onConflict('id').merge() + + try { + await runMigrationsOnTenant(databaseUrl, tenantId) + await multitenantKnex('tenants') + .where('id', tenantId) + .update({ + migrations_version: await lastMigrationName(), + migrations_status: TenantMigrationStatus.COMPLETED, + }) + } catch (e) { + progressiveMigrations.addTenant(tenantId) + } + reply.code(204).send() })