Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Refactor Translog metadata upload/download to write/read header and footer via VersionedCodecStreamWrapper #7953

Merged
merged 10 commits into from
Jun 13, 2023
Original file line number Diff line number Diff line change
Expand Up @@ -12,9 +12,13 @@
import org.apache.logging.log4j.Logger;
import org.apache.logging.log4j.message.ParameterizedMessage;
import org.apache.lucene.store.IndexInput;
import org.apache.lucene.store.OutputStreamIndexOutput;
import org.opensearch.action.ActionListener;
import org.opensearch.action.LatchedActionListener;
import org.opensearch.common.blobstore.BlobPath;
import org.opensearch.common.bytes.BytesReference;
import org.opensearch.common.io.VersionedCodecStreamWrapper;
import org.opensearch.common.io.stream.BytesStreamOutput;
import org.opensearch.common.lucene.store.ByteArrayIndexInput;
import org.opensearch.index.shard.ShardId;
import org.opensearch.index.translog.Translog;
Expand Down Expand Up @@ -61,6 +65,12 @@ public class TranslogTransferManager {
private final static String METADATA_DIR = "metadata";
private final static String DATA_DIR = "data";

private static final VersionedCodecStreamWrapper<TranslogTransferMetadata> metadataStreamWrapper = new VersionedCodecStreamWrapper<>(
new TranslogTransferMetadataHandler(),
TranslogTransferMetadata.CURRENT_VERSION,
TranslogTransferMetadata.METADATA_CODEC
);

public TranslogTransferManager(
ShardId shardId,
TransferService transferService,
Expand Down Expand Up @@ -174,9 +184,9 @@ private void downloadToFS(String fileName, Path location, String primaryTerm) th

public TranslogTransferMetadata readMetadata() throws IOException {
return transferService.listAll(remoteMetadataTransferPath).stream().max(METADATA_FILENAME_COMPARATOR).map(filename -> {
try (InputStream inputStream = transferService.downloadBlob(remoteMetadataTransferPath, filename);) {
try (InputStream inputStream = transferService.downloadBlob(remoteMetadataTransferPath, filename)) {
IndexInput indexInput = new ByteArrayIndexInput("metadata file", inputStream.readAllBytes());
return new TranslogTransferMetadata(indexInput);
return metadataStreamWrapper.readStream(indexInput);
} catch (IOException e) {
logger.error(() -> new ParameterizedMessage("Exception while reading metadata file: {}", filename), e);
return null;
Expand All @@ -197,13 +207,40 @@ private TransferFileSnapshot prepareMetadata(TransferSnapshot transferSnapshot)
);
TranslogTransferMetadata translogTransferMetadata = transferSnapshot.getTranslogTransferMetadata();
translogTransferMetadata.setGenerationToPrimaryTermMapper(new HashMap<>(generationPrimaryTermMap));

return new TransferFileSnapshot(
getFileName(translogTransferMetadata.getPrimaryTerm(), translogTransferMetadata.getGeneration()),
translogTransferMetadata.createMetadataBytes(),
getMetadataBytes(translogTransferMetadata),
translogTransferMetadata.getPrimaryTerm()
);
}

/**
* Get the metadata bytes for a {@link TranslogTransferMetadata} object
*
* @param metadata The object to be parsed
* @return Byte representation for the given metadata
*/
public byte[] getMetadataBytes(TranslogTransferMetadata metadata) throws IOException {
byte[] metadataBytes;
BhumikaSaini-Amazon marked this conversation as resolved.
Show resolved Hide resolved

try (BytesStreamOutput output = new BytesStreamOutput()) {
try (
OutputStreamIndexOutput indexOutput = new OutputStreamIndexOutput(
"translog transfer metadata " + metadata.getPrimaryTerm(),
getFileName(metadata.getPrimaryTerm(), metadata.getGeneration()),
output,
TranslogTransferMetadata.BUFFER_SIZE
)
) {
metadataStreamWrapper.writeStream(indexOutput, metadata);
}
metadataBytes = BytesReference.toBytes(output.bytes());
}

return metadataBytes;
}

/**
* This method handles deletion of multiple generations for a single primary term. The deletion happens for translog
* and metadata files.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,18 +8,10 @@

package org.opensearch.index.translog.transfer;

import org.apache.lucene.codecs.CodecUtil;
import org.apache.lucene.store.DataOutput;
import org.apache.lucene.store.IndexInput;
import org.apache.lucene.store.OutputStreamIndexOutput;
import org.opensearch.common.SetOnce;
import org.opensearch.common.bytes.BytesReference;
import org.opensearch.common.io.stream.BytesStreamOutput;

import java.io.IOException;
import java.util.Arrays;
import java.util.Comparator;
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;

Expand All @@ -44,11 +36,11 @@ public class TranslogTransferMetadata {

public static final String METADATA_SEPARATOR = "__";

private static final int BUFFER_SIZE = 4096;
static final int BUFFER_SIZE = 4096;

private static final int CURRENT_VERSION = 1;
static final int CURRENT_VERSION = 1;

private static final String METADATA_CODEC = "md";
static final String METADATA_CODEC = "md";

public static final Comparator<String> METADATA_FILENAME_COMPARATOR = new MetadataFilenameComparator();

Expand All @@ -59,15 +51,6 @@ public TranslogTransferMetadata(long primaryTerm, long generation, long minTrans
this.count = count;
}

public TranslogTransferMetadata(IndexInput indexInput) throws IOException {
CodecUtil.checksumEntireFile(indexInput);
CodecUtil.checkHeader(indexInput, METADATA_CODEC, CURRENT_VERSION, CURRENT_VERSION);
this.primaryTerm = indexInput.readLong();
this.generation = indexInput.readLong();
this.minTranslogGeneration = indexInput.readLong();
this.generationToPrimaryTermMapper.set(indexInput.readMapOfStrings());
}

public long getPrimaryTerm() {
return primaryTerm;
}
Expand Down Expand Up @@ -96,24 +79,6 @@ public static String getFileName(long primaryTerm, long generation) {
return String.join(METADATA_SEPARATOR, Arrays.asList(String.valueOf(primaryTerm), String.valueOf(generation)));
}

public byte[] createMetadataBytes() throws IOException {
try (BytesStreamOutput output = new BytesStreamOutput()) {
try (
OutputStreamIndexOutput indexOutput = new OutputStreamIndexOutput(
"translog transfer metadata " + primaryTerm,
getFileName(primaryTerm, generation),
output,
BUFFER_SIZE
)
) {
CodecUtil.writeHeader(indexOutput, METADATA_CODEC, CURRENT_VERSION);
write(indexOutput);
CodecUtil.writeFooter(indexOutput);
}
return BytesReference.toBytes(output.bytes());
}
}

@Override
public int hashCode() {
return Objects.hash(primaryTerm, generation);
Expand All @@ -127,17 +92,6 @@ public boolean equals(Object o) {
return Objects.equals(this.primaryTerm, other.primaryTerm) && Objects.equals(this.generation, other.generation);
}

private void write(DataOutput out) throws IOException {
out.writeLong(primaryTerm);
out.writeLong(generation);
out.writeLong(minTranslogGeneration);
if (generationToPrimaryTermMapper.get() != null) {
out.writeMapOfStrings(generationToPrimaryTermMapper.get());
} else {
out.writeMapOfStrings(new HashMap<>());
}
}

private static class MetadataFilenameComparator implements Comparator<String> {
@Override
public int compare(String first, String second) {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.index.translog.transfer;

import org.apache.lucene.store.IndexInput;
import org.apache.lucene.store.IndexOutput;
import org.opensearch.common.io.IndexIOStreamHandler;

import java.io.IOException;
import java.util.HashMap;
import java.util.Map;

/**
* Handler for {@link TranslogTransferMetadata}
*
* @opensearch.internal
*/
public class TranslogTransferMetadataHandler implements IndexIOStreamHandler<TranslogTransferMetadata> {

/**
* Implements logic to read content from file input stream {@code indexInput} and parse into {@link TranslogTransferMetadata}
*
* @param indexInput file input stream
* @return content parsed to {@link TranslogTransferMetadata}
*/
@Override
public TranslogTransferMetadata readContent(IndexInput indexInput) throws IOException {
long primaryTerm = indexInput.readLong();
long generation = indexInput.readLong();
long minTranslogGeneration = indexInput.readLong();
Map<String, String> generationToPrimaryTermMapper = indexInput.readMapOfStrings();

int count = generationToPrimaryTermMapper.size();
TranslogTransferMetadata metadata = new TranslogTransferMetadata(primaryTerm, generation, minTranslogGeneration, count);
metadata.setGenerationToPrimaryTermMapper(generationToPrimaryTermMapper);

return metadata;
}

/**
* Implements logic to write content from {@code content} to file output stream {@code indexOutput}
*
* @param indexOutput file input stream
* @param content metadata content to be written
*/
@Override
public void writeContent(IndexOutput indexOutput, TranslogTransferMetadata content) throws IOException {
indexOutput.writeLong(content.getPrimaryTerm());
indexOutput.writeLong(content.getGeneration());
indexOutput.writeLong(content.getMinTranslogGeneration());
if (content.getGenerationToPrimaryTermMapper() != null) {
indexOutput.writeMapOfStrings(content.getGenerationToPrimaryTermMapper());
} else {
indexOutput.writeMapOfStrings(new HashMap<>());
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -204,7 +204,7 @@ public void testReadMetadataSingleFile() throws IOException {

TranslogTransferMetadata metadata = createTransferSnapshot().getTranslogTransferMetadata();
when(transferService.downloadBlob(any(BlobPath.class), eq("12__234"))).thenReturn(
new ByteArrayInputStream(metadata.createMetadataBytes())
new ByteArrayInputStream(translogTransferManager.getMetadataBytes(metadata))
);

assertEquals(metadata, translogTransferManager.readMetadata());
Expand All @@ -222,7 +222,7 @@ public void testReadMetadataMultipleFiles() throws IOException {

TranslogTransferMetadata metadata = createTransferSnapshot().getTranslogTransferMetadata();
when(transferService.downloadBlob(any(BlobPath.class), eq("12__235"))).thenReturn(
new ByteArrayInputStream(metadata.createMetadataBytes())
new ByteArrayInputStream(translogTransferManager.getMetadataBytes(metadata))
);

assertEquals(metadata, translogTransferManager.readMetadata());
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,93 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.index.translog.transfer;

import org.apache.lucene.store.IndexInput;
import org.apache.lucene.store.OutputStreamIndexOutput;
import org.junit.Before;
import org.opensearch.common.bytes.BytesReference;
import org.opensearch.common.io.stream.BytesStreamOutput;
import org.opensearch.common.lucene.store.ByteArrayIndexInput;
import org.opensearch.test.OpenSearchTestCase;

import java.io.IOException;
import java.util.HashMap;
import java.util.Map;

public class TranslogTransferMetadataHandlerTests extends OpenSearchTestCase {
private TranslogTransferMetadataHandler handler;

@Before
public void setUp() throws Exception {
super.setUp();
handler = new TranslogTransferMetadataHandler();
}

public void testReadContent() throws IOException {
TranslogTransferMetadata expectedMetadata = getTestMetadata();

// Operation: Read expected metadata from source input stream.
IndexInput indexInput = new ByteArrayIndexInput("metadata file", getTestMetadataBytes());
TranslogTransferMetadata actualMetadata = handler.readContent(indexInput);

// Verification: Compare actual metadata read from the source input stream.
assertEquals(expectedMetadata, actualMetadata);
}

public void testWriteContent() throws IOException {
TranslogTransferMetadata expectedMetadata = getTestMetadata();

// Operation: Write expected metadata to the target output stream.
BytesStreamOutput output = new BytesStreamOutput();
OutputStreamIndexOutput actualMetadataStream = new OutputStreamIndexOutput("dummy bytes", "dummy stream", output, 4096);
handler.writeContent(actualMetadataStream, expectedMetadata);
actualMetadataStream.close();

// Verification: Compare actual metadata written to the target output stream.
IndexInput indexInput = new ByteArrayIndexInput("metadata file", BytesReference.toBytes(output.bytes()));
long primaryTerm = indexInput.readLong();
long generation = indexInput.readLong();
long minTranslogGeneration = indexInput.readLong();
Map<String, String> generationToPrimaryTermMapper = indexInput.readMapOfStrings();
int count = generationToPrimaryTermMapper.size();
TranslogTransferMetadata actualMetadata = new TranslogTransferMetadata(primaryTerm, generation, minTranslogGeneration, count);
actualMetadata.setGenerationToPrimaryTermMapper(generationToPrimaryTermMapper);
assertEquals(expectedMetadata, actualMetadata);
}

private TranslogTransferMetadata getTestMetadata() {
long primaryTerm = 3;
long generation = 500;
long minTranslogGeneration = 300;
Map<String, String> generationToPrimaryTermMapper = new HashMap<>();
generationToPrimaryTermMapper.put("300", "1");
generationToPrimaryTermMapper.put("400", "2");
generationToPrimaryTermMapper.put("500", "3");
int count = generationToPrimaryTermMapper.size();
TranslogTransferMetadata metadata = new TranslogTransferMetadata(primaryTerm, generation, minTranslogGeneration, count);
metadata.setGenerationToPrimaryTermMapper(generationToPrimaryTermMapper);

return metadata;
}

private byte[] getTestMetadataBytes() throws IOException {
TranslogTransferMetadata metadata = getTestMetadata();

BytesStreamOutput output = new BytesStreamOutput();
OutputStreamIndexOutput indexOutput = new OutputStreamIndexOutput("dummy bytes", "dummy stream", output, 4096);
indexOutput.writeLong(metadata.getPrimaryTerm());
indexOutput.writeLong(metadata.getGeneration());
indexOutput.writeLong(metadata.getMinTranslogGeneration());
Map<String, String> generationToPrimaryTermMapper = metadata.getGenerationToPrimaryTermMapper();
indexOutput.writeMapOfStrings(generationToPrimaryTermMapper);
indexOutput.close();

return BytesReference.toBytes(output.bytes());
}
}