diff --git a/server/src/internalClusterTest/java/org/opensearch/cluster/SimpleClusterStateIT.java b/server/src/internalClusterTest/java/org/opensearch/cluster/SimpleClusterStateIT.java index d835dfa56b693..81af99997241c 100644 --- a/server/src/internalClusterTest/java/org/opensearch/cluster/SimpleClusterStateIT.java +++ b/server/src/internalClusterTest/java/org/opensearch/cluster/SimpleClusterStateIT.java @@ -119,7 +119,7 @@ public void indexData() throws Exception { index("foo", "bar", "1", XContentFactory.jsonBuilder().startObject().field("foo", "foo").endObject()); index("fuu", "buu", "1", XContentFactory.jsonBuilder().startObject().field("fuu", "fuu").endObject()); index("baz", "baz", "1", XContentFactory.jsonBuilder().startObject().field("baz", "baz").endObject()); - refresh(); + waitForReplication(true); } public void testRoutingTable() throws Exception { diff --git a/server/src/internalClusterTest/java/org/opensearch/index/SegmentReplicationPressureIT.java b/server/src/internalClusterTest/java/org/opensearch/index/SegmentReplicationPressureIT.java index 805b3f0a40a03..197eb101eebf3 100644 --- a/server/src/internalClusterTest/java/org/opensearch/index/SegmentReplicationPressureIT.java +++ b/server/src/internalClusterTest/java/org/opensearch/index/SegmentReplicationPressureIT.java @@ -97,7 +97,7 @@ public void testWritesRejected() throws Exception { indexDoc(); totalDocs.incrementAndGet(); - refreshWithNoWaitForReplicas(INDEX_NAME); + refresh(INDEX_NAME); // index again while we are stale. assertBusy(() -> { expectThrows(OpenSearchRejectedExecutionException.class, () -> { @@ -109,7 +109,7 @@ public void testWritesRejected() throws Exception { expectThrows(OpenSearchRejectedExecutionException.class, () -> { indexDoc(); totalDocs.incrementAndGet(); - refreshWithNoWaitForReplicas(INDEX_NAME); + refresh(INDEX_NAME); }); // Verify the rejected doc count. @@ -123,13 +123,13 @@ public void testWritesRejected() throws Exception { assertEquals(perGroupStats.getRejectedRequestCount(), 2L); } - refreshWithNoWaitForReplicas(INDEX_NAME); + refresh(INDEX_NAME); // wait for the replicas to catch up after block is released. assertReplicaCheckpointUpdated(primaryShard); // index another doc showing there is no pressure enforced. indexDoc(); - refreshWithNoWaitForReplicas(INDEX_NAME); + waitForReplication(true, INDEX_NAME); waitForSearchableDocs(totalDocs.incrementAndGet(), replicaNodes.toArray(new String[] {})); } @@ -158,7 +158,7 @@ public void testAddReplicaWhileWritesBlocked() throws Exception { latch.await(); indexDoc(); totalDocs.incrementAndGet(); - refreshWithNoWaitForReplicas(INDEX_NAME); + refresh(INDEX_NAME); // index again while we are stale. assertBusy(() -> { expectThrows(OpenSearchRejectedExecutionException.class, () -> { @@ -177,13 +177,13 @@ public void testAddReplicaWhileWritesBlocked() throws Exception { } ensureGreen(INDEX_NAME); waitForSearchableDocs(totalDocs.get(), replicaNodes); - refreshWithNoWaitForReplicas(INDEX_NAME); + refresh(INDEX_NAME); // wait for the replicas to catch up after block is released. assertReplicaCheckpointUpdated(primaryShard); // index another doc showing there is no pressure enforced. indexDoc(); - refreshWithNoWaitForReplicas(INDEX_NAME); + waitForReplication(true, INDEX_NAME); waitForSearchableDocs(totalDocs.incrementAndGet(), replicaNodes.toArray(new String[] {})); } @@ -211,11 +211,11 @@ public void testBelowReplicaLimit() throws Exception { latch.await(); indexDoc(); totalDocs.incrementAndGet(); - refreshWithNoWaitForReplicas(INDEX_NAME); + refresh(INDEX_NAME); } // index another doc showing there is no pressure enforced. indexDoc(); - refreshWithNoWaitForReplicas(INDEX_NAME); + waitForReplication(true, INDEX_NAME); waitForSearchableDocs(totalDocs.incrementAndGet(), replicaNodes.toArray(new String[] {})); } @@ -245,7 +245,7 @@ public void testFailStaleReplica() throws Exception { latch.await(); // index again while we are stale. indexDoc(); - refreshWithNoWaitForReplicas(INDEX_NAME); + refresh(INDEX_NAME); totalDocs.incrementAndGet(); // Verify that replica shard is closed. @@ -280,7 +280,7 @@ public void testWithDocumentReplicationEnabledIndex() throws Exception { totalDocs.getAndSet(indexUntilCheckpointCount()); // index again after stale limit. indexDoc(); - refreshWithNoWaitForReplicas(INDEX_NAME); + refresh(INDEX_NAME); totalDocs.incrementAndGet(); // verify total doc count is same and docs are not rejected. assertHitCount(client(primaryNode).prepareSearch(INDEX_NAME).setSize(0).setPreference("_only_local").get(), totalDocs.get()); @@ -309,7 +309,7 @@ public void testBulkWritesRejected() throws Exception { Thread indexingThread = new Thread(() -> { for (int i = 0; i < MAX_CHECKPOINTS_BEHIND + 1; i++) { executeBulkRequest(nodes, docsPerBatch); - refreshWithNoWaitForReplicas(INDEX_NAME); + refresh(INDEX_NAME); } }); indexingThread.start(); @@ -318,12 +318,13 @@ public void testBulkWritesRejected() throws Exception { // try and index again while we are stale. assertBusy(() -> { assertFailedRequests(executeBulkRequest(nodes, randomIntBetween(1, 200))); }); } - refreshWithNoWaitForReplicas(INDEX_NAME); + refresh(INDEX_NAME); // wait for the replicas to catch up after block is released. assertReplicaCheckpointUpdated(primaryShard); // index another doc showing there is no pressure enforced. executeBulkRequest(nodes, totalDocs); + waitForReplication(false); waitForSearchableDocs(totalDocs * 2L, replicaNodes.toArray(new String[] {})); } @@ -335,7 +336,7 @@ private BulkResponse executeBulkRequest(List nodes, int docsPerBatch) { bulkRequest.add(request); } final BulkResponse bulkItemResponses = client(randomFrom(nodes)).bulk(bulkRequest).actionGet(); - refreshWithNoWaitForReplicas(INDEX_NAME); + refresh(INDEX_NAME); return bulkItemResponses; } @@ -351,7 +352,7 @@ private int indexUntilCheckpointCount() { indexDoc(); } total += numDocs; - refreshWithNoWaitForReplicas(INDEX_NAME); + refresh(INDEX_NAME); } return total; } diff --git a/server/src/internalClusterTest/java/org/opensearch/indices/IndicesRequestCacheIT.java b/server/src/internalClusterTest/java/org/opensearch/indices/IndicesRequestCacheIT.java index f8501b4607ce3..37f6d2e02e1ec 100644 --- a/server/src/internalClusterTest/java/org/opensearch/indices/IndicesRequestCacheIT.java +++ b/server/src/internalClusterTest/java/org/opensearch/indices/IndicesRequestCacheIT.java @@ -190,7 +190,7 @@ public void testQueryRewrite() throws Exception { // Force merge the index to ensure there can be no background merges during the subsequent searches that would invalidate the cache ForceMergeResponse forceMergeResponse = client.admin().indices().prepareForceMerge("index").setFlush(true).get(); OpenSearchAssertions.assertAllSuccessful(forceMergeResponse); - refresh(); + waitForReplication(true); ensureSearchable("index"); assertCacheState(client, "index", 0, 0); @@ -260,7 +260,7 @@ public void testQueryRewriteMissingValues() throws Exception { // Force merge the index to ensure there can be no background merges during the subsequent searches that would invalidate the cache ForceMergeResponse forceMergeResponse = client.admin().indices().prepareForceMerge("index").setFlush(true).get(); OpenSearchAssertions.assertAllSuccessful(forceMergeResponse); - refresh(); + waitForReplication(true); ensureSearchable("index"); assertCacheState(client, "index", 0, 0); @@ -326,7 +326,7 @@ public void testQueryRewriteDates() throws Exception { // Force merge the index to ensure there can be no background merges during the subsequent searches that would invalidate the cache ForceMergeResponse forceMergeResponse = client.admin().indices().prepareForceMerge("index").setFlush(true).get(); OpenSearchAssertions.assertAllSuccessful(forceMergeResponse); - refresh(); + waitForReplication(true); ensureSearchable("index"); assertCacheState(client, "index", 0, 0); @@ -399,7 +399,7 @@ public void testQueryRewriteDatesWithNow() throws Exception { .setFlush(true) .get(); OpenSearchAssertions.assertAllSuccessful(forceMergeResponse); - refresh(); + waitForReplication(true); ensureSearchable("index-1", "index-2", "index-3"); assertCacheState(client, "index-1", 0, 0); @@ -470,7 +470,7 @@ public void testCanCache() throws Exception { // Force merge the index to ensure there can be no background merges during the subsequent searches that would invalidate the cache ForceMergeResponse forceMergeResponse = client.admin().indices().prepareForceMerge("index").setFlush(true).get(); OpenSearchAssertions.assertAllSuccessful(forceMergeResponse); - refresh(); + waitForReplication(true); ensureSearchable("index"); assertCacheState(client, "index", 0, 0); @@ -564,7 +564,7 @@ public void testCacheWithFilteredAlias() throws InterruptedException { // Force merge the index to ensure there can be no background merges during the subsequent searches that would invalidate the cache ForceMergeResponse forceMergeResponse = client.admin().indices().prepareForceMerge("index").setFlush(true).get(); OpenSearchAssertions.assertAllSuccessful(forceMergeResponse); - refresh(); + waitForReplication(true); indexRandomForConcurrentSearch("index"); @@ -671,7 +671,7 @@ public void testCacheWithInvalidation() throws Exception { assertCacheState(client, "index", 1, 1); // Explicit refresh would invalidate cache - refresh(); + waitForReplication(true); // Hit same query again resp = client.prepareSearch("index").setRequestCache(true).setQuery(QueryBuilders.termQuery("k", "hello")).get(); assertSearchResponse(resp); diff --git a/server/src/internalClusterTest/java/org/opensearch/indices/replication/SegmentReplicationDisruptionIT.java b/server/src/internalClusterTest/java/org/opensearch/indices/replication/SegmentReplicationDisruptionIT.java index 0f3a5e9b9eb33..66b26b5d25cfe 100644 --- a/server/src/internalClusterTest/java/org/opensearch/indices/replication/SegmentReplicationDisruptionIT.java +++ b/server/src/internalClusterTest/java/org/opensearch/indices/replication/SegmentReplicationDisruptionIT.java @@ -93,7 +93,7 @@ public void testSendCorruptBytesToReplica() throws Exception { } final long originalRecoveryTime = getRecoveryStopTime(replicaNode); assertNotEquals(originalRecoveryTime, 0); - refreshWithNoWaitForReplicas(INDEX_NAME); + refresh(INDEX_NAME); latch.await(); assertTrue(failed.get()); waitForNewPeerRecovery(replicaNode, originalRecoveryTime); @@ -124,7 +124,7 @@ public void testWipeSegmentBetweenSyncs() throws Exception { .setSource(jsonBuilder().startObject().field("field", i).endObject()) .get(); } - refreshWithNoWaitForReplicas(INDEX_NAME); + refresh(INDEX_NAME); ensureGreen(INDEX_NAME); final long originalRecoveryTime = getRecoveryStopTime(replicaNode); @@ -138,7 +138,7 @@ public void testWipeSegmentBetweenSyncs() throws Exception { .setSource(jsonBuilder().startObject().field("field", i).endObject()) .get(); } - refreshWithNoWaitForReplicas(INDEX_NAME); + refresh(INDEX_NAME); waitForNewPeerRecovery(replicaNode, originalRecoveryTime); resetCheckIndexStatus(); waitForSearchableDocs(20, primaryNode, replicaNode); diff --git a/server/src/internalClusterTest/java/org/opensearch/indices/replication/SegmentReplicationIT.java b/server/src/internalClusterTest/java/org/opensearch/indices/replication/SegmentReplicationIT.java index 1dd1635ce9a7b..56de62c565d2b 100644 --- a/server/src/internalClusterTest/java/org/opensearch/indices/replication/SegmentReplicationIT.java +++ b/server/src/internalClusterTest/java/org/opensearch/indices/replication/SegmentReplicationIT.java @@ -138,7 +138,7 @@ public void testPrimaryStopped_ReplicaPromoted() throws Exception { ensureGreen(INDEX_NAME); client().prepareIndex(INDEX_NAME).setId("1").setSource("foo", "bar").setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE).get(); - refreshWithNoWaitForReplicas(INDEX_NAME); + refresh(INDEX_NAME); waitForSearchableDocs(1, primary, replica); @@ -164,7 +164,7 @@ public void testPrimaryStopped_ReplicaPromoted() throws Exception { String nodeC = internalCluster().startDataOnlyNode(); ensureGreen(INDEX_NAME); client().prepareIndex(INDEX_NAME).setId("4").setSource("baz", "baz").get(); - refreshWithNoWaitForReplicas(INDEX_NAME); + waitForReplication(true); waitForSearchableDocs(4, nodeC, replica); } @@ -179,7 +179,7 @@ public void testRestartPrimary() throws Exception { final int initialDocCount = 1; client().prepareIndex(INDEX_NAME).setId("1").setSource("foo", "bar").setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE).get(); - refreshWithNoWaitForReplicas(INDEX_NAME); + refresh(INDEX_NAME); waitForSearchableDocs(initialDocCount, replica, primary); @@ -189,6 +189,7 @@ public void testRestartPrimary() throws Exception { assertEquals(getNodeContainingPrimaryShard().getName(), replica); flushAndRefresh(INDEX_NAME); + waitForReplication(false); waitForSearchableDocs(initialDocCount, replica, primary); } @@ -203,7 +204,7 @@ public void testCancelPrimaryAllocation() throws Exception { final int initialDocCount = 1; client().prepareIndex(INDEX_NAME).setId("1").setSource("foo", "bar").setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE).get(); - refreshWithNoWaitForReplicas(INDEX_NAME); + refresh(INDEX_NAME); waitForSearchableDocs(initialDocCount, replica, primary); @@ -219,6 +220,7 @@ public void testCancelPrimaryAllocation() throws Exception { assertEquals(getNodeContainingPrimaryShard().getName(), replica); flushAndRefresh(INDEX_NAME); + waitForReplication(false); waitForSearchableDocs(initialDocCount, replica, primary); } @@ -250,7 +252,7 @@ public void testReplicationAfterPrimaryRefreshAndFlush() throws Exception { ) { indexer.start(initialDocCount); waitForDocs(initialDocCount, indexer); - refreshWithNoWaitForReplicas(INDEX_NAME); + refresh(INDEX_NAME); waitForSearchableDocs(initialDocCount, nodeA, nodeB); final int additionalDocCount = scaledRandomIntBetween(0, 10); @@ -259,8 +261,8 @@ public void testReplicationAfterPrimaryRefreshAndFlush() throws Exception { waitForDocs(expectedHitCount, indexer); flushAndRefresh(INDEX_NAME); + waitForReplication(false); waitForSearchableDocs(expectedHitCount, nodeA, nodeB); - ensureGreen(INDEX_NAME); } } @@ -296,7 +298,7 @@ public void testIndexReopenClose() throws Exception { ensureGreen(INDEX_NAME); waitForSearchableDocs(initialDocCount, primary, replica); - waitForReplicasToCatchUpWithPrimary(); + waitForReplication(false); } public void testScrollWithConcurrentIndexAndSearch() throws Exception { @@ -343,10 +345,11 @@ public void testScrollWithConcurrentIndexAndSearch() throws Exception { client().prepareClearScroll().addScrollId(searchResponse.getScrollId()).get(); assertBusy(() -> { - refreshWithNoWaitForReplicas(); + client().admin().indices().prepareRefresh().execute().actionGet(); assertTrue(pendingIndexResponses.stream().allMatch(ActionFuture::isDone)); assertTrue(pendingSearchResponse.stream().allMatch(ActionFuture::isDone)); }, 1, TimeUnit.MINUTES); + waitForReplication(false); waitForSearchableDocs(INDEX_NAME, 2 * searchCount, List.of(primary, replica)); } @@ -390,7 +393,7 @@ public void testMultipleShards() throws Exception { waitForSearchableDocs(expectedHitCount, nodeA, nodeB); ensureGreen(INDEX_NAME); - waitForReplicasToCatchUpWithPrimary(); + waitForReplication(false); } } @@ -427,7 +430,7 @@ public void testReplicationAfterForceMerge() throws Exception { // Force a merge here so that the in memory SegmentInfos does not reference old segments on disk. client().admin().indices().prepareForceMerge(INDEX_NAME).setMaxNumSegments(1).setFlush(false).get(); - refresh(INDEX_NAME); + waitForReplication(true); } } @@ -510,7 +513,7 @@ public void testNodeDropWithOngoingReplication() throws Exception { client().prepareIndex(INDEX_NAME).setId(Integer.toString(i)).setSource("field", "value" + i).execute().get(); } // Refresh, this should trigger round of segment replication - refreshWithNoWaitForReplicas(INDEX_NAME); + refresh(INDEX_NAME); blockFileCopy.countDown(); internalCluster().stopRandomNode(InternalTestCluster.nameFilter(primaryNode)); ensureYellow(INDEX_NAME); @@ -604,7 +607,7 @@ public void testStartReplicaAfterPrimaryIndexesDocs() throws Exception { // Force a merge here so that the in memory SegmentInfos does not reference old segments on disk. client().admin().indices().prepareForceMerge(INDEX_NAME).setMaxNumSegments(1).setFlush(false).get(); - refreshWithNoWaitForReplicas(INDEX_NAME); + refresh(INDEX_NAME); assertAcked( client().admin() @@ -619,7 +622,7 @@ public void testStartReplicaAfterPrimaryIndexesDocs() throws Exception { assertHitCount(client(replicaNode).prepareSearch(INDEX_NAME).setSize(0).setPreference("_only_local").get(), 2); client().prepareIndex(INDEX_NAME).setId("3").setSource("foo", "bar").get(); - refreshWithNoWaitForReplicas(INDEX_NAME); + waitForReplication(true); waitForSearchableDocs(3, primaryNode, replicaNode); assertHitCount(client(primaryNode).prepareSearch(INDEX_NAME).setSize(0).setPreference("_only_local").get(), 3); assertHitCount(client(replicaNode).prepareSearch(INDEX_NAME).setSize(0).setPreference("_only_local").get(), 3); @@ -646,7 +649,7 @@ public void testDeleteOperations() throws Exception { ) { indexer.start(initialDocCount); waitForDocs(initialDocCount, indexer); - refreshWithNoWaitForReplicas(INDEX_NAME); + refresh(INDEX_NAME); waitForSearchableDocs(initialDocCount, nodeA, nodeB); final int additionalDocCount = scaledRandomIntBetween(0, 2); @@ -662,7 +665,7 @@ public void testDeleteOperations() throws Exception { String id = ids.toArray()[0].toString(); client(nodeA).prepareDelete(INDEX_NAME, id).setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE).get(); - refreshWithNoWaitForReplicas(INDEX_NAME); + waitForReplication(true, INDEX_NAME); waitForSearchableDocs(expectedHitCount - 1, nodeA, nodeB); } } @@ -681,7 +684,7 @@ public void testReplicationPostDeleteAndForceMerge() throws Exception { for (int i = 0; i < initialDocCount; i++) { client().prepareIndex(INDEX_NAME).setId(String.valueOf(i)).setSource("foo", "bar").get(); } - refreshWithNoWaitForReplicas(INDEX_NAME); + refresh(INDEX_NAME); waitForSearchableDocs(initialDocCount, primary, replica); final int deletedDocCount = randomIntBetween(1, initialDocCount); @@ -716,7 +719,7 @@ public void testReplicationPostDeleteAndForceMerge() throws Exception { final ShardRouting replicaShardRouting = getShardRoutingForNodeName(replica); assertNotNull(replicaShardRouting); assertTrue(replicaShardRouting + " should be promoted as a primary", replicaShardRouting.primary()); - refreshWithNoWaitForReplicas(INDEX_NAME); + refresh(INDEX_NAME); final long expectedHitCount = initialDocCount + additionalDocs - deletedDocCount; assertHitCount(client(replica).prepareSearch(INDEX_NAME).setSize(0).setPreference("_only_local").get(), expectedHitCount); @@ -725,7 +728,7 @@ public void testReplicationPostDeleteAndForceMerge() throws Exception { // index another doc. client().prepareIndex(INDEX_NAME).setId(String.valueOf(expectedMaxSeqNo + 1)).setSource("another", "doc").get(); - refreshWithNoWaitForReplicas(INDEX_NAME); + refresh(INDEX_NAME); assertHitCount(client(replica).prepareSearch(INDEX_NAME).setSize(0).setPreference("_only_local").get(), expectedHitCount + 1); } @@ -750,7 +753,7 @@ public void testUpdateOperations() throws Exception { ) { indexer.start(initialDocCount); waitForDocs(initialDocCount, indexer); - refreshWithNoWaitForReplicas(INDEX_NAME); + refresh(INDEX_NAME); waitForSearchableDocs(initialDocCount, asList(primary, replica)); final int additionalDocCount = scaledRandomIntBetween(0, 5); @@ -767,9 +770,7 @@ public void testUpdateOperations() throws Exception { .get(); assertFalse("request shouldn't have forced a refresh", updateResponse.forcedRefresh()); assertEquals(2, updateResponse.getVersion()); - - refresh(INDEX_NAME); - + waitForReplication(true, INDEX_NAME); assertSearchHits(client(primary).prepareSearch(INDEX_NAME).setQuery(matchQuery("foo", "baz")).get(), id); assertSearchHits(client(replica).prepareSearch(INDEX_NAME).setQuery(matchQuery("foo", "baz")).get(), id); } @@ -801,7 +802,7 @@ public void testDropPrimaryDuringReplication() throws Exception { ) { indexer.start(initialDocCount); waitForDocs(initialDocCount, indexer); - refreshWithNoWaitForReplicas(INDEX_NAME); + refresh(INDEX_NAME); // don't wait for replication to complete, stop the primary immediately. internalCluster().stopRandomNode(InternalTestCluster.nameFilter(primaryNode)); ensureYellow(INDEX_NAME); @@ -816,6 +817,7 @@ public void testDropPrimaryDuringReplication() throws Exception { client().prepareIndex(INDEX_NAME).setId(docId).setSource("foo", "bar").get(); flushAndRefresh(INDEX_NAME); + waitForReplication(false); waitForSearchableDocs(initialDocCount + 1, dataNodes); } } @@ -865,9 +867,10 @@ public void testReplicaHasDiffFilesThanPrimary() throws Exception { for (int i = 0; i < docCount; i++) { client().prepareIndex(INDEX_NAME).setId(Integer.toString(i)).setSource("field", "value" + i).execute().get(); // Refresh, this should trigger round of segment replication - refreshWithNoWaitForReplicas(INDEX_NAME); + refresh(INDEX_NAME); } ensureGreen(INDEX_NAME); + waitForReplication(false); waitForSearchableDocs(docCount, primaryNode, replicaNode); final IndexShard replicaAfterFailure = getIndexShard(replicaNode, INDEX_NAME); assertNotEquals(replicaAfterFailure.routingEntry().allocationId().getId(), replicaShard.routingEntry().allocationId().getId()); @@ -894,7 +897,7 @@ public void testPressureServiceStats() throws Exception { ) { indexer.start(initialDocCount); waitForDocs(initialDocCount, indexer); - refreshWithNoWaitForReplicas(INDEX_NAME); + refresh(INDEX_NAME); // get shard references. final IndexShard primaryShard = getIndexShard(primaryNode, INDEX_NAME); @@ -993,7 +996,7 @@ public void testScrollCreatedOnReplica() throws Exception { .setSource(jsonBuilder().startObject().field("field", i).endObject()) .setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE) .get(); - refreshWithNoWaitForReplicas(INDEX_NAME); + refresh(INDEX_NAME); } assertBusy( () -> assertEquals( @@ -1025,7 +1028,7 @@ public void testScrollCreatedOnReplica() throws Exception { for (int i = 3; i < 5; i++) { client().prepareDelete(INDEX_NAME, String.valueOf(i)).setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE).get(); - refreshWithNoWaitForReplicas(INDEX_NAME); + refresh(INDEX_NAME); if (randomBoolean()) { client().admin().indices().prepareForceMerge(INDEX_NAME).setMaxNumSegments(1).setFlush(true).get(); flush(INDEX_NAME); @@ -1103,7 +1106,7 @@ public void testScrollWithOngoingSegmentReplication() throws Exception { .get(); } // catch up replica with primary - refreshWithNoWaitForReplicas(INDEX_NAME); + refresh(INDEX_NAME); assertBusy( () -> assertEquals( getIndexShard(primary, INDEX_NAME).getLatestReplicationCheckpoint().getSegmentInfosVersion(), @@ -1161,7 +1164,7 @@ public void testScrollWithOngoingSegmentReplication() throws Exception { ); // perform refresh to start round of segment replication - refreshWithNoWaitForReplicas(INDEX_NAME); + refresh(INDEX_NAME); // wait for segrep to start and copy temporary files waitForFileCopy.await(); @@ -1201,7 +1204,7 @@ public void testScrollWithOngoingSegmentReplication() throws Exception { getIndexShard(replica, INDEX_NAME).getLatestReplicationCheckpoint().getSegmentInfosVersion() ); }); - waitForReplicasToCatchUpWithPrimary(); + waitForReplication(false); waitForSearchableDocs(finalDocCount, primary, replica); } @@ -1218,7 +1221,7 @@ public void testPitCreatedOnReplica() throws Exception { .setSource("foo", randomInt()) .setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE) .get(); - refreshWithNoWaitForReplicas(INDEX_NAME); + refresh(INDEX_NAME); } // wait until replication finishes, then make the pit request. assertBusy( @@ -1268,7 +1271,7 @@ public void testPitCreatedOnReplica() throws Exception { .setSource("foo", randomInt()) .setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE) .get(); - refreshWithNoWaitForReplicas(INDEX_NAME); + refresh(INDEX_NAME); if (randomBoolean()) { client().admin().indices().prepareForceMerge(INDEX_NAME).setMaxNumSegments(1).setFlush(true).get(); flush(INDEX_NAME); @@ -1339,7 +1342,7 @@ public void testPrimaryReceivesDocsDuringReplicaRecovery() throws Exception { // index a doc. client().prepareIndex(INDEX_NAME).setId("1").setSource("foo", randomInt()).get(); - refreshWithNoWaitForReplicas(INDEX_NAME); + refresh(INDEX_NAME); CountDownLatch latch = new CountDownLatch(1); // block replication @@ -1354,7 +1357,7 @@ public void testPrimaryReceivesDocsDuringReplicaRecovery() throws Exception { ensureYellow(INDEX_NAME); // index another doc while blocked, this would not get replicated to replica. client().prepareIndex(INDEX_NAME).setId("2").setSource("foo2", randomInt()).get(); - refreshWithNoWaitForReplicas(INDEX_NAME); + refresh(INDEX_NAME); } ensureGreen(INDEX_NAME); waitForSearchableDocs(2, nodes); @@ -1407,7 +1410,7 @@ public void testIndexWhileRecoveringReplica() throws Exception { .setRouting(randomAlphaOfLength(2)) .setSource("online", true, "ts", System.currentTimeMillis() - 123123, "type", "bs") .get(); - refreshWithNoWaitForReplicas(INDEX_NAME); + refresh(INDEX_NAME); ensureGreen(INDEX_NAME); waitForSearchableDocs(4, primaryNode, replicaNode); @@ -1443,7 +1446,7 @@ public void testRestartPrimary_NoReplicas() throws Exception { if (randomBoolean()) { flush(INDEX_NAME); } else { - refreshWithNoWaitForReplicas(INDEX_NAME); + refresh(INDEX_NAME); } internalCluster().restartNode(primary); diff --git a/server/src/internalClusterTest/java/org/opensearch/indices/replication/SegmentReplicationRelocationIT.java b/server/src/internalClusterTest/java/org/opensearch/indices/replication/SegmentReplicationRelocationIT.java index 4223add6a62c8..3cddd59d567d4 100644 --- a/server/src/internalClusterTest/java/org/opensearch/indices/replication/SegmentReplicationRelocationIT.java +++ b/server/src/internalClusterTest/java/org/opensearch/indices/replication/SegmentReplicationRelocationIT.java @@ -126,6 +126,7 @@ public void testPrimaryRelocation() throws Exception { flushAndRefresh(INDEX_NAME); logger.info("--> verify count again {}", 2 * initialDocCount); waitForSearchableDocs(2 * initialDocCount, newPrimary, replica); + waitForReplication(false); } /** @@ -211,6 +212,7 @@ public void testPrimaryRelocationWithSegRepFailure() throws Exception { }, 1, TimeUnit.MINUTES); flushAndRefresh(INDEX_NAME); waitForSearchableDocs(2 * initialDocCount, oldPrimary, replica); + waitForReplication(false); } /** @@ -284,6 +286,7 @@ public void testRelocateWhileContinuouslyIndexingAndWaitingForRefresh() throws E }, 1, TimeUnit.MINUTES); flushAndRefresh(INDEX_NAME); waitForSearchableDocs(totalDocCount, newPrimary, replica); + waitForReplication(false); } /** @@ -394,6 +397,7 @@ public void testRelocateWithQueuedOperationsDuringHandoff() throws Exception { }, 2, TimeUnit.MINUTES); flushAndRefresh(INDEX_NAME); waitForSearchableDocs(totalDocCount, replica, newPrimary); + waitForReplication(false); } /** @@ -426,6 +430,7 @@ public void testNewlyAddedReplicaIsUpdated() throws Exception { ensureGreen(INDEX_NAME); flushAndRefresh(INDEX_NAME); waitForSearchableDocs(20, primary, replica); + waitForReplication(false); } /** diff --git a/server/src/internalClusterTest/java/org/opensearch/indices/replication/SegmentReplicationResizeRequestIT.java b/server/src/internalClusterTest/java/org/opensearch/indices/replication/SegmentReplicationResizeRequestIT.java index 71b9d51ed0672..2842ef9fc647a 100644 --- a/server/src/internalClusterTest/java/org/opensearch/indices/replication/SegmentReplicationResizeRequestIT.java +++ b/server/src/internalClusterTest/java/org/opensearch/indices/replication/SegmentReplicationResizeRequestIT.java @@ -123,7 +123,7 @@ public void testCreateSplitIndexWithSegmentReplicationBlocked() throws Exception for (int i = 0; i < docs; i++) { client().prepareIndex("test").setSource("{\"foo\" : \"bar\", \"i\" : " + i + "}", MediaTypeRegistry.JSON).get(); } - refreshWithNoWaitForReplicas("test"); + refresh("test"); assertBusy(() -> { assertHitCount( client().prepareSearch("test") @@ -199,7 +199,7 @@ public void testCloneIndex() throws Exception { for (int i = 0; i < docs; i++) { client().prepareIndex("test").setSource("{\"foo\" : \"bar\", \"i\" : " + i + "}", MediaTypeRegistry.JSON).get(); } - refreshWithNoWaitForReplicas("test"); + refresh("test"); assertBusy(() -> { assertHitCount( client().prepareSearch("test") diff --git a/server/src/internalClusterTest/java/org/opensearch/indices/replication/SegmentReplicationStatsIT.java b/server/src/internalClusterTest/java/org/opensearch/indices/replication/SegmentReplicationStatsIT.java index 8cc887771a41b..235b06c5ca022 100644 --- a/server/src/internalClusterTest/java/org/opensearch/indices/replication/SegmentReplicationStatsIT.java +++ b/server/src/internalClusterTest/java/org/opensearch/indices/replication/SegmentReplicationStatsIT.java @@ -59,7 +59,7 @@ public void testSegmentReplicationStatsResponse() throws Exception { for (int i = 0; i < numDocs; i++) { index(INDEX_NAME, "doc", Integer.toString(i)); } - refreshWithNoWaitForReplicas(INDEX_NAME); + refresh(INDEX_NAME); ensureSearchable(INDEX_NAME); assertBusy(() -> { @@ -95,7 +95,7 @@ public void testSegmentReplicationStatsResponseForActiveOnly() throws Exception for (int i = 0; i < 10; i++) { client().prepareIndex(INDEX_NAME).setId(Integer.toString(i)).setSource("field", "value" + i).execute().actionGet(); } - refreshWithNoWaitForReplicas(INDEX_NAME); + refresh(INDEX_NAME); // index 10 more docs waitForSearchableDocs(10L, asList(primaryNode, replicaNode)); @@ -124,7 +124,7 @@ public void testSegmentReplicationStatsResponseForActiveOnly() throws Exception connection.sendRequest(requestId, action, request, options); } ); - refreshWithNoWaitForReplicas(INDEX_NAME); + refresh(INDEX_NAME); try { waitForReplication.await(); } catch (InterruptedException e) { @@ -174,7 +174,7 @@ public void testNonDetailedResponse() throws Exception { for (int i = 0; i < numDocs; i++) { index(INDEX_NAME, "doc", Integer.toString(i)); } - refreshWithNoWaitForReplicas(INDEX_NAME); + refresh(INDEX_NAME); waitForSearchableDocs(numDocs, nodes); final IndexShard indexShard = getIndexShard(primaryNode, INDEX_NAME); @@ -222,7 +222,7 @@ public void testGetSpecificShard() throws Exception { for (int i = 0; i < numDocs; i++) { index(INDEX_NAME, "doc", Integer.toString(i)); } - refreshWithNoWaitForReplicas(INDEX_NAME); + refresh(INDEX_NAME); waitForSearchableDocs(numDocs, nodes); final IndexShard indexShard = getIndexShard(primaryNode, INDEX_NAME); @@ -281,7 +281,7 @@ public void testMultipleIndices() throws Exception { index(INDEX_NAME, "doc", Integer.toString(i)); index(index_2, "doc", Integer.toString(i)); } - refreshWithNoWaitForReplicas(INDEX_NAME); + refresh(INDEX_NAME); waitForSearchableDocs(INDEX_NAME, numDocs, nodes); waitForSearchableDocs(index_2, numDocs, nodes); @@ -353,7 +353,7 @@ public void testQueryAgainstDocRepIndex() { for (int i = 0; i < numDocs; i++) { index(INDEX_NAME, "doc", Integer.toString(i)); } - refreshWithNoWaitForReplicas(INDEX_NAME); + refresh(INDEX_NAME); // search for all SegmentReplicationStatsResponse segmentReplicationStatsResponse = client().admin() @@ -385,7 +385,7 @@ public void testSegmentReplicationNodeAndIndexStats() throws Exception { // index another doc while blocked, this would not get replicated to the replicas. Thread indexingThread = new Thread(() -> { client().prepareIndex(INDEX_NAME).setId("2").setSource("foo2", randomInt()).get(); - refreshWithNoWaitForReplicas(INDEX_NAME); + refresh(INDEX_NAME); }); indexingThread.start(); diff --git a/server/src/internalClusterTest/java/org/opensearch/indices/replication/SegmentReplicationSuiteIT.java b/server/src/internalClusterTest/java/org/opensearch/indices/replication/SegmentReplicationSuiteIT.java index dd55575298a10..5445c03d20cad 100644 --- a/server/src/internalClusterTest/java/org/opensearch/indices/replication/SegmentReplicationSuiteIT.java +++ b/server/src/internalClusterTest/java/org/opensearch/indices/replication/SegmentReplicationSuiteIT.java @@ -44,8 +44,8 @@ public void testBasicReplication() throws Exception { for (int i = 0; i < docCount; i++) { client().prepareIndex(INDEX_NAME).setId(Integer.toString(i)).setSource("field", "value" + i).execute().get(); } - refresh(); ensureGreen(INDEX_NAME); + waitForReplication(true); } public void testDropRandomNodeDuringReplication() throws Exception { diff --git a/server/src/internalClusterTest/java/org/opensearch/indices/stats/IndexStatsIT.java b/server/src/internalClusterTest/java/org/opensearch/indices/stats/IndexStatsIT.java index 17746a3f24a4c..8c8a6a15b17f9 100644 --- a/server/src/internalClusterTest/java/org/opensearch/indices/stats/IndexStatsIT.java +++ b/server/src/internalClusterTest/java/org/opensearch/indices/stats/IndexStatsIT.java @@ -184,7 +184,7 @@ public void testFieldDataStats() throws InterruptedException { ensureGreen(); client().prepareIndex("test").setId("1").setSource("field", "value1", "field2", "value1").execute().actionGet(); client().prepareIndex("test").setId("2").setSource("field", "value2", "field2", "value2").execute().actionGet(); - refresh(); + waitForReplication(true); indexRandomForConcurrentSearch("test"); NodesStatsResponse nodesStats = client().admin().cluster().prepareNodesStats("data:true").setIndices(true).execute().actionGet(); @@ -308,7 +308,7 @@ public void testClearAllCaches() throws Exception { client().admin().cluster().prepareHealth().setWaitForGreenStatus().execute().actionGet(); client().prepareIndex("test").setId("1").setSource("field", "value1").execute().actionGet(); client().prepareIndex("test").setId("2").setSource("field", "value2").execute().actionGet(); - refresh(); + waitForReplication(true); indexRandomForConcurrentSearch("test"); NodesStatsResponse nodesStats = client().admin().cluster().prepareNodesStats("data:true").setIndices(true).execute().actionGet(); @@ -676,7 +676,7 @@ public void testSimpleStats() throws Exception { client().prepareIndex("test1").setId(Integer.toString(1)).setSource("field", "value").execute().actionGet(); client().prepareIndex("test1").setId(Integer.toString(2)).setSource("field", "value").execute().actionGet(); client().prepareIndex("test2").setId(Integer.toString(1)).setSource("field", "value").execute().actionGet(); - refresh(); + waitForReplication(true); NumShards test1 = getNumShards("test1"); long test1ExpectedWrites = 2 * test1.dataCopies; @@ -838,7 +838,7 @@ public void testMergeStats() { client().admin().indices().prepareFlush().execute().actionGet(); } client().admin().indices().prepareForceMerge().setMaxNumSegments(1).execute().actionGet(); - waitForReplicasToCatchUpWithPrimary(); + waitForReplication(false); stats = client().admin().indices().prepareStats().setMerge(true).execute().actionGet(); assertThat(stats.getTotal().getMerge(), notNullValue()); @@ -867,7 +867,7 @@ public void testSegmentsStats() { client().admin().indices().prepareFlush().get(); client().admin().indices().prepareForceMerge().setMaxNumSegments(1).execute().actionGet(); - refresh(); + waitForReplication(true); stats = client().admin().indices().prepareStats().setSegments(true).get(); assertThat(stats.getTotal().getSegments(), notNullValue()); @@ -885,7 +885,7 @@ public void testAllFlags() throws Exception { client().prepareIndex("test_index").setId(Integer.toString(2)).setSource("field", "value").execute().actionGet(); client().prepareIndex("test_index_2").setId(Integer.toString(1)).setSource("field", "value").execute().actionGet(); - refresh(); + waitForReplication(true); IndicesStatsRequestBuilder builder = client().admin().indices().prepareStats(); Flag[] values = CommonStatsFlags.Flag.values(); for (Flag flag : values) { @@ -1469,7 +1469,7 @@ public void testZeroRemoteStoreStatsOnNonRemoteStoreIndex() { .get() .status() ); - waitForReplicasToCatchUpWithPrimary(); + waitForReplication(false); ShardStats shard = client().admin().indices().prepareStats(indexName).setSegments(true).setTranslog(true).get().getShards()[0]; RemoteSegmentStats remoteSegmentStatsFromIndexStats = shard.getStats().getSegments().getRemoteSegmentStats(); assertZeroRemoteSegmentStats(remoteSegmentStatsFromIndexStats); diff --git a/server/src/internalClusterTest/java/org/opensearch/remotestore/SegmentReplicationUsingRemoteStoreDisruptionIT.java b/server/src/internalClusterTest/java/org/opensearch/remotestore/SegmentReplicationUsingRemoteStoreDisruptionIT.java index 1c81ca65588f4..8372135fc55c4 100644 --- a/server/src/internalClusterTest/java/org/opensearch/remotestore/SegmentReplicationUsingRemoteStoreDisruptionIT.java +++ b/server/src/internalClusterTest/java/org/opensearch/remotestore/SegmentReplicationUsingRemoteStoreDisruptionIT.java @@ -57,7 +57,7 @@ public void testCancelReplicationWhileSyncingSegments() throws Exception { blockNodeOnAnySegmentFile(REPOSITORY_NAME, replicaNode); final IndexShard indexShard = getIndexShard(replicaNode, INDEX_NAME); indexSingleDoc(); - refreshWithNoWaitForReplicas(INDEX_NAME); + refresh(INDEX_NAME); waitForBlock(replicaNode, REPOSITORY_NAME, TimeValue.timeValueSeconds(10)); final SegmentReplicationState state = targetService.getOngoingEventSegmentReplicationState(indexShard.shardId()); assertEquals(SegmentReplicationState.Stage.GET_FILES, state.getStage()); @@ -92,7 +92,7 @@ public void testCancelReplicationWhileFetchingMetadata() throws Exception { blockNodeOnAnyFiles(REPOSITORY_NAME, replicaNode); final IndexShard indexShard = getIndexShard(replicaNode, INDEX_NAME); indexSingleDoc(); - refreshWithNoWaitForReplicas(INDEX_NAME); + refresh(INDEX_NAME); waitForBlock(replicaNode, REPOSITORY_NAME, TimeValue.timeValueSeconds(10)); final SegmentReplicationState state = targetService.getOngoingEventSegmentReplicationState(indexShard.shardId()); assertEquals(SegmentReplicationState.Stage.GET_CHECKPOINT_INFO, state.getStage()); @@ -128,7 +128,7 @@ public void testUpdateVisibleCheckpointWithLaggingClusterStateUpdates_primaryRel // index a doc. client().prepareIndex(INDEX_NAME).setId("1").setSource("foo", randomInt()).get(); - refreshWithNoWaitForReplicas(INDEX_NAME); + refresh(INDEX_NAME); logger.info("--> start another node"); final String newPrimary = internalCluster().startDataOnlyNode(nodeSettings); diff --git a/test/framework/src/main/java/org/opensearch/test/OpenSearchIntegTestCase.java b/test/framework/src/main/java/org/opensearch/test/OpenSearchIntegTestCase.java index 074a0f7828fab..e5733c83de58d 100644 --- a/test/framework/src/main/java/org/opensearch/test/OpenSearchIntegTestCase.java +++ b/test/framework/src/main/java/org/opensearch/test/OpenSearchIntegTestCase.java @@ -130,7 +130,6 @@ import org.opensearch.index.MockEngineFactoryPlugin; import org.opensearch.index.TieredMergePolicyProvider; import org.opensearch.index.codec.CodecService; -import org.opensearch.index.engine.Engine; import org.opensearch.index.engine.Segment; import org.opensearch.index.mapper.CompletionFieldMapper; import org.opensearch.index.mapper.MockFieldFilterPlugin; @@ -1300,32 +1299,11 @@ protected final IndexResponse index(String index, String type, String id, String } /** - * Waits for relocations and refreshes all indices in the cluster. Then waits for replicas - * to catch up with primary when segment replication is enabled. + * Waits for relocations and refreshes all indices in the cluster. * * @see #waitForRelocation() */ protected final RefreshResponse refresh(String... indices) { - waitForRelocation(); - // TODO RANDOMIZE with flush? - RefreshResponse actionGet = client().admin() - .indices() - .prepareRefresh(indices) - .setIndicesOptions(IndicesOptions.STRICT_EXPAND_OPEN_HIDDEN_FORBID_CLOSED) - .execute() - .actionGet(); - assertNoFailures(actionGet); - waitForReplicasToCatchUpWithPrimary(); - return actionGet; - } - - /** - * Waits for relocations and refreshes all indices in the cluster. This method doesn't wait for replicas - * to catch up with primary when segment replication is enabled - * - * @see #waitForRelocation() - */ - protected final RefreshResponse refreshWithNoWaitForReplicas(String... indices) { waitForRelocation(); // TODO RANDOMIZE with flush? RefreshResponse actionGet = client().admin() @@ -1575,7 +1553,7 @@ public void indexRandom(boolean forceRefresh, boolean dummyDocuments, boolean ma indexRandomForMultipleSlices(indicesArray); } if (forceRefresh) { - waitForReplicasToCatchUpWithPrimary(); + waitForReplication(false); } } @@ -2381,52 +2359,61 @@ protected ClusterState getClusterState() { } /** - * Checks if replica shards caught up with primary shard when Segment Replication is enabled. + * Refreshes all indices in the cluster when forceRefresh set to true and + * checks if replica shards caught up with primary shard when Segment Replication is enabled. */ - protected void waitForReplicasToCatchUpWithPrimary() { + protected void waitForReplication(boolean forceRefresh, String... indices) { + if (forceRefresh) { + refresh(indices); + } if (isInternalCluster()) { + if (indices.length == 0) { + indices = getClusterState().routingTable().indicesRouting().keySet().toArray(String[]::new); + } try { - assertBusy(() -> { - final ClusterState clusterState = clusterAdmin().prepareState().get().getState(); - for (IndexRoutingTable indexRoutingTable : clusterState.routingTable()) { - for (IndexShardRoutingTable shardRoutingTable : indexRoutingTable) { - final ShardRouting primaryRouting = shardRoutingTable.primaryShard(); - if (primaryRouting.state().toString().equals("STARTED")) { - final String indexName = primaryRouting.getIndexName(); - if (isSegmentReplicationEnabledForIndex(indexName)) { - final List replicaRouting = shardRoutingTable.replicaShards(); - final IndexShard primaryShard = getIndexShard(clusterState, primaryRouting, indexName); - final Map primarySegmentMetadata = primaryShard.getSegmentMetadataMap(); - for (ShardRouting replica : replicaRouting) { - if (replica.state().toString().equals("STARTED")) { - IndexShard replicaShard = getIndexShard(clusterState, replica, indexName); - final Store.RecoveryDiff recoveryDiff = Store.segmentReplicationDiff( - primarySegmentMetadata, - replicaShard.getSegmentMetadataMap() - ); - if (recoveryDiff.missing.isEmpty() == false || recoveryDiff.different.isEmpty() == false) { - fail( - "Expected no missing or different segments between primary and replica but diff was missing: " - + recoveryDiff.missing - + " Different: " - + recoveryDiff.different - + " Primary Replication Checkpoint : " - + primaryShard.getLatestReplicationCheckpoint() - + " Replica Replication Checkpoint: " - + replicaShard.getLatestReplicationCheckpoint() + for (String index : indices) { + IndexRoutingTable indexRoutingTable = getClusterState().routingTable().index(index); + if (indexRoutingTable != null) { + assertBusy(() -> { + for (IndexShardRoutingTable shardRoutingTable : indexRoutingTable) { + final ShardRouting primaryRouting = shardRoutingTable.primaryShard(); + if (primaryRouting.state().toString().equals("STARTED")) { + final String indexName = primaryRouting.getIndexName(); + if (isSegmentReplicationEnabledForIndex(indexName)) { + final List replicaRouting = shardRoutingTable.replicaShards(); + final IndexShard primaryShard = getIndexShard(primaryRouting, indexName); + final Map primarySegmentMetadata = primaryShard.getSegmentMetadataMap(); + for (ShardRouting replica : replicaRouting) { + if (replica.state().toString().equals("STARTED")) { + IndexShard replicaShard = getIndexShard(replica, indexName); + final Store.RecoveryDiff recoveryDiff = Store.segmentReplicationDiff( + primarySegmentMetadata, + replicaShard.getSegmentMetadataMap() ); + if (recoveryDiff.missing.isEmpty() == false || recoveryDiff.different.isEmpty() == false) { + fail( + "Expected no missing or different segments between primary and replica but diff was missing: " + + recoveryDiff.missing + + " Different: " + + recoveryDiff.different + + " Primary Replication Checkpoint : " + + primaryShard.getLatestReplicationCheckpoint() + + " Replica Replication Checkpoint: " + + replicaShard.getLatestReplicationCheckpoint() + ); + } + + // calls to readCommit will fail if a valid commit point and all its segments are not in the + // store. + replicaShard.store().readLastCommittedSegmentsInfo(); } - - // calls to readCommit will fail if a valid commit point and all its segments are not in the - // store. - replicaShard.store().readLastCommittedSegmentsInfo(); } } } } - } + }, 30, TimeUnit.SECONDS); } - }, 30, TimeUnit.SECONDS); + } } catch (Exception e) { throw new RuntimeException(e); } @@ -2440,8 +2427,8 @@ protected boolean isSegmentReplicationEnabledForIndex(String index) { return clusterService().state().getMetadata().isSegmentReplicationEnabled(index); } - protected IndexShard getIndexShard(ClusterState state, ShardRouting routing, String indexName) { - return getIndexShard(state.nodes().get(routing.currentNodeId()).getName(), routing.shardId(), indexName); + protected IndexShard getIndexShard(ShardRouting routing, String indexName) { + return getIndexShard(getClusterState().nodes().get(routing.currentNodeId()).getName(), routing.shardId(), indexName); } /** @@ -2455,13 +2442,4 @@ protected IndexShard getIndexShard(String node, ShardId shardId, String indexNam return indexService.getShard(id.get()); } - /** - * Fetch Doc Count from an Index Shard. - */ - protected int getDocCountFromShard(IndexShard shard) { - try (final Engine.Searcher searcher = shard.acquireSearcher("test")) { - return searcher.getDirectoryReader().numDocs(); - } - } - }