Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Optimise TransportNodesAction to not send DiscoveryNodes for NodeStat… #14749

Merged
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),

## [Unreleased 2.x]
### Added
- Optimize TransportNodesAction to not send DiscoveryNodes for NodeStats, NodesInfo and ClusterStats call ([14749](https://github.com/opensearch-project/OpenSearch/pull/14749))
Pranshu-S marked this conversation as resolved.
Show resolved Hide resolved
- Add fingerprint ingest processor ([#13724](https://github.com/opensearch-project/OpenSearch/pull/13724))
- [Remote Store] Rate limiter for remote store low priority uploads ([#14374](https://github.com/opensearch-project/OpenSearch/pull/14374/))
- Apply the date histogram rewrite optimization to range aggregation ([#13865](https://github.com/opensearch-project/OpenSearch/pull/13865))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -129,7 +129,7 @@ protected NodeInfo nodeOperation(NodeInfoRequest nodeRequest) {
*/
public static class NodeInfoRequest extends TransportRequest {

NodesInfoRequest request;
protected NodesInfoRequest request;

public NodeInfoRequest(StreamInput in) throws IOException {
super(in);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -140,7 +140,7 @@ protected NodeStats nodeOperation(NodeStatsRequest nodeStatsRequest) {
*/
public static class NodeStatsRequest extends TransportRequest {

NodesStatsRequest request;
protected NodesStatsRequest request;

public NodeStatsRequest(StreamInput in) throws IOException {
super(in);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -223,7 +223,7 @@ protected ClusterStatsNodeResponse nodeOperation(ClusterStatsNodeRequest nodeReq
*/
public static class ClusterStatsNodeRequest extends TransportRequest {

ClusterStatsRequest request;
protected ClusterStatsRequest request;

public ClusterStatsNodeRequest(StreamInput in) throws IOException {
super(in);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,14 @@ public abstract class BaseNodesRequest<Request extends BaseNodesRequest<Request>
* will be ignored and this will be used.
* */
private DiscoveryNode[] concreteNodes;

/**
* Since do not use the discovery nodes coming from the request in all code paths following a request extended off from
* BaseNodeRequest, we do not require it to sent around across all nodes.
*
* Setting default behavior as `true` but can be explicitly changed in requests that do not require.
*/
private boolean retainDiscoveryNodes = true;
private final TimeValue DEFAULT_TIMEOUT_SECS = TimeValue.timeValueSeconds(30);

private TimeValue timeout;
Expand Down Expand Up @@ -119,6 +127,14 @@ public void setConcreteNodes(DiscoveryNode[] concreteNodes) {
this.concreteNodes = concreteNodes;
}

public void retainDiscoveryNodes(boolean value) {
Pranshu-S marked this conversation as resolved.
Show resolved Hide resolved
retainDiscoveryNodes = value;
}

public boolean retainDiscoveryNodes() {
return retainDiscoveryNodes;
}

@Override
public ActionRequestValidationException validate() {
return null;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -226,6 +226,7 @@ class AsyncAction {
private final NodesRequest request;
private final ActionListener<NodesResponse> listener;
private final AtomicReferenceArray<Object> responses;
private final DiscoveryNode[] concreteNodes;
private final AtomicInteger counter = new AtomicInteger();
private final Task task;

Expand All @@ -238,10 +239,20 @@ class AsyncAction {
assert request.concreteNodes() != null;
}
this.responses = new AtomicReferenceArray<>(request.concreteNodes().length);

if (request.retainDiscoveryNodes() == false) {
// We transfer the ownership of discovery nodes to route the request to into the AsyncAction class.
// This reduces the payload of the request and improves the number of concrete nodes in the memory
this.concreteNodes = request.concreteNodes();
request.setConcreteNodes(null);
} else {
// initializing it separately as we keep the `concreteNodes` as final since we want it to be immutable.
this.concreteNodes = null;
Pranshu-S marked this conversation as resolved.
Show resolved Hide resolved
}
Pranshu-S marked this conversation as resolved.
Show resolved Hide resolved
}

void start() {
final DiscoveryNode[] nodes = request.concreteNodes();
final DiscoveryNode[] nodes = request.concreteNodes() != null ? request.concreteNodes() : concreteNodes;
Pranshu-S marked this conversation as resolved.
Show resolved Hide resolved
if (nodes.length == 0) {
// nothing to notify
threadPool.generic().execute(() -> listener.onResponse(newResponse(request, responses)));
Expand All @@ -260,7 +271,6 @@ void start() {
if (task != null) {
nodeRequest.setParentTask(clusterService.localNode().getId(), task.getId());
}

transportService.sendRequest(
node,
getTransportNodeAction(node),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,7 @@ public String getName() {
public RestChannelConsumer prepareRequest(final RestRequest request, final NodeClient client) throws IOException {
ClusterStatsRequest clusterStatsRequest = new ClusterStatsRequest().nodesIds(request.paramAsStringArray("nodeId", null));
clusterStatsRequest.timeout(request.param("timeout"));
clusterStatsRequest.retainDiscoveryNodes(false);
return channel -> client.admin().cluster().clusterStats(clusterStatsRequest, new NodesResponseRestListener<>(channel));
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,7 @@ public RestChannelConsumer prepareRequest(final RestRequest request, final NodeC
final NodesInfoRequest nodesInfoRequest = prepareRequest(request);
nodesInfoRequest.timeout(request.param("timeout"));
settingsFilter.addFilterSettingParams(request);

nodesInfoRequest.retainDiscoveryNodes(false);
return channel -> client.admin().cluster().nodesInfo(nodesInfoRequest, new NodesResponseRestListener<>(channel));
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -232,6 +232,7 @@ public RestChannelConsumer prepareRequest(final RestRequest request, final NodeC
// If no levels are passed in this results in an empty array.
String[] levels = Strings.splitStringByCommaToArray(request.param("level"));
nodesStatsRequest.indices().setLevels(levels);
nodesStatsRequest.retainDiscoveryNodes(false);

return channel -> client.admin().cluster().nodesStats(nodesStatsRequest, new NodesResponseRestListener<>(channel));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -125,6 +125,7 @@
public void processResponse(final ClusterStateResponse clusterStateResponse) {
NodesInfoRequest nodesInfoRequest = new NodesInfoRequest();
nodesInfoRequest.timeout(request.param("timeout"));
nodesInfoRequest.retainDiscoveryNodes(false);

Check warning on line 128 in server/src/main/java/org/opensearch/rest/action/cat/RestNodesAction.java

View check run for this annotation

Codecov / codecov/patch

server/src/main/java/org/opensearch/rest/action/cat/RestNodesAction.java#L128

Added line #L128 was not covered by tests
nodesInfoRequest.clear()
.addMetrics(
NodesInfoRequest.Metric.JVM.metricName(),
Expand All @@ -137,6 +138,7 @@
public void processResponse(final NodesInfoResponse nodesInfoResponse) {
NodesStatsRequest nodesStatsRequest = new NodesStatsRequest();
nodesStatsRequest.timeout(request.param("timeout"));
nodesStatsRequest.retainDiscoveryNodes(false);

Check warning on line 141 in server/src/main/java/org/opensearch/rest/action/cat/RestNodesAction.java

View check run for this annotation

Codecov / codecov/patch

server/src/main/java/org/opensearch/rest/action/cat/RestNodesAction.java#L141

Added line #L141 was not covered by tests
nodesStatsRequest.clear()
.indices(true)
.addMetrics(
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.action;

import org.opensearch.client.node.NodeClient;
import org.opensearch.common.settings.Settings;
import org.opensearch.common.settings.SettingsFilter;
import org.opensearch.rest.action.admin.cluster.RestClusterStatsAction;
import org.opensearch.rest.action.admin.cluster.RestNodesInfoAction;
import org.opensearch.rest.action.admin.cluster.RestNodesStatsAction;
import org.opensearch.test.OpenSearchTestCase;
import org.opensearch.test.rest.FakeRestRequest;
import org.opensearch.threadpool.TestThreadPool;
import org.junit.After;

import java.util.Collections;

public class RestStatsActionTests extends OpenSearchTestCase {
private final TestThreadPool threadPool = new TestThreadPool(RestStatsActionTests.class.getName());
private final NodeClient client = new NodeClient(Settings.EMPTY, threadPool);

@After
public void terminateThreadPool() {
terminate(threadPool);
}

public void testClusterStatsActionPrepareRequestNoError() {
RestClusterStatsAction action = new RestClusterStatsAction();
try {
action.prepareRequest(new FakeRestRequest(), client);
} catch (Throwable t) {
fail(t.getMessage());
}
}

public void testNodesStatsActionPrepareRequestNoError() {
RestNodesStatsAction action = new RestNodesStatsAction();
try {
action.prepareRequest(new FakeRestRequest(), client);
} catch (Throwable t) {
fail(t.getMessage());
}
}

public void testNodesInfoActionPrepareRequestNoError() {
RestNodesInfoAction action = new RestNodesInfoAction(new SettingsFilter(Collections.singleton("foo.filtered")));
try {
action.prepareRequest(new FakeRestRequest(), client);
} catch (Throwable t) {
fail(t.getMessage());
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,165 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.action.support.nodes;

import org.opensearch.action.admin.cluster.node.stats.NodesStatsRequest;
import org.opensearch.action.admin.cluster.stats.ClusterStatsRequest;
import org.opensearch.action.admin.cluster.stats.TransportClusterStatsAction;
import org.opensearch.action.support.ActionFilters;
import org.opensearch.action.support.PlainActionFuture;
import org.opensearch.cluster.node.DiscoveryNode;
import org.opensearch.cluster.service.ClusterService;
import org.opensearch.common.io.stream.BytesStreamOutput;
import org.opensearch.core.common.io.stream.StreamInput;
import org.opensearch.indices.IndicesService;
import org.opensearch.node.NodeService;
import org.opensearch.test.transport.CapturingTransport;
import org.opensearch.threadpool.ThreadPool;
import org.opensearch.transport.TransportService;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class TransportClusterStatsActionTests extends TransportNodesActionTests {

/**
* By default, we send discovery nodes list to each request that is sent across from the coordinator node. This
* behavior is asserted in this test.
*/
public void testClusterStatsActionWithRetentionOfDiscoveryNodesList() {
ClusterStatsRequest request = new ClusterStatsRequest();
request.retainDiscoveryNodes(true);
Map<String, List<MockClusterStatsNodeRequest>> combinedSentRequest = performNodesInfoAction(request);

assertNotNull(combinedSentRequest);
combinedSentRequest.forEach((node, capturedRequestList) -> {
assertNotNull(capturedRequestList);
capturedRequestList.forEach(sentRequest -> {
assertNotNull(sentRequest.getDiscoveryNodes());
assertEquals(sentRequest.getDiscoveryNodes().length, clusterService.state().nodes().getSize());
});
});
}

public void testClusterStatsActionWithPreFilledConcreteNodesAndWithRetentionOfDiscoveryNodesList() {
Pranshu-S marked this conversation as resolved.
Show resolved Hide resolved
ClusterStatsRequest request = new ClusterStatsRequest();
Collection<DiscoveryNode> discoveryNodes = clusterService.state().getNodes().getNodes().values();
request.setConcreteNodes(discoveryNodes.toArray(DiscoveryNode[]::new));
Map<String, List<MockClusterStatsNodeRequest>> combinedSentRequest = performNodesInfoAction(request);

assertNotNull(combinedSentRequest);
combinedSentRequest.forEach((node, capturedRequestList) -> {
assertNotNull(capturedRequestList);
capturedRequestList.forEach(sentRequest -> {
assertNotNull(sentRequest.getDiscoveryNodes());
assertEquals(sentRequest.getDiscoveryNodes().length, clusterService.state().nodes().getSize());
});
});
}

/**
* In the optimized ClusterStats Request, we do not send the DiscoveryNodes List to each node. This behavior is
* asserted in this test.
*/
public void testClusterStatsActionWithoutRetentionOfDiscoveryNodesList() {
ClusterStatsRequest request = new ClusterStatsRequest();
request.retainDiscoveryNodes(false);
Map<String, List<MockClusterStatsNodeRequest>> combinedSentRequest = performNodesInfoAction(request);

assertNotNull(combinedSentRequest);
combinedSentRequest.forEach((node, capturedRequestList) -> {
assertNotNull(capturedRequestList);
capturedRequestList.forEach(sentRequest -> { assertNull(sentRequest.getDiscoveryNodes()); });
});
}

public void testClusterStatsActionWithPreFilledConcreteNodesAndWithoutRetentionOfDiscoveryNodesList() {
ClusterStatsRequest request = new ClusterStatsRequest();
Collection<DiscoveryNode> discoveryNodes = clusterService.state().getNodes().getNodes().values();
request.setConcreteNodes(discoveryNodes.toArray(DiscoveryNode[]::new));
request.retainDiscoveryNodes(false);
Map<String, List<MockClusterStatsNodeRequest>> combinedSentRequest = performNodesInfoAction(request);

assertNotNull(combinedSentRequest);
combinedSentRequest.forEach((node, capturedRequestList) -> {
assertNotNull(capturedRequestList);
capturedRequestList.forEach(sentRequest -> { assertNull(sentRequest.getDiscoveryNodes()); });
});
}

private Map<String, List<MockClusterStatsNodeRequest>> performNodesInfoAction(ClusterStatsRequest request) {
TransportNodesAction action = getTestTransportClusterStatsAction();
PlainActionFuture<NodesStatsRequest> listener = new PlainActionFuture<>();
action.new AsyncAction(null, request, listener).start();
Map<String, List<CapturingTransport.CapturedRequest>> capturedRequests = transport.getCapturedRequestsByTargetNodeAndClear();
Map<String, List<MockClusterStatsNodeRequest>> combinedSentRequest = new HashMap<>();

capturedRequests.forEach((node, capturedRequestList) -> {
List<MockClusterStatsNodeRequest> sentRequestList = new ArrayList<>();

capturedRequestList.forEach(preSentRequest -> {
BytesStreamOutput out = new BytesStreamOutput();
try {
TransportClusterStatsAction.ClusterStatsNodeRequest clusterStatsNodeRequestFromCoordinator =
(TransportClusterStatsAction.ClusterStatsNodeRequest) preSentRequest.request;
clusterStatsNodeRequestFromCoordinator.writeTo(out);
StreamInput in = out.bytes().streamInput();
MockClusterStatsNodeRequest mockClusterStatsNodeRequest = new MockClusterStatsNodeRequest(in);
sentRequestList.add(mockClusterStatsNodeRequest);
} catch (IOException e) {
throw new RuntimeException(e);
}
});

combinedSentRequest.put(node, sentRequestList);
});

return combinedSentRequest;
}

private TestTransportClusterStatsAction getTestTransportClusterStatsAction() {
return new TestTransportClusterStatsAction(
THREAD_POOL,
clusterService,
transportService,
nodeService,
indicesService,
new ActionFilters(Collections.emptySet())
);
}

private static class TestTransportClusterStatsAction extends TransportClusterStatsAction {
public TestTransportClusterStatsAction(
ThreadPool threadPool,
ClusterService clusterService,
TransportService transportService,
NodeService nodeService,
IndicesService indicesService,
ActionFilters actionFilters
) {
super(threadPool, clusterService, transportService, nodeService, indicesService, actionFilters);
}
}

private static class MockClusterStatsNodeRequest extends TransportClusterStatsAction.ClusterStatsNodeRequest {

public MockClusterStatsNodeRequest(StreamInput in) throws IOException {
super(in);
}

public DiscoveryNode[] getDiscoveryNodes() {
return this.request.concreteNodes();
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,8 @@
import org.opensearch.core.common.io.stream.StreamInput;
import org.opensearch.core.common.io.stream.StreamOutput;
import org.opensearch.core.common.io.stream.Writeable;
import org.opensearch.indices.IndicesService;
import org.opensearch.node.NodeService;
import org.opensearch.telemetry.tracing.noop.NoopTracer;
import org.opensearch.test.OpenSearchTestCase;
import org.opensearch.test.transport.CapturingTransport;
Expand Down Expand Up @@ -76,11 +78,12 @@

public class TransportNodesActionTests extends OpenSearchTestCase {

private static ThreadPool THREAD_POOL;

private ClusterService clusterService;
private CapturingTransport transport;
private TransportService transportService;
protected static ThreadPool THREAD_POOL;
protected ClusterService clusterService;
protected CapturingTransport transport;
protected TransportService transportService;
protected NodeService nodeService;
protected IndicesService indicesService;

public void testRequestIsSentToEachNode() throws Exception {
TransportNodesAction action = getTestTransportNodesAction();
Expand Down
Loading
Loading