Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Searchable Snapshot] Fix bug of Searchable Snapshot Dependency on repository chunk_size #12277

Merged
Merged
Show file tree
Hide file tree
Changes from 13 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -85,10 +85,10 @@ protected Settings.Builder randomRepositorySettings() {
return settings;
}

private Settings.Builder chunkedRepositorySettings() {
private Settings.Builder chunkedRepositorySettings(long chunkSize) {
final Settings.Builder settings = Settings.builder();
settings.put("location", randomRepoPath()).put("compress", randomBoolean());
settings.put("chunk_size", 2 << 23, ByteSizeUnit.BYTES);
settings.put("chunk_size", chunkSize, ByteSizeUnit.BYTES);
return settings;
}

Expand Down Expand Up @@ -194,18 +194,44 @@ public void testSnapshottingSearchableSnapshots() throws Exception {
}

/**
* Tests a chunked repository scenario for searchable snapshots by creating an index,
* Tests a default 8mib chunked repository scenario for searchable snapshots by creating an index,
* taking a snapshot, restoring it as a searchable snapshot index.
*/
public void testCreateSearchableSnapshotWithChunks() throws Exception {
public void testCreateSearchableSnapshotWithDefaultChunks() throws Exception {
final int numReplicasIndex = randomIntBetween(1, 4);
final String indexName = "test-idx";
final String restoredIndexName = indexName + "-copy";
final String repoName = "test-repo";
final String snapshotName = "test-snap";
final Client client = client();

Settings.Builder repositorySettings = chunkedRepositorySettings();
Settings.Builder repositorySettings = chunkedRepositorySettings(2 << 23);

internalCluster().ensureAtLeastNumSearchAndDataNodes(numReplicasIndex + 1);
createIndexWithDocsAndEnsureGreen(numReplicasIndex, 1000, indexName);
createRepositoryWithSettings(repositorySettings, repoName);
takeSnapshot(client, snapshotName, repoName, indexName);

deleteIndicesAndEnsureGreen(client, indexName);
restoreSnapshotAndEnsureGreen(client, snapshotName, repoName);
assertRemoteSnapshotIndexSettings(client, restoredIndexName);

assertDocCount(restoredIndexName, 1000L);
}

/**
* Tests a small 1000 bytes chunked repository scenario for searchable snapshots by creating an index,
* taking a snapshot, restoring it as a searchable snapshot index.
*/
public void testCreateSearchableSnapshotWithSmallChunks() throws Exception {
final int numReplicasIndex = randomIntBetween(1, 4);
final String indexName = "test-idx";
final String restoredIndexName = indexName + "-copy";
final String repoName = "test-repo";
final String snapshotName = "test-snap";
final Client client = client();

Settings.Builder repositorySettings = chunkedRepositorySettings(1000);

internalCluster().ensureAtLeastNumSearchAndDataNodes(numReplicasIndex + 1);
createIndexWithDocsAndEnsureGreen(numReplicasIndex, 1000, indexName);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@
import org.opensearch.index.store.remote.utils.TransferManager;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

/**
* This is an implementation of {@link OnDemandBlockIndexInput} where this class provides the main IndexInput using shard snapshot files.
Expand Down Expand Up @@ -136,25 +138,45 @@
final long blockStart = getBlockStart(blockId);
final long blockEnd = blockStart + getActualBlockSize(blockId);

// If the snapshot file is chunked, we must account for this by
// choosing the appropriate file part and updating the position
// accordingly.
final int part = (int) (blockStart / partSize);
final long partStart = part * partSize;

final long position = blockStart - partStart;
final long length = blockEnd - blockStart;

// Block may be present on multiple chunks of a file, so we need
// to fetch each chunk/blob part separately to fetch an entire block.
BlobFetchRequest blobFetchRequest = BlobFetchRequest.builder()
.position(position)
.length(length)
.blobName(fileInfo.partName(part))
.blobParts(getBlobParts(blockStart, blockEnd))
.directory(directory)
.fileName(blockFileName)
.build();
return transferManager.fetchBlob(blobFetchRequest);
}

/**
* Returns list of blob parts/chunks in a file for a given block.
*/
protected List<BlobFetchRequest.BlobPart> getBlobParts(long blockStart, long blockEnd) {
// If the snapshot file is chunked, we must account for this by
// choosing the appropriate file part and updating the position
// accordingly.
int partNum = (int) (blockStart / partSize);
long pos = blockStart;
long diff = (blockEnd - blockStart);

List<BlobFetchRequest.BlobPart> blobParts = new ArrayList<>();
while (diff > 0) {
kotwanikunal marked this conversation as resolved.
Show resolved Hide resolved
long partStart = pos % partSize;
long partEnd;
if ((partStart + diff) > partSize) {
partEnd = partSize;

Check warning on line 167 in server/src/main/java/org/opensearch/index/store/remote/file/OnDemandBlockSnapshotIndexInput.java

View check run for this annotation

Codecov / codecov/patch

server/src/main/java/org/opensearch/index/store/remote/file/OnDemandBlockSnapshotIndexInput.java#L167

Added line #L167 was not covered by tests
} else {
partEnd = (partStart + diff);
}
long fetchBytes = partEnd - partStart;
blobParts.add(new BlobFetchRequest.BlobPart(fileInfo.partName(partNum), partStart, fetchBytes));
partNum++;
pos = pos + fetchBytes;
diff = (blockEnd - pos);
}
return blobParts;
}

@Override
public OnDemandBlockSnapshotIndexInput clone() {
OnDemandBlockSnapshotIndexInput clone = buildSlice("clone", 0L, this.length);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
import org.apache.lucene.store.FSDirectory;

import java.nio.file.Path;
import java.util.List;

/**
* The specification to fetch specific block from blob store
Expand All @@ -20,37 +21,22 @@
*/
public class BlobFetchRequest {

private final long position;

private final long length;

private final String blobName;

private final Path filePath;

private final Directory directory;

private final String fileName;

private final List<BlobPart> blobParts;

private final long blobLength;

private BlobFetchRequest(Builder builder) {
this.position = builder.position;
this.length = builder.length;
this.blobName = builder.blobName;
this.fileName = builder.fileName;
this.filePath = builder.directory.getDirectory().resolve(fileName);
this.directory = builder.directory;
}

public long getPosition() {
return position;
}

public long getLength() {
return length;
}

public String getBlobName() {
return blobName;
this.blobParts = builder.blobParts;
this.blobLength = builder.blobParts.stream().mapToLong(o -> o.getLength()).sum();
}

public Path getFilePath() {
Expand All @@ -65,19 +51,23 @@
return fileName;
}

public List<BlobPart> blobParts() {
return blobParts;
}

public long getBlobLength() {
return blobLength;
}

public static Builder builder() {
return new Builder();
}

@Override
public String toString() {
return "BlobFetchRequest{"
+ "position="
+ position
+ ", length="
+ length
+ ", blobName='"
+ blobName
+ "blobParts="
+ blobParts
+ '\''
+ ", filePath="
+ filePath
Expand All @@ -90,35 +80,45 @@
}

/**
* Builder for BlobFetchRequest
* BlobPart represents a single chunk of a file
*/
public static final class Builder {
public static class BlobPart {
private String blobName;
private long position;
private long length;
private String blobName;
private FSDirectory directory;
private String fileName;

private Builder() {}

public Builder position(long position) {
this.position = position;
return this;
}

public Builder length(long length) {
public BlobPart(String blobName, long position, long length) {
this.blobName = blobName;
if (length <= 0) {
throw new IllegalArgumentException("Length for blob fetch request needs to be non-negative");
throw new IllegalArgumentException("Length for blob part fetch request needs to be non-negative");

Check warning on line 93 in server/src/main/java/org/opensearch/index/store/remote/utils/BlobFetchRequest.java

View check run for this annotation

Codecov / codecov/patch

server/src/main/java/org/opensearch/index/store/remote/utils/BlobFetchRequest.java#L93

Added line #L93 was not covered by tests
}
this.length = length;
return this;
this.position = position;
}

public Builder blobName(String blobName) {
this.blobName = blobName;
return this;
public String getBlobName() {
return blobName;
}

public long getPosition() {
return position;
}

public long getLength() {
return length;
}
}

/**
* Builder for BlobFetchRequest
*/
public static final class Builder {
private List<BlobPart> blobParts;
private FSDirectory directory;
private String fileName;

private Builder() {}

public Builder directory(FSDirectory directory) {
this.directory = directory;
return this;
Expand All @@ -129,6 +129,11 @@
return this;
}

public Builder blobParts(List<BlobPart> blobParts) {
this.blobParts = blobParts;
return this;
}

public BlobFetchRequest build() {
return new BlobFetchRequest(this);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,11 +48,12 @@ public TransferManager(final BlobContainer blobContainer, final FileCache fileCa
}

/**
* Given a blobFetchRequest, return it's corresponding IndexInput.
* Given a blobFetchRequestList, return it's corresponding IndexInput.
* @param blobFetchRequest to fetch
* @return future of IndexInput augmented with internal caching maintenance tasks
*/
public IndexInput fetchBlob(BlobFetchRequest blobFetchRequest) throws IOException {

final Path key = blobFetchRequest.getFilePath();

final CachedIndexInput cacheEntry = fileCache.compute(key, (path, cachedIndexInput) -> {
Expand Down Expand Up @@ -85,15 +86,20 @@ private static FileCachedIndexInput createIndexInput(FileCache fileCache, BlobCo
try {
if (Files.exists(request.getFilePath()) == false) {
try (
InputStream snapshotFileInputStream = blobContainer.readBlob(
request.getBlobName(),
request.getPosition(),
request.getLength()
);
OutputStream fileOutputStream = Files.newOutputStream(request.getFilePath());
OutputStream localFileOutputStream = new BufferedOutputStream(fileOutputStream)
) {
snapshotFileInputStream.transferTo(localFileOutputStream);
for (BlobFetchRequest.BlobPart blobPart : request.blobParts()) {
try (
InputStream snapshotFileInputStream = blobContainer.readBlob(
blobPart.getBlobName(),
blobPart.getPosition(),
blobPart.getLength()
);
) {
snapshotFileInputStream.transferTo(localFileOutputStream);
}
}
}
}
final IndexInput luceneIndexInput = request.getDirectory().openInput(request.getFileName(), IOContext.READ);
Expand Down Expand Up @@ -153,7 +159,7 @@ public IndexInput getIndexInput() throws IOException {

@Override
public long length() {
return request.getLength();
return request.getBlobLength();
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -106,7 +106,7 @@ public void testChunkedRepository() throws IOException {
indexInput.seek(repositoryChunkSize);
}
// Verify the second chunk is requested (i.e. ".part1")
verify(transferManager).fetchBlob(argThat(request -> request.getBlobName().equals("File_Name.part1")));
verify(transferManager).fetchBlob(argThat(request -> request.blobParts().get(0).getBlobName().equals("File_Name.part1")));
}

private void runAllTestsFor(int blockSizeShift) throws Exception {
andrross marked this conversation as resolved.
Show resolved Hide resolved
Expand All @@ -115,6 +115,7 @@ private void runAllTestsFor(int blockSizeShift) throws Exception {
TestGroup.testGetBlock(blockedSnapshotFile, blockSize, FILE_SIZE);
TestGroup.testGetBlockOffset(blockedSnapshotFile, blockSize, FILE_SIZE);
TestGroup.testGetBlockStart(blockedSnapshotFile, blockSize);
TestGroup.testGetBlobParts(blockedSnapshotFile);
TestGroup.testCurrentBlockStart(blockedSnapshotFile, blockSize);
TestGroup.testCurrentBlockPosition(blockedSnapshotFile, blockSize);
TestGroup.testClone(blockedSnapshotFile, blockSize);
Expand Down Expand Up @@ -252,6 +253,35 @@ public static void testGetBlockStart(OnDemandBlockSnapshotIndexInput blockedSnap
assertEquals(blockSize * 2, blockedSnapshotFile.getBlockStart(2));
}

public static void testGetBlobParts(OnDemandBlockSnapshotIndexInput blockedSnapshotFile) {
// block id 0
int blockId = 0;
long blockStart = blockedSnapshotFile.getBlockStart(blockId);
long blockEnd = blockStart + blockedSnapshotFile.getActualBlockSize(blockId);
assertEquals(
(blockEnd - blockStart),
blockedSnapshotFile.getBlobParts(blockStart, blockEnd).stream().mapToLong(o -> o.getLength()).sum()
);

// block 1
blockId = 1;
blockStart = blockedSnapshotFile.getBlockStart(blockId);
blockEnd = blockStart + blockedSnapshotFile.getActualBlockSize(blockId);
assertEquals(
(blockEnd - blockStart),
blockedSnapshotFile.getBlobParts(blockStart, blockEnd).stream().mapToLong(o -> o.getLength()).sum()
);

// block 2
blockId = 2;
blockStart = blockedSnapshotFile.getBlockStart(blockId);
blockEnd = blockStart + blockedSnapshotFile.getActualBlockSize(blockId);
assertEquals(
(blockEnd - blockStart),
blockedSnapshotFile.getBlobParts(blockStart, blockEnd).stream().mapToLong(o -> o.getLength()).sum()
);
}

public static void testCurrentBlockStart(OnDemandBlockSnapshotIndexInput blockedSnapshotFile, int blockSize) throws IOException {
// block 0
blockedSnapshotFile.seek(blockSize - 1);
Expand Down
Loading
Loading