Skip to content

Commit

Permalink
Optimize TransportNodesAction to not send DiscoveryNodes for NodeStat…
Browse files Browse the repository at this point in the history
…s, NodesInfo and ClusterStats call

Signed-off-by: Pranshu Shukla <[email protected]>
  • Loading branch information
Pranshu-S committed Jul 11, 2024
1 parent 93d507a commit 4f6b6ed
Show file tree
Hide file tree
Showing 13 changed files with 421 additions and 14 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -129,7 +129,7 @@ protected NodeInfo nodeOperation(NodeInfoRequest nodeRequest) {
*/
public static class NodeInfoRequest extends TransportRequest {

NodesInfoRequest request;
protected NodesInfoRequest request;

public NodeInfoRequest(StreamInput in) throws IOException {
super(in);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -140,7 +140,7 @@ protected NodeStats nodeOperation(NodeStatsRequest nodeStatsRequest) {
*/
public static class NodeStatsRequest extends TransportRequest {

NodesStatsRequest request;
protected NodesStatsRequest request;

public NodeStatsRequest(StreamInput in) throws IOException {
super(in);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -223,7 +223,7 @@ protected ClusterStatsNodeResponse nodeOperation(ClusterStatsNodeRequest nodeReq
*/
public static class ClusterStatsNodeRequest extends TransportRequest {

ClusterStatsRequest request;
protected ClusterStatsRequest request;

public ClusterStatsNodeRequest(StreamInput in) throws IOException {
super(in);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,11 @@ public abstract class BaseNodesRequest<Request extends BaseNodesRequest<Request>
* will be ignored and this will be used.
* */
private DiscoveryNode[] concreteNodes;

/**
* since in multiple code paths we do not use the discovery nodes coming from the request, we do not require it to sent around across all nodes
*/
private boolean populateDiscoveryNodesInTransportRequest = true;
private final TimeValue DEFAULT_TIMEOUT_SECS = TimeValue.timeValueSeconds(30);

private TimeValue timeout;
Expand Down Expand Up @@ -119,6 +124,12 @@ public void setConcreteNodes(DiscoveryNode[] concreteNodes) {
this.concreteNodes = concreteNodes;
}

public void populateDiscoveryNodesInTransportRequest(boolean value) {
populateDiscoveryNodesInTransportRequest = value;
}
public boolean populateDiscoveryNodesInTransportRequest() {
return populateDiscoveryNodesInTransportRequest;
}
@Override
public ActionRequestValidationException validate() {
return null;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@
import org.opensearch.cluster.ClusterState;
import org.opensearch.cluster.node.DiscoveryNode;
import org.opensearch.cluster.service.ClusterService;
import org.opensearch.common.settings.Setting;
import org.opensearch.core.action.ActionListener;
import org.opensearch.core.common.io.stream.StreamInput;
import org.opensearch.core.common.io.stream.Writeable;
Expand All @@ -57,10 +58,12 @@
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Objects;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReferenceArray;
import java.util.function.Function;

/**
* Base action class for transport nodes
Expand Down Expand Up @@ -209,6 +212,15 @@ protected void resolveRequest(NodesRequest request, ClusterState clusterState) {
request.setConcreteNodes(Arrays.stream(nodesIds).map(clusterState.nodes()::get).toArray(DiscoveryNode[]::new));
}

/**
* populate the concrete nodes from the request node ids
**/
protected DiscoveryNode[] resolveConcreteNodes(NodesRequest request, ClusterState clusterState) {
assert request.concreteNodes() == null : "request concreteNodes shouldn't be set";
String[] nodesIds = clusterState.nodes().resolveNodes(request.nodesIds());
return Arrays.stream(nodesIds).map(clusterState.nodes()::get).toArray(DiscoveryNode[]::new);
}

/**
* Get a backwards compatible transport action name
*/
Expand All @@ -226,6 +238,7 @@ class AsyncAction {
private final NodesRequest request;
private final ActionListener<NodesResponse> listener;
private final AtomicReferenceArray<Object> responses;
private final DiscoveryNode[] concreteNodes;
private final AtomicInteger counter = new AtomicInteger();
private final Task task;

Expand All @@ -234,14 +247,27 @@ class AsyncAction {
this.request = request;
this.listener = listener;
if (request.concreteNodes() == null) {
resolveRequest(request, clusterService.state());
assert request.concreteNodes() != null;
if (request.populateDiscoveryNodesInTransportRequest()) {
resolveRequest(request, clusterService.state());
assert request.concreteNodes() != null;
this.concreteNodes = null;
} else {
this.concreteNodes = resolveConcreteNodes(request, clusterService.state());
assert request.concreteNodes() == null;
}
} else {
this.concreteNodes = null;
}
if (request.concreteNodes() == null) {
assert concreteNodes != null;
this.responses = new AtomicReferenceArray<>(concreteNodes.length);
} else {
this.responses = new AtomicReferenceArray<>(request.concreteNodes().length);
}
this.responses = new AtomicReferenceArray<>(request.concreteNodes().length);
}

void start() {
final DiscoveryNode[] nodes = request.concreteNodes();
final DiscoveryNode[] nodes = request.concreteNodes() != null ? request.concreteNodes() : concreteNodes;
if (nodes.length == 0) {
// nothing to notify
threadPool.generic().execute(() -> listener.onResponse(newResponse(request, responses)));
Expand All @@ -260,7 +286,6 @@ void start() {
if (task != null) {
nodeRequest.setParentTask(clusterService.localNode().getId(), task.getId());
}

transportService.sendRequest(
node,
getTransportNodeAction(node),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,7 @@ public String getName() {
public RestChannelConsumer prepareRequest(final RestRequest request, final NodeClient client) throws IOException {
ClusterStatsRequest clusterStatsRequest = new ClusterStatsRequest().nodesIds(request.paramAsStringArray("nodeId", null));
clusterStatsRequest.timeout(request.param("timeout"));
clusterStatsRequest.populateDiscoveryNodesInTransportRequest(false);
return channel -> client.admin().cluster().clusterStats(clusterStatsRequest, new NodesResponseRestListener<>(channel));
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,7 @@ public RestChannelConsumer prepareRequest(final RestRequest request, final NodeC
final NodesInfoRequest nodesInfoRequest = prepareRequest(request);
nodesInfoRequest.timeout(request.param("timeout"));
settingsFilter.addFilterSettingParams(request);

nodesInfoRequest.populateDiscoveryNodesInTransportRequest(false);
return channel -> client.admin().cluster().nodesInfo(nodesInfoRequest, new NodesResponseRestListener<>(channel));
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -232,6 +232,7 @@ public RestChannelConsumer prepareRequest(final RestRequest request, final NodeC
// If no levels are passed in this results in an empty array.
String[] levels = Strings.splitStringByCommaToArray(request.param("level"));
nodesStatsRequest.indices().setLevels(levels);
nodesStatsRequest.populateDiscoveryNodesInTransportRequest(false);

return channel -> client.admin().cluster().nodesStats(nodesStatsRequest, new NodesResponseRestListener<>(channel));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -125,6 +125,7 @@ public RestChannelConsumer doCatRequest(final RestRequest request, final NodeCli
public void processResponse(final ClusterStateResponse clusterStateResponse) {
NodesInfoRequest nodesInfoRequest = new NodesInfoRequest();
nodesInfoRequest.timeout(request.param("timeout"));
nodesInfoRequest.populateDiscoveryNodesInTransportRequest(false);
nodesInfoRequest.clear()
.addMetrics(
NodesInfoRequest.Metric.JVM.metricName(),
Expand All @@ -137,6 +138,7 @@ public void processResponse(final ClusterStateResponse clusterStateResponse) {
public void processResponse(final NodesInfoResponse nodesInfoResponse) {
NodesStatsRequest nodesStatsRequest = new NodesStatsRequest();
nodesStatsRequest.timeout(request.param("timeout"));
nodesStatsRequest.populateDiscoveryNodesInTransportRequest(false);
nodesStatsRequest.clear()
.indices(true)
.addMetrics(
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,124 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.action.support.nodes;

import org.opensearch.action.admin.cluster.node.info.NodesInfoRequest;
import org.opensearch.action.admin.cluster.node.info.TransportNodesInfoAction;
import org.opensearch.action.admin.cluster.node.stats.NodesStatsRequest;
import org.opensearch.action.admin.cluster.stats.ClusterStatsRequest;
import org.opensearch.action.admin.cluster.stats.TransportClusterStatsAction;
import org.opensearch.action.support.ActionFilters;
import org.opensearch.action.support.PlainActionFuture;
import org.opensearch.cluster.node.DiscoveryNode;
import org.opensearch.cluster.service.ClusterService;
import org.opensearch.common.io.stream.BytesStreamOutput;
import org.opensearch.core.common.io.stream.StreamInput;
import org.opensearch.indices.IndicesService;
import org.opensearch.node.NodeService;
import org.opensearch.test.transport.CapturingTransport;
import org.opensearch.threadpool.ThreadPool;
import org.opensearch.transport.TransportService;

import java.io.IOException;
import java.util.*;

public class TransportClusterStatsActionTests extends TransportNodesActionTests {

public void testOptimizedBehavior() {
ClusterStatsRequest request = new ClusterStatsRequest();
request.populateDiscoveryNodesInTransportRequest(false);
Map<String, List<MockClusterStatsNodeRequest>> combinedSentRequest = performNodesInfoAction(request);

assertNotNull(combinedSentRequest);
combinedSentRequest.forEach((node, capturedRequestList) -> {
assertNotNull(capturedRequestList);
capturedRequestList.forEach(sentRequest -> {
assertNull(sentRequest.getDiscoveryNodes());
});
});
}

public void testDefaultBehavior() {
ClusterStatsRequest request = new ClusterStatsRequest();
request.populateDiscoveryNodesInTransportRequest(true);
Map<String, List<MockClusterStatsNodeRequest>> combinedSentRequest = performNodesInfoAction(request);

assertNotNull(combinedSentRequest);
combinedSentRequest.forEach((node, capturedRequestList) -> {
assertNotNull(capturedRequestList);
capturedRequestList.forEach(sentRequest -> {
assertNotNull(sentRequest.getDiscoveryNodes());
assertEquals(sentRequest.getDiscoveryNodes().length, clusterService.state().nodes().getSize() );
});
});
}

private Map<String, List<MockClusterStatsNodeRequest>> performNodesInfoAction(ClusterStatsRequest request) {
TransportNodesAction action = getTestTransportClusterStatsAction();
PlainActionFuture<NodesStatsRequest> listener = new PlainActionFuture<>();
action.new AsyncAction(null, request, listener).start();
Map<String, List<CapturingTransport.CapturedRequest>> capturedRequests = transport.getCapturedRequestsByTargetNodeAndClear();
Map<String, List<MockClusterStatsNodeRequest>> combinedSentRequest = new HashMap<>();

capturedRequests.forEach((node, capturedRequestList) -> {
List<MockClusterStatsNodeRequest> sentRequestList = new ArrayList<>();

capturedRequestList.forEach(preSentRequest -> {
BytesStreamOutput out = new BytesStreamOutput();
try {
TransportClusterStatsAction.ClusterStatsNodeRequest clusterStatsNodeRequestFromCoordinator= (TransportClusterStatsAction.ClusterStatsNodeRequest) preSentRequest.request;
clusterStatsNodeRequestFromCoordinator.writeTo(out);
StreamInput in = out.bytes().streamInput();
MockClusterStatsNodeRequest mockClusterStatsNodeRequest = new MockClusterStatsNodeRequest(in);
sentRequestList.add(mockClusterStatsNodeRequest);
} catch (IOException e) {
throw new RuntimeException(e);
}
});

combinedSentRequest.put(node, sentRequestList);
});

return combinedSentRequest;
}

public TestTransportClusterStatsAction getTestTransportClusterStatsAction() {
return new TestTransportClusterStatsAction(
THREAD_POOL,
clusterService,
transportService,
nodeService,
indicesService,
new ActionFilters(Collections.emptySet())
);
}

private static class TestTransportClusterStatsAction extends TransportClusterStatsAction {
public TestTransportClusterStatsAction(
ThreadPool threadPool,
ClusterService clusterService,
TransportService transportService,
NodeService nodeService,
IndicesService indicesService,
ActionFilters actionFilters) {
super(threadPool, clusterService, transportService, nodeService, indicesService, actionFilters);
}
}

private static class MockClusterStatsNodeRequest extends TransportClusterStatsAction.ClusterStatsNodeRequest{

public MockClusterStatsNodeRequest(StreamInput in) throws IOException {
super(in);
}

public DiscoveryNode[] getDiscoveryNodes() {
return this.request.concreteNodes();
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,8 @@
import org.opensearch.core.common.io.stream.StreamInput;
import org.opensearch.core.common.io.stream.StreamOutput;
import org.opensearch.core.common.io.stream.Writeable;
import org.opensearch.indices.IndicesService;
import org.opensearch.node.NodeService;
import org.opensearch.telemetry.tracing.noop.NoopTracer;
import org.opensearch.test.OpenSearchTestCase;
import org.opensearch.test.transport.CapturingTransport;
Expand Down Expand Up @@ -76,11 +78,12 @@

public class TransportNodesActionTests extends OpenSearchTestCase {

private static ThreadPool THREAD_POOL;

private ClusterService clusterService;
private CapturingTransport transport;
private TransportService transportService;
protected static ThreadPool THREAD_POOL;
protected ClusterService clusterService;
protected CapturingTransport transport;
protected TransportService transportService;
protected NodeService nodeService;
protected IndicesService indicesService;

public void testRequestIsSentToEachNode() throws Exception {
TransportNodesAction action = getTestTransportNodesAction();
Expand Down
Loading

0 comments on commit 4f6b6ed

Please sign in to comment.