diff --git a/server/src/internalClusterTest/java/org/opensearch/remotemigration/MigrationBaseTestCase.java b/server/src/internalClusterTest/java/org/opensearch/remotemigration/MigrationBaseTestCase.java index 6f468f25ee5f1..611dfc2756b29 100644 --- a/server/src/internalClusterTest/java/org/opensearch/remotemigration/MigrationBaseTestCase.java +++ b/server/src/internalClusterTest/java/org/opensearch/remotemigration/MigrationBaseTestCase.java @@ -34,6 +34,7 @@ import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicLong; +import static org.opensearch.cluster.routing.allocation.decider.EnableAllocationDecider.CLUSTER_ROUTING_REBALANCE_ENABLE_SETTING; import static org.opensearch.node.remotestore.RemoteStoreNodeService.MIGRATION_DIRECTION_SETTING; import static org.opensearch.node.remotestore.RemoteStoreNodeService.REMOTE_STORE_COMPATIBILITY_MODE_SETTING; import static org.opensearch.repositories.fs.ReloadableFsRepository.REPOSITORIES_FAILRATE_SETTING; @@ -199,4 +200,25 @@ public void setRefreshFrequency(int refreshFrequency) { this.refreshFrequency = refreshFrequency; } } + + public void excludeNodeSet(String attr, String value) { + assertAcked( + internalCluster().client() + .admin() + .cluster() + .prepareUpdateSettings() + .setTransientSettings(Settings.builder().put("cluster.routing.allocation.exclude._" + attr, value)) + .get() + ); + } + + public void stopShardRebalancing() { + assertAcked( + client().admin() + .cluster() + .prepareUpdateSettings() + .setPersistentSettings(Settings.builder().put(CLUSTER_ROUTING_REBALANCE_ENABLE_SETTING.getKey(), "none").build()) + .get() + ); + } } diff --git a/server/src/internalClusterTest/java/org/opensearch/remotemigration/RemoteDualReplicationIT.java b/server/src/internalClusterTest/java/org/opensearch/remotemigration/RemoteDualReplicationIT.java index 24a332212be6a..5094a7cf29c6a 100644 --- a/server/src/internalClusterTest/java/org/opensearch/remotemigration/RemoteDualReplicationIT.java +++ b/server/src/internalClusterTest/java/org/opensearch/remotemigration/RemoteDualReplicationIT.java @@ -30,6 +30,7 @@ import org.opensearch.test.transport.MockTransportService; import java.util.Collection; +import java.util.List; import java.util.Map; import java.util.stream.Collectors; import java.util.stream.Stream; @@ -132,8 +133,8 @@ public void testRemotePrimaryDocRepReplica() throws Exception { /* Scenario: - - Starts 1 docrep backed data node - - Creates an index with 0 replica + - Starts 2 docrep backed data node + - Creates an index with 1 replica - Starts 1 remote backed data node - Index some docs - Move primary copy from docrep to remote through _cluster/reroute @@ -145,14 +146,14 @@ public void testRemotePrimaryDocRepReplica() throws Exception { public void testRemotePrimaryDocRepAndRemoteReplica() throws Exception { internalCluster().startClusterManagerOnlyNode(); - logger.info("---> Starting 1 docrep data nodes"); - String docrepNodeName = internalCluster().startDataOnlyNode(); + logger.info("---> Starting 2 docrep data nodes"); + internalCluster().startDataOnlyNodes(2); internalCluster().validateClusterFormed(); assertEquals(internalCluster().client().admin().cluster().prepareGetRepositories().get().repositories().size(), 0); - logger.info("---> Creating index with 0 replica"); + logger.info("---> Creating index with 1 replica"); Settings zeroReplicas = Settings.builder() - .put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 0) + .put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 1) .put(IndexService.RETENTION_LEASE_SYNC_INTERVAL_SETTING.getKey(), "1s") .put(IndexService.GLOBAL_CHECKPOINT_SYNC_INTERVAL_SETTING.getKey(), "1s") .build(); @@ -245,14 +246,26 @@ RLs on remote enabled copies are brought up to (GlobalCkp + 1) upon a flush requ pollAndCheckRetentionLeases(REMOTE_PRI_DOCREP_REMOTE_REP); } + /* + Scenario: + - Starts 2 docrep backed data node + - Creates an index with 1 replica + - Starts 1 remote backed data node + - Index some docs + - Move primary copy from docrep to remote through _cluster/reroute + - Starts another remote backed data node + - Expands index to 2 replicas. One replica copy lies in remote backed node and other in docrep backed node + - Index some more docs + - Assert retention lease consistency + */ public void testMissingRetentionLeaseCreatedOnFailedOverRemoteReplica() throws Exception { internalCluster().startClusterManagerOnlyNode(); - logger.info("---> Starting docrep data node"); - internalCluster().startDataOnlyNode(); + logger.info("---> Starting 2 docrep data nodes"); + internalCluster().startDataOnlyNodes(2); Settings zeroReplicasAndOverridenSyncIntervals = Settings.builder() - .put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 0) + .put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 1) .put(IndexService.GLOBAL_CHECKPOINT_SYNC_INTERVAL_SETTING.getKey(), "100ms") .put(IndexService.RETENTION_LEASE_SYNC_INTERVAL_SETTING.getKey(), "100ms") .build(); @@ -323,11 +336,10 @@ private void pollAndCheckRetentionLeases(String indexName) throws Exception { /* Scenario: - - Starts 1 docrep backed data node - - Creates an index with 0 replica + - Starts 2 docrep backed data node + - Creates an index with 1 replica - Starts 1 remote backed data node - Move primary copy from docrep to remote through _cluster/reroute - - Expands index to 1 replica - Stops remote enabled node - Ensure doc count is same after failover - Index some more docs to ensure working of failed-over primary @@ -335,13 +347,13 @@ private void pollAndCheckRetentionLeases(String indexName) throws Exception { public void testFailoverRemotePrimaryToDocrepReplica() throws Exception { internalCluster().startClusterManagerOnlyNode(); - logger.info("---> Starting 1 docrep data nodes"); - String docrepNodeName = internalCluster().startDataOnlyNode(); + logger.info("---> Starting 2 docrep data nodes"); + internalCluster().startDataOnlyNodes(2); internalCluster().validateClusterFormed(); assertEquals(internalCluster().client().admin().cluster().prepareGetRepositories().get().repositories().size(), 0); logger.info("---> Creating index with 0 replica"); - Settings excludeRemoteNode = Settings.builder().put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 0).build(); + Settings excludeRemoteNode = Settings.builder().put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 1).build(); createIndex(FAILOVER_REMOTE_TO_DOCREP, excludeRemoteNode); ensureGreen(FAILOVER_REMOTE_TO_DOCREP); initDocRepToRemoteMigration(); @@ -376,8 +388,8 @@ public void testFailoverRemotePrimaryToDocrepReplica() throws Exception { ); ensureGreen(FAILOVER_REMOTE_TO_DOCREP); - logger.info("---> Expanding index to 1 replica copy"); - Settings twoReplicas = Settings.builder().put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 1).build(); + logger.info("---> Expanding index to 2 replica copies"); + Settings twoReplicas = Settings.builder().put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 2).build(); assertAcked( internalCluster().client() .admin() @@ -412,7 +424,7 @@ public void testFailoverRemotePrimaryToDocrepReplica() throws Exception { logger.info("---> Stop remote store enabled node"); internalCluster().stopRandomNode(InternalTestCluster.nameFilter(remoteNodeName)); - ensureStableCluster(2); + ensureStableCluster(3); ensureYellow(FAILOVER_REMOTE_TO_DOCREP); shardStatsMap = internalCluster().client().admin().indices().prepareStats(FAILOVER_REMOTE_TO_DOCREP).setDocs(true).get().asMap(); @@ -433,7 +445,7 @@ public void testFailoverRemotePrimaryToDocrepReplica() throws Exception { refreshAndWaitForReplication(FAILOVER_REMOTE_TO_DOCREP); shardStatsMap = internalCluster().client().admin().indices().prepareStats(FAILOVER_REMOTE_TO_DOCREP).setDocs(true).get().asMap(); - assertEquals(1, shardStatsMap.size()); + assertEquals(2, shardStatsMap.size()); shardStatsMap.forEach( (shardRouting, shardStats) -> { assertEquals(firstBatch + secondBatch, shardStats.getStats().getDocs().getCount()); } ); @@ -441,8 +453,8 @@ public void testFailoverRemotePrimaryToDocrepReplica() throws Exception { /* Scenario: - - Starts 1 docrep backed data node - - Creates an index with 0 replica + - Starts 2 docrep backed data nodes + - Creates an index with 1 replica - Starts 1 remote backed data node - Moves primary copy from docrep to remote through _cluster/reroute - Starts 1 more remote backed data node @@ -455,13 +467,13 @@ public void testFailoverRemotePrimaryToDocrepReplica() throws Exception { public void testFailoverRemotePrimaryToRemoteReplica() throws Exception { internalCluster().startClusterManagerOnlyNode(); - logger.info("---> Starting 1 docrep data node"); - String docrepNodeName = internalCluster().startDataOnlyNode(); + logger.info("---> Starting 2 docrep data nodes"); + List docrepNodeNames = internalCluster().startDataOnlyNodes(2); internalCluster().validateClusterFormed(); assertEquals(internalCluster().client().admin().cluster().prepareGetRepositories().get().repositories().size(), 0); - logger.info("---> Creating index with 0 replica"); - createIndex(FAILOVER_REMOTE_TO_REMOTE, Settings.builder().put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 0).build()); + logger.info("---> Creating index with 1 replica"); + createIndex(FAILOVER_REMOTE_TO_REMOTE, Settings.builder().put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 1).build()); ensureGreen(FAILOVER_REMOTE_TO_REMOTE); initDocRepToRemoteMigration(); @@ -484,15 +496,17 @@ public void testFailoverRemotePrimaryToRemoteReplica() throws Exception { AsyncIndexingService asyncIndexingService = new AsyncIndexingService(FAILOVER_REMOTE_TO_REMOTE); asyncIndexingService.startIndexing(); - logger.info("---> Moving primary copy from docrep node {} to remote enabled node {}", docrepNodeName, remoteNodeName1); + String primaryNodeName = primaryNodeName(FAILOVER_REMOTE_TO_REMOTE); + logger.info("---> Moving primary copy from docrep node {} to remote enabled node {}", primaryNodeName, remoteNodeName1); assertAcked( internalCluster().client() .admin() .cluster() .prepareReroute() - .add(new MoveAllocationCommand(FAILOVER_REMOTE_TO_REMOTE, 0, docrepNodeName, remoteNodeName1)) + .add(new MoveAllocationCommand(FAILOVER_REMOTE_TO_REMOTE, 0, primaryNodeName, remoteNodeName1)) .get() ); + waitForRelocation(); ensureGreen(FAILOVER_REMOTE_TO_REMOTE); assertEquals(primaryNodeName(FAILOVER_REMOTE_TO_REMOTE), remoteNodeName1); @@ -507,7 +521,13 @@ public void testFailoverRemotePrimaryToRemoteReplica() throws Exception { .indices() .prepareUpdateSettings() .setIndices(FAILOVER_REMOTE_TO_REMOTE) - .setSettings(Settings.builder().put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 2).build()) + .setSettings( + Settings.builder() + .put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 2) + // prevent replica copy from being allocated to the extra docrep node + .put("index.routing.allocation.exclude._name", primaryNodeName) + .build() + ) .get() ); ensureGreen(FAILOVER_REMOTE_TO_REMOTE); @@ -536,8 +556,8 @@ public void testFailoverRemotePrimaryToRemoteReplica() throws Exception { logger.info("---> Stop remote store enabled node hosting the primary"); internalCluster().stopRandomNode(InternalTestCluster.nameFilter(remoteNodeName1)); - ensureStableCluster(3); - ensureYellow(FAILOVER_REMOTE_TO_REMOTE); + ensureStableCluster(4); + ensureYellowAndNoInitializingShards(FAILOVER_REMOTE_TO_REMOTE); DiscoveryNodes finalNodes = internalCluster().client().admin().cluster().prepareState().get().getState().getNodes(); waitUntil(() -> { @@ -580,7 +600,6 @@ public void testFailoverRemotePrimaryToRemoteReplica() throws Exception { - Creates an index with 0 replica - Starts 1 remote backed data node - Move primary copy from docrep to remote through _cluster/reroute - - Expands index to 1 replica - Stops remote enabled node - Ensure doc count is same after failover - Index some more docs to ensure working of failed-over primary @@ -664,7 +683,8 @@ private void assertReplicaAndPrimaryConsistency(String indexName, int firstBatch RemoteSegmentStats remoteSegmentStats = shardStats.getSegments().getRemoteSegmentStats(); assertTrue(remoteSegmentStats.getUploadBytesSucceeded() > 0); assertTrue(remoteSegmentStats.getTotalUploadTime() > 0); - } else { + } + if (shardRouting.unassigned() == false && shardRouting.primary() == false) { boolean remoteNode = nodes.get(shardRouting.currentNodeId()).isRemoteStoreNode(); assertEquals( "Mismatched doc count. Is this on remote node ? " + remoteNode, diff --git a/server/src/internalClusterTest/java/org/opensearch/remotemigration/RemoteMigrationIndexMetadataUpdateIT.java b/server/src/internalClusterTest/java/org/opensearch/remotemigration/RemoteMigrationIndexMetadataUpdateIT.java new file mode 100644 index 0000000000000..45679598dc551 --- /dev/null +++ b/server/src/internalClusterTest/java/org/opensearch/remotemigration/RemoteMigrationIndexMetadataUpdateIT.java @@ -0,0 +1,516 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.remotemigration; + +import org.opensearch.cluster.ClusterState; +import org.opensearch.cluster.health.ClusterHealthStatus; +import org.opensearch.cluster.metadata.IndexMetadata; +import org.opensearch.cluster.node.DiscoveryNodes; +import org.opensearch.cluster.routing.ShardRouting; +import org.opensearch.cluster.routing.allocation.command.MoveAllocationCommand; +import org.opensearch.common.settings.Settings; +import org.opensearch.indices.replication.common.ReplicationType; +import org.opensearch.test.InternalTestCluster; +import org.opensearch.test.OpenSearchIntegTestCase; + +import java.util.List; +import java.util.function.Function; +import java.util.stream.Collectors; + +import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertAcked; + +@OpenSearchIntegTestCase.ClusterScope(scope = OpenSearchIntegTestCase.Scope.TEST, numDataNodes = 0) +public class RemoteMigrationIndexMetadataUpdateIT extends MigrationBaseTestCase { + /** + * Scenario: + * Performs a blue/green type migration from docrep to remote enabled cluster. + * Asserts that remote based index settings are applied after all shards move over + */ + public void testIndexSettingsUpdateAfterIndexMovedToRemoteThroughAllocationExclude() throws Exception { + internalCluster().startClusterManagerOnlyNode(); + + logger.info("---> Starting 2 docrep nodes"); + addRemote = false; + internalCluster().startDataOnlyNodes(2, Settings.builder().put("node.attr._type", "docrep").build()); + internalCluster().validateClusterFormed(); + + logger.info("---> Creates an index with 1 primary and 1 replica"); + String indexName = "migration-index-allocation-exclude"; + Settings oneReplica = Settings.builder() + .put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 1) + .put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1) + .build(); + logger.info("---> Asserts index still has docrep index settings"); + createIndexAndAssertDocrepProperties(indexName, oneReplica); + + logger.info("---> Start indexing in parallel thread"); + AsyncIndexingService asyncIndexingService = new AsyncIndexingService(indexName); + asyncIndexingService.startIndexing(); + initDocRepToRemoteMigration(); + + logger.info("---> Adding 2 remote enabled nodes to the cluster"); + addRemote = true; + internalCluster().startDataOnlyNodes(2, Settings.builder().put("node.attr._type", "remote").build()); + internalCluster().validateClusterFormed(); + + logger.info("---> Excluding docrep nodes from allocation"); + excludeNodeSet("type", "docrep"); + waitForRelocation(); + waitNoPendingTasksOnAll(); + + logger.info("---> Stop indexing and assert remote enabled index settings have been applied"); + asyncIndexingService.stopIndexing(); + assertRemoteProperties(indexName); + } + + /** + * Scenario: + * Performs a manual _cluster/reroute to move shards from docrep to remote enabled nodes. + * Asserts that remote based index settings are only applied for indices whose shards + * have completely moved over to remote enabled nodes + */ + public void testIndexSettingsUpdateAfterIndexMovedToRemoteThroughManualReroute() throws Exception { + internalCluster().startClusterManagerOnlyNode(); + + logger.info("---> Starting 2 docrep nodes"); + List docrepNodeNames = internalCluster().startDataOnlyNodes(2); + internalCluster().validateClusterFormed(); + + logger.info("---> Creating 2 indices with 1 primary and 1 replica"); + String indexName1 = "migration-index-manual-reroute-1"; + String indexName2 = "migration-index-manual-reroute-2"; + Settings oneReplica = Settings.builder() + .put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 1) + .put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1) + .build(); + createIndexAndAssertDocrepProperties(indexName1, oneReplica); + createIndexAndAssertDocrepProperties(indexName2, oneReplica); + + logger.info("---> Starting parallel indexing on both indices"); + AsyncIndexingService indexOne = new AsyncIndexingService(indexName1); + indexOne.startIndexing(); + + AsyncIndexingService indexTwo = new AsyncIndexingService(indexName2); + indexTwo.startIndexing(); + + logger.info( + "---> Stopping shard rebalancing to ensure shards do not automatically move over to newer nodes after they are launched" + ); + stopShardRebalancing(); + + logger.info("---> Starting 2 remote store enabled nodes"); + initDocRepToRemoteMigration(); + addRemote = true; + List remoteNodeNames = internalCluster().startDataOnlyNodes(2); + internalCluster().validateClusterFormed(); + + String primaryNode = primaryNodeName(indexName1); + String replicaNode = docrepNodeNames.stream() + .filter(nodeName -> nodeName.equals(primaryNodeName(indexName1)) == false) + .collect(Collectors.toList()) + .get(0); + + logger.info("---> Moving over both shard copies for the first index to remote enabled nodes"); + assertAcked( + client().admin() + .cluster() + .prepareReroute() + .add(new MoveAllocationCommand(indexName1, 0, primaryNode, remoteNodeNames.get(0))) + .execute() + .actionGet() + ); + waitForRelocation(); + + assertAcked( + client().admin() + .cluster() + .prepareReroute() + .add(new MoveAllocationCommand(indexName1, 0, replicaNode, remoteNodeNames.get(1))) + .execute() + .actionGet() + ); + waitForRelocation(); + + logger.info("---> Moving only primary for the second index to remote enabled nodes"); + assertAcked( + client().admin() + .cluster() + .prepareReroute() + .add(new MoveAllocationCommand(indexName2, 0, primaryNodeName(indexName2), remoteNodeNames.get(0))) + .execute() + .actionGet() + ); + waitForRelocation(); + waitNoPendingTasksOnAll(); + + logger.info("---> Stopping indexing"); + indexOne.stopIndexing(); + indexTwo.stopIndexing(); + + logger.info("---> Assert remote settings are applied for index one but not for index two"); + assertRemoteProperties(indexName1); + assertDocrepProperties(indexName2); + } + + /** + * Scenario: + * Creates a mixed mode cluster. One index gets created before remote nodes are introduced, + * while the other one is created after remote nodes are added. + *

+ * For the first index, asserts docrep settings at first, excludes docrep nodes from + * allocation and asserts that remote index settings are applied after all shards + * have been relocated. + *

+ * For the second index, asserts that it already has remote enabled settings. + * Indexes some more docs and asserts that the index metadata version does not increment + */ + public void testIndexSettingsUpdatedOnlyForMigratingIndex() throws Exception { + internalCluster().startClusterManagerOnlyNode(); + + logger.info("---> Starting 2 docrep nodes"); + addRemote = false; + internalCluster().startDataOnlyNodes(2, Settings.builder().put("node.attr._type", "docrep").build()); + internalCluster().validateClusterFormed(); + + logger.info("---> Creating the first index with 1 primary and 1 replica"); + String indexName = "migration-index"; + Settings oneReplica = Settings.builder() + .put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 1) + .put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1) + .build(); + createIndexAndAssertDocrepProperties(indexName, oneReplica); + + logger.info("---> Starting indexing in parallel"); + AsyncIndexingService indexingService = new AsyncIndexingService(indexName); + indexingService.startIndexing(); + + logger.info("---> Storing current index metadata version"); + long initalMetadataVersion = internalCluster().client() + .admin() + .cluster() + .prepareState() + .get() + .getState() + .metadata() + .index(indexName) + .getVersion(); + + logger.info("---> Adding 2 remote enabled nodes to the cluster"); + initDocRepToRemoteMigration(); + addRemote = true; + internalCluster().startDataOnlyNodes(2, Settings.builder().put("node.attr._type", "remote").build()); + internalCluster().validateClusterFormed(); + + logger.info("---> Excluding docrep nodes from allocation"); + excludeNodeSet("type", "docrep"); + + waitForRelocation(); + waitNoPendingTasksOnAll(); + indexingService.stopIndexing(); + + logger.info("---> Assert remote settings are applied"); + assertRemoteProperties(indexName); + assertTrue( + initalMetadataVersion < internalCluster().client() + .admin() + .cluster() + .prepareState() + .get() + .getState() + .metadata() + .index(indexName) + .getVersion() + ); + + logger.info("---> Creating a new index on remote enabled nodes"); + String secondIndex = "remote-index"; + createIndex( + secondIndex, + Settings.builder().put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 1).put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1).build() + ); + indexBulk(secondIndex, 100); + initalMetadataVersion = internalCluster().client() + .admin() + .cluster() + .prepareState() + .get() + .getState() + .metadata() + .index(secondIndex) + .getVersion(); + refresh(secondIndex); + ensureGreen(secondIndex); + + waitNoPendingTasksOnAll(); + + assertRemoteProperties(secondIndex); + + logger.info("---> Assert metadata version is not changed"); + assertEquals( + initalMetadataVersion, + internalCluster().client().admin().cluster().prepareState().get().getState().metadata().index(secondIndex).getVersion() + ); + } + + /** + * Scenario: + * Creates an index with 1 primary, 2 replicas on 2 docrep nodes. Since the replica + * configuration is incorrect, the index stays YELLOW. + * Starts 2 more remote nodes and initiates shard relocation through allocation exclusion. + * After shard relocation completes, shuts down the docrep nodes and asserts remote + * index settings are applied even when the index is in YELLOW state + */ + public void testIndexSettingsUpdatedEvenForMisconfiguredReplicas() throws Exception { + internalCluster().startClusterManagerOnlyNode(); + + logger.info("---> Starting 2 docrep nodes"); + addRemote = false; + List docrepNodes = internalCluster().startDataOnlyNodes(2, Settings.builder().put("node.attr._type", "docrep").build()); + internalCluster().validateClusterFormed(); + + logger.info("---> Creating index with 1 primary and 2 replicas"); + String indexName = "migration-index-allocation-exclude"; + Settings oneReplica = Settings.builder() + .put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 2) + .put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1) + .build(); + createIndexAssertHealthAndDocrepProperties(indexName, oneReplica, this::ensureYellowAndNoInitializingShards); + + logger.info("---> Starting indexing in parallel"); + AsyncIndexingService asyncIndexingService = new AsyncIndexingService(indexName); + asyncIndexingService.startIndexing(); + + logger.info("---> Starts 2 remote enabled nodes"); + initDocRepToRemoteMigration(); + addRemote = true; + internalCluster().startDataOnlyNodes(2, Settings.builder().put("node.attr._type", "remote").build()); + internalCluster().validateClusterFormed(); + + logger.info("---> Excluding docrep nodes from allocation"); + excludeNodeSet("type", "docrep"); + waitForRelocation(); + waitNoPendingTasksOnAll(); + asyncIndexingService.stopIndexing(); + + logger.info("---> Assert cluster has turned green since more nodes are added to the cluster"); + ensureGreen(indexName); + + logger.info("---> Assert index still has dcorep settings since replica copies are still on docrep nodes"); + assertDocrepProperties(indexName); + + logger.info("---> Stopping docrep nodes"); + for (String node : docrepNodes) { + internalCluster().stopRandomNode(InternalTestCluster.nameFilter(node)); + } + waitNoPendingTasksOnAll(); + ensureYellowAndNoInitializingShards(indexName); + + logger.info("---> Assert remote settings are applied"); + assertRemoteProperties(indexName); + } + + /** + * Scenario: + * Creates an index with 1 primary, 2 replicas on 2 docrep nodes. + * Starts 2 more remote nodes and initiates shard relocation through allocation exclusion. + * After shard relocation completes, restarts the docrep node holding extra replica shard copy + * and asserts remote index settings are applied as soon as the docrep replica copy is unassigned + */ + public void testIndexSettingsUpdatedWhenDocrepNodeIsRestarted() throws Exception { + internalCluster().startClusterManagerOnlyNode(); + + logger.info("---> Starting 2 docrep nodes"); + addRemote = false; + List docrepNodes = internalCluster().startDataOnlyNodes(2, Settings.builder().put("node.attr._type", "docrep").build()); + internalCluster().validateClusterFormed(); + + logger.info("---> Creating index with 1 primary and 2 replicas"); + String indexName = "migration-index-allocation-exclude"; + Settings oneReplica = Settings.builder() + .put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 2) + .put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1) + .build(); + createIndexAssertHealthAndDocrepProperties(indexName, oneReplica, this::ensureYellowAndNoInitializingShards); + + logger.info("---> Starting indexing in parallel"); + AsyncIndexingService asyncIndexingService = new AsyncIndexingService(indexName); + asyncIndexingService.startIndexing(); + + logger.info("---> Starts 2 remote enabled nodes"); + initDocRepToRemoteMigration(); + addRemote = true; + internalCluster().startDataOnlyNodes(2, Settings.builder().put("node.attr._type", "remote").build()); + internalCluster().validateClusterFormed(); + + logger.info("---> Excluding docrep nodes from allocation"); + excludeNodeSet("type", "docrep"); + waitForRelocation(); + waitNoPendingTasksOnAll(); + asyncIndexingService.stopIndexing(); + + logger.info("---> Assert cluster has turned green since more nodes are added to the cluster"); + ensureGreen(indexName); + + logger.info("---> Assert index still has dcorep settings since replica copies are still on docrep nodes"); + assertDocrepProperties(indexName); + + ClusterState clusterState = internalCluster().client().admin().cluster().prepareState().get().getState(); + DiscoveryNodes nodes = clusterState.nodes(); + + String docrepReplicaNodeName = ""; + for (ShardRouting shardRouting : clusterState.routingTable().index(indexName).shard(0).getShards()) { + if (nodes.get(shardRouting.currentNodeId()).isRemoteStoreNode() == false) { + docrepReplicaNodeName = nodes.get(shardRouting.currentNodeId()).getName(); + break; + } + } + excludeNodeSet("type", null); + + logger.info("---> Stopping docrep node holding the replica copy"); + internalCluster().restartNode(docrepReplicaNodeName); + ensureStableCluster(5); + waitNoPendingTasksOnAll(); + + logger.info("---> Assert remote index settings have been applied"); + assertRemoteProperties(indexName); + logger.info("---> Assert cluster is yellow since remote index settings have been applied"); + ensureYellowAndNoInitializingShards(indexName); + } + + /** + * Scenario: + * Creates a docrep cluster with 3 nodes and an index with 1 primary and 2 replicas. + * Adds 3 more remote nodes to the cluster and moves over the primary copy from docrep + * to remote through _cluster/reroute. Asserts that the remote store path based metadata + * have been applied to the index. + * Moves over the first replica copy and asserts that the remote store based settings has not been applied + * Excludes docrep nodes from allocation to force migration of the 3rd replica copy and asserts remote + * store settings has been applied as all shards have moved over + */ + public void testRemotePathMetadataAddedWithFirstPrimaryMovingToRemote() throws Exception { + String indexName = "index-1"; + internalCluster().startClusterManagerOnlyNode(); + + logger.info("---> Starting 3 docrep nodes"); + internalCluster().startDataOnlyNodes(3, Settings.builder().put("node.attr._type", "docrep").build()); + internalCluster().validateClusterFormed(); + + logger.info("---> Creating index with 1 primary and 2 replicas"); + Settings oneReplica = Settings.builder().put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 2).build(); + createIndexAndAssertDocrepProperties(indexName, oneReplica); + + logger.info("---> Adding 3 remote enabled nodes"); + initDocRepToRemoteMigration(); + addRemote = true; + List remoteEnabledNodes = internalCluster().startDataOnlyNodes( + 3, + Settings.builder().put("node.attr._type", "remote").build() + ); + + logger.info("---> Moving primary copy to remote enabled node"); + String primaryNodeName = primaryNodeName(indexName); + assertAcked( + client().admin() + .cluster() + .prepareReroute() + .add(new MoveAllocationCommand(indexName, 0, primaryNodeName, remoteEnabledNodes.get(0))) + .execute() + .actionGet() + ); + waitForRelocation(); + waitNoPendingTasksOnAll(); + + logger.info("---> Assert custom remote path based metadata is applied"); + assertCustomIndexMetadata(indexName); + + logger.info("---> Moving over one replica copy to remote enabled node"); + String replicaNodeName = replicaNodeName(indexName); + assertAcked( + client().admin() + .cluster() + .prepareReroute() + .add(new MoveAllocationCommand(indexName, 0, replicaNodeName, remoteEnabledNodes.get(1))) + .execute() + .actionGet() + ); + waitForRelocation(); + waitNoPendingTasksOnAll(); + + logger.info("---> Assert index still has docrep settings"); + assertDocrepProperties(indexName); + + logger.info("---> Excluding docrep nodes from allocation"); + excludeNodeSet("type", "docrep"); + waitForRelocation(); + waitNoPendingTasksOnAll(); + + logger.info("---> Assert index has remote store settings"); + assertRemoteProperties(indexName); + } + + private void createIndexAndAssertDocrepProperties(String index, Settings settings) { + createIndexAssertHealthAndDocrepProperties(index, settings, this::ensureGreen); + } + + private void createIndexAssertHealthAndDocrepProperties( + String index, + Settings settings, + Function ensureState + ) { + createIndex(index, settings); + refresh(index); + ensureState.apply(index); + assertDocrepProperties(index); + } + + /** + * Assert current index settings have: + * - index.remote_store.enabled == false + * - index.remote_store.segment.repository == null + * - index.remote_store.translog.repository == null + * - index.replication.type == DOCUMENT + */ + private void assertDocrepProperties(String index) { + logger.info("---> Asserting docrep index settings"); + IndexMetadata iMd = internalCluster().client().admin().cluster().prepareState().get().getState().metadata().index(index); + Settings settings = iMd.getSettings(); + assertFalse(IndexMetadata.INDEX_REMOTE_STORE_ENABLED_SETTING.get(settings)); + assertFalse(IndexMetadata.INDEX_REMOTE_TRANSLOG_REPOSITORY_SETTING.exists(settings)); + assertFalse(IndexMetadata.INDEX_REMOTE_SEGMENT_STORE_REPOSITORY_SETTING.exists(settings)); + assertEquals(ReplicationType.DOCUMENT, IndexMetadata.INDEX_REPLICATION_TYPE_SETTING.get(settings)); + } + + /** + * Assert current index settings have: + * - index.remote_store.enabled == true + * - index.remote_store.segment.repository != null + * - index.remote_store.translog.repository != null + * - index.replication.type == SEGMENT + * Asserts index metadata customs has the remote_store key + */ + private void assertRemoteProperties(String index) { + logger.info("---> Asserting remote index settings"); + IndexMetadata iMd = internalCluster().client().admin().cluster().prepareState().get().getState().metadata().index(index); + Settings settings = iMd.getSettings(); + assertTrue(IndexMetadata.INDEX_REMOTE_STORE_ENABLED_SETTING.get(settings)); + assertTrue(IndexMetadata.INDEX_REMOTE_TRANSLOG_REPOSITORY_SETTING.exists(settings)); + assertTrue(IndexMetadata.INDEX_REMOTE_SEGMENT_STORE_REPOSITORY_SETTING.exists(settings)); + assertEquals(ReplicationType.SEGMENT, IndexMetadata.INDEX_REPLICATION_TYPE_SETTING.get(settings)); + assertNotNull(iMd.getCustomData(IndexMetadata.REMOTE_STORE_CUSTOM_KEY)); + } + + /** + * Asserts index metadata customs has the remote_store key + */ + private void assertCustomIndexMetadata(String index) { + logger.info("---> Asserting custom index metadata"); + IndexMetadata iMd = internalCluster().client().admin().cluster().prepareState().get().getState().metadata().index(index); + assertNotNull(iMd.getCustomData(IndexMetadata.REMOTE_STORE_CUSTOM_KEY)); + } +} diff --git a/server/src/main/java/org/opensearch/action/admin/cluster/settings/TransportClusterUpdateSettingsAction.java b/server/src/main/java/org/opensearch/action/admin/cluster/settings/TransportClusterUpdateSettingsAction.java index e6c149216da09..6292d32fee26d 100644 --- a/server/src/main/java/org/opensearch/action/admin/cluster/settings/TransportClusterUpdateSettingsAction.java +++ b/server/src/main/java/org/opensearch/action/admin/cluster/settings/TransportClusterUpdateSettingsAction.java @@ -42,6 +42,7 @@ import org.opensearch.cluster.ClusterState; import org.opensearch.cluster.block.ClusterBlockException; import org.opensearch.cluster.block.ClusterBlockLevel; +import org.opensearch.cluster.metadata.IndexMetadata; import org.opensearch.cluster.metadata.IndexNameExpressionResolver; import org.opensearch.cluster.metadata.Metadata; import org.opensearch.cluster.node.DiscoveryNode; @@ -58,15 +59,19 @@ import org.opensearch.common.settings.SettingsException; import org.opensearch.core.action.ActionListener; import org.opensearch.core.common.io.stream.StreamInput; +import org.opensearch.index.remote.RemoteMigrationIndexMetadataUpdater; import org.opensearch.node.remotestore.RemoteStoreNodeService; import org.opensearch.threadpool.ThreadPool; import org.opensearch.transport.TransportService; import java.io.IOException; +import java.util.Collection; import java.util.Locale; import java.util.Set; import java.util.stream.Collectors; +import static org.opensearch.index.remote.RemoteMigrationIndexMetadataUpdater.indexHasAllRemoteStoreRelatedMetadata; + /** * Transport action for updating cluster settings * @@ -284,6 +289,7 @@ public void validateCompatibilityModeSettingRequest(ClusterUpdateSettingsRequest validateAllNodesOfSameVersion(clusterState.nodes()); if (value.equals(RemoteStoreNodeService.CompatibilityMode.STRICT.mode)) { validateAllNodesOfSameType(clusterState.nodes()); + validateIndexSettings(clusterState); } } } @@ -317,4 +323,19 @@ private void validateAllNodesOfSameType(DiscoveryNodes discoveryNodes) { } } + /** + * Verifies that while trying to switch to STRICT compatibility mode, + * all indices in the cluster have {@link RemoteMigrationIndexMetadataUpdater#indexHasAllRemoteStoreRelatedMetadata(IndexMetadata)} as true. + * If not, throws {@link SettingsException} + * @param clusterState current cluster state + */ + private void validateIndexSettings(ClusterState clusterState) { + Collection allIndicesMetadata = clusterState.metadata().indices().values(); + if (allIndicesMetadata.isEmpty() == false + && allIndicesMetadata.stream().anyMatch(indexMetadata -> indexHasAllRemoteStoreRelatedMetadata(indexMetadata) == false)) { + throw new SettingsException( + "can not switch to STRICT compatibility mode since all indices in the cluster does not have remote store based index settings" + ); + } + } } diff --git a/server/src/main/java/org/opensearch/cluster/routing/IndexRoutingTable.java b/server/src/main/java/org/opensearch/cluster/routing/IndexRoutingTable.java index faadc3f7583fb..7c179f6d4d8fd 100644 --- a/server/src/main/java/org/opensearch/cluster/routing/IndexRoutingTable.java +++ b/server/src/main/java/org/opensearch/cluster/routing/IndexRoutingTable.java @@ -302,6 +302,20 @@ public List shardsWithState(ShardRoutingState state) { return shards; } + /** + * Returns a {@link List} of shards that match the provided {@link Predicate} + * + * @param predicate {@link Predicate} to apply + * @return a {@link List} of shards that match one of the given {@link Predicate} + */ + public List shardsMatchingPredicate(Predicate predicate) { + List shards = new ArrayList<>(); + for (IndexShardRoutingTable shardRoutingTable : this) { + shards.addAll(shardRoutingTable.shardsMatchingPredicate(predicate)); + } + return shards; + } + public int shardsMatchingPredicateCount(Predicate predicate) { int count = 0; for (IndexShardRoutingTable shardRoutingTable : this) { diff --git a/server/src/main/java/org/opensearch/cluster/routing/IndexShardRoutingTable.java b/server/src/main/java/org/opensearch/cluster/routing/IndexShardRoutingTable.java index 2c250f6a5d86e..fd8cbea42c12f 100644 --- a/server/src/main/java/org/opensearch/cluster/routing/IndexShardRoutingTable.java +++ b/server/src/main/java/org/opensearch/cluster/routing/IndexShardRoutingTable.java @@ -904,6 +904,22 @@ public List shardsWithState(ShardRoutingState state) { return shards; } + /** + * Returns a {@link List} of shards that match the provided {@link Predicate} + * + * @param predicate {@link Predicate} to apply + * @return a {@link List} of shards that match one of the given {@link Predicate} + */ + public List shardsMatchingPredicate(Predicate predicate) { + List shards = new ArrayList<>(); + for (ShardRouting shardEntry : this) { + if (predicate.test(shardEntry)) { + shards.add(shardEntry); + } + } + return shards; + } + public int shardsMatchingPredicateCount(Predicate predicate) { int count = 0; for (ShardRouting shardEntry : this) { diff --git a/server/src/main/java/org/opensearch/cluster/routing/allocation/IndexMetadataUpdater.java b/server/src/main/java/org/opensearch/cluster/routing/allocation/IndexMetadataUpdater.java index 7fc78b05880f3..ddcccd597e894 100644 --- a/server/src/main/java/org/opensearch/cluster/routing/allocation/IndexMetadataUpdater.java +++ b/server/src/main/java/org/opensearch/cluster/routing/allocation/IndexMetadataUpdater.java @@ -32,10 +32,12 @@ package org.opensearch.cluster.routing.allocation; +import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; import org.opensearch.cluster.ClusterState; import org.opensearch.cluster.metadata.IndexMetadata; import org.opensearch.cluster.metadata.Metadata; +import org.opensearch.cluster.node.DiscoveryNodes; import org.opensearch.cluster.routing.IndexShardRoutingTable; import org.opensearch.cluster.routing.RecoverySource; import org.opensearch.cluster.routing.RoutingChangesObserver; @@ -45,6 +47,7 @@ import org.opensearch.common.util.set.Sets; import org.opensearch.core.index.Index; import org.opensearch.core.index.shard.ShardId; +import org.opensearch.index.remote.RemoteMigrationIndexMetadataUpdater; import java.util.Collections; import java.util.Comparator; @@ -67,14 +70,15 @@ * @opensearch.internal */ public class IndexMetadataUpdater extends RoutingChangesObserver.AbstractRoutingChangesObserver { + private final Logger logger = LogManager.getLogger(IndexMetadataUpdater.class); private final Map shardChanges = new HashMap<>(); + private boolean ongoingRemoteStoreMigration = false; @Override public void shardInitialized(ShardRouting unassignedShard, ShardRouting initializedShard) { assert initializedShard.isRelocationTarget() == false : "shardInitialized is not called on relocation target: " + initializedShard; if (initializedShard.primary()) { increasePrimaryTerm(initializedShard.shardId()); - Updates updates = changes(initializedShard.shardId()); assert updates.initializedPrimary == null : "Primary cannot be initialized more than once in same allocation round: " + "(previous: " @@ -113,6 +117,12 @@ public void shardFailed(ShardRouting failedShard, UnassignedInfo unassignedInfo) } increasePrimaryTerm(failedShard.shardId()); } + + // Track change through shardChanges Map regardless of above-mentioned conditions + // To be used to update index metadata while computing new cluster state + if (ongoingRemoteStoreMigration) { + changes(failedShard.shardId()); + } } @Override @@ -120,20 +130,34 @@ public void relocationCompleted(ShardRouting removedRelocationSource) { removeAllocationId(removedRelocationSource); } + /** + * Adds the target {@link ShardRouting} to the tracking updates set. + * Used to track started relocations while applying changes to the new {@link ClusterState} + */ + @Override + public void relocationStarted(ShardRouting startedShard, ShardRouting targetRelocatingShard) { + // Store change in shardChanges Map regardless of above-mentioned conditions + // To be used to update index metadata while computing new cluster state + if (ongoingRemoteStoreMigration) { + changes(targetRelocatingShard.shardId()); + } + } + /** * Updates the current {@link Metadata} based on the changes of this RoutingChangesObserver. Specifically * we update {@link IndexMetadata#getInSyncAllocationIds()} and {@link IndexMetadata#primaryTerm(int)} based on * the changes made during this allocation. + *
+ * Manipulates index settings or index metadata during an ongoing remote store migration * * @param oldMetadata {@link Metadata} object from before the routing nodes was changed. * @param newRoutingTable {@link RoutingTable} object after routing changes were applied. * @return adapted {@link Metadata}, potentially the original one if no change was needed. */ - public Metadata applyChanges(Metadata oldMetadata, RoutingTable newRoutingTable) { + public Metadata applyChanges(Metadata oldMetadata, RoutingTable newRoutingTable, DiscoveryNodes discoveryNodes) { Map>> changesGroupedByIndex = shardChanges.entrySet() .stream() .collect(Collectors.groupingBy(e -> e.getKey().getIndex())); - Metadata.Builder metadataBuilder = null; for (Map.Entry>> indexChanges : changesGroupedByIndex.entrySet()) { Index index = indexChanges.getKey(); @@ -144,6 +168,17 @@ public Metadata applyChanges(Metadata oldMetadata, RoutingTable newRoutingTable) Updates updates = shardEntry.getValue(); indexMetadataBuilder = updateInSyncAllocations(newRoutingTable, oldIndexMetadata, indexMetadataBuilder, shardId, updates); indexMetadataBuilder = updatePrimaryTerm(oldIndexMetadata, indexMetadataBuilder, shardId, updates); + if (ongoingRemoteStoreMigration) { + RemoteMigrationIndexMetadataUpdater migrationImdUpdater = new RemoteMigrationIndexMetadataUpdater( + discoveryNodes, + newRoutingTable, + oldIndexMetadata, + oldMetadata.settings(), + logger + ); + migrationImdUpdater.maybeUpdateRemoteStorePathStrategy(indexMetadataBuilder, index.getName()); + migrationImdUpdater.maybeAddRemoteIndexSettings(indexMetadataBuilder, index.getName()); + } } if (indexMetadataBuilder != null) { @@ -369,6 +404,10 @@ private void increasePrimaryTerm(ShardId shardId) { changes(shardId).increaseTerm = true; } + public void setOngoingRemoteStoreMigration(boolean ongoingRemoteStoreMigration) { + this.ongoingRemoteStoreMigration = ongoingRemoteStoreMigration; + } + private static class Updates { private boolean increaseTerm; // whether primary term should be increased private Set addedAllocationIds = new HashSet<>(); // allocation ids that should be added to the in-sync set diff --git a/server/src/main/java/org/opensearch/cluster/routing/allocation/RoutingAllocation.java b/server/src/main/java/org/opensearch/cluster/routing/allocation/RoutingAllocation.java index bf2db57128517..fd789774f6f4f 100644 --- a/server/src/main/java/org/opensearch/cluster/routing/allocation/RoutingAllocation.java +++ b/server/src/main/java/org/opensearch/cluster/routing/allocation/RoutingAllocation.java @@ -55,6 +55,7 @@ import static java.util.Collections.emptySet; import static java.util.Collections.unmodifiableSet; +import static org.opensearch.node.remotestore.RemoteStoreNodeService.isMigratingToRemoteStore; /** * The {@link RoutingAllocation} keep the state of the current allocation @@ -125,6 +126,9 @@ public RoutingAllocation( this.clusterInfo = clusterInfo; this.shardSizeInfo = shardSizeInfo; this.currentNanoTime = currentNanoTime; + if (isMigratingToRemoteStore(metadata)) { + indexMetadataUpdater.setOngoingRemoteStoreMigration(true); + } } /** returns the nano time captured at the beginning of the allocation. used to make sure all time based decisions are aligned */ @@ -267,7 +271,7 @@ public RoutingChangesObserver changes() { * Returns updated {@link Metadata} based on the changes that were made to the routing nodes */ public Metadata updateMetadataWithRoutingChanges(RoutingTable newRoutingTable) { - return indexMetadataUpdater.applyChanges(metadata, newRoutingTable); + return indexMetadataUpdater.applyChanges(metadata, newRoutingTable, nodes()); } /** diff --git a/server/src/main/java/org/opensearch/index/IndexService.java b/server/src/main/java/org/opensearch/index/IndexService.java index 14c6cecb1f847..e501d7eff3f81 100644 --- a/server/src/main/java/org/opensearch/index/IndexService.java +++ b/server/src/main/java/org/opensearch/index/IndexService.java @@ -132,6 +132,7 @@ import static java.util.Collections.emptyMap; import static java.util.Collections.unmodifiableMap; import static org.opensearch.common.collect.MapBuilder.newMapBuilder; +import static org.opensearch.index.remote.RemoteMigrationIndexMetadataUpdater.indexHasRemoteStoreSettings; /** * The main OpenSearch index service @@ -516,6 +517,17 @@ public synchronized IndexShard createShard( ); } remoteStore = new Store(shardId, this.indexSettings, remoteDirectory, lock, Store.OnClose.EMPTY, path); + } else { + // Disallow shards with remote store based settings to be created on non-remote store enabled nodes + // Even though we have `RemoteStoreMigrationAllocationDecider` in place to prevent something like this from happening at the + // allocation level, + // keeping this defensive check in place + // TODO: Remove this once remote to docrep migration is supported + if (indexHasRemoteStoreSettings(indexSettings)) { + throw new IllegalStateException( + "[{" + routing.shardId() + "}] Cannot initialize shards with remote store index settings on non-remote store nodes" + ); + } } Directory directory = directoryFactory.newDirectory(this.indexSettings, path); diff --git a/server/src/main/java/org/opensearch/index/remote/RemoteMigrationIndexMetadataUpdater.java b/server/src/main/java/org/opensearch/index/remote/RemoteMigrationIndexMetadataUpdater.java new file mode 100644 index 0000000000000..761fa20ea64e5 --- /dev/null +++ b/server/src/main/java/org/opensearch/index/remote/RemoteMigrationIndexMetadataUpdater.java @@ -0,0 +1,181 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.index.remote; + +import org.apache.logging.log4j.Logger; +import org.opensearch.cluster.metadata.IndexMetadata; +import org.opensearch.cluster.node.DiscoveryNodes; +import org.opensearch.cluster.routing.IndexRoutingTable; +import org.opensearch.cluster.routing.RoutingTable; +import org.opensearch.cluster.routing.ShardRouting; +import org.opensearch.cluster.routing.ShardRoutingState; +import org.opensearch.common.settings.Settings; +import org.opensearch.index.remote.RemoteStoreEnums.PathType; +import org.opensearch.indices.replication.common.ReplicationType; + +import java.util.List; +import java.util.Map; +import java.util.Objects; + +import static org.opensearch.cluster.metadata.IndexMetadata.REMOTE_STORE_CUSTOM_KEY; +import static org.opensearch.cluster.metadata.IndexMetadata.SETTING_REMOTE_SEGMENT_STORE_REPOSITORY; +import static org.opensearch.cluster.metadata.IndexMetadata.SETTING_REMOTE_STORE_ENABLED; +import static org.opensearch.cluster.metadata.IndexMetadata.SETTING_REMOTE_TRANSLOG_STORE_REPOSITORY; +import static org.opensearch.cluster.metadata.IndexMetadata.SETTING_REPLICATION_TYPE; +import static org.opensearch.index.remote.RemoteStoreUtils.determineRemoteStorePathStrategyDuringMigration; +import static org.opensearch.index.remote.RemoteStoreUtils.getRemoteStoreRepoName; +import static org.opensearch.node.remotestore.RemoteStoreNodeAttribute.REMOTE_STORE_SEGMENT_REPOSITORY_NAME_ATTRIBUTE_KEY; +import static org.opensearch.node.remotestore.RemoteStoreNodeAttribute.REMOTE_STORE_TRANSLOG_REPOSITORY_NAME_ATTRIBUTE_KEY; + +/** + * Utils for checking and mutating cluster state during remote migration + * + * @opensearch.internal + */ +public class RemoteMigrationIndexMetadataUpdater { + private final DiscoveryNodes discoveryNodes; + private final RoutingTable routingTable; + private final Settings clusterSettings; + private final IndexMetadata indexMetadata; + private final Logger logger; + + public RemoteMigrationIndexMetadataUpdater( + DiscoveryNodes discoveryNodes, + RoutingTable routingTable, + IndexMetadata indexMetadata, + Settings clusterSettings, + Logger logger + + ) { + this.discoveryNodes = discoveryNodes; + this.routingTable = routingTable; + this.clusterSettings = clusterSettings; + this.indexMetadata = indexMetadata; + this.logger = logger; + } + + /** + * During docrep to remote store migration, applies the following remote store based index settings + * once all shards of an index have moved over to remote store enabled nodes + *
+ * Also appends the requisite Remote Store Path based custom metadata to the existing index metadata + */ + public void maybeAddRemoteIndexSettings(IndexMetadata.Builder indexMetadataBuilder, String index) { + Settings currentIndexSettings = indexMetadata.getSettings(); + if (needsRemoteIndexSettingsUpdate(routingTable.indicesRouting().get(index), discoveryNodes, currentIndexSettings)) { + logger.info( + "Index {} does not have remote store based index settings but all primary shards and STARTED replica shards have moved to remote enabled nodes. Applying remote store settings to the index", + index + ); + Map remoteRepoNames = getRemoteStoreRepoName(discoveryNodes); + String segmentRepoName = remoteRepoNames.get(REMOTE_STORE_SEGMENT_REPOSITORY_NAME_ATTRIBUTE_KEY); + String tlogRepoName = remoteRepoNames.get(REMOTE_STORE_TRANSLOG_REPOSITORY_NAME_ATTRIBUTE_KEY); + assert Objects.nonNull(segmentRepoName) && Objects.nonNull(tlogRepoName) : "Remote repo names cannot be null"; + Settings.Builder indexSettingsBuilder = Settings.builder().put(currentIndexSettings); + updateRemoteStoreSettings(indexSettingsBuilder, segmentRepoName, tlogRepoName); + indexMetadataBuilder.settings(indexSettingsBuilder); + indexMetadataBuilder.settingsVersion(1 + indexMetadata.getVersion()); + } else { + logger.debug("Index {} does not satisfy criteria for applying remote store settings", index); + } + } + + /** + * Returns true iff all the below conditions are true: + * - All primary shards are in {@link ShardRoutingState#STARTED} state and are in remote store enabled nodes + * - No replica shard in {@link ShardRoutingState#RELOCATING} state + * - All {@link ShardRoutingState#STARTED} replica shards are in remote store enabled nodes + * + * @param indexRoutingTable current {@link IndexRoutingTable} from cluster state + * @param discoveryNodes set of discovery nodes from cluster state + * @param currentIndexSettings current {@link IndexMetadata} from cluster state + * @return true or false depending on the met conditions + */ + private boolean needsRemoteIndexSettingsUpdate( + IndexRoutingTable indexRoutingTable, + DiscoveryNodes discoveryNodes, + Settings currentIndexSettings + ) { + assert currentIndexSettings != null : "IndexMetadata for a shard cannot be null"; + if (indexHasRemoteStoreSettings(currentIndexSettings) == false) { + boolean allPrimariesStartedAndOnRemote = indexRoutingTable.shardsMatchingPredicate(ShardRouting::primary) + .stream() + .allMatch(shardRouting -> shardRouting.started() && discoveryNodes.get(shardRouting.currentNodeId()).isRemoteStoreNode()); + List replicaShards = indexRoutingTable.shardsMatchingPredicate(shardRouting -> shardRouting.primary() == false); + boolean noRelocatingReplicas = replicaShards.stream().noneMatch(ShardRouting::relocating); + boolean allStartedReplicasOnRemote = replicaShards.stream() + .filter(ShardRouting::started) + .allMatch(shardRouting -> discoveryNodes.get(shardRouting.currentNodeId()).isRemoteStoreNode()); + return allPrimariesStartedAndOnRemote && noRelocatingReplicas && allStartedReplicasOnRemote; + } + return false; + } + + /** + * Updates the remote store path strategy metadata for the index when it is migrating to remote. + * This is run during state change of each shard copy when the cluster is in `MIXED` mode and the direction of migration is `REMOTE_STORE` + * Should not interfere with docrep functionality even if the index is in docrep nodes since this metadata + * is not used anywhere in the docrep flow + * Checks are in place to make this execution no-op if the index metadata is already present. + * + * @param indexMetadataBuilder Mutated {@link IndexMetadata.Builder} having the previous state updates + * @param index index name + */ + public void maybeUpdateRemoteStorePathStrategy(IndexMetadata.Builder indexMetadataBuilder, String index) { + if (indexHasRemotePathMetadata(indexMetadata) == false) { + logger.info("Adding remote store path strategy for index [{}] during migration", index); + indexMetadataBuilder.putCustom( + REMOTE_STORE_CUSTOM_KEY, + determineRemoteStorePathStrategyDuringMigration(clusterSettings, discoveryNodes) + ); + } else { + logger.debug("Index {} already has remote store path strategy", index); + } + } + + public static boolean indexHasAllRemoteStoreRelatedMetadata(IndexMetadata indexMetadata) { + return indexHasRemoteStoreSettings(indexMetadata.getSettings()) && indexHasRemotePathMetadata(indexMetadata); + } + + /** + * Assert current index settings have: + * - index.remote_store.enabled == true + * - index.remote_store.segment.repository != null + * - index.remote_store.translog.repository != null + * - index.replication.type == SEGMENT + * + * @param indexSettings Current index settings + * @return true if all above conditions match. false otherwise + */ + public static boolean indexHasRemoteStoreSettings(Settings indexSettings) { + return IndexMetadata.INDEX_REMOTE_STORE_ENABLED_SETTING.exists(indexSettings) + && IndexMetadata.INDEX_REMOTE_TRANSLOG_REPOSITORY_SETTING.exists(indexSettings) + && IndexMetadata.INDEX_REMOTE_SEGMENT_STORE_REPOSITORY_SETTING.exists(indexSettings) + && IndexMetadata.INDEX_REPLICATION_TYPE_SETTING.get(indexSettings) == ReplicationType.SEGMENT; + } + + /** + * Asserts current index metadata customs has the {@link IndexMetadata#REMOTE_STORE_CUSTOM_KEY} key. + * If it does, checks if the path_type sub-key is present + * + * @param indexMetadata Current index metadata + * @return true if all above conditions match. false otherwise + */ + public static boolean indexHasRemotePathMetadata(IndexMetadata indexMetadata) { + Map customMetadata = indexMetadata.getCustomData(REMOTE_STORE_CUSTOM_KEY); + return Objects.nonNull(customMetadata) && Objects.nonNull(customMetadata.get(PathType.NAME)); + } + + public static void updateRemoteStoreSettings(Settings.Builder settingsBuilder, String segmentRepository, String translogRepository) { + settingsBuilder.put(SETTING_REMOTE_STORE_ENABLED, true) + .put(SETTING_REPLICATION_TYPE, ReplicationType.SEGMENT) + .put(SETTING_REMOTE_SEGMENT_STORE_REPOSITORY, segmentRepository) + .put(SETTING_REMOTE_TRANSLOG_STORE_REPOSITORY, translogRepository); + } +} diff --git a/server/src/main/java/org/opensearch/index/remote/RemoteStoreUtils.java b/server/src/main/java/org/opensearch/index/remote/RemoteStoreUtils.java index 7208dac162e1a..27b1b88034573 100644 --- a/server/src/main/java/org/opensearch/index/remote/RemoteStoreUtils.java +++ b/server/src/main/java/org/opensearch/index/remote/RemoteStoreUtils.java @@ -8,8 +8,16 @@ package org.opensearch.index.remote; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; +import org.opensearch.Version; +import org.opensearch.cluster.ClusterState; import org.opensearch.cluster.metadata.IndexMetadata; +import org.opensearch.cluster.node.DiscoveryNode; +import org.opensearch.cluster.node.DiscoveryNodes; import org.opensearch.common.collect.Tuple; +import org.opensearch.common.settings.Settings; +import org.opensearch.node.remotestore.RemoteStoreNodeAttribute; import java.nio.ByteBuffer; import java.util.Arrays; @@ -19,14 +27,19 @@ import java.util.Locale; import java.util.Map; import java.util.Objects; +import java.util.Optional; import java.util.function.Function; +import static org.opensearch.indices.RemoteStoreSettings.CLUSTER_REMOTE_STORE_PATH_HASH_ALGORITHM_SETTING; +import static org.opensearch.indices.RemoteStoreSettings.CLUSTER_REMOTE_STORE_PATH_TYPE_SETTING; + /** * Utils for remote store * * @opensearch.internal */ public class RemoteStoreUtils { + private static final Logger logger = LogManager.getLogger(RemoteStoreUtils.class); public static final int LONG_MAX_LENGTH = String.valueOf(Long.MAX_VALUE).length(); /** @@ -167,4 +180,48 @@ public static RemoteStorePathStrategy determineRemoteStorePathStrategy(IndexMeta } return new RemoteStorePathStrategy(RemoteStoreEnums.PathType.FIXED); } + + /** + * Generates the remote store path type information to be added to custom data of index metadata during migration + * + * @param clusterSettings Current Cluster settings from {@link ClusterState} + * @param discoveryNodes Current {@link DiscoveryNodes} from the cluster state + * @return {@link Map} to be added as custom data in index metadata + */ + public static Map determineRemoteStorePathStrategyDuringMigration( + Settings clusterSettings, + DiscoveryNodes discoveryNodes + ) { + Version minNodeVersion = discoveryNodes.getMinNodeVersion(); + RemoteStoreEnums.PathType pathType = Version.CURRENT.compareTo(minNodeVersion) <= 0 + ? CLUSTER_REMOTE_STORE_PATH_TYPE_SETTING.get(clusterSettings) + : RemoteStoreEnums.PathType.FIXED; + RemoteStoreEnums.PathHashAlgorithm pathHashAlgorithm = pathType == RemoteStoreEnums.PathType.FIXED + ? null + : CLUSTER_REMOTE_STORE_PATH_HASH_ALGORITHM_SETTING.get(clusterSettings); + Map remoteCustomData = new HashMap<>(); + remoteCustomData.put(RemoteStoreEnums.PathType.NAME, pathType.name()); + if (Objects.nonNull(pathHashAlgorithm)) { + remoteCustomData.put(RemoteStoreEnums.PathHashAlgorithm.NAME, pathHashAlgorithm.name()); + } + return remoteCustomData; + } + + /** + * Fetches segment and translog repository names from remote store node attributes. + * Returns a blank {@link HashMap} if the cluster does not contain any remote nodes. + *
+ * Caller need to handle null checks if {@link DiscoveryNodes} object does not have any remote nodes + * + * @param discoveryNodes Current set of {@link DiscoveryNodes} in the cluster + * @return {@link Map} of data repository node attributes keys and their values + */ + public static Map getRemoteStoreRepoName(DiscoveryNodes discoveryNodes) { + Optional remoteNode = discoveryNodes.getNodes() + .values() + .stream() + .filter(DiscoveryNode::isRemoteStoreNode) + .findFirst(); + return remoteNode.map(RemoteStoreNodeAttribute::getDataRepoNames).orElseGet(HashMap::new); + } } diff --git a/server/src/main/java/org/opensearch/node/remotestore/RemoteStoreNodeAttribute.java b/server/src/main/java/org/opensearch/node/remotestore/RemoteStoreNodeAttribute.java index a3bfe1195d8cc..b10ec0d99c3d5 100644 --- a/server/src/main/java/org/opensearch/node/remotestore/RemoteStoreNodeAttribute.java +++ b/server/src/main/java/org/opensearch/node/remotestore/RemoteStoreNodeAttribute.java @@ -18,6 +18,7 @@ import org.opensearch.repositories.blobstore.BlobStoreRepository; import java.util.ArrayList; +import java.util.HashMap; import java.util.HashSet; import java.util.Iterator; import java.util.List; @@ -47,6 +48,11 @@ public class RemoteStoreNodeAttribute { public static final String REMOTE_STORE_REPOSITORY_SETTINGS_ATTRIBUTE_KEY_PREFIX = "remote_store.repository.%s.settings."; private final RepositoriesMetadata repositoriesMetadata; + public static List SUPPORTED_DATA_REPO_NAME_ATTRIBUTES = List.of( + REMOTE_STORE_SEGMENT_REPOSITORY_NAME_ATTRIBUTE_KEY, + REMOTE_STORE_TRANSLOG_REPOSITORY_NAME_ATTRIBUTE_KEY + ); + /** * Creates a new {@link RemoteStoreNodeAttribute} */ @@ -185,6 +191,30 @@ public RepositoriesMetadata getRepositoriesMetadata() { return this.repositoriesMetadata; } + /** + * Return {@link Map} of all the supported data repo names listed on {@link RemoteStoreNodeAttribute#SUPPORTED_DATA_REPO_NAME_ATTRIBUTES} + * + * @param node Node to fetch attributes from + * @return {@link Map} of all remote store data repo attribute keys and their values + */ + public static Map getDataRepoNames(DiscoveryNode node) { + assert remoteDataAttributesPresent(node.getAttributes()); + Map dataRepoNames = new HashMap<>(); + for (String supportedRepoAttribute : SUPPORTED_DATA_REPO_NAME_ATTRIBUTES) { + dataRepoNames.put(supportedRepoAttribute, node.getAttributes().get(supportedRepoAttribute)); + } + return dataRepoNames; + } + + private static boolean remoteDataAttributesPresent(Map nodeAttrs) { + for (String supportedRepoAttributes : SUPPORTED_DATA_REPO_NAME_ATTRIBUTES) { + if (nodeAttrs.get(supportedRepoAttributes) == null || nodeAttrs.get(supportedRepoAttributes).isEmpty()) { + return false; + } + } + return true; + } + @Override public int hashCode() { // The hashCode is generated by computing the hash of all the repositoryMetadata present in diff --git a/server/src/main/java/org/opensearch/node/remotestore/RemoteStoreNodeService.java b/server/src/main/java/org/opensearch/node/remotestore/RemoteStoreNodeService.java index adfb751421db7..874c9408de6c5 100644 --- a/server/src/main/java/org/opensearch/node/remotestore/RemoteStoreNodeService.java +++ b/server/src/main/java/org/opensearch/node/remotestore/RemoteStoreNodeService.java @@ -227,7 +227,14 @@ public RepositoriesMetadata updateRepositoriesMetadata(DiscoveryNode joiningNode } /** - * To check if the cluster is undergoing remote store migration + * Returns true iff current cluster settings have: + *
+ * - remote_store.compatibility_mode set to mixed + *
+ * - migration.direction set to remote_store + *
+ * false otherwise + * * @param clusterSettings cluster level settings * @return * true For REMOTE_STORE migration direction and MIXED compatibility mode, diff --git a/server/src/test/java/org/opensearch/action/support/clustermanager/TransportClusterManagerNodeActionTests.java b/server/src/test/java/org/opensearch/action/support/clustermanager/TransportClusterManagerNodeActionTests.java index b3eb2443fa940..35c5c5e605b4d 100644 --- a/server/src/test/java/org/opensearch/action/support/clustermanager/TransportClusterManagerNodeActionTests.java +++ b/server/src/test/java/org/opensearch/action/support/clustermanager/TransportClusterManagerNodeActionTests.java @@ -86,6 +86,8 @@ import java.util.concurrent.atomic.AtomicBoolean; import static org.opensearch.common.util.FeatureFlags.REMOTE_STORE_MIGRATION_EXPERIMENTAL; +import static org.opensearch.index.remote.RemoteMigrationIndexMetadataUpdaterTests.createIndexMetadataWithDocrepSettings; +import static org.opensearch.index.remote.RemoteMigrationIndexMetadataUpdaterTests.createIndexMetadataWithRemoteStoreSettings; import static org.opensearch.node.remotestore.RemoteStoreNodeAttribute.REMOTE_STORE_SEGMENT_REPOSITORY_NAME_ATTRIBUTE_KEY; import static org.opensearch.node.remotestore.RemoteStoreNodeAttribute.REMOTE_STORE_TRANSLOG_REPOSITORY_NAME_ATTRIBUTE_KEY; import static org.opensearch.node.remotestore.RemoteStoreNodeService.REMOTE_STORE_COMPATIBILITY_MODE_SETTING; @@ -791,7 +793,9 @@ public void testDontAllowSwitchingToStrictCompatibilityModeForMixedCluster() { .add(nonRemoteNode2) .localNodeId(nonRemoteNode2.getId()) .build(); - ClusterState sameTypeClusterState = ClusterState.builder(clusterState).nodes(discoveryNodes).build(); + + metadata = createIndexMetadataWithRemoteStoreSettings("test-index"); + ClusterState sameTypeClusterState = ClusterState.builder(clusterState).nodes(discoveryNodes).metadata(metadata).build(); transportClusterUpdateSettingsAction.validateCompatibilityModeSettingRequest(request, sameTypeClusterState); // cluster with only non-remote nodes @@ -801,10 +805,84 @@ public void testDontAllowSwitchingToStrictCompatibilityModeForMixedCluster() { .add(remoteNode2) .localNodeId(remoteNode2.getId()) .build(); - sameTypeClusterState = ClusterState.builder(sameTypeClusterState).nodes(discoveryNodes).build(); + sameTypeClusterState = ClusterState.builder(sameTypeClusterState).nodes(discoveryNodes).metadata(metadata).build(); transportClusterUpdateSettingsAction.validateCompatibilityModeSettingRequest(request, sameTypeClusterState); } + public void testDontAllowSwitchingToStrictCompatibilityModeWithoutRemoteIndexSettings() { + Settings nodeSettings = Settings.builder().put(REMOTE_STORE_MIGRATION_EXPERIMENTAL, "true").build(); + FeatureFlags.initializeFeatureFlags(nodeSettings); + Settings currentCompatibilityModeSettings = Settings.builder() + .put(REMOTE_STORE_COMPATIBILITY_MODE_SETTING.getKey(), RemoteStoreNodeService.CompatibilityMode.MIXED) + .build(); + Settings intendedCompatibilityModeSettings = Settings.builder() + .put(REMOTE_STORE_COMPATIBILITY_MODE_SETTING.getKey(), RemoteStoreNodeService.CompatibilityMode.STRICT) + .build(); + ClusterUpdateSettingsRequest request = new ClusterUpdateSettingsRequest(); + request.persistentSettings(intendedCompatibilityModeSettings); + DiscoveryNode remoteNode1 = new DiscoveryNode( + UUIDs.base64UUID(), + buildNewFakeTransportAddress(), + getRemoteStoreNodeAttributes(), + DiscoveryNodeRole.BUILT_IN_ROLES, + Version.CURRENT + ); + DiscoveryNode remoteNode2 = new DiscoveryNode( + UUIDs.base64UUID(), + buildNewFakeTransportAddress(), + getRemoteStoreNodeAttributes(), + DiscoveryNodeRole.BUILT_IN_ROLES, + Version.CURRENT + ); + DiscoveryNodes discoveryNodes = DiscoveryNodes.builder() + .add(remoteNode1) + .localNodeId(remoteNode1.getId()) + .add(remoteNode2) + .localNodeId(remoteNode2.getId()) + .build(); + AllocationService allocationService = new AllocationService( + new AllocationDeciders(Collections.singleton(new MaxRetryAllocationDecider())), + new TestGatewayAllocator(), + new BalancedShardsAllocator(Settings.EMPTY), + EmptyClusterInfoService.INSTANCE, + EmptySnapshotsInfoService.INSTANCE + ); + TransportClusterUpdateSettingsAction transportClusterUpdateSettingsAction = new TransportClusterUpdateSettingsAction( + transportService, + clusterService, + threadPool, + allocationService, + new ActionFilters(Collections.emptySet()), + new IndexNameExpressionResolver(new ThreadContext(Settings.EMPTY)), + clusterService.getClusterSettings() + ); + + Metadata nonRemoteIndexMd = Metadata.builder(createIndexMetadataWithDocrepSettings("test")) + .persistentSettings(currentCompatibilityModeSettings) + .build(); + final ClusterState clusterState = ClusterState.builder(ClusterName.DEFAULT) + .metadata(nonRemoteIndexMd) + .nodes(discoveryNodes) + .build(); + final SettingsException exception = expectThrows( + SettingsException.class, + () -> transportClusterUpdateSettingsAction.validateCompatibilityModeSettingRequest(request, clusterState) + ); + assertEquals( + "can not switch to STRICT compatibility mode since all indices in the cluster does not have remote store based index settings", + exception.getMessage() + ); + + Metadata remoteIndexMd = Metadata.builder(createIndexMetadataWithRemoteStoreSettings("test")) + .persistentSettings(currentCompatibilityModeSettings) + .build(); + ClusterState clusterStateWithRemoteIndices = ClusterState.builder(ClusterName.DEFAULT) + .metadata(remoteIndexMd) + .nodes(discoveryNodes) + .build(); + transportClusterUpdateSettingsAction.validateCompatibilityModeSettingRequest(request, clusterStateWithRemoteIndices); + } + public void testDontAllowSwitchingCompatibilityModeForClusterWithMultipleVersions() { Settings nodeSettings = Settings.builder().put(REMOTE_STORE_MIGRATION_EXPERIMENTAL, "true").build(); FeatureFlags.initializeFeatureFlags(nodeSettings); @@ -897,7 +975,10 @@ public void testDontAllowSwitchingCompatibilityModeForClusterWithMultipleVersion .localNodeId(discoveryNode2.getId()) .build(); - ClusterState sameVersionClusterState = ClusterState.builder(differentVersionClusterState).nodes(discoveryNodes).build(); + ClusterState sameVersionClusterState = ClusterState.builder(differentVersionClusterState) + .nodes(discoveryNodes) + .metadata(createIndexMetadataWithRemoteStoreSettings("test")) + .build(); transportClusterUpdateSettingsAction.validateCompatibilityModeSettingRequest(request, sameVersionClusterState); } @@ -907,4 +988,5 @@ private Map getRemoteStoreNodeAttributes() { remoteStoreNodeAttributes.put(REMOTE_STORE_TRANSLOG_REPOSITORY_NAME_ATTRIBUTE_KEY, "my-translog-repo-1"); return remoteStoreNodeAttributes; } + } diff --git a/server/src/test/java/org/opensearch/cluster/routing/IndexShardRoutingTableTests.java b/server/src/test/java/org/opensearch/cluster/routing/IndexShardRoutingTableTests.java index ebb7529d3f733..e881016fb9305 100644 --- a/server/src/test/java/org/opensearch/cluster/routing/IndexShardRoutingTableTests.java +++ b/server/src/test/java/org/opensearch/cluster/routing/IndexShardRoutingTableTests.java @@ -39,6 +39,7 @@ import java.util.ArrayList; import java.util.Arrays; import java.util.List; +import java.util.UUID; public class IndexShardRoutingTableTests extends OpenSearchTestCase { public void testEqualsAttributesKey() { @@ -69,4 +70,48 @@ public void testEquals() { assertNotEquals(table1, s); assertNotEquals(table1, table3); } + + public void testShardsMatchingPredicate() { + ShardId shardId = new ShardId(new Index("a", UUID.randomUUID().toString()), 0); + ShardRouting primary = TestShardRouting.newShardRouting(shardId, "node-1", true, ShardRoutingState.STARTED); + ShardRouting replica = TestShardRouting.newShardRouting(shardId, "node-2", false, ShardRoutingState.STARTED); + ShardRouting unassignedReplica = ShardRouting.newUnassigned( + shardId, + false, + RecoverySource.PeerRecoverySource.INSTANCE, + new UnassignedInfo(UnassignedInfo.Reason.INDEX_CREATED, null) + ); + ShardRouting relocatingReplica1 = TestShardRouting.newShardRouting( + shardId, + "node-3", + "node-4", + false, + ShardRoutingState.RELOCATING + ); + ShardRouting relocatingReplica2 = TestShardRouting.newShardRouting( + shardId, + "node-4", + "node-5", + false, + ShardRoutingState.RELOCATING + ); + + IndexShardRoutingTable table = new IndexShardRoutingTable( + shardId, + Arrays.asList(primary, replica, unassignedReplica, relocatingReplica1, relocatingReplica2) + ); + assertEquals(List.of(primary), table.shardsMatchingPredicate(ShardRouting::primary)); + assertEquals( + List.of(replica, unassignedReplica, relocatingReplica1, relocatingReplica2), + table.shardsMatchingPredicate(shardRouting -> !shardRouting.primary()) + ); + assertEquals( + List.of(unassignedReplica), + table.shardsMatchingPredicate(shardRouting -> !shardRouting.primary() && shardRouting.unassigned()) + ); + assertEquals( + Arrays.asList(relocatingReplica1, relocatingReplica2), + table.shardsMatchingPredicate(shardRouting -> !shardRouting.primary() && shardRouting.relocating()) + ); + } } diff --git a/server/src/test/java/org/opensearch/cluster/routing/allocation/FailedShardsRoutingTests.java b/server/src/test/java/org/opensearch/cluster/routing/allocation/FailedShardsRoutingTests.java index 04e37e7d958d0..5e3b74ee138ab 100644 --- a/server/src/test/java/org/opensearch/cluster/routing/allocation/FailedShardsRoutingTests.java +++ b/server/src/test/java/org/opensearch/cluster/routing/allocation/FailedShardsRoutingTests.java @@ -68,7 +68,8 @@ import static org.opensearch.cluster.routing.ShardRoutingState.STARTED; import static org.opensearch.cluster.routing.ShardRoutingState.UNASSIGNED; import static org.opensearch.common.util.FeatureFlags.REMOTE_STORE_MIGRATION_EXPERIMENTAL; -import static org.opensearch.node.remotestore.RemoteStoreNodeAttribute.REMOTE_STORE_CLUSTER_STATE_REPOSITORY_NAME_ATTRIBUTE_KEY; +import static org.opensearch.node.remotestore.RemoteStoreNodeAttribute.REMOTE_STORE_SEGMENT_REPOSITORY_NAME_ATTRIBUTE_KEY; +import static org.opensearch.node.remotestore.RemoteStoreNodeAttribute.REMOTE_STORE_TRANSLOG_REPOSITORY_NAME_ATTRIBUTE_KEY; import static org.opensearch.node.remotestore.RemoteStoreNodeService.MIGRATION_DIRECTION_SETTING; import static org.opensearch.node.remotestore.RemoteStoreNodeService.REMOTE_STORE_COMPATIBILITY_MODE_SETTING; import static org.hamcrest.Matchers.anyOf; @@ -852,8 +853,10 @@ public void testPreferReplicaOnRemoteNodeForPrimaryPromotion() { // add a remote node and start primary shard Map remoteStoreNodeAttributes = Map.of( - REMOTE_STORE_CLUSTER_STATE_REPOSITORY_NAME_ATTRIBUTE_KEY, - "REMOTE_STORE_CLUSTER_STATE_REPOSITORY_NAME_ATTRIBUTE_VALUE" + REMOTE_STORE_SEGMENT_REPOSITORY_NAME_ATTRIBUTE_KEY, + "REMOTE_STORE_SEGMENT_REPOSITORY_NAME_ATTRIBUTE_VALUE", + REMOTE_STORE_TRANSLOG_REPOSITORY_NAME_ATTRIBUTE_KEY, + "REMOTE_STORE_TRANSLOG_REPOSITORY_NAME_ATTRIBUTE_VALUE" ); DiscoveryNode remoteNode1 = new DiscoveryNode( UUIDs.base64UUID(), diff --git a/server/src/test/java/org/opensearch/index/remote/RemoteMigrationIndexMetadataUpdaterTests.java b/server/src/test/java/org/opensearch/index/remote/RemoteMigrationIndexMetadataUpdaterTests.java new file mode 100644 index 0000000000000..d8220c93e4eeb --- /dev/null +++ b/server/src/test/java/org/opensearch/index/remote/RemoteMigrationIndexMetadataUpdaterTests.java @@ -0,0 +1,339 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.index.remote; + +import org.opensearch.Version; +import org.opensearch.cluster.metadata.IndexMetadata; +import org.opensearch.cluster.metadata.Metadata; +import org.opensearch.cluster.node.DiscoveryNode; +import org.opensearch.cluster.node.DiscoveryNodes; +import org.opensearch.cluster.routing.IndexRoutingTable; +import org.opensearch.cluster.routing.IndexShardRoutingTable; +import org.opensearch.cluster.routing.RecoverySource; +import org.opensearch.cluster.routing.RoutingTable; +import org.opensearch.cluster.routing.ShardRouting; +import org.opensearch.cluster.routing.ShardRoutingState; +import org.opensearch.cluster.routing.TestShardRouting; +import org.opensearch.cluster.routing.UnassignedInfo; +import org.opensearch.common.settings.Settings; +import org.opensearch.core.index.Index; +import org.opensearch.core.index.shard.ShardId; +import org.opensearch.index.shard.IndexShardTestUtils; +import org.opensearch.indices.replication.common.ReplicationType; +import org.opensearch.test.OpenSearchTestCase; + +import java.io.IOException; +import java.util.Map; +import java.util.UUID; + +import static org.opensearch.cluster.metadata.IndexMetadata.REMOTE_STORE_CUSTOM_KEY; +import static org.opensearch.indices.RemoteStoreSettings.CLUSTER_REMOTE_STORE_PATH_HASH_ALGORITHM_SETTING; +import static org.opensearch.indices.RemoteStoreSettings.CLUSTER_REMOTE_STORE_PATH_TYPE_SETTING; +import static org.mockito.Mockito.mock; + +public class RemoteMigrationIndexMetadataUpdaterTests extends OpenSearchTestCase { + private final String indexName = "test-index"; + + public void testMaybeAddRemoteIndexSettingsAllPrimariesAndReplicasOnRemote() throws IOException { + Metadata metadata = createIndexMetadataWithDocrepSettings(indexName); + IndexMetadata existingIndexMetadata = metadata.index(indexName); + IndexMetadata.Builder indexMetadataBuilder = IndexMetadata.builder(existingIndexMetadata); + long currentSettingsVersion = indexMetadataBuilder.settingsVersion(); + DiscoveryNode primaryNode = IndexShardTestUtils.getFakeRemoteEnabledNode("1"); + DiscoveryNode replicaNode = IndexShardTestUtils.getFakeRemoteEnabledNode("2"); + DiscoveryNodes allNodes = DiscoveryNodes.builder().add(primaryNode).add(replicaNode).build(); + RoutingTable routingTable = createRoutingTableAllShardsStarted(indexName, 1, 1, primaryNode, replicaNode); + RemoteMigrationIndexMetadataUpdater migrationIndexMetadataUpdater = new RemoteMigrationIndexMetadataUpdater( + allNodes, + routingTable, + existingIndexMetadata, + metadata.settings(), + logger + ); + migrationIndexMetadataUpdater.maybeAddRemoteIndexSettings(indexMetadataBuilder, indexName); + assertTrue(currentSettingsVersion < indexMetadataBuilder.settingsVersion()); + assertRemoteSettingsApplied(indexMetadataBuilder.build()); + } + + public void testMaybeAddRemoteIndexSettingsDoesNotRunWhenSettingsAlreadyPresent() throws IOException { + Metadata metadata = createIndexMetadataWithRemoteStoreSettings(indexName); + IndexMetadata existingIndexMetadata = metadata.index(indexName); + IndexMetadata.Builder indexMetadataBuilder = IndexMetadata.builder(existingIndexMetadata); + long currentSettingsVersion = indexMetadataBuilder.settingsVersion(); + DiscoveryNode primaryNode = IndexShardTestUtils.getFakeRemoteEnabledNode("1"); + DiscoveryNode replicaNode = IndexShardTestUtils.getFakeRemoteEnabledNode("2"); + DiscoveryNodes allNodes = DiscoveryNodes.builder().add(primaryNode).add(replicaNode).build(); + RoutingTable routingTable = createRoutingTableAllShardsStarted(indexName, 1, 1, primaryNode, replicaNode); + RemoteMigrationIndexMetadataUpdater migrationIndexMetadataUpdater = new RemoteMigrationIndexMetadataUpdater( + allNodes, + routingTable, + existingIndexMetadata, + metadata.settings(), + logger + ); + migrationIndexMetadataUpdater.maybeAddRemoteIndexSettings(indexMetadataBuilder, indexName); + assertEquals(currentSettingsVersion, indexMetadataBuilder.settingsVersion()); + } + + public void testMaybeAddRemoteIndexSettingsDoesNotUpdateSettingsWhenAllShardsInDocrep() throws IOException { + Metadata metadata = createIndexMetadataWithDocrepSettings(indexName); + IndexMetadata existingIndexMetadata = metadata.index(indexName); + IndexMetadata.Builder indexMetadataBuilder = IndexMetadata.builder(existingIndexMetadata); + long currentSettingsVersion = indexMetadataBuilder.settingsVersion(); + DiscoveryNode primaryNode = IndexShardTestUtils.getFakeDiscoNode("1"); + DiscoveryNode replicaNode = IndexShardTestUtils.getFakeDiscoNode("2"); + DiscoveryNodes allNodes = DiscoveryNodes.builder().add(primaryNode).add(replicaNode).build(); + RoutingTable routingTable = createRoutingTableAllShardsStarted(indexName, 1, 1, primaryNode, replicaNode); + RemoteMigrationIndexMetadataUpdater migrationIndexMetadataUpdater = new RemoteMigrationIndexMetadataUpdater( + allNodes, + routingTable, + existingIndexMetadata, + metadata.settings(), + logger + ); + migrationIndexMetadataUpdater.maybeAddRemoteIndexSettings(indexMetadataBuilder, indexName); + assertEquals(currentSettingsVersion, indexMetadataBuilder.settingsVersion()); + assertDocrepSettingsApplied(indexMetadataBuilder.build()); + } + + public void testMaybeAddRemoteIndexSettingsUpdatesIndexSettingsWithUnassignedReplicas() throws IOException { + Metadata metadata = createIndexMetadataWithDocrepSettings(indexName); + IndexMetadata existingIndexMetadata = metadata.index(indexName); + IndexMetadata.Builder indexMetadataBuilder = IndexMetadata.builder(existingIndexMetadata); + long currentSettingsVersion = indexMetadataBuilder.settingsVersion(); + DiscoveryNode primaryNode = IndexShardTestUtils.getFakeRemoteEnabledNode("1"); + DiscoveryNode replicaNode = IndexShardTestUtils.getFakeDiscoNode("2"); + DiscoveryNodes allNodes = DiscoveryNodes.builder().add(primaryNode).add(replicaNode).build(); + RoutingTable routingTable = createRoutingTableReplicasUnassigned(indexName, 1, 1, primaryNode); + RemoteMigrationIndexMetadataUpdater migrationIndexMetadataUpdater = new RemoteMigrationIndexMetadataUpdater( + allNodes, + routingTable, + existingIndexMetadata, + metadata.settings(), + logger + ); + migrationIndexMetadataUpdater.maybeAddRemoteIndexSettings(indexMetadataBuilder, indexName); + assertTrue(currentSettingsVersion < indexMetadataBuilder.settingsVersion()); + assertRemoteSettingsApplied(indexMetadataBuilder.build()); + } + + public void testMaybeAddRemoteIndexSettingsDoesNotUpdateIndexSettingsWithRelocatingReplicas() throws IOException { + Metadata metadata = createIndexMetadataWithDocrepSettings(indexName); + IndexMetadata existingIndexMetadata = metadata.index(indexName); + IndexMetadata.Builder indexMetadataBuilder = IndexMetadata.builder(existingIndexMetadata); + long currentSettingsVersion = indexMetadataBuilder.settingsVersion(); + DiscoveryNode primaryNode = IndexShardTestUtils.getFakeRemoteEnabledNode("1"); + DiscoveryNode replicaNode = IndexShardTestUtils.getFakeDiscoNode("2"); + DiscoveryNode replicaRelocatingNode = IndexShardTestUtils.getFakeDiscoNode("3"); + DiscoveryNodes allNodes = DiscoveryNodes.builder().add(primaryNode).add(replicaNode).build(); + RoutingTable routingTable = createRoutingTableReplicasRelocating(indexName, 1, 1, primaryNode, replicaNode, replicaRelocatingNode); + RemoteMigrationIndexMetadataUpdater migrationIndexMetadataUpdater = new RemoteMigrationIndexMetadataUpdater( + allNodes, + routingTable, + existingIndexMetadata, + metadata.settings(), + logger + ); + migrationIndexMetadataUpdater.maybeAddRemoteIndexSettings(indexMetadataBuilder, indexName); + assertEquals(currentSettingsVersion, indexMetadataBuilder.settingsVersion()); + assertDocrepSettingsApplied(indexMetadataBuilder.build()); + } + + public void testMaybeUpdateRemoteStorePathStrategyExecutes() { + Metadata currentMetadata = createIndexMetadataWithDocrepSettings(indexName); + IndexMetadata existingIndexMetadata = currentMetadata.index(indexName); + IndexMetadata.Builder builder = IndexMetadata.builder(existingIndexMetadata); + DiscoveryNodes discoveryNodes = DiscoveryNodes.builder().add(IndexShardTestUtils.getFakeRemoteEnabledNode("1")).build(); + RemoteMigrationIndexMetadataUpdater migrationIndexMetadataUpdater = new RemoteMigrationIndexMetadataUpdater( + discoveryNodes, + mock(RoutingTable.class), + existingIndexMetadata, + Settings.builder() + .put( + CLUSTER_REMOTE_STORE_PATH_HASH_ALGORITHM_SETTING.getKey(), + RemoteStoreEnums.PathHashAlgorithm.FNV_1A_COMPOSITE_1.name() + ) + .put(CLUSTER_REMOTE_STORE_PATH_TYPE_SETTING.getKey(), RemoteStoreEnums.PathType.HASHED_PREFIX.name()) + .build(), + logger + ); + migrationIndexMetadataUpdater.maybeUpdateRemoteStorePathStrategy(builder, indexName); + assertCustomPathMetadataIsPresent(builder.build()); + } + + public void testMaybeUpdateRemoteStorePathStrategyDoesNotExecute() { + Metadata currentMetadata = createIndexMetadataWithRemoteStoreSettings(indexName); + IndexMetadata existingIndexMetadata = currentMetadata.index(indexName); + IndexMetadata.Builder builder = IndexMetadata.builder(currentMetadata.index(indexName)); + DiscoveryNodes discoveryNodes = DiscoveryNodes.builder().add(IndexShardTestUtils.getFakeRemoteEnabledNode("1")).build(); + RemoteMigrationIndexMetadataUpdater migrationIndexMetadataUpdater = new RemoteMigrationIndexMetadataUpdater( + discoveryNodes, + mock(RoutingTable.class), + existingIndexMetadata, + Settings.builder() + .put( + CLUSTER_REMOTE_STORE_PATH_HASH_ALGORITHM_SETTING.getKey(), + RemoteStoreEnums.PathHashAlgorithm.FNV_1A_COMPOSITE_1.name() + ) + .put(CLUSTER_REMOTE_STORE_PATH_TYPE_SETTING.getKey(), RemoteStoreEnums.PathType.HASHED_PREFIX.name()) + .build(), + logger + ); + + migrationIndexMetadataUpdater.maybeUpdateRemoteStorePathStrategy(builder, indexName); + + assertCustomPathMetadataIsPresent(builder.build()); + } + + private RoutingTable createRoutingTableAllShardsStarted( + String indexName, + int numberOfShards, + int numberOfReplicas, + DiscoveryNode primaryHostingNode, + DiscoveryNode replicaHostingNode + ) { + RoutingTable.Builder builder = RoutingTable.builder(); + Index index = new Index(indexName, UUID.randomUUID().toString()); + + IndexRoutingTable.Builder indexRoutingTableBuilder = IndexRoutingTable.builder(index); + for (int i = 0; i < numberOfShards; i++) { + ShardId shardId = new ShardId(index, i); + IndexShardRoutingTable.Builder indexShardRoutingTable = new IndexShardRoutingTable.Builder(shardId); + indexShardRoutingTable.addShard( + TestShardRouting.newShardRouting(shardId, primaryHostingNode.getId(), true, ShardRoutingState.STARTED) + ); + for (int j = 0; j < numberOfReplicas; j++) { + indexShardRoutingTable.addShard( + TestShardRouting.newShardRouting(shardId, replicaHostingNode.getId(), false, ShardRoutingState.STARTED) + ); + } + indexRoutingTableBuilder.addIndexShard(indexShardRoutingTable.build()); + } + return builder.add(indexRoutingTableBuilder.build()).build(); + } + + private RoutingTable createRoutingTableReplicasUnassigned( + String indexName, + int numberOfShards, + int numberOfReplicas, + DiscoveryNode primaryHostingNode + ) { + RoutingTable.Builder builder = RoutingTable.builder(); + Index index = new Index(indexName, UUID.randomUUID().toString()); + + IndexRoutingTable.Builder indexRoutingTableBuilder = IndexRoutingTable.builder(index); + for (int i = 0; i < numberOfShards; i++) { + ShardId shardId = new ShardId(index, i); + IndexShardRoutingTable.Builder indexShardRoutingTable = new IndexShardRoutingTable.Builder(shardId); + indexShardRoutingTable.addShard( + TestShardRouting.newShardRouting(shardId, primaryHostingNode.getId(), true, ShardRoutingState.STARTED) + ); + for (int j = 0; j < numberOfReplicas; j++) { + indexShardRoutingTable.addShard( + ShardRouting.newUnassigned( + shardId, + false, + RecoverySource.PeerRecoverySource.INSTANCE, + new UnassignedInfo(UnassignedInfo.Reason.INDEX_CREATED, null) + ) + ); + } + indexRoutingTableBuilder.addIndexShard(indexShardRoutingTable.build()); + } + return builder.add(indexRoutingTableBuilder.build()).build(); + } + + private RoutingTable createRoutingTableReplicasRelocating( + String indexName, + int numberOfShards, + int numberOfReplicas, + DiscoveryNode primaryHostingNodes, + DiscoveryNode replicaHostingNode, + DiscoveryNode replicaRelocatingNode + ) { + RoutingTable.Builder builder = RoutingTable.builder(); + Index index = new Index(indexName, UUID.randomUUID().toString()); + + IndexRoutingTable.Builder indexRoutingTableBuilder = IndexRoutingTable.builder(index); + for (int i = 0; i < numberOfShards; i++) { + ShardId shardId = new ShardId(index, i); + IndexShardRoutingTable.Builder indexShardRoutingTable = new IndexShardRoutingTable.Builder(shardId); + indexShardRoutingTable.addShard( + TestShardRouting.newShardRouting(shardId, primaryHostingNodes.getId(), true, ShardRoutingState.STARTED) + ); + for (int j = 0; j < numberOfReplicas; j++) { + indexShardRoutingTable.addShard( + TestShardRouting.newShardRouting( + shardId, + replicaHostingNode.getId(), + replicaRelocatingNode.getId(), + false, + ShardRoutingState.RELOCATING + ) + ); + } + indexRoutingTableBuilder.addIndexShard(indexShardRoutingTable.build()); + } + return builder.add(indexRoutingTableBuilder.build()).build(); + } + + public static Metadata createIndexMetadataWithRemoteStoreSettings(String indexName) { + IndexMetadata.Builder indexMetadata = IndexMetadata.builder(indexName); + indexMetadata.settings( + Settings.builder() + .put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1) + .put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 1) + .put(IndexMetadata.SETTING_VERSION_CREATED, Version.CURRENT) + .put(IndexMetadata.INDEX_REMOTE_STORE_ENABLED_SETTING.getKey(), true) + .put(IndexMetadata.INDEX_REMOTE_TRANSLOG_REPOSITORY_SETTING.getKey(), "dummy-tlog-repo") + .put(IndexMetadata.INDEX_REMOTE_SEGMENT_STORE_REPOSITORY_SETTING.getKey(), "dummy-segment-repo") + .put(IndexMetadata.INDEX_REPLICATION_TYPE_SETTING.getKey(), "SEGMENT") + .build() + ) + .putCustom( + REMOTE_STORE_CUSTOM_KEY, + Map.of(RemoteStoreEnums.PathType.NAME, "dummy", RemoteStoreEnums.PathHashAlgorithm.NAME, "dummy") + ) + .build(); + return Metadata.builder().put(indexMetadata).build(); + } + + public static Metadata createIndexMetadataWithDocrepSettings(String indexName) { + IndexMetadata.Builder indexMetadata = IndexMetadata.builder(indexName); + indexMetadata.settings( + Settings.builder() + .put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1) + .put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 1) + .put(IndexMetadata.SETTING_VERSION_CREATED, Version.CURRENT) + .put(IndexMetadata.INDEX_REPLICATION_TYPE_SETTING.getKey(), "DOCUMENT") + .build() + ).build(); + return Metadata.builder().put(indexMetadata).build(); + } + + private void assertRemoteSettingsApplied(IndexMetadata indexMetadata) { + assertTrue(IndexMetadata.INDEX_REMOTE_STORE_ENABLED_SETTING.get(indexMetadata.getSettings())); + assertTrue(IndexMetadata.INDEX_REMOTE_TRANSLOG_REPOSITORY_SETTING.exists(indexMetadata.getSettings())); + assertTrue(IndexMetadata.INDEX_REMOTE_SEGMENT_STORE_REPOSITORY_SETTING.exists(indexMetadata.getSettings())); + assertEquals(ReplicationType.SEGMENT, IndexMetadata.INDEX_REPLICATION_TYPE_SETTING.get(indexMetadata.getSettings())); + } + + private void assertDocrepSettingsApplied(IndexMetadata indexMetadata) { + assertFalse(IndexMetadata.INDEX_REMOTE_STORE_ENABLED_SETTING.get(indexMetadata.getSettings())); + assertFalse(IndexMetadata.INDEX_REMOTE_TRANSLOG_REPOSITORY_SETTING.exists(indexMetadata.getSettings())); + assertFalse(IndexMetadata.INDEX_REMOTE_SEGMENT_STORE_REPOSITORY_SETTING.exists(indexMetadata.getSettings())); + assertEquals(ReplicationType.DOCUMENT, IndexMetadata.INDEX_REPLICATION_TYPE_SETTING.get(indexMetadata.getSettings())); + } + + private void assertCustomPathMetadataIsPresent(IndexMetadata indexMetadata) { + assertNotNull(indexMetadata.getCustomData(REMOTE_STORE_CUSTOM_KEY)); + assertNotNull(indexMetadata.getCustomData(REMOTE_STORE_CUSTOM_KEY).get(RemoteStoreEnums.PathType.NAME)); + assertNotNull(indexMetadata.getCustomData(REMOTE_STORE_CUSTOM_KEY).get(RemoteStoreEnums.PathHashAlgorithm.NAME)); + } +} diff --git a/server/src/test/java/org/opensearch/index/remote/RemoteStoreUtilsTests.java b/server/src/test/java/org/opensearch/index/remote/RemoteStoreUtilsTests.java index 4d3e633848975..c1fc0cdaa0d3b 100644 --- a/server/src/test/java/org/opensearch/index/remote/RemoteStoreUtilsTests.java +++ b/server/src/test/java/org/opensearch/index/remote/RemoteStoreUtilsTests.java @@ -8,10 +8,13 @@ package org.opensearch.index.remote; +import org.opensearch.cluster.node.DiscoveryNodes; import org.opensearch.common.blobstore.BlobMetadata; import org.opensearch.common.blobstore.support.PlainBlobMetadata; +import org.opensearch.index.shard.IndexShardTestUtils; import org.opensearch.index.store.RemoteSegmentStoreDirectory; import org.opensearch.index.translog.transfer.TranslogTransferMetadata; +import org.opensearch.node.remotestore.RemoteStoreNodeAttribute; import org.opensearch.test.OpenSearchTestCase; import java.math.BigInteger; @@ -28,6 +31,8 @@ import static org.opensearch.index.remote.RemoteStoreUtils.longToUrlBase64; import static org.opensearch.index.remote.RemoteStoreUtils.urlBase64ToLong; import static org.opensearch.index.remote.RemoteStoreUtils.verifyNoMultipleWriters; +import static org.opensearch.index.shard.IndexShardTestUtils.MOCK_SEGMENT_REPO_NAME; +import static org.opensearch.index.shard.IndexShardTestUtils.MOCK_TLOG_REPO_NAME; import static org.opensearch.index.store.RemoteSegmentStoreDirectory.MetadataFilenameUtils.METADATA_PREFIX; import static org.opensearch.index.store.RemoteSegmentStoreDirectory.MetadataFilenameUtils.SEPARATOR; import static org.opensearch.index.translog.transfer.TranslogTransferMetadata.METADATA_SEPARATOR; @@ -316,6 +321,19 @@ public void testLongToCompositeUrlBase64AndBinaryEncoding() { } } + public void testGetRemoteStoreRepoNameWithRemoteNodes() { + DiscoveryNodes discoveryNodes = DiscoveryNodes.builder().add(IndexShardTestUtils.getFakeRemoteEnabledNode("1")).build(); + Map expected = new HashMap<>(); + expected.put(RemoteStoreNodeAttribute.REMOTE_STORE_SEGMENT_REPOSITORY_NAME_ATTRIBUTE_KEY, MOCK_SEGMENT_REPO_NAME); + expected.put(RemoteStoreNodeAttribute.REMOTE_STORE_TRANSLOG_REPOSITORY_NAME_ATTRIBUTE_KEY, MOCK_TLOG_REPO_NAME); + assertEquals(expected, RemoteStoreUtils.getRemoteStoreRepoName(discoveryNodes)); + } + + public void testGetRemoteStoreRepoNameWithDocrepNdoes() { + DiscoveryNodes discoveryNodes = DiscoveryNodes.builder().add(IndexShardTestUtils.getFakeDiscoNode("1")).build(); + assertTrue(RemoteStoreUtils.getRemoteStoreRepoName(discoveryNodes).isEmpty()); + } + static long compositeUrlBase64BinaryEncodingToLong(String encodedValue) { char ch = encodedValue.charAt(0); int base64BitsIntValue = BASE64_CHARSET_IDX_MAP.get(ch); diff --git a/test/framework/src/main/java/org/opensearch/index/shard/IndexShardTestUtils.java b/test/framework/src/main/java/org/opensearch/index/shard/IndexShardTestUtils.java index d3a4a95c3bdef..abf8f2a4da6c1 100644 --- a/test/framework/src/main/java/org/opensearch/index/shard/IndexShardTestUtils.java +++ b/test/framework/src/main/java/org/opensearch/index/shard/IndexShardTestUtils.java @@ -21,6 +21,9 @@ import java.util.Map; public class IndexShardTestUtils { + public static final String MOCK_SEGMENT_REPO_NAME = "segment-test-repo"; + public static final String MOCK_TLOG_REPO_NAME = "tlog-test-repo"; + public static DiscoveryNode getFakeDiscoNode(String id) { return new DiscoveryNode( id, @@ -34,7 +37,8 @@ public static DiscoveryNode getFakeDiscoNode(String id) { public static DiscoveryNode getFakeRemoteEnabledNode(String id) { Map remoteNodeAttributes = new HashMap(); - remoteNodeAttributes.put(RemoteStoreNodeAttribute.REMOTE_STORE_SEGMENT_REPOSITORY_NAME_ATTRIBUTE_KEY, "test-repo"); + remoteNodeAttributes.put(RemoteStoreNodeAttribute.REMOTE_STORE_SEGMENT_REPOSITORY_NAME_ATTRIBUTE_KEY, MOCK_SEGMENT_REPO_NAME); + remoteNodeAttributes.put(RemoteStoreNodeAttribute.REMOTE_STORE_TRANSLOG_REPOSITORY_NAME_ATTRIBUTE_KEY, MOCK_TLOG_REPO_NAME); return new DiscoveryNode( id, id, diff --git a/test/framework/src/main/java/org/opensearch/test/OpenSearchIntegTestCase.java b/test/framework/src/main/java/org/opensearch/test/OpenSearchIntegTestCase.java index 33f6d1736f3a8..5fa23a7b0827a 100644 --- a/test/framework/src/main/java/org/opensearch/test/OpenSearchIntegTestCase.java +++ b/test/framework/src/main/java/org/opensearch/test/OpenSearchIntegTestCase.java @@ -2428,6 +2428,12 @@ protected String primaryNodeName(String indexName) { return clusterState.getRoutingNodes().node(nodeId).node().getName(); } + protected String primaryNodeName(String indexName, int shardId) { + ClusterState clusterState = client().admin().cluster().prepareState().get().getState(); + String nodeId = clusterState.getRoutingTable().index(indexName).shard(shardId).primaryShard().currentNodeId(); + return clusterState.getRoutingNodes().node(nodeId).node().getName(); + } + protected String replicaNodeName(String indexName) { ClusterState clusterState = client().admin().cluster().prepareState().get().getState(); String nodeId = clusterState.getRoutingTable().index(indexName).shard(0).replicaShards().get(0).currentNodeId();