Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Implemented throttling for max workflows to be created #151

Merged
merged 7 commits into from
Nov 13, 2023
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -56,13 +56,17 @@

import static org.opensearch.flowframework.common.CommonValue.FLOW_FRAMEWORK_THREAD_POOL_PREFIX;
import static org.opensearch.flowframework.common.CommonValue.PROVISION_THREAD_POOL;
import static org.opensearch.flowframework.common.FlowFrameworkSettings.FLOW_FRAMEWORK_ENABLED;
import static org.opensearch.flowframework.common.FlowFrameworkSettings.MAX_WORKFLOWS;
import static org.opensearch.flowframework.common.FlowFrameworkSettings.WORKFLOW_REQUEST_TIMEOUT;

/**
* An OpenSearch plugin that enables builders to innovate AI apps on OpenSearch.
*/
public class FlowFrameworkPlugin extends Plugin implements ActionPlugin {

private FlowFrameworkFeatureEnabledSetting flowFrameworkFeatureEnabledSetting;
private ClusterService clusterService;

/**
* Instantiate this plugin.
Expand All @@ -84,6 +88,7 @@ public Collection<Object> createComponents(
Supplier<RepositoriesService> repositoriesServiceSupplier
) {
Settings settings = environment.settings();
this.clusterService = clusterService;
flowFrameworkFeatureEnabledSetting = new FlowFrameworkFeatureEnabledSetting(clusterService, settings);

MachineLearningNodeClient mlClient = new MachineLearningNodeClient(client);
Expand All @@ -106,7 +111,7 @@ public List<RestHandler> getRestHandlers(
Supplier<DiscoveryNodes> nodesInCluster
) {
return ImmutableList.of(
new RestCreateWorkflowAction(flowFrameworkFeatureEnabledSetting),
new RestCreateWorkflowAction(flowFrameworkFeatureEnabledSetting, settings, clusterService),
new RestProvisionWorkflowAction(flowFrameworkFeatureEnabledSetting),
new RestSearchWorkflowAction(flowFrameworkFeatureEnabledSetting)
);
Expand All @@ -123,7 +128,7 @@ public List<RestHandler> getRestHandlers(

@Override
public List<Setting<?>> getSettings() {
List<Setting<?>> settings = ImmutableList.of(FlowFrameworkFeatureEnabledSetting.FLOW_FRAMEWORK_ENABLED);
List<Setting<?>> settings = ImmutableList.of(FLOW_FRAMEWORK_ENABLED, MAX_WORKFLOWS, WORKFLOW_REQUEST_TIMEOUT);
return settings;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,22 +9,15 @@
package org.opensearch.flowframework.common;

import org.opensearch.cluster.service.ClusterService;
import org.opensearch.common.settings.Setting;
import org.opensearch.common.settings.Settings;

import static org.opensearch.flowframework.common.FlowFrameworkSettings.FLOW_FRAMEWORK_ENABLED;

/**
* Controls enabling or disabling features of this plugin
*/
public class FlowFrameworkFeatureEnabledSetting {

/** This setting enables/disables the Flow Framework REST API */
public static final Setting<Boolean> FLOW_FRAMEWORK_ENABLED = Setting.boolSetting(
"plugins.flow_framework.enabled",
false,
Setting.Property.NodeScope,
Setting.Property.Dynamic
);

private volatile Boolean isFlowFrameworkEnabled;

/**
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/
package org.opensearch.flowframework.common;

import org.opensearch.common.settings.Setting;
import org.opensearch.common.unit.TimeValue;

/** The common settings of flow framework */
public class FlowFrameworkSettings {

private FlowFrameworkSettings() {}

/** The upper limit of max workflows that can be created */
public static final int MAX_WORKFLOWS_LIMIT = 10000;

/** This setting sets max workflows that can be created */
public static final Setting<Integer> MAX_WORKFLOWS = Setting.intSetting(
"plugins.flow_framework.max_workflows",
1000,
0,
MAX_WORKFLOWS_LIMIT,
Setting.Property.NodeScope,
Setting.Property.Dynamic
);

/** This setting sets the timeout for the request */
public static final Setting<TimeValue> WORKFLOW_REQUEST_TIMEOUT = Setting.positiveTimeSetting(
"plugins.flow_framework.request_timeout",
TimeValue.timeValueSeconds(10),
amitgalitz marked this conversation as resolved.
Show resolved Hide resolved
Setting.Property.NodeScope,
Setting.Property.Dynamic
);

/** This setting enables/disables the Flow Framework REST API */
public static final Setting<Boolean> FLOW_FRAMEWORK_ENABLED = Setting.boolSetting(
"plugins.flow_framework.enabled",
false,
Setting.Property.NodeScope,
Setting.Property.Dynamic
);
}
Original file line number Diff line number Diff line change
Expand Up @@ -30,18 +30,23 @@
import java.util.List;

import static org.opensearch.core.xcontent.ToXContent.EMPTY_PARAMS;
import static org.opensearch.flowframework.common.FlowFrameworkFeatureEnabledSetting.FLOW_FRAMEWORK_ENABLED;
import static org.opensearch.flowframework.common.FlowFrameworkSettings.FLOW_FRAMEWORK_ENABLED;
import static org.opensearch.flowframework.util.RestHandlerUtils.getSourceContext;

/**
* Abstract class to handle search request.
*/
public abstract class AbstractSearchWorkflowAction<T extends ToXContentObject> extends BaseRestHandler {

/** Url Paths of the routes*/
protected final List<String> urlPaths;
/** Index on search operation needs to be performed*/
protected final String index;
/** Search class name*/
protected final Class<T> clazz;
/** Search action type*/
protected final ActionType<SearchResponse> actionType;
/** Settings to enable FlowFramework API*/
protected final FlowFrameworkFeatureEnabledSetting flowFrameworkFeatureEnabledSetting;

/**
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/
package org.opensearch.flowframework.rest;

import org.opensearch.cluster.service.ClusterService;
import org.opensearch.common.settings.Settings;
import org.opensearch.common.unit.TimeValue;
import org.opensearch.rest.BaseRestHandler;

import static org.opensearch.flowframework.common.FlowFrameworkSettings.MAX_WORKFLOWS;
import static org.opensearch.flowframework.common.FlowFrameworkSettings.WORKFLOW_REQUEST_TIMEOUT;

/**
* Abstract action for the rest actions
*/
public abstract class AbstractWorkflowAction extends BaseRestHandler {
/** Timeout for the request*/
protected volatile TimeValue requestTimeout;
/** Max workflows that can be created*/
protected volatile Integer maxWorkflows;

/**
* Instantiates a new AbstractWorkflowAction
*
* @param settings Environment settings
* @param clusterService clusterService
*/
public AbstractWorkflowAction(Settings settings, ClusterService clusterService) {
this.requestTimeout = WORKFLOW_REQUEST_TIMEOUT.get(settings);
this.maxWorkflows = MAX_WORKFLOWS.get(settings);

clusterService.getClusterSettings().addSettingsUpdateConsumer(WORKFLOW_REQUEST_TIMEOUT, it -> requestTimeout = it);
clusterService.getClusterSettings().addSettingsUpdateConsumer(MAX_WORKFLOWS, it -> maxWorkflows = it);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,8 @@
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.opensearch.client.node.NodeClient;
import org.opensearch.cluster.service.ClusterService;
import org.opensearch.common.settings.Settings;
import org.opensearch.core.action.ActionListener;
import org.opensearch.core.rest.RestStatus;
import org.opensearch.core.xcontent.ToXContent;
Expand All @@ -21,7 +23,6 @@
import org.opensearch.flowframework.model.Template;
import org.opensearch.flowframework.transport.CreateWorkflowAction;
import org.opensearch.flowframework.transport.WorkflowRequest;
import org.opensearch.rest.BaseRestHandler;
import org.opensearch.rest.BytesRestResponse;
import org.opensearch.rest.RestRequest;

Expand All @@ -32,12 +33,12 @@
import static org.opensearch.flowframework.common.CommonValue.DRY_RUN;
import static org.opensearch.flowframework.common.CommonValue.WORKFLOW_ID;
import static org.opensearch.flowframework.common.CommonValue.WORKFLOW_URI;
import static org.opensearch.flowframework.common.FlowFrameworkFeatureEnabledSetting.FLOW_FRAMEWORK_ENABLED;
import static org.opensearch.flowframework.common.FlowFrameworkSettings.FLOW_FRAMEWORK_ENABLED;

/**
* Rest Action to facilitate requests to create and update a use case template
*/
public class RestCreateWorkflowAction extends BaseRestHandler {
public class RestCreateWorkflowAction extends AbstractWorkflowAction {

private static final Logger logger = LogManager.getLogger(RestCreateWorkflowAction.class);
private static final String CREATE_WORKFLOW_ACTION = "create_workflow_action";
Expand All @@ -48,8 +49,15 @@
* Instantiates a new RestCreateWorkflowAction
*
* @param flowFrameworkFeatureEnabledSetting Whether this API is enabled
* @param settings Environment settings
* @param clusterService clusterService
*/
public RestCreateWorkflowAction(FlowFrameworkFeatureEnabledSetting flowFrameworkFeatureEnabledSetting) {
public RestCreateWorkflowAction(
FlowFrameworkFeatureEnabledSetting flowFrameworkFeatureEnabledSetting,
Settings settings,
ClusterService clusterService
) {
super(settings, clusterService);
this.flowFrameworkFeatureEnabledSetting = flowFrameworkFeatureEnabledSetting;
}

Expand Down Expand Up @@ -85,7 +93,7 @@
Template template = Template.parse(request.content().utf8ToString());
boolean dryRun = request.paramAsBoolean(DRY_RUN, false);

WorkflowRequest workflowRequest = new WorkflowRequest(workflowId, template, dryRun);
WorkflowRequest workflowRequest = new WorkflowRequest(workflowId, template, dryRun, requestTimeout, maxWorkflows);

Check warning on line 96 in src/main/java/org/opensearch/flowframework/rest/RestCreateWorkflowAction.java

View check run for this annotation

Codecov / codecov/patch

src/main/java/org/opensearch/flowframework/rest/RestCreateWorkflowAction.java#L96

Added line #L96 was not covered by tests

return channel -> client.execute(CreateWorkflowAction.INSTANCE, workflowRequest, ActionListener.wrap(response -> {
XContentBuilder builder = response.toXContent(channel.newBuilder(), ToXContent.EMPTY_PARAMS);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@

import static org.opensearch.flowframework.common.CommonValue.WORKFLOW_ID;
import static org.opensearch.flowframework.common.CommonValue.WORKFLOW_URI;
import static org.opensearch.flowframework.common.FlowFrameworkFeatureEnabledSetting.FLOW_FRAMEWORK_ENABLED;
import static org.opensearch.flowframework.common.FlowFrameworkSettings.FLOW_FRAMEWORK_ENABLED;

/**
* Rest action to facilitate requests to provision a workflow from an inline defined or stored use case template
Expand Down Expand Up @@ -84,7 +84,7 @@
throw new FlowFrameworkException("workflow_id cannot be null", RestStatus.BAD_REQUEST);
}
// Create request and provision
WorkflowRequest workflowRequest = new WorkflowRequest(workflowId, null);
WorkflowRequest workflowRequest = new WorkflowRequest(workflowId, null, null, null);

Check warning on line 87 in src/main/java/org/opensearch/flowframework/rest/RestProvisionWorkflowAction.java

View check run for this annotation

Codecov / codecov/patch

src/main/java/org/opensearch/flowframework/rest/RestProvisionWorkflowAction.java#L87

Added line #L87 was not covered by tests
owaiskazi19 marked this conversation as resolved.
Show resolved Hide resolved
return channel -> client.execute(ProvisionWorkflowAction.INSTANCE, workflowRequest, ActionListener.wrap(response -> {
XContentBuilder builder = response.toXContent(channel.newBuilder(), ToXContent.EMPTY_PARAMS);
channel.sendResponse(new BytesRestResponse(RestStatus.OK, builder));
Expand Down
Loading