Skip to content

Commit

Permalink
Optimize remote store operations during snapshot Deletion (#12319)
Browse files Browse the repository at this point in the history
Signed-off-by: Harish Bhakuni <[email protected]>
(cherry picked from commit b265215)
Signed-off-by: github-actions[bot] <github-actions[bot]@users.noreply.github.com>
  • Loading branch information
github-actions[bot] committed Mar 14, 2024
1 parent 537d679 commit a9e3779
Show file tree
Hide file tree
Showing 6 changed files with 366 additions and 79 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -718,10 +718,49 @@ public Map<String, UploadedSegmentMetadata> getSegmentsUploadedToRemoteStore() {
return Collections.unmodifiableMap(this.segmentsUploadedToRemoteStore);
}

// Visible for testing
Set<String> getMetadataFilesToFilterActiveSegments(
final int lastNMetadataFilesToKeep,
final List<String> sortedMetadataFiles,
final Set<String> lockedMetadataFiles
) {
// the idea here is for each deletable md file, we can consider the segments present in non-deletable md file
// before this and non-deletable md file after this to compute the active segment files.
// For ex:
// lastNMetadataFilesToKeep = 3
// sortedMetadataFiles = [m1, m2, m3, m4, m5, m6(locked), m7(locked), m8(locked), m9(locked), m10]
// lockedMetadataFiles = m6, m7, m8, m9
// then the returned set will be (m3, m6, m9)
final Set<String> metadataFilesToFilterActiveSegments = new HashSet<>();
for (int idx = lastNMetadataFilesToKeep; idx < sortedMetadataFiles.size(); idx++) {
if (lockedMetadataFiles.contains(sortedMetadataFiles.get(idx)) == false) {
String prevMetadata = (idx - 1) >= 0 ? sortedMetadataFiles.get(idx - 1) : null;
String nextMetadata = (idx + 1) < sortedMetadataFiles.size() ? sortedMetadataFiles.get(idx + 1) : null;

if (prevMetadata != null && (lockedMetadataFiles.contains(prevMetadata) || idx == lastNMetadataFilesToKeep)) {
// if previous metadata of deletable md is locked, add it to md files for active segments.
metadataFilesToFilterActiveSegments.add(prevMetadata);
}
if (nextMetadata != null && lockedMetadataFiles.contains(nextMetadata)) {
// if next metadata of deletable md is locked, add it to md files for active segments.
metadataFilesToFilterActiveSegments.add(nextMetadata);
}
}
}
return metadataFilesToFilterActiveSegments;
}

/**
* Delete stale segment and metadata files
* One metadata file is kept per commit (refresh updates the same file). To read segments uploaded to remote store,
* we just need to read the latest metadata file. All the stale metadata files can be safely deleted.
* we just need to read the latest metadata file.
* Assumptions:
* (1) if a segment file is not present in a md file, it will never be present in any md file created after that, and
* (2) if (md1, md2, md3) are in sorted order, it is not possible that a segment file will be in md1 and md3 but not in md2.
* <p>
* for each deletable md file, segments present in non-deletable md file before this and non-deletable md file
* after this are sufficient to compute the list of active or non-deletable segment files referenced by a deletable
* md file
*
* @param lastNMetadataFilesToKeep number of metadata files to keep
* @throws IOException in case of I/O error while reading from / writing to remote segment store
Expand Down Expand Up @@ -760,7 +799,6 @@ public void deleteStaleSegments(int lastNMetadataFilesToKeep) throws IOException
.filter(metadataFile -> allLockFiles.contains(metadataFile) == false)
.collect(Collectors.toList());

sortedMetadataFileList.removeAll(metadataFilesToBeDeleted);
logger.debug(
"metadataFilesEligibleToDelete={} metadataFilesToBeDeleted={}",
metadataFilesEligibleToDelete,
Expand All @@ -769,7 +807,14 @@ public void deleteStaleSegments(int lastNMetadataFilesToKeep) throws IOException

Map<String, UploadedSegmentMetadata> activeSegmentFilesMetadataMap = new HashMap<>();
Set<String> activeSegmentRemoteFilenames = new HashSet<>();
for (String metadataFile : sortedMetadataFileList) {

final Set<String> metadataFilesToFilterActiveSegments = getMetadataFilesToFilterActiveSegments(
lastNMetadataFilesToKeep,
sortedMetadataFileList,
allLockFiles
);

for (String metadataFile : metadataFilesToFilterActiveSegments) {
Map<String, UploadedSegmentMetadata> segmentMetadataMap = readMetadataFile(metadataFile).getMetadata();
activeSegmentFilesMetadataMap.putAll(segmentMetadataMap);
activeSegmentRemoteFilenames.addAll(
Expand Down Expand Up @@ -848,6 +893,25 @@ public void deleteStaleSegmentsAsync(int lastNMetadataFilesToKeep, ActionListene
}
}

public static void remoteDirectoryCleanup(
RemoteSegmentStoreDirectoryFactory remoteDirectoryFactory,
String remoteStoreRepoForIndex,
String indexUUID,
ShardId shardId
) {
try {
RemoteSegmentStoreDirectory remoteSegmentStoreDirectory = (RemoteSegmentStoreDirectory) remoteDirectoryFactory.newDirectory(
remoteStoreRepoForIndex,
indexUUID,
shardId
);
remoteSegmentStoreDirectory.deleteStaleSegments(0);
remoteSegmentStoreDirectory.deleteIfEmpty();
} catch (Exception e) {
staticLogger.error("Exception occurred while deleting directory", e);
}
}

/*
Tries to delete shard level directory if it is empty
Return true if it deleted it successfully
Expand All @@ -870,7 +934,6 @@ private boolean deleteIfEmpty() throws IOException {
logger.error("Exception occurred while deleting directory", e);
return false;
}

return true;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -76,4 +76,5 @@ public Directory newDirectory(String repositoryName, String indexUUID, ShardId s
throw new IllegalArgumentException("Repository should be created before creating index with remote_store enabled setting", e);
}
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -117,6 +117,7 @@
import org.opensearch.index.snapshots.blobstore.RemoteStoreShardShallowCopySnapshot;
import org.opensearch.index.snapshots.blobstore.SlicedInputStream;
import org.opensearch.index.snapshots.blobstore.SnapshotFiles;
import org.opensearch.index.store.RemoteSegmentStoreDirectory;
import org.opensearch.index.store.RemoteSegmentStoreDirectoryFactory;
import org.opensearch.index.store.Store;
import org.opensearch.index.store.StoreFileMetadata;
Expand Down Expand Up @@ -237,6 +238,8 @@ public abstract class BlobStoreRepository extends AbstractLifecycleComponent imp
Setting.Property.Deprecated
);

private static final Logger staticLogger = LogManager.getLogger(BlobStoreRepository.class);

/**
* Setting to disable caching of the latest repository data.
*/
Expand Down Expand Up @@ -1161,6 +1164,78 @@ private void asyncCleanupUnlinkedShardLevelBlobs(
}
}

public static void remoteDirectoryCleanupAsync(
RemoteSegmentStoreDirectoryFactory remoteDirectoryFactory,
ThreadPool threadpool,
String remoteStoreRepoForIndex,
String indexUUID,
ShardId shardId,
String threadPoolName
) {
threadpool.executor(threadPoolName)
.execute(
new RemoteStoreShardCleanupTask(
() -> RemoteSegmentStoreDirectory.remoteDirectoryCleanup(
remoteDirectoryFactory,
remoteStoreRepoForIndex,
indexUUID,
shardId
),
indexUUID,
shardId
)
);
}

protected void releaseRemoteStoreLockAndCleanup(
String shardId,
String shallowSnapshotUUID,
BlobContainer shardContainer,
RemoteStoreLockManagerFactory remoteStoreLockManagerFactory
) throws IOException {
if (remoteStoreLockManagerFactory == null) {
return;
}

RemoteStoreShardShallowCopySnapshot remoteStoreShardShallowCopySnapshot = REMOTE_STORE_SHARD_SHALLOW_COPY_SNAPSHOT_FORMAT.read(
shardContainer,
shallowSnapshotUUID,
namedXContentRegistry
);
String indexUUID = remoteStoreShardShallowCopySnapshot.getIndexUUID();
String remoteStoreRepoForIndex = remoteStoreShardShallowCopySnapshot.getRemoteStoreRepository();
// Releasing lock file before deleting the shallow-snap-UUID file because in case of any failure while
// releasing the lock file, we would still have the shallow-snap-UUID file and that would be used during
// next delete operation for releasing this lock file
RemoteStoreLockManager remoteStoreMetadataLockManager = remoteStoreLockManagerFactory.newLockManager(
remoteStoreRepoForIndex,
indexUUID,
shardId
);
remoteStoreMetadataLockManager.release(FileLockInfo.getLockInfoBuilder().withAcquirerId(shallowSnapshotUUID).build());
logger.debug("Successfully released lock for shard {} of index with uuid {}", shardId, indexUUID);
if (!isIndexPresent(clusterService, indexUUID)) {
// Note: this is a temporary solution where snapshot deletion triggers remote store side cleanup if
// index is already deleted. shard cleanup will still happen asynchronously using REMOTE_PURGE
// threadpool. if it fails, it could leave some stale files in remote directory. this issue could
// even happen in cases of shard level remote store data cleanup which also happens asynchronously.
// in long term, we have plans to implement remote store GC poller mechanism which will take care of
// such stale data. related issue: https://github.com/opensearch-project/OpenSearch/issues/8469
RemoteSegmentStoreDirectoryFactory remoteDirectoryFactory = new RemoteSegmentStoreDirectoryFactory(
remoteStoreLockManagerFactory.getRepositoriesService(),
threadPool
);
remoteDirectoryCleanupAsync(
remoteDirectoryFactory,
threadPool,
remoteStoreRepoForIndex,
indexUUID,
new ShardId(Index.UNKNOWN_INDEX_NAME, indexUUID, Integer.parseInt(shardId)),
ThreadPool.Names.REMOTE_PURGE
);
}
}

// When remoteStoreLockManagerFactory is non-null, while deleting the files, lock files are also released before deletion of respective
// shallow-snap-UUID files. And if it is null, we just delete the stale shard blobs.
private void executeStaleShardDelete(
Expand All @@ -1172,53 +1247,34 @@ private void executeStaleShardDelete(
if (filesToDelete != null) {
threadPool.executor(ThreadPool.Names.SNAPSHOT).execute(ActionRunnable.wrap(listener, l -> {
try {
if (remoteStoreLockManagerFactory != null) {
for (String fileToDelete : filesToDelete) {
if (fileToDelete.contains(SHALLOW_SNAPSHOT_PREFIX)) {
String[] fileToDeletePath = fileToDelete.split("/");
String indexId = fileToDeletePath[1];
String shardId = fileToDeletePath[2];
String shallowSnapBlob = fileToDeletePath[3];
String snapshotUUID = extractShallowSnapshotUUID(shallowSnapBlob).orElseThrow();
BlobContainer shardContainer = blobStore().blobContainer(indicesPath().add(indexId).add(shardId));
RemoteStoreShardShallowCopySnapshot remoteStoreShardShallowCopySnapshot =
REMOTE_STORE_SHARD_SHALLOW_COPY_SNAPSHOT_FORMAT.read(
shardContainer,
snapshotUUID,
namedXContentRegistry
);
String indexUUID = remoteStoreShardShallowCopySnapshot.getIndexUUID();
String remoteStoreRepoForIndex = remoteStoreShardShallowCopySnapshot.getRemoteStoreRepository();
// Releasing lock file before deleting the shallow-snap-UUID file because in case of any failure while
// releasing the lock file, we would still have the shallow-snap-UUID file and that would be used during
// next delete operation for releasing this lock file
RemoteStoreLockManager remoteStoreMetadataLockManager = remoteStoreLockManagerFactory.newLockManager(
remoteStoreRepoForIndex,
indexUUID,
shardId
);
remoteStoreMetadataLockManager.release(
FileLockInfo.getLockInfoBuilder().withAcquirerId(snapshotUUID).build()
// filtering files for which remote store lock release and cleanup succeeded,
// remaining files for which it failed will be retried in next snapshot delete run.
List<String> eligibleFilesToDelete = new ArrayList<>();
for (String fileToDelete : filesToDelete) {
if (fileToDelete.contains(SHALLOW_SNAPSHOT_PREFIX)) {
String[] fileToDeletePath = fileToDelete.split("/");
String indexId = fileToDeletePath[1];
String shardId = fileToDeletePath[2];
String shallowSnapBlob = fileToDeletePath[3];
String snapshotUUID = extractShallowSnapshotUUID(shallowSnapBlob).orElseThrow();
BlobContainer shardContainer = blobStore().blobContainer(indicesPath().add(indexId).add(shardId));
try {
releaseRemoteStoreLockAndCleanup(shardId, snapshotUUID, shardContainer, remoteStoreLockManagerFactory);
eligibleFilesToDelete.add(fileToDelete);
} catch (Exception e) {
logger.error(
"Failed to release lock or cleanup shard for indexID {}, shardID {} " + "and snapshot {}",
indexId,
shardId,
snapshotUUID
);
if (!isIndexPresent(clusterService, indexUUID)) {
// this is a temporary solution where snapshot deletion triggers remote store side
// cleanup if index is already deleted. We will add a poller in future to take
// care of remote store side cleanup.
// see https://github.com/opensearch-project/OpenSearch/issues/8469
new RemoteSegmentStoreDirectoryFactory(
remoteStoreLockManagerFactory.getRepositoriesService(),
threadPool
).newDirectory(
remoteStoreRepoForIndex,
indexUUID,
new ShardId(Index.UNKNOWN_INDEX_NAME, indexUUID, Integer.valueOf(shardId))
).close();
}
}
} else {
eligibleFilesToDelete.add(fileToDelete);
}
}
// Deleting the shard blobs
deleteFromContainer(blobContainer(), filesToDelete);
deleteFromContainer(blobContainer(), eligibleFilesToDelete);
l.onResponse(null);
} catch (Exception e) {
logger.warn(
Expand Down Expand Up @@ -1651,39 +1707,12 @@ private void executeOneStaleIndexDelete(
for (String blob : shardBlob.getValue().listBlobs().keySet()) {
final Optional<String> snapshotUUID = extractShallowSnapshotUUID(blob);
if (snapshotUUID.isPresent()) {
RemoteStoreShardShallowCopySnapshot remoteStoreShardShallowCopySnapshot =
REMOTE_STORE_SHARD_SHALLOW_COPY_SNAPSHOT_FORMAT.read(
shardBlob.getValue(),
snapshotUUID.get(),
namedXContentRegistry
);
String indexUUID = remoteStoreShardShallowCopySnapshot.getIndexUUID();
String remoteStoreRepoForIndex = remoteStoreShardShallowCopySnapshot.getRemoteStoreRepository();
// Releasing lock files before deleting the shallow-snap-UUID file because in case of any failure
// while releasing the lock file, we would still have the corresponding shallow-snap-UUID file
// and that would be used during next delete operation for releasing this stale lock file
RemoteStoreLockManager remoteStoreMetadataLockManager = remoteStoreLockManagerFactory.newLockManager(
remoteStoreRepoForIndex,
indexUUID,
shardBlob.getKey()
);
remoteStoreMetadataLockManager.release(
FileLockInfo.getLockInfoBuilder().withAcquirerId(snapshotUUID.get()).build()
releaseRemoteStoreLockAndCleanup(
shardBlob.getKey(),
snapshotUUID.get(),
shardBlob.getValue(),
remoteStoreLockManagerFactory
);
if (!isIndexPresent(clusterService, indexUUID)) {
// this is a temporary solution where snapshot deletion triggers remote store side
// cleanup if index is already deleted. We will add a poller in future to take
// care of remote store side cleanup.
// see https://github.com/opensearch-project/OpenSearch/issues/8469
new RemoteSegmentStoreDirectoryFactory(
remoteStoreLockManagerFactory.getRepositoriesService(),
threadPool
).newDirectory(
remoteStoreRepoForIndex,
indexUUID,
new ShardId(Index.UNKNOWN_INDEX_NAME, indexUUID, Integer.parseInt(shardBlob.getKey()))
).close();
}
}
}
}
Expand Down
Loading

0 comments on commit a9e3779

Please sign in to comment.