diff --git a/server/src/main/java/org/opensearch/common/blobstore/BlobContainer.java b/server/src/main/java/org/opensearch/common/blobstore/BlobContainer.java
index 4f5f8d4b1ef5f..46e82a1f5ef0a 100644
--- a/server/src/main/java/org/opensearch/common/blobstore/BlobContainer.java
+++ b/server/src/main/java/org/opensearch/common/blobstore/BlobContainer.java
@@ -83,14 +83,16 @@ public interface BlobContainer {
      * Creates a new {@link FetchBlobResult} for the given blob name.
+     * The default implementation delegates to {@link #readBlob(String)} and returns {@code null} metadata.
      *
      * @param blobName
-     * The name of the blob to get an {@link InputStream} for.
+     * The name of the blob to get a {@link FetchBlobResult} for.
      * @return The {@link FetchBlobResult} of the blob.
      * @throws NoSuchFileException if the blob does not exist
      * @throws IOException if the blob can not be read.
      */
     @ExperimentalApi
     default FetchBlobResult readBlobWithMetadata(String blobName) throws IOException {
-        throw new UnsupportedOperationException("readBlobWithMetadata is not implemented yet");
+        InputStream inputStream = readBlob(blobName);
+        return new FetchBlobResult(inputStream, null);
     };
 
     /**
diff --git a/server/src/main/java/org/opensearch/common/blobstore/FetchBlobResult.java b/server/src/main/java/org/opensearch/common/blobstore/FetchBlobResult.java
index 55aca771b586c..4e8ea4c6b089f 100644
--- a/server/src/main/java/org/opensearch/common/blobstore/FetchBlobResult.java
+++ b/server/src/main/java/org/opensearch/common/blobstore/FetchBlobResult.java
@@ -10,6 +10,8 @@
 
 import org.opensearch.common.annotation.ExperimentalApi;
 
+import java.io.Closeable;
+import java.io.IOException;
 import java.io.InputStream;
 import java.util.Map;
 
@@ -20,7 +22,7 @@
  * @opensearch.experimental
  */
 @ExperimentalApi
-public class FetchBlobResult {
+public class FetchBlobResult implements Closeable {
 
     /**
      * Downloaded blob InputStream
@@ -45,4 +47,13 @@ public FetchBlobResult(InputStream inputStream, Map metadata) {
         this.metadata = metadata;
     }
 
+    /**
+     * Closes the underlying blob {@link InputStream}, if there is one.
+     */
+    @Override
+    public void close() throws IOException {
+        if (inputStream != null) {
+            inputStream.close();
+        }
+    }
 }
diff --git a/server/src/main/java/org/opensearch/index/translog/transfer/BlobStoreTransferService.java b/server/src/main/java/org/opensearch/index/translog/transfer/BlobStoreTransferService.java
index bec2d78d9af62..9a9d4cdec45ee 100644
--- a/server/src/main/java/org/opensearch/index/translog/transfer/BlobStoreTransferService.java
+++ b/server/src/main/java/org/opensearch/index/translog/transfer/BlobStoreTransferService.java
@@ -32,10 +32,11 @@
 import java.io.InputStream;
 import java.nio.channels.FileChannel;
 import java.nio.file.StandardOpenOption;
+import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Objects;
 import java.util.Set;
 
 import static org.opensearch.common.blobstore.BlobContainer.BlobNameSortOrder.LEXICOGRAPHIC;
 
@@ -98,12 +99,38 @@ public void uploadBlobs(
             if (!(blobStore.blobContainer(blobPath) instanceof AsyncMultiStreamBlobContainer)) {
                 uploadBlob(ThreadPool.Names.TRANSLOG_TRANSFER, fileSnapshot, blobPath, listener, writePriority);
             } else {
+                logger.debug("uploading file = {}", fileSnapshot.getName());
                 uploadBlob(fileSnapshot, listener, blobPath, writePriority);
             }
         });
 
     }
 
+    /**
+     * Builds the object metadata to attach to an uploaded translog file: the Base64-encoded content of its
+     * checkpoint file and that checkpoint's checksum.
+     *
+     * @param fileSnapshot the file about to be uploaded
+     * @return the metadata map, or {@code null} when {@code fileSnapshot} is not a translog file
+     * @throws IOException if the checkpoint file can not be read
+     */
+    private Map<String, String> prepareFileMetadata(TransferFileSnapshot fileSnapshot) throws IOException {
+        if (!(fileSnapshot instanceof FileSnapshot.TranslogFileSnapshot)) {
+            return null;
+        }
+
+        FileSnapshot.TranslogFileSnapshot tlogFileSnapshot = (FileSnapshot.TranslogFileSnapshot) fileSnapshot;
+        String ckpAsString = tlogFileSnapshot.provideCheckpointDataAsString();
+        Long checkpointChecksum = tlogFileSnapshot.getCheckpointChecksum();
+
+        assert checkpointChecksum != null : "checksum can not be null";
+
+        Map<String, String> metadata = new HashMap<>();
+        metadata.put(FileSnapshot.TranslogFileSnapshot.CHECKPOINT_FILE_DATA_KEY, ckpAsString);
+        metadata.put(FileSnapshot.TranslogFileSnapshot.CHECKPOINT_FILE_CHECKSUM_KEY, checkpointChecksum.toString());
+        return metadata;
+    }
+
     private void uploadBlob(
         TransferFileSnapshot fileSnapshot,
         ActionListener listener,
@@ -111,7 +138,16 @@ private void uploadBlob(
         WritePriority writePriority
     ) {
 
+        if (fileSnapshot instanceof FileSnapshot.CheckpointFileSnapshot) {
+            logger.debug("Skip uploading checkpoint file as this file = {} is stored as metadata of translog file", fileSnapshot.getName());
+            listener.onResponse(fileSnapshot);
+            return;
+        }
+
         try {
+
+            Map<String, String> metadata = prepareFileMetadata(fileSnapshot);
+
             ChannelFactory channelFactory = FileChannel::open;
             long contentLength;
             try (FileChannel channel = channelFactory.open(fileSnapshot.getPath(), StandardOpenOption.READ)) {
@@ -130,7 +166,8 @@ private void uploadBlob(
                 writePriority,
                 (size, position) -> new OffsetRangeFileInputStream(fileSnapshot.getPath(), size, position),
                 Objects.requireNonNull(fileSnapshot.getChecksum()),
-                remoteIntegrityEnabled
+                remoteIntegrityEnabled,
+                metadata
             );
             ActionListener completionListener = ActionListener.wrap(resp -> listener.onResponse(fileSnapshot), ex -> {
                 logger.error(() -> new ParameterizedMessage("Failed to upload blob {}", fileSnapshot.getName()), ex);
diff --git a/server/src/main/java/org/opensearch/index/translog/transfer/FileSnapshot.java b/server/src/main/java/org/opensearch/index/translog/transfer/FileSnapshot.java
index dcec94edd694f..ee8ccbb9d4eaa 100644
--- a/server/src/main/java/org/opensearch/index/translog/transfer/FileSnapshot.java
+++ b/server/src/main/java/org/opensearch/index/translog/transfer/FileSnapshot.java
@@ -19,9 +19,11 @@
 import java.io.InputStream;
 import java.nio.channels.Channels;
 import java.nio.channels.FileChannel;
+import java.nio.file.Files;
 import java.nio.file.Path;
 import java.nio.file.StandardOpenOption;
 import java.util.Arrays;
+import java.util.Base64;
 import java.util.Objects;
 
 /**
@@ -153,16 +155,65 @@ public boolean equals(Object o) {
     public static final class TranslogFileSnapshot extends TransferFileSnapshot {
 
         private final long generation;
+        private Path checkpointFilePath;
+        private Long checkpointChecksum;
+        public static final String CHECKPOINT_FILE_DATA_KEY = "ckp-data";
+        public static final String CHECKPOINT_FILE_CHECKSUM_KEY = "ckp-checksum";
 
         public TranslogFileSnapshot(long primaryTerm, long generation, Path path, Long checksum) throws IOException {
             super(path, primaryTerm, checksum);
             this.generation = generation;
         }
 
+        /** Sets the path of the checkpoint file paired with this translog file. */
+        public void setCheckpointFilePath(Path checkpointFilePath) {
+            this.checkpointFilePath = checkpointFilePath;
+        }
+
+        /** Sets the checksum of the checkpoint file paired with this translog file. */
+        public void setCheckpointChecksum(Long checkpointChecksum) {
+            this.checkpointChecksum = checkpointChecksum;
+        }
+
+        /**
+         * Returns the content of the paired checkpoint file as a Base64 string.
+         *
+         * @throws IOException if the checkpoint file can not be read
+         * @throws NullPointerException if no checkpoint file path has been set
+         */
+        public String provideCheckpointDataAsString() throws IOException {
+            return buildCheckpointDataAsBase64String(checkpointFilePath);
+        }
+
+        /**
+         * Reads {@code checkpointFilePath} and Base64-encodes its bytes so they can be stored as blob metadata.
+         * The size limit is enforced with an {@code assert} only, i.e. when assertions are enabled.
+         */
+        static String buildCheckpointDataAsBase64String(Path checkpointFilePath) throws IOException {
+            Objects.requireNonNull(checkpointFilePath, "checkpointFilePath must not be null");
+            // Read once and check the bytes actually read, rather than sizing the file in a separate call.
+            byte[] fileBytes = Files.readAllBytes(checkpointFilePath);
+            assert fileBytes.length < 1500 : "checkpoint file size is more then 1.5KB size, can't be stored as metadata";
+            return Base64.getEncoder().encodeToString(fileBytes);
+        }
+
+        /** Decodes Base64-encoded checkpoint data; returns {@code null} for {@code null} input. */
+        public static byte[] convertBase64StringToCheckpointFileDataBytes(String base64CheckpointString) {
+            if (base64CheckpointString == null) {
+                return null;
+            }
+            return Base64.getDecoder().decode(base64CheckpointString);
+        }
+
         public long getGeneration() {
             return generation;
         }
 
+        /** Returns the checksum of the paired checkpoint file, or {@code null} if none has been set. */
+        public Long getCheckpointChecksum() {
+            return checkpointChecksum;
+        }
+
         @Override
         public int hashCode() {
             return Objects.hash(generation, super.hashCode());
diff --git a/server/src/main/java/org/opensearch/index/translog/transfer/TranslogCheckpointTransferSnapshot.java b/server/src/main/java/org/opensearch/index/translog/transfer/TranslogCheckpointTransferSnapshot.java
index fb78731246a07..5ed72b64f1c23 100644
--- a/server/src/main/java/org/opensearch/index/translog/transfer/TranslogCheckpointTransferSnapshot.java
+++ b/server/src/main/java/org/opensearch/index/translog/transfer/TranslogCheckpointTransferSnapshot.java
@@ -51,6 +51,10 @@ public class TranslogCheckpointTransferSnapshot implements TransferSnapshot, Clo
     }
 
     private void add(TranslogFileSnapshot translogFileSnapshot, CheckpointFileSnapshot checkPointFileSnapshot) {
+        // Record the checkpoint file's path and checksum on the translog snapshot it is paired with
+        translogFileSnapshot.setCheckpointFilePath(checkPointFileSnapshot.getPath());
+        translogFileSnapshot.setCheckpointChecksum(checkPointFileSnapshot.getChecksum());
+
         translogCheckpointFileInfoTupleSet.add(Tuple.tuple(translogFileSnapshot, checkPointFileSnapshot));
         assert translogFileSnapshot.getGeneration() == checkPointFileSnapshot.getGeneration();
     }
diff --git a/server/src/main/java/org/opensearch/index/translog/transfer/TranslogTransferManager.java b/server/src/main/java/org/opensearch/index/translog/transfer/TranslogTransferManager.java
index 1087244623b87..14fb190878f7c 100644
--- a/server/src/main/java/org/opensearch/index/translog/transfer/TranslogTransferManager.java
+++ b/server/src/main/java/org/opensearch/index/translog/transfer/TranslogTransferManager.java
@@ -16,6 +16,7 @@
 import org.opensearch.common.SetOnce;
 import org.opensearch.common.blobstore.BlobMetadata;
 import org.opensearch.common.blobstore.BlobPath;
+import org.opensearch.common.blobstore.FetchBlobResult;
 import org.opensearch.common.blobstore.stream.write.WritePriority;
 import org.opensearch.common.io.VersionedCodecStreamWrapper;
 import org.opensearch.common.io.stream.BytesStreamOutput;
@@ -27,6 +28,7 @@
 import org.opensearch.index.remote.RemoteStoreUtils;
 import org.opensearch.index.remote.RemoteTranslogTransferTracker;
 import org.opensearch.index.translog.Translog;
+import org.opensearch.index.translog.TranslogCheckedContainer;
 import org.opensearch.index.translog.transfer.listener.TranslogTransferListener;
 import org.opensearch.indices.RemoteStoreSettings;
 import org.opensearch.threadpool.ThreadPool;
@@ -47,6 +49,9 @@
 
 import static org.opensearch.index.translog.transfer.FileSnapshot.TransferFileSnapshot;
 import static org.opensearch.index.translog.transfer.FileSnapshot.TranslogFileSnapshot;
+import static org.opensearch.index.translog.transfer.FileSnapshot.TranslogFileSnapshot.CHECKPOINT_FILE_CHECKSUM_KEY;
+import static org.opensearch.index.translog.transfer.FileSnapshot.TranslogFileSnapshot.CHECKPOINT_FILE_DATA_KEY;
+import static org.opensearch.index.translog.transfer.FileSnapshot.TranslogFileSnapshot.convertBase64StringToCheckpointFileDataBytes;
 
 /**
  * The class responsible for orchestrating the transfer of a {@link TransferSnapshot} via a {@link TransferService}
@@ -236,16 +241,68 @@ public boolean downloadTranslog(String primaryTerm, String generation, Path loca
             generation,
             location
         );
-        // Download Checkpoint file from remote to local FS
-        String ckpFileName = Translog.getCommitCheckpointFileName(Long.parseLong(generation));
-        downloadToFS(ckpFileName, location, primaryTerm);
-        // Download translog file from remote to local FS
+
+        // Download translog file with object metadata from remote to local FS
         String translogFilename = Translog.getFilename(Long.parseLong(generation));
-        downloadToFS(translogFilename, location, primaryTerm);
+        downloadTranslogFileToFS(translogFilename, location, primaryTerm, generation);
         return true;
     }
 
-    private void downloadToFS(String fileName, Path location, String primaryTerm) throws IOException {
+    /**
+     * Downloads a translog file together with its blob metadata and materialises the matching checkpoint file.
+     * When the blob carries no metadata, the checkpoint file is downloaded as a separate blob instead.
+     *
+     * @param fileName    name of the translog file to download
+     * @param location    local directory the files are written to
+     * @param primaryTerm primary term, used as the remote path component
+     * @param generation  translog generation, used to derive the checkpoint file name
+     * @throws IOException if the download fails or the checkpoint file can not be produced
+     */
+    private void downloadTranslogFileToFS(String fileName, Path location, String primaryTerm, String generation) throws IOException {
+        Path filePath = location.resolve(fileName);
+        // Here, we always override the existing file if present.
+        // We need to change this logic when we introduce incremental download
+        if (Files.exists(filePath)) {
+            Files.delete(filePath);
+        }
+
+        boolean downloadStatus = false;
+        long bytesToRead = 0, downloadStartTime = System.nanoTime();
+        Map<String, String> metadata;
+
+        try (FetchBlobResult fetchBlobResult = transferService.downloadBlobWithMetadata(remoteDataTransferPath.add(primaryTerm), fileName)) {
+            InputStream inputStream = fetchBlobResult.getInputStream();
+            metadata = fetchBlobResult.getMetadata();
+
+            // Files.copy returns the exact number of bytes written; InputStream#available() is only an estimate.
+            bytesToRead = Files.copy(inputStream, filePath);
+            downloadStatus = true;
+
+            logger.debug("downloaded translog for fileName = {}, with metadata = {}", fileName, metadata);
+        } finally {
+            remoteTranslogTransferTracker.addDownloadTimeInMillis((System.nanoTime() - downloadStartTime) / 1_000_000L);
+            if (downloadStatus) {
+                remoteTranslogTransferTracker.addDownloadBytesSucceeded(bytesToRead);
+            }
+        }
+
+        // Mark in FileTransferTracker so that the same files are not uploaded at the time of translog sync
+        fileTransferTracker.add(fileName, true);
+
+        try {
+            if (metadata == null || metadata.isEmpty()) {
+                logger.debug("metadata is null. Download checkpoint file from remote store separately");
+                String ckpFileName = Translog.getCommitCheckpointFileName(Long.parseLong(generation));
+                downloadCheckpointFileToFS(ckpFileName, location, primaryTerm);
+            } else {
+                writeCheckpointFileFromMetadata(metadata, location, generation, fileName);
+            }
+        } catch (Exception e) {
+            throw new IOException("Failed to download translog file from remote", e);
+        }
+    }
+
+    private void downloadCheckpointFileToFS(String fileName, Path location, String primaryTerm) throws IOException {
         Path filePath = location.resolve(fileName);
         // Here, we always override the existing file if present.
         // We need to change this logic when we introduce incremental download
@@ -271,6 +328,81 @@ private void downloadToFS(String fileName, Path location, String primaryTerm) th
         fileTransferTracker.add(fileName, true);
     }
 
+    /**
+     * Recreates the checkpoint file of a translog generation from the metadata stored with the translog blob.
+     * The decoded bytes are verified against the checksum stored in the metadata before anything on disk is touched.
+     *
+     * @param metadata   blob metadata holding the Base64 checkpoint data and its checksum
+     * @param location   local directory the checkpoint file is written to
+     * @param generation translog generation, used to derive the checkpoint file name
+     * @param fileName   name of the translog file the metadata belongs to; used in log and error messages
+     * @throws IOException if the checkpoint file can not be written
+     * @throws IllegalStateException if an expected metadata key is missing
+     * @throws RuntimeException if the checksum of the decoded data does not match the stored checksum
+     */
+    private void writeCheckpointFileFromMetadata(Map<String, String> metadata, Path location, String generation, String fileName)
+        throws IOException {
+
+        try {
+            String ckpFileName = Translog.getCommitCheckpointFileName(Long.parseLong(generation));
+            Path filePath = location.resolve(ckpFileName);
+
+            String ckpDataBase64 = metadata.get(CHECKPOINT_FILE_DATA_KEY);
+            String checksumKeyValue = metadata.get(CHECKPOINT_FILE_CHECKSUM_KEY);
+            if (ckpDataBase64 == null) {
+                throw new IllegalStateException("Checkpoint file data (ckp-data) key is expected but not found in metadata for file: " + fileName);
+            }
+            if (checksumKeyValue == null) {
+                throw new IllegalStateException("Checkpoint file checksum (ckp-checksum) key is expected but not found in metadata for file: " + fileName);
+            }
+
+            byte[] ckpFileBytes = convertBase64StringToCheckpointFileDataBytes(ckpDataBase64);
+            long remoteDataChecksum = Long.parseLong(checksumKeyValue);
+
+            TranslogCheckedContainer translogCheckedContainer = new TranslogCheckedContainer(ckpFileBytes);
+            long currentDataChecksum = translogCheckedContainer.getChecksum();
+
+            if (currentDataChecksum == remoteDataChecksum) {
+                logger.debug(
+                    "Checksum verification successful. currentDataChecksum={}, remoteDataChecksum={}",
+                    currentDataChecksum,
+                    remoteDataChecksum
+                );
+            } else {
+                logger.warn(
+                    "Checksum verification failed. currentDataChecksum={}, remoteDataChecksum={}",
+                    currentDataChecksum,
+                    remoteDataChecksum
+                );
+                throw new RuntimeException(
+                    "Checksum verification failed for file: "
+                        + fileName
+                        + ". currentDataChecksum="
+                        + currentDataChecksum
+                        + ", remoteChecksum="
+                        + remoteDataChecksum
+                );
+            }
+
+            // Here, we always override the existing file if present. The old file is only removed once the
+            // metadata has been validated, so a failed validation leaves the existing file untouched.
+            if (Files.exists(filePath)) {
+                Files.delete(filePath);
+            }
+            Files.write(filePath, ckpFileBytes);
+
+            // Mark in FileTransferTracker so that the same files are not uploaded at the time of translog sync
+            fileTransferTracker.add(ckpFileName, true);
+            logger.debug("Wrote checkpoint file for fileName: {}", fileName);
+        } catch (IOException e) {
+            logger.error("Error writing checkpoint file for file: {}", fileName, e);
+            throw e;
+        } catch (IllegalStateException e) {
+            logger.error("Error processing metadata for file: {}", fileName, e);
+            throw e;
+        }
+    }
+
     public TranslogTransferMetadata readMetadata() throws IOException {
         SetOnce metadataSetOnce = new SetOnce<>();
         SetOnce exceptionSetOnce = new SetOnce<>();
diff --git a/server/src/test/java/org/opensearch/index/translog/transfer/FileSnapshotTests.java b/server/src/test/java/org/opensearch/index/translog/transfer/FileSnapshotTests.java
index 2d75851e888a5..921d3f5763399 100644
--- a/server/src/test/java/org/opensearch/index/translog/transfer/FileSnapshotTests.java
+++ b/server/src/test/java/org/opensearch/index/translog/transfer/FileSnapshotTests.java
@@ -8,12 +8,15 @@
 
 package org.opensearch.index.translog.transfer;
 
+import org.opensearch.index.translog.Translog;
 import org.opensearch.test.OpenSearchTestCase;
 import org.junit.After;
 
 import java.io.IOException;
 import java.nio.file.Files;
 import java.nio.file.Path;
+import java.nio.file.StandardOpenOption;
+import java.util.Base64;
 
 public class FileSnapshotTests extends OpenSearchTestCase {
 
@@ -69,6 +72,59 @@ public void testFileSnapshotContent() throws IOException {
         }
     }
 
+    public void testBuildCheckpointDataAsBase64String() throws IOException {
+        Path file = createTempFile(Translog.TRANSLOG_FILE_PREFIX + 10, Translog.CHECKPOINT_SUFFIX);
+        Files.writeString(file, "213123123");
+
+        fileSnapshot = new FileSnapshot.TransferFileSnapshot(file, 12, null);
+
+        assertFileSnapshotProperties(file);
+        String encodedString = FileSnapshot.TranslogFileSnapshot.buildCheckpointDataAsBase64String(file);
+
+        // The encoded value must decode back to the exact bytes on disk
+        assertNotNull(encodedString);
+        byte[] decoded = Base64.getDecoder().decode(encodedString);
+        assertArrayEquals(Files.readAllBytes(file), decoded);
+    }
+
+    public void testBuildCheckpointDataAsBase64StringWhenPathIsNull() throws IOException {
+        Path file = createTempFile(Translog.TRANSLOG_FILE_PREFIX + 10, Translog.CHECKPOINT_SUFFIX);
+        Files.writeString(file, "hello_world_with_checkpoint_file_data");
+
+        fileSnapshot = new FileSnapshot.TransferFileSnapshot(file, 12, null);
+
+        assertFileSnapshotProperties(file);
+
+        assertThrows(NullPointerException.class, () -> FileSnapshot.TranslogFileSnapshot.buildCheckpointDataAsBase64String(null));
+    }
+
+    public void testConvertCheckpointBase64StringToBytes() throws IOException {
+        Path file = createTempFile(Translog.TRANSLOG_FILE_PREFIX + 10, Translog.CHECKPOINT_SUFFIX);
+        Files.writeString(file, "test-hello_world_with_checkpoint_file_data");
+
+        fileSnapshot = new FileSnapshot.TransferFileSnapshot(file, 12, null);
+
+        assertFileSnapshotProperties(file);
+        String encodedString = FileSnapshot.TranslogFileSnapshot.buildCheckpointDataAsBase64String(file);
+
+        byte[] decodedBytes = FileSnapshot.TranslogFileSnapshot.convertBase64StringToCheckpointFileDataBytes(encodedString);
+        assertNotNull(decodedBytes);
+        assertArrayEquals("test-hello_world_with_checkpoint_file_data".getBytes(), decodedBytes);
+    }
+
+    public void testBuildCheckpointDataAsBase64String_whenFileSizeGreaterThan2KB_shouldThrowAssertionError() throws IOException {
+        Path file = createTempFile(Translog.TRANSLOG_FILE_PREFIX + 10, Translog.CHECKPOINT_SUFFIX);
+        byte[] data = new byte[2048]; // 2KB
+
+        fileSnapshot = new FileSnapshot.TransferFileSnapshot(file, 12, null);
+
+        assertFileSnapshotProperties(file);
+
+        Files.write(file, data, StandardOpenOption.WRITE);
+
+        assertThrows(AssertionError.class, () -> FileSnapshot.TranslogFileSnapshot.buildCheckpointDataAsBase64String(file));
+    }
+
     private void assertFileSnapshotProperties(Path file) throws IOException {
         assertEquals(file.getFileName().toString(), fileSnapshot.getName());
         assertEquals(Files.size(file), fileSnapshot.getContentLength());
diff --git a/server/src/test/java/org/opensearch/index/translog/transfer/TranslogTransferManagerTests.java b/server/src/test/java/org/opensearch/index/translog/transfer/TranslogTransferManagerTests.java
index 81ae479d018b0..7a4f8f9b769e9 100644
--- a/server/src/test/java/org/opensearch/index/translog/transfer/TranslogTransferManagerTests.java
+++ b/server/src/test/java/org/opensearch/index/translog/transfer/TranslogTransferManagerTests.java
@@ -11,10 +11,11 @@
 import org.apache.lucene.tests.util.LuceneTestCase;
 import org.opensearch.action.LatchedActionListener;
 import org.opensearch.common.SetOnce;
 import org.opensearch.common.blobstore.BlobContainer;
 import org.opensearch.common.blobstore.BlobMetadata;
 import org.opensearch.common.blobstore.BlobPath;
 import org.opensearch.common.blobstore.BlobStore;
+import org.opensearch.common.blobstore.FetchBlobResult;
 import org.opensearch.common.blobstore.stream.write.WritePriority;
 import org.opensearch.common.blobstore.support.PlainBlobMetadata;
 import org.opensearch.common.collect.Tuple;
@@ -117,6 +118,16 @@ public void setUp() throws Exception {
             Thread.sleep(delayForBlobDownload);
             return new ByteArrayInputStream(ckpBytes);
         });
+
+        when(transferService.downloadBlobWithMetadata(any(BlobPath.class), eq("translog-23.tlog"))).thenAnswer(invocation -> {
+            Thread.sleep(delayForBlobDownload);
+            return new FetchBlobResult(new ByteArrayInputStream(tlogBytes), null);
+        });
+
+        when(transferService.downloadBlobWithMetadata(any(BlobPath.class), eq("translog-23.ckp"))).thenAnswer(invocation -> {
+            Thread.sleep(delayForBlobDownload);
+            return new FetchBlobResult(new ByteArrayInputStream(ckpBytes), null);
+        });
     }
 
     @Override
@@ -460,7 +471,7 @@ public void testDownloadTranslogAlreadyExists() throws IOException {
 
         translogTransferManager.downloadTranslog("12", "23", location);
 
-        verify(transferService).downloadBlob(any(BlobPath.class), eq("translog-23.tlog"));
+        verify(transferService).downloadBlobWithMetadata(any(BlobPath.class), eq("translog-23.tlog"));
         verify(transferService).downloadBlob(any(BlobPath.class), eq("translog-23.ckp"));
         assertTrue(Files.exists(location.resolve("translog-23.tlog")));
         assertTrue(Files.exists(location.resolve("translog-23.ckp")));
@@ -475,7 +486,7 @@ public void testDownloadTranslogWithTrackerUpdated() throws IOException {
 
         translogTransferManager.downloadTranslog("12", "23", location);
 
-        verify(transferService).downloadBlob(any(BlobPath.class), eq(translogFile));
+        verify(transferService).downloadBlobWithMetadata(any(BlobPath.class), eq(translogFile));
         verify(transferService).downloadBlob(any(BlobPath.class), eq(checkpointFile));
         assertTrue(Files.exists(location.resolve(translogFile)));
         assertTrue(Files.exists(location.resolve(checkpointFile)));