diff --git a/src/main/java/org/opensearch/flowframework/FlowFrameworkPlugin.java b/src/main/java/org/opensearch/flowframework/FlowFrameworkPlugin.java index 48a3f51bc..a51082e0e 100644 --- a/src/main/java/org/opensearch/flowframework/FlowFrameworkPlugin.java +++ b/src/main/java/org/opensearch/flowframework/FlowFrameworkPlugin.java @@ -72,9 +72,9 @@ import static org.opensearch.flowframework.common.CommonValue.FLOW_FRAMEWORK_THREAD_POOL_PREFIX; import static org.opensearch.flowframework.common.CommonValue.WORKFLOW_THREAD_POOL; import static org.opensearch.flowframework.common.FlowFrameworkSettings.FLOW_FRAMEWORK_ENABLED; -import static org.opensearch.flowframework.common.FlowFrameworkSettings.MAX_GET_TASK_REQUEST_RETRY; import static org.opensearch.flowframework.common.FlowFrameworkSettings.MAX_WORKFLOWS; import static org.opensearch.flowframework.common.FlowFrameworkSettings.MAX_WORKFLOW_STEPS; +import static org.opensearch.flowframework.common.FlowFrameworkSettings.TASK_REQUEST_RETRY_DURATION; import static org.opensearch.flowframework.common.FlowFrameworkSettings.WORKFLOW_REQUEST_TIMEOUT; /** @@ -168,7 +168,7 @@ public List getRestHandlers( @Override public List> getSettings() { - return List.of(FLOW_FRAMEWORK_ENABLED, MAX_WORKFLOWS, MAX_WORKFLOW_STEPS, WORKFLOW_REQUEST_TIMEOUT, MAX_GET_TASK_REQUEST_RETRY); + return List.of(FLOW_FRAMEWORK_ENABLED, MAX_WORKFLOWS, MAX_WORKFLOW_STEPS, WORKFLOW_REQUEST_TIMEOUT, TASK_REQUEST_RETRY_DURATION); } @Override diff --git a/src/main/java/org/opensearch/flowframework/common/FlowFrameworkSettings.java b/src/main/java/org/opensearch/flowframework/common/FlowFrameworkSettings.java index d5bec1871..780aa165b 100644 --- a/src/main/java/org/opensearch/flowframework/common/FlowFrameworkSettings.java +++ b/src/main/java/org/opensearch/flowframework/common/FlowFrameworkSettings.java @@ -17,9 +17,9 @@ public class FlowFrameworkSettings { private volatile Boolean isFlowFrameworkEnabled; - /** The maximum number of transport request retries */ - private volatile Integer maxRetry; - /** Max workflow steps that can be created*/ + /** The duration between request retries */ + private volatile TimeValue retryDuration; + /** Max workflow steps that can be created */ private volatile Integer maxWorkflowSteps; /** The upper limit of max workflows that can be created */ @@ -27,7 +27,7 @@ public class FlowFrameworkSettings { /** The upper limit of max workflow steps that can be in a single workflow */ public static final int MAX_WORKFLOW_STEPS_LIMIT = 500; - /** This setting sets max workflows that can be created */ + /** This setting sets max workflows that can be created */ public static final Setting MAX_WORKFLOWS = Setting.intSetting( "plugins.flow_framework.max_workflows", 1000, @@ -37,7 +37,7 @@ public class FlowFrameworkSettings { Setting.Property.Dynamic ); - /** This setting sets max workflows that can be created */ + /** This setting sets max workflows that can be created */ public static final Setting MAX_WORKFLOW_STEPS = Setting.intSetting( "plugins.flow_framework.max_workflow_steps", 50, @@ -47,7 +47,7 @@ public class FlowFrameworkSettings { Setting.Property.Dynamic ); - /** This setting sets the timeout for the request */ + /** This setting sets the timeout for the request */ public static final Setting WORKFLOW_REQUEST_TIMEOUT = Setting.positiveTimeSetting( "plugins.flow_framework.request_timeout", TimeValue.timeValueSeconds(10), @@ -63,11 +63,10 @@ public class FlowFrameworkSettings { Setting.Property.Dynamic ); - /** This setting sets the maximum number of get task request retries */ - public static final Setting MAX_GET_TASK_REQUEST_RETRY = Setting.intSetting( - "plugins.flow_framework.max_get_task_request_retry", - 5, - 0, + /** This setting sets the time between task request retries */ + public static final Setting TASK_REQUEST_RETRY_DURATION = Setting.positiveTimeSetting( + "plugins.flow_framework.task_request_retry_duration", + TimeValue.timeValueSeconds(5), Setting.Property.NodeScope, Setting.Property.Dynamic ); @@ -82,15 +81,15 @@ public FlowFrameworkSettings(ClusterService clusterService, Settings settings) { // Currently this is just an on/off switch for the entire plugin's API. // If desired more fine-tuned feature settings can be added below. this.isFlowFrameworkEnabled = FLOW_FRAMEWORK_ENABLED.get(settings); - this.maxRetry = MAX_GET_TASK_REQUEST_RETRY.get(settings); + this.retryDuration = TASK_REQUEST_RETRY_DURATION.get(settings); this.maxWorkflowSteps = MAX_WORKFLOW_STEPS.get(settings); clusterService.getClusterSettings().addSettingsUpdateConsumer(FLOW_FRAMEWORK_ENABLED, it -> isFlowFrameworkEnabled = it); - clusterService.getClusterSettings().addSettingsUpdateConsumer(MAX_GET_TASK_REQUEST_RETRY, it -> maxRetry = it); + clusterService.getClusterSettings().addSettingsUpdateConsumer(TASK_REQUEST_RETRY_DURATION, it -> retryDuration = it); clusterService.getClusterSettings().addSettingsUpdateConsumer(MAX_WORKFLOW_STEPS, it -> maxWorkflowSteps = it); } /** - * Whether the flow framework feature is enabled. If disabled, no REST APIs will be availble. + * Whether the flow framework feature is enabled. If disabled, no REST APIs will be available. * @return whether Flow Framework is enabled. */ public boolean isFlowFrameworkEnabled() { @@ -98,11 +97,11 @@ public boolean isFlowFrameworkEnabled() { } /** - * Getter for max retry count - * @return count of max retry + * Getter for retry duration + * @return retry duration */ - public Integer getMaxRetry() { - return maxRetry; + public TimeValue getRetryDuration() { + return retryDuration; } /** diff --git a/src/main/java/org/opensearch/flowframework/workflow/AbstractRetryableWorkflowStep.java b/src/main/java/org/opensearch/flowframework/workflow/AbstractRetryableWorkflowStep.java index fa2512c70..c277ab27d 100644 --- a/src/main/java/org/opensearch/flowframework/workflow/AbstractRetryableWorkflowStep.java +++ b/src/main/java/org/opensearch/flowframework/workflow/AbstractRetryableWorkflowStep.java @@ -11,6 +11,7 @@ import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; import org.opensearch.ExceptionsHelper; +import org.opensearch.common.unit.TimeValue; import org.opensearch.common.util.concurrent.FutureUtils; import org.opensearch.core.action.ActionListener; import org.opensearch.core.rest.RestStatus; @@ -22,7 +23,6 @@ import org.opensearch.threadpool.ThreadPool; import java.util.concurrent.CompletableFuture; -import java.util.concurrent.atomic.AtomicInteger; import static org.opensearch.flowframework.common.CommonValue.WORKFLOW_THREAD_POOL; import static org.opensearch.flowframework.common.WorkflowResources.getResourceByWorkflowStep; @@ -32,8 +32,7 @@ */ public abstract class AbstractRetryableWorkflowStep implements WorkflowStep { private static final Logger logger = LogManager.getLogger(AbstractRetryableWorkflowStep.class); - /** The maximum number of transport request retries */ - protected volatile Integer maxRetry; + private TimeValue retryDuration; private final MachineLearningNodeClient mlClient; private final FlowFrameworkIndicesHandler flowFrameworkIndicesHandler; private ThreadPool threadPool; @@ -52,7 +51,7 @@ protected AbstractRetryableWorkflowStep( FlowFrameworkSettings flowFrameworkSettings ) { this.threadPool = threadPool; - this.maxRetry = flowFrameworkSettings.getMaxRetry(); + this.retryDuration = flowFrameworkSettings.getRetryDuration(); this.mlClient = mlClient; this.flowFrameworkIndicesHandler = flowFrameworkIndicesHandler; } @@ -74,9 +73,8 @@ protected void retryableGetMlTask( String workflowStep, ActionListener mlTaskListener ) { - AtomicInteger retries = new AtomicInteger(); CompletableFuture.runAsync(() -> { - while (retries.getAndIncrement() < this.maxRetry && !future.isDone()) { + while (!future.isDone()) { mlClient.getTask(taskId, ActionListener.wrap(response -> { switch (response.getState()) { case COMPLETED: @@ -123,19 +121,13 @@ protected void retryableGetMlTask( logger.error(errorMessage); mlTaskListener.onFailure(new FlowFrameworkException(errorMessage, RestStatus.BAD_REQUEST)); })); - // Wait long enough for future to possibly complete try { - Thread.sleep(5000); + Thread.sleep(this.retryDuration.getMillis()); } catch (InterruptedException e) { FutureUtils.cancel(future); Thread.currentThread().interrupt(); } } - if (!future.isDone()) { - String errorMessage = workflowStep + " did not complete after " + maxRetry + " retries"; - logger.error(errorMessage); - mlTaskListener.onFailure(new FlowFrameworkException(errorMessage, RestStatus.REQUEST_TIMEOUT)); - } }, threadPool.executor(WORKFLOW_THREAD_POOL)); } diff --git a/src/test/java/org/opensearch/flowframework/FlowFrameworkPluginTests.java b/src/test/java/org/opensearch/flowframework/FlowFrameworkPluginTests.java index b08c27cfb..7a03077d9 100644 --- a/src/test/java/org/opensearch/flowframework/FlowFrameworkPluginTests.java +++ b/src/test/java/org/opensearch/flowframework/FlowFrameworkPluginTests.java @@ -27,9 +27,9 @@ import java.util.stream.Stream; import static org.opensearch.flowframework.common.FlowFrameworkSettings.FLOW_FRAMEWORK_ENABLED; -import static org.opensearch.flowframework.common.FlowFrameworkSettings.MAX_GET_TASK_REQUEST_RETRY; import static org.opensearch.flowframework.common.FlowFrameworkSettings.MAX_WORKFLOWS; import static org.opensearch.flowframework.common.FlowFrameworkSettings.MAX_WORKFLOW_STEPS; +import static org.opensearch.flowframework.common.FlowFrameworkSettings.TASK_REQUEST_RETRY_DURATION; import static org.opensearch.flowframework.common.FlowFrameworkSettings.WORKFLOW_REQUEST_TIMEOUT; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; @@ -63,7 +63,7 @@ public void setUp() throws Exception { final Set> settingsSet = Stream.concat( ClusterSettings.BUILT_IN_CLUSTER_SETTINGS.stream(), - Stream.of(FLOW_FRAMEWORK_ENABLED, MAX_WORKFLOWS, MAX_WORKFLOW_STEPS, WORKFLOW_REQUEST_TIMEOUT, MAX_GET_TASK_REQUEST_RETRY) + Stream.of(FLOW_FRAMEWORK_ENABLED, MAX_WORKFLOWS, MAX_WORKFLOW_STEPS, WORKFLOW_REQUEST_TIMEOUT, TASK_REQUEST_RETRY_DURATION) ).collect(Collectors.toSet()); clusterSettings = new ClusterSettings(settings, settingsSet); clusterService = mock(ClusterService.class); diff --git a/src/test/java/org/opensearch/flowframework/common/FlowFrameworkSettingsTests.java b/src/test/java/org/opensearch/flowframework/common/FlowFrameworkSettingsTests.java index 04754f274..430d11508 100644 --- a/src/test/java/org/opensearch/flowframework/common/FlowFrameworkSettingsTests.java +++ b/src/test/java/org/opensearch/flowframework/common/FlowFrameworkSettingsTests.java @@ -12,6 +12,7 @@ import org.opensearch.common.settings.ClusterSettings; import org.opensearch.common.settings.Setting; import org.opensearch.common.settings.Settings; +import org.opensearch.common.unit.TimeValue; import org.opensearch.test.OpenSearchTestCase; import java.io.IOException; @@ -39,7 +40,7 @@ public void setUp() throws Exception { ClusterSettings.BUILT_IN_CLUSTER_SETTINGS.stream(), Stream.of( FlowFrameworkSettings.FLOW_FRAMEWORK_ENABLED, - FlowFrameworkSettings.MAX_GET_TASK_REQUEST_RETRY, + FlowFrameworkSettings.TASK_REQUEST_RETRY_DURATION, FlowFrameworkSettings.MAX_WORKFLOW_STEPS ) ).collect(Collectors.toSet()); @@ -56,7 +57,7 @@ public void tearDown() throws Exception { public void testSettings() throws IOException { assertFalse(flowFrameworkSettings.isFlowFrameworkEnabled()); - assertEquals(Optional.of(5), Optional.ofNullable(flowFrameworkSettings.getMaxRetry())); + assertEquals(Optional.of(TimeValue.timeValueSeconds(5)), Optional.ofNullable(flowFrameworkSettings.getRetryDuration())); assertEquals(Optional.of(50), Optional.ofNullable(flowFrameworkSettings.getMaxWorkflowSteps())); } } diff --git a/src/test/java/org/opensearch/flowframework/model/WorkflowValidatorTests.java b/src/test/java/org/opensearch/flowframework/model/WorkflowValidatorTests.java index de5ae440b..1b085ae1e 100644 --- a/src/test/java/org/opensearch/flowframework/model/WorkflowValidatorTests.java +++ b/src/test/java/org/opensearch/flowframework/model/WorkflowValidatorTests.java @@ -31,8 +31,8 @@ import java.util.stream.Stream; import static org.opensearch.flowframework.common.FlowFrameworkSettings.FLOW_FRAMEWORK_ENABLED; -import static org.opensearch.flowframework.common.FlowFrameworkSettings.MAX_GET_TASK_REQUEST_RETRY; import static org.opensearch.flowframework.common.FlowFrameworkSettings.MAX_WORKFLOWS; +import static org.opensearch.flowframework.common.FlowFrameworkSettings.TASK_REQUEST_RETRY_DURATION; import static org.opensearch.flowframework.common.FlowFrameworkSettings.WORKFLOW_REQUEST_TIMEOUT; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; @@ -89,7 +89,7 @@ public void testWorkflowStepFactoryHasValidators() throws IOException { final Set> settingsSet = Stream.concat( ClusterSettings.BUILT_IN_CLUSTER_SETTINGS.stream(), - Stream.of(FLOW_FRAMEWORK_ENABLED, MAX_WORKFLOWS, WORKFLOW_REQUEST_TIMEOUT, MAX_GET_TASK_REQUEST_RETRY) + Stream.of(FLOW_FRAMEWORK_ENABLED, MAX_WORKFLOWS, WORKFLOW_REQUEST_TIMEOUT, TASK_REQUEST_RETRY_DURATION) ).collect(Collectors.toSet()); ClusterSettings clusterSettings = new ClusterSettings(Settings.EMPTY, settingsSet); when(clusterService.getClusterSettings()).thenReturn(clusterSettings); diff --git a/src/test/java/org/opensearch/flowframework/transport/CreateWorkflowTransportActionTests.java b/src/test/java/org/opensearch/flowframework/transport/CreateWorkflowTransportActionTests.java index 61a39809d..718e7a31c 100644 --- a/src/test/java/org/opensearch/flowframework/transport/CreateWorkflowTransportActionTests.java +++ b/src/test/java/org/opensearch/flowframework/transport/CreateWorkflowTransportActionTests.java @@ -48,9 +48,9 @@ import static org.opensearch.flowframework.common.CommonValue.GLOBAL_CONTEXT_INDEX; import static org.opensearch.flowframework.common.CommonValue.WORKFLOW_STATE_INDEX; import static org.opensearch.flowframework.common.FlowFrameworkSettings.FLOW_FRAMEWORK_ENABLED; -import static org.opensearch.flowframework.common.FlowFrameworkSettings.MAX_GET_TASK_REQUEST_RETRY; import static org.opensearch.flowframework.common.FlowFrameworkSettings.MAX_WORKFLOWS; import static org.opensearch.flowframework.common.FlowFrameworkSettings.MAX_WORKFLOW_STEPS; +import static org.opensearch.flowframework.common.FlowFrameworkSettings.TASK_REQUEST_RETRY_DURATION; import static org.opensearch.flowframework.common.FlowFrameworkSettings.WORKFLOW_REQUEST_TIMEOUT; import static org.opensearch.flowframework.common.WorkflowResources.CONNECTOR_ID; import static org.opensearch.flowframework.common.WorkflowResources.CREATE_CONNECTOR; @@ -95,7 +95,7 @@ public void setUp() throws Exception { .build(); final Set> settingsSet = Stream.concat( ClusterSettings.BUILT_IN_CLUSTER_SETTINGS.stream(), - Stream.of(FLOW_FRAMEWORK_ENABLED, MAX_WORKFLOWS, MAX_WORKFLOW_STEPS, WORKFLOW_REQUEST_TIMEOUT, MAX_GET_TASK_REQUEST_RETRY) + Stream.of(FLOW_FRAMEWORK_ENABLED, MAX_WORKFLOWS, MAX_WORKFLOW_STEPS, WORKFLOW_REQUEST_TIMEOUT, TASK_REQUEST_RETRY_DURATION) ).collect(Collectors.toSet()); clusterSettings = new ClusterSettings(settings, settingsSet); clusterService = mock(ClusterService.class); diff --git a/src/test/java/org/opensearch/flowframework/workflow/DeployModelStepTests.java b/src/test/java/org/opensearch/flowframework/workflow/DeployModelStepTests.java index 90f2afc8d..64d982adc 100644 --- a/src/test/java/org/opensearch/flowframework/workflow/DeployModelStepTests.java +++ b/src/test/java/org/opensearch/flowframework/workflow/DeployModelStepTests.java @@ -52,7 +52,7 @@ import static org.opensearch.flowframework.common.CommonValue.REGISTER_MODEL_STATUS; import static org.opensearch.flowframework.common.CommonValue.WORKFLOW_STATE_INDEX; import static org.opensearch.flowframework.common.CommonValue.WORKFLOW_THREAD_POOL; -import static org.opensearch.flowframework.common.FlowFrameworkSettings.MAX_GET_TASK_REQUEST_RETRY; +import static org.opensearch.flowframework.common.FlowFrameworkSettings.TASK_REQUEST_RETRY_DURATION; import static org.opensearch.flowframework.common.WorkflowResources.MODEL_ID; import static org.mockito.ArgumentMatchers.any; import static org.mockito.ArgumentMatchers.anyString; @@ -85,17 +85,16 @@ public void setUp() throws Exception { ClusterService clusterService = mock(ClusterService.class); final Set> settingsSet = Stream.concat( ClusterSettings.BUILT_IN_CLUSTER_SETTINGS.stream(), - Stream.of(MAX_GET_TASK_REQUEST_RETRY) + Stream.of(TASK_REQUEST_RETRY_DURATION) ).collect(Collectors.toSet()); // Set max request retry setting to 1 to limit sleeping the thread to one retry iteration - Settings testMaxRetrySetting = Settings.builder().put(MAX_GET_TASK_REQUEST_RETRY.getKey(), 1).build(); + Settings testMaxRetrySetting = Settings.builder().put(TASK_REQUEST_RETRY_DURATION.getKey(), 1).build(); ClusterSettings clusterSettings = new ClusterSettings(testMaxRetrySetting, settingsSet); when(clusterService.getClusterSettings()).thenReturn(clusterSettings); flowFrameworkSettings = mock(FlowFrameworkSettings.class); when(flowFrameworkSettings.isFlowFrameworkEnabled()).thenReturn(true); - when(flowFrameworkSettings.getMaxRetry()).thenReturn(5); testThreadPool = new TestThreadPool( DeployModelStepTests.class.getName(), diff --git a/src/test/java/org/opensearch/flowframework/workflow/RegisterLocalModelStepTests.java b/src/test/java/org/opensearch/flowframework/workflow/RegisterLocalModelStepTests.java index 1a604df46..b09e10e59 100644 --- a/src/test/java/org/opensearch/flowframework/workflow/RegisterLocalModelStepTests.java +++ b/src/test/java/org/opensearch/flowframework/workflow/RegisterLocalModelStepTests.java @@ -49,7 +49,7 @@ import static org.opensearch.flowframework.common.CommonValue.REGISTER_MODEL_STATUS; import static org.opensearch.flowframework.common.CommonValue.WORKFLOW_STATE_INDEX; import static org.opensearch.flowframework.common.CommonValue.WORKFLOW_THREAD_POOL; -import static org.opensearch.flowframework.common.FlowFrameworkSettings.MAX_GET_TASK_REQUEST_RETRY; +import static org.opensearch.flowframework.common.FlowFrameworkSettings.TASK_REQUEST_RETRY_DURATION; import static org.opensearch.flowframework.common.WorkflowResources.MODEL_GROUP_ID; import static org.opensearch.flowframework.common.WorkflowResources.MODEL_ID; import static org.mockito.ArgumentMatchers.any; @@ -80,17 +80,16 @@ public void setUp() throws Exception { ClusterService clusterService = mock(ClusterService.class); final Set> settingsSet = Stream.concat( ClusterSettings.BUILT_IN_CLUSTER_SETTINGS.stream(), - Stream.of(MAX_GET_TASK_REQUEST_RETRY) + Stream.of(TASK_REQUEST_RETRY_DURATION) ).collect(Collectors.toSet()); // Set max request retry setting to 1 to limit sleeping the thread to one retry iteration - Settings testMaxRetrySetting = Settings.builder().put(MAX_GET_TASK_REQUEST_RETRY.getKey(), 1).build(); + Settings testMaxRetrySetting = Settings.builder().put(TASK_REQUEST_RETRY_DURATION.getKey(), 1).build(); ClusterSettings clusterSettings = new ClusterSettings(testMaxRetrySetting, settingsSet); when(clusterService.getClusterSettings()).thenReturn(clusterSettings); flowFrameworkSettings = mock(FlowFrameworkSettings.class); when(flowFrameworkSettings.isFlowFrameworkEnabled()).thenReturn(true); - when(flowFrameworkSettings.getMaxRetry()).thenReturn(5); testThreadPool = new TestThreadPool( RegisterLocalModelStepTests.class.getName(), diff --git a/src/test/java/org/opensearch/flowframework/workflow/WorkflowProcessSorterTests.java b/src/test/java/org/opensearch/flowframework/workflow/WorkflowProcessSorterTests.java index ccf47ac70..2d32bdb04 100644 --- a/src/test/java/org/opensearch/flowframework/workflow/WorkflowProcessSorterTests.java +++ b/src/test/java/org/opensearch/flowframework/workflow/WorkflowProcessSorterTests.java @@ -46,9 +46,9 @@ import static org.opensearch.flowframework.common.CommonValue.FLOW_FRAMEWORK_THREAD_POOL_PREFIX; import static org.opensearch.flowframework.common.CommonValue.WORKFLOW_THREAD_POOL; import static org.opensearch.flowframework.common.FlowFrameworkSettings.FLOW_FRAMEWORK_ENABLED; -import static org.opensearch.flowframework.common.FlowFrameworkSettings.MAX_GET_TASK_REQUEST_RETRY; import static org.opensearch.flowframework.common.FlowFrameworkSettings.MAX_WORKFLOWS; import static org.opensearch.flowframework.common.FlowFrameworkSettings.MAX_WORKFLOW_STEPS; +import static org.opensearch.flowframework.common.FlowFrameworkSettings.TASK_REQUEST_RETRY_DURATION; import static org.opensearch.flowframework.common.FlowFrameworkSettings.WORKFLOW_REQUEST_TIMEOUT; import static org.opensearch.flowframework.common.WorkflowResources.CONNECTOR_ID; import static org.opensearch.flowframework.common.WorkflowResources.MODEL_ID; @@ -98,7 +98,7 @@ public static void setup() throws IOException { Settings settings = Settings.builder().put("plugins.flow_framework.max_workflow_steps", 5).build(); final Set> settingsSet = Stream.concat( ClusterSettings.BUILT_IN_CLUSTER_SETTINGS.stream(), - Stream.of(FLOW_FRAMEWORK_ENABLED, MAX_WORKFLOWS, MAX_WORKFLOW_STEPS, WORKFLOW_REQUEST_TIMEOUT, MAX_GET_TASK_REQUEST_RETRY) + Stream.of(FLOW_FRAMEWORK_ENABLED, MAX_WORKFLOWS, MAX_WORKFLOW_STEPS, WORKFLOW_REQUEST_TIMEOUT, TASK_REQUEST_RETRY_DURATION) ).collect(Collectors.toSet()); ClusterSettings clusterSettings = new ClusterSettings(settings, settingsSet); when(clusterService.getClusterSettings()).thenReturn(clusterSettings);