From 592de67e3c59aeb355e76a4011cf9f8bad10cc04 Mon Sep 17 00:00:00 2001 From: Johan Nyman Date: Fri, 9 Feb 2024 08:49:09 +0100 Subject: [PATCH 1/5] fix:lock the remove-file-packages, to avoid creating corrupt files due to multiple processes writing to it --- shared/packages/worker/package.json | 2 + .../accessorHandlers/lib/FileHandler.ts | 167 ++++++++++++------ yarn.lock | 21 +++ 3 files changed, 139 insertions(+), 51 deletions(-) diff --git a/shared/packages/worker/package.json b/shared/packages/worker/package.json index 6d030a8d..efbabd5c 100644 --- a/shared/packages/worker/package.json +++ b/shared/packages/worker/package.json @@ -15,6 +15,7 @@ "devDependencies": { "@types/deep-diff": "^1.0.0", "@types/node-fetch": "^2.5.8", + "@types/proper-lockfile": "^4.1.4", "@types/tmp": "~0.2.2" }, "dependencies": { @@ -26,6 +27,7 @@ "form-data": "^4.0.0", "mkdirp": "^2.1.3", "node-fetch": "^2.6.1", + "proper-lockfile": "^4.1.2", "tmp": "~0.2.1", "tv-automation-quantel-gateway-client": "3.1.7", "windows-network-drive": "^4.0.1", diff --git a/shared/packages/worker/src/worker/accessorHandlers/lib/FileHandler.ts b/shared/packages/worker/src/worker/accessorHandlers/lib/FileHandler.ts index 2c699bc7..1bbbc3f9 100644 --- a/shared/packages/worker/src/worker/accessorHandlers/lib/FileHandler.ts +++ b/shared/packages/worker/src/worker/accessorHandlers/lib/FileHandler.ts @@ -1,6 +1,8 @@ import path from 'path' import { promisify } from 'util' import fs from 'fs' +import * as LockFile from 'proper-lockfile' +import _ from 'underscore' import { ExpectedPackage, StatusCode, @@ -53,48 +55,36 @@ export abstract class GenericFileAccessorHandle extends GenericAccesso /** Schedule the package for later removal */ async delayPackageRemoval(filePath: string, ttl: number): Promise { - const packagesToRemove = await this.getPackagesToRemove() - - // Search for a pre-existing entry: - let found = false - for (const entry of packagesToRemove) { - if (entry.filePath === filePath) { - // extend the TTL if it was found: - entry.removeTime = Date.now() + ttl - - found = true - break + await this.updatePackagesToRemove((packagesToRemove) => { + // Search for a pre-existing entry: + let alreadyExists = false + for (const entry of packagesToRemove) { + if (entry.filePath === filePath) { + // extend the TTL if it was found: + entry.removeTime = Date.now() + ttl + + alreadyExists = true + break + } } - } - if (!found) { - packagesToRemove.push({ - filePath: filePath, - removeTime: Date.now() + ttl, - }) - } - - await this.storePackagesToRemove(packagesToRemove) + if (!alreadyExists) { + packagesToRemove.push({ + filePath: filePath, + removeTime: Date.now() + ttl, + }) + } + return packagesToRemove + }) } /** Clear a scheduled later removal of a package */ async clearPackageRemoval(filePath: string): Promise { - const packagesToRemove = await this.getPackagesToRemove() - - let found = false - for (let i = 0; i < packagesToRemove.length; i++) { - const entry = packagesToRemove[i] - if (entry.filePath === filePath) { - packagesToRemove.splice(i, 1) - found = true - break - } - } - if (found) { - await this.storePackagesToRemove(packagesToRemove) - } + await this.updatePackagesToRemove((packagesToRemove) => { + return packagesToRemove.filter((entry) => entry.filePath !== filePath) + }) } /** Remove any packages that are due for removal */ async removeDuePackages(): Promise { - let packagesToRemove = await this.getPackagesToRemove() + const packagesToRemove = await this.getPackagesToRemove() const removedFilePaths: string[] = [] for (const entry of packagesToRemove) { @@ -112,21 +102,14 @@ export abstract class GenericFileAccessorHandle extends GenericAccesso } } - // Fetch again, to decrease the risk of race-conditions: - packagesToRemove = await this.getPackagesToRemove() - let changed = false - // Remove paths from array: - for (let i = 0; i < packagesToRemove.length; i++) { - const entry = packagesToRemove[i] - if (removedFilePaths.includes(entry.filePath)) { - packagesToRemove.splice(i, 1) - changed = true - break - } - } - if (changed) { - await this.storePackagesToRemove(packagesToRemove) + if (removedFilePaths.length > 0) { + // Update the list of packages to remove: + await this.updatePackagesToRemove((packagesToRemove) => { + // Remove the entries of the files we removed: + return packagesToRemove.filter((entry) => !removedFilePaths.includes(entry.filePath)) + }) } + return null } /** Unlink (remove) a file, if it exists. Returns true if it did exist */ @@ -430,8 +413,84 @@ export abstract class GenericFileAccessorHandle extends GenericAccesso } return packagesToRemove } - private async storePackagesToRemove(packagesToRemove: DelayPackageRemovalEntry[]): Promise { - await fsWriteFile(this.deferRemovePackagesPath, JSON.stringify(packagesToRemove)) + /** Update the deferred-remove-packages list */ + private async updatePackagesToRemove( + cbManipulateList: (list: DelayPackageRemovalEntry[]) => DelayPackageRemovalEntry[] + ): Promise { + // Note: It is high likelihood that several processes will try to write to this file at the same time + // Therefore, we need to lock the file while writing to it. + + const LOCK_ATTEMPTS_COUNT = 10 + const RETRY_TIMEOUT = 100 // ms + + let lockCompromisedError: Error | null = null + + // Retry up to 10 times at locking and writing the file: + for (let i = 0; i < LOCK_ATTEMPTS_COUNT; i++) { + lockCompromisedError = null + + // Get file lock + let releaseLock: (() => Promise) | undefined = undefined + try { + releaseLock = await LockFile.lock(this.deferRemovePackagesPath, { + onCompromised: (err) => { + // This is called if the lock somehow gets compromised + this.worker.logger.warn(`updatePackagesToRemove: Lock compromised: ${err}`) + lockCompromisedError = err + }, + }) + } catch (e) { + if (e instanceof Error && (e as any).code === 'ENOENT') { + // The file does not exist. Create an empty file and try again: + await fsWriteFile(this.deferRemovePackagesPath, '') + continue + } else if (e instanceof Error && (e as any).code === 'ELOCKED') { + // Already locked, try again later: + await sleep(RETRY_TIMEOUT) + continue + } else { + // Unknown error. Log and exit: + this.worker.logger.error(e) + return + } + } + // At this point, we have acquired the lock. + try { + // Read and write to the file: + const oldList = await this.getPackagesToRemove() + const newList = cbManipulateList(clone(oldList)) + if (!_.isEqual(oldList, newList)) { + if (lockCompromisedError) { + // The lock was compromised. Try again: + continue + } + await fsWriteFile(this.deferRemovePackagesPath, JSON.stringify(newList)) + } + + // Release the lock: + if (!lockCompromisedError) await releaseLock() + // Done, exit the function: + return + } catch (e) { + if (e instanceof Error && (e as any).code === 'ERELEASED') { + // Lock was already released. Something must have gone wrong (eg. someone deleted a folder), + // Log and try again: + this.worker.logger.warn(`updatePackagesToRemove: Lock was already released`) + continue + } else { + // Release the lock: + if (!lockCompromisedError) await releaseLock() + throw e + } + } + } + // At this point, the lock failed + this.worker.logger.error( + `updatePackagesToRemove: Failed to lock file "${this.deferRemovePackagesPath}" after ${LOCK_ATTEMPTS_COUNT} attempts` + ) + if (lockCompromisedError) { + this.worker.logger.error(`updatePackagesToRemove: lockCompromisedError: ${lockCompromisedError}`) + } } } @@ -448,3 +507,9 @@ enum StatusCategory { WATCHER = 'watcher', FILE = 'file_', } +function clone(o: T): T { + return JSON.parse(JSON.stringify(o)) +} +async function sleep(duration: number): Promise { + return new Promise((r) => setTimeout(r, duration)) +} diff --git a/yarn.lock b/yarn.lock index 081797a7..cb44fdd8 100644 --- a/yarn.lock +++ b/yarn.lock @@ -1739,6 +1739,13 @@ resolved "https://registry.yarnpkg.com/@types/parse-json/-/parse-json-4.0.0.tgz#2f8bb441434d163b35fb8ffdccd7138927ffb8c0" integrity sha512-//oorEZjL6sbPcKUaCdIGlIUeH26mgzimjBB77G6XRgnDl/L5wOnpyBGRe/Mmf5CVW3PwEBE1NjiMZ/ssFh4wA== +"@types/proper-lockfile@^4.1.4": + version "4.1.4" + resolved "https://registry.yarnpkg.com/@types/proper-lockfile/-/proper-lockfile-4.1.4.tgz#cd9fab92bdb04730c1ada542c356f03620f84008" + integrity sha512-uo2ABllncSqg9F1D4nugVl9v93RmjxF6LJzQLMLDdPaXCUIDPeOJ21Gbqi43xNKzBi/WQ0Q0dICqufzQbMjipQ== + dependencies: + "@types/retry" "*" + "@types/qs@*": version "6.9.7" resolved "https://registry.yarnpkg.com/@types/qs/-/qs-6.9.7.tgz#63bb7d067db107cc1e457c303bc25d511febf6cb" @@ -1756,6 +1763,11 @@ dependencies: "@types/node" "*" +"@types/retry@*": + version "0.12.5" + resolved "https://registry.yarnpkg.com/@types/retry/-/retry-0.12.5.tgz#f090ff4bd8d2e5b940ff270ab39fd5ca1834a07e" + integrity sha512-3xSjTp3v03X/lSQLkczaN9UIEwJMoMCA1+Nb5HfbJEQWogdeQIyVtTvxPXDQjZ5zws8rFQfVfRdz03ARihPJgw== + "@types/semver@^7.3.12": version "7.3.13" resolved "https://registry.yarnpkg.com/@types/semver/-/semver-7.3.13.tgz#da4bfd73f49bd541d28920ab0e2bf0ee80f71c91" @@ -8286,6 +8298,15 @@ promzard@^0.3.0: dependencies: read "1" +proper-lockfile@^4.1.2: + version "4.1.2" + resolved "https://registry.yarnpkg.com/proper-lockfile/-/proper-lockfile-4.1.2.tgz#c8b9de2af6b2f1601067f98e01ac66baa223141f" + integrity sha512-TjNPblN4BwAWMXU8s9AEz4JmQxnD1NNL7bNOY/AKUzyamc379FWASUhc/K1pL2noVb+XmZKLL68cjzLsiOAMaA== + dependencies: + graceful-fs "^4.2.4" + retry "^0.12.0" + signal-exit "^3.0.2" + proto-list@~1.2.1: version "1.2.4" resolved "https://registry.yarnpkg.com/proto-list/-/proto-list-1.2.4.tgz#212d5bfe1318306a420f6402b8e26ff39647a849" From 87b90d7e2203ac30001bf41555ce5c7876ff500c Mon Sep 17 00:00:00 2001 From: Johan Nyman Date: Tue, 13 Feb 2024 11:54:26 +0100 Subject: [PATCH 2/5] chore: enable unit tests in worker package (removing old failing tests) --- shared/packages/worker/package.json | 2 +- .../worker/src/__tests__/fileShare.spec.ts | 67 ------------------- 2 files changed, 1 insertion(+), 68 deletions(-) delete mode 100644 shared/packages/worker/src/__tests__/fileShare.spec.ts diff --git a/shared/packages/worker/package.json b/shared/packages/worker/package.json index efbabd5c..4f177ea1 100644 --- a/shared/packages/worker/package.json +++ b/shared/packages/worker/package.json @@ -7,7 +7,7 @@ "scripts": { "build": "yarn rimraf dist && yarn build:main", "build:main": "tsc -p tsconfig.json", - "__test": "jest" + "test": "jest --runInBand" }, "engines": { "node": ">=14.18.0" diff --git a/shared/packages/worker/src/__tests__/fileShare.spec.ts b/shared/packages/worker/src/__tests__/fileShare.spec.ts deleted file mode 100644 index a76212af..00000000 --- a/shared/packages/worker/src/__tests__/fileShare.spec.ts +++ /dev/null @@ -1,67 +0,0 @@ -import { parseNetUse } from '../worker/accessorHandlers/fileShare' - -describe('fileShare', () => { - test('parseNetUse', () => { - // Result on a computer with english locale: - expect( - parseNetUse(`New connections will be remembered. - - -Status Local Remote Network - -------------------------------------------------------------------------------- -OK Z: \\\\localhost\\media Microsoft Windows Network -The command completed successfully. - `) - ).toEqual([ - { - status: 'OK', - statusOK: true, - local: 'Z', - remote: '\\\\localhost\\media', - network: 'Microsoft Windows Network', - }, - ]) - - // Result on a computer with a norwegian locale: - expect( - parseNetUse(`Nye tilkoblinger vil bli lagret. - - -Status Lokalt Eksternt Nettverk - -------------------------------------------------------------------------------- -Ikke tilgjen U: \\\\caspar01\\mamMediaScanner - Microsoft Windows Network -Ikke tilgjen V: \\nas.nrk\\Prod\\System\\mam - Microsoft Windows Network -OK Z: \\\\aspar01\\mamMediaScanner - Microsoft Windows Network -Kommandoen er fullført. - `) - ).toEqual([ - { - status: 'Ikke tilgjen', - statusOK: false, - local: 'U', - remote: '\\\\caspar01\\mamMediaScanner', - network: 'Microsoft Windows Network', - }, - { - status: 'Ikke tilgjen', - statusOK: false, - local: 'V', - remote: '\\nas.nrk\\Prod\\System\\mam', - network: 'Microsoft Windows Network', - }, - { - status: 'OK', - statusOK: true, - local: 'Z', - remote: '\\\\aspar01\\mamMediaScanner', - network: 'Microsoft Windows Network', - }, - ]) - }) -}) -export {} From d0d6b60b0e1725776ee0d4a8b9b3f2f073d7ead0 Mon Sep 17 00:00:00 2001 From: Johan Nyman Date: Tue, 13 Feb 2024 11:54:34 +0100 Subject: [PATCH 3/5] fix: batch writes to json file, to avoid timeouts when scheduling many writes at the same time. --- .../accessorHandlers/lib/FileHandler.ts | 92 ++----- .../lib/__tests__/json-write-file.spec.ts | 168 ++++++++++++ .../accessorHandlers/lib/json-write-file.ts | 252 ++++++++++++++++++ 3 files changed, 437 insertions(+), 75 deletions(-) create mode 100644 shared/packages/worker/src/worker/accessorHandlers/lib/__tests__/json-write-file.spec.ts create mode 100644 shared/packages/worker/src/worker/accessorHandlers/lib/json-write-file.ts diff --git a/shared/packages/worker/src/worker/accessorHandlers/lib/FileHandler.ts b/shared/packages/worker/src/worker/accessorHandlers/lib/FileHandler.ts index 1bbbc3f9..1ebf3491 100644 --- a/shared/packages/worker/src/worker/accessorHandlers/lib/FileHandler.ts +++ b/shared/packages/worker/src/worker/accessorHandlers/lib/FileHandler.ts @@ -1,8 +1,6 @@ import path from 'path' import { promisify } from 'util' import fs from 'fs' -import * as LockFile from 'proper-lockfile' -import _ from 'underscore' import { ExpectedPackage, StatusCode, @@ -23,6 +21,7 @@ import { GenericAccessorHandle } from '../genericHandle' import { MonitorInProgress } from '../../lib/monitorInProgress' import { removeBasePath } from './pathJoin' import { FileEvent, FileWatcher, IFileWatcher } from './FileWatcher' +import { updateJSONFileBatch } from './json-write-file' export const LocalFolderAccessorHandleType = 'localFolder' export const FileShareAccessorHandleType = 'fileShare' @@ -32,7 +31,6 @@ const fsReadFile = promisify(fs.readFile) const fsReaddir = promisify(fs.readdir) const fsRmDir = promisify(fs.rmdir) const fsStat = promisify(fs.stat) -const fsWriteFile = promisify(fs.writeFile) const fsUnlink = promisify(fs.unlink) const fsLstat = promisify(fs.lstat) @@ -423,73 +421,23 @@ export abstract class GenericFileAccessorHandle extends GenericAccesso const LOCK_ATTEMPTS_COUNT = 10 const RETRY_TIMEOUT = 100 // ms - let lockCompromisedError: Error | null = null - - // Retry up to 10 times at locking and writing the file: - for (let i = 0; i < LOCK_ATTEMPTS_COUNT; i++) { - lockCompromisedError = null - - // Get file lock - let releaseLock: (() => Promise) | undefined = undefined - try { - releaseLock = await LockFile.lock(this.deferRemovePackagesPath, { - onCompromised: (err) => { - // This is called if the lock somehow gets compromised - this.worker.logger.warn(`updatePackagesToRemove: Lock compromised: ${err}`) - lockCompromisedError = err - }, - }) - } catch (e) { - if (e instanceof Error && (e as any).code === 'ENOENT') { - // The file does not exist. Create an empty file and try again: - await fsWriteFile(this.deferRemovePackagesPath, '') - continue - } else if (e instanceof Error && (e as any).code === 'ELOCKED') { - // Already locked, try again later: - await sleep(RETRY_TIMEOUT) - continue - } else { - // Unknown error. Log and exit: - this.worker.logger.error(e) - return - } - } - // At this point, we have acquired the lock. - try { - // Read and write to the file: - const oldList = await this.getPackagesToRemove() - const newList = cbManipulateList(clone(oldList)) - if (!_.isEqual(oldList, newList)) { - if (lockCompromisedError) { - // The lock was compromised. Try again: - continue - } - await fsWriteFile(this.deferRemovePackagesPath, JSON.stringify(newList)) - } - - // Release the lock: - if (!lockCompromisedError) await releaseLock() - // Done, exit the function: - return - } catch (e) { - if (e instanceof Error && (e as any).code === 'ERELEASED') { - // Lock was already released. Something must have gone wrong (eg. someone deleted a folder), - // Log and try again: - this.worker.logger.warn(`updatePackagesToRemove: Lock was already released`) - continue - } else { - // Release the lock: - if (!lockCompromisedError) await releaseLock() - throw e + try { + await updateJSONFileBatch( + this.deferRemovePackagesPath, + (list) => { + return cbManipulateList(list ?? []) + }, + { + retryCount: LOCK_ATTEMPTS_COUNT, + retryTimeout: RETRY_TIMEOUT, + logError: (error) => this.worker.logger.error(error), + logWarning: (message) => this.worker.logger.warn(message), } - } - } - // At this point, the lock failed - this.worker.logger.error( - `updatePackagesToRemove: Failed to lock file "${this.deferRemovePackagesPath}" after ${LOCK_ATTEMPTS_COUNT} attempts` - ) - if (lockCompromisedError) { - this.worker.logger.error(`updatePackagesToRemove: lockCompromisedError: ${lockCompromisedError}`) + ) + } catch (e) { + // Not much we can do about it.. + // Log and continue: + this.worker.logger.error(e) } } } @@ -507,9 +455,3 @@ enum StatusCategory { WATCHER = 'watcher', FILE = 'file_', } -function clone(o: T): T { - return JSON.parse(JSON.stringify(o)) -} -async function sleep(duration: number): Promise { - return new Promise((r) => setTimeout(r, duration)) -} diff --git a/shared/packages/worker/src/worker/accessorHandlers/lib/__tests__/json-write-file.spec.ts b/shared/packages/worker/src/worker/accessorHandlers/lib/__tests__/json-write-file.spec.ts new file mode 100644 index 00000000..5dd37629 --- /dev/null +++ b/shared/packages/worker/src/worker/accessorHandlers/lib/__tests__/json-write-file.spec.ts @@ -0,0 +1,168 @@ +import { getTmpPath, updateJSONFile, updateJSONFileBatch } from '../json-write-file' +import { promises as fs } from 'fs' + +const FILE_A = 'file_a.json' +async function cleanup() { + await Promise.all([unlinkIfExists(FILE_A), unlinkIfExists(getLockPath(FILE_A)), unlinkIfExists(getTmpPath(FILE_A))]) +} + +beforeEach(cleanup) +afterEach(cleanup) + +test('updateJSONFile: single write', async () => { + const cbManipulate = jest.fn(() => { + return { + a: 1, + } + }) + await updateJSONFile(FILE_A, cbManipulate) + + expect(cbManipulate).toBeCalledTimes(1) + expect(await readIfExists(FILE_A)).toBe( + JSON.stringify({ + a: 1, + }) + ) +}) + +test('updateJSONFile: 2 writes', async () => { + const cbManipulate = jest.fn((o) => { + o = o || [] + o.push('a') + return o + }) + + const p0 = updateJSONFile(FILE_A, cbManipulate) + await sleep(5) + + const p1 = updateJSONFile(FILE_A, cbManipulate) + + await Promise.all([p0, p1]) + + expect(cbManipulate).toBeCalledTimes(2) + expect(await readIfExists(FILE_A)).toBe(JSON.stringify(['a', 'a'])) +}) +test('updateJSONFile: 10 writes', async () => { + const cbManipulate = jest.fn((o) => { + o = o || [] + o.push('b') + return o + }) + + const config = { + retryTimeout: 30, + retryCount: 3, + } + + // This should be an impossible tasks, because there will be too many locks, and not enough time to resolve them: + + let error: any + try { + await Promise.all([ + updateJSONFile(FILE_A, cbManipulate, config), + updateJSONFile(FILE_A, cbManipulate, config), + updateJSONFile(FILE_A, cbManipulate, config), + updateJSONFile(FILE_A, cbManipulate, config), + updateJSONFile(FILE_A, cbManipulate, config), + updateJSONFile(FILE_A, cbManipulate, config), + updateJSONFile(FILE_A, cbManipulate, config), + updateJSONFile(FILE_A, cbManipulate, config), + updateJSONFile(FILE_A, cbManipulate, config), + updateJSONFile(FILE_A, cbManipulate, config), + ]) + } catch (e) { + error = e + } + expect(error + '').toMatch(/Failed to lock file/) + + // Wait for the lock functions to finish retrying: + await sleep(config.retryTimeout * config.retryCount) +}) + +test('updateJSONFileBatch: single write', async () => { + const cbManipulate = jest.fn(() => { + return { + b: 1, + } + }) + await updateJSONFileBatch(FILE_A, cbManipulate) + + expect(cbManipulate).toBeCalledTimes(1) + expect(await readIfExists(FILE_A)).toBe( + JSON.stringify({ + b: 1, + }) + ) +}) + +test('updateJSONFileBatch: 3 writes', async () => { + const v = await readIfExists(FILE_A) + expect(v).toBe(undefined) + + const cbManipulate = jest.fn((o) => { + o = o || [] + o.push('a') + return o + }) + + const p0 = updateJSONFileBatch(FILE_A, cbManipulate) + await sleep(5) + + const p1 = updateJSONFileBatch(FILE_A, cbManipulate) + const p2 = updateJSONFileBatch(FILE_A, cbManipulate) + + await Promise.all([p0, p1, p2]) + + expect(cbManipulate).toBeCalledTimes(3) + expect(await readIfExists(FILE_A)).toBe(JSON.stringify(['a', 'a', 'a'])) +}) +test('updateJSONFileBatch: 20 writes', async () => { + const cbManipulate = jest.fn((o) => { + o = o || [] + o.push('a') + return o + }) + + const config = { + retryTimeout: 30, + retryCount: 3, + } + + const ps: Promise[] = [] + let expectResult: string[] = [] + for (let i = 0; i < 20; i++) { + ps.push(updateJSONFileBatch(FILE_A, cbManipulate, config)) + expectResult.push('a') + } + + await Promise.all(ps) + + expect(cbManipulate).toBeCalledTimes(20) + expect(await readIfExists(FILE_A)).toBe(JSON.stringify(expectResult)) +}) + +async function readIfExists(filePath: string): Promise { + try { + return await fs.readFile(filePath, 'utf-8') + } catch (e) { + if ((e as any)?.code === 'ENOENT') { + // not found + return undefined + } else throw e + } +} +async function unlinkIfExists(filePath: string): Promise { + try { + await fs.unlink(filePath) + } catch (e) { + if ((e as any)?.code === 'ENOENT') { + // not found, that's okay + } else throw e + } +} +function getLockPath(filePath: string): string { + return filePath + '.lock' +} +function sleep(duration: number): Promise { + return new Promise((r) => setTimeout(r, duration)) +} diff --git a/shared/packages/worker/src/worker/accessorHandlers/lib/json-write-file.ts b/shared/packages/worker/src/worker/accessorHandlers/lib/json-write-file.ts new file mode 100644 index 00000000..6ef7b1c9 --- /dev/null +++ b/shared/packages/worker/src/worker/accessorHandlers/lib/json-write-file.ts @@ -0,0 +1,252 @@ +import { promises as fs } from 'fs' +import * as LockFile from 'proper-lockfile' + +/** Like updateJSONFile() but allow for multiple manipulations to be batched together and executed sequentially */ +export async function updateJSONFileBatch( + filePath: string, + /** + * Callback to modify the JSON value. + * @returns The value to write to the file (or undefined to remove the file) + */ + cbManipulate: (oldValue: T | undefined) => T | undefined, + config?: UpdateJSONFileConfig +): Promise { + // Add manipulator callback to queue: + + const existingBatches = updateJSONFileBatches.get(filePath) + let batches: BatchOperation[] + if (existingBatches) { + batches = existingBatches + } else { + batches = [] + updateJSONFileBatches.set(filePath, batches) + } + + // Find a batch operation that is open to accept new callbacks: + const openBatch = batches.find((batch) => batch.open) + + if (!openBatch) { + // Start a new batch: + + const newBatch: BatchOperation = { + open: true, + callbacks: [cbManipulate], + promise: updateJSONFile( + filePath, + (oldValue: T | undefined) => { + // At this point, we close the batch, so no more callbacks can be added: + newBatch.open = false + + // Execute all callbacks in the batch: + let value = oldValue + for (cbManipulate of newBatch.callbacks) { + value = cbManipulate(value) + } + return value + }, + config + ), + } + batches.push(newBatch) + + let caughtError: any = undefined + try { + await newBatch.promise + } catch (e) { + caughtError = e + } + // After finished executing, remove the batch: + const i = batches.indexOf(newBatch) + if (i === -1) throw new Error('Internal Error: Batch not found') + batches.splice(i, 1) + + if (caughtError) throw caughtError + } else { + // There is a batch open for new callbacks. Add the callback to the batch: + openBatch.callbacks.push(cbManipulate) + await openBatch.promise + } +} + +const updateJSONFileBatches = new Map() +interface BatchOperation { + /** When true, new callbacks can be added */ + open: boolean + /** Resolves when the batch operation has finished */ + promise: Promise + callbacks: ((oldValue: any | undefined) => any | undefined)[] +} + +/** + * Read a JSON file, created by updateJSONFile() + */ +export async function readJSONFile(filePath: string): Promise< + | { + str: string + value: any + } + | undefined +> { + { + const str = await readIfExists(filePath) + if (str !== undefined) { + return { str, value: str ? JSON.parse(str) : undefined } + } + } + + // Second try; Check if there is a temporary file, to use instead? + { + const tmpPath = getTmpPath(filePath) + + const str = await readIfExists(tmpPath) + if (str !== undefined) { + return { str, value: str ? JSON.parse(str) : undefined } + } + } + + return undefined +} + +/** + * A "safe" way to write JSON data to a file. Takes measures to avoid writing corrupt data to a file due to + * 1. Multiple process writing to the same file (uses a lock file) + * 2. Writing corrupt files due to process exit (write to temporary file and rename) + */ +export async function updateJSONFile( + filePath: string, + /** + * Callback to modify the JSON value. + * @returns The value to write to the file (or undefined to remove the file) + */ + cbManipulate: (oldValue: T | undefined) => T | undefined, + config?: UpdateJSONFileConfig +): Promise { + const RETRY_TIMEOUT = config?.retryTimeout ?? 100 + const RETRY_COUNT = config?.retryCount ?? 10 + const logWarning: (message: string) => void = + // eslint-disable-next-line no-console + config?.logWarning ?? ((e) => console.log('Warning in updateJSONFile', e)) + // eslint-disable-next-line no-console + const logError: (message: any) => void = config?.logError ?? ((e) => console.log('Error in updateJSONFile', e)) + + const tmpFilePath = getTmpPath(filePath) + let lockCompromisedError: Error | undefined = undefined + + // Retry up to 10 times at locking and writing the file: + for (let i = 0; i < RETRY_COUNT; i++) { + lockCompromisedError = undefined + + // Get file lock + let releaseLock: (() => Promise) | undefined = undefined + try { + releaseLock = await LockFile.lock(filePath, { + onCompromised: (err) => { + // This is called if the lock somehow gets compromised + + logWarning(`Lock compromised: ${err}`) + lockCompromisedError = err + }, + }) + } catch (e) { + if ((e as any).code === 'ENOENT') { + // The file does not exist. Create an empty file and try again: + + await fs.writeFile(filePath, '') + continue + } else if ((e as any).code === 'ELOCKED') { + // Already locked, try again later: + await sleep(RETRY_TIMEOUT) + continue + } else { + // Unknown error. + throw e + } + } + + // At this point, we have acquired the lock. + try { + // Read and write to the file: + const oldValue = await readJSONFile(filePath) + + const newValue = cbManipulate(oldValue?.value) + const newValueStr = newValue !== undefined ? JSON.stringify(newValue) : '' + + if (oldValue?.str === newValueStr) { + // do nothing + } else { + if (lockCompromisedError) { + // The lock was compromised. Try again: + continue + } + + // Write to a temporary file first, to avoid corrupting the file in case of a process exit: + await fs.writeFile(tmpFilePath, newValueStr) + + // Rename file: + + await rename(tmpFilePath, filePath) + } + + // Release the lock: + if (!lockCompromisedError) await releaseLock() + // Done, exit the function: + return + } catch (e) { + if ((e as any).code === 'ERELEASED') { + // Lock was already released. Something must have gone wrong (eg. someone deleted a folder), + // Log and try again: + logWarning(`Lock was already released`) + continue + } else { + // Release the lock: + if (!lockCompromisedError) await releaseLock() + throw e + } + } + } + // At this point, the lock failed + + if (lockCompromisedError) { + logError(`lockCompromisedError: ${lockCompromisedError}`) + } + throw new Error(`Failed to lock file "${filePath}" after ${RETRY_COUNT} attempts`) +} + +interface UpdateJSONFileConfig { + /** How long to wait a before trying again, in case of a failed write lock. Defaults to 100 ms. */ + retryTimeout?: number + /** How many times to wait a before trying again, in case of a failed write lock. Defaults to 10. */ + retryCount?: number + + logWarning?: (message: string) => void + logError?: (message: any) => void +} + +async function sleep(duration: number): Promise { + return new Promise((r) => setTimeout(r, duration)) +} +async function readIfExists(filePath: string): Promise { + try { + return await fs.readFile(filePath, 'utf-8') + } catch (e) { + if ((e as any)?.code === 'ENOENT') { + // not found + return undefined + } else throw e + } +} +async function rename(from: string, to: string): Promise { + try { + await fs.rename(from, to) + } catch (e) { + if ((e as any)?.code === 'EPERM') { + // Permission denied, wait a little bit and try again: + await sleep(10) + + await fs.rename(from, to) + } else throw e + } +} +export function getTmpPath(filePath: string): string { + return filePath + '.tmp' +} From 797e9586a61eeac37c0d7bf3522017c4c9b1a3c2 Mon Sep 17 00:00:00 2001 From: Johan Nyman Date: Thu, 15 Feb 2024 12:24:49 +0100 Subject: [PATCH 4/5] chore: fix test --- .../src/__mocks__/proper-lockfile.ts | 33 +++++++++++++++++++ .../src/__tests__/basic.spec.ts | 1 + .../src/__tests__/issues.spec.ts | 1 + 3 files changed, 35 insertions(+) create mode 100644 tests/internal-tests/src/__mocks__/proper-lockfile.ts diff --git a/tests/internal-tests/src/__mocks__/proper-lockfile.ts b/tests/internal-tests/src/__mocks__/proper-lockfile.ts new file mode 100644 index 00000000..9394f285 --- /dev/null +++ b/tests/internal-tests/src/__mocks__/proper-lockfile.ts @@ -0,0 +1,33 @@ +const locks = new Set() +export async function lock( + filePath: string, + _options: { + onCompromised: (err: Error) => void + } +): Promise<() => Promise> { + await sleep(1) + + if (locks.has(filePath)) { + const err = new Error('ELOCKED: File is already locked') + ;(err as any).code = 'ELOCKED' + throw err + } else { + locks.add(filePath) + } + + return async () => { + // release lock + + if (!locks.has(filePath)) { + const err = new Error('ELOCKED: File is already released') + ;(err as any).code = 'ERELEASED' + throw err + } else { + locks.delete(filePath) + } + } +} + +async function sleep(duration: number): Promise { + return new Promise((r) => setTimeout(r, duration)) +} diff --git a/tests/internal-tests/src/__tests__/basic.spec.ts b/tests/internal-tests/src/__tests__/basic.spec.ts index aebf9729..eb995fe0 100644 --- a/tests/internal-tests/src/__tests__/basic.spec.ts +++ b/tests/internal-tests/src/__tests__/basic.spec.ts @@ -24,6 +24,7 @@ jest.mock('child_process') jest.mock('windows-network-drive') jest.mock('tv-automation-quantel-gateway-client') jest.mock('@parcel/watcher') +jest.mock('proper-lockfile') const fs = fsOrg as any as typeof fsMockType const WND = WNDOrg as any as typeof WNDType diff --git a/tests/internal-tests/src/__tests__/issues.spec.ts b/tests/internal-tests/src/__tests__/issues.spec.ts index 369dc554..fc60735b 100644 --- a/tests/internal-tests/src/__tests__/issues.spec.ts +++ b/tests/internal-tests/src/__tests__/issues.spec.ts @@ -14,6 +14,7 @@ jest.mock('child_process') jest.mock('windows-network-drive') jest.mock('tv-automation-quantel-gateway-client') jest.mock('@parcel/watcher') +jest.mock('proper-lockfile') const fs = fsOrg as any as typeof fsMockType From 190a3566f845a0986f38125ff1d3e4057c95c1f8 Mon Sep 17 00:00:00 2001 From: Johan Nyman Date: Fri, 16 Feb 2024 15:31:07 +0100 Subject: [PATCH 5/5] chore: refactor & fixes, co-authored by @julusian --- .../lib/__tests__/json-write-file.spec.ts | 44 ++++++++++-- .../accessorHandlers/lib/json-write-file.ts | 67 +++++++++---------- 2 files changed, 68 insertions(+), 43 deletions(-) diff --git a/shared/packages/worker/src/worker/accessorHandlers/lib/__tests__/json-write-file.spec.ts b/shared/packages/worker/src/worker/accessorHandlers/lib/__tests__/json-write-file.spec.ts index 5dd37629..275380e3 100644 --- a/shared/packages/worker/src/worker/accessorHandlers/lib/__tests__/json-write-file.spec.ts +++ b/shared/packages/worker/src/worker/accessorHandlers/lib/__tests__/json-write-file.spec.ts @@ -1,11 +1,22 @@ import { getTmpPath, updateJSONFile, updateJSONFileBatch } from '../json-write-file' import { promises as fs } from 'fs' +const logWarning = jest.fn((message: string) => console.log('WARNING', message)) +const logError = jest.fn((message: any) => console.log('ERROR', message)) + const FILE_A = 'file_a.json' async function cleanup() { + logWarning.mockClear() + logError.mockClear() + await Promise.all([unlinkIfExists(FILE_A), unlinkIfExists(getLockPath(FILE_A)), unlinkIfExists(getTmpPath(FILE_A))]) } +const config = { + logError, + logWarning, +} + beforeEach(cleanup) afterEach(cleanup) @@ -15,7 +26,7 @@ test('updateJSONFile: single write', async () => { a: 1, } }) - await updateJSONFile(FILE_A, cbManipulate) + await updateJSONFile(FILE_A, cbManipulate, config) expect(cbManipulate).toBeCalledTimes(1) expect(await readIfExists(FILE_A)).toBe( @@ -23,6 +34,8 @@ test('updateJSONFile: single write', async () => { a: 1, }) ) + expect(logWarning).toBeCalledTimes(0) + expect(logError).toBeCalledTimes(0) }) test('updateJSONFile: 2 writes', async () => { @@ -32,15 +45,17 @@ test('updateJSONFile: 2 writes', async () => { return o }) - const p0 = updateJSONFile(FILE_A, cbManipulate) + const p0 = updateJSONFile(FILE_A, cbManipulate, config) await sleep(5) - const p1 = updateJSONFile(FILE_A, cbManipulate) + const p1 = updateJSONFile(FILE_A, cbManipulate, config) await Promise.all([p0, p1]) expect(cbManipulate).toBeCalledTimes(2) expect(await readIfExists(FILE_A)).toBe(JSON.stringify(['a', 'a'])) + expect(logWarning).toBeCalledTimes(0) + expect(logError).toBeCalledTimes(0) }) test('updateJSONFile: 10 writes', async () => { const cbManipulate = jest.fn((o) => { @@ -50,6 +65,8 @@ test('updateJSONFile: 10 writes', async () => { }) const config = { + logError, + logWarning, retryTimeout: 30, retryCount: 3, } @@ -77,6 +94,9 @@ test('updateJSONFile: 10 writes', async () => { // Wait for the lock functions to finish retrying: await sleep(config.retryTimeout * config.retryCount) + + expect(logWarning).toBeCalledTimes(0) + expect(logError).toBeCalledTimes(0) }) test('updateJSONFileBatch: single write', async () => { @@ -85,7 +105,7 @@ test('updateJSONFileBatch: single write', async () => { b: 1, } }) - await updateJSONFileBatch(FILE_A, cbManipulate) + await updateJSONFileBatch(FILE_A, cbManipulate, config) expect(cbManipulate).toBeCalledTimes(1) expect(await readIfExists(FILE_A)).toBe( @@ -93,6 +113,8 @@ test('updateJSONFileBatch: single write', async () => { b: 1, }) ) + expect(logWarning).toBeCalledTimes(0) + expect(logError).toBeCalledTimes(0) }) test('updateJSONFileBatch: 3 writes', async () => { @@ -105,16 +127,19 @@ test('updateJSONFileBatch: 3 writes', async () => { return o }) - const p0 = updateJSONFileBatch(FILE_A, cbManipulate) + const p0 = updateJSONFileBatch(FILE_A, cbManipulate, config) await sleep(5) - const p1 = updateJSONFileBatch(FILE_A, cbManipulate) - const p2 = updateJSONFileBatch(FILE_A, cbManipulate) + const p1 = updateJSONFileBatch(FILE_A, cbManipulate, config) + const p2 = updateJSONFileBatch(FILE_A, cbManipulate, config) await Promise.all([p0, p1, p2]) expect(cbManipulate).toBeCalledTimes(3) expect(await readIfExists(FILE_A)).toBe(JSON.stringify(['a', 'a', 'a'])) + + expect(logWarning).toBeCalledTimes(0) + expect(logError).toBeCalledTimes(0) }) test('updateJSONFileBatch: 20 writes', async () => { const cbManipulate = jest.fn((o) => { @@ -124,6 +149,8 @@ test('updateJSONFileBatch: 20 writes', async () => { }) const config = { + logWarning, + logError, retryTimeout: 30, retryCount: 3, } @@ -139,6 +166,9 @@ test('updateJSONFileBatch: 20 writes', async () => { expect(cbManipulate).toBeCalledTimes(20) expect(await readIfExists(FILE_A)).toBe(JSON.stringify(expectResult)) + + expect(logWarning).toBeCalledTimes(0) + expect(logError).toBeCalledTimes(0) }) async function readIfExists(filePath: string): Promise { diff --git a/shared/packages/worker/src/worker/accessorHandlers/lib/json-write-file.ts b/shared/packages/worker/src/worker/accessorHandlers/lib/json-write-file.ts index 6ef7b1c9..5a13fa05 100644 --- a/shared/packages/worker/src/worker/accessorHandlers/lib/json-write-file.ts +++ b/shared/packages/worker/src/worker/accessorHandlers/lib/json-write-file.ts @@ -13,29 +13,22 @@ export async function updateJSONFileBatch( ): Promise { // Add manipulator callback to queue: - const existingBatches = updateJSONFileBatches.get(filePath) - let batches: BatchOperation[] - if (existingBatches) { - batches = existingBatches - } else { - batches = [] - updateJSONFileBatches.set(filePath, batches) - } - - // Find a batch operation that is open to accept new callbacks: - const openBatch = batches.find((batch) => batch.open) + const openBatch = updateJSONFileBatches.get(filePath) if (!openBatch) { // Start a new batch: const newBatch: BatchOperation = { - open: true, callbacks: [cbManipulate], promise: updateJSONFile( filePath, (oldValue: T | undefined) => { - // At this point, we close the batch, so no more callbacks can be added: - newBatch.open = false + // At this point, no more callbacks can be added, so we close the batch: + + // Guard against this being called multiple times: + if (updateJSONFileBatches.get(filePath) === newBatch) { + updateJSONFileBatches.delete(filePath) + } // Execute all callbacks in the batch: let value = oldValue @@ -47,20 +40,9 @@ export async function updateJSONFileBatch( config ), } - batches.push(newBatch) + updateJSONFileBatches.set(filePath, newBatch) - let caughtError: any = undefined - try { - await newBatch.promise - } catch (e) { - caughtError = e - } - // After finished executing, remove the batch: - const i = batches.indexOf(newBatch) - if (i === -1) throw new Error('Internal Error: Batch not found') - batches.splice(i, 1) - - if (caughtError) throw caughtError + await newBatch.promise } else { // There is a batch open for new callbacks. Add the callback to the batch: openBatch.callbacks.push(cbManipulate) @@ -68,10 +50,8 @@ export async function updateJSONFileBatch( } } -const updateJSONFileBatches = new Map() +const updateJSONFileBatches = new Map() interface BatchOperation { - /** When true, new callbacks can be added */ - open: boolean /** Resolves when the batch operation has finished */ promise: Promise callbacks: ((oldValue: any | undefined) => any | undefined)[] @@ -80,28 +60,41 @@ interface BatchOperation { /** * Read a JSON file, created by updateJSONFile() */ -export async function readJSONFile(filePath: string): Promise< +export async function readJSONFile( + filePath: string, + logError?: (message: any) => void +): Promise< | { str: string value: any } | undefined > { - { + // eslint-disable-next-line no-console + logError = logError ?? console.error + + try { const str = await readIfExists(filePath) if (str !== undefined) { return { str, value: str ? JSON.parse(str) : undefined } } + } catch (e) { + // file data is corrupt, log and continue + logError(e) } // Second try; Check if there is a temporary file, to use instead? - { + try { const tmpPath = getTmpPath(filePath) const str = await readIfExists(tmpPath) if (str !== undefined) { return { str, value: str ? JSON.parse(str) : undefined } } + } catch (e) { + logError(e) + // file data is corrupt, return undefined then + return undefined } return undefined @@ -109,7 +102,7 @@ export async function readJSONFile(filePath: string): Promise< /** * A "safe" way to write JSON data to a file. Takes measures to avoid writing corrupt data to a file due to - * 1. Multiple process writing to the same file (uses a lock file) + * 1. Multiple processes writing to the same file (uses a lock file) * 2. Writing corrupt files due to process exit (write to temporary file and rename) */ export async function updateJSONFile( @@ -166,7 +159,7 @@ export async function updateJSONFile( // At this point, we have acquired the lock. try { // Read and write to the file: - const oldValue = await readJSONFile(filePath) + const oldValue = await readJSONFile(filePath, logError) const newValue = cbManipulate(oldValue?.value) const newValueStr = newValue !== undefined ? JSON.stringify(newValue) : '' @@ -179,11 +172,13 @@ export async function updateJSONFile( continue } + // Note: We can't unlink the file anywhere in here, or other calls to Lockfile can break + // by overwriting the file with an empty one. + // Write to a temporary file first, to avoid corrupting the file in case of a process exit: await fs.writeFile(tmpFilePath, newValueStr) // Rename file: - await rename(tmpFilePath, filePath) }