Skip to content

Commit

Permalink
Get installed plugins from local node and add a timeout (#353)
Browse files Browse the repository at this point in the history
Signed-off-by: Daniel Widdis <[email protected]>
  • Loading branch information
dbwiddis committed Jan 3, 2024
1 parent e7b778c commit efdbcb0
Show file tree
Hide file tree
Showing 2 changed files with 66 additions and 36 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -10,9 +10,9 @@

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.opensearch.action.admin.cluster.node.info.NodeInfo;
import org.opensearch.action.admin.cluster.node.info.NodesInfoRequest;
import org.opensearch.action.admin.cluster.node.info.PluginsAndModules;
import org.opensearch.action.admin.cluster.state.ClusterStateRequest;
import org.opensearch.client.Client;
import org.opensearch.cluster.service.ClusterService;
import org.opensearch.common.settings.Settings;
Expand Down Expand Up @@ -79,9 +79,9 @@ public WorkflowProcessSorter(
this.workflowStepFactory = workflowStepFactory;
this.threadPool = threadPool;
this.maxWorkflowSteps = MAX_WORKFLOW_STEPS.get(settings);
clusterService.getClusterSettings().addSettingsUpdateConsumer(MAX_WORKFLOW_STEPS, it -> maxWorkflowSteps = it);
this.clusterService = clusterService;
this.client = client;
clusterService.getClusterSettings().addSettingsUpdateConsumer(MAX_WORKFLOW_STEPS, it -> maxWorkflowSteps = it);
}

/**
Expand Down Expand Up @@ -153,31 +153,45 @@ public void validate(List<ProcessNode> processNodes) throws Exception {
* @throws Exception on validation failure
*/
public void validatePluginsInstalled(List<ProcessNode> processNodes, WorkflowValidator validator) throws Exception {

// Retrieve node information to ascertain installed plugins
NodesInfoRequest nodesInfoRequest = new NodesInfoRequest();
nodesInfoRequest.clear().addMetric(NodesInfoRequest.Metric.PLUGINS.metricName());
CompletableFuture<List<String>> installedPluginsFuture = new CompletableFuture<>();
client.admin().cluster().nodesInfo(nodesInfoRequest, ActionListener.wrap(response -> {
List<String> installedPlugins = new ArrayList<>();

// Retrieve installed plugin names from the local node
String localNodeId = clusterService.state().getNodes().getLocalNodeId();
NodeInfo info = response.getNodesMap().get(localNodeId);
PluginsAndModules plugins = info.getInfo(PluginsAndModules.class);
for (PluginInfo pluginInfo : plugins.getPluginInfos()) {
installedPlugins.add(pluginInfo.getName());
}

installedPluginsFuture.complete(installedPlugins);

}, exception -> {
logger.error("Failed to retrieve installed plugins");
installedPluginsFuture.completeExceptionally(exception);
final CompletableFuture<List<String>> installedPluginsFuture = new CompletableFuture<>();

ClusterStateRequest clusterStateRequest = new ClusterStateRequest();
clusterStateRequest.clear().nodes(true).local(true);
client.admin().cluster().state(clusterStateRequest, ActionListener.wrap(stateResponse -> {
final String localNodeId = stateResponse.getState().nodes().getLocalNodeId();

NodesInfoRequest nodesInfoRequest = new NodesInfoRequest();
nodesInfoRequest.clear().addMetric(NodesInfoRequest.Metric.PLUGINS.metricName());
client.admin().cluster().nodesInfo(nodesInfoRequest, ActionListener.wrap(infoResponse -> {
// Retrieve installed plugin names from the local node
try {
installedPluginsFuture.complete(
infoResponse.getNodesMap()
.get(localNodeId)
.getInfo(PluginsAndModules.class)
.getPluginInfos()
.stream()
.map(PluginInfo::getName)
.collect(Collectors.toList())
);
} catch (Exception e) {
logger.error("Failed to retrieve installed plugins from local node");
installedPluginsFuture.completeExceptionally(e);
}
}, infoException -> {
logger.error("Failed to retrieve installed plugins");
installedPluginsFuture.completeExceptionally(infoException);
}));
}, stateException -> {
logger.error("Failed to retrieve cluster state");
installedPluginsFuture.completeExceptionally(stateException);
}));

// Block execution until installed plugin list is returned
List<String> installedPlugins = installedPluginsFuture.get();
List<String> installedPlugins = installedPluginsFuture.orTimeout(
NODE_TIMEOUT_DEFAULT_VALUE.duration(),
NODE_TIMEOUT_DEFAULT_VALUE.timeUnit()
).get();

// Iterate through process nodes in graph
for (ProcessNode processNode : processNodes) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,8 @@
import org.opensearch.action.admin.cluster.node.info.NodesInfoRequest;
import org.opensearch.action.admin.cluster.node.info.NodesInfoResponse;
import org.opensearch.action.admin.cluster.node.info.PluginsAndModules;
import org.opensearch.action.admin.cluster.state.ClusterStateRequest;
import org.opensearch.action.admin.cluster.state.ClusterStateResponse;
import org.opensearch.client.AdminClient;
import org.opensearch.client.Client;
import org.opensearch.client.ClusterAdminClient;
Expand Down Expand Up @@ -426,12 +428,19 @@ public void testSuccessfulInstalledPluginValidation() throws Exception {
when(client.admin()).thenReturn(adminClient);
when(adminClient.cluster()).thenReturn(clusterAdminClient);

// Mock and stub the clusterservice to get the local node
ClusterState clusterState = mock(ClusterState.class);
DiscoveryNodes discoveryNodes = mock(DiscoveryNodes.class);
when(clusterService.state()).thenReturn(clusterState);
when(clusterState.getNodes()).thenReturn(discoveryNodes);
when(discoveryNodes.getLocalNodeId()).thenReturn("123");
// Stub cluster state request
doAnswer(invocation -> {
ActionListener<ClusterStateResponse> listener = invocation.getArgument(1);

ClusterStateResponse response = mock(ClusterStateResponse.class);
ClusterState clusterState = mock(ClusterState.class);
DiscoveryNodes discoveryNodes = mock(DiscoveryNodes.class);
when(response.getState()).thenReturn(clusterState);
when(clusterState.nodes()).thenReturn(discoveryNodes);
when(discoveryNodes.getLocalNodeId()).thenReturn("123");
listener.onResponse(response);
return null;
}).when(clusterAdminClient).state(any(ClusterStateRequest.class), any());

// Stub cluster admin client's node info request
doAnswer(invocation -> {
Expand Down Expand Up @@ -510,12 +519,19 @@ public void testFailedInstalledPluginValidation() throws Exception {
when(client.admin()).thenReturn(adminClient);
when(adminClient.cluster()).thenReturn(clusterAdminClient);

// Mock and stub the clusterservice to get the local node
ClusterState clusterState = mock(ClusterState.class);
DiscoveryNodes discoveryNodes = mock(DiscoveryNodes.class);
when(clusterService.state()).thenReturn(clusterState);
when(clusterState.getNodes()).thenReturn(discoveryNodes);
when(discoveryNodes.getLocalNodeId()).thenReturn("123");
// Stub cluster state request
doAnswer(invocation -> {
ActionListener<ClusterStateResponse> listener = invocation.getArgument(1);

ClusterStateResponse response = mock(ClusterStateResponse.class);
ClusterState clusterState = mock(ClusterState.class);
DiscoveryNodes discoveryNodes = mock(DiscoveryNodes.class);
when(response.getState()).thenReturn(clusterState);
when(clusterState.nodes()).thenReturn(discoveryNodes);
when(discoveryNodes.getLocalNodeId()).thenReturn("123");
listener.onResponse(response);
return null;
}).when(clusterAdminClient).state(any(ClusterStateRequest.class), any());

// Stub cluster admin client's node info request
doAnswer(invocation -> {
Expand Down

0 comments on commit efdbcb0

Please sign in to comment.