Skip to content

Commit

Permalink
Optimise memory for rest cluster health calls with in-line shard aggr…
Browse files Browse the repository at this point in the history
…egations for levels cluster and indices. (opensearch-project#15492)

Signed-off-by: Swetha Guptha <[email protected]>
  • Loading branch information
SwethaGuptha authored and Swetha Guptha committed Sep 6, 2024
1 parent 7712bea commit e5c0779
Show file tree
Hide file tree
Showing 15 changed files with 673 additions and 41 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,8 @@
import org.opensearch.action.support.IndicesOptions;
import org.opensearch.action.support.PlainActionFuture;
import org.opensearch.cluster.health.ClusterHealthStatus;
import org.opensearch.cluster.health.ClusterIndexHealth;
import org.opensearch.cluster.health.ClusterShardHealth;
import org.opensearch.cluster.metadata.IndexMetadata;
import org.opensearch.cluster.routing.UnassignedInfo;
import org.opensearch.cluster.service.ClusterService;
Expand All @@ -49,6 +51,7 @@

import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicBoolean;

import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertAcked;
Expand Down Expand Up @@ -439,4 +442,164 @@ public void clusterStateProcessed(String source, ClusterState oldState, ClusterS
completionFuture.actionGet(TimeValue.timeValueSeconds(30));
}
}

public void testHealthWithClusterLevelAppliedAtTransportLayer() {
createIndex(
"test1",
Settings.builder().put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1).put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 0).build()
);
ensureGreen();
ClusterHealthResponse healthResponse = client().admin()
.cluster()
.prepareHealth()
.setApplyLevelAtTransportLayer(true)
.execute()
.actionGet();
assertEquals(ClusterHealthStatus.GREEN, healthResponse.getStatus());
assertTrue(healthResponse.getIndices().isEmpty());
assertEquals(1, healthResponse.getActiveShards());
assertEquals(1, healthResponse.getActivePrimaryShards());
assertEquals(0, healthResponse.getUnassignedShards());
assertEquals(0, healthResponse.getInitializingShards());
assertEquals(0, healthResponse.getRelocatingShards());
assertEquals(0, healthResponse.getDelayedUnassignedShards());
}

public void testHealthWithIndicesLevelAppliedAtTransportLayer() {
createIndex(
"test1",
Settings.builder().put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1).put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 0).build()
);
ensureGreen();
ClusterHealthResponse healthResponse = client().admin()
.cluster()
.prepareHealth()
.setLevel("indices")
.setApplyLevelAtTransportLayer(true)
.execute()
.actionGet();
assertEquals(ClusterHealthStatus.GREEN, healthResponse.getStatus());

assertEquals(1, healthResponse.getActiveShards());
assertEquals(1, healthResponse.getActivePrimaryShards());
assertEquals(0, healthResponse.getUnassignedShards());
assertEquals(0, healthResponse.getInitializingShards());
assertEquals(0, healthResponse.getRelocatingShards());
assertEquals(0, healthResponse.getDelayedUnassignedShards());

Map<String, ClusterIndexHealth> indices = healthResponse.getIndices();
assertFalse(indices.isEmpty());
assertEquals(1, indices.size());
for (Map.Entry<String, ClusterIndexHealth> indicesHealth : indices.entrySet()) {
String indexName = indicesHealth.getKey();
assertEquals("test1", indexName);
ClusterIndexHealth indicesHealthValue = indicesHealth.getValue();
assertEquals(1, indicesHealthValue.getActiveShards());
assertEquals(1, indicesHealthValue.getActivePrimaryShards());
assertEquals(0, indicesHealthValue.getInitializingShards());
assertEquals(0, indicesHealthValue.getUnassignedShards());
assertEquals(0, indicesHealthValue.getDelayedUnassignedShards());
assertEquals(0, indicesHealthValue.getRelocatingShards());
assertEquals(ClusterHealthStatus.GREEN, indicesHealthValue.getStatus());
assertTrue(indicesHealthValue.getShards().isEmpty());
}
}

public void testHealthWithShardLevelAppliedAtTransportLayer() {
int dataNodes = internalCluster().getDataNodeNames().size();
int greenClusterReplicaCount = dataNodes - 1;
int yellowClusterReplicaCount = dataNodes;

createIndex(
"test1",
Settings.builder()
.put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1)
.put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, greenClusterReplicaCount)
.build()
);
ensureGreen(TimeValue.timeValueSeconds(120), "test1");
createIndex(
"test2",
Settings.builder()
.put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1)
.put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, greenClusterReplicaCount)
.build()
);
ensureGreen(TimeValue.timeValueSeconds(120));
client().admin()
.indices()
.prepareUpdateSettings()
.setIndices("test2")
.setSettings(Settings.builder().put("index.number_of_replicas", yellowClusterReplicaCount).build())
.execute()
.actionGet();
ClusterHealthResponse healthResponse = client().admin()
.cluster()
.prepareHealth()
.setLevel("shards")
.setApplyLevelAtTransportLayer(true)
.execute()
.actionGet();
assertEquals(ClusterHealthStatus.YELLOW, healthResponse.getStatus());

assertEquals(2 * dataNodes, healthResponse.getActiveShards());
assertEquals(2, healthResponse.getActivePrimaryShards());
assertEquals(1, healthResponse.getUnassignedShards());
assertEquals(0, healthResponse.getInitializingShards());
assertEquals(0, healthResponse.getRelocatingShards());
assertEquals(0, healthResponse.getDelayedUnassignedShards());

Map<String, ClusterIndexHealth> indices = healthResponse.getIndices();
assertFalse(indices.isEmpty());
assertEquals(2, indices.size());
for (Map.Entry<String, ClusterIndexHealth> indicesHealth : indices.entrySet()) {
String indexName = indicesHealth.getKey();
boolean indexHasMoreReplicas = indexName.equals("test2");
ClusterIndexHealth indicesHealthValue = indicesHealth.getValue();
assertEquals(dataNodes, indicesHealthValue.getActiveShards());
assertEquals(1, indicesHealthValue.getActivePrimaryShards());
assertEquals(0, indicesHealthValue.getInitializingShards());
assertEquals(indexHasMoreReplicas ? 1 : 0, indicesHealthValue.getUnassignedShards());
assertEquals(0, indicesHealthValue.getDelayedUnassignedShards());
assertEquals(0, indicesHealthValue.getRelocatingShards());
assertEquals(indexHasMoreReplicas ? ClusterHealthStatus.YELLOW : ClusterHealthStatus.GREEN, indicesHealthValue.getStatus());
Map<Integer, ClusterShardHealth> shards = indicesHealthValue.getShards();
assertFalse(shards.isEmpty());
assertEquals(1, shards.size());
for (Map.Entry<Integer, ClusterShardHealth> shardHealth : shards.entrySet()) {
ClusterShardHealth clusterShardHealth = shardHealth.getValue();
assertEquals(dataNodes, clusterShardHealth.getActiveShards());
assertEquals(indexHasMoreReplicas ? 1 : 0, clusterShardHealth.getUnassignedShards());
assertEquals(0, clusterShardHealth.getDelayedUnassignedShards());
assertEquals(0, clusterShardHealth.getRelocatingShards());
assertEquals(0, clusterShardHealth.getInitializingShards());
assertTrue(clusterShardHealth.isPrimaryActive());
assertEquals(indexHasMoreReplicas ? ClusterHealthStatus.YELLOW : ClusterHealthStatus.GREEN, clusterShardHealth.getStatus());
}
}
}

public void testHealthWithAwarenessAttributesLevelAppliedAtTransportLayer() {
createIndex(
"test1",
Settings.builder().put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1).put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 0).build()
);
ensureGreen();
ClusterHealthResponse healthResponse = client().admin()
.cluster()
.prepareHealth()
.setLevel("awareness_attributes")
.setApplyLevelAtTransportLayer(true)
.execute()
.actionGet();
assertEquals(ClusterHealthStatus.GREEN, healthResponse.getStatus());
assertTrue(healthResponse.getIndices().isEmpty());
assertNotNull(healthResponse.getClusterAwarenessHealth());
assertEquals(1, healthResponse.getActiveShards());
assertEquals(1, healthResponse.getActivePrimaryShards());
assertEquals(0, healthResponse.getUnassignedShards());
assertEquals(0, healthResponse.getInitializingShards());
assertEquals(0, healthResponse.getRelocatingShards());
assertEquals(0, healthResponse.getDelayedUnassignedShards());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
import org.opensearch.client.Client;
import org.opensearch.common.settings.Settings;
import org.opensearch.common.settings.SettingsException;
import org.opensearch.common.unit.TimeValue;
import org.opensearch.test.InternalTestCluster;
import org.opensearch.test.OpenSearchIntegTestCase;

Expand Down Expand Up @@ -109,7 +110,7 @@ public void testNewRestoredIndexIsRemoteStoreBackedForRemoteStoreDirectionAndMix
setDirection(REMOTE_STORE.direction);
String restoredIndexName2 = TEST_INDEX + "-restored2";
restoreSnapshot(snapshotRepoName, snapshotName, restoredIndexName2);
ensureGreen(restoredIndexName2);
ensureGreen(TimeValue.timeValueSeconds(90), restoredIndexName2);

logger.info("Verify that restored index is non remote-backed");
assertRemoteStoreBackedIndex(restoredIndexName2);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,17 @@ public class ClusterHealthRequest extends ClusterManagerNodeReadRequest<ClusterH
*/
private Level level = Level.CLUSTER;

/**
* This flag will be used by the TransportClusterHealthAction to decide if indices/shards info is required in the ClusterHealthResponse or not.
* When the flag is disabled - indices/shard info will be returned in ClusterHealthResponse regardless of the health level requested.
* When the flag is enabled - indices/shards info will be set according to health level requested.
* For Level.CLUSTER (or) Level.AWARENESS_ATTRIBUTES - information on indices/shards will NOT be returned to the transport client
* For Level.INDICES - information on indices will be returned to the transport client.
* For Level.SHARDS - information on indices and shards will be returned to the transport client
* By default, the flag is disabled.
*/
private boolean applyLevelAtTransportLayer = false;

public ClusterHealthRequest() {}

public ClusterHealthRequest(String... indices) {
Expand Down Expand Up @@ -109,6 +120,9 @@ public ClusterHealthRequest(StreamInput in) throws IOException {
if (in.getVersion().onOrAfter(Version.V_2_6_0)) {
ensureNodeWeighedIn = in.readBoolean();
}
if (in.getVersion().onOrAfter(Version.V_2_17_0)) {
applyLevelAtTransportLayer = in.readBoolean();
}
}

@Override
Expand Down Expand Up @@ -146,6 +160,9 @@ public void writeTo(StreamOutput out) throws IOException {
if (out.getVersion().onOrAfter(Version.V_2_6_0)) {
out.writeBoolean(ensureNodeWeighedIn);
}
if (out.getVersion().onOrAfter(Version.V_2_17_0)) {
out.writeBoolean(applyLevelAtTransportLayer);
}
}

@Override
Expand Down Expand Up @@ -344,6 +361,14 @@ public final boolean ensureNodeWeighedIn() {
return ensureNodeWeighedIn;
}

public boolean isApplyLevelAtTransportLayer() {
return applyLevelAtTransportLayer;
}

public void setApplyLevelAtTransportLayer(boolean applyLevelAtTransportLayer) {
this.applyLevelAtTransportLayer = applyLevelAtTransportLayer;
}

@Override
public ActionRequestValidationException validate() {
if (level.equals(Level.AWARENESS_ATTRIBUTES) && indices.length > 0) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -171,4 +171,9 @@ public final ClusterHealthRequestBuilder setEnsureNodeWeighedIn(boolean ensureNo
request.ensureNodeWeighedIn(ensureNodeCommissioned);
return this;
}

public ClusterHealthRequestBuilder setApplyLevelAtTransportLayer(boolean applyLevelAtTransportLayer) {
request.setApplyLevelAtTransportLayer(applyLevelAtTransportLayer);
return this;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -237,6 +237,26 @@ public ClusterHealthResponse(
this.clusterHealthStatus = clusterStateHealth.getStatus();
}

public ClusterHealthResponse(
String clusterName,
String[] concreteIndices,
ClusterHealthRequest clusterHealthRequest,
ClusterState clusterState,
int numberOfPendingTasks,
int numberOfInFlightFetch,
TimeValue taskMaxWaitingTime
) {
this.clusterName = clusterName;
this.numberOfPendingTasks = numberOfPendingTasks;
this.numberOfInFlightFetch = numberOfInFlightFetch;
this.taskMaxWaitingTime = taskMaxWaitingTime;
this.clusterStateHealth = clusterHealthRequest.isApplyLevelAtTransportLayer()
? new ClusterStateHealth(clusterState, concreteIndices, clusterHealthRequest.level())
: new ClusterStateHealth(clusterState, concreteIndices);
this.clusterHealthStatus = clusterStateHealth.getStatus();
this.delayedUnassignedShards = clusterStateHealth.getDelayedUnassignedShards();
}

// Awareness Attribute health
public ClusterHealthResponse(
String clusterName,
Expand All @@ -261,6 +281,29 @@ public ClusterHealthResponse(
this.clusterAwarenessHealth = new ClusterAwarenessHealth(clusterState, clusterSettings, awarenessAttributeName);
}

public ClusterHealthResponse(
String clusterName,
ClusterHealthRequest clusterHealthRequest,
ClusterState clusterState,
ClusterSettings clusterSettings,
String[] concreteIndices,
String awarenessAttributeName,
int numberOfPendingTasks,
int numberOfInFlightFetch,
TimeValue taskMaxWaitingTime
) {
this(
clusterName,
concreteIndices,
clusterHealthRequest,
clusterState,
numberOfPendingTasks,
numberOfInFlightFetch,
taskMaxWaitingTime
);
this.clusterAwarenessHealth = new ClusterAwarenessHealth(clusterState, clusterSettings, awarenessAttributeName);
}

/**
* For XContent Parser and serialization tests
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,6 @@
import org.opensearch.cluster.metadata.ProcessClusterEventTimeoutException;
import org.opensearch.cluster.node.DiscoveryNode;
import org.opensearch.cluster.routing.NodeWeighedAwayException;
import org.opensearch.cluster.routing.UnassignedInfo;
import org.opensearch.cluster.routing.WeightedRoutingUtils;
import org.opensearch.cluster.routing.allocation.AllocationService;
import org.opensearch.cluster.service.ClusterService;
Expand Down Expand Up @@ -487,7 +486,12 @@ private ClusterHealthResponse clusterHealth(
TimeValue pendingTaskTimeInQueue
) {
if (logger.isTraceEnabled()) {
logger.trace("Calculating health based on state version [{}]", clusterState.version());
logger.trace(
"Calculating health based on state version [{}] for health level [{}] and applyLevelAtTransportLayer set to [{}]",
clusterState.version(),
request.level(),
request.isApplyLevelAtTransportLayer()
);
}

String[] concreteIndices;
Expand All @@ -496,13 +500,13 @@ private ClusterHealthResponse clusterHealth(
concreteIndices = clusterState.getMetadata().getConcreteAllIndices();
return new ClusterHealthResponse(
clusterState.getClusterName().value(),
request,
clusterState,
clusterService.getClusterSettings(),
concreteIndices,
awarenessAttribute,
numberOfPendingTasks,
numberOfInFlightFetch,
UnassignedInfo.getNumberOfDelayedUnassigned(clusterState),
pendingTaskTimeInQueue
);
}
Expand All @@ -514,10 +518,10 @@ private ClusterHealthResponse clusterHealth(
ClusterHealthResponse response = new ClusterHealthResponse(
clusterState.getClusterName().value(),
Strings.EMPTY_ARRAY,
request,
clusterState,
numberOfPendingTasks,
numberOfInFlightFetch,
UnassignedInfo.getNumberOfDelayedUnassigned(clusterState),
pendingTaskTimeInQueue
);
response.setStatus(ClusterHealthStatus.RED);
Expand All @@ -527,10 +531,10 @@ private ClusterHealthResponse clusterHealth(
return new ClusterHealthResponse(
clusterState.getClusterName().value(),
concreteIndices,
request,
clusterState,
numberOfPendingTasks,
numberOfInFlightFetch,
UnassignedInfo.getNumberOfDelayedUnassigned(clusterState),
pendingTaskTimeInQueue
);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@

import org.apache.lucene.store.AlreadyClosedException;
import org.opensearch.action.FailedNodeException;
import org.opensearch.action.admin.cluster.health.ClusterHealthRequest;
import org.opensearch.action.admin.cluster.node.info.NodeInfo;
import org.opensearch.action.admin.cluster.node.stats.NodeStats;
import org.opensearch.action.admin.indices.stats.CommonStats;
Expand Down Expand Up @@ -209,7 +210,7 @@ protected ClusterStatsNodeResponse nodeOperation(ClusterStatsNodeRequest nodeReq

ClusterHealthStatus clusterStatus = null;
if (clusterService.state().nodes().isLocalNodeElectedClusterManager()) {
clusterStatus = new ClusterStateHealth(clusterService.state()).getStatus();
clusterStatus = new ClusterStateHealth(clusterService.state(), ClusterHealthRequest.Level.CLUSTER).getStatus();
}

return new ClusterStatsNodeResponse(
Expand Down
Loading

0 comments on commit e5c0779

Please sign in to comment.