Colossus cleanup: Additional safety mechanism
Lezek123 committed Nov 26, 2024
1 parent 0094a61 commit 2520e15
Showing 2 changed files with 56 additions and 36 deletions.
5 changes: 3 additions & 2 deletions storage-node/CHANGELOG.md
@@ -4,8 +4,9 @@
 - **Optimizations:** The way data objects / data object ids are queried and processed during sync and cleanup has been optimized:
   - `DataObjectDetailsLoader` and `DataObjectIdsLoader` were implemented. They allow loading data objects / data object ids in batches using a connection query and avoid fetching redundant data from the GraphQL server.
   - Sync and cleanup services now process tasks in batches of `10_000` to avoid overflowing the memory.
-  - Synchronous operations like `sort` or `filter` on larger arrays of data objects have been optimized (for example, by replacing `.filter(Array.includes(...))` with `.filter(Set.has(...))`)
-- Improved logging during cleanup
+  - Synchronous operations like `sort` or `filter` on larger arrays of data objects have been optimized (for example, by replacing `.filter(Array.includes(...))` with `.filter(Set.has(...))`).
+- A safety mechanism was added to avoid removing "deleted" objects for which a related `DataObjectDeleted` event cannot be found in storage squid.
+- Improved logging during cleanup.
 
 
 ### 4.2.0
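The `Set`-based filtering mentioned in the changelog is not shown in this commit's diff, so here is a minimal illustrative sketch (with made-up ids, not code from the repo) of why the replacement matters: `Array.prototype.includes` rescans the whole array on every call, while a `Set` lookup is constant time on average.

```ts
// Hypothetical data, for illustration: ids held locally vs. ids the node should hold.
const storedIds: string[] = ['1', '2', '3', '4']
const assignedIds: string[] = ['2', '4']

// Before: includes() scans the whole array for every element,
// so this is O(n * m) for n stored ids and m assigned ids.
const obsoleteSlow = storedIds.filter((id) => !assignedIds.includes(id))

// After: a Set gives O(1) average-time lookups, so the filter is O(n + m).
const assignedIdSet = new Set(assignedIds)
const obsoleteFast = storedIds.filter((id) => !assignedIdSet.has(id))

console.log(obsoleteSlow, obsoleteFast) // [ '1', '3' ] [ '1', '3' ]
```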
87 changes: 53 additions & 34 deletions storage-node/src/services/sync/cleanupService.ts
@@ -91,41 +91,60 @@ export async function performCleanup(
   const deletedDataObjectIds = new Set([...obsoleteObjectIds].filter((id) => !movedObjectIds.has(id)))
 
   logger.info(`stored objects: ${storedObjectsIds.length}, assigned objects: ${assignedObjectIds.size}`)
-  logger.info(
-    `pruning ${obsoleteObjectIds.size} obsolete objects ` +
-      `(${movedObjectIds.size} moved, ${deletedDataObjectIds.size} deleted)`
-  )
-
-  const workingStack = new WorkingStack()
-  const processSpawner = new TaskProcessorSpawner(workingStack, asyncWorkersNumber)
-
-  // Execute deleted objects removal tasks in batches of 10_000
-  let deletedProcessed = 0
-  logger.info(`removing ${deletedDataObjectIds.size} deleted objects...`)
-  for (const deletedObjectsIdsBatch of _.chunk([...deletedDataObjectIds], 10_000)) {
-    const deletionTasks = deletedObjectsIdsBatch.map((id) => new DeleteLocalFileTask(uploadDirectory, id))
-    await workingStack.add(deletionTasks)
-    await processSpawner.process()
-    deletedProcessed += deletedObjectsIdsBatch.length
-    logger.debug(`${deletedProcessed} / ${deletedDataObjectIds.size} deleted objects processed...`)
-  }
-
-  // Execute moved objects removal tasks in batches of 10_000
-  let movedProcessed = 0
-  logger.info(`removing ${movedObjectIds.size} moved objects...`)
-  for (const movedObjectsIdsBatch of _.chunk([...movedObjectIds], 10_000)) {
-    const movedDataObjectsBatch = await qnApi.getDataObjectsWithBagDetails(movedObjectsIdsBatch)
-    const deletionTasksOfMovedDataObjects = await getDeletionTasksFromMovedDataObjects(
-      logger,
-      uploadDirectory,
-      model,
-      movedDataObjectsBatch,
-      hostId
+  if (obsoleteObjectIds.size) {
+    logger.info(
+      `pruning ${obsoleteObjectIds.size} obsolete objects ` +
+        `(${movedObjectIds.size} moved, ${deletedDataObjectIds.size} deleted)`
     )
-    await workingStack.add(deletionTasksOfMovedDataObjects)
-    await processSpawner.process()
-    movedProcessed += movedDataObjectsBatch.length
-    logger.debug(`${movedProcessed} / ${movedObjectIds.size} moved objects processed...`)
+
+    const workingStack = new WorkingStack()
+    const processSpawner = new TaskProcessorSpawner(workingStack, asyncWorkersNumber)
+
+    // Execute deleted objects removal tasks in batches of 10_000
+    if (deletedDataObjectIds.size) {
+      let deletedProcessed = 0
+      logger.info(`removing ${deletedDataObjectIds.size} deleted objects...`)
+      for (let deletedObjectsIdsBatch of _.chunk([...deletedDataObjectIds], 10_000)) {
+        // Confirm whether the objects were actually deleted by fetching the related deletion events
+        const dataObjectDeletedEvents = await qnApi.getDataObjectDeletedEvents(deletedObjectsIdsBatch)
+        const confirmedIds = new Set(dataObjectDeletedEvents.map((e) => e.data.dataObjectId))
+        deletedObjectsIdsBatch = deletedObjectsIdsBatch.filter((id) => {
+          if (confirmedIds.has(id)) {
+            return true
+          } else {
+            logger.warn(`Could not find DataObjectDeleted event for object ${id}, skipping from cleanup...`)
+            return false
+          }
+        })
+        const deletionTasks = deletedObjectsIdsBatch.map((id) => new DeleteLocalFileTask(uploadDirectory, id))
+        await workingStack.add(deletionTasks)
+        await processSpawner.process()
+        deletedProcessed += deletedObjectsIdsBatch.length
+        logger.debug(`${deletedProcessed} / ${deletedDataObjectIds.size} deleted objects processed...`)
+      }
+    }
+
+    // Execute moved objects removal tasks in batches of 10_000
+    if (movedObjectIds.size) {
+      let movedProcessed = 0
+      logger.info(`removing ${movedObjectIds.size} moved objects...`)
+      for (const movedObjectsIdsBatch of _.chunk([...movedObjectIds], 10_000)) {
+        const movedDataObjectsBatch = await qnApi.getDataObjectsWithBagDetails(movedObjectsIdsBatch)
+        const deletionTasksOfMovedDataObjects = await getDeletionTasksFromMovedDataObjects(
+          logger,
+          uploadDirectory,
+          model,
+          movedDataObjectsBatch,
+          hostId
+        )
+        await workingStack.add(deletionTasksOfMovedDataObjects)
+        await processSpawner.process()
+        movedProcessed += movedDataObjectsBatch.length
+        logger.debug(`${movedProcessed} / ${movedObjectIds.size} moved objects processed...`)
+      }
+    }
+  } else {
+    logger.info('No objects to prune, skipping...')
   }
 
   logger.info('Cleanup ended.')
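The confirmation step added above is self-contained enough to extract into a pure helper, which would make it straightforward to unit-test. The following is only a sketch, not code from the commit: it assumes the event shape `{ data: { dataObjectId } }` seen in the diff, and `filterConfirmedDeletions` is a hypothetical name.

```ts
interface DataObjectDeletedEvent {
  data: { dataObjectId: string }
}

interface WarnLogger {
  warn(message: string): void
}

// Keep only the ids whose deletion is confirmed by a DataObjectDeleted event;
// anything unconfirmed is logged and skipped instead of being removed from disk.
function filterConfirmedDeletions(
  ids: string[],
  events: DataObjectDeletedEvent[],
  logger: WarnLogger
): string[] {
  const confirmedIds = new Set(events.map((e) => e.data.dataObjectId))
  return ids.filter((id) => {
    if (confirmedIds.has(id)) {
      return true
    }
    logger.warn(`Could not find DataObjectDeleted event for object ${id}, skipping from cleanup...`)
    return false
  })
}

// Example: object '3' has no matching event, so it survives the cleanup.
const kept = filterConfirmedDeletions(
  ['1', '2', '3'],
  [{ data: { dataObjectId: '1' } }, { data: { dataObjectId: '2' } }],
  console
)
console.log(kept) // [ '1', '2' ]
```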
