Skip to content

Commit

Permalink
Merge branch 'main' into remote-metadata-equals
Browse files Browse the repository at this point in the history
Signed-off-by: bansvaru <[email protected]>
  • Loading branch information
linuxpi committed Oct 20, 2023
2 parents 0592116 + 5093cc7 commit 91a3643
Show file tree
Hide file tree
Showing 7 changed files with 61 additions and 270 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,10 @@ protected Collection<Class<? extends Plugin>> nodePlugins() {
}

protected void restore(String... indices) {
boolean restoreAllShards = randomBoolean();
restore(randomBoolean(), indices);
}

protected void restore(boolean restoreAllShards, String... indices) {
if (restoreAllShards) {
assertAcked(client().admin().indices().prepareClose(indices));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,13 @@ private void resetCluster(int dataNodeCount, int clusterManagerNodeCount) {
internalCluster().startDataOnlyNodes(dataNodeCount);
}

protected void verifyRedIndicesAndTriggerRestore(Map<String, Long> indexStats, String indexName, boolean indexMoreDocs)
throws Exception {
ensureRed(indexName);
restore(false, indexName);
verifyRestoredData(indexStats, indexName, indexMoreDocs);
}

public void testFullClusterRestore() throws Exception {
int shardCount = randomIntBetween(1, 2);
int replicaCount = 1;
Expand All @@ -83,7 +90,7 @@ public void testFullClusterRestore() throws Exception {

// Step - 3 Trigger full cluster restore and validate
validateMetadata(List.of(INDEX_NAME));
verifyRestoredData(indexStats, INDEX_NAME);
verifyRedIndicesAndTriggerRestore(indexStats, INDEX_NAME, true);
}

public void testFullClusterRestoreMultipleIndices() throws Exception {
Expand Down Expand Up @@ -112,8 +119,8 @@ public void testFullClusterRestoreMultipleIndices() throws Exception {

// Step - 3 Trigger full cluster restore
validateMetadata(List.of(INDEX_NAME, secondIndexName));
verifyRestoredData(indexStats, INDEX_NAME);
verifyRestoredData(indexStats2, secondIndexName, false);
verifyRedIndicesAndTriggerRestore(indexStats, INDEX_NAME, false);
verifyRedIndicesAndTriggerRestore(indexStats2, secondIndexName, false);
assertTrue(INDEX_READ_ONLY_SETTING.get(clusterService().state().metadata().index(secondIndexName).getSettings()));
assertThrows(ClusterBlockException.class, () -> indexSingleDoc(secondIndexName));
// Test is complete
Expand Down Expand Up @@ -181,7 +188,7 @@ public void testRemoteStateFullRestart() throws Exception {
String newClusterUUID = clusterService().state().metadata().clusterUUID();
assert Objects.equals(newClusterUUID, prevClusterUUID) : "Full restart not successful. cluster uuid has changed";
validateCurrentMetadata();
verifyRestoredData(indexStats, INDEX_NAME);
verifyRedIndicesAndTriggerRestore(indexStats, INDEX_NAME, true);
}

private void validateMetadata(List<String> indexNames) {
Expand Down Expand Up @@ -246,19 +253,18 @@ public void testFullClusterRestoreGlobalMetadata() throws Exception {

// Step - 3 Trigger full cluster restore and validate
// validateCurrentMetadata();
verifyRestoredData(indexStats, INDEX_NAME, false);

// validate global metadata restored
verifyRestoredRepositories();
verifyRestoredIndexTemplate();
assertEquals(Integer.valueOf(34), SETTING_CLUSTER_MAX_SHARDS_PER_NODE.get(clusterService().state().metadata().settings()));
assertEquals(true, SETTING_READ_ONLY_SETTING.get(clusterService().state().metadata().settings()));
assertTrue(clusterService().state().blocks().hasGlobalBlock(CLUSTER_READ_ONLY_BLOCK));
// Test is complete

// Remote the cluster read only block to ensure proper cleanup
updatePersistentSettings(Settings.builder().put(SETTING_READ_ONLY_SETTING.getKey(), false).build());
assertFalse(clusterService().state().blocks().hasGlobalBlock(CLUSTER_READ_ONLY_BLOCK));

verifyRedIndicesAndTriggerRestore(indexStats, INDEX_NAME, false);

// validate global metadata restored
verifyRestoredRepositories();
verifyRestoredIndexTemplate();
}

private void registerCustomRepository() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -466,12 +466,12 @@ public Builder initializeAsRemoteStoreRestore(
}
for (int shardNumber = 0; shardNumber < indexMetadata.getNumberOfShards(); shardNumber++) {
ShardId shardId = new ShardId(index, shardNumber);
if (forceRecoverAllPrimaries == false && indexShardRoutingTableMap.containsKey(shardId) == false) {
if (indexShardRoutingTableMap.containsKey(shardId) == false) {
throw new IllegalStateException("IndexShardRoutingTable is not present for shardId: " + shardId);
}
IndexShardRoutingTable.Builder indexShardRoutingBuilder = new IndexShardRoutingTable.Builder(shardId);
IndexShardRoutingTable indexShardRoutingTable = indexShardRoutingTableMap.get(shardId);
if (forceRecoverAllPrimaries || indexShardRoutingTable == null || indexShardRoutingTable.primaryShard().unassigned()) {
if (forceRecoverAllPrimaries || indexShardRoutingTable.primaryShard().unassigned()) {
// Primary shard to be recovered from remote store.
indexShardRoutingBuilder.addShard(ShardRouting.newUnassigned(shardId, true, recoverySource, unassignedInfo));
// All the replica shards to be recovered from peer recovery.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,6 @@
import org.opensearch.cluster.metadata.Metadata;
import org.opensearch.cluster.node.DiscoveryNode;
import org.opensearch.cluster.node.DiscoveryNodes;
import org.opensearch.cluster.routing.RecoverySource;
import org.opensearch.cluster.routing.RoutingTable;
import org.opensearch.common.settings.ClusterSettings;

Expand Down Expand Up @@ -121,21 +120,7 @@ static ClusterState updateRoutingTable(final ClusterState state) {
// initialize all index routing tables as empty
final RoutingTable.Builder routingTableBuilder = RoutingTable.builder(state.routingTable());
for (final IndexMetadata cursor : state.metadata().indices().values()) {
// Whether IndexMetadata is recovered from local disk or remote it doesn't matter to us at this point.
// We are only concerned about index data recovery here. Which is why we only check for remote store enabled and not for remote
// cluster state enabled.
if (cursor.getSettings().getAsBoolean(IndexMetadata.SETTING_REMOTE_STORE_ENABLED, false) == false
|| state.routingTable().hasIndex(cursor.getIndex()) == false
|| state.routingTable()
.index(cursor.getIndex())
.shardsMatchingPredicateCount(
shardRouting -> shardRouting.primary()
// We need to ensure atleast one of the primaries is being recovered from remote.
// This ensures we have gone through the RemoteStoreRestoreService and routing table is updated
&& shardRouting.recoverySource() instanceof RecoverySource.RemoteStoreRecoverySource
) == 0) {
routingTableBuilder.addAsRecovery(cursor);
}
routingTableBuilder.addAsRecovery(cursor);
}
// start with 0 based versions for routing table
routingTableBuilder.version(0);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -183,14 +183,15 @@ private RemoteRestoreResult executeRestore(
final String restoreUUID = UUIDs.randomBase64UUID();
List<String> indicesToBeRestored = new ArrayList<>();
int totalShards = 0;
boolean metadataFromRemoteStore = false;
ClusterState.Builder builder = ClusterState.builder(currentState);
Metadata.Builder mdBuilder = Metadata.builder(currentState.metadata());
ClusterBlocks.Builder blocks = ClusterBlocks.builder().blocks(currentState.blocks());
RoutingTable.Builder rtBuilder = RoutingTable.builder(currentState.routingTable());
for (Map.Entry<String, Tuple<Boolean, IndexMetadata>> indexMetadataEntry : indexMetadataMap.entrySet()) {
String indexName = indexMetadataEntry.getKey();
IndexMetadata indexMetadata = indexMetadataEntry.getValue().v2();
boolean metadataFromRemoteStore = indexMetadataEntry.getValue().v1();
metadataFromRemoteStore = indexMetadataEntry.getValue().v1();
IndexMetadata updatedIndexMetadata = indexMetadata;
if (metadataFromRemoteStore == false && restoreAllShards) {
updatedIndexMetadata = IndexMetadata.builder(indexMetadata)
Expand All @@ -204,27 +205,23 @@ private RemoteRestoreResult executeRestore(

IndexId indexId = new IndexId(indexName, updatedIndexMetadata.getIndexUUID());

Map<ShardId, IndexShardRoutingTable> indexShardRoutingTableMap = new HashMap<>();
if (metadataFromRemoteStore == false) {
indexShardRoutingTableMap = currentState.routingTable()
Map<ShardId, IndexShardRoutingTable> indexShardRoutingTableMap = currentState.routingTable()
.index(indexName)
.shards()
.values()
.stream()
.collect(Collectors.toMap(IndexShardRoutingTable::shardId, Function.identity()));

RecoverySource.RemoteStoreRecoverySource recoverySource = new RecoverySource.RemoteStoreRecoverySource(
restoreUUID,
updatedIndexMetadata.getCreationVersion(),
indexId
);

rtBuilder.addAsRemoteStoreRestore(updatedIndexMetadata, recoverySource, indexShardRoutingTableMap, restoreAllShards);
}

RecoverySource.RemoteStoreRecoverySource recoverySource = new RecoverySource.RemoteStoreRecoverySource(
restoreUUID,
updatedIndexMetadata.getCreationVersion(),
indexId
);
rtBuilder.addAsRemoteStoreRestore(
updatedIndexMetadata,
recoverySource,
indexShardRoutingTableMap,
restoreAllShards || metadataFromRemoteStore
);
blocks.updateBlocks(updatedIndexMetadata);
mdBuilder.put(updatedIndexMetadata, true);
indicesToBeRestored.add(indexName);
Expand All @@ -239,7 +236,10 @@ private RemoteRestoreResult executeRestore(

RoutingTable rt = rtBuilder.build();
ClusterState updatedState = builder.metadata(mdBuilder).blocks(blocks).routingTable(rt).build();
return RemoteRestoreResult.build(restoreUUID, restoreInfo, allocationService.reroute(updatedState, "restored from remote store"));
if (metadataFromRemoteStore == false) {
updatedState = allocationService.reroute(updatedState, "restored from remote store");
}
return RemoteRestoreResult.build(restoreUUID, restoreInfo, updatedState);
}

private void restoreGlobalMetadata(Metadata.Builder mdBuilder, Metadata remoteMetadata) {
Expand Down
Loading

0 comments on commit 91a3643

Please sign in to comment.