Skip to content

Commit

Permalink
Address PR Comments.
Browse files Browse the repository at this point in the history
Signed-off-by: Harish Bhakuni <[email protected]>
  • Loading branch information
Harish Bhakuni committed Mar 14, 2024
1 parent 76d1319 commit 5ba410b
Show file tree
Hide file tree
Showing 8 changed files with 160 additions and 68 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -117,14 +117,17 @@ public final class RemoteSegmentStoreDirectory extends FilterDirectory implement

private final AtomicLong metadataUploadCounter = new AtomicLong(0);

private final Boolean useSameThreadPool;

public static final int METADATA_FILES_TO_FETCH = 10;

public RemoteSegmentStoreDirectory(
RemoteDirectory remoteDataDirectory,
RemoteDirectory remoteMetadataDirectory,
RemoteStoreLockManager mdLockManager,
ThreadPool threadPool,
ShardId shardId
ShardId shardId,
Boolean useSameThreadPool
) throws IOException {
super(remoteDataDirectory);
this.remoteDataDirectory = remoteDataDirectory;
Expand All @@ -133,6 +136,7 @@ public RemoteSegmentStoreDirectory(
this.threadPool = threadPool;
this.logger = Loggers.getLogger(getClass(), shardId);
init();
this.useSameThreadPool = useSameThreadPool;
}

/**
Expand Down Expand Up @@ -871,7 +875,7 @@ public void deleteStaleSegmentsAsync(int lastNMetadataFilesToKeep) {
public void deleteStaleSegmentsAsync(int lastNMetadataFilesToKeep, ActionListener<Void> listener) {
if (canDeleteStaleCommits.compareAndSet(true, false)) {
try {
threadPool.executor(ThreadPool.Names.REMOTE_PURGE).execute(() -> {
threadPool.executor(getExecutorNameForAsyncOperations()).execute(() -> {
try {
deleteStaleSegments(lastNMetadataFilesToKeep);
listener.onResponse(null);
Expand Down Expand Up @@ -899,15 +903,8 @@ public static void remoteDirectoryCleanup(
String indexUUID,
ShardId shardId
) {
try (
RemoteSegmentStoreDirectory remoteDirectory = (RemoteSegmentStoreDirectory) remoteDirectoryFactory.newDirectory(
remoteStoreRepoForIndex,
indexUUID,
shardId
)
) {
remoteDirectory.deleteStaleSegments(0); // sync stale segments cleanup
remoteDirectory.deleteIfEmpty();
try {
remoteDirectoryFactory.newDirectory(remoteStoreRepoForIndex, indexUUID, shardId, true).close();
} catch (Exception e) {
staticLogger.error("Exception occurred while deleting directory", e);
}
Expand Down Expand Up @@ -938,6 +935,10 @@ private boolean deleteIfEmpty() throws IOException {
return true;
}

/**
 * Picks the name of the executor on which asynchronous cleanup work is scheduled.
 *
 * @return {@code ThreadPool.Names.SAME} when {@code useSameThreadPool} is {@code true};
 *         {@code ThreadPool.Names.REMOTE_PURGE} otherwise, including when the flag is {@code null}
 */
private String getExecutorNameForAsyncOperations() {
    // useSameThreadPool is a boxed Boolean, so compare null-safely instead of auto-unboxing it
    // (auto-unboxing a null flag would throw a NullPointerException here).
    return Boolean.TRUE.equals(useSameThreadPool) ? ThreadPool.Names.SAME : ThreadPool.Names.REMOTE_PURGE;
}

@Override
public void close() throws IOException {
deleteStaleSegmentsAsync(0, ActionListener.wrap(r -> deleteIfEmpty(), e -> logger.error("Failed to cleanup remote directory")));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,11 @@ public Directory newDirectory(IndexSettings indexSettings, ShardPath path) throw
}

/**
 * Creates a directory for the given repository, index and shard by delegating to the four-argument
 * overload with {@code useSameThreadPool} set to {@code false}.
 *
 * @param repositoryName name of the repository passed through to the overload
 * @param indexUUID      UUID of the index passed through to the overload
 * @param shardId        shard passed through to the overload
 * @return the directory returned by the four-argument overload
 * @throws IOException if the overload throws it
 */
public Directory newDirectory(String repositoryName, String indexUUID, ShardId shardId) throws IOException {
    final Boolean useSameThreadPool = Boolean.FALSE;
    return newDirectory(repositoryName, indexUUID, shardId, useSameThreadPool);
}

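// package-private: called by RemoteSegmentStoreDirectory.remoteDirectoryCleanup (with useSameThreadPool = true) as well as by tests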
Directory newDirectory(String repositoryName, String indexUUID, ShardId shardId, Boolean useSameThreadPool) throws IOException {
try (Repository repository = repositoriesService.get().repository(repositoryName)) {
assert repository instanceof BlobStoreRepository : "repository should be instance of BlobStoreRepository";
BlobStoreRepository blobStoreRepository = ((BlobStoreRepository) repository);
Expand All @@ -71,9 +76,10 @@ public Directory newDirectory(String repositoryName, String indexUUID, ShardId s
String.valueOf(shardId.id())
);

return new RemoteSegmentStoreDirectory(dataDirectory, metadataDirectory, mdLockManager, threadPool, shardId);
return new RemoteSegmentStoreDirectory(dataDirectory, metadataDirectory, mdLockManager, threadPool, shardId, useSameThreadPool);
} catch (RepositoryMissingException e) {
throw new IllegalArgumentException("Repository should be created before creating index with remote_store enabled setting", e);
}
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -160,9 +160,7 @@
import java.util.Set;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
Expand All @@ -175,7 +173,6 @@
import java.util.stream.LongStream;
import java.util.stream.Stream;

import static org.opensearch.common.util.concurrent.ConcurrentCollections.newConcurrentSet;
import static org.opensearch.index.snapshots.blobstore.BlobStoreIndexShardSnapshot.FileInfo.canonicalName;
import static org.opensearch.repositories.blobstore.ChecksumBlobStoreFormat.SNAPSHOT_ONLY_FORMAT_PARAMS;

Expand Down Expand Up @@ -240,6 +237,8 @@ public abstract class BlobStoreRepository extends AbstractLifecycleComponent imp
Setting.Property.Deprecated
);

private static final Logger staticLogger = LogManager.getLogger(BlobStoreRepository.class);

/**
* Setting to disable caching of the latest repository data.
*/
Expand Down Expand Up @@ -1107,9 +1106,10 @@ public static void remoteDirectoryCleanupAsync(
ThreadPool threadpool,
String remoteStoreRepoForIndex,
String indexUUID,
ShardId shardId
ShardId shardId,
String threadPoolName
) {
threadpool.executor(ThreadPool.Names.REMOTE_PURGE)
threadpool.executor(threadPoolName)
.execute(
new RemoteStoreShardCleanupTask(
() -> RemoteSegmentStoreDirectory.remoteDirectoryCleanup(
Expand All @@ -1118,58 +1118,12 @@ public static void remoteDirectoryCleanupAsync(
indexUUID,
shardId
),
threadpool.executor(ThreadPool.Names.REMOTE_PURGE),
indexUUID,
shardId
)
);
}

/**
 * A Runnable wrapper to make sure that for a given shard only one cleanup task runs at a time.
 * <p>
 * The first wrapper to register its shard identifier in {@code ongoingRemoteDirectoryCleanups} runs its
 * own task and then drains any tasks queued for that shard in {@code shardCleanupPendingTasks}.
 * A wrapper that finds the shard already registered only enqueues its task and returns.
 */
public static class RemoteStoreShardCleanupTask implements Runnable {
    private final Runnable task;
    // Stored from the constructor; not read anywhere inside this class.
    private final ExecutorService executor;
    // "<indexUUID>/<shard number>", used as the key in both static collections below.
    private final String shardIdentifier;
    final static Set<String> ongoingRemoteDirectoryCleanups = newConcurrentSet();
    final static ConcurrentMap<String, BlockingQueue<Runnable>> shardCleanupPendingTasks = new ConcurrentHashMap<>();

    /**
     * @param task      the cleanup work to run
     * @param executor  executor associated with this task
     * @param indexUUID UUID of the index the shard belongs to
     * @param shardId   shard whose numeric id is combined with {@code indexUUID} to identify the shard
     */
    public RemoteStoreShardCleanupTask(Runnable task, ExecutorService executor, String indexUUID, ShardId shardId) {
        this.task = task;
        this.shardIdentifier = indexShardIdentifier(indexUUID, shardId);
        this.executor = executor;
    }

    /** Builds the per-shard key as {@code indexUUID + "/" + shardId.id()}. */
    private static String indexShardIdentifier(String indexUUID, ShardId shardId) {
        return String.join("/", indexUUID, String.valueOf(shardId.id()));
    }

    /**
     * Runs the wrapped task if no cleanup is registered for the shard, then drains the shard's pending
     * queue; otherwise enqueues the wrapped task for the thread that currently owns the shard.
     */
    @Override
    public void run() {
        if (ongoingRemoteDirectoryCleanups.add(shardIdentifier)) {
            try {
                task.run();
                BlockingQueue<Runnable> pendingTasks = shardCleanupPendingTasks.get(shardIdentifier);
                if (pendingTasks != null) {
                    // Re-poll on every iteration: polling only once in the loop initializer would
                    // re-run the same task forever. The non-blocking poll() returns null when empty.
                    Runnable pendingTask;
                    while ((pendingTask = pendingTasks.poll()) != null) {
                        pendingTask.run();
                    }
                }
            } finally {
                // Always release the shard, even if a task threw, so later cleanups are not blocked.
                ongoingRemoteDirectoryCleanups.remove(shardIdentifier);
            }
        } else {
            shardCleanupPendingTasks.computeIfAbsent(shardIdentifier, key -> new LinkedBlockingQueue<>()).add(task);
        }
    }
}

protected void releaseRemoteStoreLockAndCleanup(
String shardId,
String shallowSnapshotUUID,
Expand Down Expand Up @@ -1213,7 +1167,8 @@ protected void releaseRemoteStoreLockAndCleanup(
threadPool,
remoteStoreRepoForIndex,
indexUUID,
new ShardId(Index.UNKNOWN_INDEX_NAME, indexUUID, Integer.parseInt(shardId))
new ShardId(Index.UNKNOWN_INDEX_NAME, indexUUID, Integer.parseInt(shardId)),
ThreadPool.Names.REMOTE_PURGE
);
}
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.repositories.blobstore;

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.opensearch.core.index.shard.ShardId;

import java.util.Map;
import java.util.Set;

import static org.opensearch.common.util.concurrent.ConcurrentCollections.newConcurrentMap;
import static org.opensearch.common.util.concurrent.ConcurrentCollections.newConcurrentSet;

/**
 * A Runnable wrapper to make sure that for a given shard only one cleanup task runs at a time.
 * <p>
 * At most one task per shard is kept in {@code pendingRemoteDirectoryCleanups}; a newer task for the
 * same shard replaces the one already waiting. The wrapper that manages to register the shard in
 * {@code ongoingRemoteDirectoryCleanups} runs pending tasks until none are left.
 */
public class RemoteStoreShardCleanupTask implements Runnable {
    private final Runnable task;
    // "<indexUUID>/<shard number>", used as the key in both static collections below.
    private final String shardIdentifier;
    final static Set<String> ongoingRemoteDirectoryCleanups = newConcurrentSet();
    final static Map<String, Runnable> pendingRemoteDirectoryCleanups = newConcurrentMap();
    private static final Logger staticLogger = LogManager.getLogger(RemoteStoreShardCleanupTask.class);

    /**
     * @param task      the cleanup work to run
     * @param indexUUID UUID of the index the shard belongs to
     * @param shardId   shard whose numeric id is combined with {@code indexUUID} to identify the shard
     */
    public RemoteStoreShardCleanupTask(Runnable task, String indexUUID, ShardId shardId) {
        this.task = task;
        this.shardIdentifier = indexShardIdentifier(indexUUID, shardId);
    }

    /** Builds the per-shard key as {@code indexUUID + "/" + shardId.id()}. */
    private static String indexShardIdentifier(String indexUUID, ShardId shardId) {
        return String.join("/", indexUUID, String.valueOf(shardId.id()));
    }

    /**
     * Registers the wrapped task as the pending cleanup for the shard and, if no other thread is
     * currently cleaning that shard, runs pending cleanups until the pending entry is gone.
     * <p>
     * An exception thrown by a task propagates to the caller after the shard has been released.
     */
    @Override
    public void run() {
        if (pendingRemoteDirectoryCleanups.put(shardIdentifier, task) != null) {
            // The previously pending task has been replaced by this one; whoever registered the
            // earlier entry (or the thread currently cleaning the shard) is responsible for running it.
            staticLogger.debug("one cleanup task for shard {} is already in pending, we can skip this task", shardIdentifier);
            return;
        }
        do {
            if (ongoingRemoteDirectoryCleanups.add(shardIdentifier) == false) {
                staticLogger.debug("one task is already ongoing for shard {}, we can leave entry in pending", shardIdentifier);
                return;
            }
            try {
                // remove() both fetches and clears the entry in one call, so a task that replaces the
                // pending entry between a separate get() and remove() can no longer be dropped unrun.
                Runnable newTask;
                while ((newTask = pendingRemoteDirectoryCleanups.remove(shardIdentifier)) != null) {
                    newTask.run();
                }
            } finally {
                // Release the shard even when a task throws; otherwise no cleanup could ever run for it again.
                ongoingRemoteDirectoryCleanups.remove(shardIdentifier);
            }
            // Another thread may have added a pending task after the drain loop saw an empty map but
            // before the shard was released above. That thread found the shard busy and left, so
            // re-check here and pick the task up instead of leaving it stranded in the pending map.
        } while (pendingRemoteDirectoryCleanups.containsKey(shardIdentifier));
    }
}
Original file line number Diff line number Diff line change
Expand Up @@ -167,7 +167,8 @@ public void testRemoteDirectoryInitThrowsException() throws IOException {
remoteMetadataDirectory,
mock(RemoteStoreLockManager.class),
mock(ThreadPool.class),
shardId
shardId,
false
);
FilterDirectory remoteStoreFilterDirectory = new RemoteStoreRefreshListenerTests.TestFilterDirectory(
new RemoteStoreRefreshListenerTests.TestFilterDirectory(remoteSegmentStoreDirectory)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,8 @@
import org.opensearch.common.util.concurrent.OpenSearchExecutors;
import org.opensearch.core.action.ActionListener;
import org.opensearch.core.common.bytes.BytesReference;
import org.opensearch.core.index.Index;
import org.opensearch.core.index.shard.ShardId;
import org.opensearch.index.engine.NRTReplicationEngineFactory;
import org.opensearch.index.remote.RemoteStoreUtils;
import org.opensearch.index.shard.IndexShard;
Expand Down Expand Up @@ -156,7 +158,8 @@ public void setup() throws IOException {
remoteMetadataDirectory,
mdLockManager,
threadPool,
indexShard.shardId()
indexShard.shardId(),
false
);
try (Store store = indexShard.store()) {
segmentInfos = store.readLastCommittedSegmentsInfo();
Expand Down Expand Up @@ -649,7 +652,8 @@ public void testCopyFilesFromMultipartIOException() throws Exception {
remoteMetadataDirectory,
mdLockManager,
threadPool,
indexShard.shardId()
indexShard.shardId(),
false
);

populateMetadata();
Expand Down Expand Up @@ -685,6 +689,31 @@ public void onFailure(Exception e) {
storeDirectory.close();
}

/**
 * Checks that {@code RemoteSegmentStoreDirectory.remoteDirectoryCleanup} asks the factory for a
 * directory with {@code useSameThreadPool = true}, schedules work on the SAME executor only, and
 * ends up deleting the metadata directory, the data directory and the lock manager.
 */
public void testCleanupAsync() throws Exception {
    populateMetadata();

    final String repositoryName = "test-repository";
    final String indexUUID = "test-idx-uuid";
    final ShardId shardId = new ShardId(Index.UNKNOWN_INDEX_NAME, indexUUID, 0);

    // Directory under test is wired to run its async operations on the calling thread.
    final RemoteSegmentStoreDirectory directoryOnSameThread = new RemoteSegmentStoreDirectory(
        remoteDataDirectory,
        remoteMetadataDirectory,
        mdLockManager,
        threadPool,
        indexShard.shardId(),
        true
    );
    final RemoteSegmentStoreDirectoryFactory directoryFactory = mock(RemoteSegmentStoreDirectoryFactory.class);
    when(directoryFactory.newDirectory(any(), any(), any(), any())).thenReturn(directoryOnSameThread);

    RemoteSegmentStoreDirectory.remoteDirectoryCleanup(directoryFactory, repositoryName, indexUUID, shardId);

    // The factory must have been asked for the same-thread variant of the directory ...
    verify(directoryFactory).newDirectory(repositoryName, indexUUID, shardId, true);
    // ... and only the SAME executor may have been requested from the thread pool.
    verify(threadPool).executor(ThreadPool.Names.SAME);
    verify(threadPool, times(0)).executor(ThreadPool.Names.REMOTE_PURGE);
    // Every remote component is expected to have been deleted.
    verify(remoteMetadataDirectory).delete();
    verify(remoteDataDirectory).delete();
    verify(mdLockManager).delete();
}

public void testCopyFromException() throws IOException {
String filename = "_100.si";
Directory storeDirectory = LuceneTestCase.newDirectory();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,8 @@
import org.opensearch.common.UUIDs;
import org.opensearch.common.settings.Settings;
import org.opensearch.core.common.unit.ByteSizeUnit;
import org.opensearch.core.index.Index;
import org.opensearch.core.index.shard.ShardId;
import org.opensearch.core.xcontent.NamedXContentRegistry;
import org.opensearch.env.Environment;
import org.opensearch.indices.recovery.RecoverySettings;
Expand All @@ -64,6 +66,7 @@
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Function;
import java.util.stream.Collectors;

Expand Down Expand Up @@ -318,4 +321,38 @@ private RepositoryData addRandomSnapshotsToRepoData(RepositoryData repoData, boo
return repoData;
}

/**
 * Exercises {@code RemoteStoreShardCleanupTask.run()} for three combinations of the static
 * "pending" and "ongoing" state of a single shard. The static collections are cleared for the shard
 * in a {@code finally} block so that entries seeded here cannot leak into other tests.
 */
public void testRemoteStoreShardCleanupTask() {
    // todo: move it to separate class and add more scenarios.
    AtomicBoolean executed1 = new AtomicBoolean(false);
    Runnable task1 = () -> executed1.set(true);
    String indexName = "test-idx";
    String testIndexUUID = "test-idx-uuid";
    ShardId shardId = new ShardId(new Index(indexName, testIndexUUID), 0);
    String shardIdentifier = String.join("/", testIndexUUID, String.valueOf(shardId.id()));

    try {
        // Scenario 1: pending = empty, ongoing = false => executed
        RemoteStoreShardCleanupTask remoteStoreShardCleanupTask = new RemoteStoreShardCleanupTask(task1, testIndexUUID, shardId);
        remoteStoreShardCleanupTask.run();
        assertTrue(executed1.get());

        // Scenario 2: pending = empty, ongoing = true => pending = currentTask
        executed1.set(false);
        RemoteStoreShardCleanupTask.ongoingRemoteDirectoryCleanups.add(shardIdentifier);

        remoteStoreShardCleanupTask = new RemoteStoreShardCleanupTask(task1, testIndexUUID, shardId);
        remoteStoreShardCleanupTask.run();
        assertFalse(executed1.get());
        assertSame(RemoteStoreShardCleanupTask.pendingRemoteDirectoryCleanups.get(shardIdentifier), task1);

        // Scenario3: pending = anotherTask, ongoing = true => pending = currentTask
        AtomicBoolean executed2 = new AtomicBoolean(false);
        Runnable task2 = () -> executed2.set(true);
        RemoteStoreShardCleanupTask.pendingRemoteDirectoryCleanups.put(shardIdentifier, task1);
        RemoteStoreShardCleanupTask.ongoingRemoteDirectoryCleanups.add(shardIdentifier);

        remoteStoreShardCleanupTask = new RemoteStoreShardCleanupTask(task2, testIndexUUID, shardId);
        remoteStoreShardCleanupTask.run();
        assertFalse(executed1.get());
        // The replacing task must only be parked as pending, not executed.
        assertFalse(executed2.get());
        assertSame(RemoteStoreShardCleanupTask.pendingRemoteDirectoryCleanups.get(shardIdentifier), task2);
    } finally {
        // The collections are static: drop the entries seeded above so they do not outlive this test.
        RemoteStoreShardCleanupTask.ongoingRemoteDirectoryCleanups.remove(shardIdentifier);
        RemoteStoreShardCleanupTask.pendingRemoteDirectoryCleanups.remove(shardIdentifier);
    }
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -790,7 +790,7 @@ protected RemoteSegmentStoreDirectory createRemoteSegmentStoreDirectory(ShardId
RemoteStoreLockManager remoteStoreLockManager = new RemoteStoreMetadataLockManager(
new RemoteBufferedOutputDirectory(getBlobContainer(remoteShardPath.resolveIndex().resolve("lock_files")))
);
return new RemoteSegmentStoreDirectory(dataDirectory, metadataDirectory, remoteStoreLockManager, threadPool, shardId);
return new RemoteSegmentStoreDirectory(dataDirectory, metadataDirectory, remoteStoreLockManager, threadPool, shardId, false);
}

private RemoteDirectory newRemoteDirectory(Path f) throws IOException {
Expand Down

0 comments on commit 5ba410b

Please sign in to comment.