Skip to content

Commit

Permalink
Optimise TransportNodesAction to not send DiscoveryNodes for NodeStat… (
Browse files Browse the repository at this point in the history
#14749)

* Optimize TransportNodesAction to not send DiscoveryNodes for NodeStats, NodesInfo and ClusterStats call

Signed-off-by: Pranshu Shukla <[email protected]>
(cherry picked from commit 0040f4b)
Signed-off-by: github-actions[bot] <github-actions[bot]@users.noreply.github.com>
  • Loading branch information
github-actions[bot] committed Jul 22, 2024
1 parent 4c7d94c commit 8848b42
Show file tree
Hide file tree
Showing 15 changed files with 528 additions and 11 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
- Add matchesPluginSystemIndexPattern to SystemIndexRegistry ([#14750](https://github.com/opensearch-project/OpenSearch/pull/14750))
- Add Plugin interface for loading application based configuration templates (([#14659](https://github.com/opensearch-project/OpenSearch/issues/14659)))
- Add prefix mode verification setting for repository verification (([#14790](https://github.com/opensearch-project/OpenSearch/pull/14790)))
- Optimize TransportNodesAction to not send DiscoveryNodes for NodeStats, NodesInfo and ClusterStats call ([14749](https://github.com/opensearch-project/OpenSearch/pull/14749))

### Dependencies
- Update to Apache Lucene 9.11.1 ([#14042](https://github.com/opensearch-project/OpenSearch/pull/14042), [#14576](https://github.com/opensearch-project/OpenSearch/pull/14576))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -129,7 +129,7 @@ protected NodeInfo nodeOperation(NodeInfoRequest nodeRequest) {
*/
public static class NodeInfoRequest extends BaseNodeRequest {

NodesInfoRequest request;
protected NodesInfoRequest request;

public NodeInfoRequest(StreamInput in) throws IOException {
super(in);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -140,7 +140,7 @@ protected NodeStats nodeOperation(NodeStatsRequest nodeStatsRequest) {
*/
public static class NodeStatsRequest extends BaseNodeRequest {

NodesStatsRequest request;
protected NodesStatsRequest request;

public NodeStatsRequest(StreamInput in) throws IOException {
super(in);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -229,7 +229,7 @@ protected ClusterStatsNodeResponse nodeOperation(ClusterStatsNodeRequest nodeReq
*/
public static class ClusterStatsNodeRequest extends BaseNodeRequest {

ClusterStatsRequest request;
protected ClusterStatsRequest request;

public ClusterStatsNodeRequest(StreamInput in) throws IOException {
super(in);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,14 @@ public abstract class BaseNodesRequest<Request extends BaseNodesRequest<Request>
* will be ignored and this will be used.
* */
private DiscoveryNode[] concreteNodes;

/**
* Since do not use the discovery nodes coming from the request in all code paths following a request extended off from
* BaseNodeRequest, we do not require it to sent around across all nodes.
*
* Setting default behavior as `true` but can be explicitly changed in requests that do not require.
*/
private boolean includeDiscoveryNodes = true;
private final TimeValue DEFAULT_TIMEOUT_SECS = TimeValue.timeValueSeconds(30);

private TimeValue timeout;
Expand Down Expand Up @@ -119,6 +127,14 @@ public void setConcreteNodes(DiscoveryNode[] concreteNodes) {
this.concreteNodes = concreteNodes;
}

public void setIncludeDiscoveryNodes(boolean value) {
includeDiscoveryNodes = value;
}

public boolean getIncludeDiscoveryNodes() {
return includeDiscoveryNodes;
}

@Override
public ActionRequestValidationException validate() {
return null;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -226,6 +226,7 @@ class AsyncAction {
private final NodesRequest request;
private final ActionListener<NodesResponse> listener;
private final AtomicReferenceArray<Object> responses;
private final DiscoveryNode[] concreteNodes;
private final AtomicInteger counter = new AtomicInteger();
private final Task task;

Expand All @@ -238,10 +239,18 @@ class AsyncAction {
assert request.concreteNodes() != null;
}
this.responses = new AtomicReferenceArray<>(request.concreteNodes().length);
this.concreteNodes = request.concreteNodes();

if (request.getIncludeDiscoveryNodes() == false) {
// As we transfer the ownership of discovery nodes to route the request to into the AsyncAction class, we
// remove the list of DiscoveryNodes from the request. This reduces the payload of the request and improves
// the number of concrete nodes in the memory.
request.setConcreteNodes(null);
}
}

void start() {
final DiscoveryNode[] nodes = request.concreteNodes();
final DiscoveryNode[] nodes = this.concreteNodes;
if (nodes.length == 0) {
// nothing to notify
threadPool.generic().execute(() -> listener.onResponse(newResponse(request, responses)));
Expand All @@ -260,7 +269,6 @@ void start() {
if (task != null) {
nodeRequest.setParentTask(clusterService.localNode().getId(), task.getId());
}

transportService.sendRequest(
node,
getTransportNodeAction(node),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,7 @@ public String getName() {
public RestChannelConsumer prepareRequest(final RestRequest request, final NodeClient client) throws IOException {
ClusterStatsRequest clusterStatsRequest = new ClusterStatsRequest().nodesIds(request.paramAsStringArray("nodeId", null));
clusterStatsRequest.timeout(request.param("timeout"));
clusterStatsRequest.setIncludeDiscoveryNodes(false);
return channel -> client.admin().cluster().clusterStats(clusterStatsRequest, new NodesResponseRestListener<>(channel));
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,7 @@ public RestChannelConsumer prepareRequest(final RestRequest request, final NodeC
final NodesInfoRequest nodesInfoRequest = prepareRequest(request);
nodesInfoRequest.timeout(request.param("timeout"));
settingsFilter.addFilterSettingParams(request);

nodesInfoRequest.setIncludeDiscoveryNodes(false);
return channel -> client.admin().cluster().nodesInfo(nodesInfoRequest, new NodesResponseRestListener<>(channel));
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -232,6 +232,7 @@ public RestChannelConsumer prepareRequest(final RestRequest request, final NodeC
// If no levels are passed in this results in an empty array.
String[] levels = Strings.splitStringByCommaToArray(request.param("level"));
nodesStatsRequest.indices().setLevels(levels);
nodesStatsRequest.setIncludeDiscoveryNodes(false);

return channel -> client.admin().cluster().nodesStats(nodesStatsRequest, new NodesResponseRestListener<>(channel));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -125,6 +125,7 @@ public RestChannelConsumer doCatRequest(final RestRequest request, final NodeCli
public void processResponse(final ClusterStateResponse clusterStateResponse) {
NodesInfoRequest nodesInfoRequest = new NodesInfoRequest();
nodesInfoRequest.timeout(request.param("timeout"));
nodesInfoRequest.setIncludeDiscoveryNodes(false);

Check warning on line 128 in server/src/main/java/org/opensearch/rest/action/cat/RestNodesAction.java

View check run for this annotation

Codecov / codecov/patch

server/src/main/java/org/opensearch/rest/action/cat/RestNodesAction.java#L128

Added line #L128 was not covered by tests
nodesInfoRequest.clear()
.addMetrics(
NodesInfoRequest.Metric.JVM.metricName(),
Expand All @@ -137,6 +138,7 @@ public void processResponse(final ClusterStateResponse clusterStateResponse) {
public void processResponse(final NodesInfoResponse nodesInfoResponse) {
NodesStatsRequest nodesStatsRequest = new NodesStatsRequest();
nodesStatsRequest.timeout(request.param("timeout"));
nodesStatsRequest.setIncludeDiscoveryNodes(false);

Check warning on line 141 in server/src/main/java/org/opensearch/rest/action/cat/RestNodesAction.java

View check run for this annotation

Codecov / codecov/patch

server/src/main/java/org/opensearch/rest/action/cat/RestNodesAction.java#L141

Added line #L141 was not covered by tests
nodesStatsRequest.clear()
.indices(true)
.addMetrics(
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.action;

import org.opensearch.client.node.NodeClient;
import org.opensearch.common.settings.Settings;
import org.opensearch.common.settings.SettingsFilter;
import org.opensearch.rest.action.admin.cluster.RestClusterStatsAction;
import org.opensearch.rest.action.admin.cluster.RestNodesInfoAction;
import org.opensearch.rest.action.admin.cluster.RestNodesStatsAction;
import org.opensearch.test.OpenSearchTestCase;
import org.opensearch.test.rest.FakeRestRequest;
import org.opensearch.threadpool.TestThreadPool;
import org.junit.After;

import java.util.Collections;

public class RestStatsActionTests extends OpenSearchTestCase {
private final TestThreadPool threadPool = new TestThreadPool(RestStatsActionTests.class.getName());
private final NodeClient client = new NodeClient(Settings.EMPTY, threadPool);

@After
public void terminateThreadPool() {
terminate(threadPool);
}

public void testClusterStatsActionPrepareRequestNoError() {
RestClusterStatsAction action = new RestClusterStatsAction();
try {
action.prepareRequest(new FakeRestRequest(), client);
} catch (Throwable t) {
fail(t.getMessage());
}
}

public void testNodesStatsActionPrepareRequestNoError() {
RestNodesStatsAction action = new RestNodesStatsAction();
try {
action.prepareRequest(new FakeRestRequest(), client);
} catch (Throwable t) {
fail(t.getMessage());
}
}

public void testNodesInfoActionPrepareRequestNoError() {
RestNodesInfoAction action = new RestNodesInfoAction(new SettingsFilter(Collections.singleton("foo.filtered")));
try {
action.prepareRequest(new FakeRestRequest(), client);
} catch (Throwable t) {
fail(t.getMessage());
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,165 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.action.support.nodes;

import org.opensearch.action.admin.cluster.node.stats.NodesStatsRequest;
import org.opensearch.action.admin.cluster.stats.ClusterStatsRequest;
import org.opensearch.action.admin.cluster.stats.TransportClusterStatsAction;
import org.opensearch.action.support.ActionFilters;
import org.opensearch.action.support.PlainActionFuture;
import org.opensearch.cluster.node.DiscoveryNode;
import org.opensearch.cluster.service.ClusterService;
import org.opensearch.common.io.stream.BytesStreamOutput;
import org.opensearch.core.common.io.stream.StreamInput;
import org.opensearch.indices.IndicesService;
import org.opensearch.node.NodeService;
import org.opensearch.test.transport.CapturingTransport;
import org.opensearch.threadpool.ThreadPool;
import org.opensearch.transport.TransportService;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class TransportClusterStatsActionTests extends TransportNodesActionTests {

/**
* By default, we send discovery nodes list to each request that is sent across from the coordinator node. This
* behavior is asserted in this test.
*/
public void testClusterStatsActionWithRetentionOfDiscoveryNodesList() {
ClusterStatsRequest request = new ClusterStatsRequest();
request.setIncludeDiscoveryNodes(true);
Map<String, List<MockClusterStatsNodeRequest>> combinedSentRequest = performNodesInfoAction(request);

assertNotNull(combinedSentRequest);
combinedSentRequest.forEach((node, capturedRequestList) -> {
assertNotNull(capturedRequestList);
capturedRequestList.forEach(sentRequest -> {
assertNotNull(sentRequest.getDiscoveryNodes());
assertEquals(sentRequest.getDiscoveryNodes().length, clusterService.state().nodes().getSize());
});
});
}

public void testClusterStatsActionWithPreFilledConcreteNodesAndWithRetentionOfDiscoveryNodesList() {
ClusterStatsRequest request = new ClusterStatsRequest();
Collection<DiscoveryNode> discoveryNodes = clusterService.state().getNodes().getNodes().values();
request.setConcreteNodes(discoveryNodes.toArray(DiscoveryNode[]::new));
Map<String, List<MockClusterStatsNodeRequest>> combinedSentRequest = performNodesInfoAction(request);

assertNotNull(combinedSentRequest);
combinedSentRequest.forEach((node, capturedRequestList) -> {
assertNotNull(capturedRequestList);
capturedRequestList.forEach(sentRequest -> {
assertNotNull(sentRequest.getDiscoveryNodes());
assertEquals(sentRequest.getDiscoveryNodes().length, clusterService.state().nodes().getSize());
});
});
}

/**
* In the optimized ClusterStats Request, we do not send the DiscoveryNodes List to each node. This behavior is
* asserted in this test.
*/
public void testClusterStatsActionWithoutRetentionOfDiscoveryNodesList() {
ClusterStatsRequest request = new ClusterStatsRequest();
request.setIncludeDiscoveryNodes(false);
Map<String, List<MockClusterStatsNodeRequest>> combinedSentRequest = performNodesInfoAction(request);

assertNotNull(combinedSentRequest);
combinedSentRequest.forEach((node, capturedRequestList) -> {
assertNotNull(capturedRequestList);
capturedRequestList.forEach(sentRequest -> { assertNull(sentRequest.getDiscoveryNodes()); });
});
}

public void testClusterStatsActionWithPreFilledConcreteNodesAndWithoutRetentionOfDiscoveryNodesList() {
ClusterStatsRequest request = new ClusterStatsRequest();
Collection<DiscoveryNode> discoveryNodes = clusterService.state().getNodes().getNodes().values();
request.setConcreteNodes(discoveryNodes.toArray(DiscoveryNode[]::new));
request.setIncludeDiscoveryNodes(false);
Map<String, List<MockClusterStatsNodeRequest>> combinedSentRequest = performNodesInfoAction(request);

assertNotNull(combinedSentRequest);
combinedSentRequest.forEach((node, capturedRequestList) -> {
assertNotNull(capturedRequestList);
capturedRequestList.forEach(sentRequest -> { assertNull(sentRequest.getDiscoveryNodes()); });
});
}

private Map<String, List<MockClusterStatsNodeRequest>> performNodesInfoAction(ClusterStatsRequest request) {
TransportNodesAction action = getTestTransportClusterStatsAction();
PlainActionFuture<NodesStatsRequest> listener = new PlainActionFuture<>();
action.new AsyncAction(null, request, listener).start();
Map<String, List<CapturingTransport.CapturedRequest>> capturedRequests = transport.getCapturedRequestsByTargetNodeAndClear();
Map<String, List<MockClusterStatsNodeRequest>> combinedSentRequest = new HashMap<>();

capturedRequests.forEach((node, capturedRequestList) -> {
List<MockClusterStatsNodeRequest> sentRequestList = new ArrayList<>();

capturedRequestList.forEach(preSentRequest -> {
BytesStreamOutput out = new BytesStreamOutput();
try {
TransportClusterStatsAction.ClusterStatsNodeRequest clusterStatsNodeRequestFromCoordinator =
(TransportClusterStatsAction.ClusterStatsNodeRequest) preSentRequest.request;
clusterStatsNodeRequestFromCoordinator.writeTo(out);
StreamInput in = out.bytes().streamInput();
MockClusterStatsNodeRequest mockClusterStatsNodeRequest = new MockClusterStatsNodeRequest(in);
sentRequestList.add(mockClusterStatsNodeRequest);
} catch (IOException e) {
throw new RuntimeException(e);
}
});

combinedSentRequest.put(node, sentRequestList);
});

return combinedSentRequest;
}

private TestTransportClusterStatsAction getTestTransportClusterStatsAction() {
return new TestTransportClusterStatsAction(
THREAD_POOL,
clusterService,
transportService,
nodeService,
indicesService,
new ActionFilters(Collections.emptySet())
);
}

private static class TestTransportClusterStatsAction extends TransportClusterStatsAction {
public TestTransportClusterStatsAction(
ThreadPool threadPool,
ClusterService clusterService,
TransportService transportService,
NodeService nodeService,
IndicesService indicesService,
ActionFilters actionFilters
) {
super(threadPool, clusterService, transportService, nodeService, indicesService, actionFilters);
}
}

private static class MockClusterStatsNodeRequest extends TransportClusterStatsAction.ClusterStatsNodeRequest {

public MockClusterStatsNodeRequest(StreamInput in) throws IOException {
super(in);
}

public DiscoveryNode[] getDiscoveryNodes() {
return this.request.concreteNodes();
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,8 @@
import org.opensearch.core.common.io.stream.StreamInput;
import org.opensearch.core.common.io.stream.StreamOutput;
import org.opensearch.core.common.io.stream.Writeable;
import org.opensearch.indices.IndicesService;
import org.opensearch.node.NodeService;
import org.opensearch.telemetry.tracing.noop.NoopTracer;
import org.opensearch.test.OpenSearchTestCase;
import org.opensearch.test.transport.CapturingTransport;
Expand Down Expand Up @@ -75,11 +77,12 @@

public class TransportNodesActionTests extends OpenSearchTestCase {

private static ThreadPool THREAD_POOL;

private ClusterService clusterService;
private CapturingTransport transport;
private TransportService transportService;
protected static ThreadPool THREAD_POOL;
protected ClusterService clusterService;
protected CapturingTransport transport;
protected TransportService transportService;
protected NodeService nodeService;
protected IndicesService indicesService;

public void testRequestIsSentToEachNode() throws Exception {
TransportNodesAction action = getTestTransportNodesAction();
Expand Down
Loading

0 comments on commit 8848b42

Please sign in to comment.