diff --git a/server/src/main/java/org/opensearch/index/store/RemoteSegmentStoreDirectory.java b/server/src/main/java/org/opensearch/index/store/RemoteSegmentStoreDirectory.java index 49ddaefce7be4..5bb7d77cf94ba 100644 --- a/server/src/main/java/org/opensearch/index/store/RemoteSegmentStoreDirectory.java +++ b/server/src/main/java/org/opensearch/index/store/RemoteSegmentStoreDirectory.java @@ -39,6 +39,7 @@ import org.opensearch.index.store.remote.metadata.RemoteSegmentMetadata; import org.opensearch.index.store.remote.metadata.RemoteSegmentMetadataHandler; import org.opensearch.indices.replication.checkpoint.ReplicationCheckpoint; +import org.opensearch.repositories.blobstore.BlobStoreRepository; import org.opensearch.threadpool.ThreadPool; import java.io.FileNotFoundException; @@ -900,28 +901,20 @@ public static void cleanupAsync( String indexUUID, ShardId shardId ) { - threadpool.executor(ThreadPool.Names.REMOTE_PURGE).execute(() -> { - try { - try ( - RemoteSegmentStoreDirectory remoteDirectory = (RemoteSegmentStoreDirectory) remoteDirectoryFactory.newDirectory( - remoteStoreRepoForIndex, - indexUUID, - shardId - ) - ) { - remoteDirectory.deleteStaleSegmentsAsync( - 0, - ActionListener.wrap( - r -> remoteDirectory.deleteIfEmpty(), - e -> staticLogger.error("Failed to cleanup remote directory") - ), - ThreadPool.Names.SAME - ); - } - } catch (IOException e) { + threadpool.executor(ThreadPool.Names.REMOTE_PURGE).execute(new BlobStoreRepository.RemoteStoreShardCleanupTask(() -> { + try ( + RemoteSegmentStoreDirectory remoteDirectory = (RemoteSegmentStoreDirectory) remoteDirectoryFactory.newDirectory( + remoteStoreRepoForIndex, + indexUUID, + shardId + ) + ) { + remoteDirectory.deleteStaleSegments(0); // cleanup stale segments in sync. + remoteDirectory.deleteIfEmpty(); + } catch (Exception e) { staticLogger.error("Exception occurred while deleting directory", e); } - }); + }, threadpool.executor(ThreadPool.Names.REMOTE_PURGE), indexUUID, shardId)); } /* diff --git a/server/src/main/java/org/opensearch/repositories/blobstore/BlobStoreRepository.java b/server/src/main/java/org/opensearch/repositories/blobstore/BlobStoreRepository.java index aa9c1426fc54a..7e5ef33c4f0ea 100644 --- a/server/src/main/java/org/opensearch/repositories/blobstore/BlobStoreRepository.java +++ b/server/src/main/java/org/opensearch/repositories/blobstore/BlobStoreRepository.java @@ -160,7 +160,9 @@ import java.util.Set; import java.util.concurrent.BlockingQueue; import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; import java.util.concurrent.Executor; +import java.util.concurrent.ExecutorService; import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; @@ -173,6 +175,7 @@ import java.util.stream.LongStream; import java.util.stream.Stream; +import static org.opensearch.common.util.concurrent.ConcurrentCollections.newConcurrentSet; import static org.opensearch.index.snapshots.blobstore.BlobStoreIndexShardSnapshot.FileInfo.canonicalName; import static org.opensearch.repositories.blobstore.ChecksumBlobStoreFormat.SNAPSHOT_ONLY_FORMAT_PARAMS; @@ -1099,6 +1102,42 @@ private void asyncCleanupUnlinkedShardLevelBlobs( } } + public static class RemoteStoreShardCleanupTask implements Runnable { + private final Runnable task; + private final ExecutorService executor; + private final String shardIdentifier; + final static Set ongoingRemoteDirectoryCleanups = newConcurrentSet(); + final static ConcurrentMap> shardCleanupPendingTasks = new ConcurrentHashMap<>(); + + public RemoteStoreShardCleanupTask(Runnable task, ExecutorService executor, String indexUUID, ShardId shardId) { + this.task = task; + this.shardIdentifier = indexShardIdentifier(indexUUID, shardId); + this.executor = executor; + } + + private static String indexShardIdentifier(String indexUUID, ShardId shardId) { + return String.join("/", indexUUID, String.valueOf(shardId.id())); + } + + @Override + public void run() { + try { + if (ongoingRemoteDirectoryCleanups.add(shardIdentifier)) { + task.run(); + Runnable nextTask = shardCleanupPendingTasks.get(shardIdentifier).poll(0L, TimeUnit.MILLISECONDS); + if (nextTask != null) { + executor.execute(nextTask); + } + } else { + shardCleanupPendingTasks.putIfAbsent(shardIdentifier, new LinkedBlockingQueue<>()); + shardCleanupPendingTasks.get(shardIdentifier).add(task); + } + } catch (InterruptedException e) { + // todo: log exception.. + } + } + } + protected void releaseRemoteStoreLockAndCleanup( String shardId, String shallowSnapshotUUID,