Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Adding new exception type for workflow step failures #577

Merged
merged 3 commits into from
Mar 19, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ Inspired from [Keep a Changelog](https://keepachangelog.com/en/1.1.0/)
- Added an optional workflow_step param to the get workflow steps API ([#538](https://github.com/opensearch-project/flow-framework/pull/538))
- Add created, updated, and provisioned timestamps to saved template ([#551](https://github.com/opensearch-project/flow-framework/pull/551))
- Enable Flow Framework by default ([#553](https://github.com/opensearch-project/flow-framework/pull/553))
- Adding new exception type for workflow step failures ([#577](https://github.com/opensearch-project/flow-framework/pull/577))

### Bug Fixes
### Infrastructure
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ public class FlowFrameworkException extends RuntimeException implements ToXConte
private static final long serialVersionUID = 1L;

/** The rest status code of this exception */
private final RestStatus restStatus;
protected final RestStatus restStatus;

/**
* Constructor with error message.
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/
package org.opensearch.flowframework.exception;

import org.opensearch.core.rest.RestStatus;
import org.opensearch.core.xcontent.ToXContentObject;
import org.opensearch.core.xcontent.XContentBuilder;

import java.io.IOException;

/**
* Representation of an exception that is caused by a workflow step failing outside of our plugin
* This is caught by an external client (e.g. ml-client) returning the failure
*/
public class WorkflowStepException extends FlowFrameworkException implements ToXContentObject {

private static final long serialVersionUID = 1L;

/**
* Constructor with error message.
*
* @param message message of the exception
* @param restStatus HTTP status code of the response
*/
public WorkflowStepException(String message, RestStatus restStatus) {
super(message, restStatus);
}

/**
* Constructor with specified cause.
* @param cause exception cause
* @param restStatus HTTP status code of the response
*/
public WorkflowStepException(Throwable cause, RestStatus restStatus) {
super(cause, restStatus);
}

Check warning on line 42 in src/main/java/org/opensearch/flowframework/exception/WorkflowStepException.java

View check run for this annotation

Codecov / codecov/patch

src/main/java/org/opensearch/flowframework/exception/WorkflowStepException.java#L41-L42

Added lines #L41 - L42 were not covered by tests

/**
* Constructor with specified error message adn cause.
* @param message error message
* @param cause exception cause
* @param restStatus HTTP status code of the response
*/
public WorkflowStepException(String message, Throwable cause, RestStatus restStatus) {
super(message, cause, restStatus);
}

Check warning on line 52 in src/main/java/org/opensearch/flowframework/exception/WorkflowStepException.java

View check run for this annotation

Codecov / codecov/patch

src/main/java/org/opensearch/flowframework/exception/WorkflowStepException.java#L51-L52

Added lines #L51 - L52 were not covered by tests

/**
* Getter for restStatus.
*
* @return the HTTP status code associated with the exception
*/
public RestStatus getRestStatus() {
return restStatus;

Check warning on line 60 in src/main/java/org/opensearch/flowframework/exception/WorkflowStepException.java

View check run for this annotation

Codecov / codecov/patch

src/main/java/org/opensearch/flowframework/exception/WorkflowStepException.java#L60

Added line #L60 was not covered by tests
}

@Override
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
return builder.startObject().field("error", this.getMessage()).endObject();

Check warning on line 65 in src/main/java/org/opensearch/flowframework/exception/WorkflowStepException.java

View check run for this annotation

Codecov / codecov/patch

src/main/java/org/opensearch/flowframework/exception/WorkflowStepException.java#L65

Added line #L65 was not covered by tests
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -182,7 +182,7 @@
try {
FlowFrameworkException ex = exception instanceof FlowFrameworkException
? (FlowFrameworkException) exception
: new FlowFrameworkException("Failed to create workflow.", ExceptionsHelper.status(exception));
: new FlowFrameworkException("Failed to get workflow.", ExceptionsHelper.status(exception));

Check warning on line 185 in src/main/java/org/opensearch/flowframework/rest/RestCreateWorkflowAction.java

View check run for this annotation

Codecov / codecov/patch

src/main/java/org/opensearch/flowframework/rest/RestCreateWorkflowAction.java#L185

Added line #L185 was not covered by tests
XContentBuilder exceptionBuilder = ex.toXContent(channel.newErrorBuilder(), ToXContent.EMPTY_PARAMS);
channel.sendResponse(new BytesRestResponse(ex.getRestStatus(), exceptionBuilder));
} catch (IOException e) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,7 @@
try {
FlowFrameworkException ex = exception instanceof FlowFrameworkException
? (FlowFrameworkException) exception
: new FlowFrameworkException("Failed to delete workflow.", ExceptionsHelper.status(exception));
: new FlowFrameworkException("Failed to get workflow.", ExceptionsHelper.status(exception));

Check warning on line 89 in src/main/java/org/opensearch/flowframework/rest/RestDeleteWorkflowAction.java

View check run for this annotation

Codecov / codecov/patch

src/main/java/org/opensearch/flowframework/rest/RestDeleteWorkflowAction.java#L89

Added line #L89 was not covered by tests
XContentBuilder exceptionBuilder = ex.toXContent(channel.newErrorBuilder(), ToXContent.EMPTY_PARAMS);
channel.sendResponse(new BytesRestResponse(ex.getRestStatus(), exceptionBuilder));
} catch (IOException e) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -66,8 +66,7 @@
}
// Validate content
if (request.hasContent()) {
// BaseRestHandler will give appropriate error message
return channel -> channel.sendResponse(null);
throw new FlowFrameworkException("deprovision request should have no payload", RestStatus.BAD_REQUEST);
}
// Validate params
if (workflowId == null) {
Expand All @@ -82,7 +81,7 @@
try {
FlowFrameworkException ex = exception instanceof FlowFrameworkException
? (FlowFrameworkException) exception
: new FlowFrameworkException("Failed to deprovision workflow.", ExceptionsHelper.status(exception));
: new FlowFrameworkException("Failed to get workflow.", ExceptionsHelper.status(exception));

Check warning on line 84 in src/main/java/org/opensearch/flowframework/rest/RestDeprovisionWorkflowAction.java

View check run for this annotation

Codecov / codecov/patch

src/main/java/org/opensearch/flowframework/rest/RestDeprovisionWorkflowAction.java#L84

Added line #L84 was not covered by tests
XContentBuilder exceptionBuilder = ex.toXContent(channel.newErrorBuilder(), ToXContent.EMPTY_PARAMS);
channel.sendResponse(new BytesRestResponse(ex.getRestStatus(), exceptionBuilder));
} catch (IOException e) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,7 @@
try {
FlowFrameworkException ex = exception instanceof FlowFrameworkException
? (FlowFrameworkException) exception
: new FlowFrameworkException("Failed to get workflow state.", ExceptionsHelper.status(exception));
: new FlowFrameworkException("Failed to get workflow.", ExceptionsHelper.status(exception));

Check warning on line 87 in src/main/java/org/opensearch/flowframework/rest/RestGetWorkflowStateAction.java

View check run for this annotation

Codecov / codecov/patch

src/main/java/org/opensearch/flowframework/rest/RestGetWorkflowStateAction.java#L87

Added line #L87 was not covered by tests
XContentBuilder exceptionBuilder = ex.toXContent(channel.newErrorBuilder(), ToXContent.EMPTY_PARAMS);
channel.sendResponse(new BytesRestResponse(ex.getRestStatus(), exceptionBuilder));
} catch (IOException e) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,7 @@
try {
FlowFrameworkException ex = exception instanceof FlowFrameworkException
? (FlowFrameworkException) exception
: new FlowFrameworkException("Failed to get workflow step.", ExceptionsHelper.status(exception));
: new FlowFrameworkException("Failed to get workflow.", ExceptionsHelper.status(exception));

Check warning on line 86 in src/main/java/org/opensearch/flowframework/rest/RestGetWorkflowStepAction.java

View check run for this annotation

Codecov / codecov/patch

src/main/java/org/opensearch/flowframework/rest/RestGetWorkflowStepAction.java#L86

Added line #L86 was not covered by tests
XContentBuilder exceptionBuilder = ex.toXContent(channel.newErrorBuilder(), ToXContent.EMPTY_PARAMS);
channel.sendResponse(new BytesRestResponse(ex.getRestStatus(), exceptionBuilder));
} catch (IOException e) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,7 @@
try {
FlowFrameworkException ex = exception instanceof FlowFrameworkException
? (FlowFrameworkException) exception
: new FlowFrameworkException("Failed to provision workflow.", ExceptionsHelper.status(exception));
: new FlowFrameworkException("Failed to get workflow.", ExceptionsHelper.status(exception));

Check warning on line 97 in src/main/java/org/opensearch/flowframework/rest/RestProvisionWorkflowAction.java

View check run for this annotation

Codecov / codecov/patch

src/main/java/org/opensearch/flowframework/rest/RestProvisionWorkflowAction.java#L97

Added line #L97 was not covered by tests
XContentBuilder exceptionBuilder = ex.toXContent(channel.newErrorBuilder(), ToXContent.EMPTY_PARAMS);
channel.sendResponse(new BytesRestResponse(ex.getRestStatus(), exceptionBuilder));
} catch (IOException e) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -256,10 +256,18 @@
}, exception -> { logger.error("Failed to update workflow state for workflow {}", workflowId, exception); })
);
} catch (Exception ex) {
RestStatus status;
if (ex instanceof FlowFrameworkException) {
status = ((FlowFrameworkException) ex).getRestStatus();

Check warning on line 261 in src/main/java/org/opensearch/flowframework/transport/ProvisionWorkflowTransportAction.java

View check run for this annotation

Codecov / codecov/patch

src/main/java/org/opensearch/flowframework/transport/ProvisionWorkflowTransportAction.java#L261

Added line #L261 was not covered by tests
} else {
status = ExceptionsHelper.status(ex);

Check warning on line 263 in src/main/java/org/opensearch/flowframework/transport/ProvisionWorkflowTransportAction.java

View check run for this annotation

Codecov / codecov/patch

src/main/java/org/opensearch/flowframework/transport/ProvisionWorkflowTransportAction.java#L263

Added line #L263 was not covered by tests
}
logger.error("Provisioning failed for workflow {} during step {}.", workflowId, currentStepId, ex);
String errorMessage = (ex.getCause() == null ? ex.getClass().getName() : ex.getCause().getClass().getName())
+ " during step "
+ currentStepId;
+ currentStepId
+ ", restStatus: "
+ status.toString();

Check warning on line 270 in src/main/java/org/opensearch/flowframework/transport/ProvisionWorkflowTransportAction.java

View check run for this annotation

Codecov / codecov/patch

src/main/java/org/opensearch/flowframework/transport/ProvisionWorkflowTransportAction.java#L270

Added line #L270 was not covered by tests
flowFrameworkIndicesHandler.updateFlowFrameworkSystemIndexDoc(
workflowId,
Map.ofEntries(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import org.opensearch.core.common.bytes.BytesArray;
import org.opensearch.core.common.bytes.BytesReference;
import org.opensearch.flowframework.exception.FlowFrameworkException;
import org.opensearch.flowframework.exception.WorkflowStepException;
import org.opensearch.flowframework.indices.FlowFrameworkIndicesHandler;
import org.opensearch.flowframework.util.ParseUtils;

Expand Down Expand Up @@ -140,7 +141,7 @@ public void onResponse(AcknowledgedResponse acknowledgedResponse) {
public void onFailure(Exception e) {
String errorMessage = "Failed step " + pipelineToBeCreated;
logger.error(errorMessage, e);
createPipelineFuture.onFailure(new FlowFrameworkException(errorMessage, ExceptionsHelper.status(e)));
createPipelineFuture.onFailure(new WorkflowStepException(errorMessage, ExceptionsHelper.status(e)));
}

};
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
import org.opensearch.core.action.ActionListener;
import org.opensearch.flowframework.common.FlowFrameworkSettings;
import org.opensearch.flowframework.exception.FlowFrameworkException;
import org.opensearch.flowframework.exception.WorkflowStepException;
import org.opensearch.flowframework.indices.FlowFrameworkIndicesHandler;
import org.opensearch.flowframework.util.ParseUtils;
import org.opensearch.ml.client.MachineLearningNodeClient;
Expand Down Expand Up @@ -214,7 +215,7 @@ public PlainActionFuture<WorkflowData> execute(
}, exception -> {
String errorMessage = "Failed to register local model in step " + currentNodeId;
logger.error(errorMessage, exception);
registerLocalModelFuture.onFailure(new FlowFrameworkException(errorMessage, ExceptionsHelper.status(exception)));
registerLocalModelFuture.onFailure(new WorkflowStepException(errorMessage, ExceptionsHelper.status(exception)));
}));
} catch (FlowFrameworkException e) {
registerLocalModelFuture.onFailure(e);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
import org.opensearch.core.rest.RestStatus;
import org.opensearch.flowframework.common.FlowFrameworkSettings;
import org.opensearch.flowframework.exception.FlowFrameworkException;
import org.opensearch.flowframework.exception.WorkflowStepException;
import org.opensearch.flowframework.indices.FlowFrameworkIndicesHandler;
import org.opensearch.ml.client.MachineLearningNodeClient;
import org.opensearch.ml.common.MLTask;
Expand Down Expand Up @@ -127,7 +128,7 @@
}, exception -> {
String errorMessage = workflowStep + " failed";
logger.error(errorMessage, exception);
mlTaskListener.onFailure(new FlowFrameworkException(errorMessage, RestStatus.BAD_REQUEST));
mlTaskListener.onFailure(new WorkflowStepException(errorMessage, RestStatus.BAD_REQUEST));

Check warning on line 131 in src/main/java/org/opensearch/flowframework/workflow/AbstractRetryableWorkflowStep.java

View check run for this annotation

Codecov / codecov/patch

src/main/java/org/opensearch/flowframework/workflow/AbstractRetryableWorkflowStep.java#L131

Added line #L131 was not covered by tests
}));
try {
Thread.sleep(this.retryDuration.getMillis());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
import org.opensearch.core.action.ActionListener;
import org.opensearch.core.rest.RestStatus;
import org.opensearch.flowframework.exception.FlowFrameworkException;
import org.opensearch.flowframework.exception.WorkflowStepException;
import org.opensearch.flowframework.indices.FlowFrameworkIndicesHandler;
import org.opensearch.flowframework.util.ParseUtils;
import org.opensearch.ml.client.MachineLearningNodeClient;
Expand Down Expand Up @@ -123,7 +124,7 @@ public void onResponse(MLCreateConnectorResponse mlCreateConnectorResponse) {
public void onFailure(Exception e) {
String errorMessage = "Failed to create connector";
logger.error(errorMessage, e);
createConnectorFuture.onFailure(new FlowFrameworkException(errorMessage, ExceptionsHelper.status(e)));
createConnectorFuture.onFailure(new WorkflowStepException(errorMessage, ExceptionsHelper.status(e)));
}
};

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
import org.opensearch.action.support.PlainActionFuture;
import org.opensearch.core.action.ActionListener;
import org.opensearch.flowframework.exception.FlowFrameworkException;
import org.opensearch.flowframework.exception.WorkflowStepException;
import org.opensearch.flowframework.util.ParseUtils;
import org.opensearch.ml.client.MachineLearningNodeClient;

Expand Down Expand Up @@ -84,7 +85,7 @@ public void onResponse(DeleteResponse deleteResponse) {
public void onFailure(Exception e) {
String errorMessage = "Failed to delete agent " + agentId;
logger.error(errorMessage, e);
deleteAgentFuture.onFailure(new FlowFrameworkException(errorMessage, ExceptionsHelper.status(e)));
deleteAgentFuture.onFailure(new WorkflowStepException(errorMessage, ExceptionsHelper.status(e)));
}
});
} catch (FlowFrameworkException e) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
import org.opensearch.action.support.PlainActionFuture;
import org.opensearch.core.action.ActionListener;
import org.opensearch.flowframework.exception.FlowFrameworkException;
import org.opensearch.flowframework.exception.WorkflowStepException;
import org.opensearch.flowframework.util.ParseUtils;
import org.opensearch.ml.client.MachineLearningNodeClient;

Expand Down Expand Up @@ -84,7 +85,7 @@ public void onResponse(DeleteResponse deleteResponse) {
public void onFailure(Exception e) {
String errorMessage = "Failed to delete connector " + connectorId;
logger.error(errorMessage, e);
deleteConnectorFuture.onFailure(new FlowFrameworkException(errorMessage, ExceptionsHelper.status(e)));
deleteConnectorFuture.onFailure(new WorkflowStepException(errorMessage, ExceptionsHelper.status(e)));
}
});
} catch (FlowFrameworkException e) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
import org.opensearch.action.support.PlainActionFuture;
import org.opensearch.core.action.ActionListener;
import org.opensearch.flowframework.exception.FlowFrameworkException;
import org.opensearch.flowframework.exception.WorkflowStepException;
import org.opensearch.flowframework.util.ParseUtils;
import org.opensearch.ml.client.MachineLearningNodeClient;

Expand Down Expand Up @@ -85,7 +86,7 @@ public void onResponse(DeleteResponse deleteResponse) {
public void onFailure(Exception e) {
String errorMessage = "Failed to delete model " + modelId;
logger.error(errorMessage, e);
deleteModelFuture.onFailure(new FlowFrameworkException(errorMessage, ExceptionsHelper.status(e)));
deleteModelFuture.onFailure(new WorkflowStepException(errorMessage, ExceptionsHelper.status(e)));
}
});
} catch (FlowFrameworkException e) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
import org.opensearch.core.action.ActionListener;
import org.opensearch.flowframework.common.FlowFrameworkSettings;
import org.opensearch.flowframework.exception.FlowFrameworkException;
import org.opensearch.flowframework.exception.WorkflowStepException;
import org.opensearch.flowframework.indices.FlowFrameworkIndicesHandler;
import org.opensearch.flowframework.util.ParseUtils;
import org.opensearch.ml.client.MachineLearningNodeClient;
Expand Down Expand Up @@ -117,7 +118,7 @@ public void onResponse(MLDeployModelResponse mlDeployModelResponse) {
public void onFailure(Exception e) {
String errorMessage = "Failed to deploy model " + modelId;
logger.error(errorMessage, e);
deployModelFuture.onFailure(new FlowFrameworkException(errorMessage, ExceptionsHelper.status(e)));
deployModelFuture.onFailure(new WorkflowStepException(errorMessage, ExceptionsHelper.status(e)));
}
});
} catch (FlowFrameworkException e) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
import org.opensearch.common.Nullable;
import org.opensearch.core.action.ActionListener;
import org.opensearch.flowframework.exception.FlowFrameworkException;
import org.opensearch.flowframework.exception.WorkflowStepException;
import org.opensearch.flowframework.indices.FlowFrameworkIndicesHandler;
import org.opensearch.flowframework.util.ParseUtils;
import org.opensearch.ml.client.MachineLearningNodeClient;
Expand Down Expand Up @@ -135,7 +136,7 @@ public void onResponse(MLRegisterAgentResponse mlRegisterAgentResponse) {
public void onFailure(Exception e) {
String errorMessage = "Failed to register the agent";
logger.error(errorMessage, e);
registerAgentModelFuture.onFailure(new FlowFrameworkException(errorMessage, ExceptionsHelper.status(e)));
registerAgentModelFuture.onFailure(new WorkflowStepException(errorMessage, ExceptionsHelper.status(e)));
}
};

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
import org.opensearch.core.action.ActionListener;
import org.opensearch.core.common.util.CollectionUtils;
import org.opensearch.flowframework.exception.FlowFrameworkException;
import org.opensearch.flowframework.exception.WorkflowStepException;
import org.opensearch.flowframework.indices.FlowFrameworkIndicesHandler;
import org.opensearch.flowframework.util.ParseUtils;
import org.opensearch.ml.client.MachineLearningNodeClient;
Expand Down Expand Up @@ -118,7 +119,7 @@ public void onResponse(MLRegisterModelGroupResponse mlRegisterModelGroupResponse
public void onFailure(Exception e) {
String errorMessage = "Failed to register model group";
logger.error(errorMessage, e);
registerModelGroupFuture.onFailure(new FlowFrameworkException(errorMessage, ExceptionsHelper.status(e)));
registerModelGroupFuture.onFailure(new WorkflowStepException(errorMessage, ExceptionsHelper.status(e)));
}
};

Expand Down
Loading
Loading