From 8059d763d83d37f9c1383749bf272457f5b5fc25 Mon Sep 17 00:00:00 2001 From: Harish Bhakuni Date: Thu, 14 Mar 2024 03:23:29 -0700 Subject: [PATCH] Optimize remote store operations during snapshot Deletion (#12319) Signed-off-by: Harish Bhakuni --- .../store/RemoteSegmentStoreDirectory.java | 71 ++++++- .../RemoteSegmentStoreDirectoryFactory.java | 1 + .../blobstore/BlobStoreRepository.java | 177 ++++++++++-------- .../RemoteStoreShardCleanupTask.java | 63 +++++++ .../RemoteSegmentStoreDirectoryTests.java | 96 +++++++++- .../blobstore/BlobStoreRepositoryTests.java | 37 ++++ 6 files changed, 366 insertions(+), 79 deletions(-) create mode 100644 server/src/main/java/org/opensearch/repositories/blobstore/RemoteStoreShardCleanupTask.java diff --git a/server/src/main/java/org/opensearch/index/store/RemoteSegmentStoreDirectory.java b/server/src/main/java/org/opensearch/index/store/RemoteSegmentStoreDirectory.java index bfab9f8c18aa2..c9a238c6e3350 100644 --- a/server/src/main/java/org/opensearch/index/store/RemoteSegmentStoreDirectory.java +++ b/server/src/main/java/org/opensearch/index/store/RemoteSegmentStoreDirectory.java @@ -718,10 +718,49 @@ public Map getSegmentsUploadedToRemoteStore() { return Collections.unmodifiableMap(this.segmentsUploadedToRemoteStore); } + // Visible for testing + Set getMetadataFilesToFilterActiveSegments( + final int lastNMetadataFilesToKeep, + final List sortedMetadataFiles, + final Set lockedMetadataFiles + ) { + // the idea here is for each deletable md file, we can consider the segments present in non-deletable md file + // before this and non-deletable md file after this to compute the active segment files. + // For ex: + // lastNMetadataFilesToKeep = 3 + // sortedMetadataFiles = [m1, m2, m3, m4, m5, m6(locked), m7(locked), m8(locked), m9(locked), m10] + // lockedMetadataFiles = m6, m7, m8, m9 + // then the returned set will be (m3, m6, m9) + final Set metadataFilesToFilterActiveSegments = new HashSet<>(); + for (int idx = lastNMetadataFilesToKeep; idx < sortedMetadataFiles.size(); idx++) { + if (lockedMetadataFiles.contains(sortedMetadataFiles.get(idx)) == false) { + String prevMetadata = (idx - 1) >= 0 ? sortedMetadataFiles.get(idx - 1) : null; + String nextMetadata = (idx + 1) < sortedMetadataFiles.size() ? sortedMetadataFiles.get(idx + 1) : null; + + if (prevMetadata != null && (lockedMetadataFiles.contains(prevMetadata) || idx == lastNMetadataFilesToKeep)) { + // if previous metadata of deletable md is locked, add it to md files for active segments. + metadataFilesToFilterActiveSegments.add(prevMetadata); + } + if (nextMetadata != null && lockedMetadataFiles.contains(nextMetadata)) { + // if next metadata of deletable md is locked, add it to md files for active segments. + metadataFilesToFilterActiveSegments.add(nextMetadata); + } + } + } + return metadataFilesToFilterActiveSegments; + } + /** * Delete stale segment and metadata files * One metadata file is kept per commit (refresh updates the same file). To read segments uploaded to remote store, - * we just need to read the latest metadata file. All the stale metadata files can be safely deleted. + * we just need to read the latest metadata file. + * Assumptions: + * (1) if a segment file is not present in a md file, it will never be present in any md file created after that, and + * (2) if (md1, md2, md3) are in sorted order, it is not possible that a segment file will be in md1 and md3 but not in md2. + *

+ * for each deletable md file, segments present in non-deletable md file before this and non-deletable md file + * after this are sufficient to compute the list of active or non-deletable segment files referenced by a deletable + * md file * * @param lastNMetadataFilesToKeep number of metadata files to keep * @throws IOException in case of I/O error while reading from / writing to remote segment store @@ -760,7 +799,6 @@ public void deleteStaleSegments(int lastNMetadataFilesToKeep) throws IOException .filter(metadataFile -> allLockFiles.contains(metadataFile) == false) .collect(Collectors.toList()); - sortedMetadataFileList.removeAll(metadataFilesToBeDeleted); logger.debug( "metadataFilesEligibleToDelete={} metadataFilesToBeDeleted={}", metadataFilesEligibleToDelete, @@ -769,7 +807,14 @@ public void deleteStaleSegments(int lastNMetadataFilesToKeep) throws IOException Map activeSegmentFilesMetadataMap = new HashMap<>(); Set activeSegmentRemoteFilenames = new HashSet<>(); - for (String metadataFile : sortedMetadataFileList) { + + final Set metadataFilesToFilterActiveSegments = getMetadataFilesToFilterActiveSegments( + lastNMetadataFilesToKeep, + sortedMetadataFileList, + allLockFiles + ); + + for (String metadataFile : metadataFilesToFilterActiveSegments) { Map segmentMetadataMap = readMetadataFile(metadataFile).getMetadata(); activeSegmentFilesMetadataMap.putAll(segmentMetadataMap); activeSegmentRemoteFilenames.addAll( @@ -848,6 +893,25 @@ public void deleteStaleSegmentsAsync(int lastNMetadataFilesToKeep, ActionListene } } + public static void remoteDirectoryCleanup( + RemoteSegmentStoreDirectoryFactory remoteDirectoryFactory, + String remoteStoreRepoForIndex, + String indexUUID, + ShardId shardId + ) { + try { + RemoteSegmentStoreDirectory remoteSegmentStoreDirectory = (RemoteSegmentStoreDirectory) remoteDirectoryFactory.newDirectory( + remoteStoreRepoForIndex, + indexUUID, + shardId + ); + remoteSegmentStoreDirectory.deleteStaleSegments(0); + remoteSegmentStoreDirectory.deleteIfEmpty(); + } catch (Exception e) { + staticLogger.error("Exception occurred while deleting directory", e); + } + } + /* Tries to delete shard level directory if it is empty Return true if it deleted it successfully @@ -870,7 +934,6 @@ private boolean deleteIfEmpty() throws IOException { logger.error("Exception occurred while deleting directory", e); return false; } - return true; } diff --git a/server/src/main/java/org/opensearch/index/store/RemoteSegmentStoreDirectoryFactory.java b/server/src/main/java/org/opensearch/index/store/RemoteSegmentStoreDirectoryFactory.java index a5e89ec6a8327..eca8d9ec702e1 100644 --- a/server/src/main/java/org/opensearch/index/store/RemoteSegmentStoreDirectoryFactory.java +++ b/server/src/main/java/org/opensearch/index/store/RemoteSegmentStoreDirectoryFactory.java @@ -76,4 +76,5 @@ public Directory newDirectory(String repositoryName, String indexUUID, ShardId s throw new IllegalArgumentException("Repository should be created before creating index with remote_store enabled setting", e); } } + } diff --git a/server/src/main/java/org/opensearch/repositories/blobstore/BlobStoreRepository.java b/server/src/main/java/org/opensearch/repositories/blobstore/BlobStoreRepository.java index 18f4ab70024f4..4a932d296a82f 100644 --- a/server/src/main/java/org/opensearch/repositories/blobstore/BlobStoreRepository.java +++ b/server/src/main/java/org/opensearch/repositories/blobstore/BlobStoreRepository.java @@ -117,6 +117,7 @@ import 
org.opensearch.index.snapshots.blobstore.RemoteStoreShardShallowCopySnapshot; import org.opensearch.index.snapshots.blobstore.SlicedInputStream; import org.opensearch.index.snapshots.blobstore.SnapshotFiles; +import org.opensearch.index.store.RemoteSegmentStoreDirectory; import org.opensearch.index.store.RemoteSegmentStoreDirectoryFactory; import org.opensearch.index.store.Store; import org.opensearch.index.store.StoreFileMetadata; @@ -236,6 +237,8 @@ public abstract class BlobStoreRepository extends AbstractLifecycleComponent imp Setting.Property.Deprecated ); + private static final Logger staticLogger = LogManager.getLogger(BlobStoreRepository.class); + /** * Setting to disable caching of the latest repository data. */ @@ -1098,6 +1101,78 @@ private void asyncCleanupUnlinkedShardLevelBlobs( } } + public static void remoteDirectoryCleanupAsync( + RemoteSegmentStoreDirectoryFactory remoteDirectoryFactory, + ThreadPool threadpool, + String remoteStoreRepoForIndex, + String indexUUID, + ShardId shardId, + String threadPoolName + ) { + threadpool.executor(threadPoolName) + .execute( + new RemoteStoreShardCleanupTask( + () -> RemoteSegmentStoreDirectory.remoteDirectoryCleanup( + remoteDirectoryFactory, + remoteStoreRepoForIndex, + indexUUID, + shardId + ), + indexUUID, + shardId + ) + ); + } + + protected void releaseRemoteStoreLockAndCleanup( + String shardId, + String shallowSnapshotUUID, + BlobContainer shardContainer, + RemoteStoreLockManagerFactory remoteStoreLockManagerFactory + ) throws IOException { + if (remoteStoreLockManagerFactory == null) { + return; + } + + RemoteStoreShardShallowCopySnapshot remoteStoreShardShallowCopySnapshot = REMOTE_STORE_SHARD_SHALLOW_COPY_SNAPSHOT_FORMAT.read( + shardContainer, + shallowSnapshotUUID, + namedXContentRegistry + ); + String indexUUID = remoteStoreShardShallowCopySnapshot.getIndexUUID(); + String remoteStoreRepoForIndex = remoteStoreShardShallowCopySnapshot.getRemoteStoreRepository(); + // Releasing lock file before deleting the shallow-snap-UUID file because in case of any failure while + // releasing the lock file, we would still have the shallow-snap-UUID file and that would be used during + // next delete operation for releasing this lock file + RemoteStoreLockManager remoteStoreMetadataLockManager = remoteStoreLockManagerFactory.newLockManager( + remoteStoreRepoForIndex, + indexUUID, + shardId + ); + remoteStoreMetadataLockManager.release(FileLockInfo.getLockInfoBuilder().withAcquirerId(shallowSnapshotUUID).build()); + logger.debug("Successfully released lock for shard {} of index with uuid {}", shardId, indexUUID); + if (!isIndexPresent(clusterService, indexUUID)) { + // Note: this is a temporary solution where snapshot deletion triggers remote store side cleanup if + // index is already deleted. shard cleanup will still happen asynchronously using REMOTE_PURGE + // threadpool. if it fails, it could leave some stale files in remote directory. this issue could + // even happen in cases of shard level remote store data cleanup which also happens asynchronously. + // in long term, we have plans to implement remote store GC poller mechanism which will take care of + // such stale data. 
related issue: https://github.com/opensearch-project/OpenSearch/issues/8469 + RemoteSegmentStoreDirectoryFactory remoteDirectoryFactory = new RemoteSegmentStoreDirectoryFactory( + remoteStoreLockManagerFactory.getRepositoriesService(), + threadPool + ); + remoteDirectoryCleanupAsync( + remoteDirectoryFactory, + threadPool, + remoteStoreRepoForIndex, + indexUUID, + new ShardId(Index.UNKNOWN_INDEX_NAME, indexUUID, Integer.parseInt(shardId)), + ThreadPool.Names.REMOTE_PURGE + ); + } + } + // When remoteStoreLockManagerFactory is non-null, while deleting the files, lock files are also released before deletion of respective // shallow-snap-UUID files. And if it is null, we just delete the stale shard blobs. private void executeStaleShardDelete( @@ -1109,53 +1184,34 @@ private void executeStaleShardDelete( if (filesToDelete != null) { threadPool.executor(ThreadPool.Names.SNAPSHOT).execute(ActionRunnable.wrap(listener, l -> { try { - if (remoteStoreLockManagerFactory != null) { - for (String fileToDelete : filesToDelete) { - if (fileToDelete.contains(SHALLOW_SNAPSHOT_PREFIX)) { - String[] fileToDeletePath = fileToDelete.split("/"); - String indexId = fileToDeletePath[1]; - String shardId = fileToDeletePath[2]; - String shallowSnapBlob = fileToDeletePath[3]; - String snapshotUUID = extractShallowSnapshotUUID(shallowSnapBlob).orElseThrow(); - BlobContainer shardContainer = blobStore().blobContainer(indicesPath().add(indexId).add(shardId)); - RemoteStoreShardShallowCopySnapshot remoteStoreShardShallowCopySnapshot = - REMOTE_STORE_SHARD_SHALLOW_COPY_SNAPSHOT_FORMAT.read( - shardContainer, - snapshotUUID, - namedXContentRegistry - ); - String indexUUID = remoteStoreShardShallowCopySnapshot.getIndexUUID(); - String remoteStoreRepoForIndex = remoteStoreShardShallowCopySnapshot.getRemoteStoreRepository(); - // Releasing lock file before deleting the shallow-snap-UUID file because in case of any failure while - // releasing the lock file, we would still have the shallow-snap-UUID file and that would be used during - // next delete operation for releasing this lock file - RemoteStoreLockManager remoteStoreMetadataLockManager = remoteStoreLockManagerFactory.newLockManager( - remoteStoreRepoForIndex, - indexUUID, - shardId - ); - remoteStoreMetadataLockManager.release( - FileLockInfo.getLockInfoBuilder().withAcquirerId(snapshotUUID).build() + // filtering files for which remote store lock release and cleanup succeeded, + // remaining files for which it failed will be retried in next snapshot delete run. 
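+                    // each entry in filesToDelete is a slash-separated shard blob path (indexId and shardId
+                    // are parsed from it below); for shallow-snap-<uuid> blobs the remote store lock is
+                    // released first, and remote segment cleanup is triggered if the index no longer exists,
+                    // before the blob is considered eligible for deletion.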
+ List eligibleFilesToDelete = new ArrayList<>(); + for (String fileToDelete : filesToDelete) { + if (fileToDelete.contains(SHALLOW_SNAPSHOT_PREFIX)) { + String[] fileToDeletePath = fileToDelete.split("/"); + String indexId = fileToDeletePath[1]; + String shardId = fileToDeletePath[2]; + String shallowSnapBlob = fileToDeletePath[3]; + String snapshotUUID = extractShallowSnapshotUUID(shallowSnapBlob).orElseThrow(); + BlobContainer shardContainer = blobStore().blobContainer(indicesPath().add(indexId).add(shardId)); + try { + releaseRemoteStoreLockAndCleanup(shardId, snapshotUUID, shardContainer, remoteStoreLockManagerFactory); + eligibleFilesToDelete.add(fileToDelete); + } catch (Exception e) { + logger.error( + "Failed to release lock or cleanup shard for indexID {}, shardID {} " + "and snapshot {}", + indexId, + shardId, + snapshotUUID ); - if (!isIndexPresent(clusterService, indexUUID)) { - // this is a temporary solution where snapshot deletion triggers remote store side - // cleanup if index is already deleted. We will add a poller in future to take - // care of remote store side cleanup. - // see https://github.com/opensearch-project/OpenSearch/issues/8469 - new RemoteSegmentStoreDirectoryFactory( - remoteStoreLockManagerFactory.getRepositoriesService(), - threadPool - ).newDirectory( - remoteStoreRepoForIndex, - indexUUID, - new ShardId(Index.UNKNOWN_INDEX_NAME, indexUUID, Integer.valueOf(shardId)) - ).close(); - } } + } else { + eligibleFilesToDelete.add(fileToDelete); } } // Deleting the shard blobs - deleteFromContainer(blobContainer(), filesToDelete); + deleteFromContainer(blobContainer(), eligibleFilesToDelete); l.onResponse(null); } catch (Exception e) { logger.warn( @@ -1588,39 +1644,12 @@ private void executeOneStaleIndexDelete( for (String blob : shardBlob.getValue().listBlobs().keySet()) { final Optional snapshotUUID = extractShallowSnapshotUUID(blob); if (snapshotUUID.isPresent()) { - RemoteStoreShardShallowCopySnapshot remoteStoreShardShallowCopySnapshot = - REMOTE_STORE_SHARD_SHALLOW_COPY_SNAPSHOT_FORMAT.read( - shardBlob.getValue(), - snapshotUUID.get(), - namedXContentRegistry - ); - String indexUUID = remoteStoreShardShallowCopySnapshot.getIndexUUID(); - String remoteStoreRepoForIndex = remoteStoreShardShallowCopySnapshot.getRemoteStoreRepository(); - // Releasing lock files before deleting the shallow-snap-UUID file because in case of any failure - // while releasing the lock file, we would still have the corresponding shallow-snap-UUID file - // and that would be used during next delete operation for releasing this stale lock file - RemoteStoreLockManager remoteStoreMetadataLockManager = remoteStoreLockManagerFactory.newLockManager( - remoteStoreRepoForIndex, - indexUUID, - shardBlob.getKey() - ); - remoteStoreMetadataLockManager.release( - FileLockInfo.getLockInfoBuilder().withAcquirerId(snapshotUUID.get()).build() + releaseRemoteStoreLockAndCleanup( + shardBlob.getKey(), + snapshotUUID.get(), + shardBlob.getValue(), + remoteStoreLockManagerFactory ); - if (!isIndexPresent(clusterService, indexUUID)) { - // this is a temporary solution where snapshot deletion triggers remote store side - // cleanup if index is already deleted. We will add a poller in future to take - // care of remote store side cleanup. 
- // see https://github.com/opensearch-project/OpenSearch/issues/8469 - new RemoteSegmentStoreDirectoryFactory( - remoteStoreLockManagerFactory.getRepositoriesService(), - threadPool - ).newDirectory( - remoteStoreRepoForIndex, - indexUUID, - new ShardId(Index.UNKNOWN_INDEX_NAME, indexUUID, Integer.parseInt(shardBlob.getKey())) - ).close(); - } } } } diff --git a/server/src/main/java/org/opensearch/repositories/blobstore/RemoteStoreShardCleanupTask.java b/server/src/main/java/org/opensearch/repositories/blobstore/RemoteStoreShardCleanupTask.java new file mode 100644 index 0000000000000..df61c1ca3263b --- /dev/null +++ b/server/src/main/java/org/opensearch/repositories/blobstore/RemoteStoreShardCleanupTask.java @@ -0,0 +1,63 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.repositories.blobstore; + +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; +import org.opensearch.core.index.shard.ShardId; + +import java.util.Map; +import java.util.Set; + +import static org.opensearch.common.util.concurrent.ConcurrentCollections.newConcurrentMap; +import static org.opensearch.common.util.concurrent.ConcurrentCollections.newConcurrentSet; + +/** + A Runnable wrapper to make sure that for a given shard only one cleanup task runs at a time. + */ +public class RemoteStoreShardCleanupTask implements Runnable { + private final Runnable task; + private final String shardIdentifier; + final static Set ongoingRemoteDirectoryCleanups = newConcurrentSet(); + final static Map pendingRemoteDirectoryCleanups = newConcurrentMap(); + private static final Logger staticLogger = LogManager.getLogger(RemoteStoreShardCleanupTask.class); + + public RemoteStoreShardCleanupTask(Runnable task, String indexUUID, ShardId shardId) { + this.task = task; + this.shardIdentifier = indexShardIdentifier(indexUUID, shardId); + } + + private static String indexShardIdentifier(String indexUUID, ShardId shardId) { + return String.join("/", indexUUID, String.valueOf(shardId.id())); + } + + @Override + public void run() { + // TODO: this is the best effort at the moment since there is still a known race condition scenario in this + // method which needs to be handled where one of the thread just came out of while loop and removed the + // entry from ongoingRemoteDirectoryCleanup, and another thread added new pending task in the map. + // we need to introduce semaphores/locks to avoid that situation which introduces the overhead of lock object + // cleanups. however, there will be no scenario where two threads run cleanup for same shard at same time. 
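+        // net effect: for a given shard, at most one cleanup task runs at any point in time, and repeated
+        // submissions made while one is running collapse into at most a single pending task.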
+ // + if (pendingRemoteDirectoryCleanups.put(shardIdentifier, task) == null) { + if (ongoingRemoteDirectoryCleanups.add(shardIdentifier)) { + while (pendingRemoteDirectoryCleanups.containsKey(shardIdentifier)) { + Runnable newTask = pendingRemoteDirectoryCleanups.get(shardIdentifier); + pendingRemoteDirectoryCleanups.remove(shardIdentifier); + newTask.run(); + } + ongoingRemoteDirectoryCleanups.remove(shardIdentifier); + } else { + staticLogger.debug("one task is already ongoing for shard {}, we can leave entry in pending", shardIdentifier); + } + } else { + staticLogger.debug("one cleanup task for shard {} is already in pending, we can skip this task", shardIdentifier); + } + } +} diff --git a/server/src/test/java/org/opensearch/index/store/RemoteSegmentStoreDirectoryTests.java b/server/src/test/java/org/opensearch/index/store/RemoteSegmentStoreDirectoryTests.java index e2ebb2e642bfe..8b69c15dac9d3 100644 --- a/server/src/test/java/org/opensearch/index/store/RemoteSegmentStoreDirectoryTests.java +++ b/server/src/test/java/org/opensearch/index/store/RemoteSegmentStoreDirectoryTests.java @@ -34,6 +34,8 @@ import org.opensearch.common.util.concurrent.OpenSearchExecutors; import org.opensearch.core.action.ActionListener; import org.opensearch.core.common.bytes.BytesReference; +import org.opensearch.core.index.Index; +import org.opensearch.core.index.shard.ShardId; import org.opensearch.index.engine.NRTReplicationEngineFactory; import org.opensearch.index.remote.RemoteStoreUtils; import org.opensearch.index.shard.IndexShard; @@ -164,6 +166,7 @@ public void setup() throws IOException { when(threadPool.executor(ThreadPool.Names.REMOTE_PURGE)).thenReturn(executorService); when(threadPool.executor(ThreadPool.Names.REMOTE_RECOVERY)).thenReturn(executorService); + when(threadPool.executor(ThreadPool.Names.SAME)).thenReturn(executorService); } @After @@ -499,6 +502,75 @@ public void testIsAcquiredException() throws IOException { assertThrows(NoSuchFileException.class, () -> remoteSegmentStoreDirectory.isLockAcquired(testPrimaryTerm, testGeneration)); } + private List getDummyMetadataFiles(int count) { + List sortedMetadataFiles = new ArrayList<>(); + for (int counter = 0; counter < count; counter++) { + sortedMetadataFiles.add(RemoteSegmentStoreDirectory.MetadataFilenameUtils.getMetadataFilename(counter, 23, 34, 1, 1, "node-1")); + } + return sortedMetadataFiles; + } + + public void testGetMetadataFilesForActiveSegments() throws IOException { + populateMetadata(); + remoteSegmentStoreDirectory.init(); + + // scenario 1: if activeSegments([[0, 1, 2], 3(l), 4(l), 5(l), 6(l), 7(l), 8(l), 9]) => [9] + List sortedMdFiles = getDummyMetadataFiles(10); + Set lockedMdFiles = new HashSet<>(); + for (int idx = 3; idx <= 8; idx++) { + lockedMdFiles.add(sortedMdFiles.get(idx)); + } + Set expectedMdFilesForActiveSegments = Set.of(sortedMdFiles.get(8)); + assertEquals( + "scenario 1 failed", + expectedMdFilesForActiveSegments, + remoteSegmentStoreDirectory.getMetadataFilesToFilterActiveSegments(3, sortedMdFiles, lockedMdFiles) + ); + + // scenario 2: if activeSegments([[0, 1, 2], 3, 4, 5, 6(l), 7(l), 8(l), 9]) => [2, 6, 8] + lockedMdFiles.clear(); + lockedMdFiles.add(sortedMdFiles.get(6)); + lockedMdFiles.add(sortedMdFiles.get(7)); + lockedMdFiles.add(sortedMdFiles.get(8)); + expectedMdFilesForActiveSegments = Set.of(sortedMdFiles.get(2), sortedMdFiles.get(6), sortedMdFiles.get(8)); + assertEquals( + "scenario 2 failed", + expectedMdFilesForActiveSegments, + 
remoteSegmentStoreDirectory.getMetadataFilesToFilterActiveSegments(3, sortedMdFiles, lockedMdFiles) + ); + + // scenario 3: if activeSegments([[0, 1, 2], 3, 4, 5(l), 6, 7(l), 8(l), 9]) => [3, 5, 7, 8] + lockedMdFiles.clear(); + lockedMdFiles.add(sortedMdFiles.get(5)); + lockedMdFiles.add(sortedMdFiles.get(7)); + lockedMdFiles.add(sortedMdFiles.get(8)); + expectedMdFilesForActiveSegments = Set.of(sortedMdFiles.get(2), sortedMdFiles.get(5), sortedMdFiles.get(7), sortedMdFiles.get(8)); + assertEquals( + "scenario 3 failed", + expectedMdFilesForActiveSegments, + remoteSegmentStoreDirectory.getMetadataFilesToFilterActiveSegments(3, sortedMdFiles, lockedMdFiles) + ); + + // scenario 3: if activeSegments([[0(l), 1(l), 2(l), 3(l), 4(l), 5(l), 6(l), 7(l), 8(l), 9(l)]) + lockedMdFiles.addAll(sortedMdFiles); + expectedMdFilesForActiveSegments = Set.of(); + assertEquals( + "scenario 4 failed", + expectedMdFilesForActiveSegments, + remoteSegmentStoreDirectory.getMetadataFilesToFilterActiveSegments(0, sortedMdFiles, lockedMdFiles) + ); + + // scenario 5: if (activeSegments([[0, 1, 2, 3]]) => [] + sortedMdFiles = sortedMdFiles.subList(0, 4); + lockedMdFiles.clear(); + expectedMdFilesForActiveSegments = Set.of(); + assertEquals( + "scenario 5 failed", + expectedMdFilesForActiveSegments, + remoteSegmentStoreDirectory.getMetadataFilesToFilterActiveSegments(4, sortedMdFiles, lockedMdFiles) + ); + } + public void testGetMetadataFileForCommit() throws IOException { long testPrimaryTerm = 2; long testGeneration = 3; @@ -511,7 +583,6 @@ public void testGetMetadataFileForCommit() throws IOException { String output = remoteSegmentStoreDirectory.getMetadataFileForCommit(testPrimaryTerm, testGeneration); assertEquals("metadata__" + testPrimaryTerm + "__" + testGeneration + "__pqr", output); - } public void testCopyFrom() throws IOException { @@ -616,6 +687,29 @@ public void onFailure(Exception e) { storeDirectory.close(); } + public void testCleanupAsync() throws Exception { + populateMetadata(); + RemoteSegmentStoreDirectoryFactory remoteSegmentStoreDirectoryFactory = mock(RemoteSegmentStoreDirectoryFactory.class); + RemoteSegmentStoreDirectory remoteSegmentDirectory = new RemoteSegmentStoreDirectory( + remoteDataDirectory, + remoteMetadataDirectory, + mdLockManager, + threadPool, + indexShard.shardId() + ); + when(remoteSegmentStoreDirectoryFactory.newDirectory(any(), any(), any())).thenReturn(remoteSegmentDirectory); + String repositoryName = "test-repository"; + String indexUUID = "test-idx-uuid"; + ShardId shardId = new ShardId(Index.UNKNOWN_INDEX_NAME, indexUUID, Integer.parseInt("0")); + + RemoteSegmentStoreDirectory.remoteDirectoryCleanup(remoteSegmentStoreDirectoryFactory, repositoryName, indexUUID, shardId); + verify(remoteSegmentStoreDirectoryFactory).newDirectory(repositoryName, indexUUID, shardId); + verify(threadPool, times(0)).executor(ThreadPool.Names.REMOTE_PURGE); + verify(remoteMetadataDirectory).delete(); + verify(remoteDataDirectory).delete(); + verify(mdLockManager).delete(); + } + public void testCopyFromException() throws IOException { String filename = "_100.si"; Directory storeDirectory = LuceneTestCase.newDirectory(); diff --git a/server/src/test/java/org/opensearch/repositories/blobstore/BlobStoreRepositoryTests.java b/server/src/test/java/org/opensearch/repositories/blobstore/BlobStoreRepositoryTests.java index 9c65ad32fa6a6..b76e01d6d4c82 100644 --- a/server/src/test/java/org/opensearch/repositories/blobstore/BlobStoreRepositoryTests.java +++ 
b/server/src/test/java/org/opensearch/repositories/blobstore/BlobStoreRepositoryTests.java
@@ -42,6 +42,8 @@
 import org.opensearch.common.UUIDs;
 import org.opensearch.common.settings.Settings;
 import org.opensearch.core.common.unit.ByteSizeUnit;
+import org.opensearch.core.index.Index;
+import org.opensearch.core.index.shard.ShardId;
 import org.opensearch.core.xcontent.NamedXContentRegistry;
 import org.opensearch.env.Environment;
 import org.opensearch.indices.recovery.RecoverySettings;
@@ -64,6 +66,7 @@
 import java.util.Collections;
 import java.util.List;
 import java.util.Map;
+import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.function.Function;
 import java.util.stream.Collectors;
 
@@ -318,4 +321,38 @@ private RepositoryData addRandomSnapshotsToRepoData(RepositoryData repoData, boo
         return repoData;
     }
 
+    public void testRemoteStoreShardCleanupTask() {
+        // todo: move it to separate class and add more scenarios.
+        AtomicBoolean executed1 = new AtomicBoolean(false);
+        Runnable task1 = () -> executed1.set(true);
+        String indexName = "test-idx";
+        String testIndexUUID = "test-idx-uuid";
+        ShardId shardId = new ShardId(new Index(indexName, testIndexUUID), 0);
+
+        // Scenario 1: pending = empty, ongoing = false => executed
+        RemoteStoreShardCleanupTask remoteStoreShardCleanupTask = new RemoteStoreShardCleanupTask(task1, testIndexUUID, shardId);
+        remoteStoreShardCleanupTask.run();
+        assertTrue(executed1.get());
+
+        // Scenario 2: pending = empty, ongoing = true => pending = currentTask
+        executed1.set(false);
+        String shardIdentifier = String.join("/", testIndexUUID, String.valueOf(shardId.id()));
+        RemoteStoreShardCleanupTask.ongoingRemoteDirectoryCleanups.add(shardIdentifier);
+
+        remoteStoreShardCleanupTask = new RemoteStoreShardCleanupTask(task1, testIndexUUID, shardId);
+        remoteStoreShardCleanupTask.run();
+        assertFalse(executed1.get());
+        assertSame(RemoteStoreShardCleanupTask.pendingRemoteDirectoryCleanups.get(shardIdentifier), task1);
+
+        // Scenario3: pending = anotherTask, ongoing = true => pending = currentTask
+        AtomicBoolean executed2 = new AtomicBoolean(false);
+        Runnable task2 = () -> executed2.set(true);
+        RemoteStoreShardCleanupTask.pendingRemoteDirectoryCleanups.put(shardIdentifier, task1);
+        RemoteStoreShardCleanupTask.ongoingRemoteDirectoryCleanups.add(shardIdentifier);
+
+        remoteStoreShardCleanupTask = new RemoteStoreShardCleanupTask(task2, testIndexUUID, shardId);
+        remoteStoreShardCleanupTask.run();
+        assertFalse(executed1.get());
+        assertSame(RemoteStoreShardCleanupTask.pendingRemoteDirectoryCleanups.get(shardIdentifier), task2);
+    }
 }
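Note: the following is an illustrative usage sketch, not part of the change set above. It shows how RemoteStoreShardCleanupTask (its constructor and the ShardId/Index usage are taken from the patch) coalesces repeated cleanup submissions for the same shard. The plain ExecutorService, the example class name, and the dummy Runnable are assumptions made for this sketch only; in the server, BlobStoreRepository.remoteDirectoryCleanupAsync submits the task to the REMOTE_PURGE threadpool.

import org.opensearch.core.index.Index;
import org.opensearch.core.index.shard.ShardId;
import org.opensearch.repositories.blobstore.RemoteStoreShardCleanupTask;

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

public class RemoteStoreShardCleanupTaskExample {
    public static void main(String[] args) throws InterruptedException {
        ShardId shardId = new ShardId(new Index("my-index", "my-index-uuid"), 0);
        Runnable cleanup = () -> System.out.println("cleaning remote segment store for " + shardId);

        ExecutorService executor = Executors.newFixedThreadPool(2);
        // Both tasks target the same shard: they never run concurrently. The second submission is either
        // drained by the already-running task or left behind as the single pending entry for the shard.
        executor.execute(new RemoteStoreShardCleanupTask(cleanup, "my-index-uuid", shardId));
        executor.execute(new RemoteStoreShardCleanupTask(cleanup, "my-index-uuid", shardId));

        executor.shutdown();
        executor.awaitTermination(10, TimeUnit.SECONDS);
    }
}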