Skip to content

Commit

Permalink
fix: idempotent migrations
Browse files Browse the repository at this point in the history
  • Loading branch information
fenos committed Jan 8, 2024
1 parent 531e8af commit c9787f2
Show file tree
Hide file tree
Showing 11 changed files with 86 additions and 14 deletions.
4 changes: 4 additions & 0 deletions migrations/tenant/0002-storage-schema.sql
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,7 @@ CREATE INDEX IF NOT EXISTS name_prefix_search ON storage.objects(name text_patte

ALTER TABLE storage.objects ENABLE ROW LEVEL SECURITY;

drop function if exists storage.foldername;
CREATE OR REPLACE FUNCTION storage.foldername(name text)
RETURNS text[]
LANGUAGE plpgsql
Expand All @@ -86,6 +87,7 @@ BEGIN
END
$function$;

drop function if exists storage.filename;
CREATE OR REPLACE FUNCTION storage.filename(name text)
RETURNS text
LANGUAGE plpgsql
Expand All @@ -98,6 +100,7 @@ BEGIN
END
$function$;

drop function if exists storage.extension;
CREATE OR REPLACE FUNCTION storage.extension(name text)
RETURNS text
LANGUAGE plpgsql
Expand All @@ -114,6 +117,7 @@ END
$function$;

-- @todo can this query be optimised further?
drop function if exists storage.search;
CREATE OR REPLACE FUNCTION storage.search(prefix text, bucketname text, limits int DEFAULT 100, levels int DEFAULT 1, offsets int DEFAULT 0)
RETURNS TABLE (
name text,
Expand Down
2 changes: 1 addition & 1 deletion migrations/tenant/0003-pathtoken-column.sql
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
alter table storage.objects add column path_tokens text[] generated always as (string_to_array("name", '/')) stored;
-- "if not exists" makes this migration idempotent: re-running it on a database
-- that already has the generated path_tokens column is a no-op.
alter table storage.objects add column if not exists path_tokens text[] generated always as (string_to_array("name", '/')) stored;

CREATE OR REPLACE FUNCTION storage.search(prefix text, bucketname text, limits int DEFAULT 100, levels int DEFAULT 1, offsets int DEFAULT 0)
RETURNS TABLE (
Expand Down
1 change: 1 addition & 0 deletions migrations/tenant/0005-add-size-functions.sql
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
drop function if exists storage.get_size_by_bucket();
CREATE OR REPLACE FUNCTION storage.get_size_by_bucket()
RETURNS TABLE (
size BIGINT,
Expand Down
2 changes: 1 addition & 1 deletion migrations/tenant/0008-add-public-to-buckets.sql
Original file line number Diff line number Diff line change
@@ -1 +1 @@
ALTER TABLE storage.buckets ADD COLUMN "public" boolean default false;
-- "if not exists" makes this migration idempotent when re-applied.
ALTER TABLE storage.buckets ADD COLUMN if not exists "public" boolean default false;
1 change: 1 addition & 0 deletions migrations/tenant/0009-fix-search-function.sql
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
drop function if exists storage.search;
CREATE OR REPLACE FUNCTION storage.search(prefix text, bucketname text, limits int DEFAULT 100, levels int DEFAULT 1, offsets int DEFAULT 0)
RETURNS TABLE (
name text,
Expand Down
Original file line number Diff line number Diff line change
@@ -1,9 +1,10 @@
CREATE OR REPLACE FUNCTION update_updated_at_column()
-- Trigger function: stamps NEW.updated_at with now() on every UPDATE so the
-- column tracks the last modification time automatically.
CREATE OR REPLACE FUNCTION update_updated_at_column()
RETURNS TRIGGER AS $$
BEGIN
NEW.updated_at = now();
RETURN NEW;
END;
$$ language plpgsql;

-- CREATE TRIGGER has no IF NOT EXISTS / OR REPLACE form, so the trigger is
-- dropped first to keep this migration idempotent on re-run.
DROP trigger if exists update_objects_updated_at on storage.objects;
CREATE TRIGGER update_objects_updated_at BEFORE UPDATE ON storage.objects FOR EACH ROW EXECUTE PROCEDURE update_updated_at_column();
Original file line number Diff line number Diff line change
@@ -1 +1 @@
alter table storage.buckets add column avif_autodetection bool default false;
-- "if not exists" makes this migration idempotent when re-applied.
alter table storage.buckets add column if not exists avif_autodetection bool default false;
4 changes: 2 additions & 2 deletions migrations/tenant/0013-add-bucket-custom-limits.sql
Original file line number Diff line number Diff line change
@@ -1,2 +1,2 @@
alter table storage.buckets add column max_file_size_kb int default null;
alter table storage.buckets add column allowed_mime_types text[] default null;
-- Per-bucket upload limits; "if not exists" keeps both statements idempotent.
alter table storage.buckets add column if not exists max_file_size_kb int default null;
alter table storage.buckets add column if not exists allowed_mime_types text[] default null;
14 changes: 12 additions & 2 deletions migrations/tenant/0014-use-bytes-for-max-size.sql
Original file line number Diff line number Diff line change
@@ -1,2 +1,12 @@
ALTER TABLE storage.buckets RENAME COLUMN max_file_size_kb TO file_size_limit;
ALTER TABLE storage.buckets ALTER COLUMN file_size_limit TYPE bigint;

-- Idempotent rename of storage.buckets.max_file_size_kb -> file_size_limit
-- (and widening to bigint, since the limit is now stored in bytes).
--
-- The lookups are restricted to table_schema = 'storage': without that filter
-- a 'buckets' table in ANY schema (e.g. a user-created public.buckets) would
-- satisfy the EXISTS checks and could wrongly trigger or skip the ALTERs.
DO $$
BEGIN
IF EXISTS (SELECT 1 FROM information_schema.columns WHERE table_schema = 'storage' AND table_name = 'buckets' AND column_name = 'max_file_size_kb') THEN
IF NOT EXISTS (SELECT 1 FROM information_schema.columns WHERE table_schema = 'storage' AND table_name = 'buckets' AND column_name = 'file_size_limit') THEN
-- Fresh upgrade path: old column present, new one absent.
ALTER TABLE storage.buckets RENAME COLUMN max_file_size_kb TO file_size_limit;
ALTER TABLE storage.buckets ALTER COLUMN file_size_limit TYPE bigint;
ELSE
-- Both columns exist: a previous partial run already created
-- file_size_limit, so only the stale column needs removing.
ALTER TABLE storage.buckets DROP COLUMN max_file_size_kb;
END IF;
END IF;
END$$;
4 changes: 4 additions & 0 deletions src/config.ts
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ type StorageConfigType = {
dbAuthenticatedRole: string
dbServiceRole: string
dbInstallRoles: boolean
dbRefreshMigrationHashesOnMismatch: boolean
dbSuperUser: string
dbSearchPath: string
databaseURL: string
Expand Down Expand Up @@ -134,6 +135,9 @@ export function getConfig(): StorageConfigType {
dbServiceRole: getOptionalConfigFromEnv('DB_SERVICE_ROLE') || 'service_role',
dbAuthenticatedRole: getOptionalConfigFromEnv('DB_AUTHENTICATED_ROLE') || 'authenticated',
dbInstallRoles: !(getOptionalConfigFromEnv('DB_INSTALL_ROLES') === 'false'),
dbRefreshMigrationHashesOnMismatch: !(
getOptionalConfigFromEnv('DB_ALLOW_MIGRATION_REFRESH') === 'false'
),
dbSuperUser: getOptionalConfigFromEnv('DB_SUPER_USER') || 'postgres',
dbSearchPath: getOptionalConfigFromEnv('DB_SEARCH_PATH') || '',
multitenantDatabaseUrl: getOptionalConfigFromEnv('MULTITENANT_DATABASE_URL'),
Expand Down
63 changes: 57 additions & 6 deletions src/database/migrate.ts
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ const {
dbSuperUser,
dbServiceRole,
dbInstallRoles,
dbRefreshMigrationHashesOnMismatch,
} = getConfig()

const loadMigrationFilesCached = memoizePromise(loadMigrationFiles)
Expand Down Expand Up @@ -117,7 +118,7 @@ function runMigrations(migrationsDirectory: string, shouldCreateStorageSchema =
appliedMigrations = rows

if (rows.length > 0) {
appliedMigrations = await refreshMigrations(
appliedMigrations = await refreshMigrationPosition(
client,
migrationTableName,
appliedMigrations,
Expand All @@ -131,7 +132,20 @@ function runMigrations(migrationsDirectory: string, shouldCreateStorageSchema =
}
}

validateMigrationHashes(intendedMigrations, appliedMigrations)
try {
validateMigrationHashes(intendedMigrations, appliedMigrations)
} catch (e) {
if (!dbRefreshMigrationHashesOnMismatch) {
throw e
}

await refreshMigrationHash(
client,
migrationTableName,
intendedMigrations,
appliedMigrations
)
}

const migrationsToRun = filterMigrations(intendedMigrations, appliedMigrations)
const completedMigrations = []
Expand Down Expand Up @@ -197,9 +211,10 @@ async function doesTableExist(client: BasicPgClient, tableName: string) {
*/
async function doesSchemaExists(client: BasicPgClient, schemaName: string) {
const result = await client.query(SQL`SELECT EXISTS (
SELECT 1
FROM information_schema.schemata WHERE schema_name = '${schemaName}'
);`)
SELECT 1
FROM information_schema.schemata
WHERE schema_name = ${schemaName}
);`)

return result.rows.length > 0 && result.rows[0].exists === 'true'
}
Expand Down Expand Up @@ -241,6 +256,41 @@ function withAdvisoryLock<T>(
}
}

/**
 * Rewrites the stored hash of already-applied migrations whose on-disk file
 * content has changed (e.g. migrations edited to become idempotent), so that
 * validateMigrationHashes no longer rejects them on the next run.
 *
 * All hash updates happen in a single transaction: either every stale hash is
 * refreshed or none is.
 *
 * @param client database client used for the UPDATEs
 * @param migrationTableName table tracking applied migrations; appended to the
 *   query unescaped — assumes it comes from trusted config, not user input
 * @param intendedMigrations migrations as loaded from disk
 * @param appliedMigrations migrations as recorded in the database
 */
async function refreshMigrationHash(
client: BasicPgClient,
migrationTableName: string,
intendedMigrations: Migration[],
appliedMigrations: Migration[]
) {
// A migration is "invalid" when it was applied before but its current file
// hash differs from the recorded one.
// NOTE(review): indexes appliedMigrations by migration.id — assumes the
// array is positioned so that id == array index; confirm against the loader.
const invalidHash = (migration: Migration) => {
const appliedMigration = appliedMigrations[migration.id]
return appliedMigration != null && appliedMigration.hash !== migration.hash
}

// Assert migration hashes are still same
const invalidHashes = intendedMigrations.filter(invalidHash)

if (invalidHashes.length > 0) {
await client.query('BEGIN')

try {
// NOTE(review): Promise.all over one pg client queues the queries
// serially on the connection; kept as-is, but a plain loop would make the
// in-transaction ordering explicit.
await Promise.all(
invalidHashes.map((migration) => {
// Table name cannot be bound as a parameter, hence .append();
// values (hash, id) are bound via the SQL template.
const query = SQL`UPDATE `
.append(migrationTableName)
.append(SQL` SET hash = ${migration.hash} WHERE id = ${migration.id}`)

return client.query(query)
})
)
await client.query('COMMIT')
} catch (e) {
// Roll back so no partial subset of hashes is refreshed, then rethrow.
await client.query('ROLLBACK')
throw e
}
}
}

/**
* Backports migrations that were added after the initial release
*
Expand All @@ -249,7 +299,7 @@ function withAdvisoryLock<T>(
* @param appliedMigrations
* @param intendedMigrations
*/
async function refreshMigrations(
async function refreshMigrationPosition(
client: BasicPgClient,
migrationTableName: string,
appliedMigrations: Migration[],
Expand All @@ -264,6 +314,7 @@ async function refreshMigrations(
if (!existingMigration || (existingMigration && existingMigration.name !== migration.from)) {
return
}

// slice till the migration we want to backport
const migrations = newMigrations.slice(0, migration.index)

Expand Down

0 comments on commit c9787f2

Please sign in to comment.