From 488c81cf97e20bf12d94ba11ebab37a26d223d65 Mon Sep 17 00:00:00 2001 From: Harish Bhakuni Date: Wed, 13 Mar 2024 14:00:03 -0700 Subject: [PATCH] Address PR Comments. Signed-off-by: Harish Bhakuni --- .../store/RemoteSegmentStoreDirectory.java | 23 +++---- .../RemoteSegmentStoreDirectoryFactory.java | 8 ++- .../blobstore/BlobStoreRepository.java | 59 +++--------------- .../RemoteStoreShardCleanupTask.java | 61 +++++++++++++++++++ .../RemoteStoreRefreshListenerTests.java | 3 +- .../RemoteSegmentStoreDirectoryTests.java | 6 +- .../blobstore/BlobStoreRepositoryTests.java | 37 +++++++++++ .../index/shard/IndexShardTestCase.java | 2 +- 8 files changed, 131 insertions(+), 68 deletions(-) create mode 100644 server/src/main/java/org/opensearch/repositories/blobstore/RemoteStoreShardCleanupTask.java diff --git a/server/src/main/java/org/opensearch/index/store/RemoteSegmentStoreDirectory.java b/server/src/main/java/org/opensearch/index/store/RemoteSegmentStoreDirectory.java index f0e7d7cf0d362..6ac698a9a7682 100644 --- a/server/src/main/java/org/opensearch/index/store/RemoteSegmentStoreDirectory.java +++ b/server/src/main/java/org/opensearch/index/store/RemoteSegmentStoreDirectory.java @@ -117,6 +117,8 @@ public final class RemoteSegmentStoreDirectory extends FilterDirectory implement private final AtomicLong metadataUploadCounter = new AtomicLong(0); + private final Boolean useSameThreadPool; + public static final int METADATA_FILES_TO_FETCH = 10; public RemoteSegmentStoreDirectory( @@ -124,7 +126,8 @@ public RemoteSegmentStoreDirectory( RemoteDirectory remoteMetadataDirectory, RemoteStoreLockManager mdLockManager, ThreadPool threadPool, - ShardId shardId + ShardId shardId, + Boolean useSameThreadPool ) throws IOException { super(remoteDataDirectory); this.remoteDataDirectory = remoteDataDirectory; @@ -133,6 +136,7 @@ public RemoteSegmentStoreDirectory( this.threadPool = threadPool; this.logger = Loggers.getLogger(getClass(), shardId); init(); + this.useSameThreadPool = useSameThreadPool; } /** @@ -871,7 +875,7 @@ public void deleteStaleSegmentsAsync(int lastNMetadataFilesToKeep) { public void deleteStaleSegmentsAsync(int lastNMetadataFilesToKeep, ActionListener listener) { if (canDeleteStaleCommits.compareAndSet(true, false)) { try { - threadPool.executor(ThreadPool.Names.REMOTE_PURGE).execute(() -> { + threadPool.executor(getExecutorNameForAsyncOperations()).execute(() -> { try { deleteStaleSegments(lastNMetadataFilesToKeep); listener.onResponse(null); @@ -899,15 +903,8 @@ public static void remoteDirectoryCleanup( String indexUUID, ShardId shardId ) { - try ( - RemoteSegmentStoreDirectory remoteDirectory = (RemoteSegmentStoreDirectory) remoteDirectoryFactory.newDirectory( - remoteStoreRepoForIndex, - indexUUID, - shardId - ) - ) { - remoteDirectory.deleteStaleSegments(0); // sync stale segments cleanup - remoteDirectory.deleteIfEmpty(); + try { + remoteDirectoryFactory.newDirectory(remoteStoreRepoForIndex, indexUUID, shardId, true).close(); } catch (Exception e) { staticLogger.error("Exception occurred while deleting directory", e); } @@ -938,6 +935,10 @@ private boolean deleteIfEmpty() throws IOException { return true; } + private String getExecutorNameForAsyncOperations() { + return useSameThreadPool ? ThreadPool.Names.SAME : ThreadPool.Names.REMOTE_PURGE; + } + @Override public void close() throws IOException { deleteStaleSegmentsAsync(0, ActionListener.wrap(r -> deleteIfEmpty(), e -> logger.error("Failed to cleanup remote directory"))); diff --git a/server/src/main/java/org/opensearch/index/store/RemoteSegmentStoreDirectoryFactory.java b/server/src/main/java/org/opensearch/index/store/RemoteSegmentStoreDirectoryFactory.java index a5e89ec6a8327..f82489004ca56 100644 --- a/server/src/main/java/org/opensearch/index/store/RemoteSegmentStoreDirectoryFactory.java +++ b/server/src/main/java/org/opensearch/index/store/RemoteSegmentStoreDirectoryFactory.java @@ -50,6 +50,11 @@ public Directory newDirectory(IndexSettings indexSettings, ShardPath path) throw } public Directory newDirectory(String repositoryName, String indexUUID, ShardId shardId) throws IOException { + return newDirectory(repositoryName, indexUUID, shardId, false); + } + + // visible for testing + Directory newDirectory(String repositoryName, String indexUUID, ShardId shardId, Boolean useSameThreadPool) throws IOException { try (Repository repository = repositoriesService.get().repository(repositoryName)) { assert repository instanceof BlobStoreRepository : "repository should be instance of BlobStoreRepository"; BlobStoreRepository blobStoreRepository = ((BlobStoreRepository) repository); @@ -71,9 +76,10 @@ public Directory newDirectory(String repositoryName, String indexUUID, ShardId s String.valueOf(shardId.id()) ); - return new RemoteSegmentStoreDirectory(dataDirectory, metadataDirectory, mdLockManager, threadPool, shardId); + return new RemoteSegmentStoreDirectory(dataDirectory, metadataDirectory, mdLockManager, threadPool, shardId, useSameThreadPool); } catch (RepositoryMissingException e) { throw new IllegalArgumentException("Repository should be created before creating index with remote_store enabled setting", e); } } + } diff --git a/server/src/main/java/org/opensearch/repositories/blobstore/BlobStoreRepository.java b/server/src/main/java/org/opensearch/repositories/blobstore/BlobStoreRepository.java index 216fb7db3d67c..4a932d296a82f 100644 --- a/server/src/main/java/org/opensearch/repositories/blobstore/BlobStoreRepository.java +++ b/server/src/main/java/org/opensearch/repositories/blobstore/BlobStoreRepository.java @@ -160,9 +160,7 @@ import java.util.Set; import java.util.concurrent.BlockingQueue; import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.ConcurrentMap; import java.util.concurrent.Executor; -import java.util.concurrent.ExecutorService; import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; @@ -175,7 +173,6 @@ import java.util.stream.LongStream; import java.util.stream.Stream; -import static org.opensearch.common.util.concurrent.ConcurrentCollections.newConcurrentSet; import static org.opensearch.index.snapshots.blobstore.BlobStoreIndexShardSnapshot.FileInfo.canonicalName; import static org.opensearch.repositories.blobstore.ChecksumBlobStoreFormat.SNAPSHOT_ONLY_FORMAT_PARAMS; @@ -240,6 +237,8 @@ public abstract class BlobStoreRepository extends AbstractLifecycleComponent imp Setting.Property.Deprecated ); + private static final Logger staticLogger = LogManager.getLogger(BlobStoreRepository.class); + /** * Setting to disable caching of the latest repository data. */ @@ -1107,9 +1106,10 @@ public static void remoteDirectoryCleanupAsync( ThreadPool threadpool, String remoteStoreRepoForIndex, String indexUUID, - ShardId shardId + ShardId shardId, + String threadPoolName ) { - threadpool.executor(ThreadPool.Names.REMOTE_PURGE) + threadpool.executor(threadPoolName) .execute( new RemoteStoreShardCleanupTask( () -> RemoteSegmentStoreDirectory.remoteDirectoryCleanup( @@ -1118,58 +1118,12 @@ public static void remoteDirectoryCleanupAsync( indexUUID, shardId ), - threadpool.executor(ThreadPool.Names.REMOTE_PURGE), indexUUID, shardId ) ); } - /** - A Runnable wrapper to make sure that for a given shard only one cleanup task runs at a time. - */ - public static class RemoteStoreShardCleanupTask implements Runnable { - private final Runnable task; - private final ExecutorService executor; - private final String shardIdentifier; - final static Set ongoingRemoteDirectoryCleanups = newConcurrentSet(); - final static ConcurrentMap> shardCleanupPendingTasks = new ConcurrentHashMap<>(); - - public RemoteStoreShardCleanupTask(Runnable task, ExecutorService executor, String indexUUID, ShardId shardId) { - this.task = task; - this.shardIdentifier = indexShardIdentifier(indexUUID, shardId); - this.executor = executor; - } - - private static String indexShardIdentifier(String indexUUID, ShardId shardId) { - return String.join("/", indexUUID, String.valueOf(shardId.id())); - } - - @Override - public void run() { - if (ongoingRemoteDirectoryCleanups.add(shardIdentifier)) { - try { - task.run(); - BlockingQueue pendingTasks = shardCleanupPendingTasks.get(shardIdentifier); - try { - if (pendingTasks != null) { - for (Runnable pendingTask = pendingTasks.poll(0L, TimeUnit.MILLISECONDS); pendingTask != null;) { - pendingTask.run(); - } - } - } catch (InterruptedException e) { - throw new RuntimeException(e); - } - } finally { - ongoingRemoteDirectoryCleanups.remove(shardIdentifier); - } - } else { - shardCleanupPendingTasks.putIfAbsent(shardIdentifier, new LinkedBlockingQueue<>()); - shardCleanupPendingTasks.get(shardIdentifier).add(task); - } - } - } - protected void releaseRemoteStoreLockAndCleanup( String shardId, String shallowSnapshotUUID, @@ -1213,7 +1167,8 @@ protected void releaseRemoteStoreLockAndCleanup( threadPool, remoteStoreRepoForIndex, indexUUID, - new ShardId(Index.UNKNOWN_INDEX_NAME, indexUUID, Integer.parseInt(shardId)) + new ShardId(Index.UNKNOWN_INDEX_NAME, indexUUID, Integer.parseInt(shardId)), + ThreadPool.Names.REMOTE_PURGE ); } } diff --git a/server/src/main/java/org/opensearch/repositories/blobstore/RemoteStoreShardCleanupTask.java b/server/src/main/java/org/opensearch/repositories/blobstore/RemoteStoreShardCleanupTask.java new file mode 100644 index 0000000000000..02219d68dd40b --- /dev/null +++ b/server/src/main/java/org/opensearch/repositories/blobstore/RemoteStoreShardCleanupTask.java @@ -0,0 +1,61 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.repositories.blobstore; + +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; +import org.opensearch.core.index.shard.ShardId; + +import java.util.Map; +import java.util.Set; + +import static org.opensearch.common.util.concurrent.ConcurrentCollections.newConcurrentMap; +import static org.opensearch.common.util.concurrent.ConcurrentCollections.newConcurrentSet; + +/** + A Runnable wrapper to make sure that for a given shard only one cleanup task runs at a time. + */ +public class RemoteStoreShardCleanupTask implements Runnable { + private final Runnable task; + private final String shardIdentifier; + final static Set ongoingRemoteDirectoryCleanups = newConcurrentSet(); + final static Map pendingRemoteDirectoryCleanups = newConcurrentMap(); + private static final Logger staticLogger = LogManager.getLogger(RemoteStoreShardCleanupTask.class); + + public RemoteStoreShardCleanupTask(Runnable task, String indexUUID, ShardId shardId) { + this.task = task; + this.shardIdentifier = indexShardIdentifier(indexUUID, shardId); + } + + private static String indexShardIdentifier(String indexUUID, ShardId shardId) { + return String.join("/", indexUUID, String.valueOf(shardId.id())); + } + + @Override + public void run() { + // TODO: there is a known race condition scenario in this method which needs to be handled where one of the + // thread just came out of while loop adding a new task in the pending task map, and another thread added + // new pending task in the map. we need to introduce semaphores/locks to avoid that situation which + // introduces the overhead of lock cleanup. + if (pendingRemoteDirectoryCleanups.put(shardIdentifier, task) == null) { + if (ongoingRemoteDirectoryCleanups.add(shardIdentifier)) { + while (pendingRemoteDirectoryCleanups.containsKey(shardIdentifier)) { + Runnable newTask = pendingRemoteDirectoryCleanups.get(shardIdentifier); + pendingRemoteDirectoryCleanups.remove(shardIdentifier); + newTask.run(); + } + ongoingRemoteDirectoryCleanups.remove(shardIdentifier); + } else { + staticLogger.debug("one task is already ongoing for shard {}, we can leave entry in pending", shardIdentifier); + } + } else { + staticLogger.debug("one cleanup task for shard {} is already in pending, we can skip this task", shardIdentifier); + } + } +} diff --git a/server/src/test/java/org/opensearch/index/shard/RemoteStoreRefreshListenerTests.java b/server/src/test/java/org/opensearch/index/shard/RemoteStoreRefreshListenerTests.java index 85878cc2e1c9d..cacab92875fbb 100644 --- a/server/src/test/java/org/opensearch/index/shard/RemoteStoreRefreshListenerTests.java +++ b/server/src/test/java/org/opensearch/index/shard/RemoteStoreRefreshListenerTests.java @@ -167,7 +167,8 @@ public void testRemoteDirectoryInitThrowsException() throws IOException { remoteMetadataDirectory, mock(RemoteStoreLockManager.class), mock(ThreadPool.class), - shardId + shardId, + false ); FilterDirectory remoteStoreFilterDirectory = new RemoteStoreRefreshListenerTests.TestFilterDirectory( new RemoteStoreRefreshListenerTests.TestFilterDirectory(remoteSegmentStoreDirectory) diff --git a/server/src/test/java/org/opensearch/index/store/RemoteSegmentStoreDirectoryTests.java b/server/src/test/java/org/opensearch/index/store/RemoteSegmentStoreDirectoryTests.java index 9ee6efeb309ae..fe649574debcf 100644 --- a/server/src/test/java/org/opensearch/index/store/RemoteSegmentStoreDirectoryTests.java +++ b/server/src/test/java/org/opensearch/index/store/RemoteSegmentStoreDirectoryTests.java @@ -156,7 +156,8 @@ public void setup() throws IOException { remoteMetadataDirectory, mdLockManager, threadPool, - indexShard.shardId() + indexShard.shardId(), + false ); try (Store store = indexShard.store()) { segmentInfos = store.readLastCommittedSegmentsInfo(); @@ -649,7 +650,8 @@ public void testCopyFilesFromMultipartIOException() throws Exception { remoteMetadataDirectory, mdLockManager, threadPool, - indexShard.shardId() + indexShard.shardId(), + false ); populateMetadata(); diff --git a/server/src/test/java/org/opensearch/repositories/blobstore/BlobStoreRepositoryTests.java b/server/src/test/java/org/opensearch/repositories/blobstore/BlobStoreRepositoryTests.java index 9c65ad32fa6a6..b76e01d6d4c82 100644 --- a/server/src/test/java/org/opensearch/repositories/blobstore/BlobStoreRepositoryTests.java +++ b/server/src/test/java/org/opensearch/repositories/blobstore/BlobStoreRepositoryTests.java @@ -42,6 +42,8 @@ import org.opensearch.common.UUIDs; import org.opensearch.common.settings.Settings; import org.opensearch.core.common.unit.ByteSizeUnit; +import org.opensearch.core.index.Index; +import org.opensearch.core.index.shard.ShardId; import org.opensearch.core.xcontent.NamedXContentRegistry; import org.opensearch.env.Environment; import org.opensearch.indices.recovery.RecoverySettings; @@ -64,6 +66,7 @@ import java.util.Collections; import java.util.List; import java.util.Map; +import java.util.concurrent.atomic.AtomicBoolean; import java.util.function.Function; import java.util.stream.Collectors; @@ -318,4 +321,38 @@ private RepositoryData addRandomSnapshotsToRepoData(RepositoryData repoData, boo return repoData; } + public void testRemoteStoreShardCleanupTask() { + // todo: move it to separate class and add more scenarios. + AtomicBoolean executed1 = new AtomicBoolean(false); + Runnable task1 = () -> executed1.set(true); + String indexName = "test-idx"; + String testIndexUUID = "test-idx-uuid"; + ShardId shardId = new ShardId(new Index(indexName, testIndexUUID), 0); + + // Scenario 1: pending = empty, ongoing = false => executed + RemoteStoreShardCleanupTask remoteStoreShardCleanupTask = new RemoteStoreShardCleanupTask(task1, testIndexUUID, shardId); + remoteStoreShardCleanupTask.run(); + assertTrue(executed1.get()); + + // Scenario 2: pending = empty, ongoing = true => pending = currentTask + executed1.set(false); + String shardIdentifier = String.join("/", testIndexUUID, String.valueOf(shardId.id())); + RemoteStoreShardCleanupTask.ongoingRemoteDirectoryCleanups.add(shardIdentifier); + + remoteStoreShardCleanupTask = new RemoteStoreShardCleanupTask(task1, testIndexUUID, shardId); + remoteStoreShardCleanupTask.run(); + assertFalse(executed1.get()); + assertSame(RemoteStoreShardCleanupTask.pendingRemoteDirectoryCleanups.get(shardIdentifier), task1); + + // Scenario3: pending = anotherTask, ongoing = true => pending = currentTask + AtomicBoolean executed2 = new AtomicBoolean(false); + Runnable task2 = () -> executed2.set(true); + RemoteStoreShardCleanupTask.pendingRemoteDirectoryCleanups.put(shardIdentifier, task1); + RemoteStoreShardCleanupTask.ongoingRemoteDirectoryCleanups.add(shardIdentifier); + + remoteStoreShardCleanupTask = new RemoteStoreShardCleanupTask(task2, testIndexUUID, shardId); + remoteStoreShardCleanupTask.run(); + assertFalse(executed1.get()); + assertSame(RemoteStoreShardCleanupTask.pendingRemoteDirectoryCleanups.get(shardIdentifier), task2); + } } diff --git a/test/framework/src/main/java/org/opensearch/index/shard/IndexShardTestCase.java b/test/framework/src/main/java/org/opensearch/index/shard/IndexShardTestCase.java index bf1c4d4c94e04..014770ab59163 100644 --- a/test/framework/src/main/java/org/opensearch/index/shard/IndexShardTestCase.java +++ b/test/framework/src/main/java/org/opensearch/index/shard/IndexShardTestCase.java @@ -790,7 +790,7 @@ protected RemoteSegmentStoreDirectory createRemoteSegmentStoreDirectory(ShardId RemoteStoreLockManager remoteStoreLockManager = new RemoteStoreMetadataLockManager( new RemoteBufferedOutputDirectory(getBlobContainer(remoteShardPath.resolveIndex().resolve("lock_files"))) ); - return new RemoteSegmentStoreDirectory(dataDirectory, metadataDirectory, remoteStoreLockManager, threadPool, shardId); + return new RemoteSegmentStoreDirectory(dataDirectory, metadataDirectory, remoteStoreLockManager, threadPool, shardId, false); } private RemoteDirectory newRemoteDirectory(Path f) throws IOException {