diff --git a/CHANGELOG.md b/CHANGELOG.md index 48d5d819a..25523641d 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -24,6 +24,7 @@ Inspired from [Keep a Changelog](https://keepachangelog.com/en/1.1.0/) - Added an optional workflow_step param to the get workflow steps API ([#538](https://github.com/opensearch-project/flow-framework/pull/538)) - Add created, updated, and provisioned timestamps to saved template ([#551](https://github.com/opensearch-project/flow-framework/pull/551)) - Enable Flow Framework by default ([#553](https://github.com/opensearch-project/flow-framework/pull/553)) +- Adding new exception type for workflow step failures ([#577](https://github.com/opensearch-project/flow-framework/pull/577)) ### Bug Fixes ### Infrastructure diff --git a/src/main/java/org/opensearch/flowframework/exception/FlowFrameworkException.java b/src/main/java/org/opensearch/flowframework/exception/FlowFrameworkException.java index 7e8aefc15..8d8ec4850 100644 --- a/src/main/java/org/opensearch/flowframework/exception/FlowFrameworkException.java +++ b/src/main/java/org/opensearch/flowframework/exception/FlowFrameworkException.java @@ -22,7 +22,7 @@ public class FlowFrameworkException extends RuntimeException implements ToXConte private static final long serialVersionUID = 1L; /** The rest status code of this exception */ - private final RestStatus restStatus; + protected final RestStatus restStatus; /** * Constructor with error message. diff --git a/src/main/java/org/opensearch/flowframework/exception/WorkflowStepException.java b/src/main/java/org/opensearch/flowframework/exception/WorkflowStepException.java new file mode 100644 index 000000000..3575034fc --- /dev/null +++ b/src/main/java/org/opensearch/flowframework/exception/WorkflowStepException.java @@ -0,0 +1,67 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ +package org.opensearch.flowframework.exception; + +import org.opensearch.core.rest.RestStatus; +import org.opensearch.core.xcontent.ToXContentObject; +import org.opensearch.core.xcontent.XContentBuilder; + +import java.io.IOException; + +/** + * Representation of an exception that is caused by a workflow step failing outside of our plugin + * This is caught by an external client (e.g. ml-client) returning the failure + */ +public class WorkflowStepException extends FlowFrameworkException implements ToXContentObject { + + private static final long serialVersionUID = 1L; + + /** + * Constructor with error message. + * + * @param message message of the exception + * @param restStatus HTTP status code of the response + */ + public WorkflowStepException(String message, RestStatus restStatus) { + super(message, restStatus); + } + + /** + * Constructor with specified cause. + * @param cause exception cause + * @param restStatus HTTP status code of the response + */ + public WorkflowStepException(Throwable cause, RestStatus restStatus) { + super(cause, restStatus); + } + + /** + * Constructor with specified error message adn cause. + * @param message error message + * @param cause exception cause + * @param restStatus HTTP status code of the response + */ + public WorkflowStepException(String message, Throwable cause, RestStatus restStatus) { + super(message, cause, restStatus); + } + + /** + * Getter for restStatus. + * + * @return the HTTP status code associated with the exception + */ + public RestStatus getRestStatus() { + return restStatus; + } + + @Override + public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { + return builder.startObject().field("error", this.getMessage()).endObject(); + } +} diff --git a/src/main/java/org/opensearch/flowframework/rest/RestCreateWorkflowAction.java b/src/main/java/org/opensearch/flowframework/rest/RestCreateWorkflowAction.java index 7189d8962..c2ec444c4 100644 --- a/src/main/java/org/opensearch/flowframework/rest/RestCreateWorkflowAction.java +++ b/src/main/java/org/opensearch/flowframework/rest/RestCreateWorkflowAction.java @@ -182,7 +182,7 @@ protected RestChannelConsumer prepareRequest(RestRequest request, NodeClient cli try { FlowFrameworkException ex = exception instanceof FlowFrameworkException ? (FlowFrameworkException) exception - : new FlowFrameworkException("Failed to create workflow.", ExceptionsHelper.status(exception)); + : new FlowFrameworkException("Failed to get workflow.", ExceptionsHelper.status(exception)); XContentBuilder exceptionBuilder = ex.toXContent(channel.newErrorBuilder(), ToXContent.EMPTY_PARAMS); channel.sendResponse(new BytesRestResponse(ex.getRestStatus(), exceptionBuilder)); } catch (IOException e) { diff --git a/src/main/java/org/opensearch/flowframework/rest/RestDeleteWorkflowAction.java b/src/main/java/org/opensearch/flowframework/rest/RestDeleteWorkflowAction.java index 0a6253896..fc18f103d 100644 --- a/src/main/java/org/opensearch/flowframework/rest/RestDeleteWorkflowAction.java +++ b/src/main/java/org/opensearch/flowframework/rest/RestDeleteWorkflowAction.java @@ -86,7 +86,7 @@ protected BaseRestHandler.RestChannelConsumer prepareRequest(RestRequest request try { FlowFrameworkException ex = exception instanceof FlowFrameworkException ? (FlowFrameworkException) exception - : new FlowFrameworkException("Failed to delete workflow.", ExceptionsHelper.status(exception)); + : new FlowFrameworkException("Failed to get workflow.", ExceptionsHelper.status(exception)); XContentBuilder exceptionBuilder = ex.toXContent(channel.newErrorBuilder(), ToXContent.EMPTY_PARAMS); channel.sendResponse(new BytesRestResponse(ex.getRestStatus(), exceptionBuilder)); } catch (IOException e) { diff --git a/src/main/java/org/opensearch/flowframework/rest/RestDeprovisionWorkflowAction.java b/src/main/java/org/opensearch/flowframework/rest/RestDeprovisionWorkflowAction.java index 8919c44e7..08c8d6722 100644 --- a/src/main/java/org/opensearch/flowframework/rest/RestDeprovisionWorkflowAction.java +++ b/src/main/java/org/opensearch/flowframework/rest/RestDeprovisionWorkflowAction.java @@ -66,8 +66,7 @@ protected BaseRestHandler.RestChannelConsumer prepareRequest(RestRequest request } // Validate content if (request.hasContent()) { - // BaseRestHandler will give appropriate error message - return channel -> channel.sendResponse(null); + throw new FlowFrameworkException("deprovision request should have no payload", RestStatus.BAD_REQUEST); } // Validate params if (workflowId == null) { @@ -82,7 +81,7 @@ protected BaseRestHandler.RestChannelConsumer prepareRequest(RestRequest request try { FlowFrameworkException ex = exception instanceof FlowFrameworkException ? (FlowFrameworkException) exception - : new FlowFrameworkException("Failed to deprovision workflow.", ExceptionsHelper.status(exception)); + : new FlowFrameworkException("Failed to get workflow.", ExceptionsHelper.status(exception)); XContentBuilder exceptionBuilder = ex.toXContent(channel.newErrorBuilder(), ToXContent.EMPTY_PARAMS); channel.sendResponse(new BytesRestResponse(ex.getRestStatus(), exceptionBuilder)); } catch (IOException e) { diff --git a/src/main/java/org/opensearch/flowframework/rest/RestGetWorkflowStateAction.java b/src/main/java/org/opensearch/flowframework/rest/RestGetWorkflowStateAction.java index aadf8ef8f..45fe9921a 100644 --- a/src/main/java/org/opensearch/flowframework/rest/RestGetWorkflowStateAction.java +++ b/src/main/java/org/opensearch/flowframework/rest/RestGetWorkflowStateAction.java @@ -84,7 +84,7 @@ protected BaseRestHandler.RestChannelConsumer prepareRequest(RestRequest request try { FlowFrameworkException ex = exception instanceof FlowFrameworkException ? (FlowFrameworkException) exception - : new FlowFrameworkException("Failed to get workflow state.", ExceptionsHelper.status(exception)); + : new FlowFrameworkException("Failed to get workflow.", ExceptionsHelper.status(exception)); XContentBuilder exceptionBuilder = ex.toXContent(channel.newErrorBuilder(), ToXContent.EMPTY_PARAMS); channel.sendResponse(new BytesRestResponse(ex.getRestStatus(), exceptionBuilder)); } catch (IOException e) { diff --git a/src/main/java/org/opensearch/flowframework/rest/RestGetWorkflowStepAction.java b/src/main/java/org/opensearch/flowframework/rest/RestGetWorkflowStepAction.java index f861efa2e..b9a6deb38 100644 --- a/src/main/java/org/opensearch/flowframework/rest/RestGetWorkflowStepAction.java +++ b/src/main/java/org/opensearch/flowframework/rest/RestGetWorkflowStepAction.java @@ -83,7 +83,7 @@ protected RestChannelConsumer prepareRequest(RestRequest request, NodeClient cli try { FlowFrameworkException ex = exception instanceof FlowFrameworkException ? (FlowFrameworkException) exception - : new FlowFrameworkException("Failed to get workflow step.", ExceptionsHelper.status(exception)); + : new FlowFrameworkException("Failed to get workflow.", ExceptionsHelper.status(exception)); XContentBuilder exceptionBuilder = ex.toXContent(channel.newErrorBuilder(), ToXContent.EMPTY_PARAMS); channel.sendResponse(new BytesRestResponse(ex.getRestStatus(), exceptionBuilder)); } catch (IOException e) { diff --git a/src/main/java/org/opensearch/flowframework/rest/RestProvisionWorkflowAction.java b/src/main/java/org/opensearch/flowframework/rest/RestProvisionWorkflowAction.java index 26070f515..6ae56905c 100644 --- a/src/main/java/org/opensearch/flowframework/rest/RestProvisionWorkflowAction.java +++ b/src/main/java/org/opensearch/flowframework/rest/RestProvisionWorkflowAction.java @@ -94,7 +94,7 @@ protected RestChannelConsumer prepareRequest(RestRequest request, NodeClient cli try { FlowFrameworkException ex = exception instanceof FlowFrameworkException ? (FlowFrameworkException) exception - : new FlowFrameworkException("Failed to provision workflow.", ExceptionsHelper.status(exception)); + : new FlowFrameworkException("Failed to get workflow.", ExceptionsHelper.status(exception)); XContentBuilder exceptionBuilder = ex.toXContent(channel.newErrorBuilder(), ToXContent.EMPTY_PARAMS); channel.sendResponse(new BytesRestResponse(ex.getRestStatus(), exceptionBuilder)); } catch (IOException e) { diff --git a/src/main/java/org/opensearch/flowframework/transport/ProvisionWorkflowTransportAction.java b/src/main/java/org/opensearch/flowframework/transport/ProvisionWorkflowTransportAction.java index 205b749f9..18ca73f26 100644 --- a/src/main/java/org/opensearch/flowframework/transport/ProvisionWorkflowTransportAction.java +++ b/src/main/java/org/opensearch/flowframework/transport/ProvisionWorkflowTransportAction.java @@ -256,10 +256,18 @@ private void executeWorkflow(List workflowSequence, String workflow }, exception -> { logger.error("Failed to update workflow state for workflow {}", workflowId, exception); }) ); } catch (Exception ex) { + RestStatus status; + if (ex instanceof FlowFrameworkException) { + status = ((FlowFrameworkException) ex).getRestStatus(); + } else { + status = ExceptionsHelper.status(ex); + } logger.error("Provisioning failed for workflow {} during step {}.", workflowId, currentStepId, ex); String errorMessage = (ex.getCause() == null ? ex.getClass().getName() : ex.getCause().getClass().getName()) + " during step " - + currentStepId; + + currentStepId + + ", restStatus: " + + status.toString(); flowFrameworkIndicesHandler.updateFlowFrameworkSystemIndexDoc( workflowId, Map.ofEntries( diff --git a/src/main/java/org/opensearch/flowframework/workflow/AbstractCreatePipelineStep.java b/src/main/java/org/opensearch/flowframework/workflow/AbstractCreatePipelineStep.java index b00931b51..0689ce4e4 100644 --- a/src/main/java/org/opensearch/flowframework/workflow/AbstractCreatePipelineStep.java +++ b/src/main/java/org/opensearch/flowframework/workflow/AbstractCreatePipelineStep.java @@ -22,6 +22,7 @@ import org.opensearch.core.common.bytes.BytesArray; import org.opensearch.core.common.bytes.BytesReference; import org.opensearch.flowframework.exception.FlowFrameworkException; +import org.opensearch.flowframework.exception.WorkflowStepException; import org.opensearch.flowframework.indices.FlowFrameworkIndicesHandler; import org.opensearch.flowframework.util.ParseUtils; @@ -140,7 +141,7 @@ public void onResponse(AcknowledgedResponse acknowledgedResponse) { public void onFailure(Exception e) { String errorMessage = "Failed step " + pipelineToBeCreated; logger.error(errorMessage, e); - createPipelineFuture.onFailure(new FlowFrameworkException(errorMessage, ExceptionsHelper.status(e))); + createPipelineFuture.onFailure(new WorkflowStepException(errorMessage, ExceptionsHelper.status(e))); } }; diff --git a/src/main/java/org/opensearch/flowframework/workflow/AbstractRegisterLocalModelStep.java b/src/main/java/org/opensearch/flowframework/workflow/AbstractRegisterLocalModelStep.java index 913f94e75..442e41355 100644 --- a/src/main/java/org/opensearch/flowframework/workflow/AbstractRegisterLocalModelStep.java +++ b/src/main/java/org/opensearch/flowframework/workflow/AbstractRegisterLocalModelStep.java @@ -15,6 +15,7 @@ import org.opensearch.core.action.ActionListener; import org.opensearch.flowframework.common.FlowFrameworkSettings; import org.opensearch.flowframework.exception.FlowFrameworkException; +import org.opensearch.flowframework.exception.WorkflowStepException; import org.opensearch.flowframework.indices.FlowFrameworkIndicesHandler; import org.opensearch.flowframework.util.ParseUtils; import org.opensearch.ml.client.MachineLearningNodeClient; @@ -214,7 +215,7 @@ public PlainActionFuture execute( }, exception -> { String errorMessage = "Failed to register local model in step " + currentNodeId; logger.error(errorMessage, exception); - registerLocalModelFuture.onFailure(new FlowFrameworkException(errorMessage, ExceptionsHelper.status(exception))); + registerLocalModelFuture.onFailure(new WorkflowStepException(errorMessage, ExceptionsHelper.status(exception))); })); } catch (FlowFrameworkException e) { registerLocalModelFuture.onFailure(e); diff --git a/src/main/java/org/opensearch/flowframework/workflow/AbstractRetryableWorkflowStep.java b/src/main/java/org/opensearch/flowframework/workflow/AbstractRetryableWorkflowStep.java index 9c4fa75b4..c1933b0c4 100644 --- a/src/main/java/org/opensearch/flowframework/workflow/AbstractRetryableWorkflowStep.java +++ b/src/main/java/org/opensearch/flowframework/workflow/AbstractRetryableWorkflowStep.java @@ -18,6 +18,7 @@ import org.opensearch.core.rest.RestStatus; import org.opensearch.flowframework.common.FlowFrameworkSettings; import org.opensearch.flowframework.exception.FlowFrameworkException; +import org.opensearch.flowframework.exception.WorkflowStepException; import org.opensearch.flowframework.indices.FlowFrameworkIndicesHandler; import org.opensearch.ml.client.MachineLearningNodeClient; import org.opensearch.ml.common.MLTask; @@ -127,7 +128,7 @@ protected void retryableGetMlTask( }, exception -> { String errorMessage = workflowStep + " failed"; logger.error(errorMessage, exception); - mlTaskListener.onFailure(new FlowFrameworkException(errorMessage, RestStatus.BAD_REQUEST)); + mlTaskListener.onFailure(new WorkflowStepException(errorMessage, RestStatus.BAD_REQUEST)); })); try { Thread.sleep(this.retryDuration.getMillis()); diff --git a/src/main/java/org/opensearch/flowframework/workflow/CreateConnectorStep.java b/src/main/java/org/opensearch/flowframework/workflow/CreateConnectorStep.java index b26b28a6b..02f5bf336 100644 --- a/src/main/java/org/opensearch/flowframework/workflow/CreateConnectorStep.java +++ b/src/main/java/org/opensearch/flowframework/workflow/CreateConnectorStep.java @@ -15,6 +15,7 @@ import org.opensearch.core.action.ActionListener; import org.opensearch.core.rest.RestStatus; import org.opensearch.flowframework.exception.FlowFrameworkException; +import org.opensearch.flowframework.exception.WorkflowStepException; import org.opensearch.flowframework.indices.FlowFrameworkIndicesHandler; import org.opensearch.flowframework.util.ParseUtils; import org.opensearch.ml.client.MachineLearningNodeClient; @@ -123,7 +124,7 @@ public void onResponse(MLCreateConnectorResponse mlCreateConnectorResponse) { public void onFailure(Exception e) { String errorMessage = "Failed to create connector"; logger.error(errorMessage, e); - createConnectorFuture.onFailure(new FlowFrameworkException(errorMessage, ExceptionsHelper.status(e))); + createConnectorFuture.onFailure(new WorkflowStepException(errorMessage, ExceptionsHelper.status(e))); } }; diff --git a/src/main/java/org/opensearch/flowframework/workflow/DeleteAgentStep.java b/src/main/java/org/opensearch/flowframework/workflow/DeleteAgentStep.java index 58e5f789a..0dba99f7a 100644 --- a/src/main/java/org/opensearch/flowframework/workflow/DeleteAgentStep.java +++ b/src/main/java/org/opensearch/flowframework/workflow/DeleteAgentStep.java @@ -15,6 +15,7 @@ import org.opensearch.action.support.PlainActionFuture; import org.opensearch.core.action.ActionListener; import org.opensearch.flowframework.exception.FlowFrameworkException; +import org.opensearch.flowframework.exception.WorkflowStepException; import org.opensearch.flowframework.util.ParseUtils; import org.opensearch.ml.client.MachineLearningNodeClient; @@ -84,7 +85,7 @@ public void onResponse(DeleteResponse deleteResponse) { public void onFailure(Exception e) { String errorMessage = "Failed to delete agent " + agentId; logger.error(errorMessage, e); - deleteAgentFuture.onFailure(new FlowFrameworkException(errorMessage, ExceptionsHelper.status(e))); + deleteAgentFuture.onFailure(new WorkflowStepException(errorMessage, ExceptionsHelper.status(e))); } }); } catch (FlowFrameworkException e) { diff --git a/src/main/java/org/opensearch/flowframework/workflow/DeleteConnectorStep.java b/src/main/java/org/opensearch/flowframework/workflow/DeleteConnectorStep.java index ba411c0a2..11a2b2d62 100644 --- a/src/main/java/org/opensearch/flowframework/workflow/DeleteConnectorStep.java +++ b/src/main/java/org/opensearch/flowframework/workflow/DeleteConnectorStep.java @@ -15,6 +15,7 @@ import org.opensearch.action.support.PlainActionFuture; import org.opensearch.core.action.ActionListener; import org.opensearch.flowframework.exception.FlowFrameworkException; +import org.opensearch.flowframework.exception.WorkflowStepException; import org.opensearch.flowframework.util.ParseUtils; import org.opensearch.ml.client.MachineLearningNodeClient; @@ -84,7 +85,7 @@ public void onResponse(DeleteResponse deleteResponse) { public void onFailure(Exception e) { String errorMessage = "Failed to delete connector " + connectorId; logger.error(errorMessage, e); - deleteConnectorFuture.onFailure(new FlowFrameworkException(errorMessage, ExceptionsHelper.status(e))); + deleteConnectorFuture.onFailure(new WorkflowStepException(errorMessage, ExceptionsHelper.status(e))); } }); } catch (FlowFrameworkException e) { diff --git a/src/main/java/org/opensearch/flowframework/workflow/DeleteModelStep.java b/src/main/java/org/opensearch/flowframework/workflow/DeleteModelStep.java index 9dab3365e..c8071f7cd 100644 --- a/src/main/java/org/opensearch/flowframework/workflow/DeleteModelStep.java +++ b/src/main/java/org/opensearch/flowframework/workflow/DeleteModelStep.java @@ -15,6 +15,7 @@ import org.opensearch.action.support.PlainActionFuture; import org.opensearch.core.action.ActionListener; import org.opensearch.flowframework.exception.FlowFrameworkException; +import org.opensearch.flowframework.exception.WorkflowStepException; import org.opensearch.flowframework.util.ParseUtils; import org.opensearch.ml.client.MachineLearningNodeClient; @@ -85,7 +86,7 @@ public void onResponse(DeleteResponse deleteResponse) { public void onFailure(Exception e) { String errorMessage = "Failed to delete model " + modelId; logger.error(errorMessage, e); - deleteModelFuture.onFailure(new FlowFrameworkException(errorMessage, ExceptionsHelper.status(e))); + deleteModelFuture.onFailure(new WorkflowStepException(errorMessage, ExceptionsHelper.status(e))); } }); } catch (FlowFrameworkException e) { diff --git a/src/main/java/org/opensearch/flowframework/workflow/DeployModelStep.java b/src/main/java/org/opensearch/flowframework/workflow/DeployModelStep.java index 3d29e37ca..929f5f570 100644 --- a/src/main/java/org/opensearch/flowframework/workflow/DeployModelStep.java +++ b/src/main/java/org/opensearch/flowframework/workflow/DeployModelStep.java @@ -15,6 +15,7 @@ import org.opensearch.core.action.ActionListener; import org.opensearch.flowframework.common.FlowFrameworkSettings; import org.opensearch.flowframework.exception.FlowFrameworkException; +import org.opensearch.flowframework.exception.WorkflowStepException; import org.opensearch.flowframework.indices.FlowFrameworkIndicesHandler; import org.opensearch.flowframework.util.ParseUtils; import org.opensearch.ml.client.MachineLearningNodeClient; @@ -117,7 +118,7 @@ public void onResponse(MLDeployModelResponse mlDeployModelResponse) { public void onFailure(Exception e) { String errorMessage = "Failed to deploy model " + modelId; logger.error(errorMessage, e); - deployModelFuture.onFailure(new FlowFrameworkException(errorMessage, ExceptionsHelper.status(e))); + deployModelFuture.onFailure(new WorkflowStepException(errorMessage, ExceptionsHelper.status(e))); } }); } catch (FlowFrameworkException e) { diff --git a/src/main/java/org/opensearch/flowframework/workflow/RegisterAgentStep.java b/src/main/java/org/opensearch/flowframework/workflow/RegisterAgentStep.java index 46b120bde..8042d5244 100644 --- a/src/main/java/org/opensearch/flowframework/workflow/RegisterAgentStep.java +++ b/src/main/java/org/opensearch/flowframework/workflow/RegisterAgentStep.java @@ -15,6 +15,7 @@ import org.opensearch.common.Nullable; import org.opensearch.core.action.ActionListener; import org.opensearch.flowframework.exception.FlowFrameworkException; +import org.opensearch.flowframework.exception.WorkflowStepException; import org.opensearch.flowframework.indices.FlowFrameworkIndicesHandler; import org.opensearch.flowframework.util.ParseUtils; import org.opensearch.ml.client.MachineLearningNodeClient; @@ -135,7 +136,7 @@ public void onResponse(MLRegisterAgentResponse mlRegisterAgentResponse) { public void onFailure(Exception e) { String errorMessage = "Failed to register the agent"; logger.error(errorMessage, e); - registerAgentModelFuture.onFailure(new FlowFrameworkException(errorMessage, ExceptionsHelper.status(e))); + registerAgentModelFuture.onFailure(new WorkflowStepException(errorMessage, ExceptionsHelper.status(e))); } }; diff --git a/src/main/java/org/opensearch/flowframework/workflow/RegisterModelGroupStep.java b/src/main/java/org/opensearch/flowframework/workflow/RegisterModelGroupStep.java index 19818e54f..b8a79325a 100644 --- a/src/main/java/org/opensearch/flowframework/workflow/RegisterModelGroupStep.java +++ b/src/main/java/org/opensearch/flowframework/workflow/RegisterModelGroupStep.java @@ -15,6 +15,7 @@ import org.opensearch.core.action.ActionListener; import org.opensearch.core.common.util.CollectionUtils; import org.opensearch.flowframework.exception.FlowFrameworkException; +import org.opensearch.flowframework.exception.WorkflowStepException; import org.opensearch.flowframework.indices.FlowFrameworkIndicesHandler; import org.opensearch.flowframework.util.ParseUtils; import org.opensearch.ml.client.MachineLearningNodeClient; @@ -118,7 +119,7 @@ public void onResponse(MLRegisterModelGroupResponse mlRegisterModelGroupResponse public void onFailure(Exception e) { String errorMessage = "Failed to register model group"; logger.error(errorMessage, e); - registerModelGroupFuture.onFailure(new FlowFrameworkException(errorMessage, ExceptionsHelper.status(e))); + registerModelGroupFuture.onFailure(new WorkflowStepException(errorMessage, ExceptionsHelper.status(e))); } }; diff --git a/src/main/java/org/opensearch/flowframework/workflow/RegisterRemoteModelStep.java b/src/main/java/org/opensearch/flowframework/workflow/RegisterRemoteModelStep.java index 51081b467..c32a7f0bd 100644 --- a/src/main/java/org/opensearch/flowframework/workflow/RegisterRemoteModelStep.java +++ b/src/main/java/org/opensearch/flowframework/workflow/RegisterRemoteModelStep.java @@ -15,6 +15,7 @@ import org.opensearch.action.update.UpdateResponse; import org.opensearch.core.action.ActionListener; import org.opensearch.flowframework.exception.FlowFrameworkException; +import org.opensearch.flowframework.exception.WorkflowStepException; import org.opensearch.flowframework.indices.FlowFrameworkIndicesHandler; import org.opensearch.flowframework.util.ParseUtils; import org.opensearch.ml.client.MachineLearningNodeClient; @@ -177,7 +178,7 @@ void completeRegisterFuture(UpdateResponse response, String resourceName, MLRegi public void onFailure(Exception e) { String errorMessage = "Failed to register remote model"; logger.error(errorMessage, e); - registerRemoteModelFuture.onFailure(new FlowFrameworkException(errorMessage, ExceptionsHelper.status(e))); + registerRemoteModelFuture.onFailure(new WorkflowStepException(errorMessage, ExceptionsHelper.status(e))); } }); diff --git a/src/main/java/org/opensearch/flowframework/workflow/UndeployModelStep.java b/src/main/java/org/opensearch/flowframework/workflow/UndeployModelStep.java index 034bd3caa..00eec6d29 100644 --- a/src/main/java/org/opensearch/flowframework/workflow/UndeployModelStep.java +++ b/src/main/java/org/opensearch/flowframework/workflow/UndeployModelStep.java @@ -16,6 +16,7 @@ import org.opensearch.action.support.PlainActionFuture; import org.opensearch.core.action.ActionListener; import org.opensearch.flowframework.exception.FlowFrameworkException; +import org.opensearch.flowframework.exception.WorkflowStepException; import org.opensearch.flowframework.util.ParseUtils; import org.opensearch.ml.client.MachineLearningNodeClient; import org.opensearch.ml.common.transport.undeploy.MLUndeployModelsResponse; @@ -98,7 +99,7 @@ public void onResponse(MLUndeployModelsResponse mlUndeployModelsResponse) { public void onFailure(Exception e) { String errorMessage = "Failed to undeploy model " + modelId; logger.error(errorMessage, e); - undeployModelFuture.onFailure(new FlowFrameworkException(errorMessage, ExceptionsHelper.status(e))); + undeployModelFuture.onFailure(new WorkflowStepException(errorMessage, ExceptionsHelper.status(e))); } }); } catch (FlowFrameworkException e) { diff --git a/src/test/java/org/opensearch/flowframework/FlowFrameworkRestTestCase.java b/src/test/java/org/opensearch/flowframework/FlowFrameworkRestTestCase.java index 326d382ee..4acc80a0a 100644 --- a/src/test/java/org/opensearch/flowframework/FlowFrameworkRestTestCase.java +++ b/src/test/java/org/opensearch/flowframework/FlowFrameworkRestTestCase.java @@ -593,7 +593,21 @@ protected void getAndAssertWorkflowStatus( Map responseMap = entityAsMap(response); assertEquals(stateStatus.name(), (String) responseMap.get(CommonValue.STATE_FIELD)); assertEquals(provisioningStatus.name(), (String) responseMap.get(CommonValue.PROVISIONING_PROGRESS_FIELD)); + } + + /** + * Helper method to invoke the Get Workflow status Rest Action and get the error field + * @param client the rest client + * @param workflowId the workflow ID to get the status + * @return the error string + * @throws Exception if the request fails + */ + protected String getAndWorkflowStatusError(RestClient client, String workflowId) throws Exception { + Response response = getWorkflowStatus(client, workflowId, true); + assertEquals(RestStatus.OK, TestHelpers.restStatus(response)); + Map responseMap = entityAsMap(response); + return (String) responseMap.get(CommonValue.ERROR_FIELD); } /** diff --git a/src/test/java/org/opensearch/flowframework/rest/FlowFrameworkRestApiIT.java b/src/test/java/org/opensearch/flowframework/rest/FlowFrameworkRestApiIT.java index 8db37d83d..fb0ee879b 100644 --- a/src/test/java/org/opensearch/flowframework/rest/FlowFrameworkRestApiIT.java +++ b/src/test/java/org/opensearch/flowframework/rest/FlowFrameworkRestApiIT.java @@ -401,9 +401,6 @@ public void testCreateAndProvisionIngestAndSearchPipeline() throws Exception { public void testDefaultCohereUseCase() throws Exception { - // Using a 3 step template to create a connector, register remote model and deploy model - Template template = TestHelpers.createTemplateFromFile("ingest-search-pipeline-template.json"); - // Hit Create Workflow API with original template Response response = createWorkflowWithUseCase(client(), "cohere-embedding_model_deploy"); assertEquals(RestStatus.CREATED, TestHelpers.restStatus(response)); @@ -440,4 +437,35 @@ public void testDefaultCohereUseCase() throws Exception { assertEquals(3, resourcesCreated.size()); } + public void testDefaultSemanticSearchUseCaseWithFailureExpected() throws Exception { + + // Hit Create Workflow API with original template + Response response = createWorkflowWithUseCase(client(), "semantic_search"); + assertEquals(RestStatus.CREATED, TestHelpers.restStatus(response)); + + Map responseMap = entityAsMap(response); + String workflowId = (String) responseMap.get(WORKFLOW_ID); + getAndAssertWorkflowStatus(client(), workflowId, State.NOT_STARTED, ProvisioningProgress.NOT_STARTED); + + // Ensure Ml config index is initialized as creating a connector requires this, then hit Provision API and assert status + if (!indexExistsWithAdminClient(".plugins-ml-config")) { + assertBusy(() -> assertTrue(indexExistsWithAdminClient(".plugins-ml-config")), 40, TimeUnit.SECONDS); + response = provisionWorkflow(client(), workflowId); + } else { + response = provisionWorkflow(client(), workflowId); + } + + // expecting a failure since there is no neural-search plugin in cluster to provide text-embedding processor + assertEquals(RestStatus.OK, TestHelpers.restStatus(response)); + getAndAssertWorkflowStatus(client(), workflowId, State.FAILED, ProvisioningProgress.FAILED); + + String error = getAndWorkflowStatusError(client(), workflowId); + assertTrue( + error.contains( + "org.opensearch.flowframework.exception.WorkflowStepException during step create_ingest_pipeline, restStatus: BAD_REQUEST" + ) + ); + + } + }