Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Backport 2.x] Optimize remote store operations during snapshot Deletion #12677

Merged
merged 1 commit into from
Mar 15, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -718,10 +718,49 @@ public Map<String, UploadedSegmentMetadata> getSegmentsUploadedToRemoteStore() {
return Collections.unmodifiableMap(this.segmentsUploadedToRemoteStore);
}

// Visible for testing
Set<String> getMetadataFilesToFilterActiveSegments(
final int lastNMetadataFilesToKeep,
final List<String> sortedMetadataFiles,
final Set<String> lockedMetadataFiles
) {
// the idea here is for each deletable md file, we can consider the segments present in non-deletable md file
// before this and non-deletable md file after this to compute the active segment files.
// For ex:
// lastNMetadataFilesToKeep = 3
// sortedMetadataFiles = [m1, m2, m3, m4, m5, m6(locked), m7(locked), m8(locked), m9(locked), m10]
// lockedMetadataFiles = m6, m7, m8, m9
// then the returned set will be (m3, m6, m9)
final Set<String> metadataFilesToFilterActiveSegments = new HashSet<>();
for (int idx = lastNMetadataFilesToKeep; idx < sortedMetadataFiles.size(); idx++) {
if (lockedMetadataFiles.contains(sortedMetadataFiles.get(idx)) == false) {
String prevMetadata = (idx - 1) >= 0 ? sortedMetadataFiles.get(idx - 1) : null;
String nextMetadata = (idx + 1) < sortedMetadataFiles.size() ? sortedMetadataFiles.get(idx + 1) : null;

if (prevMetadata != null && (lockedMetadataFiles.contains(prevMetadata) || idx == lastNMetadataFilesToKeep)) {
// if previous metadata of deletable md is locked, add it to md files for active segments.
metadataFilesToFilterActiveSegments.add(prevMetadata);
}
if (nextMetadata != null && lockedMetadataFiles.contains(nextMetadata)) {
// if next metadata of deletable md is locked, add it to md files for active segments.
metadataFilesToFilterActiveSegments.add(nextMetadata);
}
}
}
return metadataFilesToFilterActiveSegments;
}

/**
* Delete stale segment and metadata files
* One metadata file is kept per commit (refresh updates the same file). To read segments uploaded to remote store,
* we just need to read the latest metadata file. All the stale metadata files can be safely deleted.
* we just need to read the latest metadata file.
* Assumptions:
* (1) if a segment file is not present in a md file, it will never be present in any md file created after that, and
* (2) if (md1, md2, md3) are in sorted order, it is not possible that a segment file will be in md1 and md3 but not in md2.
* <p>
* for each deletable md file, segments present in non-deletable md file before this and non-deletable md file
* after this are sufficient to compute the list of active or non-deletable segment files referenced by a deletable
* md file
*
* @param lastNMetadataFilesToKeep number of metadata files to keep
* @throws IOException in case of I/O error while reading from / writing to remote segment store
Expand Down Expand Up @@ -760,7 +799,6 @@ public void deleteStaleSegments(int lastNMetadataFilesToKeep) throws IOException
.filter(metadataFile -> allLockFiles.contains(metadataFile) == false)
.collect(Collectors.toList());

sortedMetadataFileList.removeAll(metadataFilesToBeDeleted);
logger.debug(
"metadataFilesEligibleToDelete={} metadataFilesToBeDeleted={}",
metadataFilesEligibleToDelete,
Expand All @@ -769,7 +807,14 @@ public void deleteStaleSegments(int lastNMetadataFilesToKeep) throws IOException

Map<String, UploadedSegmentMetadata> activeSegmentFilesMetadataMap = new HashMap<>();
Set<String> activeSegmentRemoteFilenames = new HashSet<>();
for (String metadataFile : sortedMetadataFileList) {

final Set<String> metadataFilesToFilterActiveSegments = getMetadataFilesToFilterActiveSegments(
lastNMetadataFilesToKeep,
sortedMetadataFileList,
allLockFiles
);

for (String metadataFile : metadataFilesToFilterActiveSegments) {
Map<String, UploadedSegmentMetadata> segmentMetadataMap = readMetadataFile(metadataFile).getMetadata();
activeSegmentFilesMetadataMap.putAll(segmentMetadataMap);
activeSegmentRemoteFilenames.addAll(
Expand Down Expand Up @@ -848,6 +893,25 @@ public void deleteStaleSegmentsAsync(int lastNMetadataFilesToKeep, ActionListene
}
}

public static void remoteDirectoryCleanup(
RemoteSegmentStoreDirectoryFactory remoteDirectoryFactory,
String remoteStoreRepoForIndex,
String indexUUID,
ShardId shardId
) {
try {
RemoteSegmentStoreDirectory remoteSegmentStoreDirectory = (RemoteSegmentStoreDirectory) remoteDirectoryFactory.newDirectory(
remoteStoreRepoForIndex,
indexUUID,
shardId
);
remoteSegmentStoreDirectory.deleteStaleSegments(0);
remoteSegmentStoreDirectory.deleteIfEmpty();
} catch (Exception e) {
staticLogger.error("Exception occurred while deleting directory", e);
}
}

/*
Tries to delete shard level directory if it is empty
Return true if it deleted it successfully
Expand All @@ -870,7 +934,6 @@ private boolean deleteIfEmpty() throws IOException {
logger.error("Exception occurred while deleting directory", e);
return false;
}

return true;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -76,4 +76,5 @@ public Directory newDirectory(String repositoryName, String indexUUID, ShardId s
throw new IllegalArgumentException("Repository should be created before creating index with remote_store enabled setting", e);
}
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -117,6 +117,7 @@
import org.opensearch.index.snapshots.blobstore.RemoteStoreShardShallowCopySnapshot;
import org.opensearch.index.snapshots.blobstore.SlicedInputStream;
import org.opensearch.index.snapshots.blobstore.SnapshotFiles;
import org.opensearch.index.store.RemoteSegmentStoreDirectory;
import org.opensearch.index.store.RemoteSegmentStoreDirectoryFactory;
import org.opensearch.index.store.Store;
import org.opensearch.index.store.StoreFileMetadata;
Expand Down Expand Up @@ -237,6 +238,8 @@ public abstract class BlobStoreRepository extends AbstractLifecycleComponent imp
Setting.Property.Deprecated
);

private static final Logger staticLogger = LogManager.getLogger(BlobStoreRepository.class);

/**
* Setting to disable caching of the latest repository data.
*/
Expand Down Expand Up @@ -1161,6 +1164,78 @@ private void asyncCleanupUnlinkedShardLevelBlobs(
}
}

public static void remoteDirectoryCleanupAsync(
RemoteSegmentStoreDirectoryFactory remoteDirectoryFactory,
ThreadPool threadpool,
String remoteStoreRepoForIndex,
String indexUUID,
ShardId shardId,
String threadPoolName
) {
threadpool.executor(threadPoolName)
.execute(
new RemoteStoreShardCleanupTask(
() -> RemoteSegmentStoreDirectory.remoteDirectoryCleanup(
remoteDirectoryFactory,
remoteStoreRepoForIndex,
indexUUID,
shardId
),
indexUUID,
shardId
)
);
}

protected void releaseRemoteStoreLockAndCleanup(
String shardId,
String shallowSnapshotUUID,
BlobContainer shardContainer,
RemoteStoreLockManagerFactory remoteStoreLockManagerFactory
) throws IOException {
if (remoteStoreLockManagerFactory == null) {
return;
}

RemoteStoreShardShallowCopySnapshot remoteStoreShardShallowCopySnapshot = REMOTE_STORE_SHARD_SHALLOW_COPY_SNAPSHOT_FORMAT.read(
shardContainer,
shallowSnapshotUUID,
namedXContentRegistry
);
String indexUUID = remoteStoreShardShallowCopySnapshot.getIndexUUID();
String remoteStoreRepoForIndex = remoteStoreShardShallowCopySnapshot.getRemoteStoreRepository();
// Releasing lock file before deleting the shallow-snap-UUID file because in case of any failure while
// releasing the lock file, we would still have the shallow-snap-UUID file and that would be used during
// next delete operation for releasing this lock file
RemoteStoreLockManager remoteStoreMetadataLockManager = remoteStoreLockManagerFactory.newLockManager(
remoteStoreRepoForIndex,
indexUUID,
shardId
);
remoteStoreMetadataLockManager.release(FileLockInfo.getLockInfoBuilder().withAcquirerId(shallowSnapshotUUID).build());
logger.debug("Successfully released lock for shard {} of index with uuid {}", shardId, indexUUID);
if (!isIndexPresent(clusterService, indexUUID)) {
// Note: this is a temporary solution where snapshot deletion triggers remote store side cleanup if
// index is already deleted. shard cleanup will still happen asynchronously using REMOTE_PURGE
// threadpool. if it fails, it could leave some stale files in remote directory. this issue could
// even happen in cases of shard level remote store data cleanup which also happens asynchronously.
// in long term, we have plans to implement remote store GC poller mechanism which will take care of
// such stale data. related issue: https://github.com/opensearch-project/OpenSearch/issues/8469
RemoteSegmentStoreDirectoryFactory remoteDirectoryFactory = new RemoteSegmentStoreDirectoryFactory(
remoteStoreLockManagerFactory.getRepositoriesService(),
threadPool
);
remoteDirectoryCleanupAsync(
remoteDirectoryFactory,
threadPool,
remoteStoreRepoForIndex,
indexUUID,
new ShardId(Index.UNKNOWN_INDEX_NAME, indexUUID, Integer.parseInt(shardId)),
ThreadPool.Names.REMOTE_PURGE
);
}
}

// When remoteStoreLockManagerFactory is non-null, while deleting the files, lock files are also released before deletion of respective
// shallow-snap-UUID files. And if it is null, we just delete the stale shard blobs.
private void executeStaleShardDelete(
Expand All @@ -1172,53 +1247,34 @@ private void executeStaleShardDelete(
if (filesToDelete != null) {
threadPool.executor(ThreadPool.Names.SNAPSHOT).execute(ActionRunnable.wrap(listener, l -> {
try {
if (remoteStoreLockManagerFactory != null) {
for (String fileToDelete : filesToDelete) {
if (fileToDelete.contains(SHALLOW_SNAPSHOT_PREFIX)) {
String[] fileToDeletePath = fileToDelete.split("/");
String indexId = fileToDeletePath[1];
String shardId = fileToDeletePath[2];
String shallowSnapBlob = fileToDeletePath[3];
String snapshotUUID = extractShallowSnapshotUUID(shallowSnapBlob).orElseThrow();
BlobContainer shardContainer = blobStore().blobContainer(indicesPath().add(indexId).add(shardId));
RemoteStoreShardShallowCopySnapshot remoteStoreShardShallowCopySnapshot =
REMOTE_STORE_SHARD_SHALLOW_COPY_SNAPSHOT_FORMAT.read(
shardContainer,
snapshotUUID,
namedXContentRegistry
);
String indexUUID = remoteStoreShardShallowCopySnapshot.getIndexUUID();
String remoteStoreRepoForIndex = remoteStoreShardShallowCopySnapshot.getRemoteStoreRepository();
// Releasing lock file before deleting the shallow-snap-UUID file because in case of any failure while
// releasing the lock file, we would still have the shallow-snap-UUID file and that would be used during
// next delete operation for releasing this lock file
RemoteStoreLockManager remoteStoreMetadataLockManager = remoteStoreLockManagerFactory.newLockManager(
remoteStoreRepoForIndex,
indexUUID,
shardId
);
remoteStoreMetadataLockManager.release(
FileLockInfo.getLockInfoBuilder().withAcquirerId(snapshotUUID).build()
// filtering files for which remote store lock release and cleanup succeeded,
// remaining files for which it failed will be retried in next snapshot delete run.
List<String> eligibleFilesToDelete = new ArrayList<>();
for (String fileToDelete : filesToDelete) {
if (fileToDelete.contains(SHALLOW_SNAPSHOT_PREFIX)) {
String[] fileToDeletePath = fileToDelete.split("/");
String indexId = fileToDeletePath[1];
String shardId = fileToDeletePath[2];
String shallowSnapBlob = fileToDeletePath[3];
String snapshotUUID = extractShallowSnapshotUUID(shallowSnapBlob).orElseThrow();
BlobContainer shardContainer = blobStore().blobContainer(indicesPath().add(indexId).add(shardId));
try {
releaseRemoteStoreLockAndCleanup(shardId, snapshotUUID, shardContainer, remoteStoreLockManagerFactory);
eligibleFilesToDelete.add(fileToDelete);
} catch (Exception e) {
logger.error(
"Failed to release lock or cleanup shard for indexID {}, shardID {} " + "and snapshot {}",
indexId,
shardId,
snapshotUUID
);
if (!isIndexPresent(clusterService, indexUUID)) {
// this is a temporary solution where snapshot deletion triggers remote store side
// cleanup if index is already deleted. We will add a poller in future to take
// care of remote store side cleanup.
// see https://github.com/opensearch-project/OpenSearch/issues/8469
new RemoteSegmentStoreDirectoryFactory(
remoteStoreLockManagerFactory.getRepositoriesService(),
threadPool
).newDirectory(
remoteStoreRepoForIndex,
indexUUID,
new ShardId(Index.UNKNOWN_INDEX_NAME, indexUUID, Integer.valueOf(shardId))
).close();
}
}
} else {
eligibleFilesToDelete.add(fileToDelete);
}
}
// Deleting the shard blobs
deleteFromContainer(blobContainer(), filesToDelete);
deleteFromContainer(blobContainer(), eligibleFilesToDelete);
l.onResponse(null);
} catch (Exception e) {
logger.warn(
Expand Down Expand Up @@ -1651,39 +1707,12 @@ private void executeOneStaleIndexDelete(
for (String blob : shardBlob.getValue().listBlobs().keySet()) {
final Optional<String> snapshotUUID = extractShallowSnapshotUUID(blob);
if (snapshotUUID.isPresent()) {
RemoteStoreShardShallowCopySnapshot remoteStoreShardShallowCopySnapshot =
REMOTE_STORE_SHARD_SHALLOW_COPY_SNAPSHOT_FORMAT.read(
shardBlob.getValue(),
snapshotUUID.get(),
namedXContentRegistry
);
String indexUUID = remoteStoreShardShallowCopySnapshot.getIndexUUID();
String remoteStoreRepoForIndex = remoteStoreShardShallowCopySnapshot.getRemoteStoreRepository();
// Releasing lock files before deleting the shallow-snap-UUID file because in case of any failure
// while releasing the lock file, we would still have the corresponding shallow-snap-UUID file
// and that would be used during next delete operation for releasing this stale lock file
RemoteStoreLockManager remoteStoreMetadataLockManager = remoteStoreLockManagerFactory.newLockManager(
remoteStoreRepoForIndex,
indexUUID,
shardBlob.getKey()
);
remoteStoreMetadataLockManager.release(
FileLockInfo.getLockInfoBuilder().withAcquirerId(snapshotUUID.get()).build()
releaseRemoteStoreLockAndCleanup(
shardBlob.getKey(),
snapshotUUID.get(),
shardBlob.getValue(),
remoteStoreLockManagerFactory
);
if (!isIndexPresent(clusterService, indexUUID)) {
// this is a temporary solution where snapshot deletion triggers remote store side
// cleanup if index is already deleted. We will add a poller in future to take
// care of remote store side cleanup.
// see https://github.com/opensearch-project/OpenSearch/issues/8469
new RemoteSegmentStoreDirectoryFactory(
remoteStoreLockManagerFactory.getRepositoriesService(),
threadPool
).newDirectory(
remoteStoreRepoForIndex,
indexUUID,
new ShardId(Index.UNKNOWN_INDEX_NAME, indexUUID, Integer.parseInt(shardBlob.getKey()))
).close();
}
}
}
}
Expand Down
Loading
Loading