Skip to content

Commit

Permalink
Add changes to make sure no two remoteDirectory cleanup for same shar…
Browse files Browse the repository at this point in the history
…d is ongoing at same time.

Signed-off-by: Harish Bhakuni <[email protected]>
  • Loading branch information
Harish Bhakuni committed Mar 9, 2024
1 parent fcd4257 commit b3c2ea3
Show file tree
Hide file tree
Showing 2 changed files with 36 additions and 22 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.ConcurrentModificationException;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
Expand Down Expand Up @@ -898,30 +899,38 @@ public static void cleanupAsync(
ThreadPool threadpool,
String remoteStoreRepoForIndex,
String indexUUID,
ShardId shardId
ShardId shardId,
String indexShardIdentifier,
Set<String> ongoingDirectoryCleanups
) {
threadpool.executor(ThreadPool.Names.REMOTE_PURGE).execute(() -> {

if (ongoingDirectoryCleanups.add(indexShardIdentifier)) {
try {
try (
RemoteSegmentStoreDirectory remoteDirectory = (RemoteSegmentStoreDirectory) remoteDirectoryFactory.newDirectory(
remoteStoreRepoForIndex,
indexUUID,
shardId
)
) {
remoteDirectory.deleteStaleSegmentsAsync(
0,
ActionListener.wrap(
r -> remoteDirectory.deleteIfEmpty(),
e -> staticLogger.error("Failed to cleanup remote directory")
),
ThreadPool.Names.SAME
);
}
} catch (IOException e) {
staticLogger.error("Exception occurred while deleting directory", e);
threadpool.executor(ThreadPool.Names.REMOTE_PURGE).execute(() -> {
try {
try (
RemoteSegmentStoreDirectory remoteDirectory = (RemoteSegmentStoreDirectory) remoteDirectoryFactory.newDirectory(
remoteStoreRepoForIndex,
indexUUID,
shardId
)
) {
remoteDirectory.deleteStaleSegments(0); // cleanup stale segments in sync.
remoteDirectory.deleteIfEmpty();
}
} catch (IOException e) {
staticLogger.error("Exception occurred while deleting directory", e);
} finally {
ongoingDirectoryCleanups.remove(indexShardIdentifier);
}
});
} catch (Exception e) {
ongoingDirectoryCleanups.remove(indexShardIdentifier);
throw e;
}
});
} else {
throw new ConcurrentModificationException("cannot be executed since another cleanup ongoing");
}
}

/*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -173,6 +173,7 @@
import java.util.stream.LongStream;
import java.util.stream.Stream;

import static org.opensearch.common.util.concurrent.ConcurrentCollections.newConcurrentSet;
import static org.opensearch.index.snapshots.blobstore.BlobStoreIndexShardSnapshot.FileInfo.canonicalName;
import static org.opensearch.repositories.blobstore.ChecksumBlobStoreFormat.SNAPSHOT_ONLY_FORMAT_PARAMS;

Expand Down Expand Up @@ -315,6 +316,8 @@ public abstract class BlobStoreRepository extends AbstractLifecycleComponent imp

private volatile RateLimiter remoteDownloadRateLimiter;

private final Set<String> ongoingRemoteDirectoryCleanups = newConcurrentSet();

private final CounterMetric snapshotRateLimitingTimeInNanos = new CounterMetric();

private final CounterMetric restoreRateLimitingTimeInNanos = new CounterMetric();
Expand Down Expand Up @@ -1142,7 +1145,9 @@ protected void releaseRemoteStoreLockAndCleanup(
threadPool,
remoteStoreRepoForIndex,
indexUUID,
new ShardId(Index.UNKNOWN_INDEX_NAME, indexUUID, Integer.parseInt(shardId))
new ShardId(Index.UNKNOWN_INDEX_NAME, indexUUID, Integer.parseInt(shardId)),
indexUUID + shardId,
ongoingRemoteDirectoryCleanups
);
}
}
Expand Down

0 comments on commit b3c2ea3

Please sign in to comment.