diff --git a/src/main/java/org/opensearch/flowframework/exception/FlowFrameworkException.java b/src/main/java/org/opensearch/flowframework/exception/FlowFrameworkException.java index 7e8aefc15..8d8ec4850 100644 --- a/src/main/java/org/opensearch/flowframework/exception/FlowFrameworkException.java +++ b/src/main/java/org/opensearch/flowframework/exception/FlowFrameworkException.java @@ -22,7 +22,7 @@ public class FlowFrameworkException extends RuntimeException implements ToXConte private static final long serialVersionUID = 1L; /** The rest status code of this exception */ - private final RestStatus restStatus; + protected final RestStatus restStatus; /** * Constructor with error message. diff --git a/src/main/java/org/opensearch/flowframework/exception/WorkflowStepException.java b/src/main/java/org/opensearch/flowframework/exception/WorkflowStepException.java new file mode 100644 index 000000000..3575034fc --- /dev/null +++ b/src/main/java/org/opensearch/flowframework/exception/WorkflowStepException.java @@ -0,0 +1,67 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ +package org.opensearch.flowframework.exception; + +import org.opensearch.core.rest.RestStatus; +import org.opensearch.core.xcontent.ToXContentObject; +import org.opensearch.core.xcontent.XContentBuilder; + +import java.io.IOException; + +/** + * Representation of an exception that is caused by a workflow step failing outside of our plugin + * This is caught by an external client (e.g. ml-client) returning the failure + */ +public class WorkflowStepException extends FlowFrameworkException implements ToXContentObject { + + private static final long serialVersionUID = 1L; + + /** + * Constructor with error message. + * + * @param message message of the exception + * @param restStatus HTTP status code of the response + */ + public WorkflowStepException(String message, RestStatus restStatus) { + super(message, restStatus); + } + + /** + * Constructor with specified cause. + * @param cause exception cause + * @param restStatus HTTP status code of the response + */ + public WorkflowStepException(Throwable cause, RestStatus restStatus) { + super(cause, restStatus); + } + + /** + * Constructor with specified error message adn cause. + * @param message error message + * @param cause exception cause + * @param restStatus HTTP status code of the response + */ + public WorkflowStepException(String message, Throwable cause, RestStatus restStatus) { + super(message, cause, restStatus); + } + + /** + * Getter for restStatus. + * + * @return the HTTP status code associated with the exception + */ + public RestStatus getRestStatus() { + return restStatus; + } + + @Override + public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { + return builder.startObject().field("error", this.getMessage()).endObject(); + } +} diff --git a/src/main/java/org/opensearch/flowframework/rest/RestCreateWorkflowAction.java b/src/main/java/org/opensearch/flowframework/rest/RestCreateWorkflowAction.java index 09fec81e1..d91f7aa35 100644 --- a/src/main/java/org/opensearch/flowframework/rest/RestCreateWorkflowAction.java +++ b/src/main/java/org/opensearch/flowframework/rest/RestCreateWorkflowAction.java @@ -19,6 +19,7 @@ import org.opensearch.core.xcontent.XContentParser; import org.opensearch.flowframework.common.FlowFrameworkSettings; import org.opensearch.flowframework.exception.FlowFrameworkException; +import org.opensearch.flowframework.exception.WorkflowStepException; import org.opensearch.flowframework.model.Template; import org.opensearch.flowframework.transport.CreateWorkflowAction; import org.opensearch.flowframework.transport.WorkflowRequest; @@ -123,9 +124,14 @@ protected RestChannelConsumer prepareRequest(RestRequest request, NodeClient cli channel.sendResponse(new BytesRestResponse(RestStatus.CREATED, builder)); }, exception -> { try { - FlowFrameworkException ex = exception instanceof FlowFrameworkException - ? (FlowFrameworkException) exception - : new FlowFrameworkException("Failed to create workflow.", ExceptionsHelper.status(exception)); + FlowFrameworkException ex; + if (exception instanceof WorkflowStepException) { + ex = (WorkflowStepException) exception; + } else if (exception instanceof FlowFrameworkException) { + ex = (FlowFrameworkException) exception; + } else { + ex = new FlowFrameworkException("Failed to create workflow.", ExceptionsHelper.status(exception)); + } XContentBuilder exceptionBuilder = ex.toXContent(channel.newErrorBuilder(), ToXContent.EMPTY_PARAMS); channel.sendResponse(new BytesRestResponse(ex.getRestStatus(), exceptionBuilder)); } catch (IOException e) { diff --git a/src/main/java/org/opensearch/flowframework/rest/RestDeleteWorkflowAction.java b/src/main/java/org/opensearch/flowframework/rest/RestDeleteWorkflowAction.java index 0a6253896..7e1df1e45 100644 --- a/src/main/java/org/opensearch/flowframework/rest/RestDeleteWorkflowAction.java +++ b/src/main/java/org/opensearch/flowframework/rest/RestDeleteWorkflowAction.java @@ -18,6 +18,7 @@ import org.opensearch.core.xcontent.XContentBuilder; import org.opensearch.flowframework.common.FlowFrameworkSettings; import org.opensearch.flowframework.exception.FlowFrameworkException; +import org.opensearch.flowframework.exception.WorkflowStepException; import org.opensearch.flowframework.transport.DeleteWorkflowAction; import org.opensearch.flowframework.transport.WorkflowRequest; import org.opensearch.rest.BaseRestHandler; @@ -84,9 +85,14 @@ protected BaseRestHandler.RestChannelConsumer prepareRequest(RestRequest request channel.sendResponse(new BytesRestResponse(RestStatus.OK, builder)); }, exception -> { try { - FlowFrameworkException ex = exception instanceof FlowFrameworkException - ? (FlowFrameworkException) exception - : new FlowFrameworkException("Failed to delete workflow.", ExceptionsHelper.status(exception)); + FlowFrameworkException ex; + if (exception instanceof WorkflowStepException) { + ex = (WorkflowStepException) exception; + } else if (exception instanceof FlowFrameworkException) { + ex = (FlowFrameworkException) exception; + } else { + ex = new FlowFrameworkException("Failed to delete workflow.", ExceptionsHelper.status(exception)); + } XContentBuilder exceptionBuilder = ex.toXContent(channel.newErrorBuilder(), ToXContent.EMPTY_PARAMS); channel.sendResponse(new BytesRestResponse(ex.getRestStatus(), exceptionBuilder)); } catch (IOException e) { diff --git a/src/main/java/org/opensearch/flowframework/rest/RestDeprovisionWorkflowAction.java b/src/main/java/org/opensearch/flowframework/rest/RestDeprovisionWorkflowAction.java index 8919c44e7..3d42b5db0 100644 --- a/src/main/java/org/opensearch/flowframework/rest/RestDeprovisionWorkflowAction.java +++ b/src/main/java/org/opensearch/flowframework/rest/RestDeprovisionWorkflowAction.java @@ -18,6 +18,7 @@ import org.opensearch.core.xcontent.XContentBuilder; import org.opensearch.flowframework.common.FlowFrameworkSettings; import org.opensearch.flowframework.exception.FlowFrameworkException; +import org.opensearch.flowframework.exception.WorkflowStepException; import org.opensearch.flowframework.transport.DeprovisionWorkflowAction; import org.opensearch.flowframework.transport.WorkflowRequest; import org.opensearch.rest.BaseRestHandler; @@ -66,8 +67,7 @@ protected BaseRestHandler.RestChannelConsumer prepareRequest(RestRequest request } // Validate content if (request.hasContent()) { - // BaseRestHandler will give appropriate error message - return channel -> channel.sendResponse(null); + throw new FlowFrameworkException("deprovision request should have no payload", RestStatus.BAD_REQUEST); } // Validate params if (workflowId == null) { @@ -80,9 +80,14 @@ protected BaseRestHandler.RestChannelConsumer prepareRequest(RestRequest request channel.sendResponse(new BytesRestResponse(RestStatus.OK, builder)); }, exception -> { try { - FlowFrameworkException ex = exception instanceof FlowFrameworkException - ? (FlowFrameworkException) exception - : new FlowFrameworkException("Failed to deprovision workflow.", ExceptionsHelper.status(exception)); + FlowFrameworkException ex; + if (exception instanceof WorkflowStepException) { + ex = (WorkflowStepException) exception; + } else if (exception instanceof FlowFrameworkException) { + ex = (FlowFrameworkException) exception; + } else { + ex = new FlowFrameworkException("Failed to deprovision workflow.", ExceptionsHelper.status(exception)); + } XContentBuilder exceptionBuilder = ex.toXContent(channel.newErrorBuilder(), ToXContent.EMPTY_PARAMS); channel.sendResponse(new BytesRestResponse(ex.getRestStatus(), exceptionBuilder)); } catch (IOException e) { diff --git a/src/main/java/org/opensearch/flowframework/rest/RestGetWorkflowAction.java b/src/main/java/org/opensearch/flowframework/rest/RestGetWorkflowAction.java index 0710657c2..0fec8855c 100644 --- a/src/main/java/org/opensearch/flowframework/rest/RestGetWorkflowAction.java +++ b/src/main/java/org/opensearch/flowframework/rest/RestGetWorkflowAction.java @@ -18,6 +18,7 @@ import org.opensearch.core.xcontent.XContentBuilder; import org.opensearch.flowframework.common.FlowFrameworkSettings; import org.opensearch.flowframework.exception.FlowFrameworkException; +import org.opensearch.flowframework.exception.WorkflowStepException; import org.opensearch.flowframework.transport.GetWorkflowAction; import org.opensearch.flowframework.transport.WorkflowRequest; import org.opensearch.rest.BaseRestHandler; @@ -85,9 +86,14 @@ protected BaseRestHandler.RestChannelConsumer prepareRequest(RestRequest request channel.sendResponse(new BytesRestResponse(RestStatus.OK, builder)); }, exception -> { try { - FlowFrameworkException ex = exception instanceof FlowFrameworkException - ? (FlowFrameworkException) exception - : new FlowFrameworkException("Failed to get workflow.", ExceptionsHelper.status(exception)); + FlowFrameworkException ex; + if (exception instanceof WorkflowStepException) { + ex = (WorkflowStepException) exception; + } else if (exception instanceof FlowFrameworkException) { + ex = (FlowFrameworkException) exception; + } else { + ex = new FlowFrameworkException("Failed to get workflow.", ExceptionsHelper.status(exception)); + } XContentBuilder exceptionBuilder = ex.toXContent(channel.newErrorBuilder(), ToXContent.EMPTY_PARAMS); channel.sendResponse(new BytesRestResponse(ex.getRestStatus(), exceptionBuilder)); } catch (IOException e) { diff --git a/src/main/java/org/opensearch/flowframework/rest/RestGetWorkflowStateAction.java b/src/main/java/org/opensearch/flowframework/rest/RestGetWorkflowStateAction.java index aadf8ef8f..35478a60f 100644 --- a/src/main/java/org/opensearch/flowframework/rest/RestGetWorkflowStateAction.java +++ b/src/main/java/org/opensearch/flowframework/rest/RestGetWorkflowStateAction.java @@ -18,6 +18,7 @@ import org.opensearch.core.xcontent.XContentBuilder; import org.opensearch.flowframework.common.FlowFrameworkSettings; import org.opensearch.flowframework.exception.FlowFrameworkException; +import org.opensearch.flowframework.exception.WorkflowStepException; import org.opensearch.flowframework.transport.GetWorkflowStateAction; import org.opensearch.flowframework.transport.GetWorkflowStateRequest; import org.opensearch.rest.BaseRestHandler; @@ -82,9 +83,14 @@ protected BaseRestHandler.RestChannelConsumer prepareRequest(RestRequest request channel.sendResponse(new BytesRestResponse(RestStatus.OK, builder)); }, exception -> { try { - FlowFrameworkException ex = exception instanceof FlowFrameworkException - ? (FlowFrameworkException) exception - : new FlowFrameworkException("Failed to get workflow state.", ExceptionsHelper.status(exception)); + FlowFrameworkException ex; + if (exception instanceof WorkflowStepException) { + ex = (WorkflowStepException) exception; + } else if (exception instanceof FlowFrameworkException) { + ex = (FlowFrameworkException) exception; + } else { + ex = new FlowFrameworkException("Failed to get workflow state.", ExceptionsHelper.status(exception)); + } XContentBuilder exceptionBuilder = ex.toXContent(channel.newErrorBuilder(), ToXContent.EMPTY_PARAMS); channel.sendResponse(new BytesRestResponse(ex.getRestStatus(), exceptionBuilder)); } catch (IOException e) { diff --git a/src/main/java/org/opensearch/flowframework/rest/RestGetWorkflowStepAction.java b/src/main/java/org/opensearch/flowframework/rest/RestGetWorkflowStepAction.java index f861efa2e..92e28de70 100644 --- a/src/main/java/org/opensearch/flowframework/rest/RestGetWorkflowStepAction.java +++ b/src/main/java/org/opensearch/flowframework/rest/RestGetWorkflowStepAction.java @@ -18,6 +18,7 @@ import org.opensearch.core.xcontent.XContentBuilder; import org.opensearch.flowframework.common.FlowFrameworkSettings; import org.opensearch.flowframework.exception.FlowFrameworkException; +import org.opensearch.flowframework.exception.WorkflowStepException; import org.opensearch.flowframework.transport.GetWorkflowStepAction; import org.opensearch.flowframework.transport.WorkflowRequest; import org.opensearch.rest.BaseRestHandler; @@ -81,9 +82,14 @@ protected RestChannelConsumer prepareRequest(RestRequest request, NodeClient cli channel.sendResponse(new BytesRestResponse(RestStatus.OK, builder)); }, exception -> { try { - FlowFrameworkException ex = exception instanceof FlowFrameworkException - ? (FlowFrameworkException) exception - : new FlowFrameworkException("Failed to get workflow step.", ExceptionsHelper.status(exception)); + FlowFrameworkException ex; + if (exception instanceof WorkflowStepException) { + ex = (WorkflowStepException) exception; + } else if (exception instanceof FlowFrameworkException) { + ex = (FlowFrameworkException) exception; + } else { + ex = new FlowFrameworkException("Failed to get workflow step.", ExceptionsHelper.status(exception)); + } XContentBuilder exceptionBuilder = ex.toXContent(channel.newErrorBuilder(), ToXContent.EMPTY_PARAMS); channel.sendResponse(new BytesRestResponse(ex.getRestStatus(), exceptionBuilder)); } catch (IOException e) { diff --git a/src/main/java/org/opensearch/flowframework/rest/RestProvisionWorkflowAction.java b/src/main/java/org/opensearch/flowframework/rest/RestProvisionWorkflowAction.java index 26070f515..95a2022bc 100644 --- a/src/main/java/org/opensearch/flowframework/rest/RestProvisionWorkflowAction.java +++ b/src/main/java/org/opensearch/flowframework/rest/RestProvisionWorkflowAction.java @@ -19,6 +19,7 @@ import org.opensearch.core.xcontent.XContentParser; import org.opensearch.flowframework.common.FlowFrameworkSettings; import org.opensearch.flowframework.exception.FlowFrameworkException; +import org.opensearch.flowframework.exception.WorkflowStepException; import org.opensearch.flowframework.transport.ProvisionWorkflowAction; import org.opensearch.flowframework.transport.WorkflowRequest; import org.opensearch.rest.BaseRestHandler; @@ -92,9 +93,14 @@ protected RestChannelConsumer prepareRequest(RestRequest request, NodeClient cli channel.sendResponse(new BytesRestResponse(RestStatus.OK, builder)); }, exception -> { try { - FlowFrameworkException ex = exception instanceof FlowFrameworkException - ? (FlowFrameworkException) exception - : new FlowFrameworkException("Failed to provision workflow.", ExceptionsHelper.status(exception)); + FlowFrameworkException ex; + if (exception instanceof WorkflowStepException) { + ex = (WorkflowStepException) exception; + } else if (exception instanceof FlowFrameworkException) { + ex = (FlowFrameworkException) exception; + } else { + ex = new FlowFrameworkException("Failed to provision workflow.", ExceptionsHelper.status(exception)); + } XContentBuilder exceptionBuilder = ex.toXContent(channel.newErrorBuilder(), ToXContent.EMPTY_PARAMS); channel.sendResponse(new BytesRestResponse(ex.getRestStatus(), exceptionBuilder)); } catch (IOException e) { diff --git a/src/main/java/org/opensearch/flowframework/transport/ProvisionWorkflowTransportAction.java b/src/main/java/org/opensearch/flowframework/transport/ProvisionWorkflowTransportAction.java index 205b749f9..18ca73f26 100644 --- a/src/main/java/org/opensearch/flowframework/transport/ProvisionWorkflowTransportAction.java +++ b/src/main/java/org/opensearch/flowframework/transport/ProvisionWorkflowTransportAction.java @@ -256,10 +256,18 @@ private void executeWorkflow(List workflowSequence, String workflow }, exception -> { logger.error("Failed to update workflow state for workflow {}", workflowId, exception); }) ); } catch (Exception ex) { + RestStatus status; + if (ex instanceof FlowFrameworkException) { + status = ((FlowFrameworkException) ex).getRestStatus(); + } else { + status = ExceptionsHelper.status(ex); + } logger.error("Provisioning failed for workflow {} during step {}.", workflowId, currentStepId, ex); String errorMessage = (ex.getCause() == null ? ex.getClass().getName() : ex.getCause().getClass().getName()) + " during step " - + currentStepId; + + currentStepId + + ", restStatus: " + + status.toString(); flowFrameworkIndicesHandler.updateFlowFrameworkSystemIndexDoc( workflowId, Map.ofEntries( diff --git a/src/main/java/org/opensearch/flowframework/workflow/AbstractCreatePipelineStep.java b/src/main/java/org/opensearch/flowframework/workflow/AbstractCreatePipelineStep.java index 14f51afa8..101c3ec12 100644 --- a/src/main/java/org/opensearch/flowframework/workflow/AbstractCreatePipelineStep.java +++ b/src/main/java/org/opensearch/flowframework/workflow/AbstractCreatePipelineStep.java @@ -22,6 +22,7 @@ import org.opensearch.core.common.bytes.BytesArray; import org.opensearch.core.common.bytes.BytesReference; import org.opensearch.flowframework.exception.FlowFrameworkException; +import org.opensearch.flowframework.exception.WorkflowStepException; import org.opensearch.flowframework.indices.FlowFrameworkIndicesHandler; import org.opensearch.flowframework.util.ParseUtils; @@ -135,7 +136,7 @@ public void onResponse(AcknowledgedResponse acknowledgedResponse) { public void onFailure(Exception e) { String errorMessage = "Failed step " + pipelineToBeCreated; logger.error(errorMessage, e); - createPipelineFuture.onFailure(new FlowFrameworkException(errorMessage, ExceptionsHelper.status(e))); + createPipelineFuture.onFailure(new WorkflowStepException(errorMessage, ExceptionsHelper.status(e))); } }; diff --git a/src/main/java/org/opensearch/flowframework/workflow/AbstractRegisterLocalModelStep.java b/src/main/java/org/opensearch/flowframework/workflow/AbstractRegisterLocalModelStep.java index 913f94e75..442e41355 100644 --- a/src/main/java/org/opensearch/flowframework/workflow/AbstractRegisterLocalModelStep.java +++ b/src/main/java/org/opensearch/flowframework/workflow/AbstractRegisterLocalModelStep.java @@ -15,6 +15,7 @@ import org.opensearch.core.action.ActionListener; import org.opensearch.flowframework.common.FlowFrameworkSettings; import org.opensearch.flowframework.exception.FlowFrameworkException; +import org.opensearch.flowframework.exception.WorkflowStepException; import org.opensearch.flowframework.indices.FlowFrameworkIndicesHandler; import org.opensearch.flowframework.util.ParseUtils; import org.opensearch.ml.client.MachineLearningNodeClient; @@ -214,7 +215,7 @@ public PlainActionFuture execute( }, exception -> { String errorMessage = "Failed to register local model in step " + currentNodeId; logger.error(errorMessage, exception); - registerLocalModelFuture.onFailure(new FlowFrameworkException(errorMessage, ExceptionsHelper.status(exception))); + registerLocalModelFuture.onFailure(new WorkflowStepException(errorMessage, ExceptionsHelper.status(exception))); })); } catch (FlowFrameworkException e) { registerLocalModelFuture.onFailure(e); diff --git a/src/main/java/org/opensearch/flowframework/workflow/AbstractRetryableWorkflowStep.java b/src/main/java/org/opensearch/flowframework/workflow/AbstractRetryableWorkflowStep.java index 9c4fa75b4..c1933b0c4 100644 --- a/src/main/java/org/opensearch/flowframework/workflow/AbstractRetryableWorkflowStep.java +++ b/src/main/java/org/opensearch/flowframework/workflow/AbstractRetryableWorkflowStep.java @@ -18,6 +18,7 @@ import org.opensearch.core.rest.RestStatus; import org.opensearch.flowframework.common.FlowFrameworkSettings; import org.opensearch.flowframework.exception.FlowFrameworkException; +import org.opensearch.flowframework.exception.WorkflowStepException; import org.opensearch.flowframework.indices.FlowFrameworkIndicesHandler; import org.opensearch.ml.client.MachineLearningNodeClient; import org.opensearch.ml.common.MLTask; @@ -127,7 +128,7 @@ protected void retryableGetMlTask( }, exception -> { String errorMessage = workflowStep + " failed"; logger.error(errorMessage, exception); - mlTaskListener.onFailure(new FlowFrameworkException(errorMessage, RestStatus.BAD_REQUEST)); + mlTaskListener.onFailure(new WorkflowStepException(errorMessage, RestStatus.BAD_REQUEST)); })); try { Thread.sleep(this.retryDuration.getMillis()); diff --git a/src/main/java/org/opensearch/flowframework/workflow/CreateConnectorStep.java b/src/main/java/org/opensearch/flowframework/workflow/CreateConnectorStep.java index 403e26063..7b296cdc6 100644 --- a/src/main/java/org/opensearch/flowframework/workflow/CreateConnectorStep.java +++ b/src/main/java/org/opensearch/flowframework/workflow/CreateConnectorStep.java @@ -15,6 +15,7 @@ import org.opensearch.core.action.ActionListener; import org.opensearch.core.rest.RestStatus; import org.opensearch.flowframework.exception.FlowFrameworkException; +import org.opensearch.flowframework.exception.WorkflowStepException; import org.opensearch.flowframework.indices.FlowFrameworkIndicesHandler; import org.opensearch.flowframework.util.ParseUtils; import org.opensearch.ml.client.MachineLearningNodeClient; @@ -123,7 +124,7 @@ public void onResponse(MLCreateConnectorResponse mlCreateConnectorResponse) { public void onFailure(Exception e) { String errorMessage = "Failed to create connector"; logger.error(errorMessage, e); - createConnectorFuture.onFailure(new FlowFrameworkException(errorMessage, ExceptionsHelper.status(e))); + createConnectorFuture.onFailure(new WorkflowStepException(errorMessage, ExceptionsHelper.status(e))); } }; diff --git a/src/main/java/org/opensearch/flowframework/workflow/DeleteAgentStep.java b/src/main/java/org/opensearch/flowframework/workflow/DeleteAgentStep.java index 58e5f789a..0dba99f7a 100644 --- a/src/main/java/org/opensearch/flowframework/workflow/DeleteAgentStep.java +++ b/src/main/java/org/opensearch/flowframework/workflow/DeleteAgentStep.java @@ -15,6 +15,7 @@ import org.opensearch.action.support.PlainActionFuture; import org.opensearch.core.action.ActionListener; import org.opensearch.flowframework.exception.FlowFrameworkException; +import org.opensearch.flowframework.exception.WorkflowStepException; import org.opensearch.flowframework.util.ParseUtils; import org.opensearch.ml.client.MachineLearningNodeClient; @@ -84,7 +85,7 @@ public void onResponse(DeleteResponse deleteResponse) { public void onFailure(Exception e) { String errorMessage = "Failed to delete agent " + agentId; logger.error(errorMessage, e); - deleteAgentFuture.onFailure(new FlowFrameworkException(errorMessage, ExceptionsHelper.status(e))); + deleteAgentFuture.onFailure(new WorkflowStepException(errorMessage, ExceptionsHelper.status(e))); } }); } catch (FlowFrameworkException e) { diff --git a/src/main/java/org/opensearch/flowframework/workflow/DeleteConnectorStep.java b/src/main/java/org/opensearch/flowframework/workflow/DeleteConnectorStep.java index ba411c0a2..11a2b2d62 100644 --- a/src/main/java/org/opensearch/flowframework/workflow/DeleteConnectorStep.java +++ b/src/main/java/org/opensearch/flowframework/workflow/DeleteConnectorStep.java @@ -15,6 +15,7 @@ import org.opensearch.action.support.PlainActionFuture; import org.opensearch.core.action.ActionListener; import org.opensearch.flowframework.exception.FlowFrameworkException; +import org.opensearch.flowframework.exception.WorkflowStepException; import org.opensearch.flowframework.util.ParseUtils; import org.opensearch.ml.client.MachineLearningNodeClient; @@ -84,7 +85,7 @@ public void onResponse(DeleteResponse deleteResponse) { public void onFailure(Exception e) { String errorMessage = "Failed to delete connector " + connectorId; logger.error(errorMessage, e); - deleteConnectorFuture.onFailure(new FlowFrameworkException(errorMessage, ExceptionsHelper.status(e))); + deleteConnectorFuture.onFailure(new WorkflowStepException(errorMessage, ExceptionsHelper.status(e))); } }); } catch (FlowFrameworkException e) { diff --git a/src/main/java/org/opensearch/flowframework/workflow/DeleteModelStep.java b/src/main/java/org/opensearch/flowframework/workflow/DeleteModelStep.java index 9dab3365e..c8071f7cd 100644 --- a/src/main/java/org/opensearch/flowframework/workflow/DeleteModelStep.java +++ b/src/main/java/org/opensearch/flowframework/workflow/DeleteModelStep.java @@ -15,6 +15,7 @@ import org.opensearch.action.support.PlainActionFuture; import org.opensearch.core.action.ActionListener; import org.opensearch.flowframework.exception.FlowFrameworkException; +import org.opensearch.flowframework.exception.WorkflowStepException; import org.opensearch.flowframework.util.ParseUtils; import org.opensearch.ml.client.MachineLearningNodeClient; @@ -85,7 +86,7 @@ public void onResponse(DeleteResponse deleteResponse) { public void onFailure(Exception e) { String errorMessage = "Failed to delete model " + modelId; logger.error(errorMessage, e); - deleteModelFuture.onFailure(new FlowFrameworkException(errorMessage, ExceptionsHelper.status(e))); + deleteModelFuture.onFailure(new WorkflowStepException(errorMessage, ExceptionsHelper.status(e))); } }); } catch (FlowFrameworkException e) { diff --git a/src/main/java/org/opensearch/flowframework/workflow/DeployModelStep.java b/src/main/java/org/opensearch/flowframework/workflow/DeployModelStep.java index 3d29e37ca..929f5f570 100644 --- a/src/main/java/org/opensearch/flowframework/workflow/DeployModelStep.java +++ b/src/main/java/org/opensearch/flowframework/workflow/DeployModelStep.java @@ -15,6 +15,7 @@ import org.opensearch.core.action.ActionListener; import org.opensearch.flowframework.common.FlowFrameworkSettings; import org.opensearch.flowframework.exception.FlowFrameworkException; +import org.opensearch.flowframework.exception.WorkflowStepException; import org.opensearch.flowframework.indices.FlowFrameworkIndicesHandler; import org.opensearch.flowframework.util.ParseUtils; import org.opensearch.ml.client.MachineLearningNodeClient; @@ -117,7 +118,7 @@ public void onResponse(MLDeployModelResponse mlDeployModelResponse) { public void onFailure(Exception e) { String errorMessage = "Failed to deploy model " + modelId; logger.error(errorMessage, e); - deployModelFuture.onFailure(new FlowFrameworkException(errorMessage, ExceptionsHelper.status(e))); + deployModelFuture.onFailure(new WorkflowStepException(errorMessage, ExceptionsHelper.status(e))); } }); } catch (FlowFrameworkException e) { diff --git a/src/main/java/org/opensearch/flowframework/workflow/RegisterAgentStep.java b/src/main/java/org/opensearch/flowframework/workflow/RegisterAgentStep.java index 46b120bde..8042d5244 100644 --- a/src/main/java/org/opensearch/flowframework/workflow/RegisterAgentStep.java +++ b/src/main/java/org/opensearch/flowframework/workflow/RegisterAgentStep.java @@ -15,6 +15,7 @@ import org.opensearch.common.Nullable; import org.opensearch.core.action.ActionListener; import org.opensearch.flowframework.exception.FlowFrameworkException; +import org.opensearch.flowframework.exception.WorkflowStepException; import org.opensearch.flowframework.indices.FlowFrameworkIndicesHandler; import org.opensearch.flowframework.util.ParseUtils; import org.opensearch.ml.client.MachineLearningNodeClient; @@ -135,7 +136,7 @@ public void onResponse(MLRegisterAgentResponse mlRegisterAgentResponse) { public void onFailure(Exception e) { String errorMessage = "Failed to register the agent"; logger.error(errorMessage, e); - registerAgentModelFuture.onFailure(new FlowFrameworkException(errorMessage, ExceptionsHelper.status(e))); + registerAgentModelFuture.onFailure(new WorkflowStepException(errorMessage, ExceptionsHelper.status(e))); } }; diff --git a/src/main/java/org/opensearch/flowframework/workflow/RegisterModelGroupStep.java b/src/main/java/org/opensearch/flowframework/workflow/RegisterModelGroupStep.java index 19818e54f..b8a79325a 100644 --- a/src/main/java/org/opensearch/flowframework/workflow/RegisterModelGroupStep.java +++ b/src/main/java/org/opensearch/flowframework/workflow/RegisterModelGroupStep.java @@ -15,6 +15,7 @@ import org.opensearch.core.action.ActionListener; import org.opensearch.core.common.util.CollectionUtils; import org.opensearch.flowframework.exception.FlowFrameworkException; +import org.opensearch.flowframework.exception.WorkflowStepException; import org.opensearch.flowframework.indices.FlowFrameworkIndicesHandler; import org.opensearch.flowframework.util.ParseUtils; import org.opensearch.ml.client.MachineLearningNodeClient; @@ -118,7 +119,7 @@ public void onResponse(MLRegisterModelGroupResponse mlRegisterModelGroupResponse public void onFailure(Exception e) { String errorMessage = "Failed to register model group"; logger.error(errorMessage, e); - registerModelGroupFuture.onFailure(new FlowFrameworkException(errorMessage, ExceptionsHelper.status(e))); + registerModelGroupFuture.onFailure(new WorkflowStepException(errorMessage, ExceptionsHelper.status(e))); } }; diff --git a/src/main/java/org/opensearch/flowframework/workflow/RegisterRemoteModelStep.java b/src/main/java/org/opensearch/flowframework/workflow/RegisterRemoteModelStep.java index 51081b467..c32a7f0bd 100644 --- a/src/main/java/org/opensearch/flowframework/workflow/RegisterRemoteModelStep.java +++ b/src/main/java/org/opensearch/flowframework/workflow/RegisterRemoteModelStep.java @@ -15,6 +15,7 @@ import org.opensearch.action.update.UpdateResponse; import org.opensearch.core.action.ActionListener; import org.opensearch.flowframework.exception.FlowFrameworkException; +import org.opensearch.flowframework.exception.WorkflowStepException; import org.opensearch.flowframework.indices.FlowFrameworkIndicesHandler; import org.opensearch.flowframework.util.ParseUtils; import org.opensearch.ml.client.MachineLearningNodeClient; @@ -177,7 +178,7 @@ void completeRegisterFuture(UpdateResponse response, String resourceName, MLRegi public void onFailure(Exception e) { String errorMessage = "Failed to register remote model"; logger.error(errorMessage, e); - registerRemoteModelFuture.onFailure(new FlowFrameworkException(errorMessage, ExceptionsHelper.status(e))); + registerRemoteModelFuture.onFailure(new WorkflowStepException(errorMessage, ExceptionsHelper.status(e))); } }); diff --git a/src/main/java/org/opensearch/flowframework/workflow/UndeployModelStep.java b/src/main/java/org/opensearch/flowframework/workflow/UndeployModelStep.java index 034bd3caa..00eec6d29 100644 --- a/src/main/java/org/opensearch/flowframework/workflow/UndeployModelStep.java +++ b/src/main/java/org/opensearch/flowframework/workflow/UndeployModelStep.java @@ -16,6 +16,7 @@ import org.opensearch.action.support.PlainActionFuture; import org.opensearch.core.action.ActionListener; import org.opensearch.flowframework.exception.FlowFrameworkException; +import org.opensearch.flowframework.exception.WorkflowStepException; import org.opensearch.flowframework.util.ParseUtils; import org.opensearch.ml.client.MachineLearningNodeClient; import org.opensearch.ml.common.transport.undeploy.MLUndeployModelsResponse; @@ -98,7 +99,7 @@ public void onResponse(MLUndeployModelsResponse mlUndeployModelsResponse) { public void onFailure(Exception e) { String errorMessage = "Failed to undeploy model " + modelId; logger.error(errorMessage, e); - undeployModelFuture.onFailure(new FlowFrameworkException(errorMessage, ExceptionsHelper.status(e))); + undeployModelFuture.onFailure(new WorkflowStepException(errorMessage, ExceptionsHelper.status(e))); } }); } catch (FlowFrameworkException e) {