diff --git a/migrations/tenant/0025-custom-metadata.sql b/migrations/tenant/0025-custom-metadata.sql index b18d92f6..97fe6507 100644 --- a/migrations/tenant/0025-custom-metadata.sql +++ b/migrations/tenant/0025-custom-metadata.sql @@ -1,2 +1,3 @@ +---SYNC--- ALTER TABLE storage.objects ADD COLUMN user_metadata jsonb NULL; ALTER TABLE storage.s3_multipart_uploads ADD COLUMN user_metadata jsonb NULL; \ No newline at end of file diff --git a/src/http/plugins/db.ts b/src/http/plugins/db.ts index 35697ae1..963c9ae9 100644 --- a/src/http/plugins/db.ts +++ b/src/http/plugins/db.ts @@ -10,6 +10,7 @@ import { getPostgresConnection, progressiveMigrations, runMigrationsOnTenant, + hasMissingSyncMigration, } from '@internal/database' import { verifyJWT } from '@internal/auth' import { logSchema } from '@internal/monitoring' @@ -201,19 +202,25 @@ export const migrations = fastifyPlugin(async function migrations(fastify) { return } + const needsToRunMigrationsNow = await hasMissingSyncMigration(request.tenantId) + // if the tenant is not marked as stale, add it to the progressive migrations queue - if (tenant.migrationStatus !== TenantMigrationStatus.FAILED_STALE) { + if ( + !needsToRunMigrationsNow && + tenant.migrationStatus !== TenantMigrationStatus.FAILED_STALE + ) { progressiveMigrations.addTenant(request.tenantId) return } - // if the tenant is marked as stale, try running the migrations - migrationsMutex(request.tenantId, async () => { + // if the tenant is marked as stale or there are pending SYNC migrations, + // try running the migrations + await migrationsMutex(request.tenantId, async () => { if (tenant.syncMigrationsDone || migrationsUpToDate) { return } - await runMigrationsOnTenant(tenant.databaseUrl, request.tenantId, false) + await runMigrationsOnTenant(tenant.databaseUrl, request.tenantId, needsToRunMigrationsNow) .then(async () => { await updateTenantMigrationsState(request.tenantId) tenant.syncMigrationsDone = true @@ -231,9 +238,14 @@ export const migrations = fastifyPlugin(async function migrations(fastify) { } ) }) - .catch(() => { - // no-op - }) + }).catch((e) => { + logSchema.error(fastify.log, `[Migrations] Error running migrations ${request.tenantId} `, { + type: 'migrations', + error: e, + metadata: JSON.stringify({ + strategy: 'progressive', + }), + }) }) }) } diff --git a/src/internal/database/migrations/migrate.ts b/src/internal/database/migrations/migrate.ts index b78746a7..c84901d7 100644 --- a/src/internal/database/migrations/migrate.ts +++ b/src/internal/database/migrations/migrate.ts @@ -7,7 +7,7 @@ import { BasicPgClient, Migration } from 'postgres-migrations/dist/types' import { validateMigrationHashes } from 'postgres-migrations/dist/validation' import { runMigration } from 'postgres-migrations/dist/run-migration' import { searchPath } from '../connection' -import { listTenantsToMigrate } from '../tenant' +import { getTenantConfig, listTenantsToMigrate, TenantMigrationStatus } from '../tenant' import { multitenantKnex } from '../multitenant-db' import { ProgressiveMigrations } from './progressive' import { RunMigrationsOnTenants } from '@storage/events' @@ -80,6 +80,28 @@ export async function lastMigrationName() { return migrations[migrations.length - 1].name } +export async function hasMissingSyncMigration(tenantId: string) { + const { migrationVersion, migrationStatus } = await getTenantConfig(tenantId) + const migrations = await loadMigrationFilesCached('./migrations/tenant') + + if (!migrationStatus) { + return migrations.some((m) => { + return m.contents.includes('---SYNC---') + }) + } + + const indexLastMigration = migrations.findIndex((m) => m.name === migrationVersion) + + if (indexLastMigration === -1) { + return true + } + + const migrationAfterLast = migrations.slice(indexLastMigration + 1) + return migrationAfterLast.some((m) => { + return m.contents.includes('---SYNC---') + }) +} + /** * Runs migrations for all tenants * only one instance at the time is allowed to run @@ -369,7 +391,17 @@ function withAdvisoryLock( try { try { let acquired = false + let tries = 1 + + const timeout = 3000 + const start = Date.now() + while (!acquired) { + const elapsed = Date.now() - start + if (elapsed > timeout) { + throw ERRORS.LockTimeout() + } + const lockResult = await client.query( 'SELECT pg_try_advisory_lock(-8525285245963000605);' ) @@ -377,11 +409,13 @@ function withAdvisoryLock( acquired = true } else { if (waitForLock) { - await new Promise((res) => setTimeout(res, 700)) + await new Promise((res) => setTimeout(res, 20 * tries)) } else { return [] as unknown as Promise } } + + tries++ } } catch (e) { throw e