Skip to content

Commit

Permalink
Change max retries to retry duration, refactor settings for consisten…
Browse files Browse the repository at this point in the history
…cy (#381)

* Change max retries to retry duration

Signed-off-by: Daniel Widdis <[email protected]>

* Move max workflows setting update consumer to settings class

Signed-off-by: Daniel Widdis <[email protected]>

* Move workflow timeout setting update consumer to settings class

Signed-off-by: Daniel Widdis <[email protected]>

* Use timeout in other search requests

Signed-off-by: Daniel Widdis <[email protected]>

* Improve test coverage

Signed-off-by: Daniel Widdis <[email protected]>

---------

Signed-off-by: Daniel Widdis <[email protected]>
(cherry picked from commit a6fb532)
Signed-off-by: github-actions[bot] <github-actions[bot]@users.noreply.github.com>
  • Loading branch information
github-actions[bot] committed Jan 9, 2024
1 parent 9616443 commit a66f427
Show file tree
Hide file tree
Showing 18 changed files with 222 additions and 434 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -72,9 +72,9 @@
import static org.opensearch.flowframework.common.CommonValue.FLOW_FRAMEWORK_THREAD_POOL_PREFIX;
import static org.opensearch.flowframework.common.CommonValue.WORKFLOW_THREAD_POOL;
import static org.opensearch.flowframework.common.FlowFrameworkSettings.FLOW_FRAMEWORK_ENABLED;
import static org.opensearch.flowframework.common.FlowFrameworkSettings.MAX_GET_TASK_REQUEST_RETRY;
import static org.opensearch.flowframework.common.FlowFrameworkSettings.MAX_WORKFLOWS;
import static org.opensearch.flowframework.common.FlowFrameworkSettings.MAX_WORKFLOW_STEPS;
import static org.opensearch.flowframework.common.FlowFrameworkSettings.TASK_REQUEST_RETRY_DURATION;
import static org.opensearch.flowframework.common.FlowFrameworkSettings.WORKFLOW_REQUEST_TIMEOUT;

/**
Expand All @@ -84,8 +84,6 @@ public class FlowFrameworkPlugin extends Plugin implements ActionPlugin {

private FlowFrameworkSettings flowFrameworkSettings;

private ClusterService clusterService;

/**
* Instantiate this plugin.
*/
Expand All @@ -106,7 +104,6 @@ public Collection<Object> createComponents(
Supplier<RepositoriesService> repositoriesServiceSupplier
) {
Settings settings = environment.settings();
this.clusterService = clusterService;
flowFrameworkSettings = new FlowFrameworkSettings(clusterService, settings);
MachineLearningNodeClient mlClient = new MachineLearningNodeClient(client);
EncryptorUtils encryptorUtils = new EncryptorUtils(clusterService, client);
Expand All @@ -127,7 +124,7 @@ public Collection<Object> createComponents(
flowFrameworkSettings
);

return List.of(workflowStepFactory, workflowProcessSorter, encryptorUtils, flowFrameworkIndicesHandler);
return List.of(workflowStepFactory, workflowProcessSorter, encryptorUtils, flowFrameworkIndicesHandler, flowFrameworkSettings);
}

@Override
Expand All @@ -141,7 +138,7 @@ public List<RestHandler> getRestHandlers(
Supplier<DiscoveryNodes> nodesInCluster
) {
return List.of(
new RestCreateWorkflowAction(flowFrameworkSettings, settings, clusterService),
new RestCreateWorkflowAction(flowFrameworkSettings),
new RestDeleteWorkflowAction(flowFrameworkSettings),
new RestProvisionWorkflowAction(flowFrameworkSettings),
new RestDeprovisionWorkflowAction(flowFrameworkSettings),
Expand All @@ -168,7 +165,7 @@ public List<RestHandler> getRestHandlers(

@Override
public List<Setting<?>> getSettings() {
return List.of(FLOW_FRAMEWORK_ENABLED, MAX_WORKFLOWS, MAX_WORKFLOW_STEPS, WORKFLOW_REQUEST_TIMEOUT, MAX_GET_TASK_REQUEST_RETRY);
return List.of(FLOW_FRAMEWORK_ENABLED, MAX_WORKFLOWS, MAX_WORKFLOW_STEPS, WORKFLOW_REQUEST_TIMEOUT, TASK_REQUEST_RETRY_DURATION);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,17 +17,21 @@
public class FlowFrameworkSettings {

private volatile Boolean isFlowFrameworkEnabled;
/** The maximum number of transport request retries */
private volatile Integer maxRetry;
/** Max workflow steps that can be created*/
/** The duration between request retries */
private volatile TimeValue retryDuration;
/** Max workflow steps that can be created */
private volatile Integer maxWorkflowSteps;
/** Max workflows that can be created*/
protected volatile Integer maxWorkflows;
/** Timeout for internal requests*/
protected volatile TimeValue requestTimeout;

/** The upper limit of max workflows that can be created */
public static final int MAX_WORKFLOWS_LIMIT = 10000;
/** The upper limit of max workflow steps that can be in a single workflow */
public static final int MAX_WORKFLOW_STEPS_LIMIT = 500;

/** This setting sets max workflows that can be created */
/** This setting sets max workflows that can be created */
public static final Setting<Integer> MAX_WORKFLOWS = Setting.intSetting(
"plugins.flow_framework.max_workflows",
1000,
Expand All @@ -37,7 +41,7 @@ public class FlowFrameworkSettings {
Setting.Property.Dynamic
);

/** This setting sets max workflows that can be created */
/** This setting sets max workflows that can be created */
public static final Setting<Integer> MAX_WORKFLOW_STEPS = Setting.intSetting(
"plugins.flow_framework.max_workflow_steps",
50,
Expand All @@ -47,7 +51,7 @@ public class FlowFrameworkSettings {
Setting.Property.Dynamic
);

/** This setting sets the timeout for the request */
/** This setting sets the timeout for the request */
public static final Setting<TimeValue> WORKFLOW_REQUEST_TIMEOUT = Setting.positiveTimeSetting(
"plugins.flow_framework.request_timeout",
TimeValue.timeValueSeconds(10),
Expand All @@ -63,11 +67,10 @@ public class FlowFrameworkSettings {
Setting.Property.Dynamic
);

/** This setting sets the maximum number of get task request retries */
public static final Setting<Integer> MAX_GET_TASK_REQUEST_RETRY = Setting.intSetting(
"plugins.flow_framework.max_get_task_request_retry",
5,
0,
/** This setting sets the time between task request retries */
public static final Setting<TimeValue> TASK_REQUEST_RETRY_DURATION = Setting.positiveTimeSetting(
"plugins.flow_framework.task_request_retry_duration",
TimeValue.timeValueSeconds(5),
Setting.Property.NodeScope,
Setting.Property.Dynamic
);
Expand All @@ -82,27 +85,31 @@ public FlowFrameworkSettings(ClusterService clusterService, Settings settings) {
// Currently this is just an on/off switch for the entire plugin's API.
// If desired more fine-tuned feature settings can be added below.
this.isFlowFrameworkEnabled = FLOW_FRAMEWORK_ENABLED.get(settings);
this.maxRetry = MAX_GET_TASK_REQUEST_RETRY.get(settings);
this.retryDuration = TASK_REQUEST_RETRY_DURATION.get(settings);
this.maxWorkflowSteps = MAX_WORKFLOW_STEPS.get(settings);
this.maxWorkflows = MAX_WORKFLOWS.get(settings);
this.requestTimeout = WORKFLOW_REQUEST_TIMEOUT.get(settings);
clusterService.getClusterSettings().addSettingsUpdateConsumer(FLOW_FRAMEWORK_ENABLED, it -> isFlowFrameworkEnabled = it);
clusterService.getClusterSettings().addSettingsUpdateConsumer(MAX_GET_TASK_REQUEST_RETRY, it -> maxRetry = it);
clusterService.getClusterSettings().addSettingsUpdateConsumer(TASK_REQUEST_RETRY_DURATION, it -> retryDuration = it);
clusterService.getClusterSettings().addSettingsUpdateConsumer(MAX_WORKFLOW_STEPS, it -> maxWorkflowSteps = it);
clusterService.getClusterSettings().addSettingsUpdateConsumer(MAX_WORKFLOWS, it -> maxWorkflows = it);
clusterService.getClusterSettings().addSettingsUpdateConsumer(WORKFLOW_REQUEST_TIMEOUT, it -> requestTimeout = it);
}

/**
* Whether the flow framework feature is enabled. If disabled, no REST APIs will be availble.
* Whether the flow framework feature is enabled. If disabled, no REST APIs will be available.
* @return whether Flow Framework is enabled.
*/
public boolean isFlowFrameworkEnabled() {
return isFlowFrameworkEnabled;
}

/**
* Getter for max retry count
* @return count of max retry
* Getter for retry duration
* @return retry duration
*/
public Integer getMaxRetry() {
return maxRetry;
public TimeValue getRetryDuration() {
return retryDuration;
}

/**
Expand All @@ -112,4 +119,20 @@ public Integer getMaxRetry() {
public Integer getMaxWorkflowSteps() {
return maxWorkflowSteps;
}

/**
* Getter for max workflows
* @return max workflows
*/
public Integer getMaxWorkflows() {
return maxWorkflows;
}

/**
* Getter for request timeout
* @return request timeout
*/
public TimeValue getRequestTimeout() {
return requestTimeout;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -48,33 +48,33 @@ public abstract class AbstractSearchWorkflowAction<T extends ToXContentObject> e
/** Search action type*/
protected final ActionType<SearchResponse> actionType;
/** Settings to enable FlowFramework API*/
protected final FlowFrameworkSettings flowFrameworkFeatureEnabledSetting;
protected final FlowFrameworkSettings flowFrameworkSettings;

/**
* Instantiates a new AbstractSearchWorkflowAction
* @param urlPaths urlPaths to create routes
* @param index index the search should be done on
* @param clazz model class
* @param actionType from which action abstract class is called
* @param flowFrameworkFeatureEnabledSetting Whether this API is enabled
* @param flowFrameworkSettings Whether this API is enabled
*/
public AbstractSearchWorkflowAction(
List<String> urlPaths,
String index,
Class<T> clazz,
ActionType<SearchResponse> actionType,
FlowFrameworkSettings flowFrameworkFeatureEnabledSetting
FlowFrameworkSettings flowFrameworkSettings
) {
this.urlPaths = urlPaths;
this.index = index;
this.clazz = clazz;
this.actionType = actionType;
this.flowFrameworkFeatureEnabledSetting = flowFrameworkFeatureEnabledSetting;
this.flowFrameworkSettings = flowFrameworkSettings;
}

@Override
protected RestChannelConsumer prepareRequest(RestRequest request, NodeClient client) throws IOException {
if (!flowFrameworkFeatureEnabledSetting.isFlowFrameworkEnabled()) {
if (!flowFrameworkSettings.isFlowFrameworkEnabled()) {
FlowFrameworkException ffe = new FlowFrameworkException(
"This API is disabled. To enable it, update the setting [" + FLOW_FRAMEWORK_ENABLED.getKey() + "] to true.",
RestStatus.FORBIDDEN
Expand All @@ -87,6 +87,7 @@ protected RestChannelConsumer prepareRequest(RestRequest request, NodeClient cli
searchSourceBuilder.parseXContent(request.contentOrSourceParamParser());
searchSourceBuilder.fetchSource(getSourceContext(request, searchSourceBuilder));
searchSourceBuilder.seqNoAndPrimaryTerm(true).version(true);
searchSourceBuilder.timeout(flowFrameworkSettings.getRequestTimeout());
SearchRequest searchRequest = new SearchRequest().source(searchSourceBuilder).indices(index);

Check warning on line 91 in src/main/java/org/opensearch/flowframework/rest/AbstractSearchWorkflowAction.java

View check run for this annotation

Codecov / codecov/patch

src/main/java/org/opensearch/flowframework/rest/AbstractSearchWorkflowAction.java#L90-L91

Added lines #L90 - L91 were not covered by tests
return channel -> client.execute(actionType, searchRequest, search(channel));
}
Expand Down

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,6 @@
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.opensearch.client.node.NodeClient;
import org.opensearch.cluster.service.ClusterService;
import org.opensearch.common.settings.Settings;
import org.opensearch.core.action.ActionListener;
import org.opensearch.core.rest.RestStatus;
import org.opensearch.core.xcontent.ToXContent;
Expand All @@ -23,6 +21,7 @@
import org.opensearch.flowframework.model.Template;
import org.opensearch.flowframework.transport.CreateWorkflowAction;
import org.opensearch.flowframework.transport.WorkflowRequest;
import org.opensearch.rest.BaseRestHandler;
import org.opensearch.rest.BytesRestResponse;
import org.opensearch.rest.RestRequest;

Expand All @@ -40,26 +39,19 @@
/**
* Rest Action to facilitate requests to create and update a use case template
*/
public class RestCreateWorkflowAction extends AbstractWorkflowAction {
public class RestCreateWorkflowAction extends BaseRestHandler {

private static final Logger logger = LogManager.getLogger(RestCreateWorkflowAction.class);
private static final String CREATE_WORKFLOW_ACTION = "create_workflow_action";

private FlowFrameworkSettings flowFrameworkFeatureEnabledSetting;
private FlowFrameworkSettings flowFrameworkSettings;

/**
* Instantiates a new RestCreateWorkflowAction
* @param flowFrameworkFeatureEnabledSetting Whether this API is enabled
* @param settings Environment settings
* @param clusterService clusterService
* @param flowFrameworkSettings The settings for the flow framework plugin
*/
public RestCreateWorkflowAction(
FlowFrameworkSettings flowFrameworkFeatureEnabledSetting,
Settings settings,
ClusterService clusterService
) {
super(settings, clusterService);
this.flowFrameworkFeatureEnabledSetting = flowFrameworkFeatureEnabledSetting;
public RestCreateWorkflowAction(FlowFrameworkSettings flowFrameworkSettings) {
this.flowFrameworkSettings = flowFrameworkSettings;
}

@Override
Expand All @@ -80,7 +72,7 @@ public List<Route> routes() {
@Override
protected RestChannelConsumer prepareRequest(RestRequest request, NodeClient client) throws IOException {
String workflowId = request.param(WORKFLOW_ID);
if (!flowFrameworkFeatureEnabledSetting.isFlowFrameworkEnabled()) {
if (!flowFrameworkSettings.isFlowFrameworkEnabled()) {
FlowFrameworkException ffe = new FlowFrameworkException(
"This API is disabled. To enable it, set [" + FLOW_FRAMEWORK_ENABLED.getKey() + "] to true.",
RestStatus.FORBIDDEN
Expand All @@ -96,14 +88,7 @@ protected RestChannelConsumer prepareRequest(RestRequest request, NodeClient cli
String[] validation = request.paramAsStringArray(VALIDATION, new String[] { "all" });
boolean provision = request.paramAsBoolean(PROVISION_WORKFLOW, false);

WorkflowRequest workflowRequest = new WorkflowRequest(
workflowId,
template,
validation,
provision,
requestTimeout,
maxWorkflows
);
WorkflowRequest workflowRequest = new WorkflowRequest(workflowId, template, validation, provision);

return channel -> client.execute(CreateWorkflowAction.INSTANCE, workflowRequest, ActionListener.wrap(response -> {
XContentBuilder builder = response.toXContent(channel.newBuilder(), ToXContent.EMPTY_PARAMS);
Expand Down
Loading

0 comments on commit a66f427

Please sign in to comment.