diff --git a/server/src/main/java/org/opensearch/index/store/RemoteSegmentStoreDirectory.java b/server/src/main/java/org/opensearch/index/store/RemoteSegmentStoreDirectory.java index f0e7d7cf0d362..c9a238c6e3350 100644 --- a/server/src/main/java/org/opensearch/index/store/RemoteSegmentStoreDirectory.java +++ b/server/src/main/java/org/opensearch/index/store/RemoteSegmentStoreDirectory.java @@ -899,15 +899,14 @@ public static void remoteDirectoryCleanup( String indexUUID, ShardId shardId ) { - try ( - RemoteSegmentStoreDirectory remoteDirectory = (RemoteSegmentStoreDirectory) remoteDirectoryFactory.newDirectory( + try { + RemoteSegmentStoreDirectory remoteSegmentStoreDirectory = (RemoteSegmentStoreDirectory) remoteDirectoryFactory.newDirectory( remoteStoreRepoForIndex, indexUUID, shardId - ) - ) { - remoteDirectory.deleteStaleSegments(0); // sync stale segments cleanup - remoteDirectory.deleteIfEmpty(); + ); + remoteSegmentStoreDirectory.deleteStaleSegments(0); + remoteSegmentStoreDirectory.deleteIfEmpty(); } catch (Exception e) { staticLogger.error("Exception occurred while deleting directory", e); } diff --git a/server/src/main/java/org/opensearch/index/store/RemoteSegmentStoreDirectoryFactory.java b/server/src/main/java/org/opensearch/index/store/RemoteSegmentStoreDirectoryFactory.java index a5e89ec6a8327..eca8d9ec702e1 100644 --- a/server/src/main/java/org/opensearch/index/store/RemoteSegmentStoreDirectoryFactory.java +++ b/server/src/main/java/org/opensearch/index/store/RemoteSegmentStoreDirectoryFactory.java @@ -76,4 +76,5 @@ public Directory newDirectory(String repositoryName, String indexUUID, ShardId s throw new IllegalArgumentException("Repository should be created before creating index with remote_store enabled setting", e); } } + } diff --git a/server/src/main/java/org/opensearch/repositories/blobstore/BlobStoreRepository.java b/server/src/main/java/org/opensearch/repositories/blobstore/BlobStoreRepository.java index 216fb7db3d67c..4a932d296a82f 100644 --- a/server/src/main/java/org/opensearch/repositories/blobstore/BlobStoreRepository.java +++ b/server/src/main/java/org/opensearch/repositories/blobstore/BlobStoreRepository.java @@ -160,9 +160,7 @@ import java.util.Set; import java.util.concurrent.BlockingQueue; import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.ConcurrentMap; import java.util.concurrent.Executor; -import java.util.concurrent.ExecutorService; import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; @@ -175,7 +173,6 @@ import java.util.stream.LongStream; import java.util.stream.Stream; -import static org.opensearch.common.util.concurrent.ConcurrentCollections.newConcurrentSet; import static org.opensearch.index.snapshots.blobstore.BlobStoreIndexShardSnapshot.FileInfo.canonicalName; import static org.opensearch.repositories.blobstore.ChecksumBlobStoreFormat.SNAPSHOT_ONLY_FORMAT_PARAMS; @@ -240,6 +237,8 @@ public abstract class BlobStoreRepository extends AbstractLifecycleComponent imp Setting.Property.Deprecated ); + private static final Logger staticLogger = LogManager.getLogger(BlobStoreRepository.class); + /** * Setting to disable caching of the latest repository data. */ @@ -1107,9 +1106,10 @@ public static void remoteDirectoryCleanupAsync( ThreadPool threadpool, String remoteStoreRepoForIndex, String indexUUID, - ShardId shardId + ShardId shardId, + String threadPoolName ) { - threadpool.executor(ThreadPool.Names.REMOTE_PURGE) + threadpool.executor(threadPoolName) .execute( new RemoteStoreShardCleanupTask( () -> RemoteSegmentStoreDirectory.remoteDirectoryCleanup( @@ -1118,58 +1118,12 @@ public static void remoteDirectoryCleanupAsync( indexUUID, shardId ), - threadpool.executor(ThreadPool.Names.REMOTE_PURGE), indexUUID, shardId ) ); } - /** - A Runnable wrapper to make sure that for a given shard only one cleanup task runs at a time. - */ - public static class RemoteStoreShardCleanupTask implements Runnable { - private final Runnable task; - private final ExecutorService executor; - private final String shardIdentifier; - final static Set ongoingRemoteDirectoryCleanups = newConcurrentSet(); - final static ConcurrentMap> shardCleanupPendingTasks = new ConcurrentHashMap<>(); - - public RemoteStoreShardCleanupTask(Runnable task, ExecutorService executor, String indexUUID, ShardId shardId) { - this.task = task; - this.shardIdentifier = indexShardIdentifier(indexUUID, shardId); - this.executor = executor; - } - - private static String indexShardIdentifier(String indexUUID, ShardId shardId) { - return String.join("/", indexUUID, String.valueOf(shardId.id())); - } - - @Override - public void run() { - if (ongoingRemoteDirectoryCleanups.add(shardIdentifier)) { - try { - task.run(); - BlockingQueue pendingTasks = shardCleanupPendingTasks.get(shardIdentifier); - try { - if (pendingTasks != null) { - for (Runnable pendingTask = pendingTasks.poll(0L, TimeUnit.MILLISECONDS); pendingTask != null;) { - pendingTask.run(); - } - } - } catch (InterruptedException e) { - throw new RuntimeException(e); - } - } finally { - ongoingRemoteDirectoryCleanups.remove(shardIdentifier); - } - } else { - shardCleanupPendingTasks.putIfAbsent(shardIdentifier, new LinkedBlockingQueue<>()); - shardCleanupPendingTasks.get(shardIdentifier).add(task); - } - } - } - protected void releaseRemoteStoreLockAndCleanup( String shardId, String shallowSnapshotUUID, @@ -1213,7 +1167,8 @@ protected void releaseRemoteStoreLockAndCleanup( threadPool, remoteStoreRepoForIndex, indexUUID, - new ShardId(Index.UNKNOWN_INDEX_NAME, indexUUID, Integer.parseInt(shardId)) + new ShardId(Index.UNKNOWN_INDEX_NAME, indexUUID, Integer.parseInt(shardId)), + ThreadPool.Names.REMOTE_PURGE ); } } diff --git a/server/src/main/java/org/opensearch/repositories/blobstore/RemoteStoreShardCleanupTask.java b/server/src/main/java/org/opensearch/repositories/blobstore/RemoteStoreShardCleanupTask.java new file mode 100644 index 0000000000000..df61c1ca3263b --- /dev/null +++ b/server/src/main/java/org/opensearch/repositories/blobstore/RemoteStoreShardCleanupTask.java @@ -0,0 +1,63 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.repositories.blobstore; + +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; +import org.opensearch.core.index.shard.ShardId; + +import java.util.Map; +import java.util.Set; + +import static org.opensearch.common.util.concurrent.ConcurrentCollections.newConcurrentMap; +import static org.opensearch.common.util.concurrent.ConcurrentCollections.newConcurrentSet; + +/** + A Runnable wrapper to make sure that for a given shard only one cleanup task runs at a time. + */ +public class RemoteStoreShardCleanupTask implements Runnable { + private final Runnable task; + private final String shardIdentifier; + final static Set ongoingRemoteDirectoryCleanups = newConcurrentSet(); + final static Map pendingRemoteDirectoryCleanups = newConcurrentMap(); + private static final Logger staticLogger = LogManager.getLogger(RemoteStoreShardCleanupTask.class); + + public RemoteStoreShardCleanupTask(Runnable task, String indexUUID, ShardId shardId) { + this.task = task; + this.shardIdentifier = indexShardIdentifier(indexUUID, shardId); + } + + private static String indexShardIdentifier(String indexUUID, ShardId shardId) { + return String.join("/", indexUUID, String.valueOf(shardId.id())); + } + + @Override + public void run() { + // TODO: this is the best effort at the moment since there is still a known race condition scenario in this + // method which needs to be handled where one of the thread just came out of while loop and removed the + // entry from ongoingRemoteDirectoryCleanup, and another thread added new pending task in the map. + // we need to introduce semaphores/locks to avoid that situation which introduces the overhead of lock object + // cleanups. however, there will be no scenario where two threads run cleanup for same shard at same time. + // + if (pendingRemoteDirectoryCleanups.put(shardIdentifier, task) == null) { + if (ongoingRemoteDirectoryCleanups.add(shardIdentifier)) { + while (pendingRemoteDirectoryCleanups.containsKey(shardIdentifier)) { + Runnable newTask = pendingRemoteDirectoryCleanups.get(shardIdentifier); + pendingRemoteDirectoryCleanups.remove(shardIdentifier); + newTask.run(); + } + ongoingRemoteDirectoryCleanups.remove(shardIdentifier); + } else { + staticLogger.debug("one task is already ongoing for shard {}, we can leave entry in pending", shardIdentifier); + } + } else { + staticLogger.debug("one cleanup task for shard {} is already in pending, we can skip this task", shardIdentifier); + } + } +} diff --git a/server/src/test/java/org/opensearch/index/store/RemoteSegmentStoreDirectoryTests.java b/server/src/test/java/org/opensearch/index/store/RemoteSegmentStoreDirectoryTests.java index 9ee6efeb309ae..8b69c15dac9d3 100644 --- a/server/src/test/java/org/opensearch/index/store/RemoteSegmentStoreDirectoryTests.java +++ b/server/src/test/java/org/opensearch/index/store/RemoteSegmentStoreDirectoryTests.java @@ -34,6 +34,8 @@ import org.opensearch.common.util.concurrent.OpenSearchExecutors; import org.opensearch.core.action.ActionListener; import org.opensearch.core.common.bytes.BytesReference; +import org.opensearch.core.index.Index; +import org.opensearch.core.index.shard.ShardId; import org.opensearch.index.engine.NRTReplicationEngineFactory; import org.opensearch.index.remote.RemoteStoreUtils; import org.opensearch.index.shard.IndexShard; @@ -685,6 +687,29 @@ public void onFailure(Exception e) { storeDirectory.close(); } + public void testCleanupAsync() throws Exception { + populateMetadata(); + RemoteSegmentStoreDirectoryFactory remoteSegmentStoreDirectoryFactory = mock(RemoteSegmentStoreDirectoryFactory.class); + RemoteSegmentStoreDirectory remoteSegmentDirectory = new RemoteSegmentStoreDirectory( + remoteDataDirectory, + remoteMetadataDirectory, + mdLockManager, + threadPool, + indexShard.shardId() + ); + when(remoteSegmentStoreDirectoryFactory.newDirectory(any(), any(), any())).thenReturn(remoteSegmentDirectory); + String repositoryName = "test-repository"; + String indexUUID = "test-idx-uuid"; + ShardId shardId = new ShardId(Index.UNKNOWN_INDEX_NAME, indexUUID, Integer.parseInt("0")); + + RemoteSegmentStoreDirectory.remoteDirectoryCleanup(remoteSegmentStoreDirectoryFactory, repositoryName, indexUUID, shardId); + verify(remoteSegmentStoreDirectoryFactory).newDirectory(repositoryName, indexUUID, shardId); + verify(threadPool, times(0)).executor(ThreadPool.Names.REMOTE_PURGE); + verify(remoteMetadataDirectory).delete(); + verify(remoteDataDirectory).delete(); + verify(mdLockManager).delete(); + } + public void testCopyFromException() throws IOException { String filename = "_100.si"; Directory storeDirectory = LuceneTestCase.newDirectory(); diff --git a/server/src/test/java/org/opensearch/repositories/blobstore/BlobStoreRepositoryTests.java b/server/src/test/java/org/opensearch/repositories/blobstore/BlobStoreRepositoryTests.java index 9c65ad32fa6a6..b76e01d6d4c82 100644 --- a/server/src/test/java/org/opensearch/repositories/blobstore/BlobStoreRepositoryTests.java +++ b/server/src/test/java/org/opensearch/repositories/blobstore/BlobStoreRepositoryTests.java @@ -42,6 +42,8 @@ import org.opensearch.common.UUIDs; import org.opensearch.common.settings.Settings; import org.opensearch.core.common.unit.ByteSizeUnit; +import org.opensearch.core.index.Index; +import org.opensearch.core.index.shard.ShardId; import org.opensearch.core.xcontent.NamedXContentRegistry; import org.opensearch.env.Environment; import org.opensearch.indices.recovery.RecoverySettings; @@ -64,6 +66,7 @@ import java.util.Collections; import java.util.List; import java.util.Map; +import java.util.concurrent.atomic.AtomicBoolean; import java.util.function.Function; import java.util.stream.Collectors; @@ -318,4 +321,38 @@ private RepositoryData addRandomSnapshotsToRepoData(RepositoryData repoData, boo return repoData; } + public void testRemoteStoreShardCleanupTask() { + // todo: move it to separate class and add more scenarios. + AtomicBoolean executed1 = new AtomicBoolean(false); + Runnable task1 = () -> executed1.set(true); + String indexName = "test-idx"; + String testIndexUUID = "test-idx-uuid"; + ShardId shardId = new ShardId(new Index(indexName, testIndexUUID), 0); + + // Scenario 1: pending = empty, ongoing = false => executed + RemoteStoreShardCleanupTask remoteStoreShardCleanupTask = new RemoteStoreShardCleanupTask(task1, testIndexUUID, shardId); + remoteStoreShardCleanupTask.run(); + assertTrue(executed1.get()); + + // Scenario 2: pending = empty, ongoing = true => pending = currentTask + executed1.set(false); + String shardIdentifier = String.join("/", testIndexUUID, String.valueOf(shardId.id())); + RemoteStoreShardCleanupTask.ongoingRemoteDirectoryCleanups.add(shardIdentifier); + + remoteStoreShardCleanupTask = new RemoteStoreShardCleanupTask(task1, testIndexUUID, shardId); + remoteStoreShardCleanupTask.run(); + assertFalse(executed1.get()); + assertSame(RemoteStoreShardCleanupTask.pendingRemoteDirectoryCleanups.get(shardIdentifier), task1); + + // Scenario3: pending = anotherTask, ongoing = true => pending = currentTask + AtomicBoolean executed2 = new AtomicBoolean(false); + Runnable task2 = () -> executed2.set(true); + RemoteStoreShardCleanupTask.pendingRemoteDirectoryCleanups.put(shardIdentifier, task1); + RemoteStoreShardCleanupTask.ongoingRemoteDirectoryCleanups.add(shardIdentifier); + + remoteStoreShardCleanupTask = new RemoteStoreShardCleanupTask(task2, testIndexUUID, shardId); + remoteStoreShardCleanupTask.run(); + assertFalse(executed1.get()); + assertSame(RemoteStoreShardCleanupTask.pendingRemoteDirectoryCleanups.get(shardIdentifier), task2); + } }