diff --git a/server/src/main/java/org/opensearch/index/translog/transfer/TranslogTransferManager.java b/server/src/main/java/org/opensearch/index/translog/transfer/TranslogTransferManager.java index 352e7dc2cc0e6..489c81f802695 100644 --- a/server/src/main/java/org/opensearch/index/translog/transfer/TranslogTransferManager.java +++ b/server/src/main/java/org/opensearch/index/translog/transfer/TranslogTransferManager.java @@ -12,9 +12,13 @@ import org.apache.logging.log4j.Logger; import org.apache.logging.log4j.message.ParameterizedMessage; import org.apache.lucene.store.IndexInput; +import org.apache.lucene.store.OutputStreamIndexOutput; import org.opensearch.action.ActionListener; import org.opensearch.action.LatchedActionListener; import org.opensearch.common.blobstore.BlobPath; +import org.opensearch.common.bytes.BytesReference; +import org.opensearch.common.io.VersionedCodecStreamWrapper; +import org.opensearch.common.io.stream.BytesStreamOutput; import org.opensearch.common.lucene.store.ByteArrayIndexInput; import org.opensearch.index.shard.ShardId; import org.opensearch.index.translog.Translog; @@ -61,6 +65,12 @@ public class TranslogTransferManager { private final static String METADATA_DIR = "metadata"; private final static String DATA_DIR = "data"; + private static final VersionedCodecStreamWrapper metadataStreamWrapper = new VersionedCodecStreamWrapper<>( + new TranslogTransferMetadataHandler(), + TranslogTransferMetadata.CURRENT_VERSION, + TranslogTransferMetadata.METADATA_CODEC + ); + public TranslogTransferManager( ShardId shardId, TransferService transferService, @@ -174,9 +184,9 @@ private void downloadToFS(String fileName, Path location, String primaryTerm) th public TranslogTransferMetadata readMetadata() throws IOException { return transferService.listAll(remoteMetadataTransferPath).stream().max(METADATA_FILENAME_COMPARATOR).map(filename -> { - try (InputStream inputStream = transferService.downloadBlob(remoteMetadataTransferPath, filename);) { + try (InputStream inputStream = transferService.downloadBlob(remoteMetadataTransferPath, filename)) { IndexInput indexInput = new ByteArrayIndexInput("metadata file", inputStream.readAllBytes()); - return new TranslogTransferMetadata(indexInput); + return metadataStreamWrapper.readStream(indexInput); } catch (IOException e) { logger.error(() -> new ParameterizedMessage("Exception while reading metadata file: {}", filename), e); return null; @@ -197,13 +207,40 @@ private TransferFileSnapshot prepareMetadata(TransferSnapshot transferSnapshot) ); TranslogTransferMetadata translogTransferMetadata = transferSnapshot.getTranslogTransferMetadata(); translogTransferMetadata.setGenerationToPrimaryTermMapper(new HashMap<>(generationPrimaryTermMap)); + return new TransferFileSnapshot( getFileName(translogTransferMetadata.getPrimaryTerm(), translogTransferMetadata.getGeneration()), - translogTransferMetadata.createMetadataBytes(), + getMetadataBytes(translogTransferMetadata), translogTransferMetadata.getPrimaryTerm() ); } + /** + * Get the metadata bytes for a {@link TranslogTransferMetadata} object + * + * @param metadata The object to be parsed + * @return Byte representation for the given metadata + */ + public byte[] getMetadataBytes(TranslogTransferMetadata metadata) throws IOException { + byte[] metadataBytes; + + try (BytesStreamOutput output = new BytesStreamOutput()) { + try ( + OutputStreamIndexOutput indexOutput = new OutputStreamIndexOutput( + "translog transfer metadata " + metadata.getPrimaryTerm(), + getFileName(metadata.getPrimaryTerm(), metadata.getGeneration()), + output, + TranslogTransferMetadata.BUFFER_SIZE + ) + ) { + metadataStreamWrapper.writeStream(indexOutput, metadata); + } + metadataBytes = BytesReference.toBytes(output.bytes()); + } + + return metadataBytes; + } + /** * This method handles deletion of multiple generations for a single primary term. The deletion happens for translog * and metadata files. diff --git a/server/src/main/java/org/opensearch/index/translog/transfer/TranslogTransferMetadata.java b/server/src/main/java/org/opensearch/index/translog/transfer/TranslogTransferMetadata.java index 9ce0faaa3352d..7a2fee9a69d5e 100644 --- a/server/src/main/java/org/opensearch/index/translog/transfer/TranslogTransferMetadata.java +++ b/server/src/main/java/org/opensearch/index/translog/transfer/TranslogTransferMetadata.java @@ -8,18 +8,10 @@ package org.opensearch.index.translog.transfer; -import org.apache.lucene.codecs.CodecUtil; -import org.apache.lucene.store.DataOutput; -import org.apache.lucene.store.IndexInput; -import org.apache.lucene.store.OutputStreamIndexOutput; import org.opensearch.common.SetOnce; -import org.opensearch.common.bytes.BytesReference; -import org.opensearch.common.io.stream.BytesStreamOutput; -import java.io.IOException; import java.util.Arrays; import java.util.Comparator; -import java.util.HashMap; import java.util.Map; import java.util.Objects; @@ -44,11 +36,11 @@ public class TranslogTransferMetadata { public static final String METADATA_SEPARATOR = "__"; - private static final int BUFFER_SIZE = 4096; + static final int BUFFER_SIZE = 4096; - private static final int CURRENT_VERSION = 1; + static final int CURRENT_VERSION = 1; - private static final String METADATA_CODEC = "md"; + static final String METADATA_CODEC = "md"; public static final Comparator METADATA_FILENAME_COMPARATOR = new MetadataFilenameComparator(); @@ -59,15 +51,6 @@ public TranslogTransferMetadata(long primaryTerm, long generation, long minTrans this.count = count; } - public TranslogTransferMetadata(IndexInput indexInput) throws IOException { - CodecUtil.checksumEntireFile(indexInput); - CodecUtil.checkHeader(indexInput, METADATA_CODEC, CURRENT_VERSION, CURRENT_VERSION); - this.primaryTerm = indexInput.readLong(); - this.generation = indexInput.readLong(); - this.minTranslogGeneration = indexInput.readLong(); - this.generationToPrimaryTermMapper.set(indexInput.readMapOfStrings()); - } - public long getPrimaryTerm() { return primaryTerm; } @@ -96,24 +79,6 @@ public static String getFileName(long primaryTerm, long generation) { return String.join(METADATA_SEPARATOR, Arrays.asList(String.valueOf(primaryTerm), String.valueOf(generation))); } - public byte[] createMetadataBytes() throws IOException { - try (BytesStreamOutput output = new BytesStreamOutput()) { - try ( - OutputStreamIndexOutput indexOutput = new OutputStreamIndexOutput( - "translog transfer metadata " + primaryTerm, - getFileName(primaryTerm, generation), - output, - BUFFER_SIZE - ) - ) { - CodecUtil.writeHeader(indexOutput, METADATA_CODEC, CURRENT_VERSION); - write(indexOutput); - CodecUtil.writeFooter(indexOutput); - } - return BytesReference.toBytes(output.bytes()); - } - } - @Override public int hashCode() { return Objects.hash(primaryTerm, generation); @@ -127,17 +92,6 @@ public boolean equals(Object o) { return Objects.equals(this.primaryTerm, other.primaryTerm) && Objects.equals(this.generation, other.generation); } - private void write(DataOutput out) throws IOException { - out.writeLong(primaryTerm); - out.writeLong(generation); - out.writeLong(minTranslogGeneration); - if (generationToPrimaryTermMapper.get() != null) { - out.writeMapOfStrings(generationToPrimaryTermMapper.get()); - } else { - out.writeMapOfStrings(new HashMap<>()); - } - } - private static class MetadataFilenameComparator implements Comparator { @Override public int compare(String first, String second) { diff --git a/server/src/main/java/org/opensearch/index/translog/transfer/TranslogTransferMetadataHandler.java b/server/src/main/java/org/opensearch/index/translog/transfer/TranslogTransferMetadataHandler.java new file mode 100644 index 0000000000000..cea7ef8a4e6dd --- /dev/null +++ b/server/src/main/java/org/opensearch/index/translog/transfer/TranslogTransferMetadataHandler.java @@ -0,0 +1,63 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.index.translog.transfer; + +import org.apache.lucene.store.IndexInput; +import org.apache.lucene.store.IndexOutput; +import org.opensearch.common.io.IndexIOStreamHandler; + +import java.io.IOException; +import java.util.HashMap; +import java.util.Map; + +/** + * Handler for {@link TranslogTransferMetadata} + * + * @opensearch.internal + */ +public class TranslogTransferMetadataHandler implements IndexIOStreamHandler { + + /** + * Implements logic to read content from file input stream {@code indexInput} and parse into {@link TranslogTransferMetadata} + * + * @param indexInput file input stream + * @return content parsed to {@link TranslogTransferMetadata} + */ + @Override + public TranslogTransferMetadata readContent(IndexInput indexInput) throws IOException { + long primaryTerm = indexInput.readLong(); + long generation = indexInput.readLong(); + long minTranslogGeneration = indexInput.readLong(); + Map generationToPrimaryTermMapper = indexInput.readMapOfStrings(); + + int count = generationToPrimaryTermMapper.size(); + TranslogTransferMetadata metadata = new TranslogTransferMetadata(primaryTerm, generation, minTranslogGeneration, count); + metadata.setGenerationToPrimaryTermMapper(generationToPrimaryTermMapper); + + return metadata; + } + + /** + * Implements logic to write content from {@code content} to file output stream {@code indexOutput} + * + * @param indexOutput file input stream + * @param content metadata content to be written + */ + @Override + public void writeContent(IndexOutput indexOutput, TranslogTransferMetadata content) throws IOException { + indexOutput.writeLong(content.getPrimaryTerm()); + indexOutput.writeLong(content.getGeneration()); + indexOutput.writeLong(content.getMinTranslogGeneration()); + if (content.getGenerationToPrimaryTermMapper() != null) { + indexOutput.writeMapOfStrings(content.getGenerationToPrimaryTermMapper()); + } else { + indexOutput.writeMapOfStrings(new HashMap<>()); + } + } +} diff --git a/server/src/test/java/org/opensearch/index/translog/transfer/TranslogTransferManagerTests.java b/server/src/test/java/org/opensearch/index/translog/transfer/TranslogTransferManagerTests.java index 1c485dbc35c63..6f6b3622295b6 100644 --- a/server/src/test/java/org/opensearch/index/translog/transfer/TranslogTransferManagerTests.java +++ b/server/src/test/java/org/opensearch/index/translog/transfer/TranslogTransferManagerTests.java @@ -204,7 +204,7 @@ public void testReadMetadataSingleFile() throws IOException { TranslogTransferMetadata metadata = createTransferSnapshot().getTranslogTransferMetadata(); when(transferService.downloadBlob(any(BlobPath.class), eq("12__234"))).thenReturn( - new ByteArrayInputStream(metadata.createMetadataBytes()) + new ByteArrayInputStream(translogTransferManager.getMetadataBytes(metadata)) ); assertEquals(metadata, translogTransferManager.readMetadata()); @@ -222,7 +222,7 @@ public void testReadMetadataMultipleFiles() throws IOException { TranslogTransferMetadata metadata = createTransferSnapshot().getTranslogTransferMetadata(); when(transferService.downloadBlob(any(BlobPath.class), eq("12__235"))).thenReturn( - new ByteArrayInputStream(metadata.createMetadataBytes()) + new ByteArrayInputStream(translogTransferManager.getMetadataBytes(metadata)) ); assertEquals(metadata, translogTransferManager.readMetadata()); diff --git a/server/src/test/java/org/opensearch/index/translog/transfer/TranslogTransferMetadataHandlerTests.java b/server/src/test/java/org/opensearch/index/translog/transfer/TranslogTransferMetadataHandlerTests.java new file mode 100644 index 0000000000000..ccedd4a711433 --- /dev/null +++ b/server/src/test/java/org/opensearch/index/translog/transfer/TranslogTransferMetadataHandlerTests.java @@ -0,0 +1,93 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.index.translog.transfer; + +import org.apache.lucene.store.IndexInput; +import org.apache.lucene.store.OutputStreamIndexOutput; +import org.junit.Before; +import org.opensearch.common.bytes.BytesReference; +import org.opensearch.common.io.stream.BytesStreamOutput; +import org.opensearch.common.lucene.store.ByteArrayIndexInput; +import org.opensearch.test.OpenSearchTestCase; + +import java.io.IOException; +import java.util.HashMap; +import java.util.Map; + +public class TranslogTransferMetadataHandlerTests extends OpenSearchTestCase { + private TranslogTransferMetadataHandler handler; + + @Before + public void setUp() throws Exception { + super.setUp(); + handler = new TranslogTransferMetadataHandler(); + } + + public void testReadContent() throws IOException { + TranslogTransferMetadata expectedMetadata = getTestMetadata(); + + // Operation: Read expected metadata from source input stream. + IndexInput indexInput = new ByteArrayIndexInput("metadata file", getTestMetadataBytes()); + TranslogTransferMetadata actualMetadata = handler.readContent(indexInput); + + // Verification: Compare actual metadata read from the source input stream. + assertEquals(expectedMetadata, actualMetadata); + } + + public void testWriteContent() throws IOException { + TranslogTransferMetadata expectedMetadata = getTestMetadata(); + + // Operation: Write expected metadata to the target output stream. + BytesStreamOutput output = new BytesStreamOutput(); + OutputStreamIndexOutput actualMetadataStream = new OutputStreamIndexOutput("dummy bytes", "dummy stream", output, 4096); + handler.writeContent(actualMetadataStream, expectedMetadata); + actualMetadataStream.close(); + + // Verification: Compare actual metadata written to the target output stream. + IndexInput indexInput = new ByteArrayIndexInput("metadata file", BytesReference.toBytes(output.bytes())); + long primaryTerm = indexInput.readLong(); + long generation = indexInput.readLong(); + long minTranslogGeneration = indexInput.readLong(); + Map generationToPrimaryTermMapper = indexInput.readMapOfStrings(); + int count = generationToPrimaryTermMapper.size(); + TranslogTransferMetadata actualMetadata = new TranslogTransferMetadata(primaryTerm, generation, minTranslogGeneration, count); + actualMetadata.setGenerationToPrimaryTermMapper(generationToPrimaryTermMapper); + assertEquals(expectedMetadata, actualMetadata); + } + + private TranslogTransferMetadata getTestMetadata() { + long primaryTerm = 3; + long generation = 500; + long minTranslogGeneration = 300; + Map generationToPrimaryTermMapper = new HashMap<>(); + generationToPrimaryTermMapper.put("300", "1"); + generationToPrimaryTermMapper.put("400", "2"); + generationToPrimaryTermMapper.put("500", "3"); + int count = generationToPrimaryTermMapper.size(); + TranslogTransferMetadata metadata = new TranslogTransferMetadata(primaryTerm, generation, minTranslogGeneration, count); + metadata.setGenerationToPrimaryTermMapper(generationToPrimaryTermMapper); + + return metadata; + } + + private byte[] getTestMetadataBytes() throws IOException { + TranslogTransferMetadata metadata = getTestMetadata(); + + BytesStreamOutput output = new BytesStreamOutput(); + OutputStreamIndexOutput indexOutput = new OutputStreamIndexOutput("dummy bytes", "dummy stream", output, 4096); + indexOutput.writeLong(metadata.getPrimaryTerm()); + indexOutput.writeLong(metadata.getGeneration()); + indexOutput.writeLong(metadata.getMinTranslogGeneration()); + Map generationToPrimaryTermMapper = metadata.getGenerationToPrimaryTermMapper(); + indexOutput.writeMapOfStrings(generationToPrimaryTermMapper); + indexOutput.close(); + + return BytesReference.toBytes(output.bytes()); + } +}