Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Backport 2.x] Added support for search replica to return segrep stats #16842

Merged
merged 1 commit into from
Dec 16, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
- Add new configuration setting `synonym_analyzer`, to the `synonym` and `synonym_graph` filters, enabling the specification of a custom analyzer for reading the synonym file ([#16488](https://github.com/opensearch-project/OpenSearch/pull/16488)).
- Add stats for remote publication failure and move download failure stats to remote methods([#16682](https://github.com/opensearch-project/OpenSearch/pull/16682/))
- Added a precaution to handle extreme date values during sorting to prevent `arithmetic_exception: long overflow` ([#16812](https://github.com/opensearch-project/OpenSearch/pull/16812)).
- Add search replica stats to segment replication stats API ([#16678](https://github.com/opensearch-project/OpenSearch/pull/16678))

### Dependencies
- Bump `com.google.cloud:google-cloud-core-http` from 2.23.0 to 2.47.0 ([#16504](https://github.com/opensearch-project/OpenSearch/pull/16504))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,14 +8,20 @@

package org.opensearch.indices.replication;

import org.opensearch.action.admin.indices.replication.SegmentReplicationStatsResponse;
import org.opensearch.cluster.metadata.IndexMetadata;
import org.opensearch.common.settings.Settings;
import org.opensearch.common.util.FeatureFlags;
import org.opensearch.index.SegmentReplicationPerGroupStats;
import org.opensearch.index.SegmentReplicationShardStats;
import org.opensearch.indices.replication.common.ReplicationType;
import org.opensearch.test.OpenSearchIntegTestCase;
import org.junit.After;
import org.junit.Before;

import java.nio.file.Path;
import java.util.List;
import java.util.Set;

@OpenSearchIntegTestCase.ClusterScope(scope = OpenSearchIntegTestCase.Scope.TEST, numDataNodes = 0)
public class SearchReplicaReplicationIT extends SegmentReplicationBaseIT {
Expand Down Expand Up @@ -82,4 +88,47 @@ public void testReplication() throws Exception {
waitForSearchableDocs(docCount, primary, replica);
}

/**
 * Verifies that the detailed segment replication stats response for an index with one primary
 * and one search-only replica reports a single index, two total shards, one replication group,
 * and exactly one replica entry whose current replication state is populated.
 */
public void testSegmentReplicationStatsResponseWithSearchReplica() throws Exception {
    internalCluster().startClusterManagerOnlyNode();
    final List<String> dataNodes = internalCluster().startDataOnlyNodes(2);

    // One shard, no regular replicas, one search-only replica, segment replication enabled.
    final Settings indexSettings = Settings.builder()
        .put("number_of_shards", 1)
        .put("number_of_replicas", 0)
        .put("number_of_search_only_replicas", 1)
        .put(IndexMetadata.SETTING_REPLICATION_TYPE, ReplicationType.SEGMENT)
        .build();
    createIndex(INDEX_NAME, indexSettings);
    ensureGreen(INDEX_NAME);

    // Index a handful of documents and wait until every data node can search them.
    final int docCount = 5;
    for (int docId = 0; docId < docCount; docId++) {
        client().prepareIndex(INDEX_NAME).setId(Integer.toString(docId)).setSource("field", "value" + docId).execute().get();
    }
    refresh(INDEX_NAME);
    waitForSearchableDocs(docCount, dataNodes);

    final SegmentReplicationStatsResponse statsResponse = dataNodeClient().admin()
        .indices()
        .prepareSegmentReplicationStats(INDEX_NAME)
        .setDetailed(true)
        .execute()
        .actionGet();

    // Exactly one index is present in the response.
    assertEquals(1, statsResponse.getReplicationStats().size());
    // The response accounts for two shards in total.
    assertEquals(2, statsResponse.getTotalShards());
    // The index has a single replication group entry.
    final List<SegmentReplicationPerGroupStats> groupsForIndex = statsResponse.getReplicationStats().get(INDEX_NAME);
    assertEquals(1, groupsForIndex.size());

    final SegmentReplicationPerGroupStats firstGroup = groupsForIndex.get(0);
    final Set<SegmentReplicationShardStats> reportedReplicas = firstGroup.getReplicaStats();
    // Exactly one replica entry is reported, and it carries a replication state.
    assertEquals(1, reportedReplicas.size());
    reportedReplicas.forEach(reported -> assertNotNull(reported.getCurrentReplicationState()));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@
import org.opensearch.core.action.support.DefaultShardOperationFailedException;
import org.opensearch.core.common.io.stream.StreamInput;
import org.opensearch.core.index.shard.ShardId;
import org.opensearch.index.IndexService;
import org.opensearch.index.SegmentReplicationPerGroupStats;
import org.opensearch.index.SegmentReplicationPressureService;
import org.opensearch.index.SegmentReplicationShardStats;
Expand All @@ -38,7 +37,9 @@
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;
import java.util.stream.Stream;

/**
* Transport action for shard segment replication operation. This transport action does not actually
Expand Down Expand Up @@ -96,11 +97,11 @@ protected SegmentReplicationStatsResponse newResponse(
) {
String[] shards = request.shards();
final List<Integer> shardsToFetch = Arrays.stream(shards).map(Integer::valueOf).collect(Collectors.toList());

// organize replica responses by allocationId.
final Map<String, SegmentReplicationState> replicaStats = new HashMap<>();
// map of index name to list of replication group stats.
final Map<String, List<SegmentReplicationPerGroupStats>> primaryStats = new HashMap<>();

for (SegmentReplicationShardStatsResponse response : responses) {
if (response != null) {
if (response.getReplicaStats() != null) {
Expand All @@ -109,6 +110,7 @@ protected SegmentReplicationStatsResponse newResponse(
replicaStats.putIfAbsent(shardRouting.allocationId().getId(), response.getReplicaStats());
}
}

if (response.getPrimaryStats() != null) {
final ShardId shardId = response.getPrimaryStats().getShardId();
if (shardsToFetch.isEmpty() || shardsToFetch.contains(shardId.getId())) {
Expand All @@ -126,15 +128,20 @@ protected SegmentReplicationStatsResponse newResponse(
}
}
}
// combine the replica stats to the shard stat entry in each group.
for (Map.Entry<String, List<SegmentReplicationPerGroupStats>> entry : primaryStats.entrySet()) {
for (SegmentReplicationPerGroupStats group : entry.getValue()) {
for (SegmentReplicationShardStats replicaStat : group.getReplicaStats()) {
replicaStat.setCurrentReplicationState(replicaStats.getOrDefault(replicaStat.getAllocationId(), null));
}
}
}
return new SegmentReplicationStatsResponse(totalShards, successfulShards, failedShards, primaryStats, shardFailures);

Map<String, List<SegmentReplicationPerGroupStats>> replicationStats = primaryStats.entrySet()
.stream()
.collect(
Collectors.toMap(
Map.Entry::getKey,
entry -> entry.getValue()
.stream()
.map(groupStats -> updateGroupStats(groupStats, replicaStats))
.collect(Collectors.toList())
)
);

return new SegmentReplicationStatsResponse(totalShards, successfulShards, failedShards, replicationStats, shardFailures);
}

@Override
Expand All @@ -144,9 +151,8 @@ protected SegmentReplicationStatsRequest readRequestFrom(StreamInput in) throws

@Override
protected SegmentReplicationShardStatsResponse shardOperation(SegmentReplicationStatsRequest request, ShardRouting shardRouting) {
IndexService indexService = indicesService.indexServiceSafe(shardRouting.shardId().getIndex());
IndexShard indexShard = indexService.getShard(shardRouting.shardId().id());
ShardId shardId = shardRouting.shardId();
IndexShard indexShard = indicesService.indexServiceSafe(shardId.getIndex()).getShard(shardId.id());

if (indexShard.indexSettings().isSegRepEnabledOrRemoteNode() == false) {
return null;
Expand All @@ -156,11 +162,7 @@ protected SegmentReplicationShardStatsResponse shardOperation(SegmentReplication
return new SegmentReplicationShardStatsResponse(pressureService.getStatsForShard(indexShard));
}

// return information about only on-going segment replication events.
if (request.activeOnly()) {
return new SegmentReplicationShardStatsResponse(targetService.getOngoingEventSegmentReplicationState(shardId));
}
return new SegmentReplicationShardStatsResponse(targetService.getSegmentReplicationState(shardId));
return new SegmentReplicationShardStatsResponse(getSegmentReplicationState(shardId, request.activeOnly()));
}

@Override
Expand All @@ -181,4 +183,83 @@ protected ClusterBlockException checkRequestBlock(
) {
return state.blocks().indicesBlockedException(ClusterBlockLevel.METADATA_READ, concreteIndices);
}

/**
 * Builds a new per-group stats object whose replica set contains the group's existing replica
 * stats (each updated with its current replication state) plus the stats computed for
 * search-only replicas of the same shard.
 *
 * @param groupStats   the group stats to enrich; its replica stat objects are updated in place
 * @param replicaStats replication states keyed by allocation id; a replica whose allocation id
 *                     has no entry gets a {@code null} current replication state
 * @return a new {@link SegmentReplicationPerGroupStats} with the same shard id and rejected
 *         request count as {@code groupStats} and the combined replica set
 */
private SegmentReplicationPerGroupStats updateGroupStats(
    SegmentReplicationPerGroupStats groupStats,
    Map<String, SegmentReplicationState> replicaStats
) {
    final Set<SegmentReplicationShardStats> existingReplicaStats = groupStats.getReplicaStats();

    // Attach the replication state with an explicit loop rather than Stream.peek: peek is
    // documented as a debugging aid and should not carry a side effect the result depends on.
    for (SegmentReplicationShardStats replicaStat : existingReplicaStats) {
        replicaStat.setCurrentReplicationState(replicaStats.get(replicaStat.getAllocationId()));
    }

    // Compute search replica stats
    final Set<SegmentReplicationShardStats> searchReplicaStats = computeSearchReplicaStats(groupStats.getShardId(), replicaStats);

    // Combine ReplicaStats and SearchReplicaStats; existing entries are offered first so they
    // take precedence over an equal search replica entry.
    final Set<SegmentReplicationShardStats> combinedStats = Stream.concat(existingReplicaStats.stream(), searchReplicaStats.stream())
        .collect(Collectors.toSet());

    return new SegmentReplicationPerGroupStats(groupStats.getShardId(), combinedStats, groupStats.getRejectedRequestCount());
}

/**
 * Produces shard stats for every replication state that belongs to the given shard and whose
 * shard routing is marked search-only.
 *
 * @param shardId      the shard whose search replicas are of interest
 * @param replicaStats replication states keyed by allocation id
 * @return one stats entry per matching state, each carrying that state as its current
 *         replication state; empty when nothing matches
 */
private Set<SegmentReplicationShardStats> computeSearchReplicaStats(
    ShardId shardId,
    Map<String, SegmentReplicationState> replicaStats
) {
    return replicaStats.values().stream().filter(state -> {
        // Keep only states for this shard that are routed as search-only.
        return state.getShardRouting().shardId().equals(shardId) && state.getShardRouting().isSearchOnly();
    }).map(state -> {
        final SegmentReplicationShardStats shardStats = computeSegmentReplicationShardStats(state.getShardRouting());
        shardStats.setCurrentReplicationState(state);
        return shardStats;
    }).collect(Collectors.toSet());
}

/**
 * Creates shard stats for the given routing from the target service's latest completed and
 * ongoing replication states for that shard.
 *
 * <p>The second and fourth constructor arguments are passed as literal {@code 0}.
 * NOTE(review): presumably these counters are not available for this kind of shard; confirm
 * against the {@code SegmentReplicationShardStats} constructor.
 *
 * @param shardRouting routing of the shard to describe; supplies the shard id and allocation id
 * @return a new stats object for the shard's allocation id
 */
SegmentReplicationShardStats computeSegmentReplicationShardStats(ShardRouting shardRouting) {
    final ShardId id = shardRouting.shardId();
    final SegmentReplicationState lastCompleted = targetService.getlatestCompletedEventSegmentReplicationState(id);
    final SegmentReplicationState inProgress = targetService.getOngoingEventSegmentReplicationState(id);

    final String allocationId = shardRouting.allocationId().getId();
    final long bytesRemaining = calculateBytesRemainingToReplicate(inProgress);
    final long currentLag = getCurrentReplicationLag(inProgress);
    final long lastCompletedLag = getLastCompletedReplicationLag(lastCompleted);

    return new SegmentReplicationShardStats(allocationId, 0, bytesRemaining, 0, currentLag, lastCompletedLag);
}

/**
 * Looks up the replication state for a shard from the target service.
 *
 * @param shardId      the shard to look up
 * @param isActiveOnly when {@code true}, only the ongoing-event state is requested; otherwise the
 *                     general segment replication state is requested
 * @return whatever the target service returns for the chosen lookup
 */
private SegmentReplicationState getSegmentReplicationState(ShardId shardId, boolean isActiveOnly) {
    return isActiveOnly
        ? targetService.getOngoingEventSegmentReplicationState(shardId)
        : targetService.getSegmentReplicationState(shardId);
}

/**
 * Sums, over every file detail of the given replication state's index, the difference between
 * the file's length and its recovered amount.
 *
 * @param ongoingSegmentReplicationState the state to inspect; may be {@code null}
 * @return the summed difference, or {@code 0} when the state is {@code null}
 */
private long calculateBytesRemainingToReplicate(SegmentReplicationState ongoingSegmentReplicationState) {
    if (ongoingSegmentReplicationState != null) {
        return ongoingSegmentReplicationState.getIndex()
            .fileDetails()
            .stream()
            .mapToLong(fileDetail -> fileDetail.length() - fileDetail.recovered())
            .sum();
    }
    // No state supplied, so there is nothing to add up.
    return 0;
}

/**
 * Reads the timer value of the given ongoing replication state.
 *
 * @param ongoingSegmentReplicationState the state to read; may be {@code null}
 * @return the state's timer time, or {@code 0} when the state is {@code null}
 */
private long getCurrentReplicationLag(SegmentReplicationState ongoingSegmentReplicationState) {
    if (ongoingSegmentReplicationState == null) {
        return 0;
    }
    return ongoingSegmentReplicationState.getTimer().time();
}

/**
 * Reads the timer value of the given completed replication state.
 *
 * @param completedSegmentReplicationState the state to read; may be {@code null}
 * @return the state's timer time, or {@code 0} when the state is {@code null}
 */
private long getLastCompletedReplicationLag(SegmentReplicationState completedSegmentReplicationState) {
    if (completedSegmentReplicationState == null) {
        return 0;
    }
    return completedSegmentReplicationState.getTimer().time();
}
}
Loading
Loading