Skip to content

Commit

Permalink
Throw the correct error message in status API for WorkflowSteps
Browse files Browse the repository at this point in the history
Signed-off-by: Owais Kazi <[email protected]>
  • Loading branch information
owaiskazi19 committed Apr 24, 2024
1 parent b8e4ed0 commit 2124797
Show file tree
Hide file tree
Showing 18 changed files with 69 additions and 28 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ Inspired from [Keep a Changelog](https://keepachangelog.com/en/1.1.0/)
### Bug Fixes
- Reset workflow state to initial state after successful deprovision ([#635](https://github.com/opensearch-project/flow-framework/pull/635))
- Silently ignore content on APIs that don't require it ([#639](https://github.com/opensearch-project/flow-framework/pull/639))
- Throw the correct error message in status API for WorkflowSteps ([#675](https://github.com/opensearch-project/flow-framework/pull/675))

### Infrastructure
### Documentation
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,9 @@
*/
package org.opensearch.flowframework.exception;

import org.opensearch.OpenSearchException;
import org.opensearch.OpenSearchParseException;
import org.opensearch.OpenSearchStatusException;
import org.opensearch.core.rest.RestStatus;
import org.opensearch.core.xcontent.ToXContentObject;
import org.opensearch.core.xcontent.XContentBuilder;
Expand Down Expand Up @@ -64,4 +67,19 @@ public RestStatus getRestStatus() {
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
return builder.startObject().field("error", this.getMessage()).endObject();
}

/**
* Getter for safe exceptions
* @param ex exception
* @return exception if safe
*/
public static Exception getException(Exception ex) {
if (ex instanceof IllegalArgumentException
|| ex instanceof OpenSearchStatusException
|| ex instanceof OpenSearchParseException
|| (ex instanceof OpenSearchException && ex.getCause() instanceof OpenSearchParseException)) {
return ex;
}
return null;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -266,7 +266,7 @@ private void executeWorkflow(List<ProcessNode> workflowSequence, String workflow
status = ExceptionsHelper.status(ex);
}
logger.error("Provisioning failed for workflow {} during step {}.", workflowId, currentStepId, ex);
String errorMessage = (ex.getCause() == null ? ex.getClass().getName() : ex.getCause().getClass().getName())
String errorMessage = (ex.getCause() == null ? ex.getMessage() : ex.getCause().getClass().getName())
+ " during step "
+ currentStepId
+ ", restStatus: "
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
import static org.opensearch.flowframework.common.WorkflowResources.MODEL_ID;
import static org.opensearch.flowframework.common.WorkflowResources.PIPELINE_ID;
import static org.opensearch.flowframework.common.WorkflowResources.getResourceByWorkflowStep;
import static org.opensearch.flowframework.exception.WorkflowStepException.getException;

/**
* Step to create either a search or ingest pipeline
Expand Down Expand Up @@ -137,8 +138,9 @@ public void onResponse(AcknowledgedResponse acknowledgedResponse) {
}

@Override
public void onFailure(Exception e) {
String errorMessage = "Failed step " + pipelineToBeCreated;
public void onFailure(Exception ex) {
Exception e = getException(ex);
String errorMessage = (e == null ? "Failed step " + pipelineToBeCreated : e.getMessage());
logger.error(errorMessage, e);
createPipelineFuture.onFailure(new WorkflowStepException(errorMessage, ExceptionsHelper.status(e)));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@
import static org.opensearch.flowframework.common.CommonValue.VERSION_FIELD;
import static org.opensearch.flowframework.common.WorkflowResources.MODEL_GROUP_ID;
import static org.opensearch.flowframework.common.WorkflowResources.getResourceByWorkflowStep;
import static org.opensearch.flowframework.exception.WorkflowStepException.getException;

/**
* Abstract local model registration step
Expand Down Expand Up @@ -215,9 +216,10 @@ public PlainActionFuture<WorkflowData> execute(
}, exception -> { registerLocalModelFuture.onFailure(exception); })
);
}, exception -> {
String errorMessage = "Failed to register local model in step " + currentNodeId;
logger.error(errorMessage, exception);
registerLocalModelFuture.onFailure(new WorkflowStepException(errorMessage, ExceptionsHelper.status(exception)));
Exception e = getException(exception);
String errorMessage = (e == null ? "Failed to register local model in step " + currentNodeId : e.getMessage());
logger.error(errorMessage, e);
registerLocalModelFuture.onFailure(new WorkflowStepException(errorMessage, ExceptionsHelper.status(e)));
}));
} catch (IllegalArgumentException iae) {
registerLocalModelFuture.onFailure(new WorkflowStepException(iae.getMessage(), RestStatus.BAD_REQUEST));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@
import static org.opensearch.flowframework.common.CommonValue.PROTOCOL_FIELD;
import static org.opensearch.flowframework.common.CommonValue.VERSION_FIELD;
import static org.opensearch.flowframework.common.WorkflowResources.getResourceByWorkflowStep;
import static org.opensearch.flowframework.exception.WorkflowStepException.getException;
import static org.opensearch.flowframework.util.ParseUtils.getStringToStringMap;

/**
Expand Down Expand Up @@ -121,8 +122,9 @@ public void onResponse(MLCreateConnectorResponse mlCreateConnectorResponse) {
}

@Override
public void onFailure(Exception e) {
String errorMessage = "Failed to create connector";
public void onFailure(Exception ex) {
Exception e = getException(ex);
String errorMessage = (e == null ? "Failed to create connector" : ex.getMessage());
logger.error(errorMessage, e);
createConnectorFuture.onFailure(new WorkflowStepException(errorMessage, ExceptionsHelper.status(e)));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import java.util.Set;

import static org.opensearch.flowframework.common.WorkflowResources.AGENT_ID;
import static org.opensearch.flowframework.exception.WorkflowStepException.getException;

/**
* Step to delete a agent for a remote model
Expand Down Expand Up @@ -82,8 +83,9 @@ public void onResponse(DeleteResponse deleteResponse) {
}

@Override
public void onFailure(Exception e) {
String errorMessage = "Failed to delete agent " + agentId;
public void onFailure(Exception ex) {
Exception e = getException(ex);
String errorMessage = (e == null ? "Failed to delete agent " + agentId : e.getMessage());
logger.error(errorMessage, e);
deleteAgentFuture.onFailure(new WorkflowStepException(errorMessage, ExceptionsHelper.status(e)));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import java.util.Set;

import static org.opensearch.flowframework.common.WorkflowResources.CONNECTOR_ID;
import static org.opensearch.flowframework.exception.WorkflowStepException.getException;

/**
* Step to delete a connector for a remote model
Expand Down Expand Up @@ -82,8 +83,9 @@ public void onResponse(DeleteResponse deleteResponse) {
}

@Override
public void onFailure(Exception e) {
String errorMessage = "Failed to delete connector " + connectorId;
public void onFailure(Exception ex) {
Exception e = getException(ex);
String errorMessage = (e == null ? "Failed to delete connector " + connectorId : e.getMessage());
logger.error(errorMessage, e);
deleteConnectorFuture.onFailure(new WorkflowStepException(errorMessage, ExceptionsHelper.status(e)));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import java.util.Set;

import static org.opensearch.flowframework.common.WorkflowResources.MODEL_ID;
import static org.opensearch.flowframework.exception.WorkflowStepException.getException;

/**
* Step to delete a model for a remote model
Expand Down Expand Up @@ -83,8 +84,9 @@ public void onResponse(DeleteResponse deleteResponse) {
}

@Override
public void onFailure(Exception e) {
String errorMessage = "Failed to delete model " + modelId;
public void onFailure(Exception ex) {
Exception e = getException(ex);
String errorMessage = (e == null ? "Failed to delete model " + modelId : e.getMessage());
logger.error(errorMessage, e);
deleteModelFuture.onFailure(new WorkflowStepException(errorMessage, ExceptionsHelper.status(e)));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@

import static org.opensearch.flowframework.common.WorkflowResources.MODEL_ID;
import static org.opensearch.flowframework.common.WorkflowResources.getResourceByWorkflowStep;
import static org.opensearch.flowframework.exception.WorkflowStepException.getException;

/**
* Step to deploy a model
Expand Down Expand Up @@ -115,8 +116,9 @@ public void onResponse(MLDeployModelResponse mlDeployModelResponse) {
}

@Override
public void onFailure(Exception e) {
String errorMessage = "Failed to deploy model " + modelId;
public void onFailure(Exception ex) {
Exception e = getException(ex);
String errorMessage = (e == null ? "Failed to deploy model " + modelId : e.getMessage());
logger.error(errorMessage, e);
deployModelFuture.onFailure(new WorkflowStepException(errorMessage, ExceptionsHelper.status(e)));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@
import static org.opensearch.flowframework.common.CommonValue.TYPE;
import static org.opensearch.flowframework.common.WorkflowResources.MODEL_ID;
import static org.opensearch.flowframework.common.WorkflowResources.getResourceByWorkflowStep;
import static org.opensearch.flowframework.exception.WorkflowStepException.getException;
import static org.opensearch.flowframework.util.ParseUtils.getStringToStringMap;

/**
Expand Down Expand Up @@ -133,8 +134,9 @@ public void onResponse(MLRegisterAgentResponse mlRegisterAgentResponse) {
}

@Override
public void onFailure(Exception e) {
String errorMessage = "Failed to register the agent";
public void onFailure(Exception ex) {
Exception e = getException(ex);
String errorMessage = (e == null ? "Failed to register the agent" : e.getMessage());
logger.error(errorMessage, e);
registerAgentModelFuture.onFailure(new WorkflowStepException(errorMessage, ExceptionsHelper.status(e)));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@
import static org.opensearch.flowframework.common.CommonValue.MODEL_GROUP_STATUS;
import static org.opensearch.flowframework.common.CommonValue.NAME_FIELD;
import static org.opensearch.flowframework.common.WorkflowResources.getResourceByWorkflowStep;
import static org.opensearch.flowframework.exception.WorkflowStepException.getException;

/**
* Step to register a model group
Expand Down Expand Up @@ -118,8 +119,9 @@ public void onResponse(MLRegisterModelGroupResponse mlRegisterModelGroupResponse
}

@Override
public void onFailure(Exception e) {
String errorMessage = "Failed to register model group";
public void onFailure(Exception ex) {
Exception e = getException(ex);
String errorMessage = (e == null ? "Failed to register model group" : e.getMessage());
logger.error(errorMessage, e);
registerModelGroupFuture.onFailure(new WorkflowStepException(errorMessage, ExceptionsHelper.status(e)));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@
import static org.opensearch.flowframework.common.WorkflowResources.CONNECTOR_ID;
import static org.opensearch.flowframework.common.WorkflowResources.MODEL_GROUP_ID;
import static org.opensearch.flowframework.common.WorkflowResources.getResourceByWorkflowStep;
import static org.opensearch.flowframework.exception.WorkflowStepException.getException;

/**
* Step to register a remote model
Expand Down Expand Up @@ -184,8 +185,9 @@ void completeRegisterFuture(UpdateResponse response, String resourceName, MLRegi
}

@Override
public void onFailure(Exception e) {
String errorMessage = "Failed to register remote model";
public void onFailure(Exception ex) {
Exception e = getException(ex);
String errorMessage = (e == null ? "Failed to register remote model" : e.getMessage());
logger.error(errorMessage, e);
registerRemoteModelFuture.onFailure(new WorkflowStepException(errorMessage, ExceptionsHelper.status(e)));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@

import static org.opensearch.flowframework.common.CommonValue.SUCCESS;
import static org.opensearch.flowframework.common.WorkflowResources.MODEL_ID;
import static org.opensearch.flowframework.exception.WorkflowStepException.getException;

/**
* Step to undeploy model
Expand Down Expand Up @@ -96,8 +97,9 @@ public void onResponse(MLUndeployModelsResponse mlUndeployModelsResponse) {
}

@Override
public void onFailure(Exception e) {
String errorMessage = "Failed to undeploy model " + modelId;
public void onFailure(Exception ex) {
Exception e = getException(ex);
String errorMessage = (e == null ? "Failed to undeploy model " + modelId : e.getMessage());
logger.error(errorMessage, e);
undeployModelFuture.onFailure(new WorkflowStepException(errorMessage, ExceptionsHelper.status(e)));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -229,7 +229,7 @@ public void testRegisterLocalCustomModelFailure() {

doAnswer(invocation -> {
ActionListener<MLRegisterModelResponse> actionListener = invocation.getArgument(1);
actionListener.onFailure(new IllegalArgumentException("test"));
actionListener.onFailure(new IllegalArgumentException("Failed to register local model in step test-node-id"));
return null;
}).when(machineLearningNodeClient).register(any(MLRegisterModelInput.class), any());

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -217,7 +217,7 @@ public void testRegisterLocalPretrainedModelFailure() {

doAnswer(invocation -> {
ActionListener<MLRegisterModelResponse> actionListener = invocation.getArgument(1);
actionListener.onFailure(new IllegalArgumentException("test"));
actionListener.onFailure(new IllegalArgumentException("Failed to register local model in step test-node-id"));
return null;
}).when(machineLearningNodeClient).register(any(MLRegisterModelInput.class), any());

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -222,7 +222,7 @@ public void testRegisterLocalSparseEncodingModelFailure() {

doAnswer(invocation -> {
ActionListener<MLRegisterModelResponse> actionListener = invocation.getArgument(1);
actionListener.onFailure(new IllegalArgumentException("test"));
actionListener.onFailure(new IllegalArgumentException("Failed to register local model in step test-node-id"));
return null;
}).when(machineLearningNodeClient).register(any(MLRegisterModelInput.class), any());

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -185,7 +185,7 @@ public void testRegisterAndDeployRemoteModelSuccess() throws Exception {
public void testRegisterRemoteModelFailure() {
doAnswer(invocation -> {
ActionListener<MLRegisterModelResponse> actionListener = invocation.getArgument(1);
actionListener.onFailure(new IllegalArgumentException("test"));
actionListener.onFailure(new IllegalArgumentException("Failed to register remote model"));
return null;
}).when(mlNodeClient).register(any(MLRegisterModelInput.class), any());

Expand Down

0 comments on commit 2124797

Please sign in to comment.