From 9601b534abcca636cdc787ab3ad0353333a5b8d5 Mon Sep 17 00:00:00 2001 From: "opensearch-trigger-bot[bot]" <98922864+opensearch-trigger-bot[bot]@users.noreply.github.com> Date: Mon, 27 Nov 2023 16:35:40 -0800 Subject: [PATCH] [Backport feature/agent_framework] Adds transport request retry capability for GetMLTaskStep (#205) Adds transport request retry capability for GetMLTaskStep (#179) * Added FlowFrameworkMaxRequestRetrySetting and applied this to GetMlTaskStep * Addressing PR comments, creating abstract class RetryableWorkflowStep to initialize the setting update consumer for max retry request setting, passing settings and clusterservice to retryable workflow step instead * Addressing PR comments * Removing retry utils class * Fixing failure unit tests to ensure thread sleep isnt invoked * Addressing PR comments, changing setting name, improving javadoc, removing max value from setting, cancelling future on thread interuppt --------- (cherry picked from commit 10923251c59841d5f35fa7a588780495434f0768) Signed-off-by: Joshua Palis Signed-off-by: github-actions[bot] Co-authored-by: github-actions[bot] --- .../flowframework/FlowFrameworkPlugin.java | 18 ++++- .../common/FlowFrameworkSettings.java | 9 +++ .../transport/GetWorkflowResponse.java | 2 + .../AbstractRetryableWorkflowStep.java | 34 +++++++++ .../flowframework/workflow/GetMLTaskStep.java | 73 +++++++++++++------ .../workflow/WorkflowStepFactory.java | 8 +- .../FlowFrameworkPluginTests.java | 5 +- .../model/WorkflowValidatorTests.java | 27 ++++++- .../workflow/GetMLTaskStepTests.java | 24 +++++- .../workflow/WorkflowProcessSorterTests.java | 24 +++++- 10 files changed, 192 insertions(+), 32 deletions(-) create mode 100644 src/main/java/org/opensearch/flowframework/workflow/AbstractRetryableWorkflowStep.java diff --git a/src/main/java/org/opensearch/flowframework/FlowFrameworkPlugin.java b/src/main/java/org/opensearch/flowframework/FlowFrameworkPlugin.java index 42ad04213..4b2e4a9cb 100644 --- a/src/main/java/org/opensearch/flowframework/FlowFrameworkPlugin.java +++ b/src/main/java/org/opensearch/flowframework/FlowFrameworkPlugin.java @@ -60,6 +60,7 @@ import static org.opensearch.flowframework.common.CommonValue.FLOW_FRAMEWORK_THREAD_POOL_PREFIX; import static org.opensearch.flowframework.common.CommonValue.PROVISION_THREAD_POOL; import static org.opensearch.flowframework.common.FlowFrameworkSettings.FLOW_FRAMEWORK_ENABLED; +import static org.opensearch.flowframework.common.FlowFrameworkSettings.MAX_GET_TASK_REQUEST_RETRY; import static org.opensearch.flowframework.common.FlowFrameworkSettings.MAX_WORKFLOWS; import static org.opensearch.flowframework.common.FlowFrameworkSettings.WORKFLOW_REQUEST_TIMEOUT; @@ -69,6 +70,7 @@ public class FlowFrameworkPlugin extends Plugin implements ActionPlugin { private FlowFrameworkFeatureEnabledSetting flowFrameworkFeatureEnabledSetting; + private ClusterService clusterService; /** @@ -93,10 +95,15 @@ public Collection createComponents( Settings settings = environment.settings(); this.clusterService = clusterService; flowFrameworkFeatureEnabledSetting = new FlowFrameworkFeatureEnabledSetting(clusterService, settings); - MachineLearningNodeClient mlClient = new MachineLearningNodeClient(client); FlowFrameworkIndicesHandler flowFrameworkIndicesHandler = new FlowFrameworkIndicesHandler(client, clusterService); - WorkflowStepFactory workflowStepFactory = new WorkflowStepFactory(clusterService, client, mlClient, flowFrameworkIndicesHandler); + WorkflowStepFactory workflowStepFactory = new WorkflowStepFactory( + settings, + clusterService, + client, + mlClient, + flowFrameworkIndicesHandler + ); WorkflowProcessSorter workflowProcessSorter = new WorkflowProcessSorter(workflowStepFactory, threadPool); return ImmutableList.of(workflowStepFactory, workflowProcessSorter, flowFrameworkIndicesHandler); @@ -132,7 +139,12 @@ public List getRestHandlers( @Override public List> getSettings() { - List> settings = ImmutableList.of(FLOW_FRAMEWORK_ENABLED, MAX_WORKFLOWS, WORKFLOW_REQUEST_TIMEOUT); + List> settings = ImmutableList.of( + FLOW_FRAMEWORK_ENABLED, + MAX_WORKFLOWS, + WORKFLOW_REQUEST_TIMEOUT, + MAX_GET_TASK_REQUEST_RETRY + ); return settings; } diff --git a/src/main/java/org/opensearch/flowframework/common/FlowFrameworkSettings.java b/src/main/java/org/opensearch/flowframework/common/FlowFrameworkSettings.java index 82bea1abf..1824197e8 100644 --- a/src/main/java/org/opensearch/flowframework/common/FlowFrameworkSettings.java +++ b/src/main/java/org/opensearch/flowframework/common/FlowFrameworkSettings.java @@ -44,4 +44,13 @@ private FlowFrameworkSettings() {} Setting.Property.NodeScope, Setting.Property.Dynamic ); + + /** This setting sets the maximum number of get task request retries */ + public static final Setting MAX_GET_TASK_REQUEST_RETRY = Setting.intSetting( + "plugins.flow_framework.max_get_task_request_retry", + 5, + 0, + Setting.Property.NodeScope, + Setting.Property.Dynamic + ); } diff --git a/src/main/java/org/opensearch/flowframework/transport/GetWorkflowResponse.java b/src/main/java/org/opensearch/flowframework/transport/GetWorkflowResponse.java index b2c9bb884..922a8a3f5 100644 --- a/src/main/java/org/opensearch/flowframework/transport/GetWorkflowResponse.java +++ b/src/main/java/org/opensearch/flowframework/transport/GetWorkflowResponse.java @@ -22,7 +22,9 @@ */ public class GetWorkflowResponse extends ActionResponse implements ToXContentObject { + /** The workflow state */ public WorkflowState workflowState; + /** Flag to indicate if the entire state should be returned */ public boolean allStatus; /** diff --git a/src/main/java/org/opensearch/flowframework/workflow/AbstractRetryableWorkflowStep.java b/src/main/java/org/opensearch/flowframework/workflow/AbstractRetryableWorkflowStep.java new file mode 100644 index 000000000..799edabb9 --- /dev/null +++ b/src/main/java/org/opensearch/flowframework/workflow/AbstractRetryableWorkflowStep.java @@ -0,0 +1,34 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ +package org.opensearch.flowframework.workflow; + +import org.opensearch.cluster.service.ClusterService; +import org.opensearch.common.settings.Settings; + +import static org.opensearch.flowframework.common.FlowFrameworkSettings.MAX_GET_TASK_REQUEST_RETRY; + +/** + * Abstract retryable workflow step + */ +public abstract class AbstractRetryableWorkflowStep implements WorkflowStep { + + /** The maximum number of transport request retries */ + protected volatile Integer maxRetry; + + /** + * Instantiates a new Retryable workflow step + * @param settings Environment settings + * @param clusterService the cluster service + */ + public AbstractRetryableWorkflowStep(Settings settings, ClusterService clusterService) { + this.maxRetry = MAX_GET_TASK_REQUEST_RETRY.get(settings); + clusterService.getClusterSettings().addSettingsUpdateConsumer(MAX_GET_TASK_REQUEST_RETRY, it -> maxRetry = it); + } + +} diff --git a/src/main/java/org/opensearch/flowframework/workflow/GetMLTaskStep.java b/src/main/java/org/opensearch/flowframework/workflow/GetMLTaskStep.java index bf950f280..bb57adbae 100644 --- a/src/main/java/org/opensearch/flowframework/workflow/GetMLTaskStep.java +++ b/src/main/java/org/opensearch/flowframework/workflow/GetMLTaskStep.java @@ -11,11 +11,14 @@ import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; import org.opensearch.ExceptionsHelper; +import org.opensearch.cluster.service.ClusterService; +import org.opensearch.common.settings.Settings; +import org.opensearch.common.util.concurrent.FutureUtils; import org.opensearch.core.action.ActionListener; import org.opensearch.core.rest.RestStatus; import org.opensearch.flowframework.exception.FlowFrameworkException; import org.opensearch.ml.client.MachineLearningNodeClient; -import org.opensearch.ml.common.MLTask; +import org.opensearch.ml.common.MLTaskState; import java.util.List; import java.util.Map; @@ -29,7 +32,7 @@ /** * Step to retrieve an ML Task */ -public class GetMLTaskStep implements WorkflowStep { +public class GetMLTaskStep extends AbstractRetryableWorkflowStep { private static final Logger logger = LogManager.getLogger(GetMLTaskStep.class); private MachineLearningNodeClient mlClient; @@ -37,9 +40,12 @@ public class GetMLTaskStep implements WorkflowStep { /** * Instantiate this class + * @param settings the Opensearch settings + * @param clusterService the OpenSearch cluster service * @param mlClient client to instantiate MLClient */ - public GetMLTaskStep(MachineLearningNodeClient mlClient) { + public GetMLTaskStep(Settings settings, ClusterService clusterService, MachineLearningNodeClient mlClient) { + super(settings, clusterService); this.mlClient = mlClient; } @@ -48,23 +54,6 @@ public CompletableFuture execute(List data) { CompletableFuture getMLTaskFuture = new CompletableFuture<>(); - ActionListener actionListener = ActionListener.wrap(response -> { - - // TODO : Add retry capability if response status is not COMPLETED : - // https://github.com/opensearch-project/flow-framework/issues/158 - - logger.info("ML Task retrieval successful"); - getMLTaskFuture.complete( - new WorkflowData( - Map.ofEntries(Map.entry(MODEL_ID, response.getModelId()), Map.entry(REGISTER_MODEL_STATUS, response.getState().name())), - data.get(0).getWorkflowId() - ) - ); - }, exception -> { - logger.error("Failed to retrieve ML Task"); - getMLTaskFuture.completeExceptionally(new FlowFrameworkException(exception.getMessage(), ExceptionsHelper.status(exception))); - }); - String taskId = null; for (WorkflowData workflowData : data) { @@ -84,7 +73,7 @@ public CompletableFuture execute(List data) { logger.error("Failed to retrieve ML Task"); getMLTaskFuture.completeExceptionally(new FlowFrameworkException("Required fields are not provided", RestStatus.BAD_REQUEST)); } else { - mlClient.getTask(taskId, actionListener); + retryableGetMlTask(data.get(0).getWorkflowId(), getMLTaskFuture, taskId, 0); } return getMLTaskFuture; @@ -95,4 +84,46 @@ public String getName() { return NAME; } + /** + * Retryable GetMLTask + * @param workflowId the workflow id + * @param getMLTaskFuture the workflow step future + * @param taskId the ml task id + * @param retries the current number of request retries + */ + protected void retryableGetMlTask(String workflowId, CompletableFuture getMLTaskFuture, String taskId, int retries) { + mlClient.getTask(taskId, ActionListener.wrap(response -> { + if (response.getState() != MLTaskState.COMPLETED) { + throw new IllegalStateException("MLTask is not yet completed"); + } else { + logger.info("ML Task retrieval successful"); + getMLTaskFuture.complete( + new WorkflowData( + Map.ofEntries( + Map.entry(MODEL_ID, response.getModelId()), + Map.entry(REGISTER_MODEL_STATUS, response.getState().name()) + ), + workflowId + ) + ); + } + }, exception -> { + if (retries < maxRetry) { + // Sleep thread prior to retrying request + try { + Thread.sleep(5000); + } catch (Exception e) { + FutureUtils.cancel(getMLTaskFuture); + } + final int retryAdd = retries + 1; + retryableGetMlTask(workflowId, getMLTaskFuture, taskId, retryAdd); + } else { + logger.error("Failed to retrieve ML Task, maximum retries exceeded"); + getMLTaskFuture.completeExceptionally( + new FlowFrameworkException(exception.getMessage(), ExceptionsHelper.status(exception)) + ); + } + })); + } + } diff --git a/src/main/java/org/opensearch/flowframework/workflow/WorkflowStepFactory.java b/src/main/java/org/opensearch/flowframework/workflow/WorkflowStepFactory.java index fc65286e7..2e450d5b0 100644 --- a/src/main/java/org/opensearch/flowframework/workflow/WorkflowStepFactory.java +++ b/src/main/java/org/opensearch/flowframework/workflow/WorkflowStepFactory.java @@ -10,6 +10,7 @@ import org.opensearch.client.Client; import org.opensearch.cluster.service.ClusterService; +import org.opensearch.common.settings.Settings; import org.opensearch.core.rest.RestStatus; import org.opensearch.flowframework.exception.FlowFrameworkException; import org.opensearch.flowframework.indices.FlowFrameworkIndicesHandler; @@ -29,22 +30,25 @@ public class WorkflowStepFactory { /** * Instantiate this class. * + * @param settings The OpenSearch settings * @param clusterService The OpenSearch cluster service * @param client The OpenSearch client steps can use * @param mlClient Machine Learning client to perform ml operations * @param flowFrameworkIndicesHandler FlowFrameworkIndicesHandler class to update system indices */ public WorkflowStepFactory( + Settings settings, ClusterService clusterService, Client client, MachineLearningNodeClient mlClient, FlowFrameworkIndicesHandler flowFrameworkIndicesHandler ) { this.flowFrameworkIndicesHandler = flowFrameworkIndicesHandler; - populateMap(clusterService, client, mlClient, flowFrameworkIndicesHandler); + populateMap(settings, clusterService, client, mlClient, flowFrameworkIndicesHandler); } private void populateMap( + Settings settings, ClusterService clusterService, Client client, MachineLearningNodeClient mlClient, @@ -58,7 +62,7 @@ private void populateMap( stepMap.put(DeployModelStep.NAME, new DeployModelStep(mlClient)); stepMap.put(CreateConnectorStep.NAME, new CreateConnectorStep(mlClient, flowFrameworkIndicesHandler)); stepMap.put(ModelGroupStep.NAME, new ModelGroupStep(mlClient)); - stepMap.put(GetMLTaskStep.NAME, new GetMLTaskStep(mlClient)); + stepMap.put(GetMLTaskStep.NAME, new GetMLTaskStep(settings, clusterService, mlClient)); } /** diff --git a/src/test/java/org/opensearch/flowframework/FlowFrameworkPluginTests.java b/src/test/java/org/opensearch/flowframework/FlowFrameworkPluginTests.java index c8fcdf69e..c89baba13 100644 --- a/src/test/java/org/opensearch/flowframework/FlowFrameworkPluginTests.java +++ b/src/test/java/org/opensearch/flowframework/FlowFrameworkPluginTests.java @@ -27,6 +27,7 @@ import java.util.stream.Stream; import static org.opensearch.flowframework.common.FlowFrameworkSettings.FLOW_FRAMEWORK_ENABLED; +import static org.opensearch.flowframework.common.FlowFrameworkSettings.MAX_GET_TASK_REQUEST_RETRY; import static org.opensearch.flowframework.common.FlowFrameworkSettings.MAX_WORKFLOWS; import static org.opensearch.flowframework.common.FlowFrameworkSettings.WORKFLOW_REQUEST_TIMEOUT; import static org.mockito.Mockito.mock; @@ -61,7 +62,7 @@ public void setUp() throws Exception { final Set> settingsSet = Stream.concat( ClusterSettings.BUILT_IN_CLUSTER_SETTINGS.stream(), - Stream.of(FLOW_FRAMEWORK_ENABLED, MAX_WORKFLOWS, WORKFLOW_REQUEST_TIMEOUT) + Stream.of(FLOW_FRAMEWORK_ENABLED, MAX_WORKFLOWS, WORKFLOW_REQUEST_TIMEOUT, MAX_GET_TASK_REQUEST_RETRY) ).collect(Collectors.toSet()); clusterSettings = new ClusterSettings(settings, settingsSet); clusterService = mock(ClusterService.class); @@ -83,7 +84,7 @@ public void testPlugin() throws IOException { assertEquals(4, ffp.getRestHandlers(settings, null, null, null, null, null, null).size()); assertEquals(4, ffp.getActions().size()); assertEquals(1, ffp.getExecutorBuilders(settings).size()); - assertEquals(3, ffp.getSettings().size()); + assertEquals(4, ffp.getSettings().size()); } } } diff --git a/src/test/java/org/opensearch/flowframework/model/WorkflowValidatorTests.java b/src/test/java/org/opensearch/flowframework/model/WorkflowValidatorTests.java index 28e7e0585..a6cbcf9bd 100644 --- a/src/test/java/org/opensearch/flowframework/model/WorkflowValidatorTests.java +++ b/src/test/java/org/opensearch/flowframework/model/WorkflowValidatorTests.java @@ -12,6 +12,9 @@ import org.opensearch.client.Client; import org.opensearch.client.ClusterAdminClient; import org.opensearch.cluster.service.ClusterService; +import org.opensearch.common.settings.ClusterSettings; +import org.opensearch.common.settings.Setting; +import org.opensearch.common.settings.Settings; import org.opensearch.core.xcontent.XContentParser; import org.opensearch.flowframework.indices.FlowFrameworkIndicesHandler; import org.opensearch.flowframework.workflow.WorkflowStepFactory; @@ -21,7 +24,14 @@ import java.io.IOException; import java.util.ArrayList; import java.util.List; - +import java.util.Set; +import java.util.stream.Collectors; +import java.util.stream.Stream; + +import static org.opensearch.flowframework.common.FlowFrameworkSettings.FLOW_FRAMEWORK_ENABLED; +import static org.opensearch.flowframework.common.FlowFrameworkSettings.MAX_GET_TASK_REQUEST_RETRY; +import static org.opensearch.flowframework.common.FlowFrameworkSettings.MAX_WORKFLOWS; +import static org.opensearch.flowframework.common.FlowFrameworkSettings.WORKFLOW_REQUEST_TIMEOUT; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; @@ -70,7 +80,20 @@ public void testWorkflowStepFactoryHasValidators() throws IOException { MachineLearningNodeClient mlClient = mock(MachineLearningNodeClient.class); FlowFrameworkIndicesHandler flowFrameworkIndicesHandler = mock(FlowFrameworkIndicesHandler.class); - WorkflowStepFactory workflowStepFactory = new WorkflowStepFactory(clusterService, client, mlClient, flowFrameworkIndicesHandler); + final Set> settingsSet = Stream.concat( + ClusterSettings.BUILT_IN_CLUSTER_SETTINGS.stream(), + Stream.of(FLOW_FRAMEWORK_ENABLED, MAX_WORKFLOWS, WORKFLOW_REQUEST_TIMEOUT, MAX_GET_TASK_REQUEST_RETRY) + ).collect(Collectors.toSet()); + ClusterSettings clusterSettings = new ClusterSettings(Settings.EMPTY, settingsSet); + when(clusterService.getClusterSettings()).thenReturn(clusterSettings); + + WorkflowStepFactory workflowStepFactory = new WorkflowStepFactory( + Settings.EMPTY, + clusterService, + client, + mlClient, + flowFrameworkIndicesHandler + ); // Read in workflow-steps.json WorkflowValidator workflowValidator = WorkflowValidator.parse("mappings/workflow-steps.json"); diff --git a/src/test/java/org/opensearch/flowframework/workflow/GetMLTaskStepTests.java b/src/test/java/org/opensearch/flowframework/workflow/GetMLTaskStepTests.java index f5f5f7e7d..efb59d42a 100644 --- a/src/test/java/org/opensearch/flowframework/workflow/GetMLTaskStepTests.java +++ b/src/test/java/org/opensearch/flowframework/workflow/GetMLTaskStepTests.java @@ -10,6 +10,10 @@ import com.carrotsearch.randomizedtesting.annotations.ThreadLeakScope; +import org.opensearch.cluster.service.ClusterService; +import org.opensearch.common.settings.ClusterSettings; +import org.opensearch.common.settings.Setting; +import org.opensearch.common.settings.Settings; import org.opensearch.core.action.ActionListener; import org.opensearch.flowframework.exception.FlowFrameworkException; import org.opensearch.ml.client.MachineLearningNodeClient; @@ -19,8 +23,11 @@ import java.util.List; import java.util.Map; +import java.util.Set; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutionException; +import java.util.stream.Collectors; +import java.util.stream.Stream; import org.mockito.Mock; import org.mockito.MockitoAnnotations; @@ -28,10 +35,14 @@ import static org.opensearch.flowframework.common.CommonValue.MODEL_ID; import static org.opensearch.flowframework.common.CommonValue.REGISTER_MODEL_STATUS; import static org.opensearch.flowframework.common.CommonValue.TASK_ID; +import static org.opensearch.flowframework.common.FlowFrameworkSettings.MAX_GET_TASK_REQUEST_RETRY; import static org.mockito.ArgumentMatchers.any; import static org.mockito.Mockito.doAnswer; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.spy; import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; @ThreadLeakScope(ThreadLeakScope.Scope.NONE) public class GetMLTaskStepTests extends OpenSearchTestCase { @@ -47,7 +58,18 @@ public void setUp() throws Exception { super.setUp(); MockitoAnnotations.openMocks(this); - this.getMLTaskStep = new GetMLTaskStep(mlNodeClient); + ClusterService clusterService = mock(ClusterService.class); + final Set> settingsSet = Stream.concat( + ClusterSettings.BUILT_IN_CLUSTER_SETTINGS.stream(), + Stream.of(MAX_GET_TASK_REQUEST_RETRY) + ).collect(Collectors.toSet()); + + // Set max request retry setting to 0 to avoid sleeping the thread during unit test failure cases + Settings testMaxRetrySetting = Settings.builder().put(MAX_GET_TASK_REQUEST_RETRY.getKey(), 0).build(); + ClusterSettings clusterSettings = new ClusterSettings(testMaxRetrySetting, settingsSet); + when(clusterService.getClusterSettings()).thenReturn(clusterSettings); + + this.getMLTaskStep = spy(new GetMLTaskStep(testMaxRetrySetting, clusterService, mlNodeClient)); this.workflowData = new WorkflowData(Map.ofEntries(Map.entry(TASK_ID, "test")), "test-id"); } diff --git a/src/test/java/org/opensearch/flowframework/workflow/WorkflowProcessSorterTests.java b/src/test/java/org/opensearch/flowframework/workflow/WorkflowProcessSorterTests.java index 628913cd7..e9e792add 100644 --- a/src/test/java/org/opensearch/flowframework/workflow/WorkflowProcessSorterTests.java +++ b/src/test/java/org/opensearch/flowframework/workflow/WorkflowProcessSorterTests.java @@ -11,6 +11,9 @@ import org.opensearch.client.AdminClient; import org.opensearch.client.Client; import org.opensearch.cluster.service.ClusterService; +import org.opensearch.common.settings.ClusterSettings; +import org.opensearch.common.settings.Setting; +import org.opensearch.common.settings.Settings; import org.opensearch.core.rest.RestStatus; import org.opensearch.core.xcontent.XContentParser; import org.opensearch.flowframework.exception.FlowFrameworkException; @@ -30,9 +33,15 @@ import java.util.Collections; import java.util.List; import java.util.Map; +import java.util.Set; import java.util.concurrent.TimeUnit; import java.util.stream.Collectors; +import java.util.stream.Stream; +import static org.opensearch.flowframework.common.FlowFrameworkSettings.FLOW_FRAMEWORK_ENABLED; +import static org.opensearch.flowframework.common.FlowFrameworkSettings.MAX_GET_TASK_REQUEST_RETRY; +import static org.opensearch.flowframework.common.FlowFrameworkSettings.MAX_WORKFLOWS; +import static org.opensearch.flowframework.common.FlowFrameworkSettings.WORKFLOW_REQUEST_TIMEOUT; import static org.opensearch.flowframework.model.TemplateTestJsonUtil.edge; import static org.opensearch.flowframework.model.TemplateTestJsonUtil.node; import static org.opensearch.flowframework.model.TemplateTestJsonUtil.nodeWithType; @@ -70,10 +79,23 @@ public static void setup() { MachineLearningNodeClient mlClient = mock(MachineLearningNodeClient.class); FlowFrameworkIndicesHandler flowFrameworkIndicesHandler = mock(FlowFrameworkIndicesHandler.class); + final Set> settingsSet = Stream.concat( + ClusterSettings.BUILT_IN_CLUSTER_SETTINGS.stream(), + Stream.of(FLOW_FRAMEWORK_ENABLED, MAX_WORKFLOWS, WORKFLOW_REQUEST_TIMEOUT, MAX_GET_TASK_REQUEST_RETRY) + ).collect(Collectors.toSet()); + ClusterSettings clusterSettings = new ClusterSettings(Settings.EMPTY, settingsSet); + when(clusterService.getClusterSettings()).thenReturn(clusterSettings); + when(client.admin()).thenReturn(adminClient); testThreadPool = new TestThreadPool(WorkflowProcessSorterTests.class.getName()); - WorkflowStepFactory factory = new WorkflowStepFactory(clusterService, client, mlClient, flowFrameworkIndicesHandler); + WorkflowStepFactory factory = new WorkflowStepFactory( + Settings.EMPTY, + clusterService, + client, + mlClient, + flowFrameworkIndicesHandler + ); workflowProcessSorter = new WorkflowProcessSorter(factory, testThreadPool); }