Skip to content

Commit

Permalink
[Backport 2.x] Backport SegmentReplication test muting missing from 2…
Browse files Browse the repository at this point in the history
….x. (#5945)

* [Segment Replication] Add snapshot and restore tests for segment replication feature (#3993)

* [Segment Replication] Add snapshots tests with segment replication enabled

Signed-off-by: Suraj Singh <[email protected]>

* Fix spotless failures

Signed-off-by: Suraj Singh <[email protected]>

* Add changelog entry, address review comments, add failover test

Signed-off-by: Suraj Singh <[email protected]>

* Fix spotless failures

Signed-off-by: Suraj Singh <[email protected]>

* Address review comments 2

Signed-off-by: Suraj Singh <[email protected]>

Signed-off-by: Suraj Singh <[email protected]>

* Remove changelog update.

Signed-off-by: Marc Handalian <[email protected]>

* Mute flaky test testStartReplicaAfterPrimaryIndexesDocs. (#5714)

Signed-off-by: Marc Handalian <[email protected]>

Signed-off-by: Marc Handalian <[email protected]>

* Fix flaky Segment Replication test testStartReplicaAfterPrimaryIndexesDocs. (#5722)

* Fix flaky SR test testStartReplicaAfterPrimaryIndexesDocs.

This test was failing because, post recovery, we were validating whether a shard is able to perform segrep while also validating a passed-in checkpoint.  In the post-recovery test this checkpoint is always empty, yet the shard will be ahead of this checkpoint after docs are indexed.  This change differentiates shard validation from checkpoint validation.

Signed-off-by: Marc Handalian <[email protected]>

Fix spotless.

Signed-off-by: Marc Handalian <[email protected]>

Fix testIsSegmentReplicationAllowed_WrongEngineType.

Signed-off-by: Marc Handalian <[email protected]>

Update warn logs in isSegmentReplicationAllowed.

Signed-off-by: Marc Handalian <[email protected]>

* PR feedback.

Signed-off-by: Marc Handalian <[email protected]>

Signed-off-by: Marc Handalian <[email protected]>

* [Segment Replication] Mute flaky tests (#5739)

Signed-off-by: Suraj Singh <[email protected]>

Signed-off-by: Suraj Singh <[email protected]>

* [Segment Replication] Mute flaky tests (#5742)

Signed-off-by: Suraj Singh <[email protected]>

Signed-off-by: Suraj Singh <[email protected]>

* Fix spotless.

Signed-off-by: Marc Handalian <[email protected]>

* Muting flaky SegmentReplication ITs. (#5700)

Signed-off-by: Marc Handalian <[email protected]>

Signed-off-by: Marc Handalian <[email protected]>

Signed-off-by: Suraj Singh <[email protected]>
Signed-off-by: Marc Handalian <[email protected]>
Co-authored-by: Suraj Singh <[email protected]>
  • Loading branch information
2 people authored and kotwanikunal committed Jan 25, 2023
1 parent 95d7895 commit 51f7ea0
Show file tree
Hide file tree
Showing 5 changed files with 417 additions and 21 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@
import org.opensearch.action.admin.indices.segments.IndicesSegmentsRequest;
import org.opensearch.action.admin.indices.segments.ShardSegments;
import org.opensearch.action.support.WriteRequest;
import org.opensearch.action.update.UpdateResponse;
import org.opensearch.client.Requests;
import org.opensearch.cluster.ClusterState;
import org.opensearch.cluster.metadata.IndexMetadata;
import org.opensearch.cluster.node.DiscoveryNode;
Expand Down Expand Up @@ -55,15 +57,17 @@
import java.util.stream.Collectors;

import static org.hamcrest.Matchers.equalTo;
import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertAcked;
import static org.opensearch.index.query.QueryBuilders.matchQuery;
import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertHitCount;
import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertAcked;
import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertSearchHits;

@OpenSearchIntegTestCase.ClusterScope(scope = OpenSearchIntegTestCase.Scope.TEST, numDataNodes = 0)
public class SegmentReplicationIT extends OpenSearchIntegTestCase {

private static final String INDEX_NAME = "test-idx-1";
private static final int SHARD_COUNT = 1;
private static final int REPLICA_COUNT = 1;
protected static final String INDEX_NAME = "test-idx-1";
protected static final int SHARD_COUNT = 1;
protected static final int REPLICA_COUNT = 1;

@Override
protected Collection<Class<? extends Plugin>> nodePlugins() {
Expand Down Expand Up @@ -91,6 +95,26 @@ protected Settings featureFlagSettings() {
return Settings.builder().put(super.featureFlagSettings()).put(FeatureFlags.REPLICATION_TYPE, "true").build();
}

/**
 * Indexes {@code docCount} documents into {@code INDEX_NAME} with a {@link BackgroundIndexer},
 * refreshes the index, and then blocks until {@code waitForReplicaUpdate()} returns.
 *
 * @param docCount number of documents to hand to the background indexer
 * @throws Exception if indexing, the refresh, or waiting on the replica fails
 */
public void ingestDocs(int docCount) throws Exception {
    // NOTE(review): constructor arguments are kept in their original order so that any
    // randomness consumed by client(), scaledRandomIntBetween() and random() is drawn in
    // the same sequence as before.
    try (
        BackgroundIndexer backgroundIndexer = new BackgroundIndexer(
            INDEX_NAME, "_doc", client(), -1, RandomizedTest.scaledRandomIntBetween(2, 5), false, random()
        )
    ) {
        backgroundIndexer.start(docCount);
        waitForDocs(docCount, backgroundIndexer);

        // Refresh first, then wait for the replica to pick up the refreshed segments.
        refresh(INDEX_NAME);
        waitForReplicaUpdate();
    }
}

@AwaitsFix(bugUrl = "https://github.com/opensearch-project/OpenSearch/issues/5669")
public void testPrimaryStopped_ReplicaPromoted() throws Exception {
final String primary = internalCluster().startNode(featureFlagSettings());
createIndex(INDEX_NAME);
Expand Down Expand Up @@ -132,6 +156,7 @@ public void testPrimaryStopped_ReplicaPromoted() throws Exception {
assertSegmentStats(REPLICA_COUNT);
}

@AwaitsFix(bugUrl = "https://github.com/opensearch-project/OpenSearch/issues/5669")
public void testRestartPrimary() throws Exception {
final String primary = internalCluster().startNode(featureFlagSettings());
createIndex(INDEX_NAME);
Expand Down Expand Up @@ -266,6 +291,7 @@ public void testAddNewReplicaFailure() throws Exception {
assertFalse(indicesService.hasIndex(resolveIndex(INDEX_NAME)));
}

@AwaitsFix(bugUrl = "https://github.com/opensearch-project/OpenSearch/issues/5669")
public void testReplicationAfterPrimaryRefreshAndFlush() throws Exception {
final String nodeA = internalCluster().startNode(featureFlagSettings());
final String nodeB = internalCluster().startNode(featureFlagSettings());
Expand Down Expand Up @@ -497,6 +523,7 @@ public void testCancellation() throws Exception {
assertDocCounts(docCount, primaryNode);
}

@AwaitsFix(bugUrl = "https://github.com/opensearch-project/OpenSearch/issues/5669")
public void testStartReplicaAfterPrimaryIndexesDocs() throws Exception {
final String primaryNode = internalCluster().startNode(featureFlagSettings());
createIndex(INDEX_NAME, Settings.builder().put(indexSettings()).put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 0).build());
Expand Down Expand Up @@ -598,6 +625,61 @@ public void testDeleteOperations() throws Exception {
}
}

/**
 * Indexes two batches of documents, checks after each batch that the primary and the replica
 * report the same hit count, then updates one of the indexed documents on the primary and
 * asserts that the updated field is searchable on both the primary and the replica.
 *
 * @throws Exception if node startup, indexing, or any of the waits/assertions fail
 */
@AwaitsFix(bugUrl = "https://github.com/opensearch-project/OpenSearch/issues/5669")
public void testUpdateOperations() throws Exception {
    final String primary = internalCluster().startNode(featureFlagSettings());
    createIndex(INDEX_NAME);
    ensureYellow(INDEX_NAME);
    final String replica = internalCluster().startNode(featureFlagSettings());

    // At least one document must be indexed: the update step below picks an existing id,
    // and with a lower bound of 0 for both batches the id set could be empty.
    final int initialDocCount = scaledRandomIntBetween(1, 200);
    try (
        BackgroundIndexer indexer = new BackgroundIndexer(
            INDEX_NAME,
            "_doc",
            client(),
            -1,
            RandomizedTest.scaledRandomIntBetween(2, 5),
            false,
            random()
        )
    ) {
        indexer.start(initialDocCount);
        waitForDocs(initialDocCount, indexer);
        refresh(INDEX_NAME);
        waitForReplicaUpdate();

        // waitForReplicaUpdate() has returned, so both copies are expected to report the same count.
        assertHitCount(client(primary).prepareSearch(INDEX_NAME).setSize(0).setPreference("_only_local").get(), initialDocCount);
        assertHitCount(client(replica).prepareSearch(INDEX_NAME).setSize(0).setPreference("_only_local").get(), initialDocCount);

        final int additionalDocCount = scaledRandomIntBetween(0, 200);
        final int expectedHitCount = initialDocCount + additionalDocCount;
        indexer.start(additionalDocCount);
        waitForDocs(expectedHitCount, indexer);
        waitForReplicaUpdate();

        assertHitCount(client(primary).prepareSearch(INDEX_NAME).setSize(0).setPreference("_only_local").get(), expectedHitCount);
        assertHitCount(client(replica).prepareSearch(INDEX_NAME).setSize(0).setPreference("_only_local").get(), expectedHitCount);

        // Pick any one of the indexed documents to update; non-empty because initialDocCount >= 1.
        final String id = indexer.getIds().iterator().next();
        UpdateResponse updateResponse = client(primary).prepareUpdate(INDEX_NAME, id)
            .setDoc(Requests.INDEX_CONTENT_TYPE, "foo", "baz")
            .setRefreshPolicy(WriteRequest.RefreshPolicy.WAIT_UNTIL)
            .get();
        assertFalse("request shouldn't have forced a refresh", updateResponse.forcedRefresh());
        // The document was indexed once and updated once, so version 2 is expected.
        assertEquals(2, updateResponse.getVersion());

        refresh(INDEX_NAME);
        waitForReplicaUpdate();

        // The updated field must be visible on both copies, and only for the updated document.
        assertSearchHits(client(primary).prepareSearch(INDEX_NAME).setQuery(matchQuery("foo", "baz")).get(), id);
        assertSearchHits(client(replica).prepareSearch(INDEX_NAME).setQuery(matchQuery("foo", "baz")).get(), id);
    }
}

private void assertSegmentStats(int numberOfReplicas) throws IOException {
final IndicesSegmentResponse indicesSegmentResponse = client().admin().indices().segments(new IndicesSegmentsRequest()).actionGet();

Expand Down Expand Up @@ -689,7 +771,7 @@ public void testDropPrimaryDuringReplication() throws Exception {

/**
* Waits until the replica is caught up to the latest primary segments gen.
* @throws Exception
* @throws Exception if assertion fails
*/
private void waitForReplicaUpdate() throws Exception {
// wait until the replica has the latest segment generation.
Expand All @@ -706,7 +788,7 @@ private void waitForReplicaUpdate() throws Exception {
// if we don't have any segments yet, proceed.
final ShardSegments primaryShardSegments = primaryShardSegmentsList.stream().findFirst().get();
logger.debug("Primary Segments: {}", primaryShardSegments.getSegments());
if (primaryShardSegments.getSegments().isEmpty() == false) {
if (primaryShardSegments.getSegments().isEmpty() == false && replicaShardSegments != null) {
final Map<String, Segment> latestPrimarySegments = getLatestSegments(primaryShardSegments);
final Long latestPrimaryGen = latestPrimarySegments.values().stream().findFirst().map(Segment::getGeneration).get();
for (ShardSegments shardSegments : replicaShardSegments) {
Expand Down
Loading

0 comments on commit 51f7ea0

Please sign in to comment.