diff --git a/server/src/main/java/org/opensearch/index/store/RemoteSegmentStoreDirectory.java b/server/src/main/java/org/opensearch/index/store/RemoteSegmentStoreDirectory.java
index f0e7d7cf0d362..1abd5e37000fb 100644
--- a/server/src/main/java/org/opensearch/index/store/RemoteSegmentStoreDirectory.java
+++ b/server/src/main/java/org/opensearch/index/store/RemoteSegmentStoreDirectory.java
@@ -117,6 +117,8 @@ public final class RemoteSegmentStoreDirectory extends FilterDirectory implement
 
     private final AtomicLong metadataUploadCounter = new AtomicLong(0);
 
+    private Boolean useSameThreadPool;
+
     public static final int METADATA_FILES_TO_FETCH = 10;
 
     public RemoteSegmentStoreDirectory(
@@ -133,6 +135,19 @@ public RemoteSegmentStoreDirectory(
         this.threadPool = threadPool;
         this.logger = Loggers.getLogger(getClass(), shardId);
         init();
+        useSameThreadPool = false;
+    }
+
+    RemoteSegmentStoreDirectory(
+        RemoteDirectory remoteDataDirectory,
+        RemoteDirectory remoteMetadataDirectory,
+        RemoteStoreLockManager mdLockManager,
+        ThreadPool threadPool,
+        ShardId shardId,
+        Boolean useSameThreadPool
+    ) throws IOException {
+        this(remoteDataDirectory, remoteMetadataDirectory, mdLockManager, threadPool, shardId);
+        this.useSameThreadPool = useSameThreadPool;
     }
 
     /**
@@ -871,7 +886,7 @@ public void deleteStaleSegmentsAsync(int lastNMetadataFilesToKeep) {
     public void deleteStaleSegmentsAsync(int lastNMetadataFilesToKeep, ActionListener<Void> listener) {
         if (canDeleteStaleCommits.compareAndSet(true, false)) {
             try {
-                threadPool.executor(ThreadPool.Names.REMOTE_PURGE).execute(() -> {
+                threadPool.executor(getExecutorNameForAsyncOperations()).execute(() -> {
                     try {
                         deleteStaleSegments(lastNMetadataFilesToKeep);
                         listener.onResponse(null);
@@ -899,15 +914,8 @@ public static void remoteDirectoryCleanup(
         String indexUUID,
         ShardId shardId
     ) {
-        try (
-            RemoteSegmentStoreDirectory remoteDirectory = (RemoteSegmentStoreDirectory) remoteDirectoryFactory.newDirectory(
-                remoteStoreRepoForIndex,
-                indexUUID,
-                shardId
-            )
-        ) {
-            remoteDirectory.deleteStaleSegments(0); // sync stale segments cleanup
-            remoteDirectory.deleteIfEmpty();
+        try {
+            remoteDirectoryFactory.newDirectory(remoteStoreRepoForIndex, indexUUID, shardId, true).close();
         } catch (Exception e) {
             staticLogger.error("Exception occurred while deleting directory", e);
         }
@@ -938,6 +946,10 @@ private boolean deleteIfEmpty() throws IOException {
         return true;
     }
 
+    private String getExecutorNameForAsyncOperations() {
+        return useSameThreadPool ? ThreadPool.Names.SAME : ThreadPool.Names.REMOTE_PURGE;
+    }
+
     @Override
     public void close() throws IOException {
         deleteStaleSegmentsAsync(0, ActionListener.wrap(r -> deleteIfEmpty(), e -> logger.error("Failed to cleanup remote directory")));
diff --git a/server/src/main/java/org/opensearch/index/store/RemoteSegmentStoreDirectoryFactory.java b/server/src/main/java/org/opensearch/index/store/RemoteSegmentStoreDirectoryFactory.java
index a5e89ec6a8327..f82489004ca56 100644
--- a/server/src/main/java/org/opensearch/index/store/RemoteSegmentStoreDirectoryFactory.java
+++ b/server/src/main/java/org/opensearch/index/store/RemoteSegmentStoreDirectoryFactory.java
@@ -50,6 +50,11 @@ public Directory newDirectory(IndexSettings indexSettings, ShardPath path) throw
     }
 
     public Directory newDirectory(String repositoryName, String indexUUID, ShardId shardId) throws IOException {
+        return newDirectory(repositoryName, indexUUID, shardId, false);
+    }
+
+    // visible for testing
+    Directory newDirectory(String repositoryName, String indexUUID, ShardId shardId, Boolean useSameThreadPool) throws IOException {
         try (Repository repository = repositoriesService.get().repository(repositoryName)) {
             assert repository instanceof BlobStoreRepository : "repository should be instance of BlobStoreRepository";
             BlobStoreRepository blobStoreRepository = ((BlobStoreRepository) repository);
@@ -71,9 +76,10 @@ public Directory newDirectory(String repositoryName, String indexUUID, ShardId s
                 String.valueOf(shardId.id())
             );
 
-            return new RemoteSegmentStoreDirectory(dataDirectory, metadataDirectory, mdLockManager, threadPool, shardId);
+            return new RemoteSegmentStoreDirectory(dataDirectory, metadataDirectory, mdLockManager, threadPool, shardId, useSameThreadPool);
         } catch (RepositoryMissingException e) {
             throw new IllegalArgumentException("Repository should be created before creating index with remote_store enabled setting", e);
         }
     }
+
 }
diff --git a/server/src/main/java/org/opensearch/repositories/blobstore/BlobStoreRepository.java b/server/src/main/java/org/opensearch/repositories/blobstore/BlobStoreRepository.java
index 216fb7db3d67c..4a932d296a82f 100644
--- a/server/src/main/java/org/opensearch/repositories/blobstore/BlobStoreRepository.java
+++ b/server/src/main/java/org/opensearch/repositories/blobstore/BlobStoreRepository.java
@@ -160,9 +160,7 @@
 import java.util.Set;
 import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.Executor;
-import java.util.concurrent.ExecutorService;
 import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
@@ -175,7 +173,6 @@
 import java.util.stream.LongStream;
 import java.util.stream.Stream;
 
-import static org.opensearch.common.util.concurrent.ConcurrentCollections.newConcurrentSet;
 import static org.opensearch.index.snapshots.blobstore.BlobStoreIndexShardSnapshot.FileInfo.canonicalName;
 import static org.opensearch.repositories.blobstore.ChecksumBlobStoreFormat.SNAPSHOT_ONLY_FORMAT_PARAMS;
 
@@ -240,6 +237,8 @@ public abstract class BlobStoreRepository extends AbstractLifecycleComponent imp
         Setting.Property.Deprecated
     );
 
+    private static final Logger staticLogger = LogManager.getLogger(BlobStoreRepository.class);
+
     /**
      * Setting to disable caching of the latest repository data.
      */
@@ -1107,9 +1106,10 @@ public static void remoteDirectoryCleanupAsync(
         ThreadPool threadpool,
         String remoteStoreRepoForIndex,
         String indexUUID,
-        ShardId shardId
+        ShardId shardId,
+        String threadPoolName
     ) {
-        threadpool.executor(ThreadPool.Names.REMOTE_PURGE)
+        threadpool.executor(threadPoolName)
             .execute(
                 new RemoteStoreShardCleanupTask(
                     () -> RemoteSegmentStoreDirectory.remoteDirectoryCleanup(
@@ -1118,58 +1118,12 @@ public static void remoteDirectoryCleanupAsync(
                         indexUUID,
                         shardId
                     ),
-                    threadpool.executor(ThreadPool.Names.REMOTE_PURGE),
                     indexUUID,
                     shardId
                 )
             );
     }
 
-    /**
-     A Runnable wrapper to make sure that for a given shard only one cleanup task runs at a time.
-     */
-    public static class RemoteStoreShardCleanupTask implements Runnable {
-        private final Runnable task;
-        private final ExecutorService executor;
-        private final String shardIdentifier;
-        final static Set<String> ongoingRemoteDirectoryCleanups = newConcurrentSet();
-        final static ConcurrentMap<String, BlockingQueue<Runnable>> shardCleanupPendingTasks = new ConcurrentHashMap<>();
-
-        public RemoteStoreShardCleanupTask(Runnable task, ExecutorService executor, String indexUUID, ShardId shardId) {
-            this.task = task;
-            this.shardIdentifier = indexShardIdentifier(indexUUID, shardId);
-            this.executor = executor;
-        }
-
-        private static String indexShardIdentifier(String indexUUID, ShardId shardId) {
-            return String.join("/", indexUUID, String.valueOf(shardId.id()));
-        }
-
-        @Override
-        public void run() {
-            if (ongoingRemoteDirectoryCleanups.add(shardIdentifier)) {
-                try {
-                    task.run();
-                    BlockingQueue<Runnable> pendingTasks = shardCleanupPendingTasks.get(shardIdentifier);
-                    try {
-                        if (pendingTasks != null) {
-                            for (Runnable pendingTask = pendingTasks.poll(0L, TimeUnit.MILLISECONDS); pendingTask != null;) {
-                                pendingTask.run();
-                            }
-                        }
-                    } catch (InterruptedException e) {
-                        throw new RuntimeException(e);
-                    }
-                } finally {
-                    ongoingRemoteDirectoryCleanups.remove(shardIdentifier);
-                }
-            } else {
-                shardCleanupPendingTasks.putIfAbsent(shardIdentifier, new LinkedBlockingQueue<>());
-                shardCleanupPendingTasks.get(shardIdentifier).add(task);
-            }
-        }
-    }
-
     protected void releaseRemoteStoreLockAndCleanup(
         String shardId,
         String shallowSnapshotUUID,
@@ -1213,7 +1167,8 @@ protected void releaseRemoteStoreLockAndCleanup(
                     threadPool,
                     remoteStoreRepoForIndex,
                     indexUUID,
-                    new ShardId(Index.UNKNOWN_INDEX_NAME, indexUUID, Integer.parseInt(shardId))
+                    new ShardId(Index.UNKNOWN_INDEX_NAME, indexUUID, Integer.parseInt(shardId)),
+                    ThreadPool.Names.REMOTE_PURGE
                 );
             }
         }
diff --git a/server/src/main/java/org/opensearch/repositories/blobstore/RemoteStoreShardCleanupTask.java b/server/src/main/java/org/opensearch/repositories/blobstore/RemoteStoreShardCleanupTask.java
new file mode 100644
index 0000000000000..a0aba25efb7d5
--- /dev/null
+++ b/server/src/main/java/org/opensearch/repositories/blobstore/RemoteStoreShardCleanupTask.java
@@ -0,0 +1,55 @@
+/*
+ * SPDX-License-Identifier: Apache-2.0
+ *
+ * The OpenSearch Contributors require contributions made to
+ * this file be licensed under the Apache-2.0 license or a
+ * compatible open source license.
+ */
+
+package org.opensearch.repositories.blobstore;
+
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+import org.opensearch.core.index.shard.ShardId;
+
+import java.util.Set;
+
+import static org.opensearch.common.util.concurrent.ConcurrentCollections.newConcurrentSet;
+
+/**
+ A Runnable wrapper to make sure that for a given shard only one cleanup task runs at a time.
+ */
+public class RemoteStoreShardCleanupTask implements Runnable {
+    private final Runnable task;
+    private final String shardIdentifier;
+    final static Set<String> ongoingRemoteDirectoryCleanups = newConcurrentSet();
+    final static Set<String> pendingRemoteDirectoryCleanups = newConcurrentSet();
+    private static final Logger staticLogger = LogManager.getLogger(RemoteStoreShardCleanupTask.class);
+
+    public RemoteStoreShardCleanupTask(Runnable task, String indexUUID, ShardId shardId) {
+        this.task = task;
+        this.shardIdentifier = indexShardIdentifier(indexUUID, shardId);
+    }
+
+    private static String indexShardIdentifier(String indexUUID, ShardId shardId) {
+        return String.join("/", indexUUID, String.valueOf(shardId.id()));
+    }
+
+    @Override
+    public void run() {
+        if (pendingRemoteDirectoryCleanups.add(shardIdentifier)) {
+            if (ongoingRemoteDirectoryCleanups.add(shardIdentifier)) {
+                while (pendingRemoteDirectoryCleanups.contains(shardIdentifier)) {
+                    ongoingRemoteDirectoryCleanups.add(shardIdentifier);
+                    pendingRemoteDirectoryCleanups.remove(shardIdentifier);
+                    task.run();
+                    ongoingRemoteDirectoryCleanups.remove(shardIdentifier);
+                }
+            } else {
+                staticLogger.debug("one task is already ongoing, we can leave entry in pending");
+            }
+        } else {
+            staticLogger.debug("one cleanup task for shard is already in pending, we can skip this task");
+        }
+    }
+}
diff --git a/server/src/test/java/org/opensearch/repositories/blobstore/BlobStoreRepositoryTests.java b/server/src/test/java/org/opensearch/repositories/blobstore/BlobStoreRepositoryTests.java
index 9c65ad32fa6a6..ce3e8ce76d141 100644
--- a/server/src/test/java/org/opensearch/repositories/blobstore/BlobStoreRepositoryTests.java
+++ b/server/src/test/java/org/opensearch/repositories/blobstore/BlobStoreRepositoryTests.java
@@ -42,6 +42,8 @@
 import org.opensearch.common.UUIDs;
 import org.opensearch.common.settings.Settings;
 import org.opensearch.core.common.unit.ByteSizeUnit;
+import org.opensearch.core.index.Index;
+import org.opensearch.core.index.shard.ShardId;
 import org.opensearch.core.xcontent.NamedXContentRegistry;
 import org.opensearch.env.Environment;
 import org.opensearch.indices.recovery.RecoverySettings;
@@ -64,6 +66,7 @@
 import java.util.Collections;
 import java.util.List;
 import java.util.Map;
+import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.function.Function;
 import java.util.stream.Collectors;
 
@@ -318,4 +321,36 @@ private RepositoryData addRandomSnapshotsToRepoData(RepositoryData repoData, boo
         return repoData;
     }
 
+    public void testRemoteStoreShardCleanupTask() {
+        // todo: move it to separate class and add more scenarios.
+        AtomicBoolean executed = new AtomicBoolean(false);
+        Runnable task = () -> executed.set(true);
+        String indexName = "test-idx";
+        String testIndexUUID = "test-idx-uuid";
+        ShardId shardId = new ShardId(new Index(indexName, testIndexUUID), 0);
+
+        // Scenario 1: pending = false, ongoing = false => executed
+        RemoteStoreShardCleanupTask remoteStoreShardCleanupTask = new RemoteStoreShardCleanupTask(task, testIndexUUID, shardId);
+        remoteStoreShardCleanupTask.run();
+        assertTrue(executed.get());
+
+        // Scenario 2: pending = false, ongoing = true => pending = true
+        executed.set(false);
+        String shardIdentifier = String.join("/", testIndexUUID, String.valueOf(shardId.id()));
+        RemoteStoreShardCleanupTask.ongoingRemoteDirectoryCleanups.add(shardIdentifier);
+
+        remoteStoreShardCleanupTask = new RemoteStoreShardCleanupTask(task, testIndexUUID, shardId);
+        remoteStoreShardCleanupTask.run();
+        assertFalse(executed.get());
+        assertTrue(RemoteStoreShardCleanupTask.pendingRemoteDirectoryCleanups.contains(shardIdentifier));
+
+        // Scenario 3: pending = true, ongoing = true => do nothing
+        RemoteStoreShardCleanupTask.pendingRemoteDirectoryCleanups.add(shardIdentifier);
+        RemoteStoreShardCleanupTask.ongoingRemoteDirectoryCleanups.add(shardIdentifier);
+
+        remoteStoreShardCleanupTask = new RemoteStoreShardCleanupTask(task, testIndexUUID, shardId);
+        remoteStoreShardCleanupTask.run();
+        assertFalse(executed.get());
+    }
+
 }