Skip to content

Commit

Permalink
Optimize TransportNodesAction to selectively avoid sending list of Di…
Browse files Browse the repository at this point in the history
…scoveryNodes object to each target node.

Signed-off-by: Pranshu Shukla <[email protected]>
  • Loading branch information
Pranshu-S committed Jun 25, 2024
1 parent 93d507a commit 9e66ed2
Show file tree
Hide file tree
Showing 3 changed files with 57 additions and 6 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,8 @@

package org.opensearch.action.support.nodes;

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.opensearch.action.ActionRequest;
import org.opensearch.action.ActionRequestValidationException;
import org.opensearch.cluster.node.DiscoveryNode;
Expand All @@ -48,6 +50,7 @@
* @opensearch.internal
*/
public abstract class BaseNodesRequest<Request extends BaseNodesRequest<Request>> extends ActionRequest {
private static final Logger logger = LogManager.getLogger(BaseNodesRequest.class);

/**
* the list of nodesIds that will be used to resolve this request and {@link #concreteNodes}
Expand Down Expand Up @@ -128,6 +131,7 @@ public ActionRequestValidationException validate() {
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
out.writeStringArrayNullable(nodesIds);
logger.info("concrete nodes is null? " + (concreteNodes == null));
out.writeOptionalArray(concreteNodes);
out.writeOptionalTimeValue(timeout);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@
import org.opensearch.cluster.ClusterState;
import org.opensearch.cluster.node.DiscoveryNode;
import org.opensearch.cluster.service.ClusterService;
import org.opensearch.common.settings.Setting;
import org.opensearch.core.action.ActionListener;
import org.opensearch.core.common.io.stream.StreamInput;
import org.opensearch.core.common.io.stream.Writeable;
Expand All @@ -57,10 +58,12 @@
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Objects;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReferenceArray;
import java.util.function.Function;

/**
* Base action class for transport nodes
Expand All @@ -73,6 +76,17 @@ public abstract class TransportNodesAction<
NodeRequest extends TransportRequest,
NodeResponse extends BaseNodeResponse> extends HandledTransportAction<NodesRequest, NodesResponse> {

public static final String OPTIMIZED_TRANSPORT_NODES_ACTIONS = "opensearch.experimental.optimization.transport_nodes_action.list";

public static final Setting<List<String>> OPTIMIZED_TRANSPORT_NODES_ACTIONS_SETTING = Setting.listSetting(
OPTIMIZED_TRANSPORT_NODES_ACTIONS,
Collections.emptyList(),
Function.identity(),
Setting.Property.NodeScope,
Setting.Property.Dynamic
);
public volatile List<String> optimizedTransportActionNames;

protected final ThreadPool threadPool;
protected final ClusterService clusterService;
protected final TransportService transportService;
Expand Down Expand Up @@ -114,6 +128,10 @@ protected TransportNodesAction(
this.transportNodeAction = actionName + "[n]";
this.finalExecutor = finalExecutor;
transportService.registerRequestHandler(transportNodeAction, nodeExecutor, nodeRequest, new NodeTransportHandler());

this.optimizedTransportActionNames = OPTIMIZED_TRANSPORT_NODES_ACTIONS_SETTING.get(clusterService.getSettings());
clusterService.getClusterSettings()
.addSettingsUpdateConsumer(OPTIMIZED_TRANSPORT_NODES_ACTIONS_SETTING, this::setOptimizedTransportNodesActionsList);
}

/**
Expand Down Expand Up @@ -148,6 +166,10 @@ protected TransportNodesAction(
);
}

protected void setOptimizedTransportNodesActionsList(List<String> newOptimizedTransportNodesActionsList) {
this.optimizedTransportActionNames = newOptimizedTransportNodesActionsList;
}

@Override
protected void doExecute(Task task, NodesRequest request, ActionListener<NodesResponse> listener) {
new AsyncAction(task, request, listener).start();
Expand Down Expand Up @@ -209,6 +231,15 @@ protected void resolveRequest(NodesRequest request, ClusterState clusterState) {
request.setConcreteNodes(Arrays.stream(nodesIds).map(clusterState.nodes()::get).toArray(DiscoveryNode[]::new));
}

/**
* populate the concrete nodes from the request node ids
**/
protected DiscoveryNode[] resolveConcreteNodes(NodesRequest request, ClusterState clusterState) {
assert request.concreteNodes() == null : "request concreteNodes shouldn't be set";
String[] nodesIds = clusterState.nodes().resolveNodes(request.nodesIds());
return Arrays.stream(nodesIds).map(clusterState.nodes()::get).toArray(DiscoveryNode[]::new);
}

/**
* Get a backwards compatible transport action name
*/
Expand All @@ -226,6 +257,7 @@ class AsyncAction {
private final NodesRequest request;
private final ActionListener<NodesResponse> listener;
private final AtomicReferenceArray<Object> responses;
private final DiscoveryNode[] concreteNodes;
private final AtomicInteger counter = new AtomicInteger();
private final Task task;

Expand All @@ -234,14 +266,28 @@ class AsyncAction {
this.request = request;
this.listener = listener;
if (request.concreteNodes() == null) {
resolveRequest(request, clusterService.state());
assert request.concreteNodes() != null;
if (optimizedTransportActionNames.contains(request.getClass().getSimpleName())) {
this.concreteNodes = resolveConcreteNodes(request, clusterService.state());
assert request.concreteNodes() == null;
} else {
resolveRequest(request, clusterService.state());
assert request.concreteNodes() != null;
this.concreteNodes = null;
}
} else {
this.concreteNodes = null;
}
if (request.concreteNodes() == null) {
assert concreteNodes != null;
this.responses = new AtomicReferenceArray<>(concreteNodes.length);
} else {
this.responses = new AtomicReferenceArray<>(request.concreteNodes().length);
}
this.responses = new AtomicReferenceArray<>(request.concreteNodes().length);
}

void start() {
final DiscoveryNode[] nodes = request.concreteNodes();
logger.info(request.getClass().getSimpleName());
final DiscoveryNode[] nodes = request.concreteNodes() != null ? request.concreteNodes() : concreteNodes;
if (nodes.length == 0) {
// nothing to notify
threadPool.generic().execute(() -> listener.onResponse(newResponse(request, responses)));
Expand All @@ -260,7 +306,6 @@ void start() {
if (task != null) {
nodeRequest.setParentTask(clusterService.localNode().getId(), task.getId());
}

transportService.sendRequest(
node,
getTransportNodeAction(node),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@
import org.opensearch.action.search.TransportSearchAction;
import org.opensearch.action.support.AutoCreateIndex;
import org.opensearch.action.support.DestructiveOperations;
import org.opensearch.action.support.nodes.TransportNodesAction;
import org.opensearch.action.support.replication.TransportReplicationAction;
import org.opensearch.bootstrap.BootstrapSettings;
import org.opensearch.client.Client;
Expand Down Expand Up @@ -749,7 +750,8 @@ public void apply(Settings value, Settings current, Settings previous) {
RemoteStoreSettings.CLUSTER_REMOTE_STORE_PATH_HASH_ALGORITHM_SETTING,
RemoteStoreSettings.CLUSTER_REMOTE_MAX_TRANSLOG_READERS,
RemoteStoreSettings.CLUSTER_REMOTE_STORE_TRANSLOG_METADATA,
SearchService.CLUSTER_ALLOW_DERIVED_FIELD_SETTING
SearchService.CLUSTER_ALLOW_DERIVED_FIELD_SETTING,
TransportNodesAction.OPTIMIZED_TRANSPORT_NODES_ACTIONS_SETTING
)
)
);
Expand Down

0 comments on commit 9e66ed2

Please sign in to comment.