From 2520e15017ca4ba5030d93bbf1c1497744900def Mon Sep 17 00:00:00 2001
From: Lezek123
Date: Tue, 26 Nov 2024 10:16:41 +0100
Subject: [PATCH] Colossus cleanup: Additional safety mechanism

---
 storage-node/CHANGELOG.md                     |  5 +-
 .../src/services/sync/cleanupService.ts       | 87 +++++++++++--------
 2 files changed, 56 insertions(+), 36 deletions(-)

diff --git a/storage-node/CHANGELOG.md b/storage-node/CHANGELOG.md
index a3cf96681f..8982fb9db2 100644
--- a/storage-node/CHANGELOG.md
+++ b/storage-node/CHANGELOG.md
@@ -4,8 +4,9 @@
 - **Optimizations:** The way data objects / data object ids are queried and processed during sync and cleanup has been optimized:
   - `DataObjectDetailsLoader` and `DataObjectIdsLoader` were implemented. They allow loading data objects / data object ids in batches using a connection query and avoid fetching redundant data from the GraphQL server.
   - Sync and cleanup services now process tasks in batches of `10_000` to avoid overflowing the memory.
-  - Synchronous operations like `sort` or `filter` on larger arrays of data objects have been optimized (for example, by replacing `.filter(Array.includes(...))` with `.filter(Set.has(...))`)
-- Improved logging during cleanup
+  - Synchronous operations like `sort` or `filter` on larger arrays of data objects have been optimized (for example, by replacing `.filter(Array.includes(...))` with `.filter(Set.has(...))`).
+- A safety mechanism was added to avoid removing "deleted" objects for which a related `DataObjectDeleted` event cannot be found in the storage squid.
+- Improved logging during cleanup.
 
 ### 4.2.0
 
diff --git a/storage-node/src/services/sync/cleanupService.ts b/storage-node/src/services/sync/cleanupService.ts
index fdaf4f5f2f..8f1eaa1d40 100644
--- a/storage-node/src/services/sync/cleanupService.ts
+++ b/storage-node/src/services/sync/cleanupService.ts
@@ -91,41 +91,60 @@ export async function performCleanup(
   const deletedDataObjectIds = new Set([...obsoleteObjectIds].filter((id) => !movedObjectIds.has(id)))
 
   logger.info(`stored objects: ${storedObjectsIds.length}, assigned objects: ${assignedObjectIds.size}`)
-  logger.info(
-    `pruning ${obsoleteObjectIds.size} obsolete objects ` +
-      `(${movedObjectIds.size} moved, ${deletedDataObjectIds.size} deleted)`
-  )
-
-  const workingStack = new WorkingStack()
-  const processSpawner = new TaskProcessorSpawner(workingStack, asyncWorkersNumber)
-
-  // Execute deleted objects removal tasks in batches of 10_000
-  let deletedProcessed = 0
-  logger.info(`removing ${deletedDataObjectIds.size} deleted objects...`)
-  for (const deletedObjectsIdsBatch of _.chunk([...deletedDataObjectIds], 10_000)) {
-    const deletionTasks = deletedObjectsIdsBatch.map((id) => new DeleteLocalFileTask(uploadDirectory, id))
-    await workingStack.add(deletionTasks)
-    await processSpawner.process()
-    deletedProcessed += deletedObjectsIdsBatch.length
-    logger.debug(`${deletedProcessed} / ${deletedDataObjectIds.size} deleted objects processed...`)
-  }
-
-  // Execute moved objects removal tasks in batches of 10_000
-  let movedProcessed = 0
-  logger.info(`removing ${movedObjectIds.size} moved objects...`)
-  for (const movedObjectsIdsBatch of _.chunk([...movedObjectIds], 10_000)) {
-    const movedDataObjectsBatch = await qnApi.getDataObjectsWithBagDetails(movedObjectsIdsBatch)
-    const deletionTasksOfMovedDataObjects = await getDeletionTasksFromMovedDataObjects(
-      logger,
-      uploadDirectory,
-      model,
-      movedDataObjectsBatch,
-      hostId
+  if (obsoleteObjectIds.size) {
+    logger.info(
+      `pruning ${obsoleteObjectIds.size} obsolete objects ` +
+        `(${movedObjectIds.size} moved, ${deletedDataObjectIds.size} deleted)`
     )
-    await workingStack.add(deletionTasksOfMovedDataObjects)
-    await processSpawner.process()
-    movedProcessed += movedDataObjectsBatch.length
-    logger.debug(`${movedProcessed} / ${movedObjectIds.size} moved objects processed...`)
+
+    const workingStack = new WorkingStack()
+    const processSpawner = new TaskProcessorSpawner(workingStack, asyncWorkersNumber)
+
+    // Execute deleted objects removal tasks in batches of 10_000
+    if (deletedDataObjectIds.size) {
+      let deletedProcessed = 0
+      logger.info(`removing ${deletedDataObjectIds.size} deleted objects...`)
+      for (let deletedObjectsIdsBatch of _.chunk([...deletedDataObjectIds], 10_000)) {
+        // Confirm whether the objects were actually deleted by fetching the related deletion events
+        const dataObjectDeletedEvents = await qnApi.getDataObjectDeletedEvents(deletedObjectsIdsBatch)
+        const confirmedIds = new Set(dataObjectDeletedEvents.map((e) => e.data.dataObjectId))
+        deletedObjectsIdsBatch = deletedObjectsIdsBatch.filter((id) => {
+          if (confirmedIds.has(id)) {
+            return true
+          } else {
+            logger.warn(`Could not find DataObjectDeleted event for object ${id}, skipping from cleanup...`)
+            return false
+          }
+        })
+        const deletionTasks = deletedObjectsIdsBatch.map((id) => new DeleteLocalFileTask(uploadDirectory, id))
+        await workingStack.add(deletionTasks)
+        await processSpawner.process()
+        deletedProcessed += deletedObjectsIdsBatch.length
+        logger.debug(`${deletedProcessed} / ${deletedDataObjectIds.size} deleted objects processed...`)
+      }
+    }
+
+    // Execute moved objects removal tasks in batches of 10_000
+    if (movedObjectIds.size) {
+      let movedProcessed = 0
+      logger.info(`removing ${movedObjectIds.size} moved objects...`)
+      for (const movedObjectsIdsBatch of _.chunk([...movedObjectIds], 10_000)) {
+        const movedDataObjectsBatch = await qnApi.getDataObjectsWithBagDetails(movedObjectsIdsBatch)
+        const deletionTasksOfMovedDataObjects = await getDeletionTasksFromMovedDataObjects(
+          logger,
+          uploadDirectory,
+          model,
+          movedDataObjectsBatch,
+          hostId
+        )
+        await workingStack.add(deletionTasksOfMovedDataObjects)
+        await processSpawner.process()
+        movedProcessed += movedDataObjectsBatch.length
+        logger.debug(`${movedProcessed} / ${movedObjectIds.size} moved objects processed...`)
+      }
+    }
+  } else {
+    logger.info('No objects to prune, skipping...')
   }
 
   logger.info('Cleanup ended.')