Skip to content

Commit

Permalink
Add new waitForReplication() helper and refactor refresh() call sites to use it.
Browse files Browse the repository at this point in the history
Signed-off-by: Rishikesh1159 <[email protected]>
  • Loading branch information
Rishikesh1159 committed Jan 28, 2024
1 parent c8a4caf commit 230b90e
Show file tree
Hide file tree
Showing 12 changed files with 141 additions and 154 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -119,7 +119,7 @@ public void indexData() throws Exception {
index("foo", "bar", "1", XContentFactory.jsonBuilder().startObject().field("foo", "foo").endObject());
index("fuu", "buu", "1", XContentFactory.jsonBuilder().startObject().field("fuu", "fuu").endObject());
index("baz", "baz", "1", XContentFactory.jsonBuilder().startObject().field("baz", "baz").endObject());
refresh();
waitForReplication(true);
}

public void testRoutingTable() throws Exception {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,7 @@ public void testWritesRejected() throws Exception {

indexDoc();
totalDocs.incrementAndGet();
refreshWithNoWaitForReplicas(INDEX_NAME);
refresh(INDEX_NAME);
// index again while we are stale.
assertBusy(() -> {
expectThrows(OpenSearchRejectedExecutionException.class, () -> {
Expand All @@ -109,7 +109,7 @@ public void testWritesRejected() throws Exception {
expectThrows(OpenSearchRejectedExecutionException.class, () -> {
indexDoc();
totalDocs.incrementAndGet();
refreshWithNoWaitForReplicas(INDEX_NAME);
refresh(INDEX_NAME);
});

// Verify the rejected doc count.
Expand All @@ -123,13 +123,13 @@ public void testWritesRejected() throws Exception {

assertEquals(perGroupStats.getRejectedRequestCount(), 2L);
}
refreshWithNoWaitForReplicas(INDEX_NAME);
refresh(INDEX_NAME);

// wait for the replicas to catch up after block is released.
assertReplicaCheckpointUpdated(primaryShard);
// index another doc showing there is no pressure enforced.
indexDoc();
refreshWithNoWaitForReplicas(INDEX_NAME);
waitForReplication(true, INDEX_NAME);
waitForSearchableDocs(totalDocs.incrementAndGet(), replicaNodes.toArray(new String[] {}));
}

Expand Down Expand Up @@ -158,7 +158,7 @@ public void testAddReplicaWhileWritesBlocked() throws Exception {
latch.await();
indexDoc();
totalDocs.incrementAndGet();
refreshWithNoWaitForReplicas(INDEX_NAME);
refresh(INDEX_NAME);
// index again while we are stale.
assertBusy(() -> {
expectThrows(OpenSearchRejectedExecutionException.class, () -> {
Expand All @@ -177,13 +177,13 @@ public void testAddReplicaWhileWritesBlocked() throws Exception {
}
ensureGreen(INDEX_NAME);
waitForSearchableDocs(totalDocs.get(), replicaNodes);
refreshWithNoWaitForReplicas(INDEX_NAME);
refresh(INDEX_NAME);
// wait for the replicas to catch up after block is released.
assertReplicaCheckpointUpdated(primaryShard);

// index another doc showing there is no pressure enforced.
indexDoc();
refreshWithNoWaitForReplicas(INDEX_NAME);
waitForReplication(true, INDEX_NAME);
waitForSearchableDocs(totalDocs.incrementAndGet(), replicaNodes.toArray(new String[] {}));
}

Expand Down Expand Up @@ -211,11 +211,11 @@ public void testBelowReplicaLimit() throws Exception {
latch.await();
indexDoc();
totalDocs.incrementAndGet();
refreshWithNoWaitForReplicas(INDEX_NAME);
refresh(INDEX_NAME);
}
// index another doc showing there is no pressure enforced.
indexDoc();
refreshWithNoWaitForReplicas(INDEX_NAME);
waitForReplication(true, INDEX_NAME);
waitForSearchableDocs(totalDocs.incrementAndGet(), replicaNodes.toArray(new String[] {}));
}

Expand Down Expand Up @@ -245,7 +245,7 @@ public void testFailStaleReplica() throws Exception {
latch.await();
// index again while we are stale.
indexDoc();
refreshWithNoWaitForReplicas(INDEX_NAME);
refresh(INDEX_NAME);
totalDocs.incrementAndGet();

// Verify that replica shard is closed.
Expand Down Expand Up @@ -280,7 +280,7 @@ public void testWithDocumentReplicationEnabledIndex() throws Exception {
totalDocs.getAndSet(indexUntilCheckpointCount());
// index again after stale limit.
indexDoc();
refreshWithNoWaitForReplicas(INDEX_NAME);
refresh(INDEX_NAME);
totalDocs.incrementAndGet();
// verify total doc count is same and docs are not rejected.
assertHitCount(client(primaryNode).prepareSearch(INDEX_NAME).setSize(0).setPreference("_only_local").get(), totalDocs.get());
Expand Down Expand Up @@ -309,7 +309,7 @@ public void testBulkWritesRejected() throws Exception {
Thread indexingThread = new Thread(() -> {
for (int i = 0; i < MAX_CHECKPOINTS_BEHIND + 1; i++) {
executeBulkRequest(nodes, docsPerBatch);
refreshWithNoWaitForReplicas(INDEX_NAME);
refresh(INDEX_NAME);
}
});
indexingThread.start();
Expand All @@ -318,12 +318,13 @@ public void testBulkWritesRejected() throws Exception {
// try and index again while we are stale.
assertBusy(() -> { assertFailedRequests(executeBulkRequest(nodes, randomIntBetween(1, 200))); });
}
refreshWithNoWaitForReplicas(INDEX_NAME);
refresh(INDEX_NAME);
// wait for the replicas to catch up after block is released.
assertReplicaCheckpointUpdated(primaryShard);

// index another doc showing there is no pressure enforced.
executeBulkRequest(nodes, totalDocs);
waitForReplication(false);
waitForSearchableDocs(totalDocs * 2L, replicaNodes.toArray(new String[] {}));
}

Expand All @@ -335,7 +336,7 @@ private BulkResponse executeBulkRequest(List<String> nodes, int docsPerBatch) {
bulkRequest.add(request);
}
final BulkResponse bulkItemResponses = client(randomFrom(nodes)).bulk(bulkRequest).actionGet();
refreshWithNoWaitForReplicas(INDEX_NAME);
refresh(INDEX_NAME);
return bulkItemResponses;
}

Expand All @@ -351,7 +352,7 @@ private int indexUntilCheckpointCount() {
indexDoc();
}
total += numDocs;
refreshWithNoWaitForReplicas(INDEX_NAME);
refresh(INDEX_NAME);
}
return total;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -190,7 +190,7 @@ public void testQueryRewrite() throws Exception {
// Force merge the index to ensure there can be no background merges during the subsequent searches that would invalidate the cache
ForceMergeResponse forceMergeResponse = client.admin().indices().prepareForceMerge("index").setFlush(true).get();
OpenSearchAssertions.assertAllSuccessful(forceMergeResponse);
refresh();
waitForReplication(true);
ensureSearchable("index");

assertCacheState(client, "index", 0, 0);
Expand Down Expand Up @@ -260,7 +260,7 @@ public void testQueryRewriteMissingValues() throws Exception {
// Force merge the index to ensure there can be no background merges during the subsequent searches that would invalidate the cache
ForceMergeResponse forceMergeResponse = client.admin().indices().prepareForceMerge("index").setFlush(true).get();
OpenSearchAssertions.assertAllSuccessful(forceMergeResponse);
refresh();
waitForReplication(true);
ensureSearchable("index");

assertCacheState(client, "index", 0, 0);
Expand Down Expand Up @@ -326,7 +326,7 @@ public void testQueryRewriteDates() throws Exception {
// Force merge the index to ensure there can be no background merges during the subsequent searches that would invalidate the cache
ForceMergeResponse forceMergeResponse = client.admin().indices().prepareForceMerge("index").setFlush(true).get();
OpenSearchAssertions.assertAllSuccessful(forceMergeResponse);
refresh();
waitForReplication(true);
ensureSearchable("index");

assertCacheState(client, "index", 0, 0);
Expand Down Expand Up @@ -399,7 +399,7 @@ public void testQueryRewriteDatesWithNow() throws Exception {
.setFlush(true)
.get();
OpenSearchAssertions.assertAllSuccessful(forceMergeResponse);
refresh();
waitForReplication(true);
ensureSearchable("index-1", "index-2", "index-3");

assertCacheState(client, "index-1", 0, 0);
Expand Down Expand Up @@ -470,7 +470,7 @@ public void testCanCache() throws Exception {
// Force merge the index to ensure there can be no background merges during the subsequent searches that would invalidate the cache
ForceMergeResponse forceMergeResponse = client.admin().indices().prepareForceMerge("index").setFlush(true).get();
OpenSearchAssertions.assertAllSuccessful(forceMergeResponse);
refresh();
waitForReplication(true);
ensureSearchable("index");

assertCacheState(client, "index", 0, 0);
Expand Down Expand Up @@ -564,7 +564,7 @@ public void testCacheWithFilteredAlias() throws InterruptedException {
// Force merge the index to ensure there can be no background merges during the subsequent searches that would invalidate the cache
ForceMergeResponse forceMergeResponse = client.admin().indices().prepareForceMerge("index").setFlush(true).get();
OpenSearchAssertions.assertAllSuccessful(forceMergeResponse);
refresh();
waitForReplication(true);

indexRandomForConcurrentSearch("index");

Expand Down Expand Up @@ -671,7 +671,7 @@ public void testCacheWithInvalidation() throws Exception {
assertCacheState(client, "index", 1, 1);

// Explicit refresh would invalidate cache
refresh();
waitForReplication(true);
// Hit same query again
resp = client.prepareSearch("index").setRequestCache(true).setQuery(QueryBuilders.termQuery("k", "hello")).get();
assertSearchResponse(resp);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,7 @@ public void testSendCorruptBytesToReplica() throws Exception {
}
final long originalRecoveryTime = getRecoveryStopTime(replicaNode);
assertNotEquals(originalRecoveryTime, 0);
refreshWithNoWaitForReplicas(INDEX_NAME);
refresh(INDEX_NAME);
latch.await();
assertTrue(failed.get());
waitForNewPeerRecovery(replicaNode, originalRecoveryTime);
Expand Down Expand Up @@ -124,7 +124,7 @@ public void testWipeSegmentBetweenSyncs() throws Exception {
.setSource(jsonBuilder().startObject().field("field", i).endObject())
.get();
}
refreshWithNoWaitForReplicas(INDEX_NAME);
refresh(INDEX_NAME);
ensureGreen(INDEX_NAME);
final long originalRecoveryTime = getRecoveryStopTime(replicaNode);

Expand All @@ -138,7 +138,7 @@ public void testWipeSegmentBetweenSyncs() throws Exception {
.setSource(jsonBuilder().startObject().field("field", i).endObject())
.get();
}
refreshWithNoWaitForReplicas(INDEX_NAME);
refresh(INDEX_NAME);
waitForNewPeerRecovery(replicaNode, originalRecoveryTime);
resetCheckIndexStatus();
waitForSearchableDocs(20, primaryNode, replicaNode);
Expand Down
Loading

0 comments on commit 230b90e

Please sign in to comment.