Skip to content

Commit

Permalink
new exception type for workflow step failures
Browse files Browse the repository at this point in the history
Signed-off-by: Amit Galitzky <[email protected]>
  • Loading branch information
amitgalitz committed Mar 14, 2024
1 parent 7198573 commit 7b249f6
Show file tree
Hide file tree
Showing 22 changed files with 165 additions and 37 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ public class FlowFrameworkException extends RuntimeException implements ToXConte
private static final long serialVersionUID = 1L;

/** The rest status code of this exception */
private final RestStatus restStatus;
protected final RestStatus restStatus;

/**
* Constructor with error message.
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/
package org.opensearch.flowframework.exception;

import org.opensearch.core.rest.RestStatus;
import org.opensearch.core.xcontent.ToXContentObject;
import org.opensearch.core.xcontent.XContentBuilder;

import java.io.IOException;

/**
* Representation of an exception that is caused by a workflow step failing outside of our plugin
* This is caught by an external client (e.g. ml-client) returning the failure
*/
public class WorkflowStepException extends FlowFrameworkException implements ToXContentObject {

private static final long serialVersionUID = 1L;

/**
* Constructor with error message.
*
* @param message message of the exception
* @param restStatus HTTP status code of the response
*/
public WorkflowStepException(String message, RestStatus restStatus) {
super(message, restStatus);
}

/**
* Constructor with specified cause.
* @param cause exception cause
* @param restStatus HTTP status code of the response
*/
public WorkflowStepException(Throwable cause, RestStatus restStatus) {
super(cause, restStatus);
}

Check warning on line 42 in src/main/java/org/opensearch/flowframework/exception/WorkflowStepException.java

View check run for this annotation

Codecov / codecov/patch

src/main/java/org/opensearch/flowframework/exception/WorkflowStepException.java#L41-L42

Added lines #L41 - L42 were not covered by tests

/**
* Constructor with specified error message adn cause.
* @param message error message
* @param cause exception cause
* @param restStatus HTTP status code of the response
*/
public WorkflowStepException(String message, Throwable cause, RestStatus restStatus) {
super(message, cause, restStatus);
}

Check warning on line 52 in src/main/java/org/opensearch/flowframework/exception/WorkflowStepException.java

View check run for this annotation

Codecov / codecov/patch

src/main/java/org/opensearch/flowframework/exception/WorkflowStepException.java#L51-L52

Added lines #L51 - L52 were not covered by tests

/**
* Getter for restStatus.
*
* @return the HTTP status code associated with the exception
*/
public RestStatus getRestStatus() {
return restStatus;

Check warning on line 60 in src/main/java/org/opensearch/flowframework/exception/WorkflowStepException.java

View check run for this annotation

Codecov / codecov/patch

src/main/java/org/opensearch/flowframework/exception/WorkflowStepException.java#L60

Added line #L60 was not covered by tests
}

@Override
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
return builder.startObject().field("error", this.getMessage()).endObject();

Check warning on line 65 in src/main/java/org/opensearch/flowframework/exception/WorkflowStepException.java

View check run for this annotation

Codecov / codecov/patch

src/main/java/org/opensearch/flowframework/exception/WorkflowStepException.java#L65

Added line #L65 was not covered by tests
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import org.opensearch.core.xcontent.XContentParser;
import org.opensearch.flowframework.common.FlowFrameworkSettings;
import org.opensearch.flowframework.exception.FlowFrameworkException;
import org.opensearch.flowframework.exception.WorkflowStepException;
import org.opensearch.flowframework.model.Template;
import org.opensearch.flowframework.transport.CreateWorkflowAction;
import org.opensearch.flowframework.transport.WorkflowRequest;
Expand Down Expand Up @@ -123,9 +124,14 @@ protected RestChannelConsumer prepareRequest(RestRequest request, NodeClient cli
channel.sendResponse(new BytesRestResponse(RestStatus.CREATED, builder));
}, exception -> {
try {
FlowFrameworkException ex = exception instanceof FlowFrameworkException
? (FlowFrameworkException) exception
: new FlowFrameworkException("Failed to create workflow.", ExceptionsHelper.status(exception));
FlowFrameworkException ex;
if (exception instanceof WorkflowStepException) {
ex = (WorkflowStepException) exception;

Check warning on line 129 in src/main/java/org/opensearch/flowframework/rest/RestCreateWorkflowAction.java

View check run for this annotation

Codecov / codecov/patch

src/main/java/org/opensearch/flowframework/rest/RestCreateWorkflowAction.java#L129

Added line #L129 was not covered by tests
} else if (exception instanceof FlowFrameworkException) {
ex = (FlowFrameworkException) exception;

Check warning on line 131 in src/main/java/org/opensearch/flowframework/rest/RestCreateWorkflowAction.java

View check run for this annotation

Codecov / codecov/patch

src/main/java/org/opensearch/flowframework/rest/RestCreateWorkflowAction.java#L131

Added line #L131 was not covered by tests
} else {
ex = new FlowFrameworkException("Failed to create workflow.", ExceptionsHelper.status(exception));

Check warning on line 133 in src/main/java/org/opensearch/flowframework/rest/RestCreateWorkflowAction.java

View check run for this annotation

Codecov / codecov/patch

src/main/java/org/opensearch/flowframework/rest/RestCreateWorkflowAction.java#L133

Added line #L133 was not covered by tests
}
XContentBuilder exceptionBuilder = ex.toXContent(channel.newErrorBuilder(), ToXContent.EMPTY_PARAMS);
channel.sendResponse(new BytesRestResponse(ex.getRestStatus(), exceptionBuilder));
} catch (IOException e) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
import org.opensearch.core.xcontent.XContentBuilder;
import org.opensearch.flowframework.common.FlowFrameworkSettings;
import org.opensearch.flowframework.exception.FlowFrameworkException;
import org.opensearch.flowframework.exception.WorkflowStepException;
import org.opensearch.flowframework.transport.DeleteWorkflowAction;
import org.opensearch.flowframework.transport.WorkflowRequest;
import org.opensearch.rest.BaseRestHandler;
Expand Down Expand Up @@ -84,9 +85,14 @@ protected BaseRestHandler.RestChannelConsumer prepareRequest(RestRequest request
channel.sendResponse(new BytesRestResponse(RestStatus.OK, builder));
}, exception -> {
try {
FlowFrameworkException ex = exception instanceof FlowFrameworkException
? (FlowFrameworkException) exception
: new FlowFrameworkException("Failed to delete workflow.", ExceptionsHelper.status(exception));
FlowFrameworkException ex;
if (exception instanceof WorkflowStepException) {
ex = (WorkflowStepException) exception;

Check warning on line 90 in src/main/java/org/opensearch/flowframework/rest/RestDeleteWorkflowAction.java

View check run for this annotation

Codecov / codecov/patch

src/main/java/org/opensearch/flowframework/rest/RestDeleteWorkflowAction.java#L90

Added line #L90 was not covered by tests
} else if (exception instanceof FlowFrameworkException) {
ex = (FlowFrameworkException) exception;

Check warning on line 92 in src/main/java/org/opensearch/flowframework/rest/RestDeleteWorkflowAction.java

View check run for this annotation

Codecov / codecov/patch

src/main/java/org/opensearch/flowframework/rest/RestDeleteWorkflowAction.java#L92

Added line #L92 was not covered by tests
} else {
ex = new FlowFrameworkException("Failed to delete workflow.", ExceptionsHelper.status(exception));

Check warning on line 94 in src/main/java/org/opensearch/flowframework/rest/RestDeleteWorkflowAction.java

View check run for this annotation

Codecov / codecov/patch

src/main/java/org/opensearch/flowframework/rest/RestDeleteWorkflowAction.java#L94

Added line #L94 was not covered by tests
}
XContentBuilder exceptionBuilder = ex.toXContent(channel.newErrorBuilder(), ToXContent.EMPTY_PARAMS);
channel.sendResponse(new BytesRestResponse(ex.getRestStatus(), exceptionBuilder));
} catch (IOException e) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
import org.opensearch.core.xcontent.XContentBuilder;
import org.opensearch.flowframework.common.FlowFrameworkSettings;
import org.opensearch.flowframework.exception.FlowFrameworkException;
import org.opensearch.flowframework.exception.WorkflowStepException;
import org.opensearch.flowframework.transport.DeprovisionWorkflowAction;
import org.opensearch.flowframework.transport.WorkflowRequest;
import org.opensearch.rest.BaseRestHandler;
Expand Down Expand Up @@ -66,8 +67,7 @@ protected BaseRestHandler.RestChannelConsumer prepareRequest(RestRequest request
}
// Validate content
if (request.hasContent()) {
// BaseRestHandler will give appropriate error message
return channel -> channel.sendResponse(null);
throw new FlowFrameworkException("deprovision request should have no payload", RestStatus.BAD_REQUEST);
}
// Validate params
if (workflowId == null) {
Expand All @@ -80,9 +80,14 @@ protected BaseRestHandler.RestChannelConsumer prepareRequest(RestRequest request
channel.sendResponse(new BytesRestResponse(RestStatus.OK, builder));
}, exception -> {
try {
FlowFrameworkException ex = exception instanceof FlowFrameworkException
? (FlowFrameworkException) exception
: new FlowFrameworkException("Failed to deprovision workflow.", ExceptionsHelper.status(exception));
FlowFrameworkException ex;
if (exception instanceof WorkflowStepException) {
ex = (WorkflowStepException) exception;

Check warning on line 85 in src/main/java/org/opensearch/flowframework/rest/RestDeprovisionWorkflowAction.java

View check run for this annotation

Codecov / codecov/patch

src/main/java/org/opensearch/flowframework/rest/RestDeprovisionWorkflowAction.java#L85

Added line #L85 was not covered by tests
} else if (exception instanceof FlowFrameworkException) {
ex = (FlowFrameworkException) exception;

Check warning on line 87 in src/main/java/org/opensearch/flowframework/rest/RestDeprovisionWorkflowAction.java

View check run for this annotation

Codecov / codecov/patch

src/main/java/org/opensearch/flowframework/rest/RestDeprovisionWorkflowAction.java#L87

Added line #L87 was not covered by tests
} else {
ex = new FlowFrameworkException("Failed to deprovision workflow.", ExceptionsHelper.status(exception));

Check warning on line 89 in src/main/java/org/opensearch/flowframework/rest/RestDeprovisionWorkflowAction.java

View check run for this annotation

Codecov / codecov/patch

src/main/java/org/opensearch/flowframework/rest/RestDeprovisionWorkflowAction.java#L89

Added line #L89 was not covered by tests
}
XContentBuilder exceptionBuilder = ex.toXContent(channel.newErrorBuilder(), ToXContent.EMPTY_PARAMS);
channel.sendResponse(new BytesRestResponse(ex.getRestStatus(), exceptionBuilder));
} catch (IOException e) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
import org.opensearch.core.xcontent.XContentBuilder;
import org.opensearch.flowframework.common.FlowFrameworkSettings;
import org.opensearch.flowframework.exception.FlowFrameworkException;
import org.opensearch.flowframework.exception.WorkflowStepException;
import org.opensearch.flowframework.transport.GetWorkflowAction;
import org.opensearch.flowframework.transport.WorkflowRequest;
import org.opensearch.rest.BaseRestHandler;
Expand Down Expand Up @@ -85,9 +86,14 @@ protected BaseRestHandler.RestChannelConsumer prepareRequest(RestRequest request
channel.sendResponse(new BytesRestResponse(RestStatus.OK, builder));
}, exception -> {
try {
FlowFrameworkException ex = exception instanceof FlowFrameworkException
? (FlowFrameworkException) exception
: new FlowFrameworkException("Failed to get workflow.", ExceptionsHelper.status(exception));
FlowFrameworkException ex;
if (exception instanceof WorkflowStepException) {
ex = (WorkflowStepException) exception;

Check warning on line 91 in src/main/java/org/opensearch/flowframework/rest/RestGetWorkflowAction.java

View check run for this annotation

Codecov / codecov/patch

src/main/java/org/opensearch/flowframework/rest/RestGetWorkflowAction.java#L91

Added line #L91 was not covered by tests
} else if (exception instanceof FlowFrameworkException) {
ex = (FlowFrameworkException) exception;

Check warning on line 93 in src/main/java/org/opensearch/flowframework/rest/RestGetWorkflowAction.java

View check run for this annotation

Codecov / codecov/patch

src/main/java/org/opensearch/flowframework/rest/RestGetWorkflowAction.java#L93

Added line #L93 was not covered by tests
} else {
ex = new FlowFrameworkException("Failed to get workflow.", ExceptionsHelper.status(exception));

Check warning on line 95 in src/main/java/org/opensearch/flowframework/rest/RestGetWorkflowAction.java

View check run for this annotation

Codecov / codecov/patch

src/main/java/org/opensearch/flowframework/rest/RestGetWorkflowAction.java#L95

Added line #L95 was not covered by tests
}
XContentBuilder exceptionBuilder = ex.toXContent(channel.newErrorBuilder(), ToXContent.EMPTY_PARAMS);
channel.sendResponse(new BytesRestResponse(ex.getRestStatus(), exceptionBuilder));
} catch (IOException e) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
import org.opensearch.core.xcontent.XContentBuilder;
import org.opensearch.flowframework.common.FlowFrameworkSettings;
import org.opensearch.flowframework.exception.FlowFrameworkException;
import org.opensearch.flowframework.exception.WorkflowStepException;
import org.opensearch.flowframework.transport.GetWorkflowStateAction;
import org.opensearch.flowframework.transport.GetWorkflowStateRequest;
import org.opensearch.rest.BaseRestHandler;
Expand Down Expand Up @@ -82,9 +83,14 @@ protected BaseRestHandler.RestChannelConsumer prepareRequest(RestRequest request
channel.sendResponse(new BytesRestResponse(RestStatus.OK, builder));
}, exception -> {
try {
FlowFrameworkException ex = exception instanceof FlowFrameworkException
? (FlowFrameworkException) exception
: new FlowFrameworkException("Failed to get workflow state.", ExceptionsHelper.status(exception));
FlowFrameworkException ex;
if (exception instanceof WorkflowStepException) {
ex = (WorkflowStepException) exception;

Check warning on line 88 in src/main/java/org/opensearch/flowframework/rest/RestGetWorkflowStateAction.java

View check run for this annotation

Codecov / codecov/patch

src/main/java/org/opensearch/flowframework/rest/RestGetWorkflowStateAction.java#L88

Added line #L88 was not covered by tests
} else if (exception instanceof FlowFrameworkException) {
ex = (FlowFrameworkException) exception;

Check warning on line 90 in src/main/java/org/opensearch/flowframework/rest/RestGetWorkflowStateAction.java

View check run for this annotation

Codecov / codecov/patch

src/main/java/org/opensearch/flowframework/rest/RestGetWorkflowStateAction.java#L90

Added line #L90 was not covered by tests
} else {
ex = new FlowFrameworkException("Failed to get workflow state.", ExceptionsHelper.status(exception));

Check warning on line 92 in src/main/java/org/opensearch/flowframework/rest/RestGetWorkflowStateAction.java

View check run for this annotation

Codecov / codecov/patch

src/main/java/org/opensearch/flowframework/rest/RestGetWorkflowStateAction.java#L92

Added line #L92 was not covered by tests
}
XContentBuilder exceptionBuilder = ex.toXContent(channel.newErrorBuilder(), ToXContent.EMPTY_PARAMS);
channel.sendResponse(new BytesRestResponse(ex.getRestStatus(), exceptionBuilder));
} catch (IOException e) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
import org.opensearch.core.xcontent.XContentBuilder;
import org.opensearch.flowframework.common.FlowFrameworkSettings;
import org.opensearch.flowframework.exception.FlowFrameworkException;
import org.opensearch.flowframework.exception.WorkflowStepException;
import org.opensearch.flowframework.transport.GetWorkflowStepAction;
import org.opensearch.flowframework.transport.WorkflowRequest;
import org.opensearch.rest.BaseRestHandler;
Expand Down Expand Up @@ -81,9 +82,14 @@ protected RestChannelConsumer prepareRequest(RestRequest request, NodeClient cli
channel.sendResponse(new BytesRestResponse(RestStatus.OK, builder));
}, exception -> {
try {
FlowFrameworkException ex = exception instanceof FlowFrameworkException
? (FlowFrameworkException) exception
: new FlowFrameworkException("Failed to get workflow step.", ExceptionsHelper.status(exception));
FlowFrameworkException ex;
if (exception instanceof WorkflowStepException) {
ex = (WorkflowStepException) exception;

Check warning on line 87 in src/main/java/org/opensearch/flowframework/rest/RestGetWorkflowStepAction.java

View check run for this annotation

Codecov / codecov/patch

src/main/java/org/opensearch/flowframework/rest/RestGetWorkflowStepAction.java#L87

Added line #L87 was not covered by tests
} else if (exception instanceof FlowFrameworkException) {
ex = (FlowFrameworkException) exception;

Check warning on line 89 in src/main/java/org/opensearch/flowframework/rest/RestGetWorkflowStepAction.java

View check run for this annotation

Codecov / codecov/patch

src/main/java/org/opensearch/flowframework/rest/RestGetWorkflowStepAction.java#L89

Added line #L89 was not covered by tests
} else {
ex = new FlowFrameworkException("Failed to get workflow step.", ExceptionsHelper.status(exception));

Check warning on line 91 in src/main/java/org/opensearch/flowframework/rest/RestGetWorkflowStepAction.java

View check run for this annotation

Codecov / codecov/patch

src/main/java/org/opensearch/flowframework/rest/RestGetWorkflowStepAction.java#L91

Added line #L91 was not covered by tests
}
XContentBuilder exceptionBuilder = ex.toXContent(channel.newErrorBuilder(), ToXContent.EMPTY_PARAMS);
channel.sendResponse(new BytesRestResponse(ex.getRestStatus(), exceptionBuilder));
} catch (IOException e) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import org.opensearch.core.xcontent.XContentParser;
import org.opensearch.flowframework.common.FlowFrameworkSettings;
import org.opensearch.flowframework.exception.FlowFrameworkException;
import org.opensearch.flowframework.exception.WorkflowStepException;
import org.opensearch.flowframework.transport.ProvisionWorkflowAction;
import org.opensearch.flowframework.transport.WorkflowRequest;
import org.opensearch.rest.BaseRestHandler;
Expand Down Expand Up @@ -92,9 +93,14 @@ protected RestChannelConsumer prepareRequest(RestRequest request, NodeClient cli
channel.sendResponse(new BytesRestResponse(RestStatus.OK, builder));
}, exception -> {
try {
FlowFrameworkException ex = exception instanceof FlowFrameworkException
? (FlowFrameworkException) exception
: new FlowFrameworkException("Failed to provision workflow.", ExceptionsHelper.status(exception));
FlowFrameworkException ex;
if (exception instanceof WorkflowStepException) {
ex = (WorkflowStepException) exception;

Check warning on line 98 in src/main/java/org/opensearch/flowframework/rest/RestProvisionWorkflowAction.java

View check run for this annotation

Codecov / codecov/patch

src/main/java/org/opensearch/flowframework/rest/RestProvisionWorkflowAction.java#L98

Added line #L98 was not covered by tests
} else if (exception instanceof FlowFrameworkException) {
ex = (FlowFrameworkException) exception;

Check warning on line 100 in src/main/java/org/opensearch/flowframework/rest/RestProvisionWorkflowAction.java

View check run for this annotation

Codecov / codecov/patch

src/main/java/org/opensearch/flowframework/rest/RestProvisionWorkflowAction.java#L100

Added line #L100 was not covered by tests
} else {
ex = new FlowFrameworkException("Failed to provision workflow.", ExceptionsHelper.status(exception));

Check warning on line 102 in src/main/java/org/opensearch/flowframework/rest/RestProvisionWorkflowAction.java

View check run for this annotation

Codecov / codecov/patch

src/main/java/org/opensearch/flowframework/rest/RestProvisionWorkflowAction.java#L102

Added line #L102 was not covered by tests
}
XContentBuilder exceptionBuilder = ex.toXContent(channel.newErrorBuilder(), ToXContent.EMPTY_PARAMS);
channel.sendResponse(new BytesRestResponse(ex.getRestStatus(), exceptionBuilder));
} catch (IOException e) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -256,10 +256,18 @@ private void executeWorkflow(List<ProcessNode> workflowSequence, String workflow
}, exception -> { logger.error("Failed to update workflow state for workflow {}", workflowId, exception); })
);
} catch (Exception ex) {
RestStatus status;
if (ex instanceof FlowFrameworkException) {
status = ((FlowFrameworkException) ex).getRestStatus();

Check warning on line 261 in src/main/java/org/opensearch/flowframework/transport/ProvisionWorkflowTransportAction.java

View check run for this annotation

Codecov / codecov/patch

src/main/java/org/opensearch/flowframework/transport/ProvisionWorkflowTransportAction.java#L261

Added line #L261 was not covered by tests
} else {
status = ExceptionsHelper.status(ex);

Check warning on line 263 in src/main/java/org/opensearch/flowframework/transport/ProvisionWorkflowTransportAction.java

View check run for this annotation

Codecov / codecov/patch

src/main/java/org/opensearch/flowframework/transport/ProvisionWorkflowTransportAction.java#L263

Added line #L263 was not covered by tests
}
logger.error("Provisioning failed for workflow {} during step {}.", workflowId, currentStepId, ex);
String errorMessage = (ex.getCause() == null ? ex.getClass().getName() : ex.getCause().getClass().getName())
+ " during step "
+ currentStepId;
+ currentStepId
+ ", restStatus: "
+ status.toString();

Check warning on line 270 in src/main/java/org/opensearch/flowframework/transport/ProvisionWorkflowTransportAction.java

View check run for this annotation

Codecov / codecov/patch

src/main/java/org/opensearch/flowframework/transport/ProvisionWorkflowTransportAction.java#L270

Added line #L270 was not covered by tests
flowFrameworkIndicesHandler.updateFlowFrameworkSystemIndexDoc(
workflowId,
Map.ofEntries(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import org.opensearch.core.common.bytes.BytesArray;
import org.opensearch.core.common.bytes.BytesReference;
import org.opensearch.flowframework.exception.FlowFrameworkException;
import org.opensearch.flowframework.exception.WorkflowStepException;
import org.opensearch.flowframework.indices.FlowFrameworkIndicesHandler;
import org.opensearch.flowframework.util.ParseUtils;

Expand Down Expand Up @@ -135,7 +136,7 @@ public void onResponse(AcknowledgedResponse acknowledgedResponse) {
public void onFailure(Exception e) {
String errorMessage = "Failed step " + pipelineToBeCreated;
logger.error(errorMessage, e);
createPipelineFuture.onFailure(new FlowFrameworkException(errorMessage, ExceptionsHelper.status(e)));
createPipelineFuture.onFailure(new WorkflowStepException(errorMessage, ExceptionsHelper.status(e)));
}

};
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
import org.opensearch.core.action.ActionListener;
import org.opensearch.flowframework.common.FlowFrameworkSettings;
import org.opensearch.flowframework.exception.FlowFrameworkException;
import org.opensearch.flowframework.exception.WorkflowStepException;
import org.opensearch.flowframework.indices.FlowFrameworkIndicesHandler;
import org.opensearch.flowframework.util.ParseUtils;
import org.opensearch.ml.client.MachineLearningNodeClient;
Expand Down Expand Up @@ -214,7 +215,7 @@ public PlainActionFuture<WorkflowData> execute(
}, exception -> {
String errorMessage = "Failed to register local model in step " + currentNodeId;
logger.error(errorMessage, exception);
registerLocalModelFuture.onFailure(new FlowFrameworkException(errorMessage, ExceptionsHelper.status(exception)));
registerLocalModelFuture.onFailure(new WorkflowStepException(errorMessage, ExceptionsHelper.status(exception)));
}));
} catch (FlowFrameworkException e) {
registerLocalModelFuture.onFailure(e);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
import org.opensearch.core.rest.RestStatus;
import org.opensearch.flowframework.common.FlowFrameworkSettings;
import org.opensearch.flowframework.exception.FlowFrameworkException;
import org.opensearch.flowframework.exception.WorkflowStepException;
import org.opensearch.flowframework.indices.FlowFrameworkIndicesHandler;
import org.opensearch.ml.client.MachineLearningNodeClient;
import org.opensearch.ml.common.MLTask;
Expand Down Expand Up @@ -127,7 +128,7 @@ protected void retryableGetMlTask(
}, exception -> {
String errorMessage = workflowStep + " failed";
logger.error(errorMessage, exception);
mlTaskListener.onFailure(new FlowFrameworkException(errorMessage, RestStatus.BAD_REQUEST));
mlTaskListener.onFailure(new WorkflowStepException(errorMessage, RestStatus.BAD_REQUEST));

Check warning on line 131 in src/main/java/org/opensearch/flowframework/workflow/AbstractRetryableWorkflowStep.java

View check run for this annotation

Codecov / codecov/patch

src/main/java/org/opensearch/flowframework/workflow/AbstractRetryableWorkflowStep.java#L131

Added line #L131 was not covered by tests
}));
try {
Thread.sleep(this.retryDuration.getMillis());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
import org.opensearch.core.action.ActionListener;
import org.opensearch.core.rest.RestStatus;
import org.opensearch.flowframework.exception.FlowFrameworkException;
import org.opensearch.flowframework.exception.WorkflowStepException;
import org.opensearch.flowframework.indices.FlowFrameworkIndicesHandler;
import org.opensearch.flowframework.util.ParseUtils;
import org.opensearch.ml.client.MachineLearningNodeClient;
Expand Down Expand Up @@ -123,7 +124,7 @@ public void onResponse(MLCreateConnectorResponse mlCreateConnectorResponse) {
public void onFailure(Exception e) {
String errorMessage = "Failed to create connector";
logger.error(errorMessage, e);
createConnectorFuture.onFailure(new FlowFrameworkException(errorMessage, ExceptionsHelper.status(e)));
createConnectorFuture.onFailure(new WorkflowStepException(errorMessage, ExceptionsHelper.status(e)));
}
};

Expand Down
Loading

0 comments on commit 7b249f6

Please sign in to comment.