Skip to content

Commit

Permalink
Address PR Comments.
Browse files Browse the repository at this point in the history
Signed-off-by: Harish Bhakuni <[email protected]>
  • Loading branch information
Harish Bhakuni committed Mar 11, 2024
1 parent 93c5168 commit 92a7b56
Show file tree
Hide file tree
Showing 2 changed files with 54 additions and 36 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,6 @@
import org.opensearch.index.store.remote.metadata.RemoteSegmentMetadata;
import org.opensearch.index.store.remote.metadata.RemoteSegmentMetadataHandler;
import org.opensearch.indices.replication.checkpoint.ReplicationCheckpoint;
import org.opensearch.repositories.blobstore.BlobStoreRepository;
import org.opensearch.threadpool.ThreadPool;

import java.io.FileNotFoundException;
Expand Down Expand Up @@ -860,7 +859,7 @@ public void deleteStaleSegments(int lastNMetadataFilesToKeep) throws IOException
}

public void deleteStaleSegmentsAsync(int lastNMetadataFilesToKeep) {
deleteStaleSegmentsAsync(lastNMetadataFilesToKeep, ActionListener.wrap(r -> {}, e -> {}), ThreadPool.Names.REMOTE_PURGE);
deleteStaleSegmentsAsync(lastNMetadataFilesToKeep, ActionListener.wrap(r -> {}, e -> {}));
}

/**
Expand All @@ -869,10 +868,10 @@ public void deleteStaleSegmentsAsync(int lastNMetadataFilesToKeep) {
*
* @param lastNMetadataFilesToKeep number of metadata files to keep
*/
public void deleteStaleSegmentsAsync(int lastNMetadataFilesToKeep, ActionListener<Void> listener, String threadPoolName) {
public void deleteStaleSegmentsAsync(int lastNMetadataFilesToKeep, ActionListener<Void> listener) {
if (canDeleteStaleCommits.compareAndSet(true, false)) {
try {
threadPool.executor(threadPoolName).execute(() -> {
threadPool.executor(ThreadPool.Names.REMOTE_PURGE).execute(() -> {
try {
deleteStaleSegments(lastNMetadataFilesToKeep);
listener.onResponse(null);
Expand All @@ -894,27 +893,24 @@ public void deleteStaleSegmentsAsync(int lastNMetadataFilesToKeep, ActionListene
}
}

public static void cleanupAsync(
public static void remoteDirectoryCleanup(
RemoteSegmentStoreDirectoryFactory remoteDirectoryFactory,
ThreadPool threadpool,
String remoteStoreRepoForIndex,
String indexUUID,
ShardId shardId
) {
threadpool.executor(ThreadPool.Names.REMOTE_PURGE).execute(new BlobStoreRepository.RemoteStoreShardCleanupTask(() -> {
try (
RemoteSegmentStoreDirectory remoteDirectory = (RemoteSegmentStoreDirectory) remoteDirectoryFactory.newDirectory(
remoteStoreRepoForIndex,
indexUUID,
shardId
)
) {
remoteDirectory.deleteStaleSegments(0); // cleanup stale segments in sync.
remoteDirectory.deleteIfEmpty();
} catch (Exception e) {
staticLogger.error("Exception occurred while deleting directory", e);
}
}, threadpool.executor(ThreadPool.Names.REMOTE_PURGE), indexUUID, shardId));
try (
RemoteSegmentStoreDirectory remoteDirectory = (RemoteSegmentStoreDirectory) remoteDirectoryFactory.newDirectory(
remoteStoreRepoForIndex,
indexUUID,
shardId
)
) {
remoteDirectory.deleteStaleSegments(0); // sync stale segments cleanup
remoteDirectory.deleteIfEmpty();
} catch (Exception e) {
staticLogger.error("Exception occurred while deleting directory", e);
}
}

/*
Expand Down Expand Up @@ -944,10 +940,6 @@ private boolean deleteIfEmpty() throws IOException {

@Override
public void close() throws IOException {
deleteStaleSegmentsAsync(
0,
ActionListener.wrap(r -> deleteIfEmpty(), e -> logger.error("Failed to cleanup remote directory")),
ThreadPool.Names.REMOTE_PURGE
);
deleteStaleSegmentsAsync(0, ActionListener.wrap(r -> deleteIfEmpty(), e -> logger.error("Failed to cleanup remote directory")));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -1102,6 +1102,29 @@ private void asyncCleanupUnlinkedShardLevelBlobs(
}
}

public static void remoteDirectoryCleanupAsync(
RemoteSegmentStoreDirectoryFactory remoteDirectoryFactory,
ThreadPool threadpool,
String remoteStoreRepoForIndex,
String indexUUID,
ShardId shardId
) {
threadpool.executor(ThreadPool.Names.REMOTE_PURGE)
.execute(
new RemoteStoreShardCleanupTask(
() -> RemoteSegmentStoreDirectory.remoteDirectoryCleanup(
remoteDirectoryFactory,
remoteStoreRepoForIndex,
indexUUID,
shardId
),
threadpool.executor(ThreadPool.Names.REMOTE_PURGE),
indexUUID,
shardId
)
);
}

public static class RemoteStoreShardCleanupTask implements Runnable {
private final Runnable task;
private final ExecutorService executor;
Expand All @@ -1121,20 +1144,23 @@ private static String indexShardIdentifier(String indexUUID, ShardId shardId) {

@Override
public void run() {
try {
if (ongoingRemoteDirectoryCleanups.add(shardIdentifier)) {
if (ongoingRemoteDirectoryCleanups.add(shardIdentifier)) {
try {
task.run();
Runnable nextTask = shardCleanupPendingTasks.get(shardIdentifier).poll(0L, TimeUnit.MILLISECONDS);
if (nextTask != null) {
executor.execute(nextTask);
BlockingQueue<Runnable> pendingTasks = shardCleanupPendingTasks.get(shardIdentifier);
try {
for (Runnable pendingTask = pendingTasks.poll(0L, TimeUnit.MILLISECONDS); pendingTask != null;) {
pendingTask.run();
}
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
} finally {
ongoingRemoteDirectoryCleanups.remove(shardIdentifier);
} else {
shardCleanupPendingTasks.putIfAbsent(shardIdentifier, new LinkedBlockingQueue<>());
shardCleanupPendingTasks.get(shardIdentifier).add(task);
}
} catch (InterruptedException e) {
// todo: log exception..
} else {
shardCleanupPendingTasks.putIfAbsent(shardIdentifier, new LinkedBlockingQueue<>());
shardCleanupPendingTasks.get(shardIdentifier).add(task);
}
}
}
Expand Down Expand Up @@ -1177,7 +1203,7 @@ protected void releaseRemoteStoreLockAndCleanup(
remoteStoreLockManagerFactory.getRepositoriesService(),
threadPool
);
RemoteSegmentStoreDirectory.cleanupAsync(
remoteDirectoryCleanupAsync(
remoteDirectoryFactory,
threadPool,
remoteStoreRepoForIndex,
Expand Down

0 comments on commit 92a7b56

Please sign in to comment.