Skip to content

Commit

Permalink
Change max retries to retry duration
Browse files Browse the repository at this point in the history
Signed-off-by: Daniel Widdis <[email protected]>
  • Loading branch information
dbwiddis committed Jan 8, 2024
1 parent 324a56a commit 0641695
Show file tree
Hide file tree
Showing 10 changed files with 41 additions and 51 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -72,9 +72,9 @@
import static org.opensearch.flowframework.common.CommonValue.FLOW_FRAMEWORK_THREAD_POOL_PREFIX;
import static org.opensearch.flowframework.common.CommonValue.WORKFLOW_THREAD_POOL;
import static org.opensearch.flowframework.common.FlowFrameworkSettings.FLOW_FRAMEWORK_ENABLED;
import static org.opensearch.flowframework.common.FlowFrameworkSettings.MAX_GET_TASK_REQUEST_RETRY;
import static org.opensearch.flowframework.common.FlowFrameworkSettings.MAX_WORKFLOWS;
import static org.opensearch.flowframework.common.FlowFrameworkSettings.MAX_WORKFLOW_STEPS;
import static org.opensearch.flowframework.common.FlowFrameworkSettings.TASK_REQUEST_RETRY_DURATION;
import static org.opensearch.flowframework.common.FlowFrameworkSettings.WORKFLOW_REQUEST_TIMEOUT;

/**
Expand Down Expand Up @@ -168,7 +168,7 @@ public List<RestHandler> getRestHandlers(

@Override
public List<Setting<?>> getSettings() {
return List.of(FLOW_FRAMEWORK_ENABLED, MAX_WORKFLOWS, MAX_WORKFLOW_STEPS, WORKFLOW_REQUEST_TIMEOUT, MAX_GET_TASK_REQUEST_RETRY);
return List.of(FLOW_FRAMEWORK_ENABLED, MAX_WORKFLOWS, MAX_WORKFLOW_STEPS, WORKFLOW_REQUEST_TIMEOUT, TASK_REQUEST_RETRY_DURATION);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,17 +17,17 @@
public class FlowFrameworkSettings {

private volatile Boolean isFlowFrameworkEnabled;
/** The maximum number of transport request retries */
private volatile Integer maxRetry;
/** Max workflow steps that can be created*/
/** The duration between request retries */
private volatile TimeValue retryDuration;
/** Max workflow steps that can be created */
private volatile Integer maxWorkflowSteps;

/** The upper limit of max workflows that can be created */
public static final int MAX_WORKFLOWS_LIMIT = 10000;
/** The upper limit of max workflow steps that can be in a single workflow */
public static final int MAX_WORKFLOW_STEPS_LIMIT = 500;

/** This setting sets max workflows that can be created */
/** This setting sets max workflows that can be created */
public static final Setting<Integer> MAX_WORKFLOWS = Setting.intSetting(
"plugins.flow_framework.max_workflows",
1000,
Expand All @@ -37,7 +37,7 @@ public class FlowFrameworkSettings {
Setting.Property.Dynamic
);

/** This setting sets max workflows that can be created */
/** This setting sets max workflow steps that can be in a single workflow */
public static final Setting<Integer> MAX_WORKFLOW_STEPS = Setting.intSetting(
"plugins.flow_framework.max_workflow_steps",
50,
Expand All @@ -47,7 +47,7 @@ public class FlowFrameworkSettings {
Setting.Property.Dynamic
);

/** This setting sets the timeout for the request */
/** This setting sets the timeout for the request */
public static final Setting<TimeValue> WORKFLOW_REQUEST_TIMEOUT = Setting.positiveTimeSetting(
"plugins.flow_framework.request_timeout",
TimeValue.timeValueSeconds(10),
Expand All @@ -63,11 +63,10 @@ public class FlowFrameworkSettings {
Setting.Property.Dynamic
);

/** This setting sets the maximum number of get task request retries */
public static final Setting<Integer> MAX_GET_TASK_REQUEST_RETRY = Setting.intSetting(
"plugins.flow_framework.max_get_task_request_retry",
5,
0,
/** This setting sets the time between task request retries */
public static final Setting<TimeValue> TASK_REQUEST_RETRY_DURATION = Setting.positiveTimeSetting(
"plugins.flow_framework.task_request_retry_duration",
TimeValue.timeValueSeconds(5),
Setting.Property.NodeScope,
Setting.Property.Dynamic
);
Expand All @@ -82,27 +81,27 @@ public FlowFrameworkSettings(ClusterService clusterService, Settings settings) {
// Currently this is just an on/off switch for the entire plugin's API.
// If desired more fine-tuned feature settings can be added below.
this.isFlowFrameworkEnabled = FLOW_FRAMEWORK_ENABLED.get(settings);
this.maxRetry = MAX_GET_TASK_REQUEST_RETRY.get(settings);
this.retryDuration = TASK_REQUEST_RETRY_DURATION.get(settings);
this.maxWorkflowSteps = MAX_WORKFLOW_STEPS.get(settings);
clusterService.getClusterSettings().addSettingsUpdateConsumer(FLOW_FRAMEWORK_ENABLED, it -> isFlowFrameworkEnabled = it);
clusterService.getClusterSettings().addSettingsUpdateConsumer(MAX_GET_TASK_REQUEST_RETRY, it -> maxRetry = it);
clusterService.getClusterSettings().addSettingsUpdateConsumer(TASK_REQUEST_RETRY_DURATION, it -> retryDuration = it);
clusterService.getClusterSettings().addSettingsUpdateConsumer(MAX_WORKFLOW_STEPS, it -> maxWorkflowSteps = it);
}

/**
* Whether the flow framework feature is enabled. If disabled, no REST APIs will be availble.
* Whether the flow framework feature is enabled. If disabled, no REST APIs will be available.
* @return whether Flow Framework is enabled.
*/
public boolean isFlowFrameworkEnabled() {
return isFlowFrameworkEnabled;
}

/**
* Getter for max retry count
* @return count of max retry
* Getter for retry duration
* @return retry duration
*/
public Integer getMaxRetry() {
return maxRetry;
public TimeValue getRetryDuration() {
return retryDuration;
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.opensearch.ExceptionsHelper;
import org.opensearch.common.unit.TimeValue;
import org.opensearch.common.util.concurrent.FutureUtils;
import org.opensearch.core.action.ActionListener;
import org.opensearch.core.rest.RestStatus;
Expand All @@ -22,7 +23,6 @@
import org.opensearch.threadpool.ThreadPool;

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicInteger;

import static org.opensearch.flowframework.common.CommonValue.WORKFLOW_THREAD_POOL;
import static org.opensearch.flowframework.common.WorkflowResources.getResourceByWorkflowStep;
Expand All @@ -32,8 +32,7 @@
*/
public abstract class AbstractRetryableWorkflowStep implements WorkflowStep {
private static final Logger logger = LogManager.getLogger(AbstractRetryableWorkflowStep.class);
/** The maximum number of transport request retries */
protected volatile Integer maxRetry;
private TimeValue retryDuration;
private final MachineLearningNodeClient mlClient;
private final FlowFrameworkIndicesHandler flowFrameworkIndicesHandler;
private ThreadPool threadPool;
Expand All @@ -52,7 +51,7 @@ protected AbstractRetryableWorkflowStep(
FlowFrameworkSettings flowFrameworkSettings
) {
this.threadPool = threadPool;
this.maxRetry = flowFrameworkSettings.getMaxRetry();
this.retryDuration = flowFrameworkSettings.getRetryDuration();
this.mlClient = mlClient;
this.flowFrameworkIndicesHandler = flowFrameworkIndicesHandler;
}
Expand All @@ -74,9 +73,8 @@ protected void retryableGetMlTask(
String workflowStep,
ActionListener<MLTask> mlTaskListener
) {
AtomicInteger retries = new AtomicInteger();
CompletableFuture.runAsync(() -> {
while (retries.getAndIncrement() < this.maxRetry && !future.isDone()) {
while (!future.isDone()) {
mlClient.getTask(taskId, ActionListener.wrap(response -> {
switch (response.getState()) {
case COMPLETED:
Expand Down Expand Up @@ -123,19 +121,13 @@ protected void retryableGetMlTask(
logger.error(errorMessage);
mlTaskListener.onFailure(new FlowFrameworkException(errorMessage, RestStatus.BAD_REQUEST));
}));
// Wait long enough for future to possibly complete
try {
Thread.sleep(5000);
Thread.sleep(this.retryDuration.getMillis());
} catch (InterruptedException e) {
FutureUtils.cancel(future);
Thread.currentThread().interrupt();
}
}
if (!future.isDone()) {
String errorMessage = workflowStep + " did not complete after " + maxRetry + " retries";
logger.error(errorMessage);
mlTaskListener.onFailure(new FlowFrameworkException(errorMessage, RestStatus.REQUEST_TIMEOUT));
}
}, threadPool.executor(WORKFLOW_THREAD_POOL));
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,9 +27,9 @@
import java.util.stream.Stream;

import static org.opensearch.flowframework.common.FlowFrameworkSettings.FLOW_FRAMEWORK_ENABLED;
import static org.opensearch.flowframework.common.FlowFrameworkSettings.MAX_GET_TASK_REQUEST_RETRY;
import static org.opensearch.flowframework.common.FlowFrameworkSettings.MAX_WORKFLOWS;
import static org.opensearch.flowframework.common.FlowFrameworkSettings.MAX_WORKFLOW_STEPS;
import static org.opensearch.flowframework.common.FlowFrameworkSettings.TASK_REQUEST_RETRY_DURATION;
import static org.opensearch.flowframework.common.FlowFrameworkSettings.WORKFLOW_REQUEST_TIMEOUT;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
Expand Down Expand Up @@ -63,7 +63,7 @@ public void setUp() throws Exception {

final Set<Setting<?>> settingsSet = Stream.concat(
ClusterSettings.BUILT_IN_CLUSTER_SETTINGS.stream(),
Stream.of(FLOW_FRAMEWORK_ENABLED, MAX_WORKFLOWS, MAX_WORKFLOW_STEPS, WORKFLOW_REQUEST_TIMEOUT, MAX_GET_TASK_REQUEST_RETRY)
Stream.of(FLOW_FRAMEWORK_ENABLED, MAX_WORKFLOWS, MAX_WORKFLOW_STEPS, WORKFLOW_REQUEST_TIMEOUT, TASK_REQUEST_RETRY_DURATION)
).collect(Collectors.toSet());
clusterSettings = new ClusterSettings(settings, settingsSet);
clusterService = mock(ClusterService.class);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
import org.opensearch.common.settings.ClusterSettings;
import org.opensearch.common.settings.Setting;
import org.opensearch.common.settings.Settings;
import org.opensearch.common.unit.TimeValue;
import org.opensearch.test.OpenSearchTestCase;

import java.io.IOException;
Expand Down Expand Up @@ -39,7 +40,7 @@ public void setUp() throws Exception {
ClusterSettings.BUILT_IN_CLUSTER_SETTINGS.stream(),
Stream.of(
FlowFrameworkSettings.FLOW_FRAMEWORK_ENABLED,
FlowFrameworkSettings.MAX_GET_TASK_REQUEST_RETRY,
FlowFrameworkSettings.TASK_REQUEST_RETRY_DURATION,
FlowFrameworkSettings.MAX_WORKFLOW_STEPS
)
).collect(Collectors.toSet());
Expand All @@ -56,7 +57,7 @@ public void tearDown() throws Exception {

public void testSettings() throws IOException {
assertFalse(flowFrameworkSettings.isFlowFrameworkEnabled());
assertEquals(Optional.of(5), Optional.ofNullable(flowFrameworkSettings.getMaxRetry()));
assertEquals(Optional.of(TimeValue.timeValueSeconds(5)), Optional.ofNullable(flowFrameworkSettings.getRetryDuration()));
assertEquals(Optional.of(50), Optional.ofNullable(flowFrameworkSettings.getMaxWorkflowSteps()));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -31,8 +31,8 @@
import java.util.stream.Stream;

import static org.opensearch.flowframework.common.FlowFrameworkSettings.FLOW_FRAMEWORK_ENABLED;
import static org.opensearch.flowframework.common.FlowFrameworkSettings.MAX_GET_TASK_REQUEST_RETRY;
import static org.opensearch.flowframework.common.FlowFrameworkSettings.MAX_WORKFLOWS;
import static org.opensearch.flowframework.common.FlowFrameworkSettings.TASK_REQUEST_RETRY_DURATION;
import static org.opensearch.flowframework.common.FlowFrameworkSettings.WORKFLOW_REQUEST_TIMEOUT;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
Expand Down Expand Up @@ -89,7 +89,7 @@ public void testWorkflowStepFactoryHasValidators() throws IOException {

final Set<Setting<?>> settingsSet = Stream.concat(
ClusterSettings.BUILT_IN_CLUSTER_SETTINGS.stream(),
Stream.of(FLOW_FRAMEWORK_ENABLED, MAX_WORKFLOWS, WORKFLOW_REQUEST_TIMEOUT, MAX_GET_TASK_REQUEST_RETRY)
Stream.of(FLOW_FRAMEWORK_ENABLED, MAX_WORKFLOWS, WORKFLOW_REQUEST_TIMEOUT, TASK_REQUEST_RETRY_DURATION)
).collect(Collectors.toSet());
ClusterSettings clusterSettings = new ClusterSettings(Settings.EMPTY, settingsSet);
when(clusterService.getClusterSettings()).thenReturn(clusterSettings);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,9 +48,9 @@
import static org.opensearch.flowframework.common.CommonValue.GLOBAL_CONTEXT_INDEX;
import static org.opensearch.flowframework.common.CommonValue.WORKFLOW_STATE_INDEX;
import static org.opensearch.flowframework.common.FlowFrameworkSettings.FLOW_FRAMEWORK_ENABLED;
import static org.opensearch.flowframework.common.FlowFrameworkSettings.MAX_GET_TASK_REQUEST_RETRY;
import static org.opensearch.flowframework.common.FlowFrameworkSettings.MAX_WORKFLOWS;
import static org.opensearch.flowframework.common.FlowFrameworkSettings.MAX_WORKFLOW_STEPS;
import static org.opensearch.flowframework.common.FlowFrameworkSettings.TASK_REQUEST_RETRY_DURATION;
import static org.opensearch.flowframework.common.FlowFrameworkSettings.WORKFLOW_REQUEST_TIMEOUT;
import static org.opensearch.flowframework.common.WorkflowResources.CONNECTOR_ID;
import static org.opensearch.flowframework.common.WorkflowResources.CREATE_CONNECTOR;
Expand Down Expand Up @@ -95,7 +95,7 @@ public void setUp() throws Exception {
.build();
final Set<Setting<?>> settingsSet = Stream.concat(
ClusterSettings.BUILT_IN_CLUSTER_SETTINGS.stream(),
Stream.of(FLOW_FRAMEWORK_ENABLED, MAX_WORKFLOWS, MAX_WORKFLOW_STEPS, WORKFLOW_REQUEST_TIMEOUT, MAX_GET_TASK_REQUEST_RETRY)
Stream.of(FLOW_FRAMEWORK_ENABLED, MAX_WORKFLOWS, MAX_WORKFLOW_STEPS, WORKFLOW_REQUEST_TIMEOUT, TASK_REQUEST_RETRY_DURATION)
).collect(Collectors.toSet());
clusterSettings = new ClusterSettings(settings, settingsSet);
clusterService = mock(ClusterService.class);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@
import static org.opensearch.flowframework.common.CommonValue.REGISTER_MODEL_STATUS;
import static org.opensearch.flowframework.common.CommonValue.WORKFLOW_STATE_INDEX;
import static org.opensearch.flowframework.common.CommonValue.WORKFLOW_THREAD_POOL;
import static org.opensearch.flowframework.common.FlowFrameworkSettings.MAX_GET_TASK_REQUEST_RETRY;
import static org.opensearch.flowframework.common.FlowFrameworkSettings.TASK_REQUEST_RETRY_DURATION;
import static org.opensearch.flowframework.common.WorkflowResources.MODEL_ID;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyString;
Expand Down Expand Up @@ -85,17 +85,16 @@ public void setUp() throws Exception {
ClusterService clusterService = mock(ClusterService.class);
final Set<Setting<?>> settingsSet = Stream.concat(
ClusterSettings.BUILT_IN_CLUSTER_SETTINGS.stream(),
Stream.of(MAX_GET_TASK_REQUEST_RETRY)
Stream.of(TASK_REQUEST_RETRY_DURATION)
).collect(Collectors.toSet());

// Set the task request retry duration setting to 1 to limit how long the thread sleeps between retries
Settings testMaxRetrySetting = Settings.builder().put(MAX_GET_TASK_REQUEST_RETRY.getKey(), 1).build();
Settings testMaxRetrySetting = Settings.builder().put(TASK_REQUEST_RETRY_DURATION.getKey(), 1).build();
ClusterSettings clusterSettings = new ClusterSettings(testMaxRetrySetting, settingsSet);
when(clusterService.getClusterSettings()).thenReturn(clusterSettings);

flowFrameworkSettings = mock(FlowFrameworkSettings.class);
when(flowFrameworkSettings.isFlowFrameworkEnabled()).thenReturn(true);
when(flowFrameworkSettings.getMaxRetry()).thenReturn(5);

testThreadPool = new TestThreadPool(
DeployModelStepTests.class.getName(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@
import static org.opensearch.flowframework.common.CommonValue.REGISTER_MODEL_STATUS;
import static org.opensearch.flowframework.common.CommonValue.WORKFLOW_STATE_INDEX;
import static org.opensearch.flowframework.common.CommonValue.WORKFLOW_THREAD_POOL;
import static org.opensearch.flowframework.common.FlowFrameworkSettings.MAX_GET_TASK_REQUEST_RETRY;
import static org.opensearch.flowframework.common.FlowFrameworkSettings.TASK_REQUEST_RETRY_DURATION;
import static org.opensearch.flowframework.common.WorkflowResources.MODEL_GROUP_ID;
import static org.opensearch.flowframework.common.WorkflowResources.MODEL_ID;
import static org.mockito.ArgumentMatchers.any;
Expand Down Expand Up @@ -80,17 +80,16 @@ public void setUp() throws Exception {
ClusterService clusterService = mock(ClusterService.class);
final Set<Setting<?>> settingsSet = Stream.concat(
ClusterSettings.BUILT_IN_CLUSTER_SETTINGS.stream(),
Stream.of(MAX_GET_TASK_REQUEST_RETRY)
Stream.of(TASK_REQUEST_RETRY_DURATION)
).collect(Collectors.toSet());

// Set the task request retry duration setting to 1 to limit how long the thread sleeps between retries
Settings testMaxRetrySetting = Settings.builder().put(MAX_GET_TASK_REQUEST_RETRY.getKey(), 1).build();
Settings testMaxRetrySetting = Settings.builder().put(TASK_REQUEST_RETRY_DURATION.getKey(), 1).build();
ClusterSettings clusterSettings = new ClusterSettings(testMaxRetrySetting, settingsSet);
when(clusterService.getClusterSettings()).thenReturn(clusterSettings);

flowFrameworkSettings = mock(FlowFrameworkSettings.class);
when(flowFrameworkSettings.isFlowFrameworkEnabled()).thenReturn(true);
when(flowFrameworkSettings.getMaxRetry()).thenReturn(5);

testThreadPool = new TestThreadPool(
RegisterLocalModelStepTests.class.getName(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,9 +46,9 @@
import static org.opensearch.flowframework.common.CommonValue.FLOW_FRAMEWORK_THREAD_POOL_PREFIX;
import static org.opensearch.flowframework.common.CommonValue.WORKFLOW_THREAD_POOL;
import static org.opensearch.flowframework.common.FlowFrameworkSettings.FLOW_FRAMEWORK_ENABLED;
import static org.opensearch.flowframework.common.FlowFrameworkSettings.MAX_GET_TASK_REQUEST_RETRY;
import static org.opensearch.flowframework.common.FlowFrameworkSettings.MAX_WORKFLOWS;
import static org.opensearch.flowframework.common.FlowFrameworkSettings.MAX_WORKFLOW_STEPS;
import static org.opensearch.flowframework.common.FlowFrameworkSettings.TASK_REQUEST_RETRY_DURATION;
import static org.opensearch.flowframework.common.FlowFrameworkSettings.WORKFLOW_REQUEST_TIMEOUT;
import static org.opensearch.flowframework.common.WorkflowResources.CONNECTOR_ID;
import static org.opensearch.flowframework.common.WorkflowResources.MODEL_ID;
Expand Down Expand Up @@ -98,7 +98,7 @@ public static void setup() throws IOException {
Settings settings = Settings.builder().put("plugins.flow_framework.max_workflow_steps", 5).build();
final Set<Setting<?>> settingsSet = Stream.concat(
ClusterSettings.BUILT_IN_CLUSTER_SETTINGS.stream(),
Stream.of(FLOW_FRAMEWORK_ENABLED, MAX_WORKFLOWS, MAX_WORKFLOW_STEPS, WORKFLOW_REQUEST_TIMEOUT, MAX_GET_TASK_REQUEST_RETRY)
Stream.of(FLOW_FRAMEWORK_ENABLED, MAX_WORKFLOWS, MAX_WORKFLOW_STEPS, WORKFLOW_REQUEST_TIMEOUT, TASK_REQUEST_RETRY_DURATION)
).collect(Collectors.toSet());
ClusterSettings clusterSettings = new ClusterSettings(settings, settingsSet);
when(clusterService.getClusterSettings()).thenReturn(clusterSettings);
Expand Down

0 comments on commit 0641695

Please sign in to comment.