Skip to content

Commit

Permalink
Address PR comments and add basic IT
Browse files Browse the repository at this point in the history
Signed-off-by: bansvaru <[email protected]>
  • Loading branch information
linuxpi committed Oct 18, 2023
1 parent 5ff0521 commit 568bbc5
Show file tree
Hide file tree
Showing 3 changed files with 155 additions and 21 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ protected void restore(String... indices) {
);
}

protected void verifyRestoredData(Map<String, Long> indexStats, String indexName) throws Exception {
protected void verifyRestoredData(Map<String, Long> indexStats, String indexName, boolean indexMoreData) throws Exception {
ensureYellowAndNoInitializingShards(indexName);
ensureGreen(indexName);
// This is to ensure that shards that were already assigned will get latest count
Expand All @@ -68,6 +68,8 @@ protected void verifyRestoredData(Map<String, Long> indexStats, String indexName
30,
TimeUnit.SECONDS
);
if (indexMoreData == false) return;

IndexResponse response = indexSingleDoc(indexName);
if (indexStats.containsKey(MAX_SEQ_NO_TOTAL + "-shard-" + response.getShardId().id())) {
assertEquals(indexStats.get(MAX_SEQ_NO_TOTAL + "-shard-" + response.getShardId().id()) + 1, response.getSeqNo());
Expand All @@ -80,6 +82,10 @@ protected void verifyRestoredData(Map<String, Long> indexStats, String indexName
);
}

/**
 * Convenience overload of {@code verifyRestoredData(indexStats, indexName, indexMoreData)}
 * that also indexes one more document after the restore to confirm the restored
 * shards accept writes.
 */
protected void verifyRestoredData(Map<String, Long> indexStats, String indexName) throws Exception {
    verifyRestoredData(indexStats, indexName, true);
}

/**
 * Convenience overload that prepares the cluster with empty extra settings.
 * Delegates to the {@code Settings}-accepting variant with {@code Settings.EMPTY}.
 */
public void prepareCluster(int numClusterManagerNodes, int numDataOnlyNodes, String indices, int replicaCount, int shardCount) {
    prepareCluster(numClusterManagerNodes, numDataOnlyNodes, indices, replicaCount, shardCount, Settings.EMPTY);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,20 +11,31 @@
import org.opensearch.action.admin.cluster.remotestore.restore.RestoreRemoteStoreResponse;
import org.opensearch.action.admin.cluster.settings.ClusterUpdateSettingsRequest;
import org.opensearch.action.support.PlainActionFuture;
import org.opensearch.action.admin.indices.settings.put.UpdateSettingsRequest;
import org.opensearch.action.admin.indices.template.put.PutIndexTemplateRequest;
import org.opensearch.cluster.block.ClusterBlockException;
import org.opensearch.cluster.metadata.IndexTemplateMetadata;
import org.opensearch.cluster.metadata.RepositoriesMetadata;
import org.opensearch.common.settings.Settings;
import org.opensearch.gateway.remote.RemoteClusterStateService;
import org.opensearch.test.OpenSearchIntegTestCase;

import java.io.IOException;
import java.nio.file.Files;
import java.util.Locale;
import java.util.Arrays;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.ExecutionException;

import static org.opensearch.cluster.metadata.IndexMetadata.INDEX_READ_ONLY_SETTING;
import static org.opensearch.cluster.metadata.Metadata.CLUSTER_READ_ONLY_BLOCK;
import static org.opensearch.cluster.metadata.Metadata.SETTING_READ_ONLY_SETTING;
import static org.opensearch.gateway.remote.RemoteClusterStateService.REMOTE_CLUSTER_STATE_ENABLED_SETTING;
import static org.opensearch.indices.ShardLimitValidator.SETTING_CLUSTER_MAX_SHARDS_PER_NODE;
import static org.opensearch.indices.ShardLimitValidator.SETTING_MAX_SHARDS_PER_CLUSTER_KEY;
import static org.opensearch.repositories.blobstore.BlobStoreRepository.SYSTEM_REPOSITORY_SETTING;
import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertAcked;

@OpenSearchIntegTestCase.ClusterScope(scope = OpenSearchIntegTestCase.Scope.TEST, numDataNodes = 0)
public class RemoteStoreClusterStateRestoreIT extends BaseRemoteStoreRestoreIT {
Expand Down Expand Up @@ -124,6 +135,7 @@ public void testFullClusterRestoreMultipleIndices() throws Exception {
Map<String, Long> indexStats2 = indexData(1, false, secondIndexName);
assertEquals((shardCount + 1) * (replicaCount + 1), getNumShards(secondIndexName).totalNumShards);
ensureGreen(secondIndexName);
updateIndexBlock(true, secondIndexName);

String prevClusterUUID = clusterService().state().metadata().clusterUUID();

Expand All @@ -134,9 +146,15 @@ public void testFullClusterRestoreMultipleIndices() throws Exception {
assert !Objects.equals(newClusterUUID, prevClusterUUID) : "cluster restart not successful. cluster uuid is same";

// Step - 3 Trigger full cluster restore
restoreAndValidate(prevClusterUUID, indexStats);
ensureGreen(secondIndexName);
verifyRestoredData(indexStats2, secondIndexName);
// validateMetadata(List.of(INDEX_NAME, secondIndexName));
verifyRestoredData(indexStats, INDEX_NAME);
verifyRestoredData(indexStats2, secondIndexName, false);
assertTrue(INDEX_READ_ONLY_SETTING.get(clusterService().state().metadata().index(secondIndexName).getSettings()));
assertThrows(ClusterBlockException.class, () -> indexSingleDoc(secondIndexName));
// Test is complete

// Remove the block to ensure proper cleanup
updateIndexBlock(false, secondIndexName);
}

@AwaitsFix(bugUrl = "https://github.com/opensearch-project/OpenSearch/issues/9834")
Expand Down Expand Up @@ -285,4 +303,117 @@ private void resetShardLimits() {
}
}

/**
 * Verifies that a full cluster restore from remote store brings back global
 * metadata — persistent settings, index templates and the cluster-wide
 * read-only block — in addition to index data.
 */
public void testFullClusterRestoreGlobalMetadata() throws Exception {
    int shardCount = randomIntBetween(1, 2);
    int replicaCount = 1;
    int dataNodeCount = shardCount * (replicaCount + 1);
    int clusterManagerNodeCount = 1;

    // Step - 1 index some data to generate files in remote directory
    Map<String, Long> indexStats = initialTestSetup(shardCount, replicaCount, dataNodeCount, 1);
    String prevClusterUUID = clusterService().state().metadata().clusterUUID();

    // Create global metadata - register a custom repo
    // TODO - uncomment after customs is also uploaded correctly
    // registerCustomRepository();

    // Create global metadata - persistent settings
    updatePersistentSettings(Settings.builder().put(SETTING_CLUSTER_MAX_SHARDS_PER_NODE.getKey(), 34).build());

    // Create global metadata - index template
    putIndexTemplate();

    // Create global metadata - Put cluster block
    addClusterLevelReadOnlyBlock();

    // Step - 2 Replace all nodes in the cluster with new nodes. This ensures new cluster state doesn't have previous index metadata
    resetCluster(dataNodeCount, clusterManagerNodeCount);

    String newClusterUUID = clusterService().state().metadata().clusterUUID();
    assert !Objects.equals(newClusterUUID, prevClusterUUID) : "cluster restart not successful. cluster uuid is same";

    // Step - 3 Trigger full cluster restore and validate
    // validateCurrentMetadata();
    // Cluster is read-only after restore, so do not index more data (indexMoreData = false)
    verifyRestoredData(indexStats, INDEX_NAME, false);

    // validate global metadata restored
    verifyRestoredRepositories();
    verifyRestoredIndexTemplate();
    assertEquals(Integer.valueOf(34), SETTING_CLUSTER_MAX_SHARDS_PER_NODE.get(clusterService().state().metadata().settings()));
    // assertTrue instead of assertEquals(true, ...) — idiomatic boolean assertion
    assertTrue(SETTING_READ_ONLY_SETTING.get(clusterService().state().metadata().settings()));
    assertTrue(clusterService().state().blocks().hasGlobalBlock(CLUSTER_READ_ONLY_BLOCK));
    // Test is complete

    // Remove the cluster read only block to ensure proper cleanup
    updatePersistentSettings(Settings.builder().put(SETTING_READ_ONLY_SETTING.getKey(), false).build());
    assertFalse(clusterService().state().blocks().hasGlobalBlock(CLUSTER_READ_ONLY_BLOCK));
}

/**
 * Registers an fs-backed snapshot repository named "custom-repo" so that the
 * cluster state carries custom repositories metadata.
 */
private void registerCustomRepository() {
    Settings.Builder repoSettings = Settings.builder().put("location", randomRepoPath()).put("compress", false);
    assertAcked(client().admin().cluster().preparePutRepository("custom-repo").setType("fs").setSettings(repoSettings).get());
}

/**
 * Asserts that the restored cluster state contains both remote store
 * repositories and that each is flagged as a system repository.
 */
private void verifyRestoredRepositories() {
    RepositoriesMetadata restoredRepos = clusterService().state().metadata().custom(RepositoriesMetadata.TYPE);
    // includes remote store repo as well
    assertEquals(2, restoredRepos.repositories().size());
    assertTrue(SYSTEM_REPOSITORY_SETTING.get(restoredRepos.repository(REPOSITORY_NAME).settings()));
    assertTrue(SYSTEM_REPOSITORY_SETTING.get(restoredRepos.repository(REPOSITORY_2_NAME).settings()));
    // assertEquals("fs", restoredRepos.repository("custom-repo").type());
    // assertEquals(Settings.builder().put("location", randomRepoPath()).put("compress", false).build(),
    // restoredRepos.repository("custom-repo").settings());
}

/**
 * Applies the cluster-wide read-only block via a persistent setting update
 * and asserts that the block is present in the cluster state.
 */
private void addClusterLevelReadOnlyBlock() throws InterruptedException, ExecutionException {
    Settings readOnlySetting = Settings.builder().put(SETTING_READ_ONLY_SETTING.getKey(), true).build();
    updatePersistentSettings(readOnlySetting);
    assertTrue(clusterService().state().blocks().hasGlobalBlock(CLUSTER_READ_ONLY_BLOCK));
}

/**
 * Submits the given settings as persistent cluster settings and waits for
 * the update to be acknowledged.
 */
private void updatePersistentSettings(Settings settings) throws ExecutionException, InterruptedException {
    ClusterUpdateSettingsRequest request = new ClusterUpdateSettingsRequest();
    request.persistentSettings(settings);
    assertAcked(client().admin().cluster().updateSettings(request).get());
}

/**
 * Asserts that the index template put before the cluster reset was restored
 * from remote store with its patterns and settings intact.
 */
private void verifyRestoredIndexTemplate() {
    Map<String, IndexTemplateMetadata> indexTemplateMetadataMap = clusterService().state().metadata().templates();
    assertEquals(1, indexTemplateMetadataMap.size());
    assertEquals(Arrays.asList("pattern-1", "log-*"), indexTemplateMetadataMap.get("my-template").patterns());
    assertEquals(
        Settings.builder().put("index.number_of_shards", 3).put("index.number_of_replicas", 1).build(),
        indexTemplateMetadataMap.get("my-template").settings()
    );
}

/**
 * Puts a legacy index template named "my-template" so that the cluster state
 * carries templates metadata for the restore to bring back.
 */
private static void putIndexTemplate() {
    PutIndexTemplateRequest request = new PutIndexTemplateRequest("my-template");
    request.patterns(Arrays.asList("pattern-1", "log-*"));
    request.settings(Settings.builder().put("index.number_of_shards", 3).put("index.number_of_replicas", 1));
    // assertAcked for consistency with the rest of this class
    assertAcked(client().admin().indices().putTemplate(request).actionGet());
}

/**
 * Toggles the {@code index.blocks.read_only} setting on the given index.
 *
 * @param value true to add the read-only block, false to remove it
 * @param indexName the index whose block is toggled (helper is generic, not
 *        tied to any particular test index)
 */
private static void updateIndexBlock(boolean value, String indexName) throws InterruptedException, ExecutionException {
    assertAcked(
        client().admin()
            .indices()
            .updateSettings(new UpdateSettingsRequest(Settings.builder().put(INDEX_READ_ONLY_SETTING.getKey(), value).build(), indexName))
            .get()
    );
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -140,17 +140,14 @@ public RemoteRestoreResult restore(
String[] indexNames
) {
Map<String, Tuple<Boolean, IndexMetadata>> indexMetadataMap = new HashMap<>();
Metadata remoteGlobalMetadata = null;
Metadata remoteMetadata = null;
boolean metadataFromRemoteStore = (restoreClusterUUID == null
|| restoreClusterUUID.isEmpty()
|| restoreClusterUUID.isBlank()) == false;
if (metadataFromRemoteStore) {
try {
remoteGlobalMetadata = remoteClusterStateService.getLatestMetadata(
currentState.getClusterName().value(),
restoreClusterUUID
);
remoteGlobalMetadata.getIndices().values().forEach(indexMetadata -> {
remoteMetadata = remoteClusterStateService.getLatestMetadata(currentState.getClusterName().value(), restoreClusterUUID);
remoteMetadata.getIndices().values().forEach(indexMetadata -> {
indexMetadataMap.put(indexMetadata.getIndex().getName(), new Tuple<>(true, indexMetadata));
});
} catch (Exception e) {
Expand All @@ -167,7 +164,7 @@ public RemoteRestoreResult restore(
}
}
validate(currentState, indexMetadataMap, restoreClusterUUID, restoreAllShards);
return executeRestore(currentState, indexMetadataMap, restoreAllShards, remoteGlobalMetadata);
return executeRestore(currentState, indexMetadataMap, restoreAllShards, remoteMetadata);
}

/**
Expand All @@ -181,7 +178,7 @@ private RemoteRestoreResult executeRestore(
ClusterState currentState,
Map<String, Tuple<Boolean, IndexMetadata>> indexMetadataMap,
boolean restoreAllShards,
Metadata remoteGlobalMetadata
Metadata remoteMetadata
) {
final String restoreUUID = UUIDs.randomBase64UUID();
List<String> indicesToBeRestored = new ArrayList<>();
Expand Down Expand Up @@ -234,7 +231,7 @@ private RemoteRestoreResult executeRestore(
totalShards += updatedIndexMetadata.getNumberOfShards();
}

restoreGlobalMetadata(mdBuilder, remoteGlobalMetadata);
restoreGlobalMetadata(mdBuilder, remoteMetadata);

RestoreInfo restoreInfo = new RestoreInfo("remote_store", indicesToBeRestored, totalShards, totalShards);

Expand All @@ -243,25 +240,25 @@ private RemoteRestoreResult executeRestore(
return RemoteRestoreResult.build(restoreUUID, restoreInfo, allocationService.reroute(updatedState, "restored from remote store"));
}

private void restoreGlobalMetadata(Metadata.Builder mdBuilder, Metadata remoteGlobalMetadata) {
if (remoteGlobalMetadata.persistentSettings() != null) {
Settings settings = remoteGlobalMetadata.persistentSettings();
private void restoreGlobalMetadata(Metadata.Builder mdBuilder, Metadata remoteMetadata) {
if (remoteMetadata.persistentSettings() != null) {
Settings settings = remoteMetadata.persistentSettings();
clusterService.getClusterSettings().validateUpdate(settings);
mdBuilder.persistentSettings(settings);
}
if (remoteGlobalMetadata.templates() != null) {
for (final IndexTemplateMetadata cursor : remoteGlobalMetadata.templates().values()) {
if (remoteMetadata.templates() != null) {
for (final IndexTemplateMetadata cursor : remoteMetadata.templates().values()) {
mdBuilder.put(cursor);
}
}
if (remoteGlobalMetadata.customs() != null) {
for (final Map.Entry<String, Metadata.Custom> cursor : remoteGlobalMetadata.customs().entrySet()) {
if (remoteMetadata.customs() != null) {
for (final Map.Entry<String, Metadata.Custom> cursor : remoteMetadata.customs().entrySet()) {
if (RepositoriesMetadata.TYPE.equals(cursor.getKey()) == false) {
mdBuilder.putCustom(cursor.getKey(), cursor.getValue());
}
}
}
Optional<RepositoriesMetadata> repositoriesMetadata = Optional.ofNullable(remoteGlobalMetadata.custom(RepositoriesMetadata.TYPE));
Optional<RepositoriesMetadata> repositoriesMetadata = Optional.ofNullable(remoteMetadata.custom(RepositoriesMetadata.TYPE));
repositoriesMetadata = repositoriesMetadata.map(
repositoriesMetadata1 -> new RepositoriesMetadata(
repositoriesMetadata1.repositories()
Expand Down

0 comments on commit 568bbc5

Please sign in to comment.