Skip to content

Commit

Permalink
[Remote Cluster State] Parallel and Multipart IndexMetadata uploads (#…
Browse files Browse the repository at this point in the history
…9664)

* [Remote Cluster State] Parallel and Multipart IndexMetadata uploads

Signed-off-by: bansvaru <[email protected]>
  • Loading branch information
linuxpi authored Sep 6, 2023
1 parent 742267d commit 92b2095
Show file tree
Hide file tree
Showing 6 changed files with 408 additions and 53 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,9 @@

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.apache.lucene.codecs.CodecUtil;
import org.apache.lucene.index.CorruptIndexException;
import org.apache.lucene.store.IndexInput;
import org.opensearch.common.CheckedTriFunction;
import org.opensearch.common.SetOnce;
import org.opensearch.common.StreamContext;
Expand All @@ -19,11 +21,13 @@
import org.opensearch.common.blobstore.transfer.stream.OffsetRangeInputStream;
import org.opensearch.common.blobstore.transfer.stream.ResettableCheckedInputStream;
import org.opensearch.common.io.InputStreamContainer;
import org.opensearch.common.util.ByteUtils;

import java.io.Closeable;
import java.io.IOException;
import java.io.InputStream;
import java.util.Objects;
import java.util.zip.CRC32;

import com.jcraft.jzlib.JZlib;

Expand Down Expand Up @@ -244,4 +248,17 @@ public void close() throws IOException {
throw new IOException("Closure of some of the multi-part streams failed.");
}
}

/**
* Compute final checksum for IndexInput container checksum footer added by {@link CodecUtil}
* @param indexInput IndexInput with checksum in footer
* @param checksumBytesLength length of checksum bytes
* @return final computed checksum of entire indexInput
*/
public static long checksumOfChecksum(IndexInput indexInput, int checksumBytesLength) throws IOException {
long storedChecksum = CodecUtil.retrieveChecksum(indexInput);
CRC32 checksumOfChecksum = new CRC32();
checksumOfChecksum.update(ByteUtils.toByteArrayBE(storedChecksum));
return JZlib.crc32_combine(storedChecksum, checksumOfChecksum.getValue(), checksumBytesLength);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,9 @@

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.apache.logging.log4j.message.ParameterizedMessage;
import org.opensearch.Version;
import org.opensearch.action.LatchedActionListener;
import org.opensearch.cluster.ClusterState;
import org.opensearch.cluster.metadata.IndexMetadata;
import org.opensearch.common.Nullable;
Expand All @@ -22,6 +24,8 @@
import org.opensearch.common.settings.Settings;
import org.opensearch.common.unit.TimeValue;
import org.opensearch.common.util.io.IOUtils;
import org.opensearch.core.action.ActionListener;
import org.opensearch.core.index.Index;
import org.opensearch.gateway.remote.ClusterMetadataManifest.UploadedIndexMetadata;
import org.opensearch.index.remote.RemoteStoreUtils;
import org.opensearch.repositories.RepositoriesService;
Expand All @@ -34,11 +38,14 @@
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Base64;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;
import java.util.function.LongSupplier;
import java.util.function.Supplier;
Expand All @@ -57,6 +64,8 @@ public class RemoteClusterStateService implements Closeable {

public static final String METADATA_MANIFEST_NAME_FORMAT = "%s";

public static final int INDEX_METADATA_UPLOAD_WAIT_MILLIS = 20000;

public static final ChecksumBlobStoreFormat<IndexMetadata> INDEX_METADATA_FORMAT = new ChecksumBlobStoreFormat<>(
"index-metadata",
METADATA_NAME_FORMAT,
Expand Down Expand Up @@ -130,24 +139,11 @@ public ClusterMetadataManifest writeFullMetadata(ClusterState clusterState) thro
}
ensureRepositorySet();

final List<ClusterMetadataManifest.UploadedIndexMetadata> allUploadedIndexMetadata = new ArrayList<>();
// todo parallel upload
// any validations before/after upload ?
for (IndexMetadata indexMetadata : clusterState.metadata().indices().values()) {
// 123456789012_test-cluster/cluster-state/dsgYj10Nkso7/index/ftqsCnn9TgOX/metadata_4_1690947200
final String indexMetadataKey = writeIndexMetadata(
clusterState.getClusterName().value(),
clusterState.getMetadata().clusterUUID(),
indexMetadata,
indexMetadataFileName(indexMetadata)
);
final UploadedIndexMetadata uploadedIndexMetadata = new UploadedIndexMetadata(
indexMetadata.getIndex().getName(),
indexMetadata.getIndexUUID(),
indexMetadataKey
);
allUploadedIndexMetadata.add(uploadedIndexMetadata);
}
final List<UploadedIndexMetadata> allUploadedIndexMetadata = writeIndexMetadataParallel(
clusterState,
new ArrayList<>(clusterState.metadata().indices().values())
);
final ClusterMetadataManifest manifest = uploadManifest(clusterState, allUploadedIndexMetadata, false);
final long durationMillis = TimeValue.nsecToMSec(relativeTimeNanosSupplier.getAsLong() - startTimeNanos);
if (durationMillis >= slowWriteLoggingThreshold.getMillis()) {
Expand Down Expand Up @@ -197,6 +193,9 @@ public ClusterMetadataManifest writeIncrementalMetadata(
final Map<String, ClusterMetadataManifest.UploadedIndexMetadata> allUploadedIndexMetadata = previousManifest.getIndices()
.stream()
.collect(Collectors.toMap(UploadedIndexMetadata::getIndexName, Function.identity()));

List<IndexMetadata> toUpload = new ArrayList<>();

for (final IndexMetadata indexMetadata : clusterState.metadata().indices().values()) {
final Long previousVersion = previousStateIndexMetadataVersionByName.get(indexMetadata.getIndex().getName());
if (previousVersion == null || indexMetadata.getVersion() != previousVersion) {
Expand All @@ -207,32 +206,22 @@ public ClusterMetadataManifest writeIncrementalMetadata(
indexMetadata.getVersion()
);
numIndicesUpdated++;
final String indexMetadataKey = writeIndexMetadata(
clusterState.getClusterName().value(),
clusterState.getMetadata().clusterUUID(),
indexMetadata,
indexMetadataFileName(indexMetadata)
);
final UploadedIndexMetadata uploadedIndexMetadata = new UploadedIndexMetadata(
indexMetadata.getIndex().getName(),
indexMetadata.getIndexUUID(),
indexMetadataKey
);
allUploadedIndexMetadata.put(indexMetadata.getIndex().getName(), uploadedIndexMetadata);
toUpload.add(indexMetadata);
} else {
numIndicesUnchanged++;
}
previousStateIndexMetadataVersionByName.remove(indexMetadata.getIndex().getName());
}

List<UploadedIndexMetadata> uploadedIndexMetadataList = writeIndexMetadataParallel(clusterState, toUpload);
uploadedIndexMetadataList.forEach(
uploadedIndexMetadata -> allUploadedIndexMetadata.put(uploadedIndexMetadata.getIndexName(), uploadedIndexMetadata)
);

for (String removedIndexName : previousStateIndexMetadataVersionByName.keySet()) {
allUploadedIndexMetadata.remove(removedIndexName);
}
final ClusterMetadataManifest manifest = uploadManifest(
clusterState,
allUploadedIndexMetadata.values().stream().collect(Collectors.toList()),
false
);
final ClusterMetadataManifest manifest = uploadManifest(clusterState, new ArrayList<>(allUploadedIndexMetadata.values()), false);
final long durationMillis = TimeValue.nsecToMSec(relativeTimeNanosSupplier.getAsLong() - startTimeNanos);
if (durationMillis >= slowWriteLoggingThreshold.getMillis()) {
logger.warn(
Expand All @@ -254,6 +243,118 @@ public ClusterMetadataManifest writeIncrementalMetadata(
return manifest;
}

/**
* Uploads provided IndexMetadata's to remote store in parallel. The call is blocking so the method waits for upload to finish and then return.
* @param clusterState current ClusterState
* @param toUpload list of IndexMetadata to upload
* @return {@code List<UploadedIndexMetadata>} list of IndexMetadata uploaded to remote
* @throws IOException
*/
private List<UploadedIndexMetadata> writeIndexMetadataParallel(ClusterState clusterState, List<IndexMetadata> toUpload)
throws IOException {
List<Exception> exceptionList = Collections.synchronizedList(new ArrayList<>(toUpload.size()));
final CountDownLatch latch = new CountDownLatch(toUpload.size());
List<UploadedIndexMetadata> result = new ArrayList<>(toUpload.size());

LatchedActionListener<UploadedIndexMetadata> latchedActionListener = new LatchedActionListener<>(
ActionListener.wrap((UploadedIndexMetadata uploadedIndexMetadata) -> {
logger.trace(
String.format(Locale.ROOT, "IndexMetadata uploaded successfully for %s", uploadedIndexMetadata.getIndexName())
);
result.add(uploadedIndexMetadata);
}, ex -> {
assert ex instanceof IndexMetadataTransferException;
logger.error(
() -> new ParameterizedMessage("Exception during transfer of IndexMetadata to Remote {}", ex.getMessage()),
ex
);
exceptionList.add(ex);
}),
latch
);

for (IndexMetadata indexMetadata : toUpload) {
// 123456789012_test-cluster/cluster-state/dsgYj10Nkso7/index/ftqsCnn9TgOX/metadata_4_1690947200
writeIndexMetadataAsync(clusterState, indexMetadata, latchedActionListener);
}

try {
if (latch.await(INDEX_METADATA_UPLOAD_WAIT_MILLIS, TimeUnit.MILLISECONDS) == false) {
IndexMetadataTransferException ex = new IndexMetadataTransferException(
String.format(
Locale.ROOT,
"Timed out waiting for transfer of index metadata to complete - %s",
toUpload.stream().map(IndexMetadata::getIndex).map(Index::toString).collect(Collectors.joining(""))
)
);
exceptionList.forEach(ex::addSuppressed);
throw ex;
}
} catch (InterruptedException ex) {
exceptionList.forEach(ex::addSuppressed);
IndexMetadataTransferException exception = new IndexMetadataTransferException(
String.format(
Locale.ROOT,
"Timed out waiting for transfer of index metadata to complete - %s",
toUpload.stream().map(IndexMetadata::getIndex).map(Index::toString).collect(Collectors.joining(""))
),
ex
);
Thread.currentThread().interrupt();
throw exception;
}
if (exceptionList.size() > 0) {
IndexMetadataTransferException exception = new IndexMetadataTransferException(
String.format(
Locale.ROOT,
"Exception during transfer of IndexMetadata to Remote %s",
toUpload.stream().map(IndexMetadata::getIndex).map(Index::toString).collect(Collectors.joining(""))
)
);
exceptionList.forEach(exception::addSuppressed);
throw exception;
}
return result;
}

/**
* Allows async Upload of IndexMetadata to remote
* @param clusterState current ClusterState
* @param indexMetadata {@link IndexMetadata} to upload
* @param latchedActionListener listener to respond back on after upload finishes
* @throws IOException
*/
private void writeIndexMetadataAsync(
ClusterState clusterState,
IndexMetadata indexMetadata,
LatchedActionListener<UploadedIndexMetadata> latchedActionListener
) throws IOException {
final BlobContainer indexMetadataContainer = indexMetadataContainer(
clusterState.getClusterName().value(),
clusterState.metadata().clusterUUID(),
indexMetadata.getIndexUUID()
);

ActionListener<Void> completionListener = ActionListener.wrap(
resp -> latchedActionListener.onResponse(
new UploadedIndexMetadata(
indexMetadata.getIndex().getName(),
indexMetadata.getIndexUUID(),
indexMetadataContainer.path().buildAsString() + indexMetadataFileName(indexMetadata)
)
),
ex -> latchedActionListener.onFailure(new IndexMetadataTransferException(indexMetadata.getIndex().toString(), ex))
);

INDEX_METADATA_FORMAT.writeAsync(
indexMetadata,
indexMetadataContainer,
indexMetadataFileName(indexMetadata),
blobStoreRepository.getCompressor(),
completionListener
);
}

@Nullable
public ClusterMetadataManifest markLastStateAsCommitted(ClusterState clusterState, ClusterMetadataManifest previousManifest)
throws IOException {
Expand Down Expand Up @@ -313,14 +414,6 @@ private ClusterMetadataManifest uploadManifest(
}
}

private String writeIndexMetadata(String clusterName, String clusterUUID, IndexMetadata uploadIndexMetadata, String fileName)
throws IOException {
final BlobContainer indexMetadataContainer = indexMetadataContainer(clusterName, clusterUUID, uploadIndexMetadata.getIndexUUID());
INDEX_METADATA_FORMAT.write(uploadIndexMetadata, indexMetadataContainer, fileName, blobStoreRepository.getCompressor());
// returning full path
return indexMetadataContainer.path().buildAsString() + fileName;
}

private void writeMetadataManifest(String clusterName, String clusterUUID, ClusterMetadataManifest uploadManifest, String fileName)
throws IOException {
final BlobContainer metadataManifestContainer = manifestContainer(clusterName, clusterUUID);
Expand Down Expand Up @@ -468,4 +561,18 @@ private ClusterMetadataManifest fetchRemoteClusterMetadataManifest(String cluste
throw new IllegalStateException(String.format(Locale.ROOT, "Error while downloading cluster metadata - %s", filename), e);
}
}

/**
* Exception for IndexMetadata transfer failures to remote
*/
static class IndexMetadataTransferException extends RuntimeException {

public IndexMetadataTransferException(String errorDesc) {
super(errorDesc);
}

public IndexMetadataTransferException(String errorDesc, Throwable cause) {
super(errorDesc, cause);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.apache.logging.log4j.message.ParameterizedMessage;
import org.apache.lucene.codecs.CodecUtil;
import org.apache.lucene.index.CorruptIndexException;
import org.apache.lucene.store.Directory;
import org.apache.lucene.store.IOContext;
Expand All @@ -29,7 +28,6 @@
import org.opensearch.common.blobstore.transfer.RemoteTransferContainer;
import org.opensearch.common.blobstore.transfer.stream.OffsetRangeIndexInputStream;
import org.opensearch.common.blobstore.transfer.stream.OffsetRangeInputStream;
import org.opensearch.common.util.ByteUtils;
import org.opensearch.core.action.ActionListener;
import org.opensearch.index.store.exception.ChecksumCombinationException;

Expand All @@ -47,9 +45,8 @@
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.UnaryOperator;
import java.util.stream.Collectors;
import java.util.zip.CRC32;

import com.jcraft.jzlib.JZlib;
import static org.opensearch.common.blobstore.transfer.RemoteTransferContainer.checksumOfChecksum;

/**
* A {@code RemoteDirectory} provides an abstraction layer for storing a list of files to a remote store.
Expand Down Expand Up @@ -401,11 +398,8 @@ private void uploadBlob(

private long calculateChecksumOfChecksum(Directory directory, String file) throws IOException {
try (IndexInput indexInput = directory.openInput(file, IOContext.DEFAULT)) {
long storedChecksum = CodecUtil.retrieveChecksum(indexInput);
CRC32 checksumOfChecksum = new CRC32();
checksumOfChecksum.update(ByteUtils.toByteArrayBE(storedChecksum));
try {
return JZlib.crc32_combine(storedChecksum, checksumOfChecksum.getValue(), SEGMENT_CHECKSUM_BYTES);
return checksumOfChecksum(indexInput, SEGMENT_CHECKSUM_BYTES);
} catch (Exception e) {
throw new ChecksumCombinationException(
"Potentially corrupted file: Checksum combination failed while combining stored checksum "
Expand Down
Loading

0 comments on commit 92b2095

Please sign in to comment.