Skip to content

Commit

Permalink
fix: run migrations after tenant creation
Browse files Browse the repository at this point in the history
  • Loading branch information
fenos committed Jun 21, 2024
1 parent 8483e05 commit 3c50325
Show file tree
Hide file tree
Showing 3 changed files with 71 additions and 18 deletions.
2 changes: 1 addition & 1 deletion src/database/migrations/migrate.ts
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ const backportMigrations = [

export const progressiveMigrations = new ProgressiveMigrations({
maxSize: 200,
interval: 1000 * 60, // 1m
interval: 1000 * 5, // 5s
watch: pgQueueEnable,
})

Expand Down
29 changes: 25 additions & 4 deletions src/database/migrations/progressive.ts
Original file line number Diff line number Diff line change
Expand Up @@ -19,10 +19,31 @@ export class ProgressiveMigrations {
signal.addEventListener('abort', () => {
if (this.watchInterval) {
clearInterval(this.watchInterval)
this.drain().catch((e) => {
logSchema.error(logger, '[Migrations] Error creating migration jobs', {
type: 'migrations',
error: e,
metadata: JSON.stringify({
strategy: 'progressive',
}),
})
})
}
})
}

async drain() {
return this.createJobs(this.tenants.length).catch((e) => {
logSchema.error(logger, '[Migrations] Error creating migration jobs', {
type: 'migrations',
error: e,
metadata: JSON.stringify({
strategy: 'progressive',
}),
})
})
}

addTenant(tenant: string) {
const tenantIndex = this.tenants.indexOf(tenant)

Expand All @@ -36,7 +57,7 @@ export class ProgressiveMigrations {
return
}

this.createJobs().catch((e) => {
this.createJobs(this.options.maxSize).catch((e) => {
logSchema.error(logger, '[Migrations] Error creating migration jobs', {
type: 'migrations',
error: e,
Expand All @@ -56,7 +77,7 @@ export class ProgressiveMigrations {
return
}

this.createJobs().catch((e) => {
this.createJobs(this.options.maxSize).catch((e) => {
logSchema.error(logger, '[Migrations] Error creating migration jobs', {
type: 'migrations',
error: e,
Expand All @@ -68,9 +89,9 @@ export class ProgressiveMigrations {
}, this.options.interval)
}

protected async createJobs() {
protected async createJobs(maxJobs: number) {
this.emittingJobs = true
const tenantsBatch = this.tenants.splice(0, this.options.maxSize)
const tenantsBatch = this.tenants.splice(0, maxJobs)
const jobs = await Promise.allSettled(
tenantsBatch.map(async (tenant) => {
const tenantConfig = await getTenantConfig(tenant)
Expand Down
58 changes: 45 additions & 13 deletions src/http/routes/admin/tenants.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import {
multitenantKnex,
lastMigrationName,
runMigrationsOnTenant,
progressiveMigrations,
} from '../../../database'
import { dbSuperUser, storage } from '../../plugins'

Expand Down Expand Up @@ -179,7 +180,6 @@ export default async function routes(fastify: FastifyInstance) {
tracingMode,
} = request.body

await runMigrationsOnTenant(databaseUrl, tenantId)
await multitenantKnex('tenants').insert({
id: tenantId,
anon_key: encrypt(anonKey),
Expand All @@ -191,10 +191,23 @@ export default async function routes(fastify: FastifyInstance) {
jwks,
service_key: encrypt(serviceKey),
feature_image_transformation: features?.imageTransformation?.enabled ?? false,
migrations_version: await lastMigrationName(),
migrations_status: TenantMigrationStatus.COMPLETED,
migrations_version: null,
migrations_status: null,
tracing_mode: tracingMode,
})

try {
await runMigrationsOnTenant(databaseUrl, tenantId)
await multitenantKnex('tenants')
.where('id', tenantId)
.update({
migrations_version: await lastMigrationName(),
migrations_status: TenantMigrationStatus.COMPLETED,
})
} catch (e) {
progressiveMigrations.addTenant(tenantId)
}

reply.code(201).send()
})

Expand All @@ -215,9 +228,7 @@ export default async function routes(fastify: FastifyInstance) {
tracingMode,
} = request.body
const { tenantId } = request.params
if (databaseUrl) {
await runMigrationsOnTenant(databaseUrl, tenantId)
}

await multitenantKnex('tenants')
.update({
anon_key: anonKey !== undefined ? encrypt(anonKey) : undefined,
Expand All @@ -233,11 +244,24 @@ export default async function routes(fastify: FastifyInstance) {
jwks,
service_key: serviceKey !== undefined ? encrypt(serviceKey) : undefined,
feature_image_transformation: features?.imageTransformation?.enabled,
migrations_version: databaseUrl ? await lastMigrationName() : undefined,
migrations_status: databaseUrl ? TenantMigrationStatus.COMPLETED : undefined,
tracing_mode: tracingMode,
})
.where('id', tenantId)

if (databaseUrl) {
try {
await runMigrationsOnTenant(databaseUrl, tenantId)
await multitenantKnex('tenants')
.where('id', tenantId)
.update({
migrations_version: await lastMigrationName(),
migrations_status: TenantMigrationStatus.COMPLETED,
})
} catch (e) {
progressiveMigrations.addTenant(tenantId)
}
}

reply.code(204).send()
}
)
Expand All @@ -256,11 +280,8 @@ export default async function routes(fastify: FastifyInstance) {
tracingMode,
} = request.body
const { tenantId } = request.params
await runMigrationsOnTenant(databaseUrl, tenantId)

const tenantInfo: tenantDBInterface & {
migrations_version: string
migrations_status: TenantMigrationStatus
tracing_mode?: string
} = {
id: tenantId,
Expand All @@ -269,8 +290,6 @@ export default async function routes(fastify: FastifyInstance) {
jwt_secret: encrypt(jwtSecret),
jwks: jwks || null,
service_key: encrypt(serviceKey),
migrations_version: await lastMigrationName(),
migrations_status: TenantMigrationStatus.COMPLETED,
}

if (fileSizeLimit) {
Expand All @@ -294,6 +313,19 @@ export default async function routes(fastify: FastifyInstance) {
}

await multitenantKnex('tenants').insert(tenantInfo).onConflict('id').merge()

try {
await runMigrationsOnTenant(databaseUrl, tenantId)
await multitenantKnex('tenants')
.where('id', tenantId)
.update({
migrations_version: await lastMigrationName(),
migrations_status: TenantMigrationStatus.COMPLETED,
})
} catch (e) {
progressiveMigrations.addTenant(tenantId)
}

reply.code(204).send()
})

Expand Down

0 comments on commit 3c50325

Please sign in to comment.