Skip to content

Commit

Permalink
revert changes made to Segrep tests.
Browse files Browse the repository at this point in the history
Signed-off-by: Rishikesh1159 <[email protected]>
  • Loading branch information
Rishikesh1159 committed Jan 31, 2024
1 parent 7627fb9 commit dcdc7bf
Show file tree
Hide file tree
Showing 4 changed files with 99 additions and 31 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -594,6 +594,67 @@ public void testCancellation() throws Exception {
assertDocCounts(docCount, primaryNode);
}

public void testCancellationDuringGetCheckpointInfo() throws Exception {
cancelDuringReplicaAction(SegmentReplicationSourceService.Actions.GET_CHECKPOINT_INFO);
}

public void testCancellationDuringGetSegments() throws Exception {
cancelDuringReplicaAction(SegmentReplicationSourceService.Actions.GET_SEGMENT_FILES);
}

private void cancelDuringReplicaAction(String actionToblock) throws Exception {
// this test stubs transport calls specific to node-node replication.
assumeFalse(
"Skipping the test as its not compatible with segment replication with remote store.",
segmentReplicationWithRemoteEnabled()
);
final String primaryNode = internalCluster().startDataOnlyNode();
createIndex(INDEX_NAME, Settings.builder().put(indexSettings()).put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 1).build());
ensureYellow(INDEX_NAME);

final String replicaNode = internalCluster().startDataOnlyNode();
ensureGreen(INDEX_NAME);
final SegmentReplicationTargetService targetService = internalCluster().getInstance(
SegmentReplicationTargetService.class,
replicaNode
);
final IndexShard replicaShard = getIndexShard(replicaNode, INDEX_NAME);
CountDownLatch startCancellationLatch = new CountDownLatch(1);
CountDownLatch latch = new CountDownLatch(1);

MockTransportService primaryTransportService = (MockTransportService) internalCluster().getInstance(
TransportService.class,
primaryNode
);
primaryTransportService.addRequestHandlingBehavior(actionToblock, (handler, request, channel, task) -> {
logger.info("action {}", actionToblock);
try {
startCancellationLatch.countDown();
latch.await();
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
});

// index a doc and trigger replication
client().prepareIndex(INDEX_NAME).setId("1").setSource("foo", "bar").setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE).get();

// remove the replica and ensure it is cleaned up.
startCancellationLatch.await();
SegmentReplicationTarget target = targetService.get(replicaShard.shardId());
assertAcked(
client().admin()
.indices()
.prepareUpdateSettings(INDEX_NAME)
.setSettings(Settings.builder().put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 0))
);
assertEquals("Replication not closed: " + target.getId(), 0, target.refCount());
assertEquals("Store has a positive refCount", 0, replicaShard.store().refCount());
// stop the replica, this will do additional checks on shutDown to ensure the replica and its store are closed properly
internalCluster().stopRandomNode(InternalTestCluster.nameFilter(replicaNode));
latch.countDown();
}

public void testStartReplicaAfterPrimaryIndexesDocs() throws Exception {
final String primaryNode = internalCluster().startDataOnlyNode();
createIndex(INDEX_NAME, Settings.builder().put(indexSettings()).put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 0).build());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -219,6 +219,7 @@ public void testPrimaryRelocationWithSegRepFailure() throws Exception {
* This test verifies primary recovery behavior with continuous ingestion
*
*/
@TestLogging(reason = "Enable trace logs from replication and recovery package", value = "org.opensearch.indices.recovery:TRACE,org.opensearch.indices.replication:TRACE")
public void testRelocateWhileContinuouslyIndexingAndWaitingForRefresh() throws Exception {
final String primary = internalCluster().startNode();
createIndex(1);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -268,12 +268,12 @@ public void testMultipleIndices() throws Exception {
internalCluster().startClusterManagerOnlyNode();
final String index_2 = "tst-index-2";
List<String> nodes = new ArrayList<>();
final String primaryNode = internalCluster().startNode();
final String primaryNode = internalCluster().startDataOnlyNode();
nodes.add(primaryNode);
createIndex(INDEX_NAME, index_2);

ensureYellowAndNoInitializingShards(INDEX_NAME, index_2);
nodes.add(internalCluster().startNode());
nodes.add(internalCluster().startDataOnlyNode());
ensureGreen(INDEX_NAME, index_2);

final long numDocs = scaledRandomIntBetween(50, 100);
Expand All @@ -284,44 +284,47 @@ public void testMultipleIndices() throws Exception {
refresh(INDEX_NAME, index_2);
waitForSearchableDocs(INDEX_NAME, numDocs, nodes);
waitForSearchableDocs(index_2, numDocs, nodes);
ensureSearchable(INDEX_NAME, index_2);

final IndexShard index_1_primary = getIndexShard(primaryNode, INDEX_NAME);
final IndexShard index_2_primary = getIndexShard(primaryNode, index_2);

assertTrue(index_1_primary.routingEntry().primary());
assertTrue(index_2_primary.routingEntry().primary());

// test both indices are returned in the response.
SegmentReplicationStatsResponse segmentReplicationStatsResponse = client().admin()
.indices()
.prepareSegmentReplicationStats()
.execute()
.actionGet();
assertBusy(() -> {
// test both indices are returned in the response.
SegmentReplicationStatsResponse segmentReplicationStatsResponse = dataNodeClient().admin()
.indices()
.prepareSegmentReplicationStats()
.execute()
.actionGet();

Map<String, List<SegmentReplicationPerGroupStats>> replicationStats = segmentReplicationStatsResponse.getReplicationStats();
assertEquals(2, replicationStats.size());
List<SegmentReplicationPerGroupStats> replicationPerGroupStats = replicationStats.get(INDEX_NAME);
assertEquals(1, replicationPerGroupStats.size());
SegmentReplicationPerGroupStats perGroupStats = replicationPerGroupStats.get(0);
assertEquals(perGroupStats.getShardId(), index_1_primary.shardId());
Set<SegmentReplicationShardStats> replicaStats = perGroupStats.getReplicaStats();
assertEquals(1, replicaStats.size());
for (SegmentReplicationShardStats replica : replicaStats) {
assertNotNull(replica.getCurrentReplicationState());
}
Map<String, List<SegmentReplicationPerGroupStats>> replicationStats = segmentReplicationStatsResponse.getReplicationStats();
assertEquals(2, replicationStats.size());
List<SegmentReplicationPerGroupStats> replicationPerGroupStats = replicationStats.get(INDEX_NAME);
assertEquals(1, replicationPerGroupStats.size());
SegmentReplicationPerGroupStats perGroupStats = replicationPerGroupStats.get(0);
assertEquals(perGroupStats.getShardId(), index_1_primary.shardId());
Set<SegmentReplicationShardStats> replicaStats = perGroupStats.getReplicaStats();
assertEquals(1, replicaStats.size());
for (SegmentReplicationShardStats replica : replicaStats) {
assertNotNull(replica.getCurrentReplicationState());
}

replicationPerGroupStats = replicationStats.get(index_2);
assertEquals(1, replicationPerGroupStats.size());
perGroupStats = replicationPerGroupStats.get(0);
assertEquals(perGroupStats.getShardId(), index_2_primary.shardId());
replicaStats = perGroupStats.getReplicaStats();
assertEquals(1, replicaStats.size());
for (SegmentReplicationShardStats replica : replicaStats) {
assertNotNull(replica.getCurrentReplicationState());
}
replicationPerGroupStats = replicationStats.get(index_2);
assertEquals(1, replicationPerGroupStats.size());
perGroupStats = replicationPerGroupStats.get(0);
assertEquals(perGroupStats.getShardId(), index_2_primary.shardId());
replicaStats = perGroupStats.getReplicaStats();
assertEquals(1, replicaStats.size());
for (SegmentReplicationShardStats replica : replicaStats) {
assertNotNull(replica.getCurrentReplicationState());
}
}, 30, TimeUnit.SECONDS);

// test only single index queried.
segmentReplicationStatsResponse = client().admin()
SegmentReplicationStatsResponse segmentReplicationStatsResponse = dataNodeClient().admin()
.indices()
.prepareSegmentReplicationStats()
.setIndices(index_2)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -481,8 +481,8 @@ private static Settings.Builder setRandomIndexTranslogSettings(Random random, Se
}
if (random.nextBoolean()) {
builder.put(IndexSettings.INDEX_TRANSLOG_FLUSH_THRESHOLD_SIZE_SETTING.getKey(), new ByteSizeValue(1, ByteSizeUnit.PB)); // just
// don't
// flush
// don't
// flush
}
if (random.nextBoolean()) {
builder.put(
Expand Down Expand Up @@ -2422,6 +2422,9 @@ protected IndexShard getIndexShard(String node, ShardId shardId, String indexNam
return indexService.getShard(id.get());
}

/**
* Fetch latest segment info snapshot version of an index.
*/
protected long getLatestSegmentInfoVersion(IndexShard shard) {
try (final GatedCloseable<SegmentInfos> snapshot = shard.getSegmentInfosSnapshot()) {
return snapshot.get().version;
Expand Down

0 comments on commit dcdc7bf

Please sign in to comment.