From d0467b3abd911fa9c12fdc84900d726e0db3aa99 Mon Sep 17 00:00:00 2001 From: Harish Bhakuni Date: Thu, 14 Mar 2024 17:43:27 -0700 Subject: [PATCH] =?UTF-8?q?Simplify=20remote=20directory=20cleanup=20after?= =?UTF-8?q?=20snapshot=20delete=20to=20=E2=80=A6=20(#12672)?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit * Simplify remote directory cleanup after snapshot delete to avoid concurrent cleanup task runs for same shard. Signed-off-by: Harish Bhakuni * Address PR Comments. Signed-off-by: Harish Bhakuni --------- Signed-off-by: Harish Bhakuni Co-authored-by: Harish Bhakuni --- .../blobstore/BlobStoreRepository.java | 11 +++---- .../RemoteStoreShardCleanupTask.java | 26 +++++------------ .../blobstore/BlobStoreRepositoryTests.java | 29 +++++++------------ 3 files changed, 24 insertions(+), 42 deletions(-) diff --git a/server/src/main/java/org/opensearch/repositories/blobstore/BlobStoreRepository.java b/server/src/main/java/org/opensearch/repositories/blobstore/BlobStoreRepository.java index 4a932d296a82f..8dc370f4c0d32 100644 --- a/server/src/main/java/org/opensearch/repositories/blobstore/BlobStoreRepository.java +++ b/server/src/main/java/org/opensearch/repositories/blobstore/BlobStoreRepository.java @@ -1153,11 +1153,12 @@ protected void releaseRemoteStoreLockAndCleanup( logger.debug("Successfully released lock for shard {} of index with uuid {}", shardId, indexUUID); if (!isIndexPresent(clusterService, indexUUID)) { // Note: this is a temporary solution where snapshot deletion triggers remote store side cleanup if - // index is already deleted. shard cleanup will still happen asynchronously using REMOTE_PURGE - // threadpool. if it fails, it could leave some stale files in remote directory. this issue could - // even happen in cases of shard level remote store data cleanup which also happens asynchronously. - // in long term, we have plans to implement remote store GC poller mechanism which will take care of - // such stale data. related issue: https://github.com/opensearch-project/OpenSearch/issues/8469 + // index is already deleted. this is the best effort at the moment since shard cleanup will still happen + // asynchronously using REMOTE_PURGE thread pool. if it fails, it could leave some stale files in remote + // directory. this issue could even happen in cases of shard level remote store data cleanup which also + // happens asynchronously. in long term, we have plans to implement remote store GC poller mechanism which + // will take care of such stale data. + // related issue: https://github.com/opensearch-project/OpenSearch/issues/8469 RemoteSegmentStoreDirectoryFactory remoteDirectoryFactory = new RemoteSegmentStoreDirectoryFactory( remoteStoreLockManagerFactory.getRepositoriesService(), threadPool diff --git a/server/src/main/java/org/opensearch/repositories/blobstore/RemoteStoreShardCleanupTask.java b/server/src/main/java/org/opensearch/repositories/blobstore/RemoteStoreShardCleanupTask.java index df61c1ca3263b..b6b8957d1bd19 100644 --- a/server/src/main/java/org/opensearch/repositories/blobstore/RemoteStoreShardCleanupTask.java +++ b/server/src/main/java/org/opensearch/repositories/blobstore/RemoteStoreShardCleanupTask.java @@ -12,10 +12,8 @@ import org.apache.logging.log4j.Logger; import org.opensearch.core.index.shard.ShardId; -import java.util.Map; import java.util.Set; -import static org.opensearch.common.util.concurrent.ConcurrentCollections.newConcurrentMap; import static org.opensearch.common.util.concurrent.ConcurrentCollections.newConcurrentSet; /** @@ -25,7 +23,6 @@ public class RemoteStoreShardCleanupTask implements Runnable { private final Runnable task; private final String shardIdentifier; final static Set ongoingRemoteDirectoryCleanups = newConcurrentSet(); - final static Map pendingRemoteDirectoryCleanups = newConcurrentMap(); private static final Logger staticLogger = LogManager.getLogger(RemoteStoreShardCleanupTask.class); public RemoteStoreShardCleanupTask(Runnable task, String indexUUID, ShardId shardId) { @@ -39,25 +36,16 @@ private static String indexShardIdentifier(String indexUUID, ShardId shardId) { @Override public void run() { - // TODO: this is the best effort at the moment since there is still a known race condition scenario in this - // method which needs to be handled where one of the thread just came out of while loop and removed the - // entry from ongoingRemoteDirectoryCleanup, and another thread added new pending task in the map. - // we need to introduce semaphores/locks to avoid that situation which introduces the overhead of lock object - // cleanups. however, there will be no scenario where two threads run cleanup for same shard at same time. - // - if (pendingRemoteDirectoryCleanups.put(shardIdentifier, task) == null) { - if (ongoingRemoteDirectoryCleanups.add(shardIdentifier)) { - while (pendingRemoteDirectoryCleanups.containsKey(shardIdentifier)) { - Runnable newTask = pendingRemoteDirectoryCleanups.get(shardIdentifier); - pendingRemoteDirectoryCleanups.remove(shardIdentifier); - newTask.run(); - } + // If there is already a same task ongoing for a shard, we need to skip the new task to avoid multiple + // concurrent cleanup of same shard. + if (ongoingRemoteDirectoryCleanups.add(shardIdentifier)) { + try { + task.run(); + } finally { ongoingRemoteDirectoryCleanups.remove(shardIdentifier); - } else { - staticLogger.debug("one task is already ongoing for shard {}, we can leave entry in pending", shardIdentifier); } } else { - staticLogger.debug("one cleanup task for shard {} is already in pending, we can skip this task", shardIdentifier); + staticLogger.warn("one cleanup task for shard {} is already ongoing, need to skip this task", shardIdentifier); } } } diff --git a/server/src/test/java/org/opensearch/repositories/blobstore/BlobStoreRepositoryTests.java b/server/src/test/java/org/opensearch/repositories/blobstore/BlobStoreRepositoryTests.java index b76e01d6d4c82..2445cad01574c 100644 --- a/server/src/test/java/org/opensearch/repositories/blobstore/BlobStoreRepositoryTests.java +++ b/server/src/test/java/org/opensearch/repositories/blobstore/BlobStoreRepositoryTests.java @@ -321,38 +321,31 @@ private RepositoryData addRandomSnapshotsToRepoData(RepositoryData repoData, boo return repoData; } + private String getShardIdentifier(String indexUUID, String shardId) { + return String.join("/", indexUUID, shardId); + } + public void testRemoteStoreShardCleanupTask() { - // todo: move it to separate class and add more scenarios. AtomicBoolean executed1 = new AtomicBoolean(false); Runnable task1 = () -> executed1.set(true); String indexName = "test-idx"; String testIndexUUID = "test-idx-uuid"; ShardId shardId = new ShardId(new Index(indexName, testIndexUUID), 0); - // Scenario 1: pending = empty, ongoing = false => executed + // just adding random shards in ongoing cleanups. + RemoteStoreShardCleanupTask.ongoingRemoteDirectoryCleanups.add(getShardIdentifier(testIndexUUID, "1")); + RemoteStoreShardCleanupTask.ongoingRemoteDirectoryCleanups.add(getShardIdentifier(testIndexUUID, "2")); + + // Scenario 1: ongoing = false => executed RemoteStoreShardCleanupTask remoteStoreShardCleanupTask = new RemoteStoreShardCleanupTask(task1, testIndexUUID, shardId); remoteStoreShardCleanupTask.run(); assertTrue(executed1.get()); - // Scenario 2: pending = empty, ongoing = true => pending = currentTask + // Scenario 2: ongoing = true => currentTask skipped. executed1.set(false); - String shardIdentifier = String.join("/", testIndexUUID, String.valueOf(shardId.id())); - RemoteStoreShardCleanupTask.ongoingRemoteDirectoryCleanups.add(shardIdentifier); - + RemoteStoreShardCleanupTask.ongoingRemoteDirectoryCleanups.add(getShardIdentifier(testIndexUUID, "0")); remoteStoreShardCleanupTask = new RemoteStoreShardCleanupTask(task1, testIndexUUID, shardId); remoteStoreShardCleanupTask.run(); assertFalse(executed1.get()); - assertSame(RemoteStoreShardCleanupTask.pendingRemoteDirectoryCleanups.get(shardIdentifier), task1); - - // Scenario3: pending = anotherTask, ongoing = true => pending = currentTask - AtomicBoolean executed2 = new AtomicBoolean(false); - Runnable task2 = () -> executed2.set(true); - RemoteStoreShardCleanupTask.pendingRemoteDirectoryCleanups.put(shardIdentifier, task1); - RemoteStoreShardCleanupTask.ongoingRemoteDirectoryCleanups.add(shardIdentifier); - - remoteStoreShardCleanupTask = new RemoteStoreShardCleanupTask(task2, testIndexUUID, shardId); - remoteStoreShardCleanupTask.run(); - assertFalse(executed1.get()); - assertSame(RemoteStoreShardCleanupTask.pendingRemoteDirectoryCleanups.get(shardIdentifier), task2); } }