Skip to content

Commit

Permalink
fix: allow sync migrations
Browse files Browse the repository at this point in the history
  • Loading branch information
fenos committed Aug 5, 2024
1 parent ef733f2 commit d95be79
Show file tree
Hide file tree
Showing 3 changed files with 56 additions and 9 deletions.
1 change: 1 addition & 0 deletions migrations/tenant/0025-custom-metadata.sql
Original file line number Diff line number Diff line change
@@ -1,2 +1,3 @@
---SYNC---
ALTER TABLE storage.objects ADD COLUMN user_metadata jsonb NULL;
ALTER TABLE storage.s3_multipart_uploads ADD COLUMN user_metadata jsonb NULL;
26 changes: 19 additions & 7 deletions src/http/plugins/db.ts
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import {
getPostgresConnection,
progressiveMigrations,
runMigrationsOnTenant,
hasMissingSyncMigration,
} from '@internal/database'
import { verifyJWT } from '@internal/auth'
import { logSchema } from '@internal/monitoring'
Expand Down Expand Up @@ -201,19 +202,25 @@ export const migrations = fastifyPlugin(async function migrations(fastify) {
return
}

const needsToRunMigrationsNow = await hasMissingSyncMigration(request.tenantId)

// if the tenant is not marked as stale, add it to the progressive migrations queue
if (tenant.migrationStatus !== TenantMigrationStatus.FAILED_STALE) {
if (
!needsToRunMigrationsNow &&
tenant.migrationStatus !== TenantMigrationStatus.FAILED_STALE
) {
progressiveMigrations.addTenant(request.tenantId)
return
}

// if the tenant is marked as stale, try running the migrations
migrationsMutex(request.tenantId, async () => {
// if the tenant is marked as stale or there are pending SYNC migrations,
// try running the migrations
await migrationsMutex(request.tenantId, async () => {
if (tenant.syncMigrationsDone || migrationsUpToDate) {
return
}

await runMigrationsOnTenant(tenant.databaseUrl, request.tenantId, false)
await runMigrationsOnTenant(tenant.databaseUrl, request.tenantId, needsToRunMigrationsNow)
.then(async () => {
await updateTenantMigrationsState(request.tenantId)
tenant.syncMigrationsDone = true
Expand All @@ -231,9 +238,14 @@ export const migrations = fastifyPlugin(async function migrations(fastify) {
}
)
})
.catch(() => {
// no-op
})
}).catch((e) => {
logSchema.error(fastify.log, `[Migrations] Error running migrations ${request.tenantId} `, {
type: 'migrations',
error: e,
metadata: JSON.stringify({
strategy: 'progressive',
}),
})
})
})
}
Expand Down
38 changes: 36 additions & 2 deletions src/internal/database/migrations/migrate.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ import { BasicPgClient, Migration } from 'postgres-migrations/dist/types'
import { validateMigrationHashes } from 'postgres-migrations/dist/validation'
import { runMigration } from 'postgres-migrations/dist/run-migration'
import { searchPath } from '../connection'
import { listTenantsToMigrate } from '../tenant'
import { getTenantConfig, listTenantsToMigrate, TenantMigrationStatus } from '../tenant'
import { multitenantKnex } from '../multitenant-db'
import { ProgressiveMigrations } from './progressive'
import { RunMigrationsOnTenants } from '@storage/events'
Expand Down Expand Up @@ -80,6 +80,28 @@ export async function lastMigrationName() {
return migrations[migrations.length - 1].name
}

export async function hasMissingSyncMigration(tenantId: string) {
const { migrationVersion, migrationStatus } = await getTenantConfig(tenantId)
const migrations = await loadMigrationFilesCached('./migrations/tenant')

if (!migrationStatus) {
return migrations.some((m) => {
return m.contents.includes('---SYNC---')
})
}

const indexLastMigration = migrations.findIndex((m) => m.name === migrationVersion)

if (indexLastMigration === -1) {
return true
}

const migrationAfterLast = migrations.slice(indexLastMigration + 1)
return migrationAfterLast.some((m) => {
return m.contents.includes('---SYNC---')
})
}

/**
* Runs migrations for all tenants
* only one instance at the time is allowed to run
Expand Down Expand Up @@ -369,19 +391,31 @@ function withAdvisoryLock<T>(
try {
try {
let acquired = false
let tries = 1

const timeout = 3000
const start = Date.now()

while (!acquired) {
const elapsed = Date.now() - start
if (elapsed > timeout) {
throw ERRORS.LockTimeout()
}

const lockResult = await client.query(
'SELECT pg_try_advisory_lock(-8525285245963000605);'
)
if (lockResult.rows[0].pg_try_advisory_lock === true) {
acquired = true
} else {
if (waitForLock) {
await new Promise((res) => setTimeout(res, 700))
await new Promise((res) => setTimeout(res, 20 * tries))
} else {
return [] as unknown as Promise<T>
}
}

tries++
}
} catch (e) {
throw e
Expand Down

0 comments on commit d95be79

Please sign in to comment.