Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Backport 2.x] Handle null partSize in OnDemandBlockSnapshotIndexInput (#9470) #12485

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,6 @@

package org.opensearch.index.store.remote.file;

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.apache.lucene.store.FSDirectory;
import org.apache.lucene.store.IndexInput;
import org.opensearch.index.snapshots.blobstore.BlobStoreIndexShardSnapshot.FileInfo;
Expand All @@ -26,8 +24,6 @@
* @opensearch.internal
*/
public class OnDemandBlockSnapshotIndexInput extends OnDemandBlockIndexInput {
private static final Logger logger = LogManager.getLogger(OnDemandBlockSnapshotIndexInput.class);

/**
* Where this class fetches IndexInput parts from
*/
Expand All @@ -48,7 +44,7 @@ public class OnDemandBlockSnapshotIndexInput extends OnDemandBlockIndexInput {
protected final String fileName;

/**
* part size in bytes
* Maximum size in bytes of snapshot file parts.
*/
protected final long partSize;

Expand Down Expand Up @@ -104,7 +100,15 @@ public OnDemandBlockSnapshotIndexInput(
super(builder);
this.transferManager = transferManager;
this.fileInfo = fileInfo;
this.partSize = fileInfo.partSize().getBytes();
if (fileInfo.partSize() != null) {
this.partSize = fileInfo.partSize().getBytes();
} else {
// Repository implementations can define a size at which to split files
// into multiple objects in the repository. If partSize() is null, then
// no splitting happens, so default to Long.MAX_VALUE here to have the
// same effect. See {@code BlobStoreRepository#chunkSize()}.
this.partSize = Long.MAX_VALUE;
}
this.fileName = fileInfo.physicalName();
this.directory = directory;
this.originalFileSize = fileInfo.length();
Expand All @@ -131,6 +135,10 @@ protected IndexInput fetchBlock(int blockId) throws IOException {

final long blockStart = getBlockStart(blockId);
final long blockEnd = blockStart + getActualBlockSize(blockId);

// If the snapshot file is chunked, we must account for this by
// choosing the appropriate file part and updating the position
// accordingly.
final int part = (int) (blockStart / partSize);
final long partStart = part * partSize;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@
import org.apache.lucene.store.SimpleFSLockFactory;
import org.apache.lucene.util.Constants;
import org.apache.lucene.util.Version;
import org.opensearch.common.lucene.store.ByteArrayIndexInput;
import org.opensearch.core.common.unit.ByteSizeUnit;
import org.opensearch.core.common.unit.ByteSizeValue;
import org.opensearch.index.snapshots.blobstore.BlobStoreIndexShardSnapshot;
import org.opensearch.index.store.StoreFileMetadata;
Expand All @@ -31,9 +33,12 @@
import java.io.IOException;
import java.nio.file.Path;

import static org.mockito.ArgumentMatchers.argThat;
import static org.mockito.Mockito.any;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;

@ThreadLeakFilters(filters = CleanerDaemonThreadLeakFilter.class)
public class OnDemandBlockSnapshotIndexInputTests extends OpenSearchTestCase {
Expand All @@ -43,7 +48,6 @@ public class OnDemandBlockSnapshotIndexInputTests extends OpenSearchTestCase {
private static final String FILE_NAME = "File_Name";
private static final String BLOCK_FILE_PREFIX = FILE_NAME;
private static final boolean IS_CLONE = false;
private static final ByteSizeValue BYTE_SIZE_VALUE = new ByteSizeValue(1L);
private static final int FILE_SIZE = 29360128;
private TransferManager transferManager;
private LockFactory lockFactory;
Expand Down Expand Up @@ -74,7 +78,38 @@ public void test4MBBlock() throws Exception {
runAllTestsFor(22);
}

public void runAllTestsFor(int blockSizeShift) throws Exception {
// Verifies that OnDemandBlockSnapshotIndexInput resolves the correct ".partN"
// object when the repository splits a snapshot file into chunks smaller than
// the file itself: file = 3 KB, repository chunk = 2 KB, block = 1 KB, so a
// seek past the first chunk must be served from "File_Name.part1".
public void testChunkedRepository() throws IOException {
    final long blockSize = new ByteSizeValue(1, ByteSizeUnit.KB).getBytes();
    final long repositoryChunkSize = new ByteSizeValue(2, ByteSizeUnit.KB).getBytes();
    final long fileSize = new ByteSizeValue(3, ByteSizeUnit.KB).getBytes();

    // Any blob fetch is satisfied with an in-memory block of zeros.
    when(transferManager.fetchBlob(any())).thenReturn(new ByteArrayIndexInput("test", new byte[(int) blockSize]));
    try (
        FSDirectory directory = new MMapDirectory(path, lockFactory);
        IndexInput indexInput = new OnDemandBlockSnapshotIndexInput(
            OnDemandBlockIndexInput.builder()
                .resourceDescription(RESOURCE_DESCRIPTION)
                .offset(BLOCK_SNAPSHOT_FILE_OFFSET)
                .length(FILE_SIZE)
                // Exact log2 for a power-of-two block size. Avoids the
                // (int) (Math.log(n) / Math.log(2)) idiom, whose floating-point
                // quotient can land fractionally below the true integer and be
                // truncated to shift - 1.
                .blockSizeShift(Long.numberOfTrailingZeros(blockSize))
                .isClone(IS_CLONE),
            new BlobStoreIndexShardSnapshot.FileInfo(
                FILE_NAME,
                new StoreFileMetadata(FILE_NAME, fileSize, "", Version.LATEST),
                // Non-null partSize: the repository chunks this file.
                new ByteSizeValue(repositoryChunkSize)
            ),
            directory,
            transferManager
        )
    ) {
        // Seek to the position past the first repository chunk
        indexInput.seek(repositoryChunkSize);
    }
    // Verify the second chunk is requested (i.e. ".part1")
    verify(transferManager).fetchBlob(argThat(request -> request.getBlobName().equals("File_Name.part1")));
}

private void runAllTestsFor(int blockSizeShift) throws Exception {
final OnDemandBlockSnapshotIndexInput blockedSnapshotFile = createOnDemandBlockSnapshotIndexInput(blockSizeShift);
final int blockSize = 1 << blockSizeShift;
TestGroup.testGetBlock(blockedSnapshotFile, blockSize, FILE_SIZE);
Expand Down Expand Up @@ -106,7 +141,7 @@ private OnDemandBlockSnapshotIndexInput createOnDemandBlockSnapshotIndexInput(in
fileInfo = new BlobStoreIndexShardSnapshot.FileInfo(
FILE_NAME,
new StoreFileMetadata(FILE_NAME, FILE_SIZE, "", Version.LATEST),
BYTE_SIZE_VALUE
null
);

int blockSize = 1 << blockSizeShift;
Expand Down Expand Up @@ -182,7 +217,7 @@ private void initBlockFiles(int blockSize, FSDirectory fsDirectory) {

}

public static class TestGroup {
private static class TestGroup {

public static void testGetBlock(OnDemandBlockSnapshotIndexInput blockedSnapshotFile, int blockSize, int fileSize) {
// block 0
Expand Down
Loading