Skip to content

Commit

Permalink
Make sure at a time only one remoteDirectory cleanup is going on for …
Browse files Browse the repository at this point in the history
…a given shard.

Signed-off-by: Harish Bhakuni <[email protected]>
  • Loading branch information
Harish Bhakuni committed Mar 9, 2024
1 parent fcd4257 commit cfabef4
Show file tree
Hide file tree
Showing 2 changed files with 52 additions and 20 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@
import org.opensearch.index.store.remote.metadata.RemoteSegmentMetadata;
import org.opensearch.index.store.remote.metadata.RemoteSegmentMetadataHandler;
import org.opensearch.indices.replication.checkpoint.ReplicationCheckpoint;
import org.opensearch.repositories.blobstore.BlobStoreRepository;
import org.opensearch.threadpool.ThreadPool;

import java.io.FileNotFoundException;
Expand Down Expand Up @@ -900,28 +901,20 @@ public static void cleanupAsync(
String indexUUID,
ShardId shardId
) {
threadpool.executor(ThreadPool.Names.REMOTE_PURGE).execute(() -> {
try {
try (
RemoteSegmentStoreDirectory remoteDirectory = (RemoteSegmentStoreDirectory) remoteDirectoryFactory.newDirectory(
remoteStoreRepoForIndex,
indexUUID,
shardId
)
) {
remoteDirectory.deleteStaleSegmentsAsync(
0,
ActionListener.wrap(
r -> remoteDirectory.deleteIfEmpty(),
e -> staticLogger.error("Failed to cleanup remote directory")
),
ThreadPool.Names.SAME
);
}
} catch (IOException e) {
threadpool.executor(ThreadPool.Names.REMOTE_PURGE).execute(new BlobStoreRepository.RemoteStoreShardCleanupTask(() -> {
try (
RemoteSegmentStoreDirectory remoteDirectory = (RemoteSegmentStoreDirectory) remoteDirectoryFactory.newDirectory(
remoteStoreRepoForIndex,
indexUUID,
shardId
)
) {
remoteDirectory.deleteStaleSegments(0); // cleanup stale segments in sync.
remoteDirectory.deleteIfEmpty();
} catch (Exception e) {
staticLogger.error("Exception occurred while deleting directory", e);
}
});
}, threadpool.executor(ThreadPool.Names.REMOTE_PURGE), indexUUID, shardId));
}

/*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -160,7 +160,9 @@
import java.util.Set;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
Expand All @@ -173,6 +175,7 @@
import java.util.stream.LongStream;
import java.util.stream.Stream;

import static org.opensearch.common.util.concurrent.ConcurrentCollections.newConcurrentSet;
import static org.opensearch.index.snapshots.blobstore.BlobStoreIndexShardSnapshot.FileInfo.canonicalName;
import static org.opensearch.repositories.blobstore.ChecksumBlobStoreFormat.SNAPSHOT_ONLY_FORMAT_PARAMS;

Expand Down Expand Up @@ -1099,6 +1102,42 @@ private void asyncCleanupUnlinkedShardLevelBlobs(
}
}

/**
 * Wraps a remote store cleanup {@link Runnable} for a single shard so that at most one cleanup
 * for that shard (identified by {@code indexUUID/shardId}) runs at any time.
 *
 * <p>If no cleanup is in progress for the shard when {@link #run()} is invoked, the wrapped task
 * runs on the calling thread. Otherwise the wrapped task is parked in a per-shard queue and is
 * submitted to the executor once the running cleanup (and any earlier parked tasks) finishes.
 *
 * <p>The "is a cleanup running" marker and the per-shard queue are always read and updated
 * together under a single lock, so a task can never be parked after the running cleanup has
 * already decided that nothing is pending.
 */
public static class RemoteStoreShardCleanupTask implements Runnable {
    private final Runnable task;
    private final ExecutorService executor;
    private final String shardIdentifier;
    // Identifiers of shards that currently have a cleanup running (or handed off to the executor).
    final static Set<String> ongoingRemoteDirectoryCleanups = newConcurrentSet();
    // Cleanups that arrived while another cleanup for the same shard was running, keyed by shard identifier.
    final static ConcurrentMap<String, BlockingQueue<Runnable>> shardCleanupPendingTasks = new ConcurrentHashMap<>();
    // Guards the combined state of the two collections above; held only for short, non-blocking sections.
    private static final Object cleanupStateLock = new Object();

    /**
     * @param task      the cleanup work to run for the shard
     * @param executor  executor used to run cleanups that had to wait for an earlier one to finish
     * @param indexUUID UUID of the index the shard belongs to
     * @param shardId   shard whose remote directory is being cleaned up
     */
    public RemoteStoreShardCleanupTask(Runnable task, ExecutorService executor, String indexUUID, ShardId shardId) {
        this.task = task;
        this.shardIdentifier = indexShardIdentifier(indexUUID, shardId);
        this.executor = executor;
    }

    private static String indexShardIdentifier(String indexUUID, ShardId shardId) {
        return String.join("/", indexUUID, String.valueOf(shardId.id()));
    }

    /**
     * Runs the wrapped task now if the shard has no cleanup in progress; otherwise parks it so the
     * cleanup that is currently running schedules it when it completes.
     */
    @Override
    public void run() {
        synchronized (cleanupStateLock) {
            if (ongoingRemoteDirectoryCleanups.add(shardIdentifier) == false) {
                // Another cleanup owns this shard; it will pick this task up when it finishes.
                shardCleanupPendingTasks.computeIfAbsent(shardIdentifier, key -> new LinkedBlockingQueue<>()).add(task);
                return;
            }
        }
        runThenHandOff(task);
    }

    /**
     * Runs {@code current} while this shard is marked as being cleaned up, then either schedules the
     * next parked task or clears the marker. The hand-off happens even if {@code current} throws, so
     * a failing cleanup cannot block later cleanups of the same shard.
     */
    private void runThenHandOff(Runnable current) {
        try {
            current.run();
        } finally {
            scheduleNextOrRelease();
        }
    }

    /**
     * Submits the next parked task for this shard to the executor, keeping the shard marked as busy,
     * or removes the marker (and the empty queue) when nothing is parked.
     */
    private void scheduleNextOrRelease() {
        final Runnable next;
        synchronized (cleanupStateLock) {
            final BlockingQueue<Runnable> pending = shardCleanupPendingTasks.get(shardIdentifier);
            next = pending == null ? null : pending.poll();
            if (next == null) {
                shardCleanupPendingTasks.remove(shardIdentifier);
                ongoingRemoteDirectoryCleanups.remove(shardIdentifier);
                return;
            }
        }
        try {
            // The shard stays marked as busy; the submitted runnable continues the hand-off chain.
            executor.execute(() -> runThenHandOff(next));
        } catch (RuntimeException e) {
            // The executor refused the task (e.g. it is shutting down). Nothing will continue the
            // chain, so drop the parked work and clear the marker instead of blocking the shard forever.
            synchronized (cleanupStateLock) {
                shardCleanupPendingTasks.remove(shardIdentifier);
                ongoingRemoteDirectoryCleanups.remove(shardIdentifier);
            }
            throw e;
        }
    }
}

protected void releaseRemoteStoreLockAndCleanup(
String shardId,
String shallowSnapshotUUID,
Expand Down

0 comments on commit cfabef4

Please sign in to comment.