Skip to content

Commit

Permalink
integrate restore changes with state upload and download
Browse files Browse the repository at this point in the history
Signed-off-by: bansvaru <[email protected]>
  • Loading branch information
linuxpi committed Aug 16, 2023
1 parent 35a4f62 commit c1cb834
Show file tree
Hide file tree
Showing 19 changed files with 144 additions and 71 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@
import org.opensearch.action.index.IndexResponse;
import org.opensearch.action.support.PlainActionFuture;
import org.opensearch.cluster.health.ClusterHealthStatus;
import org.opensearch.common.UUIDs;
import org.opensearch.common.settings.Settings;
import org.opensearch.plugins.Plugin;
import org.opensearch.test.InternalTestCluster;
Expand All @@ -29,6 +28,7 @@
import java.util.Objects;
import java.util.concurrent.ExecutionException;

import static org.opensearch.indices.IndicesService.CLUSTER_REMOTE_STATE_REPOSITORY_SETTING;
import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertAcked;
import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertHitCount;

Expand All @@ -47,8 +47,7 @@ protected Settings nodeSettings(int nodeOrdinal) {
return Settings.builder()
.put(super.nodeSettings(nodeOrdinal))
.put(remoteStoreClusterSettings(REPOSITORY_NAME))
// TODO uncomment after rebased with upload changes
// .put(CLUSTER_REMOTE_STATE_REPOSITORY_SETTING.getKey(), REPOSITORY_NAME)
.put(CLUSTER_REMOTE_STATE_REPOSITORY_SETTING.getKey(), REPOSITORY_NAME)
.build();
}

Expand Down Expand Up @@ -470,7 +469,6 @@ public void testRTSRestoreNoData() throws IOException {

// TODO: Restore flow - index aliases

@AwaitsFix(bugUrl = "waiting upload flow rebase. tested on integration PR")
public void testRestoreFlowFullClusterRestartZeroReplica() {
int shardCount = 1;
// Step - 1 index some data to generate files in remote directory
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,12 @@ public class CoordinationState {
private VotingConfiguration lastPublishedConfiguration;
private VoteCollection publishVotes;

public CoordinationState(DiscoveryNode localNode, PersistedState persistedState, ElectionStrategy electionStrategy, PersistedState remotePersistedState) {
public CoordinationState(
DiscoveryNode localNode,
PersistedState persistedState,
ElectionStrategy electionStrategy,
PersistedState remotePersistedState
) {
this.localNode = localNode;

// persisted state
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -825,7 +825,9 @@ boolean publicationInProgress() {
protected void doStart() {
synchronized (mutex) {
CoordinationState.PersistedState persistedState = persistedStateSupplier.get();
coordinationState.set(new CoordinationState(getLocalNode(), persistedState, electionStrategy, remotePersistedStateSupplier.get()));
coordinationState.set(
new CoordinationState(getLocalNode(), persistedState, electionStrategy, remotePersistedStateSupplier.get())
);
peerFinder.setCurrentTerm(getCurrentTerm());
configuredHostsResolver.start();
final ClusterState lastAcceptedState = coordinationState.get().getLastAcceptedState();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,13 @@ public String getStateUUID() {
return stateUUID;
}

public ClusterMetadataMarker(Map<String, UploadedIndexMetadata> indices, long term, long version, String clusterUUID, String stateUUID) {
public ClusterMetadataMarker(
Map<String, UploadedIndexMetadata> indices,
long term,
long version,
String clusterUUID,
String stateUUID
) {
this.indices = Collections.unmodifiableMap(indices);
this.term = term;
this.version = version;
Expand Down Expand Up @@ -133,7 +139,10 @@ public boolean equals(Object o) {
return false;
}
final ClusterMetadataMarker that = (ClusterMetadataMarker) o;
return Objects.equals(indices, that.indices) && term == that.term && version == that.version && Objects.equals(clusterUUID, that.clusterUUID)
return Objects.equals(indices, that.indices)
&& term == that.term
&& version == that.version
&& Objects.equals(clusterUUID, that.clusterUUID)
&& Objects.equals(stateUUID, that.stateUUID);
}

Expand Down Expand Up @@ -164,7 +173,6 @@ public static class Builder {
private String clusterUUID;
private String stateUUID;


public void term(long term) {
this.term = term;
}
Expand Down Expand Up @@ -218,8 +226,10 @@ private static String uploadedFilename(Object[] fields) {
return (String) fields[2];
}

private static final ConstructingObjectParser<UploadedIndexMetadata, Void> PARSER = new ConstructingObjectParser<>("uploaded_index_metadata",
fields -> new UploadedIndexMetadata(indexName(fields), indexUUID(fields), uploadedFilename(fields)));
private static final ConstructingObjectParser<UploadedIndexMetadata, Void> PARSER = new ConstructingObjectParser<>(
"uploaded_index_metadata",
fields -> new UploadedIndexMetadata(indexName(fields), indexUUID(fields), uploadedFilename(fields))
);

static {
PARSER.declareString(ConstructingObjectParser.constructorArg(), INDEX_NAME_FIELD);
Expand Down Expand Up @@ -272,8 +282,9 @@ public boolean equals(Object o) {
return false;
}
final UploadedIndexMetadata that = (UploadedIndexMetadata) o;
return Objects.equals(indexName, that.indexName) && Objects.equals(indexUUID, that.indexUUID) && Objects.equals(uploadedFilename,
that.uploadedFilename);
return Objects.equals(indexName, that.indexName)
&& Objects.equals(indexUUID, that.indexUUID)
&& Objects.equals(uploadedFilename, that.uploadedFilename);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,6 @@
import org.opensearch.common.blobstore.BlobMetadata;
import org.opensearch.cluster.store.ClusterMetadataMarker.UploadedIndexMetadata;
import org.opensearch.common.settings.Settings;
import org.opensearch.core.common.io.stream.InputStreamStreamInput;
import org.opensearch.core.common.io.stream.StreamInput;
import org.opensearch.indices.IndicesService;
import org.opensearch.repositories.RepositoriesService;
import org.opensearch.repositories.Repository;
Expand Down Expand Up @@ -71,14 +69,21 @@ public ClusterMetadataMarker writeFullMetadata(long currentTerm, ClusterState cl
}

final Map<String, ClusterMetadataMarker.UploadedIndexMetadata> allUploadedIndexMetadata = new HashMap<>();
//todo parallel upload
// todo parallel upload
// any validations before/after upload ?
for (IndexMetadata indexMetadata : clusterState.metadata().indices().values()) {
//123456789012_test-cluster/cluster-state/dsgYj10Nkso7/index/ftqsCnn9TgOX/metadata_4_1690947200
final String indexMetadataKey = writeIndexMetadata(clusterState.getClusterName().value(), clusterState.getMetadata().clusterUUID(),
indexMetadata, indexMetadataFileName(indexMetadata));
final UploadedIndexMetadata uploadedIndexMetadata = new UploadedIndexMetadata(indexMetadata.getIndex().getName(), indexMetadata.getIndexUUID(),
indexMetadataKey);
// 123456789012_test-cluster/cluster-state/dsgYj10Nkso7/index/ftqsCnn9TgOX/metadata_4_1690947200
final String indexMetadataKey = writeIndexMetadata(
clusterState.getClusterName().value(),
clusterState.getMetadata().clusterUUID(),
indexMetadata,
indexMetadataFileName(indexMetadata)
);
final UploadedIndexMetadata uploadedIndexMetadata = new UploadedIndexMetadata(
indexMetadata.getIndex().getName(),
indexMetadata.getIndexUUID(),
indexMetadataKey
);
allUploadedIndexMetadata.put(indexMetadata.getIndex().getName(), uploadedIndexMetadata);
}
return uploadMarker(clusterState, allUploadedIndexMetadata);
Expand Down Expand Up @@ -112,12 +117,18 @@ public ClusterMetadataMarker writeIncrementalMetadata(long currentTerm, ClusterS

int numIndicesUpdated = 0;
int numIndicesUnchanged = 0;
final Map<String, ClusterMetadataMarker.UploadedIndexMetadata> allUploadedIndexMetadata = new HashMap<>(previousMarker.getIndices());
final Map<String, ClusterMetadataMarker.UploadedIndexMetadata> allUploadedIndexMetadata = new HashMap<>(
previousMarker.getIndices()
);
for (final IndexMetadata indexMetadata : clusterState.metadata().indices().values()) {
final Long previousVersion = indexMetadataVersionByName.get(indexMetadata.getIndex().getName());
if (previousVersion == null || indexMetadata.getVersion() != previousVersion) {
logger.trace("updating metadata for [{}], changing version from [{}] to [{}]", indexMetadata.getIndex(), previousVersion,
indexMetadata.getVersion());
logger.trace(
"updating metadata for [{}], changing version from [{}] to [{}]",
indexMetadata.getIndex(),
previousVersion,
indexMetadata.getVersion()
);
numIndicesUpdated++;
final String indexMetadataKey = writeIndexMetadata(clusterState.getClusterName().value(), clusterState.getMetadata().clusterUUID(),
indexMetadata, indexMetadataFileName(indexMetadata));
Expand All @@ -137,7 +148,7 @@ public ClusterMetadataMarker writeIncrementalMetadata(long currentTerm, ClusterS
}

public ClusterState getLatestClusterState(String clusterUUID) {
//todo
// todo
return null;
}

Expand All @@ -147,7 +158,8 @@ public ClusterMetadataMarker uploadMarker(ClusterState clusterState, Map<String,
final String markerFileName = getMarkerFileName(clusterState.term(), clusterState.version());
final ClusterMetadataMarker marker = new ClusterMetadataMarker(uploadedIndexMetadata, clusterState.term(), clusterState.getVersion(),
clusterState.metadata().clusterUUID(),
clusterState.stateUUID());
clusterState.stateUUID()
);
writeMetadataMarker(clusterState.getClusterName().value(), clusterState.metadata().clusterUUID(), marker, markerFileName);
return marker;
}
Expand All @@ -166,13 +178,15 @@ public void writeMetadataMarker(String clusterName, String clusterUUID, ClusterM
}

public BlobContainer indexMetadataContainer(String clusterName, String clusterUUID, String indexUUID) {
//123456789012_test-cluster/cluster-state/dsgYj10Nkso7/index/ftqsCnn9TgOX
// 123456789012_test-cluster/cluster-state/dsgYj10Nkso7/index/ftqsCnn9TgOX
return blobStoreRepository.blobStore()
.blobContainer(blobStoreRepository.basePath().add(clusterName).add("cluster-state").add(clusterUUID).add("index").add(indexUUID));
.blobContainer(
blobStoreRepository.basePath().add(clusterName).add("cluster-state").add(clusterUUID).add("index").add(indexUUID)
);
}

public BlobContainer markerContainer(String clusterName, String clusterUUID) {
//123456789012_test-cluster/cluster-state/dsgYj10Nkso7/marker
// 123456789012_test-cluster/cluster-state/dsgYj10Nkso7/marker
return blobStoreRepository.blobStore()
.blobContainer(blobStoreRepository.basePath().add(clusterName).add("cluster-state").add(clusterUUID).add("marker"));
}
Expand Down Expand Up @@ -200,8 +214,6 @@ public ClusterMetadataMarker getLatestClusterMetadataMarker(String clusterUUID,

public ClusterMetadataMarker fetchRemoteClusterMetadataMarker(String filename, String clusterUUID, String clusterState) {
try {
// We would be creating a new repository for RemoteClusterState which would hold this logic. We would also not need to have
// getNamedXContentRegistry method in blobStoreRepository
return RemoteClusterStateService.CLUSTER_METADATA_MARKER_FORMAT.read(
getMarkerBlobContainer(clusterUUID, clusterState),
filename,
Expand Down Expand Up @@ -232,9 +244,12 @@ public String getLatestMarkerFileName(String clusterUUID, String clusterState) {
public Map<String, IndexMetadata> getLatestIndexMetadata(String clusterUUID, String clusterName) throws IOException {
Map<String, IndexMetadata> remoteIndexMetadata = new HashMap<>();
ClusterMetadataMarker clusterMetadataMarker = getLatestClusterMetadataMarker(clusterUUID, clusterName);
for (Map.Entry<String, UploadedIndexMetadata> entry: clusterMetadataMarker.getIndices().entrySet()) {
IndexMetadata indexMetadata = IndexMetadata.readFrom(new InputStreamStreamInput(
getMarkerBlobContainer(clusterUUID, clusterName).readBlob(entry.getValue().getUploadedFilename())));
for (Map.Entry<String, UploadedIndexMetadata> entry : clusterMetadataMarker.getIndices().entrySet()) {
IndexMetadata indexMetadata = INDEX_METADATA_FORMAT.read(
blobStoreRepository.blobStore().blobContainer(blobStoreRepository.basePath()),
entry.getValue().getUploadedFilename(),
blobStoreRepository.getNamedXContentRegistry()
);
remoteIndexMetadata.put(entry.getKey(), indexMetadata);
}
return remoteIndexMetadata;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,8 +33,6 @@
package org.opensearch.common.blobstore;

import org.opensearch.core.action.ActionListener;
import org.apache.logging.log4j.message.ParameterizedMessage;
import org.opensearch.action.LatchedActionListener;

import java.io.IOException;
import java.io.InputStream;
Expand All @@ -44,8 +42,6 @@
import java.util.Comparator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicReference;

/**
* An interface for managing a repository of blob entries, where each blob entry is just a named group of bytes.
Expand Down Expand Up @@ -238,11 +234,8 @@ default void listBlobsByPrefixInSortedOrder(
}
}

default List<BlobMetadata> listBlobsByPrefixInSortedOrder(
String blobNamePrefix,
int limit,
BlobNameSortOrder blobNameSortOrder
) throws IOException {
default List<BlobMetadata> listBlobsByPrefixInSortedOrder(String blobNamePrefix, int limit, BlobNameSortOrder blobNameSortOrder)
throws IOException {
if (limit < 0) {
throw new IllegalArgumentException("limit should not be a negative value");
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -130,8 +130,7 @@ public DiscoveryModule(
Path configFile,
GatewayMetaState gatewayMetaState,
RerouteService rerouteService,
NodeHealthService nodeHealthService,
RemoteClusterStateService remoteClusterStateService
NodeHealthService nodeHealthService
) {
final Collection<BiConsumer<DiscoveryNode, ClusterState>> joinValidators = new ArrayList<>();
final Map<String, Supplier<SeedHostsProvider>> hostProviders = new HashMap<>();
Expand Down
18 changes: 13 additions & 5 deletions server/src/main/java/org/opensearch/gateway/GatewayMetaState.java
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,6 @@
import org.opensearch.common.util.concurrent.OpenSearchThreadPoolExecutor;
import org.opensearch.common.util.io.IOUtils;
import org.opensearch.env.NodeMetadata;
import org.opensearch.indices.IndicesService;
import org.opensearch.node.Node;
import org.opensearch.plugins.MetadataUpgrader;
import org.opensearch.threadpool.ThreadPool;
Expand Down Expand Up @@ -621,15 +620,19 @@ public void close() throws IOException {
*/
public static class RemotePersistedState implements PersistedState {

//todo check diff between currentTerm and clusterState term
// todo check diff between currentTerm and clusterState term
private long currentTerm;
private ClusterState lastAcceptedState;
private ClusterMetadataMarker lastAcceptedMarker;
private final RemoteClusterStateService remoteClusterStateService;
//todo Is this needed?
// todo Is this needed?
private boolean writeNextStateFully;

public RemotePersistedState(final RemoteClusterStateService remoteClusterStateService, final long currentTerm, final ClusterState lastAcceptedState) {
public RemotePersistedState(
final RemoteClusterStateService remoteClusterStateService,
final long currentTerm,
final ClusterState lastAcceptedState
) {
this.remoteClusterStateService = remoteClusterStateService;
this.currentTerm = currentTerm;
this.lastAcceptedState = lastAcceptedState;
Expand Down Expand Up @@ -663,7 +666,12 @@ public void setLastAcceptedState(ClusterState clusterState) {
if (shouldWriteFullClusterState(clusterState)) {
marker = remoteClusterStateService.writeFullMetadata(currentTerm, clusterState);
} else {
marker = remoteClusterStateService.writeIncrementalMetadata(currentTerm, lastAcceptedState, clusterState, lastAcceptedMarker);
marker = remoteClusterStateService.writeIncrementalMetadata(
currentTerm,
lastAcceptedState,
clusterState,
lastAcceptedMarker
);
}
lastAcceptedState = clusterState;
lastAcceptedMarker = marker;
Expand Down
Loading

0 comments on commit c1cb834

Please sign in to comment.