Skip to content

Commit

Permalink
Move workflow timeout setting update consumer to settings class
Browse files Browse the repository at this point in the history
Signed-off-by: Daniel Widdis <[email protected]>
  • Loading branch information
dbwiddis committed Jan 8, 2024
1 parent 535d92c commit 1c5ded7
Show file tree
Hide file tree
Showing 9 changed files with 153 additions and 282 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -84,8 +84,6 @@ public class FlowFrameworkPlugin extends Plugin implements ActionPlugin {

private FlowFrameworkSettings flowFrameworkSettings;

private ClusterService clusterService;

/**
* Instantiate this plugin.
*/
Expand All @@ -106,7 +104,6 @@ public Collection<Object> createComponents(
Supplier<RepositoriesService> repositoriesServiceSupplier
) {
Settings settings = environment.settings();
this.clusterService = clusterService;
flowFrameworkSettings = new FlowFrameworkSettings(clusterService, settings);
MachineLearningNodeClient mlClient = new MachineLearningNodeClient(client);
EncryptorUtils encryptorUtils = new EncryptorUtils(clusterService, client);
Expand Down Expand Up @@ -141,7 +138,7 @@ public List<RestHandler> getRestHandlers(
Supplier<DiscoveryNodes> nodesInCluster
) {
return List.of(
new RestCreateWorkflowAction(flowFrameworkSettings, settings, clusterService),
new RestCreateWorkflowAction(flowFrameworkSettings),
new RestDeleteWorkflowAction(flowFrameworkSettings),
new RestProvisionWorkflowAction(flowFrameworkSettings),
new RestDeprovisionWorkflowAction(flowFrameworkSettings),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,8 @@ public class FlowFrameworkSettings {
private volatile Integer maxWorkflowSteps;
/** Max workflows that can be created*/
protected volatile Integer maxWorkflows;
/** Timeout for internal requests*/
protected volatile TimeValue requestTimeout;

/** The upper limit of max workflows that can be created */
public static final int MAX_WORKFLOWS_LIMIT = 10000;
Expand Down Expand Up @@ -86,10 +88,12 @@ public FlowFrameworkSettings(ClusterService clusterService, Settings settings) {
this.retryDuration = TASK_REQUEST_RETRY_DURATION.get(settings);
this.maxWorkflowSteps = MAX_WORKFLOW_STEPS.get(settings);
this.maxWorkflows = MAX_WORKFLOWS.get(settings);
this.requestTimeout = WORKFLOW_REQUEST_TIMEOUT.get(settings);
clusterService.getClusterSettings().addSettingsUpdateConsumer(FLOW_FRAMEWORK_ENABLED, it -> isFlowFrameworkEnabled = it);
clusterService.getClusterSettings().addSettingsUpdateConsumer(TASK_REQUEST_RETRY_DURATION, it -> retryDuration = it);
clusterService.getClusterSettings().addSettingsUpdateConsumer(MAX_WORKFLOW_STEPS, it -> maxWorkflowSteps = it);
clusterService.getClusterSettings().addSettingsUpdateConsumer(MAX_WORKFLOWS, it -> maxWorkflows = it);
clusterService.getClusterSettings().addSettingsUpdateConsumer(WORKFLOW_REQUEST_TIMEOUT, it -> requestTimeout = it);
}

/**
Expand Down Expand Up @@ -123,4 +127,12 @@ public Integer getMaxWorkflowSteps() {
public Integer getMaxWorkflows() {
return maxWorkflows;
}

/**
* Getter for request timeout
* @return request timeout
*/
public TimeValue getRequestTimeout() {
return requestTimeout;
}
}

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,6 @@
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.opensearch.client.node.NodeClient;
import org.opensearch.cluster.service.ClusterService;
import org.opensearch.common.settings.Settings;
import org.opensearch.core.action.ActionListener;
import org.opensearch.core.rest.RestStatus;
import org.opensearch.core.xcontent.ToXContent;
Expand All @@ -23,6 +21,7 @@
import org.opensearch.flowframework.model.Template;
import org.opensearch.flowframework.transport.CreateWorkflowAction;
import org.opensearch.flowframework.transport.WorkflowRequest;
import org.opensearch.rest.BaseRestHandler;
import org.opensearch.rest.BytesRestResponse;
import org.opensearch.rest.RestRequest;

Expand All @@ -40,7 +39,7 @@
/**
* Rest Action to facilitate requests to create and update a use case template
*/
public class RestCreateWorkflowAction extends AbstractWorkflowAction {
public class RestCreateWorkflowAction extends BaseRestHandler {

private static final Logger logger = LogManager.getLogger(RestCreateWorkflowAction.class);
private static final String CREATE_WORKFLOW_ACTION = "create_workflow_action";
Expand All @@ -50,11 +49,8 @@ public class RestCreateWorkflowAction extends AbstractWorkflowAction {
/**
* Instantiates a new RestCreateWorkflowAction
* @param flowFrameworkSettings The settings for the flow framework plugin
* @param settings Environment settings
* @param clusterService clusterService
*/
public RestCreateWorkflowAction(FlowFrameworkSettings flowFrameworkSettings, Settings settings, ClusterService clusterService) {
super(settings, clusterService);
public RestCreateWorkflowAction(FlowFrameworkSettings flowFrameworkSettings) {
this.flowFrameworkSettings = flowFrameworkSettings;
}

Expand Down Expand Up @@ -92,7 +88,7 @@ protected RestChannelConsumer prepareRequest(RestRequest request, NodeClient cli
String[] validation = request.paramAsStringArray(VALIDATION, new String[] { "all" });
boolean provision = request.paramAsBoolean(PROVISION_WORKFLOW, false);

WorkflowRequest workflowRequest = new WorkflowRequest(workflowId, template, validation, provision, requestTimeout);
WorkflowRequest workflowRequest = new WorkflowRequest(workflowId, template, validation, provision);

return channel -> client.execute(CreateWorkflowAction.INSTANCE, workflowRequest, ActionListener.wrap(response -> {
XContentBuilder builder = response.toXContent(channel.newBuilder(), ToXContent.EMPTY_PARAMS);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -116,94 +116,104 @@ protected void doExecute(Task task, WorkflowRequest request, ActionListener<Work

if (request.getWorkflowId() == null) {
// Throttle incoming requests
checkMaxWorkflows(request.getRequestTimeout(), flowFrameworkSettings.getMaxWorkflows(), ActionListener.wrap(max -> {
if (!max) {
String errorMessage = "Maximum workflows limit reached " + flowFrameworkSettings.getMaxWorkflows();
logger.error(errorMessage);
FlowFrameworkException ffe = new FlowFrameworkException(errorMessage, RestStatus.BAD_REQUEST);
listener.onFailure(ffe);
return;
} else {
// Initialize config index and create new global context and state index entries
flowFrameworkIndicesHandler.initializeConfigIndex(ActionListener.wrap(isInitialized -> {
if (!isInitialized) {
listener.onFailure(
new FlowFrameworkException("Failed to initalize config index", RestStatus.INTERNAL_SERVER_ERROR)
);
} else {
// Create new global context and state index entries
flowFrameworkIndicesHandler.putTemplateToGlobalContext(
templateWithUser,
ActionListener.wrap(globalContextResponse -> {
flowFrameworkIndicesHandler.putInitialStateToWorkflowState(
globalContextResponse.getId(),
user,
ActionListener.wrap(stateResponse -> {
logger.info("create state workflow doc");
if (request.isProvision()) {
logger.info("provision parameter");
WorkflowRequest workflowRequest = new WorkflowRequest(globalContextResponse.getId(), null);
client.execute(
ProvisionWorkflowAction.INSTANCE,
workflowRequest,
ActionListener.wrap(provisionResponse -> {
listener.onResponse(new WorkflowResponse(provisionResponse.getWorkflowId()));
}, exception -> {
if (exception instanceof FlowFrameworkException) {
listener.onFailure(exception);
} else {
listener.onFailure(
new FlowFrameworkException(exception.getMessage(), RestStatus.BAD_REQUEST)
);
}
logger.error("Failed to send back provision workflow exception", exception);
})
);
} else {
listener.onResponse(new WorkflowResponse(globalContextResponse.getId()));
}
}, exception -> {
logger.error("Failed to save workflow state : {}", exception.getMessage());
if (exception instanceof FlowFrameworkException) {
listener.onFailure(exception);
} else {
listener.onFailure(
new FlowFrameworkException(exception.getMessage(), RestStatus.BAD_REQUEST)
);
}
})
);
}, exception -> {
logger.error("Failed to save use case template : {}", exception.getMessage());
if (exception instanceof FlowFrameworkException) {
listener.onFailure(exception);
} else {
listener.onFailure(
new FlowFrameworkException(exception.getMessage(), ExceptionsHelper.status(exception))
checkMaxWorkflows(
flowFrameworkSettings.getRequestTimeout(),
flowFrameworkSettings.getMaxWorkflows(),
ActionListener.wrap(max -> {
if (!max) {
String errorMessage = "Maximum workflows limit reached " + flowFrameworkSettings.getMaxWorkflows();
logger.error(errorMessage);
FlowFrameworkException ffe = new FlowFrameworkException(errorMessage, RestStatus.BAD_REQUEST);
listener.onFailure(ffe);
return;
} else {
// Initialize config index and create new global context and state index entries
flowFrameworkIndicesHandler.initializeConfigIndex(ActionListener.wrap(isInitialized -> {
if (!isInitialized) {
listener.onFailure(
new FlowFrameworkException("Failed to initalize config index", RestStatus.INTERNAL_SERVER_ERROR)
);
} else {
// Create new global context and state index entries
flowFrameworkIndicesHandler.putTemplateToGlobalContext(
templateWithUser,
ActionListener.wrap(globalContextResponse -> {
flowFrameworkIndicesHandler.putInitialStateToWorkflowState(
globalContextResponse.getId(),
user,
ActionListener.wrap(stateResponse -> {
logger.info("create state workflow doc");
if (request.isProvision()) {
logger.info("provision parameter");
WorkflowRequest workflowRequest = new WorkflowRequest(
globalContextResponse.getId(),
null
);
client.execute(
ProvisionWorkflowAction.INSTANCE,
workflowRequest,
ActionListener.wrap(provisionResponse -> {
listener.onResponse(new WorkflowResponse(provisionResponse.getWorkflowId()));
}, exception -> {
if (exception instanceof FlowFrameworkException) {
listener.onFailure(exception);
} else {
listener.onFailure(
new FlowFrameworkException(
exception.getMessage(),
RestStatus.BAD_REQUEST
)
);
}
logger.error("Failed to send back provision workflow exception", exception);
})
);
} else {
listener.onResponse(new WorkflowResponse(globalContextResponse.getId()));
}
}, exception -> {
logger.error("Failed to save workflow state : {}", exception.getMessage());
if (exception instanceof FlowFrameworkException) {
listener.onFailure(exception);
} else {
listener.onFailure(
new FlowFrameworkException(exception.getMessage(), RestStatus.BAD_REQUEST)
);
}
})
);
}
}, exception -> {
logger.error("Failed to save use case template : {}", exception.getMessage());
if (exception instanceof FlowFrameworkException) {
listener.onFailure(exception);
} else {
listener.onFailure(
new FlowFrameworkException(exception.getMessage(), ExceptionsHelper.status(exception))
);
}

})
);
}
}, exception -> {
logger.error("Failed to initialize config index : {}", exception.getMessage());
if (exception instanceof FlowFrameworkException) {
listener.onFailure(exception);
} else {
listener.onFailure(new FlowFrameworkException(exception.getMessage(), ExceptionsHelper.status(exception)));
}
})
);
}
}, exception -> {
logger.error("Failed to initialize config index : {}", exception.getMessage());
if (exception instanceof FlowFrameworkException) {
listener.onFailure(exception);
} else {
listener.onFailure(new FlowFrameworkException(exception.getMessage(), ExceptionsHelper.status(exception)));
}

}));
}
}, e -> {
logger.error("Failed to updated use case template {} : {}", request.getWorkflowId(), e.getMessage());
if (e instanceof FlowFrameworkException) {
listener.onFailure(e);
} else {
listener.onFailure(new FlowFrameworkException(e.getMessage(), ExceptionsHelper.status(e)));
}
}));
}));
}
}, e -> {
logger.error("Failed to updated use case template {} : {}", request.getWorkflowId(), e.getMessage());
if (e instanceof FlowFrameworkException) {
listener.onFailure(e);
} else {
listener.onFailure(new FlowFrameworkException(e.getMessage(), ExceptionsHelper.status(e)));
}
})
);
} else {
// Update existing entry, full document replacement
flowFrameworkIndicesHandler.updateTemplateInGlobalContext(
Expand Down
Loading

0 comments on commit 1c5ded7

Please sign in to comment.