From 1c7eb831a0ec5b6b7e4983d4c0952e5a80230945 Mon Sep 17 00:00:00 2001 From: Bhumika Saini Date: Wed, 7 Jun 2023 18:30:12 +0530 Subject: [PATCH 01/10] Resolve #7228 Signed-off-by: Bhumika Saini --- .../transfer/TranslogTransferMetadata.java | 38 +++----- .../TranslogTransferMetadataHandler.java | 60 ++++++++++++ .../TranslogTransferMetadataHandlerTests.java | 92 +++++++++++++++++++ 3 files changed, 164 insertions(+), 26 deletions(-) create mode 100644 server/src/main/java/org/opensearch/index/translog/transfer/TranslogTransferMetadataHandler.java create mode 100644 server/src/test/java/org/opensearch/index/translog/transfer/TranslogTransferMetadataHandlerTests.java diff --git a/server/src/main/java/org/opensearch/index/translog/transfer/TranslogTransferMetadata.java b/server/src/main/java/org/opensearch/index/translog/transfer/TranslogTransferMetadata.java index 9ce0faaa3352d..6053410fd713a 100644 --- a/server/src/main/java/org/opensearch/index/translog/transfer/TranslogTransferMetadata.java +++ b/server/src/main/java/org/opensearch/index/translog/transfer/TranslogTransferMetadata.java @@ -8,18 +8,16 @@ package org.opensearch.index.translog.transfer; -import org.apache.lucene.codecs.CodecUtil; -import org.apache.lucene.store.DataOutput; import org.apache.lucene.store.IndexInput; import org.apache.lucene.store.OutputStreamIndexOutput; import org.opensearch.common.SetOnce; import org.opensearch.common.bytes.BytesReference; +import org.opensearch.common.io.VersionedCodecStreamWrapper; import org.opensearch.common.io.stream.BytesStreamOutput; import java.io.IOException; import java.util.Arrays; import java.util.Comparator; -import java.util.HashMap; import java.util.Map; import java.util.Objects; @@ -32,11 +30,11 @@ */ public class TranslogTransferMetadata { - private final long primaryTerm; + private long primaryTerm; - private final long generation; + private long generation; - private final long minTranslogGeneration; + private long minTranslogGeneration; private int count; @@ -52,6 +50,12 @@ public class TranslogTransferMetadata { public static final Comparator METADATA_FILENAME_COMPARATOR = new MetadataFilenameComparator(); + private static final VersionedCodecStreamWrapper metadataStreamWrapper = new VersionedCodecStreamWrapper<>( + new TranslogTransferMetadataHandler(), + TranslogTransferMetadata.CURRENT_VERSION, + TranslogTransferMetadata.METADATA_CODEC + ); + public TranslogTransferMetadata(long primaryTerm, long generation, long minTranslogGeneration, int count) { this.primaryTerm = primaryTerm; this.generation = generation; @@ -60,12 +64,7 @@ public TranslogTransferMetadata(long primaryTerm, long generation, long minTrans } public TranslogTransferMetadata(IndexInput indexInput) throws IOException { - CodecUtil.checksumEntireFile(indexInput); - CodecUtil.checkHeader(indexInput, METADATA_CODEC, CURRENT_VERSION, CURRENT_VERSION); - this.primaryTerm = indexInput.readLong(); - this.generation = indexInput.readLong(); - this.minTranslogGeneration = indexInput.readLong(); - this.generationToPrimaryTermMapper.set(indexInput.readMapOfStrings()); + metadataStreamWrapper.readStream(indexInput); } public long getPrimaryTerm() { @@ -106,9 +105,7 @@ public byte[] createMetadataBytes() throws IOException { BUFFER_SIZE ) ) { - CodecUtil.writeHeader(indexOutput, METADATA_CODEC, CURRENT_VERSION); - write(indexOutput); - CodecUtil.writeFooter(indexOutput); + metadataStreamWrapper.writeStream(indexOutput, this); } return BytesReference.toBytes(output.bytes()); } @@ -127,17 +124,6 @@ public boolean equals(Object o) { return Objects.equals(this.primaryTerm, other.primaryTerm) && Objects.equals(this.generation, other.generation); } - private void write(DataOutput out) throws IOException { - out.writeLong(primaryTerm); - out.writeLong(generation); - out.writeLong(minTranslogGeneration); - if (generationToPrimaryTermMapper.get() != null) { - out.writeMapOfStrings(generationToPrimaryTermMapper.get()); - } else { - out.writeMapOfStrings(new HashMap<>()); - } - } - private static class MetadataFilenameComparator implements Comparator { @Override public int compare(String first, String second) { diff --git a/server/src/main/java/org/opensearch/index/translog/transfer/TranslogTransferMetadataHandler.java b/server/src/main/java/org/opensearch/index/translog/transfer/TranslogTransferMetadataHandler.java new file mode 100644 index 0000000000000..176eed3a11f66 --- /dev/null +++ b/server/src/main/java/org/opensearch/index/translog/transfer/TranslogTransferMetadataHandler.java @@ -0,0 +1,60 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.index.translog.transfer; + +import org.apache.lucene.store.IndexInput; +import org.apache.lucene.store.IndexOutput; +import org.opensearch.common.io.IndexIOStreamHandler; + +import java.io.IOException; +import java.util.HashMap; +import java.util.Map; + +public class TranslogTransferMetadataHandler implements IndexIOStreamHandler { + + /** + * Implements logic to read content from file input stream {@code indexInput} and parse into {@link TranslogTransferMetadata} + * + * @param indexInput file input stream + * @return content parsed to {@link TranslogTransferMetadata} + */ + @Override + public TranslogTransferMetadata readContent(IndexInput indexInput) throws IOException { + long primaryTerm = indexInput.readLong(); + long generation = indexInput.readLong(); + long minTranslogGeneration = indexInput.readLong(); + Map generationToPrimaryTermMapper = indexInput.readMapOfStrings(); + + int count = generationToPrimaryTermMapper.size(); + TranslogTransferMetadata metadata = new TranslogTransferMetadata(primaryTerm, generation, minTranslogGeneration, count); + metadata.setGenerationToPrimaryTermMapper(generationToPrimaryTermMapper); + + return metadata; + } + + /** + * Implements logic to write content from {@code content} to file output stream {@code indexOutput} + * + * @param indexOutput file input stream + * @param content metadata content to be written + */ + @Override + public void writeContent(IndexOutput indexOutput, TranslogTransferMetadata content) throws IOException { + indexOutput.writeLong(content.getPrimaryTerm()); + indexOutput.writeLong(content.getGeneration()); + indexOutput.writeLong(content.getMinTranslogGeneration()); + if (content.getGenerationToPrimaryTermMapper() != null) { + indexOutput.writeMapOfStrings(content.getGenerationToPrimaryTermMapper()); + } else { + indexOutput.writeMapOfStrings(new HashMap<>()); + } + + indexOutput.close(); + } +} diff --git a/server/src/test/java/org/opensearch/index/translog/transfer/TranslogTransferMetadataHandlerTests.java b/server/src/test/java/org/opensearch/index/translog/transfer/TranslogTransferMetadataHandlerTests.java new file mode 100644 index 0000000000000..77e13de772fd0 --- /dev/null +++ b/server/src/test/java/org/opensearch/index/translog/transfer/TranslogTransferMetadataHandlerTests.java @@ -0,0 +1,92 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.index.translog.transfer; + +import org.apache.lucene.store.IndexInput; +import org.apache.lucene.store.OutputStreamIndexOutput; +import org.junit.Before; +import org.opensearch.common.bytes.BytesReference; +import org.opensearch.common.io.stream.BytesStreamOutput; +import org.opensearch.common.lucene.store.ByteArrayIndexInput; +import org.opensearch.test.OpenSearchTestCase; + +import java.io.IOException; +import java.util.HashMap; +import java.util.Map; + +public class TranslogTransferMetadataHandlerTests extends OpenSearchTestCase { + private TranslogTransferMetadataHandler handler; + + @Before + public void setUp() throws Exception { + super.setUp(); + handler = new TranslogTransferMetadataHandler(); + } + + public void testReadContent() throws IOException { + TranslogTransferMetadata expectedMetadata = getTestMetadata(); + + // Operation: Read expected metadata from source input stream. + IndexInput indexInput = new ByteArrayIndexInput("metadata file", getTestMetadataBytes()); + TranslogTransferMetadata actualMetadata = handler.readContent(indexInput); + + // Verification: Compare actual metadata read from the source input stream. + assertEquals(expectedMetadata, actualMetadata); + } + + public void testWriteContent() throws IOException { + TranslogTransferMetadata expectedMetadata = getTestMetadata(); + + // Operation: Write expected metadata to the target output stream. + BytesStreamOutput output = new BytesStreamOutput(); + OutputStreamIndexOutput actualMetadataStream = new OutputStreamIndexOutput("dummy bytes", "dummy stream", output, 4096); + handler.writeContent(actualMetadataStream, expectedMetadata); + + // Verification: Compare actual metadata written to the target output stream. + IndexInput indexInput = new ByteArrayIndexInput("metadata file", BytesReference.toBytes(output.bytes())); + long primaryTerm = indexInput.readLong(); + long generation = indexInput.readLong(); + long minTranslogGeneration = indexInput.readLong(); + Map generationToPrimaryTermMapper = indexInput.readMapOfStrings(); + int count = generationToPrimaryTermMapper.size(); + TranslogTransferMetadata actualMetadata = new TranslogTransferMetadata(primaryTerm, generation, minTranslogGeneration, count); + actualMetadata.setGenerationToPrimaryTermMapper(generationToPrimaryTermMapper); + assertEquals(expectedMetadata, actualMetadata); + } + + private TranslogTransferMetadata getTestMetadata() { + long primaryTerm = 3; + long generation = 500; + long minTranslogGeneration = 300; + Map generationToPrimaryTermMapper = new HashMap<>(); + generationToPrimaryTermMapper.put("300", "1"); + generationToPrimaryTermMapper.put("400", "2"); + generationToPrimaryTermMapper.put("500", "3"); + int count = generationToPrimaryTermMapper.size(); + TranslogTransferMetadata metadata = new TranslogTransferMetadata(primaryTerm, generation, minTranslogGeneration, count); + metadata.setGenerationToPrimaryTermMapper(generationToPrimaryTermMapper); + + return metadata; + } + + private byte[] getTestMetadataBytes() throws IOException { + TranslogTransferMetadata metadata = getTestMetadata(); + + BytesStreamOutput output = new BytesStreamOutput(); + OutputStreamIndexOutput indexOutput = new OutputStreamIndexOutput("dummy bytes", "dummy stream", output, 4096); + indexOutput.writeLong(metadata.getPrimaryTerm()); + indexOutput.writeLong(metadata.getGeneration()); + indexOutput.writeLong(metadata.getMinTranslogGeneration()); + Map generationToPrimaryTermMapper = metadata.getGenerationToPrimaryTermMapper(); + indexOutput.writeMapOfStrings(generationToPrimaryTermMapper); + indexOutput.close(); + + return BytesReference.toBytes(output.bytes()); + } +} From 17ee559af7da20c724a9e7065b6961ae0a479058 Mon Sep 17 00:00:00 2001 From: Bhumika Saini Date: Thu, 8 Jun 2023 11:23:04 +0530 Subject: [PATCH 02/10] Move VersionedCodecStreamWrapper to TranslogTransferManager Signed-off-by: Bhumika Saini --- .../transfer/TranslogTransferManager.java | 32 ++++++++++++++-- .../transfer/TranslogTransferMetadata.java | 38 ++----------------- .../TranslogTransferManagerTests.java | 36 +++++++++++++++++- 3 files changed, 66 insertions(+), 40 deletions(-) diff --git a/server/src/main/java/org/opensearch/index/translog/transfer/TranslogTransferManager.java b/server/src/main/java/org/opensearch/index/translog/transfer/TranslogTransferManager.java index 352e7dc2cc0e6..a57d5545661ad 100644 --- a/server/src/main/java/org/opensearch/index/translog/transfer/TranslogTransferManager.java +++ b/server/src/main/java/org/opensearch/index/translog/transfer/TranslogTransferManager.java @@ -12,9 +12,13 @@ import org.apache.logging.log4j.Logger; import org.apache.logging.log4j.message.ParameterizedMessage; import org.apache.lucene.store.IndexInput; +import org.apache.lucene.store.OutputStreamIndexOutput; import org.opensearch.action.ActionListener; import org.opensearch.action.LatchedActionListener; import org.opensearch.common.blobstore.BlobPath; +import org.opensearch.common.bytes.BytesReference; +import org.opensearch.common.io.VersionedCodecStreamWrapper; +import org.opensearch.common.io.stream.BytesStreamOutput; import org.opensearch.common.lucene.store.ByteArrayIndexInput; import org.opensearch.index.shard.ShardId; import org.opensearch.index.translog.Translog; @@ -61,6 +65,12 @@ public class TranslogTransferManager { private final static String METADATA_DIR = "metadata"; private final static String DATA_DIR = "data"; + private static final VersionedCodecStreamWrapper metadataStreamWrapper = new VersionedCodecStreamWrapper<>( + new TranslogTransferMetadataHandler(), + TranslogTransferMetadata.CURRENT_VERSION, + TranslogTransferMetadata.METADATA_CODEC + ); + public TranslogTransferManager( ShardId shardId, TransferService transferService, @@ -174,9 +184,9 @@ private void downloadToFS(String fileName, Path location, String primaryTerm) th public TranslogTransferMetadata readMetadata() throws IOException { return transferService.listAll(remoteMetadataTransferPath).stream().max(METADATA_FILENAME_COMPARATOR).map(filename -> { - try (InputStream inputStream = transferService.downloadBlob(remoteMetadataTransferPath, filename);) { + try (InputStream inputStream = transferService.downloadBlob(remoteMetadataTransferPath, filename)) { IndexInput indexInput = new ByteArrayIndexInput("metadata file", inputStream.readAllBytes()); - return new TranslogTransferMetadata(indexInput); + return metadataStreamWrapper.readStream(indexInput); } catch (IOException e) { logger.error(() -> new ParameterizedMessage("Exception while reading metadata file: {}", filename), e); return null; @@ -197,9 +207,25 @@ private TransferFileSnapshot prepareMetadata(TransferSnapshot transferSnapshot) ); TranslogTransferMetadata translogTransferMetadata = transferSnapshot.getTranslogTransferMetadata(); translogTransferMetadata.setGenerationToPrimaryTermMapper(new HashMap<>(generationPrimaryTermMap)); + byte[] metadataBytes; + + try (BytesStreamOutput output = new BytesStreamOutput()) { + try ( + OutputStreamIndexOutput indexOutput = new OutputStreamIndexOutput( + "translog transfer metadata " + translogTransferMetadata.getPrimaryTerm(), + getFileName(translogTransferMetadata.getPrimaryTerm(), translogTransferMetadata.getGeneration()), + output, + TranslogTransferMetadata.BUFFER_SIZE + ) + ) { + metadataStreamWrapper.writeStream(indexOutput, translogTransferMetadata); + } + metadataBytes = BytesReference.toBytes(output.bytes()); + } + return new TransferFileSnapshot( getFileName(translogTransferMetadata.getPrimaryTerm(), translogTransferMetadata.getGeneration()), - translogTransferMetadata.createMetadataBytes(), + metadataBytes, translogTransferMetadata.getPrimaryTerm() ); } diff --git a/server/src/main/java/org/opensearch/index/translog/transfer/TranslogTransferMetadata.java b/server/src/main/java/org/opensearch/index/translog/transfer/TranslogTransferMetadata.java index 6053410fd713a..85204c1562fce 100644 --- a/server/src/main/java/org/opensearch/index/translog/transfer/TranslogTransferMetadata.java +++ b/server/src/main/java/org/opensearch/index/translog/transfer/TranslogTransferMetadata.java @@ -8,14 +8,8 @@ package org.opensearch.index.translog.transfer; -import org.apache.lucene.store.IndexInput; -import org.apache.lucene.store.OutputStreamIndexOutput; import org.opensearch.common.SetOnce; -import org.opensearch.common.bytes.BytesReference; -import org.opensearch.common.io.VersionedCodecStreamWrapper; -import org.opensearch.common.io.stream.BytesStreamOutput; -import java.io.IOException; import java.util.Arrays; import java.util.Comparator; import java.util.Map; @@ -42,20 +36,14 @@ public class TranslogTransferMetadata { public static final String METADATA_SEPARATOR = "__"; - private static final int BUFFER_SIZE = 4096; + public static final int BUFFER_SIZE = 4096; - private static final int CURRENT_VERSION = 1; + public static final int CURRENT_VERSION = 1; - private static final String METADATA_CODEC = "md"; + public static final String METADATA_CODEC = "md"; public static final Comparator METADATA_FILENAME_COMPARATOR = new MetadataFilenameComparator(); - private static final VersionedCodecStreamWrapper metadataStreamWrapper = new VersionedCodecStreamWrapper<>( - new TranslogTransferMetadataHandler(), - TranslogTransferMetadata.CURRENT_VERSION, - TranslogTransferMetadata.METADATA_CODEC - ); - public TranslogTransferMetadata(long primaryTerm, long generation, long minTranslogGeneration, int count) { this.primaryTerm = primaryTerm; this.generation = generation; @@ -63,10 +51,6 @@ public TranslogTransferMetadata(long primaryTerm, long generation, long minTrans this.count = count; } - public TranslogTransferMetadata(IndexInput indexInput) throws IOException { - metadataStreamWrapper.readStream(indexInput); - } - public long getPrimaryTerm() { return primaryTerm; } @@ -95,22 +79,6 @@ public static String getFileName(long primaryTerm, long generation) { return String.join(METADATA_SEPARATOR, Arrays.asList(String.valueOf(primaryTerm), String.valueOf(generation))); } - public byte[] createMetadataBytes() throws IOException { - try (BytesStreamOutput output = new BytesStreamOutput()) { - try ( - OutputStreamIndexOutput indexOutput = new OutputStreamIndexOutput( - "translog transfer metadata " + primaryTerm, - getFileName(primaryTerm, generation), - output, - BUFFER_SIZE - ) - ) { - metadataStreamWrapper.writeStream(indexOutput, this); - } - return BytesReference.toBytes(output.bytes()); - } - } - @Override public int hashCode() { return Objects.hash(primaryTerm, generation); diff --git a/server/src/test/java/org/opensearch/index/translog/transfer/TranslogTransferManagerTests.java b/server/src/test/java/org/opensearch/index/translog/transfer/TranslogTransferManagerTests.java index 1c485dbc35c63..b2b436d8c060e 100644 --- a/server/src/test/java/org/opensearch/index/translog/transfer/TranslogTransferManagerTests.java +++ b/server/src/test/java/org/opensearch/index/translog/transfer/TranslogTransferManagerTests.java @@ -8,12 +8,15 @@ package org.opensearch.index.translog.transfer; +import org.apache.lucene.store.OutputStreamIndexOutput; import org.apache.lucene.tests.util.LuceneTestCase; import org.mockito.Mockito; import org.opensearch.action.ActionListener; import org.opensearch.common.blobstore.BlobContainer; import org.opensearch.common.blobstore.BlobPath; import org.opensearch.common.blobstore.BlobStore; +import org.opensearch.common.bytes.BytesReference; +import org.opensearch.common.io.stream.BytesStreamOutput; import org.opensearch.common.util.set.Sets; import org.opensearch.index.shard.ShardId; import org.opensearch.index.translog.Translog; @@ -31,6 +34,7 @@ import java.nio.file.Files; import java.nio.file.Path; import java.util.Arrays; +import java.util.HashMap; import java.util.List; import java.util.Set; import java.util.concurrent.atomic.AtomicInteger; @@ -42,6 +46,7 @@ import static org.mockito.Mockito.mock; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; +import static org.opensearch.index.translog.transfer.TranslogTransferMetadata.getFileName; @LuceneTestCase.SuppressFileSystems("*") public class TranslogTransferManagerTests extends OpenSearchTestCase { @@ -204,7 +209,7 @@ public void testReadMetadataSingleFile() throws IOException { TranslogTransferMetadata metadata = createTransferSnapshot().getTranslogTransferMetadata(); when(transferService.downloadBlob(any(BlobPath.class), eq("12__234"))).thenReturn( - new ByteArrayInputStream(metadata.createMetadataBytes()) + new ByteArrayInputStream(getMetadataBytes(metadata)) ); assertEquals(metadata, translogTransferManager.readMetadata()); @@ -222,7 +227,7 @@ public void testReadMetadataMultipleFiles() throws IOException { TranslogTransferMetadata metadata = createTransferSnapshot().getTranslogTransferMetadata(); when(transferService.downloadBlob(any(BlobPath.class), eq("12__235"))).thenReturn( - new ByteArrayInputStream(metadata.createMetadataBytes()) + new ByteArrayInputStream(getMetadataBytes(metadata)) ); assertEquals(metadata, translogTransferManager.readMetadata()); @@ -381,4 +386,31 @@ public void testDeleteTranslogFailure() throws Exception { translogTransferManager.deleteGenerationAsync(primaryTerm, Set.of(19L), () -> {}); assertEquals(2, tracker.allUploaded().size()); } + + private byte[] getMetadataBytes(TranslogTransferMetadata metadata) throws IOException { + byte[] metadataBytes; + + try (BytesStreamOutput output = new BytesStreamOutput()) { + try ( + OutputStreamIndexOutput indexOutput = new OutputStreamIndexOutput( + "translog transfer metadata " + metadata.getPrimaryTerm(), + getFileName(metadata.getPrimaryTerm(), metadata.getGeneration()), + output, + TranslogTransferMetadata.BUFFER_SIZE + ) + ) { + indexOutput.writeLong(metadata.getPrimaryTerm()); + indexOutput.writeLong(metadata.getGeneration()); + indexOutput.writeLong(metadata.getMinTranslogGeneration()); + if (metadata.getGenerationToPrimaryTermMapper() != null) { + indexOutput.writeMapOfStrings(metadata.getGenerationToPrimaryTermMapper()); + } else { + indexOutput.writeMapOfStrings(new HashMap<>()); + } + } + metadataBytes = BytesReference.toBytes(output.bytes()); + } + + return metadataBytes; + } } From 664af67cd1ea3f4d2b4fc1c8eb32988f0a8f7b6e Mon Sep 17 00:00:00 2001 From: Bhumika Saini Date: Thu, 8 Jun 2023 13:32:36 +0530 Subject: [PATCH 03/10] Fix failing tests Signed-off-by: Bhumika Saini --- .../transfer/TranslogTransferManager.java | 28 ++++++++++----- .../TranslogTransferMetadataHandler.java | 7 ++-- .../TranslogTransferManagerTests.java | 36 ++----------------- 3 files changed, 27 insertions(+), 44 deletions(-) diff --git a/server/src/main/java/org/opensearch/index/translog/transfer/TranslogTransferManager.java b/server/src/main/java/org/opensearch/index/translog/transfer/TranslogTransferManager.java index a57d5545661ad..0198cb6fef001 100644 --- a/server/src/main/java/org/opensearch/index/translog/transfer/TranslogTransferManager.java +++ b/server/src/main/java/org/opensearch/index/translog/transfer/TranslogTransferManager.java @@ -207,27 +207,39 @@ private TransferFileSnapshot prepareMetadata(TransferSnapshot transferSnapshot) ); TranslogTransferMetadata translogTransferMetadata = transferSnapshot.getTranslogTransferMetadata(); translogTransferMetadata.setGenerationToPrimaryTermMapper(new HashMap<>(generationPrimaryTermMap)); + + return new TransferFileSnapshot( + getFileName(translogTransferMetadata.getPrimaryTerm(), translogTransferMetadata.getGeneration()), + getMetadataBytes(translogTransferMetadata), + translogTransferMetadata.getPrimaryTerm() + ); + } + + /** + * Get the metadata bytes for a {@link TranslogTransferMetadata} object + * + * @param metadata The object to be parsed + * @return Byte representation for the given metadata + * @throws IOException + */ + public byte[] getMetadataBytes(TranslogTransferMetadata metadata) throws IOException { byte[] metadataBytes; try (BytesStreamOutput output = new BytesStreamOutput()) { try ( OutputStreamIndexOutput indexOutput = new OutputStreamIndexOutput( - "translog transfer metadata " + translogTransferMetadata.getPrimaryTerm(), - getFileName(translogTransferMetadata.getPrimaryTerm(), translogTransferMetadata.getGeneration()), + "translog transfer metadata " + metadata.getPrimaryTerm(), + getFileName(metadata.getPrimaryTerm(), metadata.getGeneration()), output, TranslogTransferMetadata.BUFFER_SIZE ) ) { - metadataStreamWrapper.writeStream(indexOutput, translogTransferMetadata); + metadataStreamWrapper.writeStream(indexOutput, metadata); } metadataBytes = BytesReference.toBytes(output.bytes()); } - return new TransferFileSnapshot( - getFileName(translogTransferMetadata.getPrimaryTerm(), translogTransferMetadata.getGeneration()), - metadataBytes, - translogTransferMetadata.getPrimaryTerm() - ); + return metadataBytes; } /** diff --git a/server/src/main/java/org/opensearch/index/translog/transfer/TranslogTransferMetadataHandler.java b/server/src/main/java/org/opensearch/index/translog/transfer/TranslogTransferMetadataHandler.java index 176eed3a11f66..cea7ef8a4e6dd 100644 --- a/server/src/main/java/org/opensearch/index/translog/transfer/TranslogTransferMetadataHandler.java +++ b/server/src/main/java/org/opensearch/index/translog/transfer/TranslogTransferMetadataHandler.java @@ -16,6 +16,11 @@ import java.util.HashMap; import java.util.Map; +/** + * Handler for {@link TranslogTransferMetadata} + * + * @opensearch.internal + */ public class TranslogTransferMetadataHandler implements IndexIOStreamHandler { /** @@ -54,7 +59,5 @@ public void writeContent(IndexOutput indexOutput, TranslogTransferMetadata conte } else { indexOutput.writeMapOfStrings(new HashMap<>()); } - - indexOutput.close(); } } diff --git a/server/src/test/java/org/opensearch/index/translog/transfer/TranslogTransferManagerTests.java b/server/src/test/java/org/opensearch/index/translog/transfer/TranslogTransferManagerTests.java index b2b436d8c060e..6f6b3622295b6 100644 --- a/server/src/test/java/org/opensearch/index/translog/transfer/TranslogTransferManagerTests.java +++ b/server/src/test/java/org/opensearch/index/translog/transfer/TranslogTransferManagerTests.java @@ -8,15 +8,12 @@ package org.opensearch.index.translog.transfer; -import org.apache.lucene.store.OutputStreamIndexOutput; import org.apache.lucene.tests.util.LuceneTestCase; import org.mockito.Mockito; import org.opensearch.action.ActionListener; import org.opensearch.common.blobstore.BlobContainer; import org.opensearch.common.blobstore.BlobPath; import org.opensearch.common.blobstore.BlobStore; -import org.opensearch.common.bytes.BytesReference; -import org.opensearch.common.io.stream.BytesStreamOutput; import org.opensearch.common.util.set.Sets; import org.opensearch.index.shard.ShardId; import org.opensearch.index.translog.Translog; @@ -34,7 +31,6 @@ import java.nio.file.Files; import java.nio.file.Path; import java.util.Arrays; -import java.util.HashMap; import java.util.List; import java.util.Set; import java.util.concurrent.atomic.AtomicInteger; @@ -46,7 +42,6 @@ import static org.mockito.Mockito.mock; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; -import static org.opensearch.index.translog.transfer.TranslogTransferMetadata.getFileName; @LuceneTestCase.SuppressFileSystems("*") public class TranslogTransferManagerTests extends OpenSearchTestCase { @@ -209,7 +204,7 @@ public void testReadMetadataSingleFile() throws IOException { TranslogTransferMetadata metadata = createTransferSnapshot().getTranslogTransferMetadata(); when(transferService.downloadBlob(any(BlobPath.class), eq("12__234"))).thenReturn( - new ByteArrayInputStream(getMetadataBytes(metadata)) + new ByteArrayInputStream(translogTransferManager.getMetadataBytes(metadata)) ); assertEquals(metadata, translogTransferManager.readMetadata()); @@ -227,7 +222,7 @@ public void testReadMetadataMultipleFiles() throws IOException { TranslogTransferMetadata metadata = createTransferSnapshot().getTranslogTransferMetadata(); when(transferService.downloadBlob(any(BlobPath.class), eq("12__235"))).thenReturn( - new ByteArrayInputStream(getMetadataBytes(metadata)) + new ByteArrayInputStream(translogTransferManager.getMetadataBytes(metadata)) ); assertEquals(metadata, translogTransferManager.readMetadata()); @@ -386,31 +381,4 @@ public void testDeleteTranslogFailure() throws Exception { translogTransferManager.deleteGenerationAsync(primaryTerm, Set.of(19L), () -> {}); assertEquals(2, tracker.allUploaded().size()); } - - private byte[] getMetadataBytes(TranslogTransferMetadata metadata) throws IOException { - byte[] metadataBytes; - - try (BytesStreamOutput output = new BytesStreamOutput()) { - try ( - OutputStreamIndexOutput indexOutput = new OutputStreamIndexOutput( - "translog transfer metadata " + metadata.getPrimaryTerm(), - getFileName(metadata.getPrimaryTerm(), metadata.getGeneration()), - output, - TranslogTransferMetadata.BUFFER_SIZE - ) - ) { - indexOutput.writeLong(metadata.getPrimaryTerm()); - indexOutput.writeLong(metadata.getGeneration()); - indexOutput.writeLong(metadata.getMinTranslogGeneration()); - if (metadata.getGenerationToPrimaryTermMapper() != null) { - indexOutput.writeMapOfStrings(metadata.getGenerationToPrimaryTermMapper()); - } else { - indexOutput.writeMapOfStrings(new HashMap<>()); - } - } - metadataBytes = BytesReference.toBytes(output.bytes()); - } - - return metadataBytes; - } } From 86f627969de742e37d774c9ab587a2938401a18f Mon Sep 17 00:00:00 2001 From: Bhumika Saini Date: Thu, 8 Jun 2023 16:03:53 +0530 Subject: [PATCH 04/10] Pass TranslogTransferMetadataHandlerTests#testWriteContent Signed-off-by: Bhumika Saini --- .../translog/transfer/TranslogTransferMetadataHandlerTests.java | 1 + 1 file changed, 1 insertion(+) diff --git a/server/src/test/java/org/opensearch/index/translog/transfer/TranslogTransferMetadataHandlerTests.java b/server/src/test/java/org/opensearch/index/translog/transfer/TranslogTransferMetadataHandlerTests.java index 77e13de772fd0..ccedd4a711433 100644 --- a/server/src/test/java/org/opensearch/index/translog/transfer/TranslogTransferMetadataHandlerTests.java +++ b/server/src/test/java/org/opensearch/index/translog/transfer/TranslogTransferMetadataHandlerTests.java @@ -47,6 +47,7 @@ public void testWriteContent() throws IOException { BytesStreamOutput output = new BytesStreamOutput(); OutputStreamIndexOutput actualMetadataStream = new OutputStreamIndexOutput("dummy bytes", "dummy stream", output, 4096); handler.writeContent(actualMetadataStream, expectedMetadata); + actualMetadataStream.close(); // Verification: Compare actual metadata written to the target output stream. IndexInput indexInput = new ByteArrayIndexInput("metadata file", BytesReference.toBytes(output.bytes())); From 24ee5ca1bd4fcbb0b635f23b80ea8b371d4f0886 Mon Sep 17 00:00:00 2001 From: Bhumika Saini Date: Thu, 8 Jun 2023 16:52:57 +0530 Subject: [PATCH 05/10] Empty commit Signed-off-by: Bhumika Saini From 60c36e124efc6c2799ec95ff406d79e54c53e57c Mon Sep 17 00:00:00 2001 From: Bhumika Saini Date: Thu, 8 Jun 2023 17:46:36 +0530 Subject: [PATCH 06/10] Revert obsolete TranslogTransferMetadata attribute updates Signed-off-by: Bhumika Saini --- .../index/translog/transfer/TranslogTransferMetadata.java | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/server/src/main/java/org/opensearch/index/translog/transfer/TranslogTransferMetadata.java b/server/src/main/java/org/opensearch/index/translog/transfer/TranslogTransferMetadata.java index 85204c1562fce..a50b59768cdcb 100644 --- a/server/src/main/java/org/opensearch/index/translog/transfer/TranslogTransferMetadata.java +++ b/server/src/main/java/org/opensearch/index/translog/transfer/TranslogTransferMetadata.java @@ -24,11 +24,11 @@ */ public class TranslogTransferMetadata { - private long primaryTerm; + private final long primaryTerm; - private long generation; + private final long generation; - private long minTranslogGeneration; + private final long minTranslogGeneration; private int count; From bcbebaa2e12f5080ac9281b623fbc4bef7188421 Mon Sep 17 00:00:00 2001 From: Bhumika Saini Date: Mon, 12 Jun 2023 15:34:13 +0530 Subject: [PATCH 07/10] Make BUFFER_SIZE, CURRENT_VERSION, METADATA_CODEC package-private Signed-off-by: Bhumika Saini --- .../index/translog/transfer/TranslogTransferMetadata.java | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/server/src/main/java/org/opensearch/index/translog/transfer/TranslogTransferMetadata.java b/server/src/main/java/org/opensearch/index/translog/transfer/TranslogTransferMetadata.java index a50b59768cdcb..7a2fee9a69d5e 100644 --- a/server/src/main/java/org/opensearch/index/translog/transfer/TranslogTransferMetadata.java +++ b/server/src/main/java/org/opensearch/index/translog/transfer/TranslogTransferMetadata.java @@ -36,11 +36,11 @@ public class TranslogTransferMetadata { public static final String METADATA_SEPARATOR = "__"; - public static final int BUFFER_SIZE = 4096; + static final int BUFFER_SIZE = 4096; - public static final int CURRENT_VERSION = 1; + static final int CURRENT_VERSION = 1; - public static final String METADATA_CODEC = "md"; + static final String METADATA_CODEC = "md"; public static final Comparator METADATA_FILENAME_COMPARATOR = new MetadataFilenameComparator(); From 519785e244ba08d3d080a4f92afc8783ef9f971a Mon Sep 17 00:00:00 2001 From: Bhumika Saini Date: Mon, 12 Jun 2023 16:17:50 +0530 Subject: [PATCH 08/10] Empty commit Signed-off-by: Bhumika Saini From 1c3c959cb2bb8996f5e15700ae7bd11158297c00 Mon Sep 17 00:00:00 2001 From: Bhumika Saini Date: Mon, 12 Jun 2023 17:41:19 +0530 Subject: [PATCH 09/10] Empty commit Signed-off-by: Bhumika Saini From 42d8bdad44282ab30e1adc4d336de3053413deec Mon Sep 17 00:00:00 2001 From: Bhumika Saini Date: Mon, 12 Jun 2023 19:56:04 +0530 Subject: [PATCH 10/10] Fix javadoc Signed-off-by: Bhumika Saini --- .../index/translog/transfer/TranslogTransferManager.java | 1 - 1 file changed, 1 deletion(-) diff --git a/server/src/main/java/org/opensearch/index/translog/transfer/TranslogTransferManager.java b/server/src/main/java/org/opensearch/index/translog/transfer/TranslogTransferManager.java index 0198cb6fef001..489c81f802695 100644 --- a/server/src/main/java/org/opensearch/index/translog/transfer/TranslogTransferManager.java +++ b/server/src/main/java/org/opensearch/index/translog/transfer/TranslogTransferManager.java @@ -220,7 +220,6 @@ private TransferFileSnapshot prepareMetadata(TransferSnapshot transferSnapshot) * * @param metadata The object to be parsed * @return Byte representation for the given metadata - * @throws IOException */ public byte[] getMetadataBytes(TranslogTransferMetadata metadata) throws IOException { byte[] metadataBytes;