From a8e7ff590c9c195f857396b9be5023a87551dac1 Mon Sep 17 00:00:00 2001 From: Owais Kazi Date: Fri, 5 Jan 2024 09:30:38 -0800 Subject: [PATCH] Added a common settings class to initialize plugin settings (#361) * Moved settings to a common singleton class Signed-off-by: Owais Kazi Signed-off-by: owaiskazi19 * Removed update retry settings Signed-off-by: Owais Kazi * Used getter for maxRetry value Signed-off-by: Owais Kazi Signed-off-by: owaiskazi19 * Removed FeatureEnabledSettings file Signed-off-by: Owais Kazi * Fixed tests Signed-off-by: owaiskazi19 * Moved max workflow step setting to common class Signed-off-by: Owais Kazi --------- Signed-off-by: Owais Kazi Signed-off-by: owaiskazi19 --- .../flowframework/FlowFrameworkPlugin.java | 28 +++++------ .../FlowFrameworkFeatureEnabledSetting.java | 43 ---------------- .../common/FlowFrameworkSettings.java | 49 ++++++++++++++++++- .../rest/AbstractSearchWorkflowAction.java | 6 +-- .../rest/RestCreateWorkflowAction.java | 6 +-- .../rest/RestDeleteWorkflowAction.java | 6 +-- .../rest/RestDeprovisionWorkflowAction.java | 6 +-- .../rest/RestGetWorkflowAction.java | 6 +-- .../rest/RestGetWorkflowStateAction.java | 6 +-- .../rest/RestProvisionWorkflowAction.java | 6 +-- .../rest/RestSearchWorkflowAction.java | 4 +- .../rest/RestSearchWorkflowStateAction.java | 4 +- .../AbstractRetryableWorkflowStep.java | 15 ++---- .../workflow/DeployModelStep.java | 13 ++--- .../workflow/RegisterLocalModelStep.java | 13 ++--- .../workflow/WorkflowProcessSorter.java | 9 ++-- .../workflow/WorkflowStepFactory.java | 12 ++--- ...s.java => FlowFrameworkSettingsTests.java} | 18 ++++--- .../model/WorkflowValidatorTests.java | 9 +++- .../rest/RestCreateWorkflowActionTests.java | 6 +-- .../rest/RestDeleteWorkflowActionTests.java | 6 +-- .../RestDeprovisionWorkflowActionTests.java | 6 +-- .../rest/RestGetWorkflowActionTests.java | 6 +-- .../rest/RestGetWorkflowStateActionTests.java | 6 +-- .../RestProvisionWorkflowActionTests.java | 6 +-- .../rest/RestSearchWorkflowActionTests.java | 6 +-- .../RestSearchWorkflowStateActionTests.java | 6 +-- .../workflow/DeployModelStepTests.java | 11 +++-- .../workflow/RegisterLocalModelStepTests.java | 11 +++-- .../workflow/WorkflowProcessSorterTests.java | 11 +++-- 30 files changed, 179 insertions(+), 161 deletions(-) delete mode 100644 src/main/java/org/opensearch/flowframework/common/FlowFrameworkFeatureEnabledSetting.java rename src/test/java/org/opensearch/flowframework/common/{FlowFrameworkFeatureEnabledSettingTests.java => FlowFrameworkSettingsTests.java} (67%) diff --git a/src/main/java/org/opensearch/flowframework/FlowFrameworkPlugin.java b/src/main/java/org/opensearch/flowframework/FlowFrameworkPlugin.java index f7c9d2e00..5cab4e17c 100644 --- a/src/main/java/org/opensearch/flowframework/FlowFrameworkPlugin.java +++ b/src/main/java/org/opensearch/flowframework/FlowFrameworkPlugin.java @@ -24,7 +24,7 @@ import org.opensearch.core.xcontent.NamedXContentRegistry; import org.opensearch.env.Environment; import org.opensearch.env.NodeEnvironment; -import org.opensearch.flowframework.common.FlowFrameworkFeatureEnabledSetting; +import org.opensearch.flowframework.common.FlowFrameworkSettings; import org.opensearch.flowframework.indices.FlowFrameworkIndicesHandler; import org.opensearch.flowframework.rest.RestCreateWorkflowAction; import org.opensearch.flowframework.rest.RestDeleteWorkflowAction; @@ -82,7 +82,7 @@ */ public class FlowFrameworkPlugin extends Plugin implements ActionPlugin { - private FlowFrameworkFeatureEnabledSetting flowFrameworkFeatureEnabledSetting; + private FlowFrameworkSettings flowFrameworkSettings; private ClusterService clusterService; @@ -107,24 +107,24 @@ public Collection createComponents( ) { Settings settings = environment.settings(); this.clusterService = clusterService; - flowFrameworkFeatureEnabledSetting = new FlowFrameworkFeatureEnabledSetting(clusterService, settings); + flowFrameworkSettings = new FlowFrameworkSettings(clusterService, settings); MachineLearningNodeClient mlClient = new MachineLearningNodeClient(client); EncryptorUtils encryptorUtils = new EncryptorUtils(clusterService, client); FlowFrameworkIndicesHandler flowFrameworkIndicesHandler = new FlowFrameworkIndicesHandler(client, clusterService, encryptorUtils); WorkflowStepFactory workflowStepFactory = new WorkflowStepFactory( - settings, threadPool, clusterService, client, mlClient, - flowFrameworkIndicesHandler + flowFrameworkIndicesHandler, + flowFrameworkSettings ); WorkflowProcessSorter workflowProcessSorter = new WorkflowProcessSorter( workflowStepFactory, threadPool, clusterService, client, - settings + flowFrameworkSettings ); return List.of(workflowStepFactory, workflowProcessSorter, encryptorUtils, flowFrameworkIndicesHandler); @@ -141,14 +141,14 @@ public List getRestHandlers( Supplier nodesInCluster ) { return List.of( - new RestCreateWorkflowAction(flowFrameworkFeatureEnabledSetting, settings, clusterService), - new RestDeleteWorkflowAction(flowFrameworkFeatureEnabledSetting), - new RestProvisionWorkflowAction(flowFrameworkFeatureEnabledSetting), - new RestDeprovisionWorkflowAction(flowFrameworkFeatureEnabledSetting), - new RestSearchWorkflowAction(flowFrameworkFeatureEnabledSetting), - new RestGetWorkflowStateAction(flowFrameworkFeatureEnabledSetting), - new RestGetWorkflowAction(flowFrameworkFeatureEnabledSetting), - new RestSearchWorkflowStateAction(flowFrameworkFeatureEnabledSetting) + new RestCreateWorkflowAction(flowFrameworkSettings, settings, clusterService), + new RestDeleteWorkflowAction(flowFrameworkSettings), + new RestProvisionWorkflowAction(flowFrameworkSettings), + new RestDeprovisionWorkflowAction(flowFrameworkSettings), + new RestSearchWorkflowAction(flowFrameworkSettings), + new RestGetWorkflowStateAction(flowFrameworkSettings), + new RestGetWorkflowAction(flowFrameworkSettings), + new RestSearchWorkflowStateAction(flowFrameworkSettings) ); } diff --git a/src/main/java/org/opensearch/flowframework/common/FlowFrameworkFeatureEnabledSetting.java b/src/main/java/org/opensearch/flowframework/common/FlowFrameworkFeatureEnabledSetting.java deleted file mode 100644 index 87f5412a8..000000000 --- a/src/main/java/org/opensearch/flowframework/common/FlowFrameworkFeatureEnabledSetting.java +++ /dev/null @@ -1,43 +0,0 @@ -/* - * Copyright OpenSearch Contributors - * SPDX-License-Identifier: Apache-2.0 - * - * The OpenSearch Contributors require contributions made to - * this file be licensed under the Apache-2.0 license or a - * compatible open source license. - */ -package org.opensearch.flowframework.common; - -import org.opensearch.cluster.service.ClusterService; -import org.opensearch.common.settings.Settings; - -import static org.opensearch.flowframework.common.FlowFrameworkSettings.FLOW_FRAMEWORK_ENABLED; - -/** - * Controls enabling or disabling features of this plugin - */ -public class FlowFrameworkFeatureEnabledSetting { - - private volatile Boolean isFlowFrameworkEnabled; - - /** - * Instantiate this class. - * - * @param clusterService OpenSearch cluster service - * @param settings OpenSearch settings - */ - public FlowFrameworkFeatureEnabledSetting(ClusterService clusterService, Settings settings) { - // Currently this is just an on/off switch for the entire plugin's API. - // If desired more fine-tuned feature settings can be added below. - isFlowFrameworkEnabled = FLOW_FRAMEWORK_ENABLED.get(settings); - clusterService.getClusterSettings().addSettingsUpdateConsumer(FLOW_FRAMEWORK_ENABLED, it -> isFlowFrameworkEnabled = it); - } - - /** - * Whether the flow framework feature is enabled. If disabled, no REST APIs will be availble. - * @return whether Flow Framework is enabled. - */ - public boolean isFlowFrameworkEnabled() { - return isFlowFrameworkEnabled; - } -} diff --git a/src/main/java/org/opensearch/flowframework/common/FlowFrameworkSettings.java b/src/main/java/org/opensearch/flowframework/common/FlowFrameworkSettings.java index 536fa2c73..d5bec1871 100644 --- a/src/main/java/org/opensearch/flowframework/common/FlowFrameworkSettings.java +++ b/src/main/java/org/opensearch/flowframework/common/FlowFrameworkSettings.java @@ -8,13 +8,19 @@ */ package org.opensearch.flowframework.common; +import org.opensearch.cluster.service.ClusterService; import org.opensearch.common.settings.Setting; +import org.opensearch.common.settings.Settings; import org.opensearch.common.unit.TimeValue; /** The common settings of flow framework */ public class FlowFrameworkSettings { - private FlowFrameworkSettings() {} + private volatile Boolean isFlowFrameworkEnabled; + /** The maximum number of transport request retries */ + private volatile Integer maxRetry; + /** Max workflow steps that can be created*/ + private volatile Integer maxWorkflowSteps; /** The upper limit of max workflows that can be created */ public static final int MAX_WORKFLOWS_LIMIT = 10000; @@ -65,4 +71,45 @@ private FlowFrameworkSettings() {} Setting.Property.NodeScope, Setting.Property.Dynamic ); + + /** + * Instantiate this class. + * + * @param clusterService OpenSearch cluster service + * @param settings OpenSearch settings + */ + public FlowFrameworkSettings(ClusterService clusterService, Settings settings) { + // Currently this is just an on/off switch for the entire plugin's API. + // If desired more fine-tuned feature settings can be added below. + this.isFlowFrameworkEnabled = FLOW_FRAMEWORK_ENABLED.get(settings); + this.maxRetry = MAX_GET_TASK_REQUEST_RETRY.get(settings); + this.maxWorkflowSteps = MAX_WORKFLOW_STEPS.get(settings); + clusterService.getClusterSettings().addSettingsUpdateConsumer(FLOW_FRAMEWORK_ENABLED, it -> isFlowFrameworkEnabled = it); + clusterService.getClusterSettings().addSettingsUpdateConsumer(MAX_GET_TASK_REQUEST_RETRY, it -> maxRetry = it); + clusterService.getClusterSettings().addSettingsUpdateConsumer(MAX_WORKFLOW_STEPS, it -> maxWorkflowSteps = it); + } + + /** + * Whether the flow framework feature is enabled. If disabled, no REST APIs will be availble. + * @return whether Flow Framework is enabled. + */ + public boolean isFlowFrameworkEnabled() { + return isFlowFrameworkEnabled; + } + + /** + * Getter for max retry count + * @return count of max retry + */ + public Integer getMaxRetry() { + return maxRetry; + } + + /** + * Getter for max workflow steps + * @return count of steps + */ + public Integer getMaxWorkflowSteps() { + return maxWorkflowSteps; + } } diff --git a/src/main/java/org/opensearch/flowframework/rest/AbstractSearchWorkflowAction.java b/src/main/java/org/opensearch/flowframework/rest/AbstractSearchWorkflowAction.java index 65ab14de0..e5ad65c43 100644 --- a/src/main/java/org/opensearch/flowframework/rest/AbstractSearchWorkflowAction.java +++ b/src/main/java/org/opensearch/flowframework/rest/AbstractSearchWorkflowAction.java @@ -15,7 +15,7 @@ import org.opensearch.core.rest.RestStatus; import org.opensearch.core.xcontent.ToXContent; import org.opensearch.core.xcontent.ToXContentObject; -import org.opensearch.flowframework.common.FlowFrameworkFeatureEnabledSetting; +import org.opensearch.flowframework.common.FlowFrameworkSettings; import org.opensearch.flowframework.exception.FlowFrameworkException; import org.opensearch.rest.BaseRestHandler; import org.opensearch.rest.BytesRestResponse; @@ -48,7 +48,7 @@ public abstract class AbstractSearchWorkflowAction e /** Search action type*/ protected final ActionType actionType; /** Settings to enable FlowFramework API*/ - protected final FlowFrameworkFeatureEnabledSetting flowFrameworkFeatureEnabledSetting; + protected final FlowFrameworkSettings flowFrameworkFeatureEnabledSetting; /** * Instantiates a new AbstractSearchWorkflowAction @@ -63,7 +63,7 @@ public AbstractSearchWorkflowAction( String index, Class clazz, ActionType actionType, - FlowFrameworkFeatureEnabledSetting flowFrameworkFeatureEnabledSetting + FlowFrameworkSettings flowFrameworkFeatureEnabledSetting ) { this.urlPaths = urlPaths; this.index = index; diff --git a/src/main/java/org/opensearch/flowframework/rest/RestCreateWorkflowAction.java b/src/main/java/org/opensearch/flowframework/rest/RestCreateWorkflowAction.java index 46de0cfb8..6a0558e89 100644 --- a/src/main/java/org/opensearch/flowframework/rest/RestCreateWorkflowAction.java +++ b/src/main/java/org/opensearch/flowframework/rest/RestCreateWorkflowAction.java @@ -18,7 +18,7 @@ import org.opensearch.core.xcontent.ToXContent; import org.opensearch.core.xcontent.XContentBuilder; import org.opensearch.core.xcontent.XContentParser; -import org.opensearch.flowframework.common.FlowFrameworkFeatureEnabledSetting; +import org.opensearch.flowframework.common.FlowFrameworkSettings; import org.opensearch.flowframework.exception.FlowFrameworkException; import org.opensearch.flowframework.model.Template; import org.opensearch.flowframework.transport.CreateWorkflowAction; @@ -45,7 +45,7 @@ public class RestCreateWorkflowAction extends AbstractWorkflowAction { private static final Logger logger = LogManager.getLogger(RestCreateWorkflowAction.class); private static final String CREATE_WORKFLOW_ACTION = "create_workflow_action"; - private FlowFrameworkFeatureEnabledSetting flowFrameworkFeatureEnabledSetting; + private FlowFrameworkSettings flowFrameworkFeatureEnabledSetting; /** * Instantiates a new RestCreateWorkflowAction @@ -54,7 +54,7 @@ public class RestCreateWorkflowAction extends AbstractWorkflowAction { * @param clusterService clusterService */ public RestCreateWorkflowAction( - FlowFrameworkFeatureEnabledSetting flowFrameworkFeatureEnabledSetting, + FlowFrameworkSettings flowFrameworkFeatureEnabledSetting, Settings settings, ClusterService clusterService ) { diff --git a/src/main/java/org/opensearch/flowframework/rest/RestDeleteWorkflowAction.java b/src/main/java/org/opensearch/flowframework/rest/RestDeleteWorkflowAction.java index 249c317d4..d4fcb824b 100644 --- a/src/main/java/org/opensearch/flowframework/rest/RestDeleteWorkflowAction.java +++ b/src/main/java/org/opensearch/flowframework/rest/RestDeleteWorkflowAction.java @@ -16,7 +16,7 @@ import org.opensearch.core.rest.RestStatus; import org.opensearch.core.xcontent.ToXContent; import org.opensearch.core.xcontent.XContentBuilder; -import org.opensearch.flowframework.common.FlowFrameworkFeatureEnabledSetting; +import org.opensearch.flowframework.common.FlowFrameworkSettings; import org.opensearch.flowframework.exception.FlowFrameworkException; import org.opensearch.flowframework.transport.DeleteWorkflowAction; import org.opensearch.flowframework.transport.WorkflowRequest; @@ -39,13 +39,13 @@ public class RestDeleteWorkflowAction extends BaseRestHandler { private static final String DELETE_WORKFLOW_ACTION = "delete_workflow"; private static final Logger logger = LogManager.getLogger(RestDeleteWorkflowAction.class); - private FlowFrameworkFeatureEnabledSetting flowFrameworkFeatureEnabledSetting; + private FlowFrameworkSettings flowFrameworkFeatureEnabledSetting; /** * Instantiates a new RestDeleteWorkflowAction * @param flowFrameworkFeatureEnabledSetting Whether this API is enabled */ - public RestDeleteWorkflowAction(FlowFrameworkFeatureEnabledSetting flowFrameworkFeatureEnabledSetting) { + public RestDeleteWorkflowAction(FlowFrameworkSettings flowFrameworkFeatureEnabledSetting) { this.flowFrameworkFeatureEnabledSetting = flowFrameworkFeatureEnabledSetting; } diff --git a/src/main/java/org/opensearch/flowframework/rest/RestDeprovisionWorkflowAction.java b/src/main/java/org/opensearch/flowframework/rest/RestDeprovisionWorkflowAction.java index 171e85335..b3dc5e713 100644 --- a/src/main/java/org/opensearch/flowframework/rest/RestDeprovisionWorkflowAction.java +++ b/src/main/java/org/opensearch/flowframework/rest/RestDeprovisionWorkflowAction.java @@ -16,7 +16,7 @@ import org.opensearch.core.rest.RestStatus; import org.opensearch.core.xcontent.ToXContent; import org.opensearch.core.xcontent.XContentBuilder; -import org.opensearch.flowframework.common.FlowFrameworkFeatureEnabledSetting; +import org.opensearch.flowframework.common.FlowFrameworkSettings; import org.opensearch.flowframework.exception.FlowFrameworkException; import org.opensearch.flowframework.transport.DeprovisionWorkflowAction; import org.opensearch.flowframework.transport.WorkflowRequest; @@ -39,13 +39,13 @@ public class RestDeprovisionWorkflowAction extends BaseRestHandler { private static final String DEPROVISION_WORKFLOW_ACTION = "deprovision_workflow"; private static final Logger logger = LogManager.getLogger(RestDeprovisionWorkflowAction.class); - private final FlowFrameworkFeatureEnabledSetting flowFrameworkFeatureEnabledSetting; + private final FlowFrameworkSettings flowFrameworkFeatureEnabledSetting; /** * Instantiates a new RestDeprovisionWorkflowAction * @param flowFrameworkFeatureEnabledSetting Whether this API is enabled */ - public RestDeprovisionWorkflowAction(FlowFrameworkFeatureEnabledSetting flowFrameworkFeatureEnabledSetting) { + public RestDeprovisionWorkflowAction(FlowFrameworkSettings flowFrameworkFeatureEnabledSetting) { this.flowFrameworkFeatureEnabledSetting = flowFrameworkFeatureEnabledSetting; } diff --git a/src/main/java/org/opensearch/flowframework/rest/RestGetWorkflowAction.java b/src/main/java/org/opensearch/flowframework/rest/RestGetWorkflowAction.java index f24febbe2..d1097fb68 100644 --- a/src/main/java/org/opensearch/flowframework/rest/RestGetWorkflowAction.java +++ b/src/main/java/org/opensearch/flowframework/rest/RestGetWorkflowAction.java @@ -16,7 +16,7 @@ import org.opensearch.core.rest.RestStatus; import org.opensearch.core.xcontent.ToXContent; import org.opensearch.core.xcontent.XContentBuilder; -import org.opensearch.flowframework.common.FlowFrameworkFeatureEnabledSetting; +import org.opensearch.flowframework.common.FlowFrameworkSettings; import org.opensearch.flowframework.exception.FlowFrameworkException; import org.opensearch.flowframework.transport.GetWorkflowAction; import org.opensearch.flowframework.transport.WorkflowRequest; @@ -39,13 +39,13 @@ public class RestGetWorkflowAction extends BaseRestHandler { private static final String GET_WORKFLOW_ACTION = "get_workflow"; private static final Logger logger = LogManager.getLogger(RestGetWorkflowAction.class); - private FlowFrameworkFeatureEnabledSetting flowFrameworkFeatureEnabledSetting; + private FlowFrameworkSettings flowFrameworkFeatureEnabledSetting; /** * Instantiates a new RestGetWorkflowAction * @param flowFrameworkFeatureEnabledSetting Whether this API is enabled */ - public RestGetWorkflowAction(FlowFrameworkFeatureEnabledSetting flowFrameworkFeatureEnabledSetting) { + public RestGetWorkflowAction(FlowFrameworkSettings flowFrameworkFeatureEnabledSetting) { this.flowFrameworkFeatureEnabledSetting = flowFrameworkFeatureEnabledSetting; } diff --git a/src/main/java/org/opensearch/flowframework/rest/RestGetWorkflowStateAction.java b/src/main/java/org/opensearch/flowframework/rest/RestGetWorkflowStateAction.java index 356d25aab..9db927f34 100644 --- a/src/main/java/org/opensearch/flowframework/rest/RestGetWorkflowStateAction.java +++ b/src/main/java/org/opensearch/flowframework/rest/RestGetWorkflowStateAction.java @@ -16,7 +16,7 @@ import org.opensearch.core.rest.RestStatus; import org.opensearch.core.xcontent.ToXContent; import org.opensearch.core.xcontent.XContentBuilder; -import org.opensearch.flowframework.common.FlowFrameworkFeatureEnabledSetting; +import org.opensearch.flowframework.common.FlowFrameworkSettings; import org.opensearch.flowframework.exception.FlowFrameworkException; import org.opensearch.flowframework.transport.GetWorkflowStateAction; import org.opensearch.flowframework.transport.GetWorkflowStateRequest; @@ -39,13 +39,13 @@ public class RestGetWorkflowStateAction extends BaseRestHandler { private static final String GET_WORKFLOW_STATE_ACTION = "get_workflow_state"; private static final Logger logger = LogManager.getLogger(RestGetWorkflowStateAction.class); - private FlowFrameworkFeatureEnabledSetting flowFrameworkFeatureEnabledSetting; + private FlowFrameworkSettings flowFrameworkFeatureEnabledSetting; /** * Instantiates a new RestGetWorkflowStateAction * @param flowFrameworkFeatureEnabledSetting Whether this API is enabled */ - public RestGetWorkflowStateAction(FlowFrameworkFeatureEnabledSetting flowFrameworkFeatureEnabledSetting) { + public RestGetWorkflowStateAction(FlowFrameworkSettings flowFrameworkFeatureEnabledSetting) { this.flowFrameworkFeatureEnabledSetting = flowFrameworkFeatureEnabledSetting; } diff --git a/src/main/java/org/opensearch/flowframework/rest/RestProvisionWorkflowAction.java b/src/main/java/org/opensearch/flowframework/rest/RestProvisionWorkflowAction.java index a07ceaf54..aad82766d 100644 --- a/src/main/java/org/opensearch/flowframework/rest/RestProvisionWorkflowAction.java +++ b/src/main/java/org/opensearch/flowframework/rest/RestProvisionWorkflowAction.java @@ -15,7 +15,7 @@ import org.opensearch.core.rest.RestStatus; import org.opensearch.core.xcontent.ToXContent; import org.opensearch.core.xcontent.XContentBuilder; -import org.opensearch.flowframework.common.FlowFrameworkFeatureEnabledSetting; +import org.opensearch.flowframework.common.FlowFrameworkSettings; import org.opensearch.flowframework.exception.FlowFrameworkException; import org.opensearch.flowframework.transport.ProvisionWorkflowAction; import org.opensearch.flowframework.transport.WorkflowRequest; @@ -40,14 +40,14 @@ public class RestProvisionWorkflowAction extends BaseRestHandler { private static final String PROVISION_WORKFLOW_ACTION = "provision_workflow_action"; - private FlowFrameworkFeatureEnabledSetting flowFrameworkFeatureEnabledSetting; + private FlowFrameworkSettings flowFrameworkFeatureEnabledSetting; /** * Instantiates a new RestProvisionWorkflowAction * * @param flowFrameworkFeatureEnabledSetting Whether this API is enabled */ - public RestProvisionWorkflowAction(FlowFrameworkFeatureEnabledSetting flowFrameworkFeatureEnabledSetting) { + public RestProvisionWorkflowAction(FlowFrameworkSettings flowFrameworkFeatureEnabledSetting) { this.flowFrameworkFeatureEnabledSetting = flowFrameworkFeatureEnabledSetting; } diff --git a/src/main/java/org/opensearch/flowframework/rest/RestSearchWorkflowAction.java b/src/main/java/org/opensearch/flowframework/rest/RestSearchWorkflowAction.java index ed4799814..e513d40fd 100644 --- a/src/main/java/org/opensearch/flowframework/rest/RestSearchWorkflowAction.java +++ b/src/main/java/org/opensearch/flowframework/rest/RestSearchWorkflowAction.java @@ -8,7 +8,7 @@ */ package org.opensearch.flowframework.rest; -import org.opensearch.flowframework.common.FlowFrameworkFeatureEnabledSetting; +import org.opensearch.flowframework.common.FlowFrameworkSettings; import org.opensearch.flowframework.model.Template; import org.opensearch.flowframework.transport.SearchWorkflowAction; @@ -30,7 +30,7 @@ public class RestSearchWorkflowAction extends AbstractSearchWorkflowAction maxRetry = it); + this.maxRetry = flowFrameworkSettings.getMaxRetry(); this.mlClient = mlClient; this.flowFrameworkIndicesHandler = flowFrameworkIndicesHandler; } diff --git a/src/main/java/org/opensearch/flowframework/workflow/DeployModelStep.java b/src/main/java/org/opensearch/flowframework/workflow/DeployModelStep.java index 21f1eaca5..c7c38c30f 100644 --- a/src/main/java/org/opensearch/flowframework/workflow/DeployModelStep.java +++ b/src/main/java/org/opensearch/flowframework/workflow/DeployModelStep.java @@ -11,9 +11,8 @@ import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; import org.opensearch.ExceptionsHelper; -import org.opensearch.cluster.service.ClusterService; -import org.opensearch.common.settings.Settings; import org.opensearch.core.action.ActionListener; +import org.opensearch.flowframework.common.FlowFrameworkSettings; import org.opensearch.flowframework.exception.FlowFrameworkException; import org.opensearch.flowframework.indices.FlowFrameworkIndicesHandler; import org.opensearch.flowframework.util.ParseUtils; @@ -43,20 +42,18 @@ public class DeployModelStep extends AbstractRetryableWorkflowStep { /** * Instantiate this class - * @param settings The OpenSearch settings * @param threadPool The OpenSearch thread pool - * @param clusterService The cluster service * @param mlClient client to instantiate MLClient * @param flowFrameworkIndicesHandler FlowFrameworkIndicesHandler class to update system indices + * @param flowFrameworkSettings settings of flow framework */ public DeployModelStep( - Settings settings, ThreadPool threadPool, - ClusterService clusterService, MachineLearningNodeClient mlClient, - FlowFrameworkIndicesHandler flowFrameworkIndicesHandler + FlowFrameworkIndicesHandler flowFrameworkIndicesHandler, + FlowFrameworkSettings flowFrameworkSettings ) { - super(settings, threadPool, clusterService, mlClient, flowFrameworkIndicesHandler); + super(threadPool, mlClient, flowFrameworkIndicesHandler, flowFrameworkSettings); this.mlClient = mlClient; this.flowFrameworkIndicesHandler = flowFrameworkIndicesHandler; } diff --git a/src/main/java/org/opensearch/flowframework/workflow/RegisterLocalModelStep.java b/src/main/java/org/opensearch/flowframework/workflow/RegisterLocalModelStep.java index 103a6d643..4213d99cd 100644 --- a/src/main/java/org/opensearch/flowframework/workflow/RegisterLocalModelStep.java +++ b/src/main/java/org/opensearch/flowframework/workflow/RegisterLocalModelStep.java @@ -11,9 +11,8 @@ import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; import org.opensearch.ExceptionsHelper; -import org.opensearch.cluster.service.ClusterService; -import org.opensearch.common.settings.Settings; import org.opensearch.core.action.ActionListener; +import org.opensearch.flowframework.common.FlowFrameworkSettings; import org.opensearch.flowframework.exception.FlowFrameworkException; import org.opensearch.flowframework.indices.FlowFrameworkIndicesHandler; import org.opensearch.flowframework.util.ParseUtils; @@ -63,20 +62,18 @@ public class RegisterLocalModelStep extends AbstractRetryableWorkflowStep { /** * Instantiate this class - * @param settings The OpenSearch settings * @param threadPool The OpenSearch thread pool - * @param clusterService The cluster service * @param mlClient client to instantiate MLClient * @param flowFrameworkIndicesHandler FlowFrameworkIndicesHandler class to update system indices + * @param flowFrameworkSettings settings of flow framework */ public RegisterLocalModelStep( - Settings settings, ThreadPool threadPool, - ClusterService clusterService, MachineLearningNodeClient mlClient, - FlowFrameworkIndicesHandler flowFrameworkIndicesHandler + FlowFrameworkIndicesHandler flowFrameworkIndicesHandler, + FlowFrameworkSettings flowFrameworkSettings ) { - super(settings, threadPool, clusterService, mlClient, flowFrameworkIndicesHandler); + super(threadPool, mlClient, flowFrameworkIndicesHandler, flowFrameworkSettings); this.mlClient = mlClient; this.flowFrameworkIndicesHandler = flowFrameworkIndicesHandler; } diff --git a/src/main/java/org/opensearch/flowframework/workflow/WorkflowProcessSorter.java b/src/main/java/org/opensearch/flowframework/workflow/WorkflowProcessSorter.java index 46a895beb..43a645b60 100644 --- a/src/main/java/org/opensearch/flowframework/workflow/WorkflowProcessSorter.java +++ b/src/main/java/org/opensearch/flowframework/workflow/WorkflowProcessSorter.java @@ -15,10 +15,10 @@ import org.opensearch.action.admin.cluster.state.ClusterStateRequest; import org.opensearch.client.Client; import org.opensearch.cluster.service.ClusterService; -import org.opensearch.common.settings.Settings; import org.opensearch.common.unit.TimeValue; import org.opensearch.core.action.ActionListener; import org.opensearch.core.rest.RestStatus; +import org.opensearch.flowframework.common.FlowFrameworkSettings; import org.opensearch.flowframework.exception.FlowFrameworkException; import org.opensearch.flowframework.model.Workflow; import org.opensearch.flowframework.model.WorkflowEdge; @@ -67,19 +67,18 @@ public class WorkflowProcessSorter { * @param threadPool The OpenSearch Thread pool to pass to process nodes. * @param clusterService The OpenSearch cluster service. * @param client The OpenSearch Client - * @param settings OpenSerch settings + * @param flowFrameworkSettings settings of the plugin */ public WorkflowProcessSorter( WorkflowStepFactory workflowStepFactory, ThreadPool threadPool, ClusterService clusterService, Client client, - Settings settings + FlowFrameworkSettings flowFrameworkSettings ) { this.workflowStepFactory = workflowStepFactory; this.threadPool = threadPool; - this.maxWorkflowSteps = MAX_WORKFLOW_STEPS.get(settings); - clusterService.getClusterSettings().addSettingsUpdateConsumer(MAX_WORKFLOW_STEPS, it -> maxWorkflowSteps = it); + this.maxWorkflowSteps = flowFrameworkSettings.getMaxWorkflowSteps(); this.clusterService = clusterService; this.client = client; } diff --git a/src/main/java/org/opensearch/flowframework/workflow/WorkflowStepFactory.java b/src/main/java/org/opensearch/flowframework/workflow/WorkflowStepFactory.java index 14a9d5dbd..839b8e9b8 100644 --- a/src/main/java/org/opensearch/flowframework/workflow/WorkflowStepFactory.java +++ b/src/main/java/org/opensearch/flowframework/workflow/WorkflowStepFactory.java @@ -10,8 +10,8 @@ import org.opensearch.client.Client; import org.opensearch.cluster.service.ClusterService; -import org.opensearch.common.settings.Settings; import org.opensearch.core.rest.RestStatus; +import org.opensearch.flowframework.common.FlowFrameworkSettings; import org.opensearch.flowframework.exception.FlowFrameworkException; import org.opensearch.flowframework.indices.FlowFrameworkIndicesHandler; import org.opensearch.ml.client.MachineLearningNodeClient; @@ -31,33 +31,33 @@ public class WorkflowStepFactory { /** * Instantiate this class. * - * @param settings The OpenSearch settings * @param threadPool The OpenSearch thread pool * @param clusterService The OpenSearch cluster service * @param client The OpenSearch client steps can use * @param mlClient Machine Learning client to perform ml operations * @param flowFrameworkIndicesHandler FlowFrameworkIndicesHandler class to update system indices + * @param flowFrameworkSettings common settings of the plugin */ public WorkflowStepFactory( - Settings settings, ThreadPool threadPool, ClusterService clusterService, Client client, MachineLearningNodeClient mlClient, - FlowFrameworkIndicesHandler flowFrameworkIndicesHandler + FlowFrameworkIndicesHandler flowFrameworkIndicesHandler, + FlowFrameworkSettings flowFrameworkSettings ) { stepMap.put(NoOpStep.NAME, NoOpStep::new); stepMap.put(CreateIndexStep.NAME, () -> new CreateIndexStep(clusterService, client, flowFrameworkIndicesHandler)); stepMap.put(CreateIngestPipelineStep.NAME, () -> new CreateIngestPipelineStep(client, flowFrameworkIndicesHandler)); stepMap.put( RegisterLocalModelStep.NAME, - () -> new RegisterLocalModelStep(settings, threadPool, clusterService, mlClient, flowFrameworkIndicesHandler) + () -> new RegisterLocalModelStep(threadPool, mlClient, flowFrameworkIndicesHandler, flowFrameworkSettings) ); stepMap.put(RegisterRemoteModelStep.NAME, () -> new RegisterRemoteModelStep(mlClient, flowFrameworkIndicesHandler)); stepMap.put(DeleteModelStep.NAME, () -> new DeleteModelStep(mlClient)); stepMap.put( DeployModelStep.NAME, - () -> new DeployModelStep(settings, threadPool, clusterService, mlClient, flowFrameworkIndicesHandler) + () -> new DeployModelStep(threadPool, mlClient, flowFrameworkIndicesHandler, flowFrameworkSettings) ); stepMap.put(UndeployModelStep.NAME, () -> new UndeployModelStep(mlClient)); stepMap.put(CreateConnectorStep.NAME, () -> new CreateConnectorStep(mlClient, flowFrameworkIndicesHandler)); diff --git a/src/test/java/org/opensearch/flowframework/common/FlowFrameworkFeatureEnabledSettingTests.java b/src/test/java/org/opensearch/flowframework/common/FlowFrameworkSettingsTests.java similarity index 67% rename from src/test/java/org/opensearch/flowframework/common/FlowFrameworkFeatureEnabledSettingTests.java rename to src/test/java/org/opensearch/flowframework/common/FlowFrameworkSettingsTests.java index 232dd71f2..04754f274 100644 --- a/src/test/java/org/opensearch/flowframework/common/FlowFrameworkFeatureEnabledSettingTests.java +++ b/src/test/java/org/opensearch/flowframework/common/FlowFrameworkSettingsTests.java @@ -15,6 +15,7 @@ import org.opensearch.test.OpenSearchTestCase; import java.io.IOException; +import java.util.Optional; import java.util.Set; import java.util.stream.Collectors; import java.util.stream.Stream; @@ -22,13 +23,12 @@ import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; -public class FlowFrameworkFeatureEnabledSettingTests extends OpenSearchTestCase { - +public class FlowFrameworkSettingsTests extends OpenSearchTestCase { private Settings settings; private ClusterSettings clusterSettings; private ClusterService clusterService; - private FlowFrameworkFeatureEnabledSetting flowFrameworkFeatureEnabledSetting; + private FlowFrameworkSettings flowFrameworkSettings; @Override public void setUp() throws Exception { @@ -37,12 +37,16 @@ public void setUp() throws Exception { settings = Settings.builder().build(); final Set> settingsSet = Stream.concat( ClusterSettings.BUILT_IN_CLUSTER_SETTINGS.stream(), - Stream.of(FlowFrameworkSettings.FLOW_FRAMEWORK_ENABLED) + Stream.of( + FlowFrameworkSettings.FLOW_FRAMEWORK_ENABLED, + FlowFrameworkSettings.MAX_GET_TASK_REQUEST_RETRY, + FlowFrameworkSettings.MAX_WORKFLOW_STEPS + ) ).collect(Collectors.toSet()); clusterSettings = new ClusterSettings(settings, settingsSet); clusterService = mock(ClusterService.class); when(clusterService.getClusterSettings()).thenReturn(clusterSettings); - flowFrameworkFeatureEnabledSetting = new FlowFrameworkFeatureEnabledSetting(clusterService, settings); + flowFrameworkSettings = new FlowFrameworkSettings(clusterService, settings); } @Override @@ -51,6 +55,8 @@ public void tearDown() throws Exception { } public void testSettings() throws IOException { - assertFalse(flowFrameworkFeatureEnabledSetting.isFlowFrameworkEnabled()); + assertFalse(flowFrameworkSettings.isFlowFrameworkEnabled()); + assertEquals(Optional.of(5), Optional.ofNullable(flowFrameworkSettings.getMaxRetry())); + assertEquals(Optional.of(50), Optional.ofNullable(flowFrameworkSettings.getMaxWorkflowSteps())); } } diff --git a/src/test/java/org/opensearch/flowframework/model/WorkflowValidatorTests.java b/src/test/java/org/opensearch/flowframework/model/WorkflowValidatorTests.java index 73ddf0349..de5ae440b 100644 --- a/src/test/java/org/opensearch/flowframework/model/WorkflowValidatorTests.java +++ b/src/test/java/org/opensearch/flowframework/model/WorkflowValidatorTests.java @@ -16,6 +16,7 @@ import org.opensearch.common.settings.Setting; import org.opensearch.common.settings.Settings; import org.opensearch.core.xcontent.XContentParser; +import org.opensearch.flowframework.common.FlowFrameworkSettings; import org.opensearch.flowframework.indices.FlowFrameworkIndicesHandler; import org.opensearch.flowframework.workflow.WorkflowStepFactory; import org.opensearch.ml.client.MachineLearningNodeClient; @@ -40,6 +41,7 @@ public class WorkflowValidatorTests extends OpenSearchTestCase { private String validWorkflowStepJson; private String invalidWorkflowStepJson; + private FlowFrameworkSettings flowFrameworkSettings; @Override public void setUp() throws Exception { @@ -48,6 +50,9 @@ public void setUp() throws Exception { "{\"workflow_step_1\":{\"inputs\":[\"input_1\",\"input_2\"],\"outputs\":[\"output_1\"]},\"workflow_step_2\":{\"inputs\":[\"input_1\",\"input_2\",\"input_3\"],\"outputs\":[\"output_1\",\"output_2\",\"output_3\"]}}"; invalidWorkflowStepJson = "{\"workflow_step_1\":{\"bad_field\":[\"input_1\",\"input_2\"],\"outputs\":[\"output_1\"]},\"workflow_step_2\":{\"inputs\":[\"input_1\",\"input_2\",\"input_3\"],\"outputs\":[\"output_1\",\"output_2\",\"output_3\"]}}"; + + flowFrameworkSettings = mock(FlowFrameworkSettings.class); + when(flowFrameworkSettings.isFlowFrameworkEnabled()).thenReturn(true); } public void testParseWorkflowValidator() throws IOException { @@ -90,12 +95,12 @@ public void testWorkflowStepFactoryHasValidators() throws IOException { when(clusterService.getClusterSettings()).thenReturn(clusterSettings); WorkflowStepFactory workflowStepFactory = new WorkflowStepFactory( - Settings.EMPTY, threadPool, clusterService, client, mlClient, - flowFrameworkIndicesHandler + flowFrameworkIndicesHandler, + flowFrameworkSettings ); // Read in workflow-steps.json diff --git a/src/test/java/org/opensearch/flowframework/rest/RestCreateWorkflowActionTests.java b/src/test/java/org/opensearch/flowframework/rest/RestCreateWorkflowActionTests.java index 92f1bcafc..818c38c00 100644 --- a/src/test/java/org/opensearch/flowframework/rest/RestCreateWorkflowActionTests.java +++ b/src/test/java/org/opensearch/flowframework/rest/RestCreateWorkflowActionTests.java @@ -18,7 +18,7 @@ import org.opensearch.core.rest.RestStatus; import org.opensearch.core.xcontent.MediaTypeRegistry; import org.opensearch.flowframework.TestHelpers; -import org.opensearch.flowframework.common.FlowFrameworkFeatureEnabledSetting; +import org.opensearch.flowframework.common.FlowFrameworkSettings; import org.opensearch.flowframework.model.Template; import org.opensearch.flowframework.model.Workflow; import org.opensearch.flowframework.model.WorkflowEdge; @@ -48,14 +48,14 @@ public class RestCreateWorkflowActionTests extends OpenSearchTestCase { private String createWorkflowPath; private String updateWorkflowPath; private NodeClient nodeClient; - private FlowFrameworkFeatureEnabledSetting flowFrameworkFeatureEnabledSetting; + private FlowFrameworkSettings flowFrameworkFeatureEnabledSetting; private Settings settings; private ClusterService clusterService; @Override public void setUp() throws Exception { super.setUp(); - flowFrameworkFeatureEnabledSetting = mock(FlowFrameworkFeatureEnabledSetting.class); + flowFrameworkFeatureEnabledSetting = mock(FlowFrameworkSettings.class); settings = Settings.builder() .put(WORKFLOW_REQUEST_TIMEOUT.getKey(), TimeValue.timeValueMillis(10)) .put(MAX_WORKFLOWS.getKey(), 2) diff --git a/src/test/java/org/opensearch/flowframework/rest/RestDeleteWorkflowActionTests.java b/src/test/java/org/opensearch/flowframework/rest/RestDeleteWorkflowActionTests.java index 93fc27623..e52a2fb2a 100644 --- a/src/test/java/org/opensearch/flowframework/rest/RestDeleteWorkflowActionTests.java +++ b/src/test/java/org/opensearch/flowframework/rest/RestDeleteWorkflowActionTests.java @@ -12,7 +12,7 @@ import org.opensearch.core.common.bytes.BytesArray; import org.opensearch.core.rest.RestStatus; import org.opensearch.core.xcontent.MediaTypeRegistry; -import org.opensearch.flowframework.common.FlowFrameworkFeatureEnabledSetting; +import org.opensearch.flowframework.common.FlowFrameworkSettings; import org.opensearch.rest.RestHandler; import org.opensearch.rest.RestRequest; import org.opensearch.test.OpenSearchTestCase; @@ -29,7 +29,7 @@ public class RestDeleteWorkflowActionTests extends OpenSearchTestCase { private RestDeleteWorkflowAction restDeleteWorkflowAction; private String getPath; - private FlowFrameworkFeatureEnabledSetting flowFrameworkFeatureEnabledSetting; + private FlowFrameworkSettings flowFrameworkFeatureEnabledSetting; private NodeClient nodeClient; @Override @@ -37,7 +37,7 @@ public void setUp() throws Exception { super.setUp(); this.getPath = String.format(Locale.ROOT, "%s/{%s}", WORKFLOW_URI, "workflow_id"); - flowFrameworkFeatureEnabledSetting = mock(FlowFrameworkFeatureEnabledSetting.class); + flowFrameworkFeatureEnabledSetting = mock(FlowFrameworkSettings.class); when(flowFrameworkFeatureEnabledSetting.isFlowFrameworkEnabled()).thenReturn(true); this.restDeleteWorkflowAction = new RestDeleteWorkflowAction(flowFrameworkFeatureEnabledSetting); this.nodeClient = mock(NodeClient.class); diff --git a/src/test/java/org/opensearch/flowframework/rest/RestDeprovisionWorkflowActionTests.java b/src/test/java/org/opensearch/flowframework/rest/RestDeprovisionWorkflowActionTests.java index a9170e35d..cdf25a474 100644 --- a/src/test/java/org/opensearch/flowframework/rest/RestDeprovisionWorkflowActionTests.java +++ b/src/test/java/org/opensearch/flowframework/rest/RestDeprovisionWorkflowActionTests.java @@ -12,7 +12,7 @@ import org.opensearch.core.common.bytes.BytesArray; import org.opensearch.core.rest.RestStatus; import org.opensearch.core.xcontent.MediaTypeRegistry; -import org.opensearch.flowframework.common.FlowFrameworkFeatureEnabledSetting; +import org.opensearch.flowframework.common.FlowFrameworkSettings; import org.opensearch.rest.RestHandler.Route; import org.opensearch.rest.RestRequest; import org.opensearch.test.OpenSearchTestCase; @@ -31,12 +31,12 @@ public class RestDeprovisionWorkflowActionTests extends OpenSearchTestCase { private RestDeprovisionWorkflowAction deprovisionWorkflowRestAction; private String deprovisionWorkflowPath; private NodeClient nodeClient; - private FlowFrameworkFeatureEnabledSetting flowFrameworkFeatureEnabledSetting; + private FlowFrameworkSettings flowFrameworkFeatureEnabledSetting; @Override public void setUp() throws Exception { super.setUp(); - flowFrameworkFeatureEnabledSetting = mock(FlowFrameworkFeatureEnabledSetting.class); + flowFrameworkFeatureEnabledSetting = mock(FlowFrameworkSettings.class); when(flowFrameworkFeatureEnabledSetting.isFlowFrameworkEnabled()).thenReturn(true); this.deprovisionWorkflowRestAction = new RestDeprovisionWorkflowAction(flowFrameworkFeatureEnabledSetting); diff --git a/src/test/java/org/opensearch/flowframework/rest/RestGetWorkflowActionTests.java b/src/test/java/org/opensearch/flowframework/rest/RestGetWorkflowActionTests.java index 31cfc701f..a34eab679 100644 --- a/src/test/java/org/opensearch/flowframework/rest/RestGetWorkflowActionTests.java +++ b/src/test/java/org/opensearch/flowframework/rest/RestGetWorkflowActionTests.java @@ -12,7 +12,7 @@ import org.opensearch.core.common.bytes.BytesArray; import org.opensearch.core.rest.RestStatus; import org.opensearch.core.xcontent.MediaTypeRegistry; -import org.opensearch.flowframework.common.FlowFrameworkFeatureEnabledSetting; +import org.opensearch.flowframework.common.FlowFrameworkSettings; import org.opensearch.rest.RestHandler; import org.opensearch.rest.RestRequest; import org.opensearch.test.OpenSearchTestCase; @@ -29,7 +29,7 @@ public class RestGetWorkflowActionTests extends OpenSearchTestCase { private RestGetWorkflowAction restGetWorkflowAction; private String getPath; - private FlowFrameworkFeatureEnabledSetting flowFrameworkFeatureEnabledSetting; + private FlowFrameworkSettings flowFrameworkFeatureEnabledSetting; private NodeClient nodeClient; @Override @@ -37,7 +37,7 @@ public void setUp() throws Exception { super.setUp(); this.getPath = String.format(Locale.ROOT, "%s/{%s}", WORKFLOW_URI, "workflow_id"); - flowFrameworkFeatureEnabledSetting = mock(FlowFrameworkFeatureEnabledSetting.class); + flowFrameworkFeatureEnabledSetting = mock(FlowFrameworkSettings.class); when(flowFrameworkFeatureEnabledSetting.isFlowFrameworkEnabled()).thenReturn(true); this.restGetWorkflowAction = new RestGetWorkflowAction(flowFrameworkFeatureEnabledSetting); this.nodeClient = mock(NodeClient.class); diff --git a/src/test/java/org/opensearch/flowframework/rest/RestGetWorkflowStateActionTests.java b/src/test/java/org/opensearch/flowframework/rest/RestGetWorkflowStateActionTests.java index 06c4a7053..649309440 100644 --- a/src/test/java/org/opensearch/flowframework/rest/RestGetWorkflowStateActionTests.java +++ b/src/test/java/org/opensearch/flowframework/rest/RestGetWorkflowStateActionTests.java @@ -12,7 +12,7 @@ import org.opensearch.core.common.bytes.BytesArray; import org.opensearch.core.rest.RestStatus; import org.opensearch.core.xcontent.MediaTypeRegistry; -import org.opensearch.flowframework.common.FlowFrameworkFeatureEnabledSetting; +import org.opensearch.flowframework.common.FlowFrameworkSettings; import org.opensearch.rest.RestHandler; import org.opensearch.rest.RestRequest; import org.opensearch.test.OpenSearchTestCase; @@ -30,14 +30,14 @@ public class RestGetWorkflowStateActionTests extends OpenSearchTestCase { private RestGetWorkflowStateAction restGetWorkflowStateAction; private String getPath; private NodeClient nodeClient; - private FlowFrameworkFeatureEnabledSetting flowFrameworkFeatureEnabledSetting; + private FlowFrameworkSettings flowFrameworkFeatureEnabledSetting; @Override public void setUp() throws Exception { super.setUp(); this.getPath = String.format(Locale.ROOT, "%s/{%s}/%s", WORKFLOW_URI, "workflow_id", "_status"); - flowFrameworkFeatureEnabledSetting = mock(FlowFrameworkFeatureEnabledSetting.class); + flowFrameworkFeatureEnabledSetting = mock(FlowFrameworkSettings.class); when(flowFrameworkFeatureEnabledSetting.isFlowFrameworkEnabled()).thenReturn(true); this.restGetWorkflowStateAction = new RestGetWorkflowStateAction(flowFrameworkFeatureEnabledSetting); this.nodeClient = mock(NodeClient.class); diff --git a/src/test/java/org/opensearch/flowframework/rest/RestProvisionWorkflowActionTests.java b/src/test/java/org/opensearch/flowframework/rest/RestProvisionWorkflowActionTests.java index d7bebb9a9..6ddd83d11 100644 --- a/src/test/java/org/opensearch/flowframework/rest/RestProvisionWorkflowActionTests.java +++ b/src/test/java/org/opensearch/flowframework/rest/RestProvisionWorkflowActionTests.java @@ -12,7 +12,7 @@ import org.opensearch.core.common.bytes.BytesArray; import org.opensearch.core.rest.RestStatus; import org.opensearch.core.xcontent.MediaTypeRegistry; -import org.opensearch.flowframework.common.FlowFrameworkFeatureEnabledSetting; +import org.opensearch.flowframework.common.FlowFrameworkSettings; import org.opensearch.rest.RestHandler.Route; import org.opensearch.rest.RestRequest; import org.opensearch.test.OpenSearchTestCase; @@ -31,12 +31,12 @@ public class RestProvisionWorkflowActionTests extends OpenSearchTestCase { private RestProvisionWorkflowAction provisionWorkflowRestAction; private String provisionWorkflowPath; private NodeClient nodeClient; - private FlowFrameworkFeatureEnabledSetting flowFrameworkFeatureEnabledSetting; + private FlowFrameworkSettings flowFrameworkFeatureEnabledSetting; @Override public void setUp() throws Exception { super.setUp(); - flowFrameworkFeatureEnabledSetting = mock(FlowFrameworkFeatureEnabledSetting.class); + flowFrameworkFeatureEnabledSetting = mock(FlowFrameworkSettings.class); when(flowFrameworkFeatureEnabledSetting.isFlowFrameworkEnabled()).thenReturn(true); this.provisionWorkflowRestAction = new RestProvisionWorkflowAction(flowFrameworkFeatureEnabledSetting); diff --git a/src/test/java/org/opensearch/flowframework/rest/RestSearchWorkflowActionTests.java b/src/test/java/org/opensearch/flowframework/rest/RestSearchWorkflowActionTests.java index a92950534..f3579bbf6 100644 --- a/src/test/java/org/opensearch/flowframework/rest/RestSearchWorkflowActionTests.java +++ b/src/test/java/org/opensearch/flowframework/rest/RestSearchWorkflowActionTests.java @@ -13,7 +13,7 @@ import org.opensearch.core.rest.RestStatus; import org.opensearch.core.xcontent.MediaTypeRegistry; import org.opensearch.core.xcontent.XContentParseException; -import org.opensearch.flowframework.common.FlowFrameworkFeatureEnabledSetting; +import org.opensearch.flowframework.common.FlowFrameworkSettings; import org.opensearch.rest.RestHandler.Route; import org.opensearch.rest.RestRequest; import org.opensearch.test.OpenSearchTestCase; @@ -31,14 +31,14 @@ public class RestSearchWorkflowActionTests extends OpenSearchTestCase { private RestSearchWorkflowAction restSearchWorkflowAction; private String searchPath; private NodeClient nodeClient; - private FlowFrameworkFeatureEnabledSetting flowFrameworkFeatureEnabledSetting; + private FlowFrameworkSettings flowFrameworkFeatureEnabledSetting; @Override public void setUp() throws Exception { super.setUp(); this.searchPath = String.format(Locale.ROOT, "%s/%s", WORKFLOW_URI, "_search"); - flowFrameworkFeatureEnabledSetting = mock(FlowFrameworkFeatureEnabledSetting.class); + flowFrameworkFeatureEnabledSetting = mock(FlowFrameworkSettings.class); when(flowFrameworkFeatureEnabledSetting.isFlowFrameworkEnabled()).thenReturn(true); this.restSearchWorkflowAction = new RestSearchWorkflowAction(flowFrameworkFeatureEnabledSetting); this.nodeClient = mock(NodeClient.class); diff --git a/src/test/java/org/opensearch/flowframework/rest/RestSearchWorkflowStateActionTests.java b/src/test/java/org/opensearch/flowframework/rest/RestSearchWorkflowStateActionTests.java index 028860831..7db5fabe8 100644 --- a/src/test/java/org/opensearch/flowframework/rest/RestSearchWorkflowStateActionTests.java +++ b/src/test/java/org/opensearch/flowframework/rest/RestSearchWorkflowStateActionTests.java @@ -13,7 +13,7 @@ import org.opensearch.core.rest.RestStatus; import org.opensearch.core.xcontent.MediaTypeRegistry; import org.opensearch.core.xcontent.XContentParseException; -import org.opensearch.flowframework.common.FlowFrameworkFeatureEnabledSetting; +import org.opensearch.flowframework.common.FlowFrameworkSettings; import org.opensearch.rest.RestHandler.Route; import org.opensearch.rest.RestRequest; import org.opensearch.test.OpenSearchTestCase; @@ -31,14 +31,14 @@ public class RestSearchWorkflowStateActionTests extends OpenSearchTestCase { private RestSearchWorkflowStateAction restSearchWorkflowStateAction; private String searchPath; private NodeClient nodeClient; - private FlowFrameworkFeatureEnabledSetting flowFrameworkFeatureEnabledSetting; + private FlowFrameworkSettings flowFrameworkFeatureEnabledSetting; @Override public void setUp() throws Exception { super.setUp(); this.searchPath = String.format(Locale.ROOT, "%s/%s", WORKFLOW_URI, "state/_search"); - flowFrameworkFeatureEnabledSetting = mock(FlowFrameworkFeatureEnabledSetting.class); + flowFrameworkFeatureEnabledSetting = mock(FlowFrameworkSettings.class); when(flowFrameworkFeatureEnabledSetting.isFlowFrameworkEnabled()).thenReturn(true); this.restSearchWorkflowStateAction = new RestSearchWorkflowStateAction(flowFrameworkFeatureEnabledSetting); this.nodeClient = mock(NodeClient.class); diff --git a/src/test/java/org/opensearch/flowframework/workflow/DeployModelStepTests.java b/src/test/java/org/opensearch/flowframework/workflow/DeployModelStepTests.java index 10547d972..bc174a849 100644 --- a/src/test/java/org/opensearch/flowframework/workflow/DeployModelStepTests.java +++ b/src/test/java/org/opensearch/flowframework/workflow/DeployModelStepTests.java @@ -19,6 +19,7 @@ import org.opensearch.core.action.ActionListener; import org.opensearch.core.index.shard.ShardId; import org.opensearch.core.rest.RestStatus; +import org.opensearch.flowframework.common.FlowFrameworkSettings; import org.opensearch.flowframework.exception.FlowFrameworkException; import org.opensearch.flowframework.indices.FlowFrameworkIndicesHandler; import org.opensearch.ml.client.MachineLearningNodeClient; @@ -73,6 +74,7 @@ public class DeployModelStepTests extends OpenSearchTestCase { private DeployModelStep deployModel; private FlowFrameworkIndicesHandler flowFrameworkIndicesHandler; + private FlowFrameworkSettings flowFrameworkSettings; @Override public void setUp() throws Exception { @@ -91,6 +93,10 @@ public void setUp() throws Exception { ClusterSettings clusterSettings = new ClusterSettings(testMaxRetrySetting, settingsSet); when(clusterService.getClusterSettings()).thenReturn(clusterSettings); + flowFrameworkSettings = mock(FlowFrameworkSettings.class); + when(flowFrameworkSettings.isFlowFrameworkEnabled()).thenReturn(true); + when(flowFrameworkSettings.getMaxRetry()).thenReturn(5); + testThreadPool = new TestThreadPool( DeployModelStepTests.class.getName(), new FixedExecutorBuilder( @@ -102,11 +108,10 @@ public void setUp() throws Exception { ) ); this.deployModel = new DeployModelStep( - testMaxRetrySetting, testThreadPool, - clusterService, machineLearningNodeClient, - flowFrameworkIndicesHandler + flowFrameworkIndicesHandler, + flowFrameworkSettings ); this.inputData = new WorkflowData(Map.ofEntries(Map.entry("model_id", "modelId")), "test-id", "test-node-id"); } diff --git a/src/test/java/org/opensearch/flowframework/workflow/RegisterLocalModelStepTests.java b/src/test/java/org/opensearch/flowframework/workflow/RegisterLocalModelStepTests.java index 518b904e3..4c80d8f3a 100644 --- a/src/test/java/org/opensearch/flowframework/workflow/RegisterLocalModelStepTests.java +++ b/src/test/java/org/opensearch/flowframework/workflow/RegisterLocalModelStepTests.java @@ -18,6 +18,7 @@ import org.opensearch.common.util.concurrent.OpenSearchExecutors; import org.opensearch.core.action.ActionListener; import org.opensearch.core.index.shard.ShardId; +import org.opensearch.flowframework.common.FlowFrameworkSettings; import org.opensearch.flowframework.exception.FlowFrameworkException; import org.opensearch.flowframework.indices.FlowFrameworkIndicesHandler; import org.opensearch.ml.client.MachineLearningNodeClient; @@ -66,6 +67,7 @@ public class RegisterLocalModelStepTests extends OpenSearchTestCase { private RegisterLocalModelStep registerLocalModelStep; private WorkflowData workflowData; private FlowFrameworkIndicesHandler flowFrameworkIndicesHandler; + private FlowFrameworkSettings flowFrameworkSettings; @Mock MachineLearningNodeClient machineLearningNodeClient; @@ -86,6 +88,10 @@ public void setUp() throws Exception { ClusterSettings clusterSettings = new ClusterSettings(testMaxRetrySetting, settingsSet); when(clusterService.getClusterSettings()).thenReturn(clusterSettings); + flowFrameworkSettings = mock(FlowFrameworkSettings.class); + when(flowFrameworkSettings.isFlowFrameworkEnabled()).thenReturn(true); + when(flowFrameworkSettings.getMaxRetry()).thenReturn(5); + testThreadPool = new TestThreadPool( RegisterLocalModelStepTests.class.getName(), new FixedExecutorBuilder( @@ -97,11 +103,10 @@ public void setUp() throws Exception { ) ); this.registerLocalModelStep = new RegisterLocalModelStep( - testMaxRetrySetting, testThreadPool, - clusterService, machineLearningNodeClient, - flowFrameworkIndicesHandler + flowFrameworkIndicesHandler, + flowFrameworkSettings ); this.workflowData = new WorkflowData( diff --git a/src/test/java/org/opensearch/flowframework/workflow/WorkflowProcessSorterTests.java b/src/test/java/org/opensearch/flowframework/workflow/WorkflowProcessSorterTests.java index bae7afd54..3dc1eab81 100644 --- a/src/test/java/org/opensearch/flowframework/workflow/WorkflowProcessSorterTests.java +++ b/src/test/java/org/opensearch/flowframework/workflow/WorkflowProcessSorterTests.java @@ -100,6 +100,7 @@ private static List parse(String json) throws IOException { private static Client client = mock(Client.class); private static ClusterService clusterService = mock(ClusterService.class); private static WorkflowValidator validator; + private static FlowFrameworkSettings flowFrameworkSettings; @BeforeClass public static void setup() throws IOException { @@ -115,6 +116,10 @@ public static void setup() throws IOException { ClusterSettings clusterSettings = new ClusterSettings(settings, settingsSet); when(clusterService.getClusterSettings()).thenReturn(clusterSettings); + flowFrameworkSettings = mock(FlowFrameworkSettings.class); + when(flowFrameworkSettings.isFlowFrameworkEnabled()).thenReturn(true); + when(flowFrameworkSettings.getMaxWorkflowSteps()).thenReturn(5); + when(client.admin()).thenReturn(adminClient); testThreadPool = new TestThreadPool( @@ -128,14 +133,14 @@ public static void setup() throws IOException { ) ); WorkflowStepFactory factory = new WorkflowStepFactory( - Settings.EMPTY, testThreadPool, clusterService, client, mlClient, - flowFrameworkIndicesHandler + flowFrameworkIndicesHandler, + flowFrameworkSettings ); - workflowProcessSorter = new WorkflowProcessSorter(factory, testThreadPool, clusterService, client, settings); + workflowProcessSorter = new WorkflowProcessSorter(factory, testThreadPool, clusterService, client, flowFrameworkSettings); validator = WorkflowValidator.parse("mappings/workflow-steps.json"); }