From 95ea9b9c868e89ba423a8ac09f17e270e8868f7b Mon Sep 17 00:00:00 2001 From: Shourya Dutta Biswas <114977491+shourya035@users.noreply.github.com> Date: Tue, 2 Apr 2024 17:03:01 +0530 Subject: [PATCH] =?UTF-8?q?Revert=20"[Remote=20Store]=20Add=20Primary/Repl?= =?UTF-8?q?ica=20side=20changes=20to=20support=20Dual=20Repli=E2=80=A6"?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit This reverts commit 8def8cb7cdc56fe7b6930377001513391f067422. --- .../opensearch/index/shard/IndexShardIT.java | 3 +- .../MigrationBaseTestCase.java | 95 +--- .../RemoteDualReplicationIT.java | 530 ------------------ .../action/bulk/TransportShardBulkAction.java | 2 +- .../ReplicationModeAwareProxy.java | 28 +- .../TransportReplicationAction.java | 12 +- .../org/opensearch/index/IndexService.java | 7 +- .../org/opensearch/index/IndexSettings.java | 6 +- .../seqno/GlobalCheckpointSyncAction.java | 2 +- .../index/seqno/ReplicationTracker.java | 69 +-- .../shard/CheckpointRefreshListener.java | 2 +- .../opensearch/index/shard/IndexShard.java | 37 +- .../opensearch/indices/IndicesService.java | 7 +- .../cluster/IndicesClusterStateService.java | 12 +- .../RecoverySourceHandlerFactory.java | 3 +- .../indices/recovery/RecoveryTarget.java | 11 +- .../SegmentReplicationSourceFactory.java | 2 +- .../checkpoint/PublishCheckpointAction.java | 8 +- ...portVerifyShardBeforeCloseActionTests.java | 7 +- .../flush/TransportShardFlushActionTests.java | 5 +- ...sportVerifyShardIndexBlockActionTests.java | 5 +- .../TransportShardRefreshActionTests.java | 5 +- .../bulk/TransportShardBulkActionTests.java | 5 +- ...TransportResyncReplicationActionTests.java | 5 +- .../ReplicationModeAwareProxyTests.java | 216 ------- .../ReplicationOperationTests.java | 196 +------ .../TransportReplicationActionTests.java | 7 - .../index/remote/RemoteStoreTestsHelper.java | 19 - .../RetentionLeasesReplicationTests.java | 4 +- .../GlobalCheckpointSyncActionTests.java | 8 +- ...PeerRecoveryRetentionLeaseExpiryTests.java | 3 +- ...ReplicationTrackerRetentionLeaseTests.java | 48 +- .../seqno/ReplicationTrackerTestCase.java | 23 +- .../index/seqno/ReplicationTrackerTests.java | 32 +- ...tentionLeaseBackgroundSyncActionTests.java | 5 +- .../seqno/RetentionLeaseSyncActionTests.java | 7 +- .../index/shard/IndexShardTests.java | 21 +- .../shard/PrimaryReplicaSyncerTests.java | 6 +- ...dicesLifecycleListenerSingleNodeTests.java | 4 +- ...actIndicesClusterStateServiceTestCase.java | 7 +- .../PeerRecoverySourceServiceTests.java | 5 +- .../PeerRecoveryTargetServiceTests.java | 5 +- .../PublishCheckpointActionTests.java | 41 +- .../index/engine/EngineTestCase.java | 3 +- ...enSearchIndexLevelReplicationTestCase.java | 31 +- .../index/shard/IndexShardTestCase.java | 69 +-- .../index/shard/IndexShardTestUtils.java | 67 --- 47 files changed, 155 insertions(+), 1540 deletions(-) delete mode 100644 server/src/internalClusterTest/java/org/opensearch/remotemigration/RemoteDualReplicationIT.java delete mode 100644 server/src/test/java/org/opensearch/action/support/replication/ReplicationModeAwareProxyTests.java delete mode 100644 test/framework/src/main/java/org/opensearch/index/shard/IndexShardTestUtils.java diff --git a/server/src/internalClusterTest/java/org/opensearch/index/shard/IndexShardIT.java b/server/src/internalClusterTest/java/org/opensearch/index/shard/IndexShardIT.java index f97950f2652a3..d218f0a985cf3 100644 --- a/server/src/internalClusterTest/java/org/opensearch/index/shard/IndexShardIT.java +++ b/server/src/internalClusterTest/java/org/opensearch/index/shard/IndexShardIT.java @@ -715,8 +715,7 @@ public static final IndexShard newIndexShard( nodeId, null, DefaultRemoteStoreSettings.INSTANCE, - false, - IndexShardTestUtils.getFakeDiscoveryNodes(initializingShardRouting) + false ); } diff --git a/server/src/internalClusterTest/java/org/opensearch/remotemigration/MigrationBaseTestCase.java b/server/src/internalClusterTest/java/org/opensearch/remotemigration/MigrationBaseTestCase.java index 0c35f91121059..19da668c432cf 100644 --- a/server/src/internalClusterTest/java/org/opensearch/remotemigration/MigrationBaseTestCase.java +++ b/server/src/internalClusterTest/java/org/opensearch/remotemigration/MigrationBaseTestCase.java @@ -8,29 +8,17 @@ package org.opensearch.remotemigration; -import org.opensearch.action.DocWriteResponse; import org.opensearch.action.admin.cluster.repositories.get.GetRepositoriesRequest; import org.opensearch.action.admin.cluster.repositories.get.GetRepositoriesResponse; -import org.opensearch.action.bulk.BulkRequest; -import org.opensearch.action.bulk.BulkResponse; -import org.opensearch.action.delete.DeleteResponse; -import org.opensearch.action.index.IndexRequest; -import org.opensearch.action.index.IndexResponse; import org.opensearch.cluster.metadata.RepositoryMetadata; -import org.opensearch.common.UUIDs; import org.opensearch.common.settings.Settings; import org.opensearch.common.util.FeatureFlags; import org.opensearch.repositories.fs.ReloadableFsRepository; import org.opensearch.test.OpenSearchIntegTestCase; import java.nio.file.Path; -import java.util.List; import java.util.concurrent.ExecutionException; -import java.util.concurrent.atomic.AtomicBoolean; -import java.util.concurrent.atomic.AtomicLong; -import static org.opensearch.node.remotestore.RemoteStoreNodeService.MIGRATION_DIRECTION_SETTING; -import static org.opensearch.node.remotestore.RemoteStoreNodeService.REMOTE_STORE_COMPATIBILITY_MODE_SETTING; import static org.opensearch.repositories.fs.ReloadableFsRepository.REPOSITORIES_FAILRATE_SETTING; import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertAcked; @@ -40,16 +28,8 @@ public class MigrationBaseTestCase extends OpenSearchIntegTestCase { protected Path segmentRepoPath; protected Path translogRepoPath; - boolean addRemote = false; - Settings extraSettings = Settings.EMPTY; - private final List documentKeys = List.of( - randomAlphaOfLength(5), - randomAlphaOfLength(5), - randomAlphaOfLength(5), - randomAlphaOfLength(5), - randomAlphaOfLength(5) - ); + boolean addRemote = false; protected Settings nodeSettings(int nodeOrdinal) { if (segmentRepoPath == null || translogRepoPath == null) { @@ -60,7 +40,6 @@ protected Settings nodeSettings(int nodeOrdinal) { logger.info("Adding remote store node"); return Settings.builder() .put(super.nodeSettings(nodeOrdinal)) - .put(extraSettings) .put(remoteStoreClusterSettings(REPOSITORY_NAME, segmentRepoPath, REPOSITORY_2_NAME, translogRepoPath)) .build(); } else { @@ -85,76 +64,4 @@ protected void setFailRate(String repoName, int value) throws ExecutionException client().admin().cluster().preparePutRepository(repoName).setType(ReloadableFsRepository.TYPE).setSettings(settings).get() ); } - - public void initDocRepToRemoteMigration() { - assertTrue( - internalCluster().client() - .admin() - .cluster() - .prepareUpdateSettings() - .setPersistentSettings( - Settings.builder() - .put(REMOTE_STORE_COMPATIBILITY_MODE_SETTING.getKey(), "mixed") - .put(MIGRATION_DIRECTION_SETTING.getKey(), "remote_store") - ) - .get() - .isAcknowledged() - ); - } - - public BulkResponse indexBulk(String indexName, int numDocs) { - BulkRequest bulkRequest = new BulkRequest(); - for (int i = 0; i < numDocs; i++) { - final IndexRequest request = client().prepareIndex(indexName) - .setId(UUIDs.randomBase64UUID()) - .setSource(documentKeys.get(randomIntBetween(0, documentKeys.size() - 1)), randomAlphaOfLength(5)) - .request(); - bulkRequest.add(request); - } - return client().bulk(bulkRequest).actionGet(); - } - - private void indexSingleDoc(String indexName) { - IndexResponse indexResponse = client().prepareIndex(indexName).setId("id").setSource("field", "value").get(); - assertEquals(DocWriteResponse.Result.CREATED, indexResponse.getResult()); - DeleteResponse deleteResponse = client().prepareDelete(indexName, "id").get(); - assertEquals(DocWriteResponse.Result.DELETED, deleteResponse.getResult()); - client().prepareIndex(indexName).setSource("auto", true).get(); - } - - public class AsyncIndexingService { - private String indexName; - private AtomicLong indexedDocs = new AtomicLong(0); - private AtomicBoolean finished = new AtomicBoolean(); - private Thread indexingThread; - - AsyncIndexingService(String indexName) { - this.indexName = indexName; - } - - public void startIndexing() { - indexingThread = getIndexingThread(); - indexingThread.start(); - } - - public void stopIndexing() throws InterruptedException { - finished.set(true); - indexingThread.join(); - } - - public long getIndexedDocs() { - return indexedDocs.get(); - } - - private Thread getIndexingThread() { - return new Thread(() -> { - while (finished.get() == false) { - indexSingleDoc(indexName); - long currentDocCount = indexedDocs.incrementAndGet(); - logger.info("Completed ingestion of {} docs", currentDocCount); - - } - }); - } - } } diff --git a/server/src/internalClusterTest/java/org/opensearch/remotemigration/RemoteDualReplicationIT.java b/server/src/internalClusterTest/java/org/opensearch/remotemigration/RemoteDualReplicationIT.java deleted file mode 100644 index 34b60d5f3e9b3..0000000000000 --- a/server/src/internalClusterTest/java/org/opensearch/remotemigration/RemoteDualReplicationIT.java +++ /dev/null @@ -1,530 +0,0 @@ -/* - * SPDX-License-Identifier: Apache-2.0 - * - * The OpenSearch Contributors require contributions made to - * this file be licensed under the Apache-2.0 license or a - * compatible open source license. - */ - -package org.opensearch.remotemigration; - -import org.opensearch.action.admin.indices.stats.CommonStats; -import org.opensearch.action.admin.indices.stats.ShardStats; -import org.opensearch.cluster.ClusterState; -import org.opensearch.cluster.metadata.IndexMetadata; -import org.opensearch.cluster.node.DiscoveryNode; -import org.opensearch.cluster.node.DiscoveryNodes; -import org.opensearch.cluster.routing.ShardRouting; -import org.opensearch.cluster.routing.allocation.command.MoveAllocationCommand; -import org.opensearch.common.settings.Settings; -import org.opensearch.index.IndexService; -import org.opensearch.index.remote.RemoteSegmentStats; -import org.opensearch.index.seqno.RetentionLease; -import org.opensearch.index.seqno.RetentionLeases; -import org.opensearch.indices.IndexingMemoryController; -import org.opensearch.plugins.Plugin; -import org.opensearch.remotestore.multipart.mocks.MockFsRepositoryPlugin; -import org.opensearch.test.InternalSettingsPlugin; -import org.opensearch.test.InternalTestCluster; -import org.opensearch.test.OpenSearchIntegTestCase; -import org.opensearch.test.transport.MockTransportService; - -import java.util.Collection; -import java.util.Map; -import java.util.stream.Collectors; -import java.util.stream.Stream; - -import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertAcked; - -@OpenSearchIntegTestCase.ClusterScope(scope = OpenSearchIntegTestCase.Scope.TEST, numDataNodes = 0) -public class RemoteDualReplicationIT extends MigrationBaseTestCase { - private final String REMOTE_PRI_DOCREP_REP = "remote-primary-docrep-replica"; - private final String REMOTE_PRI_DOCREP_REMOTE_REP = "remote-primary-docrep-remote-replica"; - private final String FAILOVER_REMOTE_TO_DOCREP = "failover-remote-to-docrep"; - - @Override - protected Collection> nodePlugins() { - /* Adding the following mock plugins: - - InternalSettingsPlugin : To override default intervals of retention lease and global ckp sync - - MockFsRepositoryPlugin and MockTransportService.TestPlugin: To ensure remote interactions are not no-op and retention leases are properly propagated - */ - return Stream.concat( - super.nodePlugins().stream(), - Stream.of(InternalSettingsPlugin.class, MockFsRepositoryPlugin.class, MockTransportService.TestPlugin.class) - ).collect(Collectors.toList()); - } - - /* - Scenario: - - Starts 2 docrep backed node - - Creates index with 1 replica - - Index some docs - - Start 1 remote backed node - - Move primary copy from docrep to remote through _cluster/reroute - - Index some more docs - - Assert primary-replica consistency - */ - public void testRemotePrimaryDocRepReplica() throws Exception { - internalCluster().startClusterManagerOnlyNode(); - - logger.info("---> Starting 2 docrep data nodes"); - internalCluster().startDataOnlyNodes(2); - internalCluster().validateClusterFormed(); - assertEquals(internalCluster().client().admin().cluster().prepareGetRepositories().get().repositories().size(), 0); - - logger.info("---> Creating index with 1 replica"); - Settings oneReplica = Settings.builder() - .put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 1) - .put(IndexService.RETENTION_LEASE_SYNC_INTERVAL_SETTING.getKey(), "1s") - .put(IndexService.GLOBAL_CHECKPOINT_SYNC_INTERVAL_SETTING.getKey(), "1s") - .build(); - createIndex(REMOTE_PRI_DOCREP_REP, oneReplica); - ensureGreen(REMOTE_PRI_DOCREP_REP); - - int initialBatch = randomIntBetween(1, 1000); - logger.info("---> Indexing {} docs", initialBatch); - indexBulk(REMOTE_PRI_DOCREP_REP, initialBatch); - - initDocRepToRemoteMigration(); - - logger.info("---> Starting 1 remote enabled data node"); - addRemote = true; - String remoteNodeName = internalCluster().startDataOnlyNode(); - internalCluster().validateClusterFormed(); - assertEquals( - internalCluster().client() - .admin() - .cluster() - .prepareGetRepositories(REPOSITORY_NAME, REPOSITORY_2_NAME) - .get() - .repositories() - .size(), - 2 - ); - - String primaryShardHostingNode = primaryNodeName(REMOTE_PRI_DOCREP_REP); - logger.info("---> Moving primary copy from {} to remote enabled node {}", primaryShardHostingNode, remoteNodeName); - assertAcked( - internalCluster().client() - .admin() - .cluster() - .prepareReroute() - .add(new MoveAllocationCommand(REMOTE_PRI_DOCREP_REP, 0, primaryShardHostingNode, remoteNodeName)) - .get() - ); - ensureGreen(REMOTE_PRI_DOCREP_REP); - ClusterState clusterState = internalCluster().client().admin().cluster().prepareState().get().getState(); - String primaryShardHostingNodeId = clusterState.getRoutingTable() - .index(REMOTE_PRI_DOCREP_REP) - .shard(0) - .primaryShard() - .currentNodeId(); - assertTrue(clusterState.getNodes().get(primaryShardHostingNodeId).isRemoteStoreNode()); - - int secondBatch = randomIntBetween(1, 10); - logger.info("---> Indexing another {} docs", secondBatch); - indexBulk(REMOTE_PRI_DOCREP_REP, secondBatch); - // Defensive check to ensure that doc count in replica shard catches up to the primary copy - refreshAndWaitForReplication(REMOTE_PRI_DOCREP_REP); - assertReplicaAndPrimaryConsistency(REMOTE_PRI_DOCREP_REP, initialBatch, secondBatch); - } - - /* - Scenario: - - Starts 1 docrep backed data node - - Creates an index with 0 replica - - Starts 1 remote backed data node - - Index some docs - - Move primary copy from docrep to remote through _cluster/reroute - - Starts another remote backed data node - - Expands index to 2 replicas. One replica copy lies in remote backed node and other in docrep backed node - - Index some more docs - - Assert primary-replica consistency - */ - public void testRemotePrimaryDocRepAndRemoteReplica() throws Exception { - internalCluster().startClusterManagerOnlyNode(); - - logger.info("---> Starting 1 docrep data nodes"); - String docrepNodeName = internalCluster().startDataOnlyNode(); - internalCluster().validateClusterFormed(); - assertEquals(internalCluster().client().admin().cluster().prepareGetRepositories().get().repositories().size(), 0); - - logger.info("---> Creating index with 0 replica"); - Settings zeroReplicas = Settings.builder() - .put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 0) - .put(IndexService.RETENTION_LEASE_SYNC_INTERVAL_SETTING.getKey(), "1s") - .put(IndexService.GLOBAL_CHECKPOINT_SYNC_INTERVAL_SETTING.getKey(), "1s") - .build(); - createIndex(REMOTE_PRI_DOCREP_REMOTE_REP, zeroReplicas); - ensureGreen(REMOTE_PRI_DOCREP_REMOTE_REP); - initDocRepToRemoteMigration(); - - logger.info("---> Starting 1 remote enabled data node"); - addRemote = true; - - String remoteNodeName = internalCluster().startDataOnlyNode(); - internalCluster().validateClusterFormed(); - assertEquals( - internalCluster().client() - .admin() - .cluster() - .prepareGetRepositories(REPOSITORY_NAME, REPOSITORY_2_NAME) - .get() - .repositories() - .size(), - 2 - ); - - int firstBatch = randomIntBetween(1, 100); - logger.info("---> Indexing {} docs", firstBatch); - indexBulk(REMOTE_PRI_DOCREP_REMOTE_REP, firstBatch); - - String primaryShardHostingNode = primaryNodeName(REMOTE_PRI_DOCREP_REMOTE_REP); - logger.info("---> Moving primary copy from {} to remote enabled node {}", primaryShardHostingNode, remoteNodeName); - assertAcked( - internalCluster().client() - .admin() - .cluster() - .prepareReroute() - .add(new MoveAllocationCommand(REMOTE_PRI_DOCREP_REMOTE_REP, 0, primaryShardHostingNode, remoteNodeName)) - .get() - ); - ensureGreen(REMOTE_PRI_DOCREP_REMOTE_REP); - ClusterState clusterState = internalCluster().client().admin().cluster().prepareState().get().getState(); - String primaryShardHostingNodeId = clusterState.getRoutingTable() - .index(REMOTE_PRI_DOCREP_REMOTE_REP) - .shard(0) - .primaryShard() - .currentNodeId(); - assertTrue(clusterState.getNodes().get(primaryShardHostingNodeId).isRemoteStoreNode()); - - logger.info("---> Starting another remote enabled node"); - internalCluster().startDataOnlyNode(); - internalCluster().validateClusterFormed(); - - logger.info("---> Expanding index to 2 replica copies"); - Settings twoReplicas = Settings.builder().put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 2).build(); - assertAcked( - internalCluster().client() - .admin() - .indices() - .prepareUpdateSettings() - .setIndices(REMOTE_PRI_DOCREP_REMOTE_REP) - .setSettings(twoReplicas) - .get() - ); - ensureGreen(REMOTE_PRI_DOCREP_REMOTE_REP); - - int secondBatch = randomIntBetween(1, 10); - logger.info("---> Indexing another {} docs", secondBatch); - indexBulk(REMOTE_PRI_DOCREP_REMOTE_REP, secondBatch); - // Defensive check to ensure that doc count in replica shard catches up to the primary copy - refreshAndWaitForReplication(REMOTE_PRI_DOCREP_REMOTE_REP); - assertReplicaAndPrimaryConsistency(REMOTE_PRI_DOCREP_REMOTE_REP, firstBatch, secondBatch); - } - - /* - Checks if retention leases are published on primary shard and it's docrep copies, but not on remote copies - */ - public void testRetentionLeasePresentOnDocrepReplicaButNotRemote() throws Exception { - /* Reducing indices.memory.shard_inactive_time to force a flush and trigger translog sync, - instead of relying on Global CKP Sync action which doesn't run on remote enabled copies - - Under steady state, RetentionLeases would be on (GlobalCkp + 1) on a - docrep enabled shard copy and (GlobalCkp) for a remote enabled shard copy. - This is because we block translog sync on remote enabled shard copies during the GlobalCkpSync background task. - - RLs on remote enabled copies are brought up to (GlobalCkp + 1) upon a flush request issued by IndexingMemoryController - when the shard becomes inactive after SHARD_INACTIVE_TIME_SETTING interval. - - Flush triggers a force sync of translog which bumps the RetentionLease sequence number along with it - */ - extraSettings = Settings.builder().put(IndexingMemoryController.SHARD_INACTIVE_TIME_SETTING.getKey(), "3s").build(); - testRemotePrimaryDocRepAndRemoteReplica(); - DiscoveryNodes nodes = internalCluster().client().admin().cluster().prepareState().get().getState().getNodes(); - assertBusy(() -> { - for (ShardStats shardStats : internalCluster().client() - .admin() - .indices() - .prepareStats(REMOTE_PRI_DOCREP_REMOTE_REP) - .get() - .getShards()) { - ShardRouting shardRouting = shardStats.getShardRouting(); - DiscoveryNode discoveryNode = nodes.get(shardRouting.currentNodeId()); - RetentionLeases retentionLeases = shardStats.getRetentionLeaseStats().retentionLeases(); - if (shardRouting.primary()) { - // Primary copy should be on remote node and should have retention leases - assertTrue(discoveryNode.isRemoteStoreNode()); - assertCheckpointsConsistency(shardStats); - assertRetentionLeaseConsistency(shardStats, retentionLeases); - } else { - // Checkpoints and Retention Leases are not synced to remote replicas - if (discoveryNode.isRemoteStoreNode()) { - assertTrue(shardStats.getRetentionLeaseStats().retentionLeases().leases().isEmpty()); - } else { - // Replica copy on docrep node should have retention leases - assertCheckpointsConsistency(shardStats); - assertRetentionLeaseConsistency(shardStats, retentionLeases); - } - } - } - }); - } - - /* - Scenario: - - Starts 1 docrep backed data node - - Creates an index with 0 replica - - Starts 1 remote backed data node - - Move primary copy from docrep to remote through _cluster/reroute - - Expands index to 1 replica - - Stops remote enabled node - - Ensure doc count is same after failover - - Index some more docs to ensure working of failed-over primary - */ - public void testFailoverRemotePrimaryToDocrepReplica() throws Exception { - internalCluster().startClusterManagerOnlyNode(); - - logger.info("---> Starting 1 docrep data nodes"); - String docrepNodeName = internalCluster().startDataOnlyNode(); - internalCluster().validateClusterFormed(); - assertEquals(internalCluster().client().admin().cluster().prepareGetRepositories().get().repositories().size(), 0); - - logger.info("---> Creating index with 0 replica"); - Settings excludeRemoteNode = Settings.builder().put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 0).build(); - createIndex(FAILOVER_REMOTE_TO_DOCREP, excludeRemoteNode); - ensureGreen(FAILOVER_REMOTE_TO_DOCREP); - initDocRepToRemoteMigration(); - logger.info("---> Starting 1 remote enabled data node"); - addRemote = true; - String remoteNodeName = internalCluster().startDataOnlyNode(); - internalCluster().validateClusterFormed(); - assertEquals( - internalCluster().client() - .admin() - .cluster() - .prepareGetRepositories(REPOSITORY_NAME, REPOSITORY_2_NAME) - .get() - .repositories() - .size(), - 2 - ); - - logger.info("---> Starting doc ingestion in parallel thread"); - AsyncIndexingService asyncIndexingService = new AsyncIndexingService(FAILOVER_REMOTE_TO_DOCREP); - asyncIndexingService.startIndexing(); - - String primaryShardHostingNode = primaryNodeName(FAILOVER_REMOTE_TO_DOCREP); - logger.info("---> Moving primary copy from {} to remote enabled node {}", primaryShardHostingNode, remoteNodeName); - assertAcked( - internalCluster().client() - .admin() - .cluster() - .prepareReroute() - .add(new MoveAllocationCommand(FAILOVER_REMOTE_TO_DOCREP, 0, primaryShardHostingNode, remoteNodeName)) - .get() - ); - ensureGreen(FAILOVER_REMOTE_TO_DOCREP); - - logger.info("---> Expanding index to 1 replica copy"); - Settings twoReplicas = Settings.builder().put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 1).build(); - assertAcked( - internalCluster().client() - .admin() - .indices() - .prepareUpdateSettings() - .setIndices(FAILOVER_REMOTE_TO_DOCREP) - .setSettings(twoReplicas) - .get() - ); - ensureGreen(FAILOVER_REMOTE_TO_DOCREP); - logger.info("---> Stopping indexing thread"); - asyncIndexingService.stopIndexing(); - - refreshAndWaitForReplication(FAILOVER_REMOTE_TO_DOCREP); - Map shardStatsMap = internalCluster().client() - .admin() - .indices() - .prepareStats(FAILOVER_REMOTE_TO_DOCREP) - .setDocs(true) - .get() - .asMap(); - DiscoveryNodes nodes = internalCluster().client().admin().cluster().prepareState().get().getState().getNodes(); - long initialPrimaryDocCount = 0; - for (ShardRouting shardRouting : shardStatsMap.keySet()) { - if (shardRouting.primary()) { - assertTrue(nodes.get(shardRouting.currentNodeId()).isRemoteStoreNode()); - initialPrimaryDocCount = shardStatsMap.get(shardRouting).getStats().getDocs().getCount(); - } - } - int firstBatch = (int) asyncIndexingService.getIndexedDocs(); - assertReplicaAndPrimaryConsistency(FAILOVER_REMOTE_TO_DOCREP, firstBatch, 0); - - logger.info("---> Stop remote store enabled node"); - internalCluster().stopRandomNode(InternalTestCluster.nameFilter(remoteNodeName)); - ensureStableCluster(2); - ensureYellow(FAILOVER_REMOTE_TO_DOCREP); - - shardStatsMap = internalCluster().client().admin().indices().prepareStats(FAILOVER_REMOTE_TO_DOCREP).setDocs(true).get().asMap(); - nodes = internalCluster().client().admin().cluster().prepareState().get().getState().getNodes(); - long primaryDocCountAfterFailover = 0; - for (ShardRouting shardRouting : shardStatsMap.keySet()) { - if (shardRouting.primary()) { - assertFalse(nodes.get(shardRouting.currentNodeId()).isRemoteStoreNode()); - primaryDocCountAfterFailover = shardStatsMap.get(shardRouting).getStats().getDocs().getCount(); - } - } - assertEquals(initialPrimaryDocCount, primaryDocCountAfterFailover); - - logger.info("---> Index some more docs to ensure that the failed over primary is ingesting new docs"); - int secondBatch = randomIntBetween(1, 10); - logger.info("---> Indexing {} more docs", secondBatch); - indexBulk(FAILOVER_REMOTE_TO_DOCREP, secondBatch); - refreshAndWaitForReplication(FAILOVER_REMOTE_TO_DOCREP); - - shardStatsMap = internalCluster().client().admin().indices().prepareStats(FAILOVER_REMOTE_TO_DOCREP).setDocs(true).get().asMap(); - assertEquals(1, shardStatsMap.size()); - shardStatsMap.forEach( - (shardRouting, shardStats) -> { assertEquals(firstBatch + secondBatch, shardStats.getStats().getDocs().getCount()); } - ); - } - - /* - Scenario: - - Starts 1 docrep backed data node - - Creates an index with 0 replica - - Starts 1 remote backed data node - - Move primary copy from docrep to remote through _cluster/reroute - - Expands index to 1 replica - - Stops remote enabled node - - Ensure doc count is same after failover - - Index some more docs to ensure working of failed-over primary - - Starts another remote node - - Move primary copy from docrep to remote through _cluster/reroute - - Ensure that remote store is seeded in the new remote node by asserting remote uploads from that node > 0 - */ - public void testFailoverRemotePrimaryToDocrepReplicaReseedToRemotePrimary() throws Exception { - testFailoverRemotePrimaryToDocrepReplica(); - - logger.info("---> Removing replica copy"); - assertAcked( - internalCluster().client() - .admin() - .indices() - .prepareUpdateSettings(FAILOVER_REMOTE_TO_DOCREP) - .setSettings(Settings.builder().put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 0)) - .get() - ); - ensureGreen(FAILOVER_REMOTE_TO_DOCREP); - - logger.info("---> Starting a new remote enabled node"); - addRemote = true; - String remoteNodeName = internalCluster().startDataOnlyNode(); - internalCluster().validateClusterFormed(); - assertEquals( - internalCluster().client() - .admin() - .cluster() - .prepareGetRepositories(REPOSITORY_NAME, REPOSITORY_2_NAME) - .get() - .repositories() - .size(), - 2 - ); - - String primaryShardHostingNode = primaryNodeName(FAILOVER_REMOTE_TO_DOCREP); - logger.info("---> Moving primary copy from {} to remote enabled node {}", primaryShardHostingNode, remoteNodeName); - assertAcked( - internalCluster().client() - .admin() - .cluster() - .prepareReroute() - .add(new MoveAllocationCommand(FAILOVER_REMOTE_TO_DOCREP, 0, primaryShardHostingNode, remoteNodeName)) - .get() - ); - ensureGreen(FAILOVER_REMOTE_TO_DOCREP); - - Map shardStatsMap = internalCluster().client() - .admin() - .indices() - .prepareStats(FAILOVER_REMOTE_TO_DOCREP) - .get() - .asMap(); - DiscoveryNodes discoveryNodes = internalCluster().client().admin().cluster().prepareState().get().getState().getNodes(); - assertEquals(1, shardStatsMap.size()); - shardStatsMap.forEach((shardRouting, shardStats) -> { - if (discoveryNodes.get(shardRouting.currentNodeId()).isRemoteStoreNode()) { - RemoteSegmentStats remoteSegmentStats = shardStats.getStats().getSegments().getRemoteSegmentStats(); - assertTrue(remoteSegmentStats.getTotalUploadTime() > 0); - assertTrue(remoteSegmentStats.getUploadBytesSucceeded() > 0); - } - }); - } - - private void assertReplicaAndPrimaryConsistency(String indexName, int firstBatch, int secondBatch) throws Exception { - assertBusy(() -> { - Map shardStatsMap = internalCluster().client() - .admin() - .indices() - .prepareStats(indexName) - .setDocs(true) - .get() - .asMap(); - DiscoveryNodes nodes = internalCluster().client().admin().cluster().prepareState().get().getState().getNodes(); - for (ShardRouting shardRouting : shardStatsMap.keySet()) { - CommonStats shardStats = shardStatsMap.get(shardRouting).getStats(); - if (shardRouting.primary()) { - assertEquals(firstBatch + secondBatch, shardStats.getDocs().getCount()); - assertTrue(nodes.get(shardRouting.currentNodeId()).isRemoteStoreNode()); - RemoteSegmentStats remoteSegmentStats = shardStats.getSegments().getRemoteSegmentStats(); - assertTrue(remoteSegmentStats.getUploadBytesSucceeded() > 0); - assertTrue(remoteSegmentStats.getTotalUploadTime() > 0); - } else { - boolean remoteNode = nodes.get(shardRouting.currentNodeId()).isRemoteStoreNode(); - assertEquals( - "Mismatched doc count. Is this on remote node ? " + remoteNode, - firstBatch + secondBatch, - shardStats.getDocs().getCount() - ); - RemoteSegmentStats remoteSegmentStats = shardStats.getSegments().getRemoteSegmentStats(); - if (remoteNode) { - assertTrue(remoteSegmentStats.getDownloadBytesStarted() > 0); - assertTrue(remoteSegmentStats.getTotalDownloadTime() > 0); - } else { - assertEquals(0, remoteSegmentStats.getUploadBytesSucceeded()); - assertEquals(0, remoteSegmentStats.getTotalUploadTime()); - } - } - } - }); - } - - /** - * For a docrep enabled shard copy or a primary shard copy, - * asserts that the stored Retention Leases equals to 1 + maxSeqNo ingested on the node - * - * @param shardStats ShardStats object from NodesStats API - * @param retentionLeases RetentionLeases from NodesStats API - */ - private static void assertRetentionLeaseConsistency(ShardStats shardStats, RetentionLeases retentionLeases) { - long maxSeqNo = shardStats.getSeqNoStats().getMaxSeqNo(); - for (RetentionLease rl : retentionLeases.leases()) { - assertEquals(maxSeqNo + 1, rl.retainingSequenceNumber()); - } - } - - /** - * For a docrep enabled shard copy or a primary shard copy, - * asserts that local and global checkpoints are up-to-date with maxSeqNo of doc operations - * - * @param shardStats ShardStats object from NodesStats API - */ - private static void assertCheckpointsConsistency(ShardStats shardStats) { - long maxSeqNo = shardStats.getSeqNoStats().getMaxSeqNo(); - long localCkp = shardStats.getSeqNoStats().getLocalCheckpoint(); - long globalCkp = shardStats.getSeqNoStats().getGlobalCheckpoint(); - - assertEquals(maxSeqNo, localCkp); - assertEquals(maxSeqNo, globalCkp); - } -} diff --git a/server/src/main/java/org/opensearch/action/bulk/TransportShardBulkAction.java b/server/src/main/java/org/opensearch/action/bulk/TransportShardBulkAction.java index c1d13128c18b1..a7a13afd2597c 100644 --- a/server/src/main/java/org/opensearch/action/bulk/TransportShardBulkAction.java +++ b/server/src/main/java/org/opensearch/action/bulk/TransportShardBulkAction.java @@ -442,7 +442,7 @@ protected long primaryOperationSize(BulkShardRequest request) { @Override public ReplicationMode getReplicationMode(IndexShard indexShard) { - if (indexShard.indexSettings().isRemoteNode()) { + if (indexShard.isRemoteTranslogEnabled()) { return ReplicationMode.PRIMARY_TERM_VALIDATION; } return super.getReplicationMode(indexShard); diff --git a/server/src/main/java/org/opensearch/action/support/replication/ReplicationModeAwareProxy.java b/server/src/main/java/org/opensearch/action/support/replication/ReplicationModeAwareProxy.java index 9f5e31a9c6926..189bc82348a0c 100644 --- a/server/src/main/java/org/opensearch/action/support/replication/ReplicationModeAwareProxy.java +++ b/server/src/main/java/org/opensearch/action/support/replication/ReplicationModeAwareProxy.java @@ -9,8 +9,6 @@ package org.opensearch.action.support.replication; import org.opensearch.action.support.replication.ReplicationOperation.ReplicaResponse; -import org.opensearch.cluster.node.DiscoveryNode; -import org.opensearch.cluster.node.DiscoveryNodes; import org.opensearch.cluster.routing.ShardRouting; import org.opensearch.core.action.ActionListener; @@ -33,22 +31,14 @@ public class ReplicationModeAwareProxy primaryTermValidationProxy; - private final DiscoveryNodes discoveryNodes; - - private final boolean isRemoteEnabled; - public ReplicationModeAwareProxy( ReplicationMode replicationModeOverride, - DiscoveryNodes discoveryNodes, ReplicationOperation.Replicas replicasProxy, - ReplicationOperation.Replicas primaryTermValidationProxy, - boolean remoteIndexSettingsEnabled + ReplicationOperation.Replicas primaryTermValidationProxy ) { super(replicasProxy); this.replicationModeOverride = Objects.requireNonNull(replicationModeOverride); this.primaryTermValidationProxy = Objects.requireNonNull(primaryTermValidationProxy); - this.discoveryNodes = discoveryNodes; - this.isRemoteEnabled = remoteIndexSettingsEnabled; } @Override @@ -70,26 +60,16 @@ protected void performOnReplicaProxy( @Override ReplicationMode determineReplicationMode(ShardRouting shardRouting, ShardRouting primaryRouting) { + // If the current routing is the primary, then it does not need to be replicated if (shardRouting.isSameAllocation(primaryRouting)) { return ReplicationMode.NO_REPLICATION; } - // Perform full replication during primary relocation + if (primaryRouting.relocating() && shardRouting.isSameAllocation(primaryRouting.getTargetRelocatingShard())) { return ReplicationMode.FULL_REPLICATION; } - /* - Only applicable during remote store migration. - During the migration process, remote based index settings will not be enabled, - thus we will rely on node attributes to figure out the replication mode - */ - if (isRemoteEnabled == false) { - DiscoveryNode targetNode = discoveryNodes.get(shardRouting.currentNodeId()); - if (targetNode != null && targetNode.isRemoteStoreNode() == false) { - // Perform full replication if replica is hosted on a non-remote node. - return ReplicationMode.FULL_REPLICATION; - } - } + return replicationModeOverride; } } diff --git a/server/src/main/java/org/opensearch/action/support/replication/TransportReplicationAction.java b/server/src/main/java/org/opensearch/action/support/replication/TransportReplicationAction.java index 8d86128e36441..95f998e2d89c2 100644 --- a/server/src/main/java/org/opensearch/action/support/replication/TransportReplicationAction.java +++ b/server/src/main/java/org/opensearch/action/support/replication/TransportReplicationAction.java @@ -356,7 +356,7 @@ public void performOn( * @return the overridden replication mode. */ public ReplicationMode getReplicationMode(IndexShard indexShard) { - if (indexShard.indexSettings().isRemoteNode()) { + if (indexShard.isRemoteTranslogEnabled()) { return ReplicationMode.NO_REPLICATION; } return ReplicationMode.FULL_REPLICATION; @@ -642,14 +642,8 @@ public void handleException(TransportException exp) { primaryRequest.getPrimaryTerm(), initialRetryBackoffBound, retryTimeout, - indexShard.indexSettings().isRemoteNode() - ? new ReplicationModeAwareProxy<>( - getReplicationMode(indexShard), - clusterState.getNodes(), - replicasProxy, - termValidationProxy, - indexShard.isRemoteTranslogEnabled() - ) + indexShard.isRemoteTranslogEnabled() + ? new ReplicationModeAwareProxy<>(getReplicationMode(indexShard), replicasProxy, termValidationProxy) : new FanoutReplicationProxy<>(replicasProxy) ).execute(); } diff --git a/server/src/main/java/org/opensearch/index/IndexService.java b/server/src/main/java/org/opensearch/index/IndexService.java index a7b29314210df..9ded1b174d4c6 100644 --- a/server/src/main/java/org/opensearch/index/IndexService.java +++ b/server/src/main/java/org/opensearch/index/IndexService.java @@ -44,7 +44,6 @@ import org.opensearch.cluster.metadata.IndexMetadata; import org.opensearch.cluster.metadata.IndexNameExpressionResolver; import org.opensearch.cluster.node.DiscoveryNode; -import org.opensearch.cluster.node.DiscoveryNodes; import org.opensearch.cluster.routing.ShardRouting; import org.opensearch.cluster.service.ClusterService; import org.opensearch.common.CheckedFunction; @@ -463,8 +462,7 @@ public synchronized IndexShard createShard( final RemoteStoreStatsTrackerFactory remoteStoreStatsTrackerFactory, final RepositoriesService repositoriesService, final DiscoveryNode targetNode, - @Nullable DiscoveryNode sourceNode, - DiscoveryNodes discoveryNodes + @Nullable DiscoveryNode sourceNode ) throws IOException { Objects.requireNonNull(retentionLeaseSyncer); /* @@ -555,8 +553,7 @@ public synchronized IndexShard createShard( nodeEnv.nodeId(), recoverySettings, remoteStoreSettings, - seedRemote, - discoveryNodes + seedRemote ); eventListener.indexShardStateChanged(indexShard, null, indexShard.state(), "shard created"); eventListener.afterIndexShardCreated(indexShard); diff --git a/server/src/main/java/org/opensearch/index/IndexSettings.java b/server/src/main/java/org/opensearch/index/IndexSettings.java index 82875564c1c07..f9062585c0093 100644 --- a/server/src/main/java/org/opensearch/index/IndexSettings.java +++ b/server/src/main/java/org/opensearch/index/IndexSettings.java @@ -1237,7 +1237,11 @@ public boolean isSegRepEnabledOrRemoteNode() { } public boolean isSegRepLocalEnabled() { - return ReplicationType.SEGMENT.equals(replicationType) && !isRemoteStoreEnabled(); + return isSegRepEnabledOrRemoteNode() && !isRemoteStoreEnabled(); + } + + public boolean isSegRepWithRemoteEnabled() { + return isSegRepEnabledOrRemoteNode() && isRemoteStoreEnabled(); } /** diff --git a/server/src/main/java/org/opensearch/index/seqno/GlobalCheckpointSyncAction.java b/server/src/main/java/org/opensearch/index/seqno/GlobalCheckpointSyncAction.java index 0c167d6d80b5c..ca1dfe2d5ad01 100644 --- a/server/src/main/java/org/opensearch/index/seqno/GlobalCheckpointSyncAction.java +++ b/server/src/main/java/org/opensearch/index/seqno/GlobalCheckpointSyncAction.java @@ -135,7 +135,7 @@ protected void shardOperationOnReplica(Request shardRequest, IndexShard replica, private void maybeSyncTranslog(final IndexShard indexShard) throws IOException { if (indexShard.getTranslogDurability() == Translog.Durability.REQUEST && indexShard.getLastSyncedGlobalCheckpoint() < indexShard.getLastKnownGlobalCheckpoint() - && indexShard.indexSettings().isRemoteNode() == false) { + && indexShard.isRemoteTranslogEnabled() == false) { indexShard.sync(); } } diff --git a/server/src/main/java/org/opensearch/index/seqno/ReplicationTracker.java b/server/src/main/java/org/opensearch/index/seqno/ReplicationTracker.java index b2eb2f03486ac..0e625e9f30320 100644 --- a/server/src/main/java/org/opensearch/index/seqno/ReplicationTracker.java +++ b/server/src/main/java/org/opensearch/index/seqno/ReplicationTracker.java @@ -253,8 +253,6 @@ public class ReplicationTracker extends AbstractIndexShardComponent implements L private volatile ReplicationCheckpoint latestReplicationCheckpoint; - private final Function isShardOnRemoteEnabledNode; - /** * Get all retention leases tracked on this shard. * @@ -1001,8 +999,7 @@ public ReplicationTracker( final LongConsumer onGlobalCheckpointUpdated, final LongSupplier currentTimeMillisSupplier, final BiConsumer> onSyncRetentionLeases, - final Supplier safeCommitInfoSupplier, - final Function isShardOnRemoteEnabledNode + final Supplier safeCommitInfoSupplier ) { this( shardId, @@ -1014,8 +1011,7 @@ public ReplicationTracker( currentTimeMillisSupplier, onSyncRetentionLeases, safeCommitInfoSupplier, - x -> {}, - isShardOnRemoteEnabledNode + x -> {} ); } @@ -1041,8 +1037,7 @@ public ReplicationTracker( final LongSupplier currentTimeMillisSupplier, final BiConsumer> onSyncRetentionLeases, final Supplier safeCommitInfoSupplier, - final Consumer onReplicationGroupUpdated, - final Function isShardOnRemoteEnabledNode + final Consumer onReplicationGroupUpdated ) { super(shardId, indexSettings); assert globalCheckpoint >= SequenceNumbers.UNASSIGNED_SEQ_NO : "illegal initial global checkpoint: " + globalCheckpoint; @@ -1065,7 +1060,6 @@ public ReplicationTracker( this.safeCommitInfoSupplier = safeCommitInfoSupplier; this.onReplicationGroupUpdated = onReplicationGroupUpdated; this.latestReplicationCheckpoint = indexSettings.isSegRepEnabledOrRemoteNode() ? ReplicationCheckpoint.empty(shardId) : null; - this.isShardOnRemoteEnabledNode = isShardOnRemoteEnabledNode; assert Version.V_EMPTY.equals(indexSettings.getIndexVersionCreated()) == false; assert invariant(); } @@ -1094,12 +1088,8 @@ private ReplicationGroup calculateReplicationGroup() { } else { newVersion = replicationGroup.getVersion() + 1; } - assert indexSettings.isRemoteTranslogStoreEnabled() - // Handle migration cases. Ignore assertion if any of the shard copies in the replication group is assigned to a remote node - || (replicationGroup != null - && replicationGroup.getReplicationTargets() - .stream() - .anyMatch(shardRouting -> isShardOnRemoteEnabledNode.apply(shardRouting.currentNodeId()))) + + assert indexSettings().isRemoteTranslogStoreEnabled() || checkpoints.entrySet().stream().filter(e -> e.getValue().tracked).allMatch(e -> e.getValue().replicated) : "In absence of remote translog store, all tracked shards must have replication mode as LOGICAL_REPLICATION"; @@ -1258,9 +1248,7 @@ private void createReplicationLagTimers() { if (cps.inSync && replicationGroup.getUnavailableInSyncShards().contains(allocationId) == false && isPrimaryRelocation(allocationId) == false - && latestReplicationCheckpoint.isAheadOf(cps.visibleReplicationCheckpoint) - && (indexSettings.isSegRepLocalEnabled() == true - || isShardOnRemoteEnabledNode.apply(routingTable.getByAllocationId(allocationId).currentNodeId()))) { + && latestReplicationCheckpoint.isAheadOf(cps.visibleReplicationCheckpoint)) { cps.checkpointTimers.computeIfAbsent(latestReplicationCheckpoint, ignored -> new SegmentReplicationLagTimer()); logger.trace( () -> new ParameterizedMessage( @@ -1378,7 +1366,8 @@ private void addPeerRecoveryRetentionLeaseForSolePrimary() { final ShardRouting primaryShard = routingTable.primaryShard(); final String leaseId = getPeerRecoveryRetentionLeaseId(primaryShard); if (retentionLeases.get(leaseId) == null) { - if (replicationGroup.getReplicationTargets().equals(Collections.singletonList(primaryShard)) || indexSettings.isRemoteNode()) { + if (replicationGroup.getReplicationTargets().equals(Collections.singletonList(primaryShard)) + || indexSettings().isRemoteTranslogStoreEnabled()) { assert primaryShard.allocationId().getId().equals(shardAllocationId) : routingTable.assignedShards() + " vs " + shardAllocationId; @@ -1464,12 +1453,7 @@ public synchronized void updateFromClusterManager( globalCheckpoint, inSync, inSync, - isReplicated( - initializingId, - primaryAllocationId, - primaryTargetAllocationId, - assignedToRemoteStoreNode(routingTable, initializingId) - ) + isReplicated(initializingId, primaryAllocationId, primaryTargetAllocationId) ) ); } @@ -1488,12 +1472,7 @@ public synchronized void updateFromClusterManager( globalCheckpoint, false, false, - isReplicated( - initializingId, - primaryAllocationId, - primaryTargetAllocationId, - assignedToRemoteStoreNode(routingTable, initializingId) - ) + isReplicated(initializingId, primaryAllocationId, primaryTargetAllocationId) ) ); } @@ -1507,12 +1486,7 @@ public synchronized void updateFromClusterManager( globalCheckpoint, true, true, - isReplicated( - inSyncId, - primaryAllocationId, - primaryTargetAllocationId, - assignedToRemoteStoreNode(routingTable, inSyncId) - ) + isReplicated(inSyncId, primaryAllocationId, primaryTargetAllocationId) ) ); } @@ -1529,12 +1503,6 @@ public synchronized void updateFromClusterManager( assert invariant(); } - private boolean assignedToRemoteStoreNode(IndexShardRoutingTable routingTable, String allocationId) { - return indexSettings().isRemoteStoreEnabled() - || (routingTable.getByAllocationId(allocationId) != null - && isShardOnRemoteEnabledNode.apply(routingTable.getByAllocationId(allocationId).currentNodeId())); - } - /** * Returns whether the requests are replicated considering the remote translog existence, current/primary/primary target allocation ids. * @@ -1543,16 +1511,13 @@ private boolean assignedToRemoteStoreNode(IndexShardRoutingTable routingTable, S * @param primaryTargetAllocationId primary target allocation id * @return the replication mode. */ - private boolean isReplicated( - String allocationId, - String primaryAllocationId, - String primaryTargetAllocationId, - boolean assignedToRemoteStoreNode - ) { - // If assigned to a remote node, returns true if given allocation id matches the primary or it's relocation target allocation + private boolean isReplicated(String allocationId, String primaryAllocationId, String primaryTargetAllocationId) { + // If remote translog is enabled, then returns replication mode checking current allocation id against the // primary and primary target allocation id. - if (assignedToRemoteStoreNode == true) { - return allocationId.equals(primaryAllocationId) || allocationId.equals(primaryTargetAllocationId); + // If remote translog is enabled, then returns true if given allocation id matches the primary or it's relocation target allocation + // id. + if (indexSettings().isRemoteTranslogStoreEnabled()) { + return (allocationId.equals(primaryAllocationId) || allocationId.equals(primaryTargetAllocationId)); } // For other case which is local translog, return true as the requests are replicated to all shards in the replication group. return true; diff --git a/server/src/main/java/org/opensearch/index/shard/CheckpointRefreshListener.java b/server/src/main/java/org/opensearch/index/shard/CheckpointRefreshListener.java index b47025d75282c..675d60ec2b63d 100644 --- a/server/src/main/java/org/opensearch/index/shard/CheckpointRefreshListener.java +++ b/server/src/main/java/org/opensearch/index/shard/CheckpointRefreshListener.java @@ -43,7 +43,7 @@ protected boolean performAfterRefreshWithPermit(boolean didRefresh) { if (didRefresh && shard.state() == IndexShardState.STARTED && shard.getReplicationTracker().isPrimaryMode() - && shard.indexSettings.isRemoteNode() == false) { + && !shard.indexSettings.isSegRepWithRemoteEnabled()) { publisher.publish(shard, shard.getLatestReplicationCheckpoint()); } return true; diff --git a/server/src/main/java/org/opensearch/index/shard/IndexShard.java b/server/src/main/java/org/opensearch/index/shard/IndexShard.java index 484083a5b1260..2b935b743512a 100644 --- a/server/src/main/java/org/opensearch/index/shard/IndexShard.java +++ b/server/src/main/java/org/opensearch/index/shard/IndexShard.java @@ -67,8 +67,6 @@ import org.opensearch.cluster.metadata.DataStream; import org.opensearch.cluster.metadata.IndexMetadata; import org.opensearch.cluster.metadata.MappingMetadata; -import org.opensearch.cluster.node.DiscoveryNode; -import org.opensearch.cluster.node.DiscoveryNodes; import org.opensearch.cluster.routing.IndexShardRoutingTable; import org.opensearch.cluster.routing.RecoverySource; import org.opensearch.cluster.routing.RecoverySource.SnapshotRecoverySource; @@ -359,7 +357,6 @@ Runnable getGlobalCheckpointSyncer() { On source remote node , it will be REMOTE_MIGRATING_UNSEEDED when relocating from docrep node */ private final ShardMigrationState shardMigrationState; - private DiscoveryNodes discoveryNodes; public IndexShard( final ShardRouting shardRouting, @@ -389,8 +386,7 @@ public IndexShard( final String nodeId, final RecoverySettings recoverySettings, final RemoteStoreSettings remoteStoreSettings, - boolean seedRemote, - final DiscoveryNodes discoveryNodes + boolean seedRemote ) throws IOException { super(shardRouting.shardId(), indexSettings); assert shardRouting.initializing(); @@ -452,8 +448,7 @@ public IndexShard( threadPool::absoluteTimeInMillis, (retentionLeases, listener) -> retentionLeaseSyncer.sync(shardId, aId, getPendingPrimaryTerm(), retentionLeases, listener), this::getSafeCommitInfo, - pendingReplicationActions, - isShardOnRemoteEnabledNode + pendingReplicationActions ); // the query cache is a node-level thing, however we want the most popular filters @@ -491,7 +486,6 @@ public boolean shouldCache(Query query) { this.remoteStoreSettings = remoteStoreSettings; this.fileDownloader = new RemoteStoreFileDownloader(shardRouting.shardId(), threadPool, recoverySettings); this.shardMigrationState = getShardMigrationState(indexSettings, seedRemote); - this.discoveryNodes = discoveryNodes; } public ThreadPool getThreadPool() { @@ -512,23 +506,6 @@ public boolean shouldSeedRemoteStore() { return shardMigrationState == REMOTE_MIGRATING_UNSEEDED; } - /** - * To be delegated to {@link ReplicationTracker} so that relevant remote store based - * operations can be ignored during engine migration - *

- * Has explicit null checks to ensure that the {@link ReplicationTracker#invariant()} - * checks does not fail during a cluster manager state update when the latest replication group - * calculation is not yet done and the cached replication group details are available - */ - public Function isShardOnRemoteEnabledNode = nodeId -> { - DiscoveryNode node = discoveryNodes.get(nodeId); - if (node != null) { - logger.trace("Node {} has remote_enabled as {}", nodeId, node.isRemoteStoreNode()); - return node.isRemoteStoreNode(); - } - return false; - }; - public boolean isRemoteSeeded() { return shardMigrationState == REMOTE_MIGRATING_SEEDED; } @@ -639,10 +616,8 @@ public void updateShardState( final BiConsumer> primaryReplicaSyncer, final long applyingClusterStateVersion, final Set inSyncAllocationIds, - final IndexShardRoutingTable routingTable, - DiscoveryNodes discoveryNodes + final IndexShardRoutingTable routingTable ) throws IOException { - this.discoveryNodes = discoveryNodes; final ShardRouting currentRouting; synchronized (mutex) { currentRouting = this.shardRouting; @@ -3520,8 +3495,8 @@ public void updateGlobalCheckpointOnReplica(final long globalCheckpoint, final S * When remote translog is enabled for an index, replication operation is limited to primary term validation and does not * update local checkpoint at replica, so the local checkpoint at replica can be less than globalCheckpoint. */ - assert (state() != IndexShardState.POST_RECOVERY && state() != IndexShardState.STARTED) || indexSettings.isRemoteNode() - : "supposedly in-sync shard copy received a global checkpoint [" + assert (state() != IndexShardState.POST_RECOVERY && state() != IndexShardState.STARTED) + || indexSettings.isRemoteTranslogStoreEnabled() : "supposedly in-sync shard copy received a global checkpoint [" + globalCheckpoint + "] " + "that is higher than its local checkpoint [" @@ -4018,7 +3993,7 @@ private boolean isRemoteStoreEnabled() { } public boolean isRemoteTranslogEnabled() { - return indexSettings() != null && (indexSettings().isRemoteTranslogStoreEnabled()); + return indexSettings() != null && indexSettings().isRemoteTranslogStoreEnabled(); } /** diff --git a/server/src/main/java/org/opensearch/indices/IndicesService.java b/server/src/main/java/org/opensearch/indices/IndicesService.java index 4f68c03913199..ba78c28f3db88 100644 --- a/server/src/main/java/org/opensearch/indices/IndicesService.java +++ b/server/src/main/java/org/opensearch/indices/IndicesService.java @@ -54,7 +54,6 @@ import org.opensearch.cluster.metadata.IndexMetadata; import org.opensearch.cluster.metadata.IndexNameExpressionResolver; import org.opensearch.cluster.node.DiscoveryNode; -import org.opensearch.cluster.node.DiscoveryNodes; import org.opensearch.cluster.routing.RecoverySource; import org.opensearch.cluster.routing.ShardRouting; import org.opensearch.cluster.service.ClusterService; @@ -1022,8 +1021,7 @@ public IndexShard createShard( final RetentionLeaseSyncer retentionLeaseSyncer, final DiscoveryNode targetNode, final DiscoveryNode sourceNode, - final RemoteStoreStatsTrackerFactory remoteStoreStatsTrackerFactory, - final DiscoveryNodes discoveryNodes + final RemoteStoreStatsTrackerFactory remoteStoreStatsTrackerFactory ) throws IOException { Objects.requireNonNull(retentionLeaseSyncer); ensureChangesAllowed(); @@ -1038,8 +1036,7 @@ public IndexShard createShard( remoteStoreStatsTrackerFactory, repositoriesService, targetNode, - sourceNode, - discoveryNodes + sourceNode ); indexShard.addShardFailureCallback(onShardFailure); indexShard.startRecovery(recoveryState, recoveryTargetService, recoveryListener, repositoriesService, mapping -> { diff --git a/server/src/main/java/org/opensearch/indices/cluster/IndicesClusterStateService.java b/server/src/main/java/org/opensearch/indices/cluster/IndicesClusterStateService.java index 2c3ffcdd9e0ba..7fb8b172ae352 100644 --- a/server/src/main/java/org/opensearch/indices/cluster/IndicesClusterStateService.java +++ b/server/src/main/java/org/opensearch/indices/cluster/IndicesClusterStateService.java @@ -680,8 +680,7 @@ private void createShard(DiscoveryNodes nodes, RoutingTable routingTable, ShardR retentionLeaseSyncer, nodes.getLocalNode(), sourceNode, - remoteStoreStatsTrackerFactory, - nodes + remoteStoreStatsTrackerFactory ); } catch (Exception e) { failAndRemoveShard(shardRouting, true, "failed to create shard", e, state); @@ -715,8 +714,7 @@ private void updateShard( primaryReplicaSyncer::resync, clusterState.version(), inSyncIds, - indexShardRoutingTable, - nodes + indexShardRoutingTable ); } catch (Exception e) { failAndRemoveShard(shardRouting, true, "failed updating shard routing entry", e, clusterState); @@ -924,8 +922,7 @@ void updateShardState( BiConsumer> primaryReplicaSyncer, long applyingClusterStateVersion, Set inSyncAllocationIds, - IndexShardRoutingTable routingTable, - DiscoveryNodes discoveryNodes + IndexShardRoutingTable routingTable ) throws IOException; } @@ -1043,8 +1040,7 @@ T createShard( RetentionLeaseSyncer retentionLeaseSyncer, DiscoveryNode targetNode, @Nullable DiscoveryNode sourceNode, - RemoteStoreStatsTrackerFactory remoteStoreStatsTrackerFactory, - DiscoveryNodes discoveryNodes + RemoteStoreStatsTrackerFactory remoteStoreStatsTrackerFactory ) throws IOException; /** diff --git a/server/src/main/java/org/opensearch/indices/recovery/RecoverySourceHandlerFactory.java b/server/src/main/java/org/opensearch/indices/recovery/RecoverySourceHandlerFactory.java index 96e85154e6248..0ccb1ac2133cf 100644 --- a/server/src/main/java/org/opensearch/indices/recovery/RecoverySourceHandlerFactory.java +++ b/server/src/main/java/org/opensearch/indices/recovery/RecoverySourceHandlerFactory.java @@ -23,7 +23,8 @@ public static RecoverySourceHandler create( StartRecoveryRequest request, RecoverySettings recoverySettings ) { - boolean isReplicaRecoveryWithRemoteTranslog = request.isPrimaryRelocation() == false && request.targetNode().isRemoteStoreNode(); + boolean isReplicaRecoveryWithRemoteTranslog = request.isPrimaryRelocation() == false + && (shard.isRemoteTranslogEnabled() || shard.isMigratingToRemote()); if (isReplicaRecoveryWithRemoteTranslog) { return new RemoteStorePeerRecoverySourceHandler( shard, diff --git a/server/src/main/java/org/opensearch/indices/recovery/RecoveryTarget.java b/server/src/main/java/org/opensearch/indices/recovery/RecoveryTarget.java index f47b082de3856..16311d5d2cfb7 100644 --- a/server/src/main/java/org/opensearch/indices/recovery/RecoveryTarget.java +++ b/server/src/main/java/org/opensearch/indices/recovery/RecoveryTarget.java @@ -213,17 +213,10 @@ public void prepareForTranslogOperations(int totalTranslogOps, ActionListener { state().getIndex().setFileDetailsComplete(); // ops-based recoveries don't send the file details state().getTranslog().totalOperations(totalTranslogOps); - // Cleanup remote contents before opening new translog. - // This prevents reading from any old Translog UUIDs during re-seeding - // (situation in which primary fails over to docrep replica and is re-seeded to remote again) - // which might end up causing a TranslogCorruptedException - if (indexShard.shouldSeedRemoteStore()) { - assert indexShard.routingEntry().primary() : "Remote seeding should only true be for primary shard copy"; - indexShard.deleteRemoteStoreContents(); - } indexShard().openEngineAndSkipTranslogRecovery(); // upload to remote store in migration for primary shard - if (indexShard.shouldSeedRemoteStore()) { + if (indexShard.shouldSeedRemoteStore() && indexShard.routingEntry().primary()) { + indexShard.deleteRemoteStoreContents(); // This cleans up remote translog's 0 generation, as we don't want to get that uploaded indexShard.sync(); threadPool.executor(ThreadPool.Names.GENERIC).execute(() -> { indexShard.refresh("remote store migration"); }); diff --git a/server/src/main/java/org/opensearch/indices/replication/SegmentReplicationSourceFactory.java b/server/src/main/java/org/opensearch/indices/replication/SegmentReplicationSourceFactory.java index 657705a8cd725..852003c9f3e4d 100644 --- a/server/src/main/java/org/opensearch/indices/replication/SegmentReplicationSourceFactory.java +++ b/server/src/main/java/org/opensearch/indices/replication/SegmentReplicationSourceFactory.java @@ -38,7 +38,7 @@ public SegmentReplicationSourceFactory( } public SegmentReplicationSource get(IndexShard shard) { - if (shard.indexSettings().isRemoteNode()) { + if (shard.indexSettings().isSegRepWithRemoteEnabled()) { return new RemoteStoreReplicationSource(shard); } else { return new PrimaryShardReplicationSource( diff --git a/server/src/main/java/org/opensearch/indices/replication/checkpoint/PublishCheckpointAction.java b/server/src/main/java/org/opensearch/indices/replication/checkpoint/PublishCheckpointAction.java index 4d7d0a8633b5c..821ae42e31881 100644 --- a/server/src/main/java/org/opensearch/indices/replication/checkpoint/PublishCheckpointAction.java +++ b/server/src/main/java/org/opensearch/indices/replication/checkpoint/PublishCheckpointAction.java @@ -98,7 +98,7 @@ protected void doExecute(Task task, PublishCheckpointRequest request, ActionList @Override public ReplicationMode getReplicationMode(IndexShard indexShard) { - if (indexShard.indexSettings().isRemoteNode()) { + if (indexShard.isRemoteTranslogEnabled()) { return ReplicationMode.FULL_REPLICATION; } return super.getReplicationMode(indexShard); @@ -199,12 +199,6 @@ protected void shardOperationOnReplica(PublishCheckpointRequest request, IndexSh Objects.requireNonNull(replica); ActionListener.completeWith(listener, () -> { logger.trace(() -> new ParameterizedMessage("Checkpoint {} received on replica {}", request, replica.shardId())); - // Condition for ensuring that we ignore Segrep checkpoints received on Docrep shard copies. - // This case will hit iff the replica hosting node is not remote enabled and replication type != SEGMENT - if (replica.indexSettings().isRemoteNode() == false && replica.indexSettings().isSegRepLocalEnabled() == false) { - logger.trace("Received segrep checkpoint on a docrep shard copy during an ongoing remote migration. NoOp."); - return new ReplicaResult(); - } if (request.getCheckpoint().getShardId().equals(replica.shardId())) { replicationService.onNewCheckpoint(request.getCheckpoint(), replica); } diff --git a/server/src/test/java/org/opensearch/action/admin/indices/close/TransportVerifyShardBeforeCloseActionTests.java b/server/src/test/java/org/opensearch/action/admin/indices/close/TransportVerifyShardBeforeCloseActionTests.java index 5ca5f53f180be..ef26bc225b0c7 100644 --- a/server/src/test/java/org/opensearch/action/admin/indices/close/TransportVerifyShardBeforeCloseActionTests.java +++ b/server/src/test/java/org/opensearch/action/admin/indices/close/TransportVerifyShardBeforeCloseActionTests.java @@ -82,7 +82,6 @@ import org.mockito.ArgumentCaptor; import static org.opensearch.action.support.replication.ClusterStateCreationUtils.state; -import static org.opensearch.index.remote.RemoteStoreTestsHelper.createIndexSettings; import static org.opensearch.test.ClusterServiceUtils.createClusterService; import static org.opensearch.test.ClusterServiceUtils.setState; import static org.hamcrest.Matchers.arrayWithSize; @@ -333,15 +332,15 @@ public void testUnavailableShardsMarkedAsStale() throws Exception { public void testGetReplicationModeWithRemoteTranslog() { TransportVerifyShardBeforeCloseAction action = createAction(); final IndexShard indexShard = mock(IndexShard.class); - when(indexShard.indexSettings()).thenReturn(createIndexSettings(true)); + when(indexShard.isRemoteTranslogEnabled()).thenReturn(true); assertEquals(ReplicationMode.NO_REPLICATION, action.getReplicationMode(indexShard)); } public void testGetReplicationModeWithLocalTranslog() { TransportVerifyShardBeforeCloseAction action = createAction(); final IndexShard indexShard = mock(IndexShard.class); - when(indexShard.indexSettings()).thenReturn(createIndexSettings(false)); - assertEquals(ReplicationMode.FULL_REPLICATION, action.getReplicationMode(indexShard)); + when(indexShard.isRemoteTranslogEnabled()).thenReturn(true); + assertEquals(ReplicationMode.NO_REPLICATION, action.getReplicationMode(indexShard)); } private TransportVerifyShardBeforeCloseAction createAction() { diff --git a/server/src/test/java/org/opensearch/action/admin/indices/flush/TransportShardFlushActionTests.java b/server/src/test/java/org/opensearch/action/admin/indices/flush/TransportShardFlushActionTests.java index c9d3a6c4c7605..09215088bd04b 100644 --- a/server/src/test/java/org/opensearch/action/admin/indices/flush/TransportShardFlushActionTests.java +++ b/server/src/test/java/org/opensearch/action/admin/indices/flush/TransportShardFlushActionTests.java @@ -20,7 +20,6 @@ import org.opensearch.threadpool.ThreadPool; import org.opensearch.transport.TransportService; -import static org.opensearch.index.remote.RemoteStoreTestsHelper.createIndexSettings; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; @@ -29,14 +28,14 @@ public class TransportShardFlushActionTests extends OpenSearchTestCase { public void testGetReplicationModeWithRemoteTranslog() { TransportShardFlushAction action = createAction(); final IndexShard indexShard = mock(IndexShard.class); - when(indexShard.indexSettings()).thenReturn(createIndexSettings(true)); + when(indexShard.isRemoteTranslogEnabled()).thenReturn(true); assertEquals(ReplicationMode.NO_REPLICATION, action.getReplicationMode(indexShard)); } public void testGetReplicationModeWithLocalTranslog() { TransportShardFlushAction action = createAction(); final IndexShard indexShard = mock(IndexShard.class); - when(indexShard.indexSettings()).thenReturn(createIndexSettings(false)); + when(indexShard.isRemoteTranslogEnabled()).thenReturn(false); assertEquals(ReplicationMode.FULL_REPLICATION, action.getReplicationMode(indexShard)); } diff --git a/server/src/test/java/org/opensearch/action/admin/indices/readonly/TransportVerifyShardIndexBlockActionTests.java b/server/src/test/java/org/opensearch/action/admin/indices/readonly/TransportVerifyShardIndexBlockActionTests.java index 90498d6d35700..8c4a6c023f9a5 100644 --- a/server/src/test/java/org/opensearch/action/admin/indices/readonly/TransportVerifyShardIndexBlockActionTests.java +++ b/server/src/test/java/org/opensearch/action/admin/indices/readonly/TransportVerifyShardIndexBlockActionTests.java @@ -20,7 +20,6 @@ import org.opensearch.threadpool.ThreadPool; import org.opensearch.transport.TransportService; -import static org.opensearch.index.remote.RemoteStoreTestsHelper.createIndexSettings; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; @@ -29,14 +28,14 @@ public class TransportVerifyShardIndexBlockActionTests extends OpenSearchTestCas public void testGetReplicationModeWithRemoteTranslog() { TransportVerifyShardIndexBlockAction action = createAction(); final IndexShard indexShard = mock(IndexShard.class); - when(indexShard.indexSettings()).thenReturn(createIndexSettings(true)); + when(indexShard.isRemoteTranslogEnabled()).thenReturn(true); assertEquals(ReplicationMode.NO_REPLICATION, action.getReplicationMode(indexShard)); } public void testGetReplicationModeWithLocalTranslog() { TransportVerifyShardIndexBlockAction action = createAction(); final IndexShard indexShard = mock(IndexShard.class); - when(indexShard.indexSettings()).thenReturn(createIndexSettings(false)); + when(indexShard.isRemoteTranslogEnabled()).thenReturn(false); assertEquals(ReplicationMode.FULL_REPLICATION, action.getReplicationMode(indexShard)); } diff --git a/server/src/test/java/org/opensearch/action/admin/indices/refresh/TransportShardRefreshActionTests.java b/server/src/test/java/org/opensearch/action/admin/indices/refresh/TransportShardRefreshActionTests.java index bc0b7e5cf14b2..b2eee904bad38 100644 --- a/server/src/test/java/org/opensearch/action/admin/indices/refresh/TransportShardRefreshActionTests.java +++ b/server/src/test/java/org/opensearch/action/admin/indices/refresh/TransportShardRefreshActionTests.java @@ -20,7 +20,6 @@ import org.opensearch.threadpool.ThreadPool; import org.opensearch.transport.TransportService; -import static org.opensearch.index.remote.RemoteStoreTestsHelper.createIndexSettings; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; @@ -29,14 +28,14 @@ public class TransportShardRefreshActionTests extends OpenSearchTestCase { public void testGetReplicationModeWithRemoteTranslog() { TransportShardRefreshAction action = createAction(); final IndexShard indexShard = mock(IndexShard.class); - when(indexShard.indexSettings()).thenReturn(createIndexSettings(true)); + when(indexShard.isRemoteTranslogEnabled()).thenReturn(true); assertEquals(ReplicationMode.NO_REPLICATION, action.getReplicationMode(indexShard)); } public void testGetReplicationModeWithLocalTranslog() { TransportShardRefreshAction action = createAction(); final IndexShard indexShard = mock(IndexShard.class); - when(indexShard.indexSettings()).thenReturn(createIndexSettings(false)); + when(indexShard.isRemoteTranslogEnabled()).thenReturn(false); assertEquals(ReplicationMode.FULL_REPLICATION, action.getReplicationMode(indexShard)); } diff --git a/server/src/test/java/org/opensearch/action/bulk/TransportShardBulkActionTests.java b/server/src/test/java/org/opensearch/action/bulk/TransportShardBulkActionTests.java index 6331861c3dcb9..65b555649b2d0 100644 --- a/server/src/test/java/org/opensearch/action/bulk/TransportShardBulkActionTests.java +++ b/server/src/test/java/org/opensearch/action/bulk/TransportShardBulkActionTests.java @@ -107,7 +107,6 @@ import java.util.concurrent.atomic.AtomicInteger; import java.util.function.LongSupplier; -import static org.opensearch.index.remote.RemoteStoreTestsHelper.createIndexSettings; import static org.hamcrest.CoreMatchers.equalTo; import static org.hamcrest.CoreMatchers.instanceOf; import static org.hamcrest.CoreMatchers.not; @@ -1238,14 +1237,14 @@ public void testHandlePrimaryTermValidationRequestSuccess() { public void testGetReplicationModeWithRemoteTranslog() { TransportShardBulkAction action = createAction(); final IndexShard indexShard = mock(IndexShard.class); - when(indexShard.indexSettings()).thenReturn(createIndexSettings(true)); + when(indexShard.isRemoteTranslogEnabled()).thenReturn(true); assertEquals(ReplicationMode.PRIMARY_TERM_VALIDATION, action.getReplicationMode(indexShard)); } public void testGetReplicationModeWithLocalTranslog() { TransportShardBulkAction action = createAction(); final IndexShard indexShard = mock(IndexShard.class); - when(indexShard.indexSettings()).thenReturn(createIndexSettings(false)); + when(indexShard.isRemoteTranslogEnabled()).thenReturn(false); assertEquals(ReplicationMode.FULL_REPLICATION, action.getReplicationMode(indexShard)); } diff --git a/server/src/test/java/org/opensearch/action/resync/TransportResyncReplicationActionTests.java b/server/src/test/java/org/opensearch/action/resync/TransportResyncReplicationActionTests.java index a2fefd6278321..da87a0a967f53 100644 --- a/server/src/test/java/org/opensearch/action/resync/TransportResyncReplicationActionTests.java +++ b/server/src/test/java/org/opensearch/action/resync/TransportResyncReplicationActionTests.java @@ -84,7 +84,6 @@ import static java.util.Collections.emptyList; import static java.util.Collections.emptyMap; import static org.opensearch.action.support.replication.ClusterStateCreationUtils.state; -import static org.opensearch.index.remote.RemoteStoreTestsHelper.createIndexSettings; import static org.opensearch.test.ClusterServiceUtils.createClusterService; import static org.opensearch.test.ClusterServiceUtils.setState; import static org.opensearch.transport.TransportService.NOOP_TRANSPORT_INTERCEPTOR; @@ -234,14 +233,14 @@ public void testResyncDoesNotBlockOnPrimaryAction() throws Exception { public void testGetReplicationModeWithRemoteTranslog() { final TransportResyncReplicationAction action = createAction(); final IndexShard indexShard = mock(IndexShard.class); - when(indexShard.indexSettings()).thenReturn(createIndexSettings(true)); + when(indexShard.isRemoteTranslogEnabled()).thenReturn(true); assertEquals(ReplicationMode.NO_REPLICATION, action.getReplicationMode(indexShard)); } public void testGetReplicationModeWithLocalTranslog() { final TransportResyncReplicationAction action = createAction(); final IndexShard indexShard = mock(IndexShard.class); - when(indexShard.indexSettings()).thenReturn(createIndexSettings(false)); + when(indexShard.isRemoteTranslogEnabled()).thenReturn(false); assertEquals(ReplicationMode.FULL_REPLICATION, action.getReplicationMode(indexShard)); } diff --git a/server/src/test/java/org/opensearch/action/support/replication/ReplicationModeAwareProxyTests.java b/server/src/test/java/org/opensearch/action/support/replication/ReplicationModeAwareProxyTests.java deleted file mode 100644 index 626c2f74f09c4..0000000000000 --- a/server/src/test/java/org/opensearch/action/support/replication/ReplicationModeAwareProxyTests.java +++ /dev/null @@ -1,216 +0,0 @@ -/* - * SPDX-License-Identifier: Apache-2.0 - * - * The OpenSearch Contributors require contributions made to - * this file be licensed under the Apache-2.0 license or a - * compatible open source license. - */ - -package org.opensearch.action.support.replication; - -import org.opensearch.cluster.node.DiscoveryNodes; -import org.opensearch.cluster.routing.AllocationId; -import org.opensearch.cluster.routing.ShardRouting; -import org.opensearch.cluster.routing.ShardRoutingState; -import org.opensearch.cluster.routing.TestShardRouting; -import org.opensearch.core.index.Index; -import org.opensearch.core.index.shard.ShardId; -import org.opensearch.index.shard.IndexShardTestUtils; -import org.opensearch.test.OpenSearchTestCase; - -import static org.mockito.Mockito.anyString; -import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.never; -import static org.mockito.Mockito.verify; - -public class ReplicationModeAwareProxyTests extends OpenSearchTestCase { - - /* - Replication action running on the same primary copy from which it originates. - Action should not run and proxy should return ReplicationMode.NO_REPLICATION - */ - public void testDetermineReplicationModeTargetRoutingCurrentPrimary() { - ShardRouting targetRouting = TestShardRouting.newShardRouting( - new ShardId(new Index("test_index", "_na_"), 0), - "dummy-node", - null, - true, - ShardRoutingState.STARTED, - AllocationId.newInitializing("abc") - ); - ShardRouting primaryRouting = TestShardRouting.newShardRouting( - new ShardId(new Index("test_index", "_na_"), 0), - "dummy-node", - null, - true, - ShardRoutingState.STARTED, - AllocationId.newInitializing("abc") - ); - final ReplicationModeAwareProxy replicationModeAwareProxy = new ReplicationModeAwareProxy( - ReplicationMode.NO_REPLICATION, - DiscoveryNodes.builder().add(IndexShardTestUtils.getFakeRemoteEnabledNode("dummy-node")).build(), - mock(TransportReplicationAction.ReplicasProxy.class), - mock(TransportReplicationAction.ReplicasProxy.class), - randomBoolean() - ); - assertEquals(ReplicationMode.NO_REPLICATION, replicationModeAwareProxy.determineReplicationMode(targetRouting, primaryRouting)); - } - - /* - Replication action originating from failing primary to replica being promoted to primary - Action should run and proxy should return ReplicationMode.FULL_REPLICATION - */ - public void testDetermineReplicationModeTargetRoutingRelocatingPrimary() { - AllocationId primaryId = AllocationId.newRelocation(AllocationId.newInitializing()); - AllocationId relocationTargetId = AllocationId.newTargetRelocation(primaryId); - ShardRouting targetRouting = TestShardRouting.newShardRouting( - new ShardId(new Index("test_index", "_na_"), 0), - "dummy-node-2", - null, - true, - ShardRoutingState.INITIALIZING, - relocationTargetId - ); - ShardRouting primaryRouting = TestShardRouting.newShardRouting( - new ShardId(new Index("test_index", "_na_"), 0), - "dummy-node", - "dummy-node-2", - true, - ShardRoutingState.RELOCATING, - primaryId - ); - final ReplicationModeAwareProxy replicationModeAwareProxy = new ReplicationModeAwareProxy( - ReplicationMode.NO_REPLICATION, - DiscoveryNodes.builder() - .add(IndexShardTestUtils.getFakeRemoteEnabledNode(targetRouting.currentNodeId())) - .add(IndexShardTestUtils.getFakeRemoteEnabledNode(primaryRouting.currentNodeId())) - .build(), - mock(TransportReplicationAction.ReplicasProxy.class), - mock(TransportReplicationAction.ReplicasProxy.class), - randomBoolean() - ); - assertEquals(ReplicationMode.FULL_REPLICATION, replicationModeAwareProxy.determineReplicationMode(targetRouting, primaryRouting)); - } - - /* - Replication action originating from remote enabled primary to docrep replica during remote store migration - Action should run and proxy should return ReplicationMode.FULL_REPLICATION - */ - public void testDetermineReplicationModeTargetRoutingDocrepShard() { - ShardRouting primaryRouting = TestShardRouting.newShardRouting( - new ShardId(new Index("test_index", "_na_"), 0), - "dummy-node", - true, - ShardRoutingState.STARTED - ); - ShardRouting targetRouting = TestShardRouting.newShardRouting( - new ShardId(new Index("test_index", "_na_"), 0), - "dummy-node-2", - false, - ShardRoutingState.STARTED - ); - final ReplicationModeAwareProxy replicationModeAwareProxy = new ReplicationModeAwareProxy( - ReplicationMode.NO_REPLICATION, - DiscoveryNodes.builder() - .add(IndexShardTestUtils.getFakeRemoteEnabledNode(primaryRouting.currentNodeId())) - .add(IndexShardTestUtils.getFakeDiscoNode(targetRouting.currentNodeId())) - .build(), - mock(TransportReplicationAction.ReplicasProxy.class), - mock(TransportReplicationAction.ReplicasProxy.class), - false - ); - assertEquals(ReplicationMode.FULL_REPLICATION, replicationModeAwareProxy.determineReplicationMode(targetRouting, primaryRouting)); - } - - /* - Replication action originating from remote enabled primary to remote replica during remote store migration - Action should not run and proxy should return ReplicationMode.NO_REPLICATION - */ - public void testDetermineReplicationModeTargetRoutingRemoteShard() { - ShardRouting primaryRouting = TestShardRouting.newShardRouting( - new ShardId(new Index("test_index", "_na_"), 0), - "dummy-node", - false, - ShardRoutingState.STARTED - ); - ShardRouting targetRouting = TestShardRouting.newShardRouting( - new ShardId(new Index("test_index", "_na_"), 0), - "dummy-node-2", - true, - ShardRoutingState.STARTED - ); - final ReplicationModeAwareProxy replicationModeAwareProxy = new ReplicationModeAwareProxy( - ReplicationMode.NO_REPLICATION, - DiscoveryNodes.builder() - .add(IndexShardTestUtils.getFakeRemoteEnabledNode(targetRouting.currentNodeId())) - .add(IndexShardTestUtils.getFakeRemoteEnabledNode(primaryRouting.currentNodeId())) - .build(), - mock(TransportReplicationAction.ReplicasProxy.class), - mock(TransportReplicationAction.ReplicasProxy.class), - false - ); - assertEquals(ReplicationMode.NO_REPLICATION, replicationModeAwareProxy.determineReplicationMode(targetRouting, primaryRouting)); - } - - /* - Replication action originating from remote enabled primary to remote enabled replica during remote store migration - with an explicit replication mode specified - Action should run and proxy should return the overridden Replication Mode - */ - public void testDetermineReplicationWithExplicitOverrideTargetRoutingRemoteShard() { - ReplicationMode replicationModeOverride = ReplicationMode.PRIMARY_TERM_VALIDATION; - ShardRouting primaryRouting = TestShardRouting.newShardRouting( - new ShardId(new Index("test_index", "_na_"), 0), - "dummy-node", - false, - ShardRoutingState.STARTED - ); - ShardRouting targetRouting = TestShardRouting.newShardRouting( - new ShardId(new Index("test_index", "_na_"), 0), - "dummy-node-2", - true, - ShardRoutingState.STARTED - ); - final ReplicationModeAwareProxy replicationModeAwareProxy = new ReplicationModeAwareProxy( - replicationModeOverride, - DiscoveryNodes.builder() - .add(IndexShardTestUtils.getFakeRemoteEnabledNode(targetRouting.currentNodeId())) - .add(IndexShardTestUtils.getFakeRemoteEnabledNode(primaryRouting.currentNodeId())) - .build(), - mock(TransportReplicationAction.ReplicasProxy.class), - mock(TransportReplicationAction.ReplicasProxy.class), - false - ); - assertEquals(replicationModeOverride, replicationModeAwareProxy.determineReplicationMode(targetRouting, primaryRouting)); - } - - /* - Replication action originating from remote enabled primary with remote enabled index settings enabled - Action should not query the DiscoveryNodes object - */ - public void testDetermineReplicationWithRemoteIndexSettingsEnabled() { - DiscoveryNodes mockDiscoveryNodes = mock(DiscoveryNodes.class); - ShardRouting primaryRouting = TestShardRouting.newShardRouting( - new ShardId(new Index("test_index", "_na_"), 0), - "dummy-node", - false, - ShardRoutingState.STARTED - ); - ShardRouting targetRouting = TestShardRouting.newShardRouting( - new ShardId(new Index("test_index", "_na_"), 0), - "dummy-node-2", - true, - ShardRoutingState.STARTED - ); - final ReplicationModeAwareProxy replicationModeAwareProxy = new ReplicationModeAwareProxy( - ReplicationMode.NO_REPLICATION, - mockDiscoveryNodes, - mock(TransportReplicationAction.ReplicasProxy.class), - mock(TransportReplicationAction.ReplicasProxy.class), - true - ); - replicationModeAwareProxy.determineReplicationMode(targetRouting, primaryRouting); - // Verify no interactions with the DiscoveryNodes object - verify(mockDiscoveryNodes, never()).get(anyString()); - } -} diff --git a/server/src/test/java/org/opensearch/action/support/replication/ReplicationOperationTests.java b/server/src/test/java/org/opensearch/action/support/replication/ReplicationOperationTests.java index ec5fc1d19e40d..6b54623b03164 100644 --- a/server/src/test/java/org/opensearch/action/support/replication/ReplicationOperationTests.java +++ b/server/src/test/java/org/opensearch/action/support/replication/ReplicationOperationTests.java @@ -48,7 +48,6 @@ import org.opensearch.cluster.routing.IndexShardRoutingTable; import org.opensearch.cluster.routing.ShardRouting; import org.opensearch.cluster.routing.ShardRoutingState; -import org.opensearch.common.collect.Tuple; import org.opensearch.common.unit.TimeValue; import org.opensearch.common.util.concurrent.ConcurrentCollections; import org.opensearch.common.util.set.Sets; @@ -60,7 +59,6 @@ import org.opensearch.core.index.shard.ShardId; import org.opensearch.index.shard.IndexShardNotStartedException; import org.opensearch.index.shard.IndexShardState; -import org.opensearch.index.shard.IndexShardTestUtils; import org.opensearch.index.shard.ReplicationGroup; import org.opensearch.node.NodeClosedException; import org.opensearch.test.OpenSearchTestCase; @@ -241,13 +239,7 @@ public void testReplicationWithRemoteTranslogEnabled() throws Exception { listener, replicasProxy, 0, - new ReplicationModeAwareProxy<>( - ReplicationMode.NO_REPLICATION, - buildRemoteStoreEnabledDiscoveryNodes(routingTable), - replicasProxy, - replicasProxy, - true - ) + new ReplicationModeAwareProxy<>(ReplicationMode.NO_REPLICATION, replicasProxy, replicasProxy) ); op.execute(); assertTrue("request was not processed on primary", request.processedOnPrimary.get()); @@ -312,13 +304,7 @@ public void testPrimaryToPrimaryReplicationWithRemoteTranslogEnabled() throws Ex listener, replicasProxy, 0, - new ReplicationModeAwareProxy<>( - ReplicationMode.NO_REPLICATION, - buildRemoteStoreEnabledDiscoveryNodes(routingTable), - replicasProxy, - replicasProxy, - true - ) + new ReplicationModeAwareProxy<>(ReplicationMode.NO_REPLICATION, replicasProxy, replicasProxy) ); op.execute(); assertTrue("request was not processed on primary", request.processedOnPrimary.get()); @@ -394,144 +380,6 @@ public void testForceReplicationWithRemoteTranslogEnabled() throws Exception { assertEquals(activeIds.size() + initializingIds.size(), shardInfo.getTotal()); } - public void testReplicationInDualModeWithDocrepReplica() throws Exception { - Set initializingIds = new HashSet<>(); - IntStream.range(0, randomIntBetween(2, 5)).forEach(x -> initializingIds.add(AllocationId.newInitializing())); - Set activeIds = new HashSet<>(); - IntStream.range(0, randomIntBetween(2, 5)).forEach(x -> activeIds.add(AllocationId.newInitializing())); - - AllocationId primaryId = activeIds.iterator().next(); - - ShardId shardId = new ShardId("test", "_na_", 0); - IndexShardRoutingTable.Builder builder = new IndexShardRoutingTable.Builder(shardId); - final ShardRouting primaryShard = newShardRouting( - shardId, - nodeIdFromAllocationId(primaryId), - null, - true, - ShardRoutingState.STARTED, - primaryId - ); - initializingIds.forEach(aId -> { - ShardRouting routing = newShardRouting(shardId, nodeIdFromAllocationId(aId), null, false, ShardRoutingState.INITIALIZING, aId); - builder.addShard(routing); - }); - activeIds.stream().filter(aId -> !aId.equals(primaryId)).forEach(aId -> { - ShardRouting routing = newShardRouting(shardId, nodeIdFromAllocationId(aId), null, false, ShardRoutingState.STARTED, aId); - builder.addShard(routing); - }); - builder.addShard(primaryShard); - IndexShardRoutingTable routingTable = builder.build(); - - Set inSyncAllocationIds = activeIds.stream().map(AllocationId::getId).collect(Collectors.toSet()); - ReplicationGroup replicationGroup = new ReplicationGroup(routingTable, inSyncAllocationIds, inSyncAllocationIds, 0); - List replicationTargets = replicationGroup.getReplicationTargets(); - assertEquals(inSyncAllocationIds.size(), replicationTargets.size()); - assertTrue( - replicationTargets.stream().map(sh -> sh.allocationId().getId()).collect(Collectors.toSet()).containsAll(inSyncAllocationIds) - ); - - Request request = new Request(shardId); - PlainActionFuture listener = new PlainActionFuture<>(); - Map simulatedFailures = new HashMap<>(); - TestReplicaProxy replicasProxy = new TestReplicaProxy(simulatedFailures); - TestPrimary primary = new TestPrimary(primaryShard, () -> replicationGroup, threadPool); - final TestReplicationOperation op = new TestReplicationOperation( - request, - primary, - listener, - replicasProxy, - 0, - new ReplicationModeAwareProxy<>( - ReplicationMode.NO_REPLICATION, - buildDiscoveryNodes(routingTable), - replicasProxy, - replicasProxy, - false - ) - ); - op.execute(); - assertTrue("request was not processed on primary", request.processedOnPrimary.get()); - // During dual replication, except for primary, replication action should be executed on all the replicas - assertEquals(activeIds.size() - 1, request.processedOnReplicas.size()); - assertEquals(0, replicasProxy.failedReplicas.size()); - assertEquals(0, replicasProxy.markedAsStaleCopies.size()); - assertTrue("post replication operations not run on primary", request.runPostReplicationActionsOnPrimary.get()); - assertTrue("listener is not marked as done", listener.isDone()); - - ShardInfo shardInfo = listener.actionGet().getShardInfo(); - // All initializing and active shards are set to docrep - assertEquals(initializingIds.size() + activeIds.size(), shardInfo.getTotal()); - } - - public void testReplicationInDualModeWithMixedReplicasSomeInDocrepOthersOnRemote() throws Exception { - Set initializingIds = new HashSet<>(); - IntStream.range(0, randomIntBetween(2, 5)).forEach(x -> initializingIds.add(AllocationId.newInitializing())); - Set activeIds = new HashSet<>(); - IntStream.range(0, randomIntBetween(2, 5)).forEach(x -> activeIds.add(AllocationId.newInitializing())); - - AllocationId primaryId = activeIds.iterator().next(); - - ShardId shardId = new ShardId("test", "_na_", 0); - IndexShardRoutingTable.Builder builder = new IndexShardRoutingTable.Builder(shardId); - final ShardRouting primaryShard = newShardRouting( - shardId, - nodeIdFromAllocationId(primaryId), - null, - true, - ShardRoutingState.STARTED, - primaryId - ); - initializingIds.forEach(aId -> { - ShardRouting routing = newShardRouting(shardId, nodeIdFromAllocationId(aId), null, false, ShardRoutingState.INITIALIZING, aId); - builder.addShard(routing); - }); - activeIds.stream().filter(aId -> !aId.equals(primaryId)).forEach(aId -> { - ShardRouting routing = newShardRouting(shardId, nodeIdFromAllocationId(aId), null, false, ShardRoutingState.STARTED, aId); - builder.addShard(routing); - }); - builder.addShard(primaryShard); - IndexShardRoutingTable routingTable = builder.build(); - - Set inSyncAllocationIds = activeIds.stream().map(AllocationId::getId).collect(Collectors.toSet()); - ReplicationGroup replicationGroup = new ReplicationGroup(routingTable, inSyncAllocationIds, inSyncAllocationIds, 0); - List replicationTargets = replicationGroup.getReplicationTargets(); - assertEquals(inSyncAllocationIds.size(), replicationTargets.size()); - assertTrue( - replicationTargets.stream().map(sh -> sh.allocationId().getId()).collect(Collectors.toSet()).containsAll(inSyncAllocationIds) - ); - - Request request = new Request(shardId); - PlainActionFuture listener = new PlainActionFuture<>(); - Map simulatedFailures = new HashMap<>(); - TestReplicaProxy replicasProxy = new TestReplicaProxy(simulatedFailures); - TestPrimary primary = new TestPrimary(primaryShard, () -> replicationGroup, threadPool); - // Generating data nodes in mixed mode wherein some of the allocated replicas - // are in docrep nodes whereas others are on remote enabled ones - Tuple discoveryNodesDetails = buildMixedModeDiscoveryNodes(routingTable); - int docRepNodes = discoveryNodesDetails.v1(); - final TestReplicationOperation op = new TestReplicationOperation( - request, - primary, - listener, - replicasProxy, - 0, - new ReplicationModeAwareProxy<>(ReplicationMode.NO_REPLICATION, discoveryNodesDetails.v2(), replicasProxy, replicasProxy, false) - ); - op.execute(); - assertTrue("request was not processed on primary", request.processedOnPrimary.get()); - // Only docrep nodes should have the request fanned out to - assertEquals(docRepNodes, request.processedOnReplicas.size()); - assertEquals(0, replicasProxy.failedReplicas.size()); - assertEquals(0, replicasProxy.markedAsStaleCopies.size()); - assertTrue("post replication operations not run on primary", request.runPostReplicationActionsOnPrimary.get()); - assertTrue("listener is not marked as done", listener.isDone()); - - ShardInfo shardInfo = listener.actionGet().getShardInfo(); - // Listener should be invoked for initializing Ids, primary and the operations on docrep nodes - assertEquals(1 + docRepNodes + initializingIds.size(), shardInfo.getTotal()); - } - static String nodeIdFromAllocationId(final AllocationId allocationId) { return "n-" + allocationId.getId().substring(0, 8); } @@ -968,46 +816,6 @@ private Set getExpectedReplicas(ShardId shardId, ClusterState stat return expectedReplicas; } - private DiscoveryNodes buildRemoteStoreEnabledDiscoveryNodes(IndexShardRoutingTable routingTable) { - DiscoveryNodes.Builder builder = DiscoveryNodes.builder(); - for (ShardRouting shardRouting : routingTable) { - builder.add(IndexShardTestUtils.getFakeRemoteEnabledNode(shardRouting.currentNodeId())); - } - return builder.build(); - } - - private DiscoveryNodes buildDiscoveryNodes(IndexShardRoutingTable routingTable) { - DiscoveryNodes.Builder builder = DiscoveryNodes.builder(); - for (ShardRouting shardRouting : routingTable) { - if (shardRouting.primary()) { - builder.add(IndexShardTestUtils.getFakeRemoteEnabledNode(shardRouting.currentNodeId())); - } else { - builder.add(IndexShardTestUtils.getFakeDiscoNode(shardRouting.currentNodeId())); - } - } - return builder.build(); - } - - private Tuple buildMixedModeDiscoveryNodes(IndexShardRoutingTable routingTable) { - int docrepNodes = 0; - DiscoveryNodes.Builder builder = DiscoveryNodes.builder(); - for (ShardRouting shardRouting : routingTable) { - if (shardRouting.primary()) { - builder.add(IndexShardTestUtils.getFakeRemoteEnabledNode(shardRouting.currentNodeId())); - } else { - // Only add docrep nodes for allocationIds that are active - // since the test cases creates replication group with active allocationIds only - if (shardRouting.active() && randomBoolean()) { - builder.add(IndexShardTestUtils.getFakeDiscoNode(shardRouting.currentNodeId())); - docrepNodes += 1; - } else { - builder.add(IndexShardTestUtils.getFakeRemoteEnabledNode(shardRouting.currentNodeId())); - } - } - } - return new Tuple<>(docrepNodes, builder.build()); - } - public static class Request extends ReplicationRequest { public AtomicBoolean processedOnPrimary = new AtomicBoolean(); public AtomicBoolean runPostReplicationActionsOnPrimary = new AtomicBoolean(); diff --git a/server/src/test/java/org/opensearch/action/support/replication/TransportReplicationActionTests.java b/server/src/test/java/org/opensearch/action/support/replication/TransportReplicationActionTests.java index 4a18778cc0b2b..dad0fa0efd3ec 100644 --- a/server/src/test/java/org/opensearch/action/support/replication/TransportReplicationActionTests.java +++ b/server/src/test/java/org/opensearch/action/support/replication/TransportReplicationActionTests.java @@ -78,7 +78,6 @@ import org.opensearch.core.transport.TransportResponse; import org.opensearch.index.IndexNotFoundException; import org.opensearch.index.IndexService; -import org.opensearch.index.remote.RemoteStoreTestsHelper; import org.opensearch.index.shard.IndexShard; import org.opensearch.index.shard.IndexShardClosedException; import org.opensearch.index.shard.IndexShardState; @@ -1590,15 +1589,9 @@ private IndexService mockIndexService(final IndexMetadata indexMetadata, Cluster @SuppressWarnings("unchecked") private IndexShard mockIndexShard(ShardId shardId, ClusterService clusterService) { - return mockIndexShard(shardId, clusterService, false); - } - - @SuppressWarnings("unchecked") - private IndexShard mockIndexShard(ShardId shardId, ClusterService clusterService, boolean remote) { final IndexShard indexShard = mock(IndexShard.class); when(indexShard.shardId()).thenReturn(shardId); when(indexShard.state()).thenReturn(IndexShardState.STARTED); - when(indexShard.indexSettings()).thenReturn(RemoteStoreTestsHelper.createIndexSettings(remote)); doAnswer(invocation -> { ActionListener callback = (ActionListener) invocation.getArguments()[0]; if (isPrimaryMode.get()) { diff --git a/server/src/test/java/org/opensearch/index/remote/RemoteStoreTestsHelper.java b/server/src/test/java/org/opensearch/index/remote/RemoteStoreTestsHelper.java index 043b4493e8989..e072d3037caad 100644 --- a/server/src/test/java/org/opensearch/index/remote/RemoteStoreTestsHelper.java +++ b/server/src/test/java/org/opensearch/index/remote/RemoteStoreTestsHelper.java @@ -10,7 +10,6 @@ import org.opensearch.cluster.metadata.IndexMetadata; import org.opensearch.common.settings.Settings; -import org.opensearch.core.index.Index; import org.opensearch.core.index.shard.ShardId; import org.opensearch.index.IndexSettings; import org.opensearch.index.shard.IndexShard; @@ -38,22 +37,4 @@ static IndexShard createIndexShard(ShardId shardId, boolean remoteStoreEnabled) when(indexShard.store()).thenReturn(store); return indexShard; } - - public static IndexSettings createIndexSettings(boolean remote) { - return createIndexSettings(remote, Settings.EMPTY); - } - - public static IndexSettings createIndexSettings(boolean remote, Settings settings) { - IndexSettings indexSettings; - if (remote) { - Settings nodeSettings = Settings.builder() - .put("node.name", "xyz") - .put("node.attr.remote_store.translog.repository", "seg_repo") - .build(); - indexSettings = IndexSettingsModule.newIndexSettings(new Index("test_index", "_na_"), settings, nodeSettings); - } else { - indexSettings = IndexSettingsModule.newIndexSettings("test_index", settings); - } - return indexSettings; - } } diff --git a/server/src/test/java/org/opensearch/index/replication/RetentionLeasesReplicationTests.java b/server/src/test/java/org/opensearch/index/replication/RetentionLeasesReplicationTests.java index 904c9a70e61e0..8c59e92a3fe8a 100644 --- a/server/src/test/java/org/opensearch/index/replication/RetentionLeasesReplicationTests.java +++ b/server/src/test/java/org/opensearch/index/replication/RetentionLeasesReplicationTests.java @@ -45,7 +45,6 @@ import org.opensearch.index.seqno.RetentionLeaseUtils; import org.opensearch.index.seqno.RetentionLeases; import org.opensearch.index.shard.IndexShard; -import org.opensearch.index.shard.IndexShardTestUtils; import org.opensearch.test.VersionUtils; import java.util.ArrayList; @@ -183,8 +182,7 @@ public void testTurnOffTranslogRetentionAfterAllShardStarted() throws Exception null, 1L, group.getPrimary().getReplicationGroup().getInSyncAllocationIds(), - group.getPrimary().getReplicationGroup().getRoutingTable(), - IndexShardTestUtils.getFakeDiscoveryNodes(shard.routingEntry()) + group.getPrimary().getReplicationGroup().getRoutingTable() ); } group.syncGlobalCheckpoint(); diff --git a/server/src/test/java/org/opensearch/index/seqno/GlobalCheckpointSyncActionTests.java b/server/src/test/java/org/opensearch/index/seqno/GlobalCheckpointSyncActionTests.java index a27f3476888eb..8363ea3757a2b 100644 --- a/server/src/test/java/org/opensearch/index/seqno/GlobalCheckpointSyncActionTests.java +++ b/server/src/test/java/org/opensearch/index/seqno/GlobalCheckpointSyncActionTests.java @@ -54,7 +54,6 @@ import java.util.Collections; -import static org.opensearch.index.remote.RemoteStoreTestsHelper.createIndexSettings; import static org.opensearch.test.ClusterServiceUtils.createClusterService; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.never; @@ -111,7 +110,6 @@ public void testTranslogSyncAfterGlobalCheckpointSync() throws Exception { final ShardId shardId = new ShardId(index, id); when(indexShard.shardId()).thenReturn(shardId); - when(indexShard.indexSettings()).thenReturn(createIndexSettings(false)); final Translog.Durability durability = randomFrom(Translog.Durability.ASYNC, Translog.Durability.REQUEST); when(indexShard.getTranslogDurability()).thenReturn(durability); @@ -160,14 +158,14 @@ public void testTranslogSyncAfterGlobalCheckpointSync() throws Exception { public void testGetReplicationModeWithRemoteTranslog() { final GlobalCheckpointSyncAction action = createAction(); final IndexShard indexShard = mock(IndexShard.class); - when(indexShard.indexSettings()).thenReturn(createIndexSettings(true)); + when(indexShard.isRemoteTranslogEnabled()).thenReturn(true); assertEquals(ReplicationMode.NO_REPLICATION, action.getReplicationMode(indexShard)); } public void testGetReplicationModeWithLocalTranslog() { final GlobalCheckpointSyncAction action = createAction(); final IndexShard indexShard = mock(IndexShard.class); - when(indexShard.indexSettings()).thenReturn(createIndexSettings(false)); + when(indexShard.isRemoteTranslogEnabled()).thenReturn(false); assertEquals(ReplicationMode.FULL_REPLICATION, action.getReplicationMode(indexShard)); } @@ -193,7 +191,6 @@ public void testMayBeSyncTranslogWithRemoteTranslog() throws Exception { when(indexShard.getLastKnownGlobalCheckpoint()).thenReturn(globalCheckpoint); when(indexShard.getLastSyncedGlobalCheckpoint()).thenReturn(globalCheckpoint - 1); when(indexShard.getTranslogDurability()).thenReturn(Translog.Durability.REQUEST); - when(indexShard.indexSettings()).thenReturn(createIndexSettings(true)); action.shardOperationOnPrimary(primaryRequest, indexShard, ActionTestUtils.assertNoFailureListener(r -> {})); verify(indexShard, never()).sync(); @@ -208,7 +205,6 @@ public void testMayBeSyncTranslogWithLocalTranslog() throws Exception { when(indexShard.getLastKnownGlobalCheckpoint()).thenReturn(globalCheckpoint); when(indexShard.getLastSyncedGlobalCheckpoint()).thenReturn(globalCheckpoint - 1); when(indexShard.getTranslogDurability()).thenReturn(Translog.Durability.REQUEST); - when(indexShard.indexSettings()).thenReturn(createIndexSettings(false)); action.shardOperationOnPrimary(primaryRequest, indexShard, ActionTestUtils.assertNoFailureListener(r -> {})); verify(indexShard).sync(); diff --git a/server/src/test/java/org/opensearch/index/seqno/PeerRecoveryRetentionLeaseExpiryTests.java b/server/src/test/java/org/opensearch/index/seqno/PeerRecoveryRetentionLeaseExpiryTests.java index 7a9f1d7baa12e..ca80c7b9c4884 100644 --- a/server/src/test/java/org/opensearch/index/seqno/PeerRecoveryRetentionLeaseExpiryTests.java +++ b/server/src/test/java/org/opensearch/index/seqno/PeerRecoveryRetentionLeaseExpiryTests.java @@ -93,8 +93,7 @@ public void setUpReplicationTracker() throws InterruptedException { value -> {}, currentTimeMillis::get, (leases, listener) -> {}, - () -> safeCommitInfo, - sId -> false + () -> safeCommitInfo ); replicationTracker.updateFromClusterManager( 1L, diff --git a/server/src/test/java/org/opensearch/index/seqno/ReplicationTrackerRetentionLeaseTests.java b/server/src/test/java/org/opensearch/index/seqno/ReplicationTrackerRetentionLeaseTests.java index fdbe89422a2aa..3cd60ac973709 100644 --- a/server/src/test/java/org/opensearch/index/seqno/ReplicationTrackerRetentionLeaseTests.java +++ b/server/src/test/java/org/opensearch/index/seqno/ReplicationTrackerRetentionLeaseTests.java @@ -84,8 +84,7 @@ public void testAddOrRenewRetentionLease() { value -> {}, () -> 0L, (leases, listener) -> {}, - OPS_BASED_RECOVERY_ALWAYS_REASONABLE, - NON_REMOTE_DISCOVERY_NODE + OPS_BASED_RECOVERY_ALWAYS_REASONABLE ); replicationTracker.updateFromClusterManager( randomNonNegativeLong(), @@ -133,8 +132,7 @@ public void testAddDuplicateRetentionLease() { value -> {}, () -> 0L, (leases, listener) -> {}, - OPS_BASED_RECOVERY_ALWAYS_REASONABLE, - NON_REMOTE_DISCOVERY_NODE + OPS_BASED_RECOVERY_ALWAYS_REASONABLE ); replicationTracker.updateFromClusterManager( randomNonNegativeLong(), @@ -166,8 +164,7 @@ public void testRenewNotFoundRetentionLease() { value -> {}, () -> 0L, (leases, listener) -> {}, - OPS_BASED_RECOVERY_ALWAYS_REASONABLE, - NON_REMOTE_DISCOVERY_NODE + OPS_BASED_RECOVERY_ALWAYS_REASONABLE ); replicationTracker.updateFromClusterManager( randomNonNegativeLong(), @@ -205,8 +202,7 @@ public void testAddRetentionLeaseCausesRetentionLeaseSync() { equalTo(retainingSequenceNumbers) ); }, - OPS_BASED_RECOVERY_ALWAYS_REASONABLE, - NON_REMOTE_DISCOVERY_NODE + OPS_BASED_RECOVERY_ALWAYS_REASONABLE ); reference.set(replicationTracker); replicationTracker.updateFromClusterManager( @@ -245,8 +241,7 @@ public void testRemoveRetentionLease() { value -> {}, () -> 0L, (leases, listener) -> {}, - OPS_BASED_RECOVERY_ALWAYS_REASONABLE, - NON_REMOTE_DISCOVERY_NODE + OPS_BASED_RECOVERY_ALWAYS_REASONABLE ); replicationTracker.updateFromClusterManager( randomNonNegativeLong(), @@ -310,8 +305,7 @@ public void testCloneRetentionLease() { assertTrue(synced.compareAndSet(false, true)); listener.onResponse(new ReplicationResponse()); }, - OPS_BASED_RECOVERY_ALWAYS_REASONABLE, - NON_REMOTE_DISCOVERY_NODE + OPS_BASED_RECOVERY_ALWAYS_REASONABLE ); replicationTrackerRef.set(replicationTracker); replicationTracker.updateFromClusterManager( @@ -357,8 +351,7 @@ public void testCloneNonexistentRetentionLease() { value -> {}, () -> 0L, (leases, listener) -> {}, - OPS_BASED_RECOVERY_ALWAYS_REASONABLE, - NON_REMOTE_DISCOVERY_NODE + OPS_BASED_RECOVERY_ALWAYS_REASONABLE ); replicationTracker.updateFromClusterManager( randomNonNegativeLong(), @@ -387,8 +380,7 @@ public void testCloneDuplicateRetentionLease() { value -> {}, () -> 0L, (leases, listener) -> {}, - OPS_BASED_RECOVERY_ALWAYS_REASONABLE, - NON_REMOTE_DISCOVERY_NODE + OPS_BASED_RECOVERY_ALWAYS_REASONABLE ); replicationTracker.updateFromClusterManager( randomNonNegativeLong(), @@ -421,8 +413,7 @@ public void testRemoveNotFound() { value -> {}, () -> 0L, (leases, listener) -> {}, - OPS_BASED_RECOVERY_ALWAYS_REASONABLE, - NON_REMOTE_DISCOVERY_NODE + OPS_BASED_RECOVERY_ALWAYS_REASONABLE ); replicationTracker.updateFromClusterManager( randomNonNegativeLong(), @@ -460,8 +451,7 @@ public void testRemoveRetentionLeaseCausesRetentionLeaseSync() { equalTo(retainingSequenceNumbers) ); }, - OPS_BASED_RECOVERY_ALWAYS_REASONABLE, - NON_REMOTE_DISCOVERY_NODE + OPS_BASED_RECOVERY_ALWAYS_REASONABLE ); reference.set(replicationTracker); replicationTracker.updateFromClusterManager( @@ -514,8 +504,7 @@ private void runExpirationTest(final boolean primaryMode) { value -> {}, currentTimeMillis::get, (leases, listener) -> {}, - OPS_BASED_RECOVERY_ALWAYS_REASONABLE, - NON_REMOTE_DISCOVERY_NODE + OPS_BASED_RECOVERY_ALWAYS_REASONABLE ); replicationTracker.updateFromClusterManager( randomNonNegativeLong(), @@ -594,8 +583,7 @@ public void testReplicaIgnoresOlderRetentionLeasesVersion() { value -> {}, () -> 0L, (leases, listener) -> {}, - OPS_BASED_RECOVERY_ALWAYS_REASONABLE, - NON_REMOTE_DISCOVERY_NODE + OPS_BASED_RECOVERY_ALWAYS_REASONABLE ); replicationTracker.updateFromClusterManager( randomNonNegativeLong(), @@ -648,8 +636,7 @@ public void testLoadAndPersistRetentionLeases() throws IOException { value -> {}, () -> 0L, (leases, listener) -> {}, - OPS_BASED_RECOVERY_ALWAYS_REASONABLE, - NON_REMOTE_DISCOVERY_NODE + OPS_BASED_RECOVERY_ALWAYS_REASONABLE ); replicationTracker.updateFromClusterManager( randomNonNegativeLong(), @@ -684,8 +671,7 @@ public void testUnnecessaryPersistenceOfRetentionLeases() throws IOException { value -> {}, () -> 0L, (leases, listener) -> {}, - OPS_BASED_RECOVERY_ALWAYS_REASONABLE, - NON_REMOTE_DISCOVERY_NODE + OPS_BASED_RECOVERY_ALWAYS_REASONABLE ); replicationTracker.updateFromClusterManager( randomNonNegativeLong(), @@ -738,8 +724,7 @@ public void testPersistRetentionLeasesUnderConcurrency() throws IOException { value -> {}, () -> 0L, (leases, listener) -> {}, - OPS_BASED_RECOVERY_ALWAYS_REASONABLE, - NON_REMOTE_DISCOVERY_NODE + OPS_BASED_RECOVERY_ALWAYS_REASONABLE ); replicationTracker.updateFromClusterManager( randomNonNegativeLong(), @@ -803,8 +788,7 @@ public void testRenewLeaseWithLowerRetainingSequenceNumber() throws Exception { value -> {}, () -> 0L, (leases, listener) -> {}, - OPS_BASED_RECOVERY_ALWAYS_REASONABLE, - NON_REMOTE_DISCOVERY_NODE + OPS_BASED_RECOVERY_ALWAYS_REASONABLE ); replicationTracker.updateFromClusterManager( randomNonNegativeLong(), diff --git a/server/src/test/java/org/opensearch/index/seqno/ReplicationTrackerTestCase.java b/server/src/test/java/org/opensearch/index/seqno/ReplicationTrackerTestCase.java index daeefeff59c94..e61d27695a5e5 100644 --- a/server/src/test/java/org/opensearch/index/seqno/ReplicationTrackerTestCase.java +++ b/server/src/test/java/org/opensearch/index/seqno/ReplicationTrackerTestCase.java @@ -40,13 +40,11 @@ import org.opensearch.common.settings.Settings; import org.opensearch.core.index.shard.ShardId; import org.opensearch.index.engine.SafeCommitInfo; -import org.opensearch.index.remote.RemoteStoreTestsHelper; import org.opensearch.test.IndexSettingsModule; import org.opensearch.test.OpenSearchTestCase; import java.util.Collections; import java.util.Set; -import java.util.function.Function; import java.util.function.LongConsumer; import java.util.function.LongSupplier; import java.util.function.Supplier; @@ -59,20 +57,18 @@ ReplicationTracker newTracker( final AllocationId allocationId, final LongConsumer updatedGlobalCheckpoint, final LongSupplier currentTimeMillisSupplier, - final Settings settings, - final boolean remote + final Settings settings ) { return new ReplicationTracker( new ShardId("test", "_na_", 0), allocationId.getId(), - remote ? RemoteStoreTestsHelper.createIndexSettings(true, settings) : IndexSettingsModule.newIndexSettings("test", settings), + IndexSettingsModule.newIndexSettings("test", settings), randomNonNegativeLong(), UNASSIGNED_SEQ_NO, updatedGlobalCheckpoint, currentTimeMillisSupplier, (leases, listener) -> {}, - OPS_BASED_RECOVERY_ALWAYS_REASONABLE, - remote ? REMOTE_DISCOVERY_NODE : NON_REMOTE_DISCOVERY_NODE + OPS_BASED_RECOVERY_ALWAYS_REASONABLE ); } @@ -84,21 +80,8 @@ ReplicationTracker newTracker( return newTracker(allocationId, updatedGlobalCheckpoint, currentTimeMillisSupplier, Settings.EMPTY); } - ReplicationTracker newTracker( - final AllocationId allocationId, - final LongConsumer updatedGlobalCheckpoint, - final LongSupplier currentTimeMillisSupplier, - final Settings settings - ) { - return newTracker(allocationId, updatedGlobalCheckpoint, currentTimeMillisSupplier, settings, false); - } - static final Supplier OPS_BASED_RECOVERY_ALWAYS_REASONABLE = () -> SafeCommitInfo.EMPTY; - static final Function NON_REMOTE_DISCOVERY_NODE = shardId -> false; - - static final Function REMOTE_DISCOVERY_NODE = shardId -> true; - static String nodeIdFromAllocationId(final AllocationId allocationId) { return "n-" + allocationId.getId().substring(0, 8); } diff --git a/server/src/test/java/org/opensearch/index/seqno/ReplicationTrackerTests.java b/server/src/test/java/org/opensearch/index/seqno/ReplicationTrackerTests.java index 233a99cbe4a73..7971591e82bab 100644 --- a/server/src/test/java/org/opensearch/index/seqno/ReplicationTrackerTests.java +++ b/server/src/test/java/org/opensearch/index/seqno/ReplicationTrackerTests.java @@ -446,10 +446,6 @@ public void testWaitForAllocationIdToBeInSync() throws Exception { private AtomicLong updatedGlobalCheckpoint = new AtomicLong(UNASSIGNED_SEQ_NO); - private ReplicationTracker newTracker(final AllocationId allocationId, Settings settings, boolean remote) { - return newTracker(allocationId, updatedGlobalCheckpoint::set, () -> 0L, settings, remote); - } - private ReplicationTracker newTracker(final AllocationId allocationId, Settings settings) { return newTracker(allocationId, updatedGlobalCheckpoint::set, () -> 0L, settings); } @@ -763,8 +759,7 @@ public void testPrimaryContextHandoff() throws IOException { onUpdate, () -> 0L, onNewRetentionLease, - OPS_BASED_RECOVERY_ALWAYS_REASONABLE, - NON_REMOTE_DISCOVERY_NODE + OPS_BASED_RECOVERY_ALWAYS_REASONABLE ); ReplicationTracker newPrimary = new ReplicationTracker( shardId, @@ -775,8 +770,7 @@ public void testPrimaryContextHandoff() throws IOException { onUpdate, () -> 0L, onNewRetentionLease, - OPS_BASED_RECOVERY_ALWAYS_REASONABLE, - NON_REMOTE_DISCOVERY_NODE + OPS_BASED_RECOVERY_ALWAYS_REASONABLE ); Set allocationIds = new HashSet<>(Arrays.asList(oldPrimary.shardAllocationId, newPrimary.shardAllocationId)); @@ -1306,7 +1300,7 @@ public void testGlobalCheckpointUpdateWithRemoteTranslogEnabled() { .put(IndexMetadata.SETTING_REPLICATION_TYPE, ReplicationType.SEGMENT) .put(IndexMetadata.SETTING_REMOTE_STORE_ENABLED, "true") .build(); - final ReplicationTracker tracker = newTracker(primaryId, settings, true); + final ReplicationTracker tracker = newTracker(primaryId, settings); assertThat(tracker.getGlobalCheckpoint(), equalTo(UNASSIGNED_SEQ_NO)); long primaryLocalCheckpoint = activeWithCheckpoints.get(primaryId); @@ -1384,7 +1378,7 @@ public void testUpdateFromClusterManagerWithRemoteTranslogEnabled() { .put(IndexMetadata.SETTING_REPLICATION_TYPE, ReplicationType.SEGMENT) .put(IndexMetadata.SETTING_REMOTE_STORE_ENABLED, "true") .build(); - final ReplicationTracker tracker = newTracker(primaryId, settings, true); + final ReplicationTracker tracker = newTracker(primaryId, settings); assertThat(tracker.getGlobalCheckpoint(), equalTo(UNASSIGNED_SEQ_NO)); long primaryLocalCheckpoint = activeWithCheckpoints.get(primaryId); @@ -1482,7 +1476,7 @@ public void testMarkAllocationIdAsInSyncWithRemoteTranslogEnabled() throws Excep .put(IndexMetadata.SETTING_REPLICATION_TYPE, ReplicationType.SEGMENT) .put(IndexMetadata.SETTING_REMOTE_STORE_ENABLED, "true") .build(); - final ReplicationTracker tracker = newTracker(primaryId, settings, true); + final ReplicationTracker tracker = newTracker(primaryId, settings); tracker.updateFromClusterManager(initialClusterStateVersion, ids(active), routingTable(initializing, primaryId)); final long localCheckpoint = randomLongBetween(0, Long.MAX_VALUE - 1); tracker.activatePrimaryMode(localCheckpoint); @@ -1510,7 +1504,7 @@ public void testMissingActiveIdsDoesNotPreventAdvanceWithRemoteTranslogEnabled() .put(IndexMetadata.SETTING_REPLICATION_TYPE, ReplicationType.SEGMENT) .put(IndexMetadata.SETTING_REMOTE_STORE_ENABLED, "true") .build(); - final ReplicationTracker tracker = newTracker(primaryId, settings, true); + final ReplicationTracker tracker = newTracker(primaryId, settings); tracker.updateFromClusterManager(randomNonNegativeLong(), ids(active.keySet()), routingTable(initializing.keySet(), primaryId)); tracker.activatePrimaryMode(NO_OPS_PERFORMED); List initializingRandomSubset = randomSubsetOf(initializing.keySet()); @@ -1543,7 +1537,7 @@ public void testMissingInSyncIdsDoesNotPreventAdvanceWithRemoteTranslogEnabled() .put(IndexMetadata.SETTING_REPLICATION_TYPE, ReplicationType.SEGMENT) .put(IndexMetadata.SETTING_REMOTE_STORE_ENABLED, "true") .build(); - final ReplicationTracker tracker = newTracker(primaryId, settings, true); + final ReplicationTracker tracker = newTracker(primaryId, settings); tracker.updateFromClusterManager(randomNonNegativeLong(), ids(active.keySet()), routingTable(initializing.keySet(), primaryId)); tracker.activatePrimaryMode(NO_OPS_PERFORMED); randomSubsetOf(randomIntBetween(1, initializing.size() - 1), initializing.keySet()).forEach( @@ -1612,8 +1606,8 @@ public void testInSyncIdsAreRemovedIfNotValidatedByClusterManagerWithRemoteTrans .put(IndexMetadata.SETTING_REPLICATION_TYPE, ReplicationType.SEGMENT) .put(IndexMetadata.SETTING_REMOTE_STORE_ENABLED, "true") .build(); - final ReplicationTracker tracker = newTracker(primaryId, settings, true); - tracker.updateFromClusterManager(initialClusterStateVersion, ids(active), routingTable(initializing, active, primaryId)); + final ReplicationTracker tracker = newTracker(primaryId, settings); + tracker.updateFromClusterManager(initialClusterStateVersion, ids(active), routingTable(initializing, primaryId)); tracker.activatePrimaryMode(NO_OPS_PERFORMED); if (randomBoolean()) { initializingToStay.keySet().forEach(k -> markAsTrackingAndInSyncQuietly(tracker, k.getId(), NO_OPS_PERFORMED)); @@ -1661,7 +1655,7 @@ public void testUpdateAllocationIdsFromClusterManagerWithRemoteTranslogEnabled() .put(IndexMetadata.SETTING_REPLICATION_TYPE, ReplicationType.SEGMENT) .put(IndexMetadata.SETTING_REMOTE_STORE_ENABLED, "true") .build(); - final ReplicationTracker tracker = newTracker(primaryId, settings, true); + final ReplicationTracker tracker = newTracker(primaryId, settings); tracker.updateFromClusterManager(initialClusterStateVersion, ids(activeAllocationIds), routingTable); tracker.activatePrimaryMode(NO_OPS_PERFORMED); assertThat(tracker.getReplicationGroup().getInSyncAllocationIds(), equalTo(ids(activeAllocationIds))); @@ -2086,8 +2080,7 @@ public void testPrimaryContextHandoffWithRemoteTranslogEnabled() throws IOExcept onUpdate, () -> 0L, onNewRetentionLease, - OPS_BASED_RECOVERY_ALWAYS_REASONABLE, - REMOTE_DISCOVERY_NODE + OPS_BASED_RECOVERY_ALWAYS_REASONABLE ); ReplicationTracker newPrimary = new ReplicationTracker( shardId, @@ -2098,8 +2091,7 @@ public void testPrimaryContextHandoffWithRemoteTranslogEnabled() throws IOExcept onUpdate, () -> 0L, onNewRetentionLease, - OPS_BASED_RECOVERY_ALWAYS_REASONABLE, - REMOTE_DISCOVERY_NODE + OPS_BASED_RECOVERY_ALWAYS_REASONABLE ); Set allocationIds = new HashSet<>(Arrays.asList(oldPrimary.shardAllocationId, newPrimary.shardAllocationId)); diff --git a/server/src/test/java/org/opensearch/index/seqno/RetentionLeaseBackgroundSyncActionTests.java b/server/src/test/java/org/opensearch/index/seqno/RetentionLeaseBackgroundSyncActionTests.java index d5d7163b66698..ed04d9a20f18e 100644 --- a/server/src/test/java/org/opensearch/index/seqno/RetentionLeaseBackgroundSyncActionTests.java +++ b/server/src/test/java/org/opensearch/index/seqno/RetentionLeaseBackgroundSyncActionTests.java @@ -60,7 +60,6 @@ import java.util.concurrent.CountDownLatch; import java.util.concurrent.atomic.AtomicBoolean; -import static org.opensearch.index.remote.RemoteStoreTestsHelper.createIndexSettings; import static org.opensearch.test.ClusterServiceUtils.createClusterService; import static org.hamcrest.Matchers.sameInstance; import static org.mockito.Mockito.mock; @@ -216,14 +215,14 @@ public void testBlocks() { public void testGetReplicationModeWithRemoteTranslog() { final RetentionLeaseBackgroundSyncAction action = createAction(); final IndexShard indexShard = mock(IndexShard.class); - when(indexShard.indexSettings()).thenReturn(createIndexSettings(true)); + when(indexShard.isRemoteTranslogEnabled()).thenReturn(true); assertEquals(ReplicationMode.NO_REPLICATION, action.getReplicationMode(indexShard)); } public void testGetReplicationModeWithLocalTranslog() { final RetentionLeaseBackgroundSyncAction action = createAction(); final IndexShard indexShard = mock(IndexShard.class); - when(indexShard.indexSettings()).thenReturn(createIndexSettings(false)); + when(indexShard.isRemoteTranslogEnabled()).thenReturn(false); assertEquals(ReplicationMode.FULL_REPLICATION, action.getReplicationMode(indexShard)); } diff --git a/server/src/test/java/org/opensearch/index/seqno/RetentionLeaseSyncActionTests.java b/server/src/test/java/org/opensearch/index/seqno/RetentionLeaseSyncActionTests.java index 7610b8bc39296..63a9ac2f2e8ec 100644 --- a/server/src/test/java/org/opensearch/index/seqno/RetentionLeaseSyncActionTests.java +++ b/server/src/test/java/org/opensearch/index/seqno/RetentionLeaseSyncActionTests.java @@ -60,7 +60,6 @@ import java.util.concurrent.atomic.AtomicBoolean; import static java.util.Collections.emptyMap; -import static org.opensearch.index.remote.RemoteStoreTestsHelper.createIndexSettings; import static org.opensearch.test.ClusterServiceUtils.createClusterService; import static org.hamcrest.Matchers.sameInstance; import static org.mockito.Mockito.mock; @@ -216,15 +215,15 @@ public void testBlocks() { public void testGetReplicationModeWithRemoteTranslog() { final RetentionLeaseSyncAction action = createAction(); final IndexShard indexShard = mock(IndexShard.class); - when(indexShard.indexSettings()).thenReturn(createIndexSettings(true)); + when(indexShard.isRemoteTranslogEnabled()).thenReturn(true); assertEquals(ReplicationMode.NO_REPLICATION, action.getReplicationMode(indexShard)); } public void testGetReplicationModeWithLocalTranslog() { final RetentionLeaseSyncAction action = createAction(); final IndexShard indexShard = mock(IndexShard.class); - when(indexShard.indexSettings()).thenReturn(createIndexSettings(false)); - assertEquals(ReplicationMode.FULL_REPLICATION, action.getReplicationMode(indexShard)); + when(indexShard.isRemoteTranslogEnabled()).thenReturn(true); + assertEquals(ReplicationMode.NO_REPLICATION, action.getReplicationMode(indexShard)); } private RetentionLeaseSyncAction createAction() { diff --git a/server/src/test/java/org/opensearch/index/shard/IndexShardTests.java b/server/src/test/java/org/opensearch/index/shard/IndexShardTests.java index e5bfa8caee79a..537bfcf8f8a6b 100644 --- a/server/src/test/java/org/opensearch/index/shard/IndexShardTests.java +++ b/server/src/test/java/org/opensearch/index/shard/IndexShardTests.java @@ -697,8 +697,7 @@ public void testPrimaryPromotionRollsGeneration() throws Exception { (shard, listener) -> {}, 0L, Collections.singleton(primaryRouting.allocationId().getId()), - new IndexShardRoutingTable.Builder(primaryRouting.shardId()).addShard(primaryRouting).build(), - IndexShardTestUtils.getFakeDiscoveryNodes(primaryRouting) + new IndexShardRoutingTable.Builder(primaryRouting.shardId()).addShard(primaryRouting).build() ); /* @@ -765,8 +764,7 @@ public void testOperationPermitsOnPrimaryShards() throws Exception { }, 0L, Collections.singleton(indexShard.routingEntry().allocationId().getId()), - new IndexShardRoutingTable.Builder(indexShard.shardId()).addShard(primaryRouting).build(), - IndexShardTestUtils.getFakeDiscoveryNodes(primaryRouting) + new IndexShardRoutingTable.Builder(indexShard.shardId()).addShard(primaryRouting).build() ); latch.await(); assertThat(indexShard.getActiveOperationsCount(), is(oneOf(0, IndexShard.OPERATIONS_BLOCKED))); @@ -1448,8 +1446,7 @@ public void onFailure(Exception e) { (s, r) -> resyncLatch.countDown(), 1L, Collections.singleton(newRouting.allocationId().getId()), - new IndexShardRoutingTable.Builder(newRouting.shardId()).addShard(newRouting).build(), - IndexShardTestUtils.getFakeDiscoveryNodes(newRouting) + new IndexShardRoutingTable.Builder(newRouting.shardId()).addShard(newRouting).build() ); resyncLatch.await(); assertThat(indexShard.getLocalCheckpoint(), equalTo(maxSeqNo)); @@ -3287,7 +3284,7 @@ public void testRecoverFromTranslog() throws IOException { Translog.Snapshot snapshot = TestTranslog.newSnapshotFromOperations(operations); primary.markAsRecovering( "store", - new RecoveryState(primary.routingEntry(), IndexShardTestUtils.getFakeDiscoNode(primary.routingEntry().currentNodeId()), null) + new RecoveryState(primary.routingEntry(), getFakeDiscoNode(primary.routingEntry().currentNodeId()), null) ); recoverFromStore(primary); @@ -4032,19 +4029,15 @@ public void testReadSnapshotAndCheckIndexConcurrently() throws Exception { if (isPrimary) { newShard.markAsRecovering( "store", - new RecoveryState( - newShard.routingEntry(), - IndexShardTestUtils.getFakeDiscoNode(newShard.routingEntry().currentNodeId()), - null - ) + new RecoveryState(newShard.routingEntry(), getFakeDiscoNode(newShard.routingEntry().currentNodeId()), null) ); } else { newShard.markAsRecovering( "peer", new RecoveryState( newShard.routingEntry(), - IndexShardTestUtils.getFakeDiscoNode(newShard.routingEntry().currentNodeId()), - IndexShardTestUtils.getFakeDiscoNode(newShard.routingEntry().currentNodeId()) + getFakeDiscoNode(newShard.routingEntry().currentNodeId()), + getFakeDiscoNode(newShard.routingEntry().currentNodeId()) ) ); } diff --git a/server/src/test/java/org/opensearch/index/shard/PrimaryReplicaSyncerTests.java b/server/src/test/java/org/opensearch/index/shard/PrimaryReplicaSyncerTests.java index 09903a8b44cb5..b1bcaac2c1947 100644 --- a/server/src/test/java/org/opensearch/index/shard/PrimaryReplicaSyncerTests.java +++ b/server/src/test/java/org/opensearch/index/shard/PrimaryReplicaSyncerTests.java @@ -111,8 +111,7 @@ public void testSyncerSendsOffCorrectDocuments() throws Exception { null, 1000L, Collections.singleton(allocationId), - new IndexShardRoutingTable.Builder(shard.shardId()).addShard(shard.routingEntry()).build(), - IndexShardTestUtils.getFakeDiscoveryNodes(shard.routingEntry()) + new IndexShardRoutingTable.Builder(shard.shardId()).addShard(shard.routingEntry()).build() ); shard.updateLocalCheckpointForShard(allocationId, globalCheckPoint); assertEquals(globalCheckPoint, shard.getLastKnownGlobalCheckpoint()); @@ -191,8 +190,7 @@ public void testSyncerOnClosingShard() throws Exception { null, 1000L, Collections.singleton(allocationId), - new IndexShardRoutingTable.Builder(shard.shardId()).addShard(shard.routingEntry()).build(), - IndexShardTestUtils.getFakeDiscoveryNodes(shard.routingEntry()) + new IndexShardRoutingTable.Builder(shard.shardId()).addShard(shard.routingEntry()).build() ); CountDownLatch syncCalledLatch = new CountDownLatch(1); diff --git a/server/src/test/java/org/opensearch/indices/IndicesLifecycleListenerSingleNodeTests.java b/server/src/test/java/org/opensearch/indices/IndicesLifecycleListenerSingleNodeTests.java index 0428bdf0655b0..0e16e81b1bb70 100644 --- a/server/src/test/java/org/opensearch/indices/IndicesLifecycleListenerSingleNodeTests.java +++ b/server/src/test/java/org/opensearch/indices/IndicesLifecycleListenerSingleNodeTests.java @@ -34,7 +34,6 @@ import org.opensearch.Version; import org.opensearch.cluster.metadata.IndexMetadata; import org.opensearch.cluster.node.DiscoveryNode; -import org.opensearch.cluster.node.DiscoveryNodes; import org.opensearch.cluster.routing.RecoverySource; import org.opensearch.cluster.routing.ShardRouting; import org.opensearch.cluster.routing.ShardRoutingHelper; @@ -165,8 +164,7 @@ public void afterIndexRemoved(Index index, IndexSettings indexSettings, IndexRem null, null, localNode, - null, - DiscoveryNodes.builder().add(localNode).build() + null ); IndexShardTestCase.updateRoutingEntry(shard, newRouting); assertEquals(5, counter.get()); diff --git a/server/src/test/java/org/opensearch/indices/cluster/AbstractIndicesClusterStateServiceTestCase.java b/server/src/test/java/org/opensearch/indices/cluster/AbstractIndicesClusterStateServiceTestCase.java index 0490228a5cc16..c455101ff4549 100644 --- a/server/src/test/java/org/opensearch/indices/cluster/AbstractIndicesClusterStateServiceTestCase.java +++ b/server/src/test/java/org/opensearch/indices/cluster/AbstractIndicesClusterStateServiceTestCase.java @@ -35,7 +35,6 @@ import org.opensearch.cluster.ClusterState; import org.opensearch.cluster.metadata.IndexMetadata; import org.opensearch.cluster.node.DiscoveryNode; -import org.opensearch.cluster.node.DiscoveryNodes; import org.opensearch.cluster.routing.IndexShardRoutingTable; import org.opensearch.cluster.routing.RoutingNode; import org.opensearch.cluster.routing.ShardRouting; @@ -265,8 +264,7 @@ public MockIndexShard createShard( final RetentionLeaseSyncer retentionLeaseSyncer, final DiscoveryNode targetNode, final DiscoveryNode sourceNode, - final RemoteStoreStatsTrackerFactory remoteStoreStatsTrackerFactory, - final DiscoveryNodes discoveryNodes + final RemoteStoreStatsTrackerFactory remoteStoreStatsTrackerFactory ) throws IOException { failRandomly(); RecoveryState recoveryState = new RecoveryState(shardRouting, targetNode, sourceNode); @@ -389,8 +387,7 @@ public void updateShardState( BiConsumer> primaryReplicaSyncer, long applyingClusterStateVersion, Set inSyncAllocationIds, - IndexShardRoutingTable routingTable, - DiscoveryNodes discoveryNodes + IndexShardRoutingTable routingTable ) throws IOException { failRandomly(); assertThat(this.shardId(), equalTo(shardRouting.shardId())); diff --git a/server/src/test/java/org/opensearch/indices/recovery/PeerRecoverySourceServiceTests.java b/server/src/test/java/org/opensearch/indices/recovery/PeerRecoverySourceServiceTests.java index ded174fb98eef..4fbae4b0d53ca 100644 --- a/server/src/test/java/org/opensearch/indices/recovery/PeerRecoverySourceServiceTests.java +++ b/server/src/test/java/org/opensearch/indices/recovery/PeerRecoverySourceServiceTests.java @@ -38,7 +38,6 @@ import org.opensearch.index.seqno.SequenceNumbers; import org.opensearch.index.shard.IndexShard; import org.opensearch.index.shard.IndexShardTestCase; -import org.opensearch.index.shard.IndexShardTestUtils; import org.opensearch.index.store.Store; import org.opensearch.indices.IndicesService; import org.opensearch.test.NodeRoles; @@ -66,8 +65,8 @@ public void testDuplicateRecoveries() throws IOException { StartRecoveryRequest startRecoveryRequest = new StartRecoveryRequest( primary.shardId(), randomAlphaOfLength(10), - IndexShardTestUtils.getFakeDiscoNode("source"), - IndexShardTestUtils.getFakeDiscoNode("target"), + getFakeDiscoNode("source"), + getFakeDiscoNode("target"), Store.MetadataSnapshot.EMPTY, randomBoolean(), randomLong(), diff --git a/server/src/test/java/org/opensearch/indices/recovery/PeerRecoveryTargetServiceTests.java b/server/src/test/java/org/opensearch/indices/recovery/PeerRecoveryTargetServiceTests.java index a8e5a02011538..1e6cc43703672 100644 --- a/server/src/test/java/org/opensearch/indices/recovery/PeerRecoveryTargetServiceTests.java +++ b/server/src/test/java/org/opensearch/indices/recovery/PeerRecoveryTargetServiceTests.java @@ -56,7 +56,6 @@ import org.opensearch.index.seqno.SequenceNumbers; import org.opensearch.index.shard.IndexShard; import org.opensearch.index.shard.IndexShardTestCase; -import org.opensearch.index.shard.IndexShardTestUtils; import org.opensearch.index.store.Store; import org.opensearch.index.store.StoreFileMetadata; import org.opensearch.index.translog.Translog; @@ -93,8 +92,8 @@ public void testWriteFileChunksConcurrently() throws Exception { mdFiles.add(md); } final IndexShard targetShard = newShard(false); - final DiscoveryNode pNode = IndexShardTestUtils.getFakeDiscoNode(sourceShard.routingEntry().currentNodeId()); - final DiscoveryNode rNode = IndexShardTestUtils.getFakeDiscoNode(targetShard.routingEntry().currentNodeId()); + final DiscoveryNode pNode = getFakeDiscoNode(sourceShard.routingEntry().currentNodeId()); + final DiscoveryNode rNode = getFakeDiscoNode(targetShard.routingEntry().currentNodeId()); targetShard.markAsRecovering("test-peer-recovery", new RecoveryState(targetShard.routingEntry(), rNode, pNode)); final RecoveryTarget recoveryTarget = new RecoveryTarget(targetShard, null, null, threadPool); final PlainActionFuture receiveFileInfoFuture = new PlainActionFuture<>(); diff --git a/server/src/test/java/org/opensearch/indices/replication/checkpoint/PublishCheckpointActionTests.java b/server/src/test/java/org/opensearch/indices/replication/checkpoint/PublishCheckpointActionTests.java index 352f827c74cb2..2cf006176022d 100644 --- a/server/src/test/java/org/opensearch/indices/replication/checkpoint/PublishCheckpointActionTests.java +++ b/server/src/test/java/org/opensearch/indices/replication/checkpoint/PublishCheckpointActionTests.java @@ -15,7 +15,6 @@ import org.opensearch.action.support.replication.ReplicationMode; import org.opensearch.action.support.replication.TransportReplicationAction; import org.opensearch.cluster.action.shard.ShardStateAction; -import org.opensearch.cluster.metadata.IndexMetadata; import org.opensearch.cluster.service.ClusterService; import org.opensearch.common.settings.Settings; import org.opensearch.common.util.io.IOUtils; @@ -36,12 +35,9 @@ import java.util.Collections; import java.util.concurrent.atomic.AtomicBoolean; -import static org.opensearch.index.remote.RemoteStoreTestsHelper.createIndexSettings; import static org.opensearch.test.ClusterServiceUtils.createClusterService; import static org.hamcrest.Matchers.sameInstance; -import static org.mockito.Mockito.any; import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.never; import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; @@ -132,9 +128,7 @@ public void testPublishCheckpointActionOnReplica() { final ShardId shardId = new ShardId(index, id); when(indexShard.shardId()).thenReturn(shardId); - when(indexShard.indexSettings()).thenReturn( - createIndexSettings(false, Settings.builder().put(IndexMetadata.INDEX_REPLICATION_TYPE_SETTING.getKey(), "SEGMENT").build()) - ); + final SegmentReplicationTargetService mockTargetService = mock(SegmentReplicationTargetService.class); final PublishCheckpointAction action = new PublishCheckpointAction( @@ -166,46 +160,17 @@ public void testPublishCheckpointActionOnReplica() { } - public void testPublishCheckpointActionOnDocrepReplicaDuringMigration() { - final IndicesService indicesService = mock(IndicesService.class); - - final Index index = new Index("index", "uuid"); - final IndexService indexService = mock(IndexService.class); - when(indicesService.indexServiceSafe(index)).thenReturn(indexService); - final int id = randomIntBetween(0, 4); - final IndexShard indexShard = mock(IndexShard.class); - when(indexService.getShard(id)).thenReturn(indexShard); - - final ShardId shardId = new ShardId(index, id); - when(indexShard.shardId()).thenReturn(shardId); - when(indexShard.indexSettings()).thenReturn(createIndexSettings(false)); - final SegmentReplicationTargetService mockTargetService = mock(SegmentReplicationTargetService.class); - - final PublishCheckpointAction action = new PublishCheckpointAction( - Settings.EMPTY, - transportService, - clusterService, - indicesService, - threadPool, - shardStateAction, - new ActionFilters(Collections.emptySet()), - mockTargetService - ); - // no interaction with SegmentReplicationTargetService object - verify(mockTargetService, never()).onNewCheckpoint(any(), any()); - } - public void testGetReplicationModeWithRemoteTranslog() { final PublishCheckpointAction action = createAction(); final IndexShard indexShard = mock(IndexShard.class); - when(indexShard.indexSettings()).thenReturn(createIndexSettings(true)); + when(indexShard.isRemoteTranslogEnabled()).thenReturn(true); assertEquals(ReplicationMode.FULL_REPLICATION, action.getReplicationMode(indexShard)); } public void testGetReplicationModeWithLocalTranslog() { final PublishCheckpointAction action = createAction(); final IndexShard indexShard = mock(IndexShard.class); - when(indexShard.indexSettings()).thenReturn(createIndexSettings(false)); + when(indexShard.isRemoteTranslogEnabled()).thenReturn(false); assertEquals(ReplicationMode.FULL_REPLICATION, action.getReplicationMode(indexShard)); } diff --git a/test/framework/src/main/java/org/opensearch/index/engine/EngineTestCase.java b/test/framework/src/main/java/org/opensearch/index/engine/EngineTestCase.java index 1cb5501810c5d..43289a7c89524 100644 --- a/test/framework/src/main/java/org/opensearch/index/engine/EngineTestCase.java +++ b/test/framework/src/main/java/org/opensearch/index/engine/EngineTestCase.java @@ -898,8 +898,7 @@ public EngineConfig config( update -> {}, () -> 0L, (leases, listener) -> listener.onResponse(new ReplicationResponse()), - () -> SafeCommitInfo.EMPTY, - sId -> false + () -> SafeCommitInfo.EMPTY ); globalCheckpointSupplier = replicationTracker; retentionLeasesSupplier = replicationTracker::getRetentionLeases; diff --git a/test/framework/src/main/java/org/opensearch/index/replication/OpenSearchIndexLevelReplicationTestCase.java b/test/framework/src/main/java/org/opensearch/index/replication/OpenSearchIndexLevelReplicationTestCase.java index 3226035bba97b..e6e20ce8f8566 100644 --- a/test/framework/src/main/java/org/opensearch/index/replication/OpenSearchIndexLevelReplicationTestCase.java +++ b/test/framework/src/main/java/org/opensearch/index/replication/OpenSearchIndexLevelReplicationTestCase.java @@ -64,7 +64,6 @@ import org.opensearch.cluster.metadata.IndexMetadata; import org.opensearch.cluster.node.DiscoveryNode; import org.opensearch.cluster.node.DiscoveryNodeRole; -import org.opensearch.cluster.node.DiscoveryNodes; import org.opensearch.cluster.routing.AllocationId; import org.opensearch.cluster.routing.IndexShardRoutingTable; import org.opensearch.cluster.routing.RecoverySource; @@ -97,7 +96,6 @@ import org.opensearch.index.seqno.RetentionLeases; import org.opensearch.index.shard.IndexShard; import org.opensearch.index.shard.IndexShardTestCase; -import org.opensearch.index.shard.IndexShardTestUtils; import org.opensearch.index.shard.PrimaryReplicaSyncer; import org.opensearch.index.shard.ShardPath; import org.opensearch.index.translog.Translog; @@ -342,23 +340,6 @@ public synchronized void startAll() throws IOException { startReplicas(replicas.size()); } - public synchronized DiscoveryNodes generateFakeDiscoveryNodes() { - DiscoveryNodes.Builder builder = new DiscoveryNodes.Builder(); - if (primary.indexSettings() != null && primary.indexSettings().isRemoteNode()) { - builder.add(IndexShardTestUtils.getFakeRemoteEnabledNode(primary.routingEntry().currentNodeId())); - } else { - builder.add(IndexShardTestUtils.getFakeDiscoNode(primary.routingEntry().currentNodeId())); - } - for (IndexShard replica : replicas) { - if (replica.indexSettings() != null && replica.indexSettings().isRemoteNode()) { - builder.add(IndexShardTestUtils.getFakeRemoteEnabledNode(replica.routingEntry().currentNodeId())); - } else { - builder.add(IndexShardTestUtils.getFakeDiscoNode(replica.routingEntry().currentNodeId())); - } - } - return builder.build(); - } - public synchronized int startReplicas(int numOfReplicasToStart) throws IOException { if (primary.routingEntry().initializing()) { startPrimary(); @@ -390,8 +371,7 @@ public void startPrimary() throws IOException { null, currentClusterStateVersion.incrementAndGet(), activeIds, - routingTable, - generateFakeDiscoveryNodes() + routingTable ); for (final IndexShard replica : replicas) { recoverReplica(replica); @@ -512,8 +492,7 @@ public synchronized void promoteReplicaToPrimary( primaryReplicaSyncer, currentClusterStateVersion.incrementAndGet(), activeIds(), - routingTable, - generateFakeDiscoveryNodes() + routingTable ); } @@ -659,16 +638,14 @@ public void syncGlobalCheckpoint() { } private void updateAllocationIDsOnPrimary() throws IOException { + primary.updateShardState( primary.routingEntry(), primary.getPendingPrimaryTerm(), null, currentClusterStateVersion.incrementAndGet(), activeIds(), - routingTable(Function.identity()), - primary.indexSettings().isRemoteTranslogStoreEnabled() - ? IndexShardTestUtils.getFakeRemoteEnabledDiscoveryNodes(routingTable(Function.identity()).getShards()) - : IndexShardTestUtils.getFakeDiscoveryNodes(routingTable(Function.identity()).getShards()) + routingTable(Function.identity()) ); } diff --git a/test/framework/src/main/java/org/opensearch/index/shard/IndexShardTestCase.java b/test/framework/src/main/java/org/opensearch/index/shard/IndexShardTestCase.java index b2ece9c813802..80d16a8243634 100644 --- a/test/framework/src/main/java/org/opensearch/index/shard/IndexShardTestCase.java +++ b/test/framework/src/main/java/org/opensearch/index/shard/IndexShardTestCase.java @@ -48,7 +48,6 @@ import org.opensearch.cluster.metadata.RepositoryMetadata; import org.opensearch.cluster.node.DiscoveryNode; import org.opensearch.cluster.node.DiscoveryNodeRole; -import org.opensearch.cluster.node.DiscoveryNodes; import org.opensearch.cluster.routing.IndexShardRoutingTable; import org.opensearch.cluster.routing.RecoverySource; import org.opensearch.cluster.routing.ShardRouting; @@ -620,14 +619,12 @@ protected IndexShard newShard( IndexingOperationListener... listeners ) throws IOException { Settings nodeSettings = Settings.builder().put("node.name", routing.currentNodeId()).build(); - DiscoveryNodes discoveryNodes = IndexShardTestUtils.getFakeDiscoveryNodes(routing); // To simulate that the node is remote backed if (indexMetadata.getSettings().get(IndexMetadata.SETTING_REMOTE_STORE_ENABLED) == "true") { nodeSettings = Settings.builder() .put("node.name", routing.currentNodeId()) .put("node.attr.remote_store.translog.repository", "seg_repo") .build(); - discoveryNodes = DiscoveryNodes.builder().add(IndexShardTestUtils.getFakeRemoteEnabledNode(routing.currentNodeId())).build(); } final IndexSettings indexSettings = new IndexSettings(indexMetadata, nodeSettings); final IndexShard indexShard; @@ -715,8 +712,7 @@ protected IndexShard newShard( "dummy-node", DefaultRecoverySettings.INSTANCE, DefaultRemoteStoreSettings.INSTANCE, - false, - discoveryNodes + false ); indexShard.addShardFailureCallback(DEFAULT_SHARD_FAILURE_HANDLER); if (remoteStoreStatsTrackerFactory != null) { @@ -990,7 +986,7 @@ protected void closeShards(Iterable shards) throws IOException { protected void recoverShardFromStore(IndexShard primary) throws IOException { primary.markAsRecovering( "store", - new RecoveryState(primary.routingEntry(), IndexShardTestUtils.getFakeDiscoNode(primary.routingEntry().currentNodeId()), null) + new RecoveryState(primary.routingEntry(), getFakeDiscoNode(primary.routingEntry().currentNodeId()), null) ); recoverFromStore(primary); updateRoutingEntry(primary, ShardRoutingHelper.moveToStarted(primary.routingEntry())); @@ -1007,19 +1003,7 @@ public static void updateRoutingEntry(IndexShard shard, ShardRouting shardRoutin null, currentClusterStateVersion.incrementAndGet(), inSyncIds, - newRoutingTable, - DiscoveryNodes.builder() - .add( - new DiscoveryNode( - shardRouting.currentNodeId(), - shardRouting.currentNodeId(), - buildNewFakeTransportAddress(), - Collections.emptyMap(), - DiscoveryNodeRole.BUILT_IN_ROLES, - Version.CURRENT - ) - ) - .build() + newRoutingTable ); } @@ -1033,6 +1017,17 @@ protected void recoveryEmptyReplica(IndexShard replica, boolean startReplica) th } } + protected DiscoveryNode getFakeDiscoNode(String id) { + return new DiscoveryNode( + id, + id, + buildNewFakeTransportAddress(), + Collections.emptyMap(), + DiscoveryNodeRole.BUILT_IN_ROLES, + Version.CURRENT + ); + } + protected void recoverReplica(IndexShard replica, IndexShard primary, boolean startReplica) throws IOException { recoverReplica(replica, primary, startReplica, getReplicationFunc(replica)); } @@ -1109,7 +1104,7 @@ protected void recoverReplica( * @param targetSupplier supplies an instance of {@link RecoveryTarget} * @param markAsRecovering set to {@code false} if the replica is marked as recovering */ - public final void recoverUnstartedReplica( + protected final void recoverUnstartedReplica( final IndexShard replica, final IndexShard primary, final BiFunction targetSupplier, @@ -1118,18 +1113,8 @@ public final void recoverUnstartedReplica( final IndexShardRoutingTable routingTable, final Function, List> replicatePrimaryFunction ) throws IOException { - final DiscoveryNode pNode; - final DiscoveryNode rNode; - if (primary.isRemoteTranslogEnabled()) { - pNode = IndexShardTestUtils.getFakeRemoteEnabledNode(primary.routingEntry().currentNodeId()); - } else { - pNode = IndexShardTestUtils.getFakeDiscoNode(primary.routingEntry().currentNodeId()); - } - if (replica.isRemoteTranslogEnabled()) { - rNode = IndexShardTestUtils.getFakeRemoteEnabledNode(replica.routingEntry().currentNodeId()); - } else { - rNode = IndexShardTestUtils.getFakeDiscoNode(replica.routingEntry().currentNodeId()); - } + final DiscoveryNode pNode = getFakeDiscoNode(primary.routingEntry().currentNodeId()); + final DiscoveryNode rNode = getFakeDiscoNode(replica.routingEntry().currentNodeId()); if (markAsRecovering) { replica.markAsRecovering("remote", new RecoveryState(replica.routingEntry(), pNode, rNode)); } else { @@ -1170,10 +1155,7 @@ public final void recoverUnstartedReplica( null, currentClusterStateVersion.incrementAndGet(), inSyncIds, - routingTable, - primary.isRemoteTranslogEnabled() - ? IndexShardTestUtils.getFakeRemoteEnabledDiscoveryNodes(routingTable.getShards()) - : IndexShardTestUtils.getFakeDiscoveryNodes(routingTable.getShards()) + routingTable ); try { PlainActionFuture future = new PlainActionFuture<>(); @@ -1207,10 +1189,7 @@ protected void startReplicaAfterRecovery( null, currentClusterStateVersion.incrementAndGet(), inSyncIdsWithReplica, - newRoutingTable, - primary.indexSettings.isRemoteTranslogStoreEnabled() - ? IndexShardTestUtils.getFakeRemoteEnabledDiscoveryNodes(routingTable.shards()) - : IndexShardTestUtils.getFakeDiscoveryNodes(routingTable.shards()) + newRoutingTable ); replica.updateShardState( replica.routingEntry().moveToStarted(), @@ -1218,10 +1197,7 @@ protected void startReplicaAfterRecovery( null, currentClusterStateVersion.get(), inSyncIdsWithReplica, - newRoutingTable, - replica.indexSettings.isRemoteTranslogStoreEnabled() - ? IndexShardTestUtils.getFakeRemoteEnabledDiscoveryNodes(routingTable.shards()) - : IndexShardTestUtils.getFakeDiscoveryNodes(routingTable.shards()) + newRoutingTable ); } @@ -1250,8 +1226,7 @@ protected void promoteReplica(IndexShard replica, Set inSyncIds, IndexSh ), currentClusterStateVersion.incrementAndGet(), inSyncIds, - newRoutingTable, - IndexShardTestUtils.getFakeDiscoveryNodes(routingEntry) + newRoutingTable ); } @@ -1395,7 +1370,7 @@ protected void recoverShardFromSnapshot(final IndexShard shard, final Snapshot s final Version version = Version.CURRENT; final ShardId shardId = shard.shardId(); final IndexId indexId = new IndexId(shardId.getIndex().getName(), shardId.getIndex().getUUID()); - final DiscoveryNode node = IndexShardTestUtils.getFakeDiscoNode(shard.routingEntry().currentNodeId()); + final DiscoveryNode node = getFakeDiscoNode(shard.routingEntry().currentNodeId()); final RecoverySource.SnapshotRecoverySource recoverySource = new RecoverySource.SnapshotRecoverySource( UUIDs.randomBase64UUID(), snapshot, diff --git a/test/framework/src/main/java/org/opensearch/index/shard/IndexShardTestUtils.java b/test/framework/src/main/java/org/opensearch/index/shard/IndexShardTestUtils.java deleted file mode 100644 index d3a4a95c3bdef..0000000000000 --- a/test/framework/src/main/java/org/opensearch/index/shard/IndexShardTestUtils.java +++ /dev/null @@ -1,67 +0,0 @@ -/* - * SPDX-License-Identifier: Apache-2.0 - * - * The OpenSearch Contributors require contributions made to - * this file be licensed under the Apache-2.0 license or a - * compatible open source license. - */ - -package org.opensearch.index.shard; - -import org.opensearch.Version; -import org.opensearch.cluster.node.DiscoveryNode; -import org.opensearch.cluster.node.DiscoveryNodeRole; -import org.opensearch.cluster.node.DiscoveryNodes; -import org.opensearch.cluster.routing.ShardRouting; -import org.opensearch.node.remotestore.RemoteStoreNodeAttribute; - -import java.util.Collections; -import java.util.HashMap; -import java.util.List; -import java.util.Map; - -public class IndexShardTestUtils { - public static DiscoveryNode getFakeDiscoNode(String id) { - return new DiscoveryNode( - id, - id, - IndexShardTestCase.buildNewFakeTransportAddress(), - Collections.emptyMap(), - DiscoveryNodeRole.BUILT_IN_ROLES, - Version.CURRENT - ); - } - - public static DiscoveryNode getFakeRemoteEnabledNode(String id) { - Map remoteNodeAttributes = new HashMap(); - remoteNodeAttributes.put(RemoteStoreNodeAttribute.REMOTE_STORE_SEGMENT_REPOSITORY_NAME_ATTRIBUTE_KEY, "test-repo"); - return new DiscoveryNode( - id, - id, - IndexShardTestCase.buildNewFakeTransportAddress(), - remoteNodeAttributes, - DiscoveryNodeRole.BUILT_IN_ROLES, - Version.CURRENT - ); - } - - public static DiscoveryNodes getFakeDiscoveryNodes(List shardRoutings) { - DiscoveryNodes.Builder builder = DiscoveryNodes.builder(); - for (ShardRouting routing : shardRoutings) { - builder.add(getFakeDiscoNode(routing.currentNodeId())); - } - return builder.build(); - } - - public static DiscoveryNodes getFakeRemoteEnabledDiscoveryNodes(List shardRoutings) { - DiscoveryNodes.Builder builder = DiscoveryNodes.builder(); - for (ShardRouting routing : shardRoutings) { - builder.add(getFakeRemoteEnabledNode(routing.currentNodeId())); - } - return builder.build(); - } - - public static DiscoveryNodes getFakeDiscoveryNodes(ShardRouting shardRouting) { - return DiscoveryNodes.builder().add(getFakeDiscoNode(shardRouting.currentNodeId())).build(); - } -}