diff --git a/server/src/main/java/org/opensearch/index/store/remote/file/OnDemandBlockSnapshotIndexInput.java b/server/src/main/java/org/opensearch/index/store/remote/file/OnDemandBlockSnapshotIndexInput.java index b3f8ee9c1817e..7166e9aa482e3 100644 --- a/server/src/main/java/org/opensearch/index/store/remote/file/OnDemandBlockSnapshotIndexInput.java +++ b/server/src/main/java/org/opensearch/index/store/remote/file/OnDemandBlockSnapshotIndexInput.java @@ -8,8 +8,6 @@ package org.opensearch.index.store.remote.file; -import org.apache.logging.log4j.LogManager; -import org.apache.logging.log4j.Logger; import org.apache.lucene.store.FSDirectory; import org.apache.lucene.store.IndexInput; import org.opensearch.index.snapshots.blobstore.BlobStoreIndexShardSnapshot.FileInfo; @@ -26,8 +24,6 @@ * @opensearch.internal */ public class OnDemandBlockSnapshotIndexInput extends OnDemandBlockIndexInput { - private static final Logger logger = LogManager.getLogger(OnDemandBlockSnapshotIndexInput.class); - /** * Where this class fetches IndexInput parts from */ @@ -48,7 +44,7 @@ public class OnDemandBlockSnapshotIndexInput extends OnDemandBlockIndexInput { protected final String fileName; /** - * part size in bytes + * Maximum size in bytes of snapshot file parts. */ protected final long partSize; @@ -104,7 +100,15 @@ public OnDemandBlockSnapshotIndexInput( super(builder); this.transferManager = transferManager; this.fileInfo = fileInfo; - this.partSize = fileInfo.partSize().getBytes(); + if (fileInfo.partSize() != null) { + this.partSize = fileInfo.partSize().getBytes(); + } else { + // Repository implementations can define a size at which to split files + // into multiple objects in the repository. If partSize() is null, then + // no splitting happens, so default to Long.MAX_VALUE here to have the + // same effect. See {@code BlobStoreRepository#chunkSize()}. + this.partSize = Long.MAX_VALUE; + } this.fileName = fileInfo.physicalName(); this.directory = directory; this.originalFileSize = fileInfo.length(); @@ -131,6 +135,10 @@ protected IndexInput fetchBlock(int blockId) throws IOException { final long blockStart = getBlockStart(blockId); final long blockEnd = blockStart + getActualBlockSize(blockId); + + // If the snapshot file is chunked, we must account for this by + // choosing the appropriate file part and updating the position + // accordingly. final int part = (int) (blockStart / partSize); final long partStart = part * partSize; diff --git a/server/src/test/java/org/opensearch/index/store/remote/file/OnDemandBlockSnapshotIndexInputTests.java b/server/src/test/java/org/opensearch/index/store/remote/file/OnDemandBlockSnapshotIndexInputTests.java index a04feea3bb8e5..2204124f1de4f 100644 --- a/server/src/test/java/org/opensearch/index/store/remote/file/OnDemandBlockSnapshotIndexInputTests.java +++ b/server/src/test/java/org/opensearch/index/store/remote/file/OnDemandBlockSnapshotIndexInputTests.java @@ -19,6 +19,8 @@ import org.apache.lucene.store.SimpleFSLockFactory; import org.apache.lucene.util.Constants; import org.apache.lucene.util.Version; +import org.opensearch.common.lucene.store.ByteArrayIndexInput; +import org.opensearch.core.common.unit.ByteSizeUnit; import org.opensearch.core.common.unit.ByteSizeValue; import org.opensearch.index.snapshots.blobstore.BlobStoreIndexShardSnapshot; import org.opensearch.index.store.StoreFileMetadata; @@ -31,9 +33,12 @@ import java.io.IOException; import java.nio.file.Path; +import static org.mockito.ArgumentMatchers.argThat; import static org.mockito.Mockito.any; import static org.mockito.Mockito.doAnswer; import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; @ThreadLeakFilters(filters = CleanerDaemonThreadLeakFilter.class) public class OnDemandBlockSnapshotIndexInputTests extends OpenSearchTestCase { @@ -43,7 +48,6 @@ public class OnDemandBlockSnapshotIndexInputTests extends OpenSearchTestCase { private static final String FILE_NAME = "File_Name"; private static final String BLOCK_FILE_PREFIX = FILE_NAME; private static final boolean IS_CLONE = false; - private static final ByteSizeValue BYTE_SIZE_VALUE = new ByteSizeValue(1L); private static final int FILE_SIZE = 29360128; private TransferManager transferManager; private LockFactory lockFactory; @@ -74,7 +78,38 @@ public void test4MBBlock() throws Exception { runAllTestsFor(22); } - public void runAllTestsFor(int blockSizeShift) throws Exception { + public void testChunkedRepository() throws IOException { + final long blockSize = new ByteSizeValue(1, ByteSizeUnit.KB).getBytes(); + final long repositoryChunkSize = new ByteSizeValue(2, ByteSizeUnit.KB).getBytes(); + final long fileSize = new ByteSizeValue(3, ByteSizeUnit.KB).getBytes(); + + when(transferManager.fetchBlob(any())).thenReturn(new ByteArrayIndexInput("test", new byte[(int) blockSize])); + try ( + FSDirectory directory = new MMapDirectory(path, lockFactory); + IndexInput indexInput = new OnDemandBlockSnapshotIndexInput( + OnDemandBlockIndexInput.builder() + .resourceDescription(RESOURCE_DESCRIPTION) + .offset(BLOCK_SNAPSHOT_FILE_OFFSET) + .length(FILE_SIZE) + .blockSizeShift((int) (Math.log(blockSize) / Math.log(2))) + .isClone(IS_CLONE), + new BlobStoreIndexShardSnapshot.FileInfo( + FILE_NAME, + new StoreFileMetadata(FILE_NAME, fileSize, "", Version.LATEST), + new ByteSizeValue(repositoryChunkSize) + ), + directory, + transferManager + ) + ) { + // Seek to the position past the first repository chunk + indexInput.seek(repositoryChunkSize); + } + // Verify the second chunk is requested (i.e. ".part1") + verify(transferManager).fetchBlob(argThat(request -> request.getBlobName().equals("File_Name.part1"))); + } + + private void runAllTestsFor(int blockSizeShift) throws Exception { final OnDemandBlockSnapshotIndexInput blockedSnapshotFile = createOnDemandBlockSnapshotIndexInput(blockSizeShift); final int blockSize = 1 << blockSizeShift; TestGroup.testGetBlock(blockedSnapshotFile, blockSize, FILE_SIZE); @@ -106,7 +141,7 @@ private OnDemandBlockSnapshotIndexInput createOnDemandBlockSnapshotIndexInput(in fileInfo = new BlobStoreIndexShardSnapshot.FileInfo( FILE_NAME, new StoreFileMetadata(FILE_NAME, FILE_SIZE, "", Version.LATEST), - BYTE_SIZE_VALUE + null ); int blockSize = 1 << blockSizeShift; @@ -182,7 +217,7 @@ private void initBlockFiles(int blockSize, FSDirectory fsDirectory) { } - public static class TestGroup { + private static class TestGroup { public static void testGetBlock(OnDemandBlockSnapshotIndexInput blockedSnapshotFile, int blockSize, int fileSize) { // block 0