Skip to content

Commit

Permalink
Merge pull request #134 from nrkno/fix/delay-files-removal
Browse files Browse the repository at this point in the history
fix: Lock the __removePackages.json file (SOFIE-2949)
  • Loading branch information
nytamin authored Feb 19, 2024
2 parents 2081ca2 + 190a356 commit a69756a
Show file tree
Hide file tree
Showing 9 changed files with 563 additions and 120 deletions.
4 changes: 3 additions & 1 deletion shared/packages/worker/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -7,14 +7,15 @@
"scripts": {
"build": "yarn rimraf dist && yarn build:main",
"build:main": "tsc -p tsconfig.json",
"__test": "jest"
"test": "jest --runInBand"
},
"engines": {
"node": ">=14.18.0"
},
"devDependencies": {
"@types/deep-diff": "^1.0.0",
"@types/node-fetch": "^2.5.8",
"@types/proper-lockfile": "^4.1.4",
"@types/tmp": "~0.2.2"
},
"dependencies": {
Expand All @@ -26,6 +27,7 @@
"form-data": "^4.0.0",
"mkdirp": "^2.1.3",
"node-fetch": "^2.6.1",
"proper-lockfile": "^4.1.2",
"tmp": "~0.2.1",
"tv-automation-quantel-gateway-client": "3.1.7",
"windows-network-drive": "^4.0.1",
Expand Down
67 changes: 0 additions & 67 deletions shared/packages/worker/src/__tests__/fileShare.spec.ts

This file was deleted.

111 changes: 59 additions & 52 deletions shared/packages/worker/src/worker/accessorHandlers/lib/FileHandler.ts
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import { GenericAccessorHandle } from '../genericHandle'
import { MonitorInProgress } from '../../lib/monitorInProgress'
import { removeBasePath } from './pathJoin'
import { FileEvent, FileWatcher, IFileWatcher } from './FileWatcher'
import { updateJSONFileBatch } from './json-write-file'

export const LocalFolderAccessorHandleType = 'localFolder'
export const FileShareAccessorHandleType = 'fileShare'
Expand All @@ -30,7 +31,6 @@ const fsReadFile = promisify(fs.readFile)
const fsReaddir = promisify(fs.readdir)
const fsRmDir = promisify(fs.rmdir)
const fsStat = promisify(fs.stat)
const fsWriteFile = promisify(fs.writeFile)
const fsUnlink = promisify(fs.unlink)
const fsLstat = promisify(fs.lstat)

Expand All @@ -53,48 +53,36 @@ export abstract class GenericFileAccessorHandle<Metadata> extends GenericAccesso

/** Schedule the package for later removal */
async delayPackageRemoval(filePath: string, ttl: number): Promise<void> {
const packagesToRemove = await this.getPackagesToRemove()

// Search for a pre-existing entry:
let found = false
for (const entry of packagesToRemove) {
if (entry.filePath === filePath) {
// extend the TTL if it was found:
entry.removeTime = Date.now() + ttl

found = true
break
await this.updatePackagesToRemove((packagesToRemove) => {
// Search for a pre-existing entry:
let alreadyExists = false
for (const entry of packagesToRemove) {
if (entry.filePath === filePath) {
// extend the TTL if it was found:
entry.removeTime = Date.now() + ttl

alreadyExists = true
break
}
}
}
if (!found) {
packagesToRemove.push({
filePath: filePath,
removeTime: Date.now() + ttl,
})
}

await this.storePackagesToRemove(packagesToRemove)
if (!alreadyExists) {
packagesToRemove.push({
filePath: filePath,
removeTime: Date.now() + ttl,
})
}
return packagesToRemove
})
}
/** Clear a scheduled later removal of a package */
async clearPackageRemoval(filePath: string): Promise<void> {
const packagesToRemove = await this.getPackagesToRemove()

let found = false
for (let i = 0; i < packagesToRemove.length; i++) {
const entry = packagesToRemove[i]
if (entry.filePath === filePath) {
packagesToRemove.splice(i, 1)
found = true
break
}
}
if (found) {
await this.storePackagesToRemove(packagesToRemove)
}
await this.updatePackagesToRemove((packagesToRemove) => {
return packagesToRemove.filter((entry) => entry.filePath !== filePath)
})
}
/** Remove any packages that are due for removal */
async removeDuePackages(): Promise<Reason | null> {
let packagesToRemove = await this.getPackagesToRemove()
const packagesToRemove = await this.getPackagesToRemove()

const removedFilePaths: string[] = []
for (const entry of packagesToRemove) {
Expand All @@ -112,21 +100,14 @@ export abstract class GenericFileAccessorHandle<Metadata> extends GenericAccesso
}
}

// Fetch again, to decrease the risk of race-conditions:
packagesToRemove = await this.getPackagesToRemove()
let changed = false
// Remove paths from array:
for (let i = 0; i < packagesToRemove.length; i++) {
const entry = packagesToRemove[i]
if (removedFilePaths.includes(entry.filePath)) {
packagesToRemove.splice(i, 1)
changed = true
break
}
}
if (changed) {
await this.storePackagesToRemove(packagesToRemove)
if (removedFilePaths.length > 0) {
// Update the list of packages to remove:
await this.updatePackagesToRemove((packagesToRemove) => {
// Remove the entries of the files we removed:
return packagesToRemove.filter((entry) => !removedFilePaths.includes(entry.filePath))
})
}

return null
}
/** Unlink (remove) a file, if it exists. Returns true if it did exist */
Expand Down Expand Up @@ -430,8 +411,34 @@ export abstract class GenericFileAccessorHandle<Metadata> extends GenericAccesso
}
return packagesToRemove
}
private async storePackagesToRemove(packagesToRemove: DelayPackageRemovalEntry[]): Promise<void> {
await fsWriteFile(this.deferRemovePackagesPath, JSON.stringify(packagesToRemove))
/** Update the deferred-remove-packages list */
private async updatePackagesToRemove(
cbManipulateList: (list: DelayPackageRemovalEntry[]) => DelayPackageRemovalEntry[]
): Promise<void> {
// Note: It is high likelihood that several processes will try to write to this file at the same time
// Therefore, we need to lock the file while writing to it.

const LOCK_ATTEMPTS_COUNT = 10
const RETRY_TIMEOUT = 100 // ms

try {
await updateJSONFileBatch<DelayPackageRemovalEntry[]>(
this.deferRemovePackagesPath,
(list) => {
return cbManipulateList(list ?? [])
},
{
retryCount: LOCK_ATTEMPTS_COUNT,
retryTimeout: RETRY_TIMEOUT,
logError: (error) => this.worker.logger.error(error),
logWarning: (message) => this.worker.logger.warn(message),
}
)
} catch (e) {
// Not much we can do about it..
// Log and continue:
this.worker.logger.error(e)
}
}
}

Expand Down
Loading

0 comments on commit a69756a

Please sign in to comment.