diff --git a/server/src/main/java/org/opensearch/action/support/nodes/BaseNodesRequest.java b/server/src/main/java/org/opensearch/action/support/nodes/BaseNodesRequest.java index 4d54ce51c923c..cfe545ed21175 100644 --- a/server/src/main/java/org/opensearch/action/support/nodes/BaseNodesRequest.java +++ b/server/src/main/java/org/opensearch/action/support/nodes/BaseNodesRequest.java @@ -32,6 +32,8 @@ package org.opensearch.action.support.nodes; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; import org.opensearch.action.ActionRequest; import org.opensearch.action.ActionRequestValidationException; import org.opensearch.cluster.node.DiscoveryNode; @@ -48,6 +50,7 @@ * @opensearch.internal */ public abstract class BaseNodesRequest> extends ActionRequest { + private static final Logger logger = LogManager.getLogger(BaseNodesRequest.class); /** * the list of nodesIds that will be used to resolve this request and {@link #concreteNodes} @@ -128,6 +131,7 @@ public ActionRequestValidationException validate() { public void writeTo(StreamOutput out) throws IOException { super.writeTo(out); out.writeStringArrayNullable(nodesIds); + logger.info("concrete nodes is null? " + (concreteNodes == null)); out.writeOptionalArray(concreteNodes); out.writeOptionalTimeValue(timeout); } diff --git a/server/src/main/java/org/opensearch/action/support/nodes/TransportNodesAction.java b/server/src/main/java/org/opensearch/action/support/nodes/TransportNodesAction.java index 9a1a28dd70636..63c48f7a69402 100644 --- a/server/src/main/java/org/opensearch/action/support/nodes/TransportNodesAction.java +++ b/server/src/main/java/org/opensearch/action/support/nodes/TransportNodesAction.java @@ -40,6 +40,7 @@ import org.opensearch.cluster.ClusterState; import org.opensearch.cluster.node.DiscoveryNode; import org.opensearch.cluster.service.ClusterService; +import org.opensearch.common.settings.Setting; import org.opensearch.core.action.ActionListener; import org.opensearch.core.common.io.stream.StreamInput; import org.opensearch.core.common.io.stream.Writeable; @@ -57,10 +58,12 @@ import java.io.IOException; import java.util.ArrayList; import java.util.Arrays; +import java.util.Collections; import java.util.List; import java.util.Objects; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicReferenceArray; +import java.util.function.Function; /** * Base action class for transport nodes @@ -73,6 +76,17 @@ public abstract class TransportNodesAction< NodeRequest extends TransportRequest, NodeResponse extends BaseNodeResponse> extends HandledTransportAction { + public static final String OPTIMIZED_TRANSPORT_NODES_ACTIONS = "opensearch.experimental.optimization.transport_nodes_action.list"; + + public static final Setting> OPTIMIZED_TRANSPORT_NODES_ACTIONS_SETTING = Setting.listSetting( + OPTIMIZED_TRANSPORT_NODES_ACTIONS, + Collections.emptyList(), + Function.identity(), + Setting.Property.NodeScope, + Setting.Property.Dynamic + ); + public volatile List optimizedTransportActionNames; + protected final ThreadPool threadPool; protected final ClusterService clusterService; protected final TransportService transportService; @@ -114,6 +128,10 @@ protected TransportNodesAction( this.transportNodeAction = actionName + "[n]"; this.finalExecutor = finalExecutor; transportService.registerRequestHandler(transportNodeAction, nodeExecutor, nodeRequest, new NodeTransportHandler()); + + this.optimizedTransportActionNames = OPTIMIZED_TRANSPORT_NODES_ACTIONS_SETTING.get(clusterService.getSettings()); + clusterService.getClusterSettings() + .addSettingsUpdateConsumer(OPTIMIZED_TRANSPORT_NODES_ACTIONS_SETTING, this::setOptimizedTransportNodesActionsList); } /** @@ -148,6 +166,10 @@ protected TransportNodesAction( ); } + protected void setOptimizedTransportNodesActionsList(List newOptimizedTransportNodesActionsList) { + this.optimizedTransportActionNames = newOptimizedTransportNodesActionsList; + } + @Override protected void doExecute(Task task, NodesRequest request, ActionListener listener) { new AsyncAction(task, request, listener).start(); @@ -209,6 +231,15 @@ protected void resolveRequest(NodesRequest request, ClusterState clusterState) { request.setConcreteNodes(Arrays.stream(nodesIds).map(clusterState.nodes()::get).toArray(DiscoveryNode[]::new)); } + /** + * populate the concrete nodes from the request node ids + **/ + protected DiscoveryNode[] resolveConcreteNodes(NodesRequest request, ClusterState clusterState) { + assert request.concreteNodes() == null : "request concreteNodes shouldn't be set"; + String[] nodesIds = clusterState.nodes().resolveNodes(request.nodesIds()); + return Arrays.stream(nodesIds).map(clusterState.nodes()::get).toArray(DiscoveryNode[]::new); + } + /** * Get a backwards compatible transport action name */ @@ -226,6 +257,7 @@ class AsyncAction { private final NodesRequest request; private final ActionListener listener; private final AtomicReferenceArray responses; + private final DiscoveryNode[] concreteNodes; private final AtomicInteger counter = new AtomicInteger(); private final Task task; @@ -234,14 +266,28 @@ class AsyncAction { this.request = request; this.listener = listener; if (request.concreteNodes() == null) { - resolveRequest(request, clusterService.state()); - assert request.concreteNodes() != null; + if (optimizedTransportActionNames.contains(request.getClass().getSimpleName())) { + this.concreteNodes = resolveConcreteNodes(request, clusterService.state()); + assert request.concreteNodes() == null; + } else { + resolveRequest(request, clusterService.state()); + assert request.concreteNodes() != null; + this.concreteNodes = null; + } + } else { + this.concreteNodes = null; + } + if (request.concreteNodes() == null) { + assert concreteNodes != null; + this.responses = new AtomicReferenceArray<>(concreteNodes.length); + } else { + this.responses = new AtomicReferenceArray<>(request.concreteNodes().length); } - this.responses = new AtomicReferenceArray<>(request.concreteNodes().length); } void start() { - final DiscoveryNode[] nodes = request.concreteNodes(); + logger.info(request.getClass().getSimpleName()); + final DiscoveryNode[] nodes = request.concreteNodes() != null ? request.concreteNodes() : concreteNodes; if (nodes.length == 0) { // nothing to notify threadPool.generic().execute(() -> listener.onResponse(newResponse(request, responses))); @@ -260,7 +306,6 @@ void start() { if (task != null) { nodeRequest.setParentTask(clusterService.localNode().getId(), task.getId()); } - transportService.sendRequest( node, getTransportNodeAction(node), diff --git a/server/src/main/java/org/opensearch/common/settings/ClusterSettings.java b/server/src/main/java/org/opensearch/common/settings/ClusterSettings.java index 335615a6affb7..f78468647f185 100644 --- a/server/src/main/java/org/opensearch/common/settings/ClusterSettings.java +++ b/server/src/main/java/org/opensearch/common/settings/ClusterSettings.java @@ -40,6 +40,7 @@ import org.opensearch.action.search.TransportSearchAction; import org.opensearch.action.support.AutoCreateIndex; import org.opensearch.action.support.DestructiveOperations; +import org.opensearch.action.support.nodes.TransportNodesAction; import org.opensearch.action.support.replication.TransportReplicationAction; import org.opensearch.bootstrap.BootstrapSettings; import org.opensearch.client.Client; @@ -749,7 +750,8 @@ public void apply(Settings value, Settings current, Settings previous) { RemoteStoreSettings.CLUSTER_REMOTE_STORE_PATH_HASH_ALGORITHM_SETTING, RemoteStoreSettings.CLUSTER_REMOTE_MAX_TRANSLOG_READERS, RemoteStoreSettings.CLUSTER_REMOTE_STORE_TRANSLOG_METADATA, - SearchService.CLUSTER_ALLOW_DERIVED_FIELD_SETTING + SearchService.CLUSTER_ALLOW_DERIVED_FIELD_SETTING, + TransportNodesAction.OPTIMIZED_TRANSPORT_NODES_ACTIONS_SETTING ) ) );