Optimize remote store operations during snapshot Deletion #12319

Merged
server/src/main/java/org/opensearch/index/store/RemoteSegmentStoreDirectory.java
@@ -718,10 +718,49 @@
return Collections.unmodifiableMap(this.segmentsUploadedToRemoteStore);
}

// Visible for testing
Set<String> getMetadataFilesToFilterActiveSegments(
final int lastNMetadataFilesToKeep,
final List<String> sortedMetadataFiles,
final Set<String> lockedMetadataFiles
) {
// The idea: for each deletable md file, the segments present in the closest non-deletable md files
// before and after it are sufficient to compute the set of active segment files.
// For ex:
// lastNMetadataFilesToKeep = 3
// sortedMetadataFiles = [m1, m2, m3, m4, m5, m6(locked), m7(locked), m8(locked), m9(locked), m10]
// lockedMetadataFiles = m6, m7, m8, m9
// then the returned set will be (m3, m6, m9)
final Set<String> metadataFilesToFilterActiveSegments = new HashSet<>();
for (int idx = lastNMetadataFilesToKeep; idx < sortedMetadataFiles.size(); idx++) {
if (lockedMetadataFiles.contains(sortedMetadataFiles.get(idx)) == false) {
String prevMetadata = (idx - 1) >= 0 ? sortedMetadataFiles.get(idx - 1) : null;
String nextMetadata = (idx + 1) < sortedMetadataFiles.size() ? sortedMetadataFiles.get(idx + 1) : null;

if (prevMetadata != null && (lockedMetadataFiles.contains(prevMetadata) || idx == lastNMetadataFilesToKeep)) {
// if the previous md file of this deletable md file is locked (or this is the first deletable
// md file), add it to the md files used to compute active segments.
metadataFilesToFilterActiveSegments.add(prevMetadata);
}
if (nextMetadata != null && lockedMetadataFiles.contains(nextMetadata)) {
// if the next md file of this deletable md file is locked, add it to the md files used to compute active segments.
metadataFilesToFilterActiveSegments.add(nextMetadata);
}
}
}
return metadataFilesToFilterActiveSegments;
}
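// A minimal sketch (hypothetical test, not part of this change) exercising the example from the
// comment above; `directory` is assumed to be a RemoteSegmentStoreDirectory instance:
//
//   List<String> sorted = List.of("m1", "m2", "m3", "m4", "m5", "m6", "m7", "m8", "m9", "m10");
//   Set<String> locked = Set.of("m6", "m7", "m8", "m9");
//   Set<String> result = directory.getMetadataFilesToFilterActiveSegments(3, sorted, locked);
//   assert result.equals(Set.of("m3", "m6", "m9"));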

/**
* Delete stale segment and metadata files
* One metadata file is kept per commit (refresh updates the same file). To read segments uploaded to remote store,
* we just need to read the latest metadata file. All the stale metadata files can be safely deleted.
* we just need to read the latest metadata file.
* Assumptions:
* (1) if a segment file is not present in a md file, it will never be present in any md file created after that, and
* (2) if (md1, md2, md3) are in sorted order, it is not possible that a segment file will be in md1 and md3 but not in md2.
* <p>
* For each deletable md file, the segments present in the closest non-deletable md files before and after it
* are sufficient to compute the set of active (non-deletable) segment files referenced by that deletable
* md file.
*
* @param lastNMetadataFilesToKeep number of metadata files to keep
* @throws IOException in case of I/O error while reading from / writing to remote segment store
@@ -760,7 +799,6 @@
.filter(metadataFile -> allLockFiles.contains(metadataFile) == false)
.collect(Collectors.toList());

sortedMetadataFileList.removeAll(metadataFilesToBeDeleted);
logger.debug(
"metadataFilesEligibleToDelete={} metadataFilesToBeDeleted={}",
metadataFilesEligibleToDelete,
@@ -769,7 +807,14 @@

Map<String, UploadedSegmentMetadata> activeSegmentFilesMetadataMap = new HashMap<>();
Set<String> activeSegmentRemoteFilenames = new HashSet<>();
for (String metadataFile : sortedMetadataFileList) {

final Set<String> metadataFilesToFilterActiveSegments = getMetadataFilesToFilterActiveSegments(
lastNMetadataFilesToKeep,
sortedMetadataFileList,
allLockFiles
);

for (String metadataFile : metadataFilesToFilterActiveSegments) {
Map<String, UploadedSegmentMetadata> segmentMetadataMap = readMetadataFile(metadataFile).getMetadata();
activeSegmentFilesMetadataMap.putAll(segmentMetadataMap);
activeSegmentRemoteFilenames.addAll(
@@ -848,6 +893,25 @@
}
}

public static void remoteDirectoryCleanup(
RemoteSegmentStoreDirectoryFactory remoteDirectoryFactory,
String remoteStoreRepoForIndex,
String indexUUID,
ShardId shardId
) {
try {
RemoteSegmentStoreDirectory remoteSegmentStoreDirectory = (RemoteSegmentStoreDirectory) remoteDirectoryFactory.newDirectory(
remoteStoreRepoForIndex,
indexUUID,
shardId
);
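// lastNMetadataFilesToKeep = 0: every metadata file not locked by a snapshot is treated as
// stale, so all unreferenced segment data is purged before the (then empty) shard
// directory is deleted below.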
remoteSegmentStoreDirectory.deleteStaleSegments(0);
remoteSegmentStoreDirectory.deleteIfEmpty();
} catch (Exception e) {
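// best-effort cleanup: the failure is logged and swallowed; stale files left behind are
// expected to be handled by a future cleanup run
// (see https://github.com/opensearch-project/OpenSearch/issues/8469)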
staticLogger.error("Exception occurred while deleting directory", e);

}
}

/*
Tries to delete the shard-level directory if it is empty
Returns true if it was deleted successfully
@@ -870,7 +934,6 @@
logger.error("Exception occurred while deleting directory", e);
return false;
}

return true;
}

server/src/main/java/org/opensearch/index/store/RemoteSegmentStoreDirectoryFactory.java
@@ -76,4 +76,5 @@ public Directory newDirectory(String repositoryName, String indexUUID, ShardId s
throw new IllegalArgumentException("Repository should be created before creating index with remote_store enabled setting", e);
}
}

}
server/src/main/java/org/opensearch/repositories/blobstore/BlobStoreRepository.java
@@ -117,6 +117,7 @@
import org.opensearch.index.snapshots.blobstore.RemoteStoreShardShallowCopySnapshot;
import org.opensearch.index.snapshots.blobstore.SlicedInputStream;
import org.opensearch.index.snapshots.blobstore.SnapshotFiles;
import org.opensearch.index.store.RemoteSegmentStoreDirectory;
import org.opensearch.index.store.RemoteSegmentStoreDirectoryFactory;
import org.opensearch.index.store.Store;
import org.opensearch.index.store.StoreFileMetadata;
@@ -236,6 +237,8 @@
Setting.Property.Deprecated
);

private static final Logger staticLogger = LogManager.getLogger(BlobStoreRepository.class);

/**
* Setting to disable caching of the latest repository data.
*/
@@ -1098,6 +1101,78 @@
}
}

public static void remoteDirectoryCleanupAsync(
RemoteSegmentStoreDirectoryFactory remoteDirectoryFactory,
ThreadPool threadpool,
String remoteStoreRepoForIndex,
String indexUUID,
ShardId shardId,
String threadPoolName
) {
threadpool.executor(threadPoolName)
.execute(

new RemoteStoreShardCleanupTask(
() -> RemoteSegmentStoreDirectory.remoteDirectoryCleanup(

remoteDirectoryFactory,
remoteStoreRepoForIndex,
indexUUID,
shardId
),
indexUUID,
shardId
)
);
}
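// For context, a commented sketch of the RemoteStoreShardCleanupTask wrapper used above. This is
// a best-effort paraphrase, not the exact implementation from this change; the assumption is that
// it dedupes concurrent cleanups per shard, keyed by indexUUID and shardId:
//
//   static class RemoteStoreShardCleanupTask implements Runnable {
//       private static final Set<String> ongoingCleanups = ConcurrentHashMap.newKeySet();
//       private final Runnable task;
//       private final String shardIdentifier;
//
//       RemoteStoreShardCleanupTask(Runnable task, String indexUUID, ShardId shardId) {
//           this.task = task;
//           this.shardIdentifier = indexUUID + "/" + shardId.id();
//       }
//
//       @Override
//       public void run() {
//           // skip if a cleanup for this shard is already in flight; stale files
//           // left behind will be retried on a later snapshot deletion
//           if (ongoingCleanups.add(shardIdentifier)) {
//               try {
//                   task.run();
//               } finally {
//                   ongoingCleanups.remove(shardIdentifier);
//               }
//           }
//       }
//   }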


protected void releaseRemoteStoreLockAndCleanup(
String shardId,
String shallowSnapshotUUID,
BlobContainer shardContainer,
RemoteStoreLockManagerFactory remoteStoreLockManagerFactory
) throws IOException {
if (remoteStoreLockManagerFactory == null) {
return;
}

RemoteStoreShardShallowCopySnapshot remoteStoreShardShallowCopySnapshot = REMOTE_STORE_SHARD_SHALLOW_COPY_SNAPSHOT_FORMAT.read(

shardContainer,
shallowSnapshotUUID,
namedXContentRegistry
);
String indexUUID = remoteStoreShardShallowCopySnapshot.getIndexUUID();
String remoteStoreRepoForIndex = remoteStoreShardShallowCopySnapshot.getRemoteStoreRepository();

// Release the lock file before deleting the shallow-snap-UUID file: if releasing the lock
// fails, the shallow-snap-UUID file is still present and will be used to release this lock
// during the next delete operation
RemoteStoreLockManager remoteStoreMetadataLockManager = remoteStoreLockManagerFactory.newLockManager(

remoteStoreRepoForIndex,
indexUUID,
shardId
);
remoteStoreMetadataLockManager.release(FileLockInfo.getLockInfoBuilder().withAcquirerId(shallowSnapshotUUID).build());
logger.debug("Successfully released lock for shard {} of index with uuid {}", shardId, indexUUID);

if (!isIndexPresent(clusterService, indexUUID)) {
// Note: this is a temporary solution where snapshot deletion triggers remote store side cleanup if the
// index is already deleted. Shard cleanup will still happen asynchronously using the REMOTE_PURGE
// threadpool; if it fails, it can leave stale files in the remote directory. The same issue can
// occur during shard-level remote store data cleanup, which also happens asynchronously. In the
// long term, we plan to implement a remote store GC poller mechanism that will take care of such
// stale data. Related issue: https://github.com/opensearch-project/OpenSearch/issues/8469
RemoteSegmentStoreDirectoryFactory remoteDirectoryFactory = new RemoteSegmentStoreDirectoryFactory(
remoteStoreLockManagerFactory.getRepositoriesService(),

threadPool
);
remoteDirectoryCleanupAsync(

remoteDirectoryFactory,
threadPool,
remoteStoreRepoForIndex,
indexUUID,
new ShardId(Index.UNKNOWN_INDEX_NAME, indexUUID, Integer.parseInt(shardId)),

ThreadPool.Names.REMOTE_PURGE
);
}
}


// When remoteStoreLockManagerFactory is non-null, lock files are released before the corresponding
// shallow-snap-UUID files are deleted. If it is null, we just delete the stale shard blobs.
private void executeStaleShardDelete(
@@ -1109,53 +1184,34 @@
if (filesToDelete != null) {
threadPool.executor(ThreadPool.Names.SNAPSHOT).execute(ActionRunnable.wrap(listener, l -> {
try {
if (remoteStoreLockManagerFactory != null) {
for (String fileToDelete : filesToDelete) {
if (fileToDelete.contains(SHALLOW_SNAPSHOT_PREFIX)) {
String[] fileToDeletePath = fileToDelete.split("/");
String indexId = fileToDeletePath[1];
String shardId = fileToDeletePath[2];
String shallowSnapBlob = fileToDeletePath[3];
String snapshotUUID = extractShallowSnapshotUUID(shallowSnapBlob).orElseThrow();
BlobContainer shardContainer = blobStore().blobContainer(indicesPath().add(indexId).add(shardId));
RemoteStoreShardShallowCopySnapshot remoteStoreShardShallowCopySnapshot =
REMOTE_STORE_SHARD_SHALLOW_COPY_SNAPSHOT_FORMAT.read(
shardContainer,
snapshotUUID,
namedXContentRegistry
);
String indexUUID = remoteStoreShardShallowCopySnapshot.getIndexUUID();
String remoteStoreRepoForIndex = remoteStoreShardShallowCopySnapshot.getRemoteStoreRepository();
// Releasing lock file before deleting the shallow-snap-UUID file because in case of any failure while
// releasing the lock file, we would still have the shallow-snap-UUID file and that would be used during
// next delete operation for releasing this lock file
RemoteStoreLockManager remoteStoreMetadataLockManager = remoteStoreLockManagerFactory.newLockManager(
remoteStoreRepoForIndex,
indexUUID,
shardId
);
remoteStoreMetadataLockManager.release(
FileLockInfo.getLockInfoBuilder().withAcquirerId(snapshotUUID).build()
// Filter to the files for which remote store lock release and cleanup succeeded; the
// remaining files, for which it failed, will be retried in the next snapshot delete run.
List<String> eligibleFilesToDelete = new ArrayList<>();
for (String fileToDelete : filesToDelete) {
if (fileToDelete.contains(SHALLOW_SNAPSHOT_PREFIX)) {
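// assumed blob path layout (relied on by the split below): indices/{indexId}/{shardId}/shallow-snap-{snapshotUUID}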
String[] fileToDeletePath = fileToDelete.split("/");
String indexId = fileToDeletePath[1];
String shardId = fileToDeletePath[2];
String shallowSnapBlob = fileToDeletePath[3];
String snapshotUUID = extractShallowSnapshotUUID(shallowSnapBlob).orElseThrow();
BlobContainer shardContainer = blobStore().blobContainer(indicesPath().add(indexId).add(shardId));

try {
releaseRemoteStoreLockAndCleanup(shardId, snapshotUUID, shardContainer, remoteStoreLockManagerFactory);
eligibleFilesToDelete.add(fileToDelete);
} catch (Exception e) {
logger.error(

"Failed to release lock or cleanup shard for indexID {}, shardID {} " + "and snapshot {}",
indexId,
shardId,
snapshotUUID
);
if (!isIndexPresent(clusterService, indexUUID)) {
// this is a temporary solution where snapshot deletion triggers remote store side
// cleanup if index is already deleted. We will add a poller in future to take
// care of remote store side cleanup.
// see https://github.com/opensearch-project/OpenSearch/issues/8469
new RemoteSegmentStoreDirectoryFactory(
remoteStoreLockManagerFactory.getRepositoriesService(),
threadPool
).newDirectory(
remoteStoreRepoForIndex,
indexUUID,
new ShardId(Index.UNKNOWN_INDEX_NAME, indexUUID, Integer.valueOf(shardId))
).close();
}
}
} else {

eligibleFilesToDelete.add(fileToDelete);
}
}
// Deleting the shard blobs
deleteFromContainer(blobContainer(), filesToDelete);
deleteFromContainer(blobContainer(), eligibleFilesToDelete);
l.onResponse(null);
} catch (Exception e) {
logger.warn(
@@ -1588,39 +1644,12 @@
for (String blob : shardBlob.getValue().listBlobs().keySet()) {
final Optional<String> snapshotUUID = extractShallowSnapshotUUID(blob);
if (snapshotUUID.isPresent()) {
RemoteStoreShardShallowCopySnapshot remoteStoreShardShallowCopySnapshot =
REMOTE_STORE_SHARD_SHALLOW_COPY_SNAPSHOT_FORMAT.read(
shardBlob.getValue(),
snapshotUUID.get(),
namedXContentRegistry
);
String indexUUID = remoteStoreShardShallowCopySnapshot.getIndexUUID();
String remoteStoreRepoForIndex = remoteStoreShardShallowCopySnapshot.getRemoteStoreRepository();
// Releasing lock files before deleting the shallow-snap-UUID file because in case of any failure
// while releasing the lock file, we would still have the corresponding shallow-snap-UUID file
// and that would be used during next delete operation for releasing this stale lock file
RemoteStoreLockManager remoteStoreMetadataLockManager = remoteStoreLockManagerFactory.newLockManager(
remoteStoreRepoForIndex,
indexUUID,
shardBlob.getKey()
);
remoteStoreMetadataLockManager.release(
FileLockInfo.getLockInfoBuilder().withAcquirerId(snapshotUUID.get()).build()
releaseRemoteStoreLockAndCleanup(
shardBlob.getKey(),
snapshotUUID.get(),
shardBlob.getValue(),

remoteStoreLockManagerFactory
);
if (!isIndexPresent(clusterService, indexUUID)) {
// this is a temporary solution where snapshot deletion triggers remote store side
// cleanup if index is already deleted. We will add a poller in future to take
// care of remote store side cleanup.
// see https://github.com/opensearch-project/OpenSearch/issues/8469
new RemoteSegmentStoreDirectoryFactory(
remoteStoreLockManagerFactory.getRepositoriesService(),
threadPool
).newDirectory(
remoteStoreRepoForIndex,
indexUUID,
new ShardId(Index.UNKNOWN_INDEX_NAME, indexUUID, Integer.parseInt(shardBlob.getKey()))
).close();
}
}
}
}