Skip to content

Commit

Permalink
Throw the correct error message in status API for WorkflowSteps (#676)
Browse files Browse the repository at this point in the history
* Hides user and credential field from search response (#680)

* Hide user and credential field from search response

Signed-off-by: Owais Kazi <[email protected]>

* Hid the user field for Get API as well

Signed-off-by: Owais Kazi <[email protected]>

* Addressed PR Comments

Signed-off-by: Owais Kazi <[email protected]>

* Added user permission and new tests

Signed-off-by: Owais Kazi <[email protected]>

---------

Signed-off-by: Owais Kazi <[email protected]>

* Addressed PR Comments

Signed-off-by: Owais Kazi <[email protected]>

* Fixed failing test

Signed-off-by: Owais Kazi <[email protected]>

---------

Signed-off-by: Owais Kazi <[email protected]>
  • Loading branch information
owaiskazi19 committed Apr 26, 2024
1 parent 757eaaf commit 56e4a78
Show file tree
Hide file tree
Showing 21 changed files with 120 additions and 33 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ Inspired from [Keep a Changelog](https://keepachangelog.com/en/1.1.0/)
- Reset workflow state to initial state after successful deprovision ([#635](https://github.com/opensearch-project/flow-framework/pull/635))
- Silently ignore content on APIs that don't require it ([#639](https://github.com/opensearch-project/flow-framework/pull/639))
- Hide user and credential field from search response ([#680](https://github.com/opensearch-project/flow-framework/pull/680))
- Throw the correct error message in status API for WorkflowSteps ([#676](https://github.com/opensearch-project/flow-framework/pull/676))

### Infrastructure
### Documentation
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,9 @@
*/
package org.opensearch.flowframework.exception;

import org.opensearch.OpenSearchException;
import org.opensearch.OpenSearchParseException;
import org.opensearch.OpenSearchStatusException;
import org.opensearch.core.rest.RestStatus;
import org.opensearch.core.xcontent.ToXContentObject;
import org.opensearch.core.xcontent.XContentBuilder;
Expand Down Expand Up @@ -64,4 +67,19 @@ public RestStatus getRestStatus() {
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
return builder.startObject().field("error", this.getMessage()).endObject();
}

/**
* Getter for safe exceptions
* @param ex exception
* @return exception if safe
*/
public static Exception getSafeException(Exception ex) {
if (ex instanceof IllegalArgumentException
|| ex instanceof OpenSearchStatusException
|| ex instanceof OpenSearchParseException
|| (ex instanceof OpenSearchException && ex.getCause() instanceof OpenSearchParseException)) {
return ex;
}
return null;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -266,7 +266,7 @@ private void executeWorkflow(List<ProcessNode> workflowSequence, String workflow
status = ExceptionsHelper.status(ex);
}
logger.error("Provisioning failed for workflow {} during step {}.", workflowId, currentStepId, ex);
String errorMessage = (ex.getCause() == null ? ex.getClass().getName() : ex.getCause().getClass().getName())
String errorMessage = (ex.getCause() == null ? ex.getMessage() : ex.getCause().getClass().getName())
+ " during step "
+ currentStepId
+ ", restStatus: "
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
import static org.opensearch.flowframework.common.WorkflowResources.MODEL_ID;
import static org.opensearch.flowframework.common.WorkflowResources.PIPELINE_ID;
import static org.opensearch.flowframework.common.WorkflowResources.getResourceByWorkflowStep;
import static org.opensearch.flowframework.exception.WorkflowStepException.getSafeException;

/**
* Step to create either a search or ingest pipeline
Expand Down Expand Up @@ -137,8 +138,9 @@ public void onResponse(AcknowledgedResponse acknowledgedResponse) {
}

@Override
public void onFailure(Exception e) {
String errorMessage = "Failed step " + pipelineToBeCreated;
public void onFailure(Exception ex) {
Exception e = getSafeException(ex);
String errorMessage = (e == null ? "Failed step " + pipelineToBeCreated : e.getMessage());
logger.error(errorMessage, e);
createPipelineFuture.onFailure(new WorkflowStepException(errorMessage, ExceptionsHelper.status(e)));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@
import static org.opensearch.flowframework.common.CommonValue.VERSION_FIELD;
import static org.opensearch.flowframework.common.WorkflowResources.MODEL_GROUP_ID;
import static org.opensearch.flowframework.common.WorkflowResources.getResourceByWorkflowStep;
import static org.opensearch.flowframework.exception.WorkflowStepException.getSafeException;

/**
* Abstract local model registration step
Expand Down Expand Up @@ -215,9 +216,10 @@ public PlainActionFuture<WorkflowData> execute(
}, exception -> { registerLocalModelFuture.onFailure(exception); })
);
}, exception -> {
String errorMessage = "Failed to register local model in step " + currentNodeId;
logger.error(errorMessage, exception);
registerLocalModelFuture.onFailure(new WorkflowStepException(errorMessage, ExceptionsHelper.status(exception)));
Exception e = getSafeException(exception);
String errorMessage = (e == null ? "Failed to register local model in step " + currentNodeId : e.getMessage());
logger.error(errorMessage, e);
registerLocalModelFuture.onFailure(new WorkflowStepException(errorMessage, ExceptionsHelper.status(e)));
}));
} catch (IllegalArgumentException iae) {
registerLocalModelFuture.onFailure(new WorkflowStepException(iae.getMessage(), RestStatus.BAD_REQUEST));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@
import static org.opensearch.flowframework.common.CommonValue.PROTOCOL_FIELD;
import static org.opensearch.flowframework.common.CommonValue.VERSION_FIELD;
import static org.opensearch.flowframework.common.WorkflowResources.getResourceByWorkflowStep;
import static org.opensearch.flowframework.exception.WorkflowStepException.getSafeException;
import static org.opensearch.flowframework.util.ParseUtils.getStringToStringMap;

/**
Expand Down Expand Up @@ -121,8 +122,9 @@ public void onResponse(MLCreateConnectorResponse mlCreateConnectorResponse) {
}

@Override
public void onFailure(Exception e) {
String errorMessage = "Failed to create connector";
public void onFailure(Exception ex) {
Exception e = getSafeException(ex);
String errorMessage = (e == null ? "Failed to create connector" : ex.getMessage());
logger.error(errorMessage, e);
createConnectorFuture.onFailure(new WorkflowStepException(errorMessage, ExceptionsHelper.status(e)));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@
import static org.opensearch.flowframework.common.CommonValue.CONFIGURATIONS;
import static org.opensearch.flowframework.common.WorkflowResources.INDEX_NAME;
import static org.opensearch.flowframework.common.WorkflowResources.getResourceByWorkflowStep;
import static org.opensearch.flowframework.exception.WorkflowStepException.getSafeException;

/**
* Step to create an index
Expand Down Expand Up @@ -136,10 +137,11 @@ public PlainActionFuture<WorkflowData> execute(
logger.error(errorMessage, ex);
createIndexFuture.onFailure(new FlowFrameworkException(errorMessage, ExceptionsHelper.status(ex)));
}
}, e -> {
String errorMessage = "Failed to create the index " + indexName;
}, ex -> {
Exception e = getSafeException(ex);
String errorMessage = (e == null ? "Failed to create the index " + indexName : e.getMessage());
logger.error(errorMessage, e);
createIndexFuture.onFailure(new FlowFrameworkException(errorMessage, ExceptionsHelper.status(e)));
createIndexFuture.onFailure(new WorkflowStepException(errorMessage, ExceptionsHelper.status(e)));
}));
} catch (Exception e) {
createIndexFuture.onFailure(e);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import java.util.Set;

import static org.opensearch.flowframework.common.WorkflowResources.AGENT_ID;
import static org.opensearch.flowframework.exception.WorkflowStepException.getSafeException;

/**
* Step to delete a agent for a remote model
Expand Down Expand Up @@ -82,8 +83,9 @@ public void onResponse(DeleteResponse deleteResponse) {
}

@Override
public void onFailure(Exception e) {
String errorMessage = "Failed to delete agent " + agentId;
public void onFailure(Exception ex) {
Exception e = getSafeException(ex);
String errorMessage = (e == null ? "Failed to delete agent " + agentId : e.getMessage());
logger.error(errorMessage, e);
deleteAgentFuture.onFailure(new WorkflowStepException(errorMessage, ExceptionsHelper.status(e)));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import java.util.Set;

import static org.opensearch.flowframework.common.WorkflowResources.CONNECTOR_ID;
import static org.opensearch.flowframework.exception.WorkflowStepException.getSafeException;

/**
* Step to delete a connector for a remote model
Expand Down Expand Up @@ -82,8 +83,9 @@ public void onResponse(DeleteResponse deleteResponse) {
}

@Override
public void onFailure(Exception e) {
String errorMessage = "Failed to delete connector " + connectorId;
public void onFailure(Exception ex) {
Exception e = getSafeException(ex);
String errorMessage = (e == null ? "Failed to delete connector " + connectorId : e.getMessage());
logger.error(errorMessage, e);
deleteConnectorFuture.onFailure(new WorkflowStepException(errorMessage, ExceptionsHelper.status(e)));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import java.util.Set;

import static org.opensearch.flowframework.common.WorkflowResources.MODEL_ID;
import static org.opensearch.flowframework.exception.WorkflowStepException.getSafeException;

/**
* Step to delete a model for a remote model
Expand Down Expand Up @@ -83,8 +84,9 @@ public void onResponse(DeleteResponse deleteResponse) {
}

@Override
public void onFailure(Exception e) {
String errorMessage = "Failed to delete model " + modelId;
public void onFailure(Exception ex) {
Exception e = getSafeException(ex);
String errorMessage = (e == null ? "Failed to delete model " + modelId : e.getMessage());
logger.error(errorMessage, e);
deleteModelFuture.onFailure(new WorkflowStepException(errorMessage, ExceptionsHelper.status(e)));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@

import static org.opensearch.flowframework.common.WorkflowResources.MODEL_ID;
import static org.opensearch.flowframework.common.WorkflowResources.getResourceByWorkflowStep;
import static org.opensearch.flowframework.exception.WorkflowStepException.getSafeException;

/**
* Step to deploy a model
Expand Down Expand Up @@ -115,8 +116,9 @@ public void onResponse(MLDeployModelResponse mlDeployModelResponse) {
}

@Override
public void onFailure(Exception e) {
String errorMessage = "Failed to deploy model " + modelId;
public void onFailure(Exception ex) {
Exception e = getSafeException(ex);
String errorMessage = (e == null ? "Failed to deploy model " + modelId : e.getMessage());
logger.error(errorMessage, e);
deployModelFuture.onFailure(new WorkflowStepException(errorMessage, ExceptionsHelper.status(e)));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@
import static org.opensearch.flowframework.common.CommonValue.TYPE;
import static org.opensearch.flowframework.common.WorkflowResources.MODEL_ID;
import static org.opensearch.flowframework.common.WorkflowResources.getResourceByWorkflowStep;
import static org.opensearch.flowframework.exception.WorkflowStepException.getSafeException;
import static org.opensearch.flowframework.util.ParseUtils.getStringToStringMap;

/**
Expand Down Expand Up @@ -133,8 +134,9 @@ public void onResponse(MLRegisterAgentResponse mlRegisterAgentResponse) {
}

@Override
public void onFailure(Exception e) {
String errorMessage = "Failed to register the agent";
public void onFailure(Exception ex) {
Exception e = getSafeException(ex);
String errorMessage = (e == null ? "Failed to register the agent" : e.getMessage());
logger.error(errorMessage, e);
registerAgentModelFuture.onFailure(new WorkflowStepException(errorMessage, ExceptionsHelper.status(e)));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@
import static org.opensearch.flowframework.common.CommonValue.MODEL_GROUP_STATUS;
import static org.opensearch.flowframework.common.CommonValue.NAME_FIELD;
import static org.opensearch.flowframework.common.WorkflowResources.getResourceByWorkflowStep;
import static org.opensearch.flowframework.exception.WorkflowStepException.getSafeException;

/**
* Step to register a model group
Expand Down Expand Up @@ -118,8 +119,9 @@ public void onResponse(MLRegisterModelGroupResponse mlRegisterModelGroupResponse
}

@Override
public void onFailure(Exception e) {
String errorMessage = "Failed to register model group";
public void onFailure(Exception ex) {
Exception e = getSafeException(ex);
String errorMessage = (e == null ? "Failed to register model group" : e.getMessage());
logger.error(errorMessage, e);
registerModelGroupFuture.onFailure(new WorkflowStepException(errorMessage, ExceptionsHelper.status(e)));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@
import static org.opensearch.flowframework.common.WorkflowResources.CONNECTOR_ID;
import static org.opensearch.flowframework.common.WorkflowResources.MODEL_GROUP_ID;
import static org.opensearch.flowframework.common.WorkflowResources.getResourceByWorkflowStep;
import static org.opensearch.flowframework.exception.WorkflowStepException.getSafeException;

/**
* Step to register a remote model
Expand Down Expand Up @@ -184,8 +185,9 @@ void completeRegisterFuture(UpdateResponse response, String resourceName, MLRegi
}

@Override
public void onFailure(Exception e) {
String errorMessage = "Failed to register remote model";
public void onFailure(Exception ex) {
Exception e = getSafeException(ex);
String errorMessage = (e == null ? "Failed to register remote model" : e.getMessage());
logger.error(errorMessage, e);
registerRemoteModelFuture.onFailure(new WorkflowStepException(errorMessage, ExceptionsHelper.status(e)));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@

import static org.opensearch.flowframework.common.CommonValue.SUCCESS;
import static org.opensearch.flowframework.common.WorkflowResources.MODEL_ID;
import static org.opensearch.flowframework.exception.WorkflowStepException.getSafeException;

/**
* Step to undeploy model
Expand Down Expand Up @@ -96,8 +97,9 @@ public void onResponse(MLUndeployModelsResponse mlUndeployModelsResponse) {
}

@Override
public void onFailure(Exception e) {
String errorMessage = "Failed to undeploy model " + modelId;
public void onFailure(Exception ex) {
Exception e = getSafeException(ex);
String errorMessage = (e == null ? "Failed to undeploy model " + modelId : e.getMessage());
logger.error(errorMessage, e);
undeployModelFuture.onFailure(new WorkflowStepException(errorMessage, ExceptionsHelper.status(e)));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -472,8 +472,6 @@ public void testDefaultSemanticSearchUseCaseWithFailureExpected() throws Excepti

assertEquals(RestStatus.OK, TestHelpers.restStatus(response));
getAndAssertWorkflowStatus(client(), workflowId, State.FAILED, ProvisioningProgress.FAILED);
String error = getAndWorkflowStatusError(client(), workflowId);
assertTrue(error.contains("org.opensearch.flowframework.exception.WorkflowStepException during step create_ingest_pipeline"));
}

public void testAllDefaultUseCasesCreation() throws Exception {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
*/
package org.opensearch.flowframework.workflow;

import org.opensearch.ResourceNotFoundException;
import org.opensearch.action.admin.indices.create.CreateIndexRequest;
import org.opensearch.action.admin.indices.create.CreateIndexResponse;
import org.opensearch.action.support.PlainActionFuture;
Expand All @@ -24,6 +25,7 @@
import org.opensearch.flowframework.indices.FlowFrameworkIndicesHandler;
import org.opensearch.test.OpenSearchTestCase;
import org.opensearch.threadpool.ThreadPool;
import org.opensearch.transport.RemoteTransportException;

import java.io.IOException;
import java.util.Collections;
Expand Down Expand Up @@ -139,4 +141,25 @@ public void testCreateIndexStepFailure() throws ExecutionException, InterruptedE
assertTrue(ex.getCause() instanceof Exception);
assertEquals("Failed to create the index demo", ex.getCause().getMessage());
}

public void testCreateIndexStepUnsafeFailure() throws ExecutionException, InterruptedException, IOException {
@SuppressWarnings({ "unchecked" })
ArgumentCaptor<ActionListener<CreateIndexResponse>> actionListenerCaptor = ArgumentCaptor.forClass(ActionListener.class);
PlainActionFuture<WorkflowData> future = createIndexStep.execute(
inputData.getNodeId(),
inputData,
Collections.emptyMap(),
Collections.emptyMap(),
Collections.emptyMap()
);
assertFalse(future.isDone());
verify(indicesAdminClient, times(1)).create(any(CreateIndexRequest.class), actionListenerCaptor.capture());

actionListenerCaptor.getValue().onFailure(new RemoteTransportException("test", new ResourceNotFoundException("test")));

assertTrue(future.isDone());
ExecutionException ex = assertThrows(ExecutionException.class, () -> future.get().getContent());
assertTrue(ex.getCause() instanceof Exception);
assertEquals("Failed to create the index demo", ex.getCause().getMessage());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -229,7 +229,7 @@ public void testRegisterLocalCustomModelFailure() {

doAnswer(invocation -> {
ActionListener<MLRegisterModelResponse> actionListener = invocation.getArgument(1);
actionListener.onFailure(new IllegalArgumentException("test"));
actionListener.onFailure(new IllegalArgumentException("Failed to register local model in step test-node-id"));
return null;
}).when(machineLearningNodeClient).register(any(MLRegisterModelInput.class), any());

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -217,7 +217,7 @@ public void testRegisterLocalPretrainedModelFailure() {

doAnswer(invocation -> {
ActionListener<MLRegisterModelResponse> actionListener = invocation.getArgument(1);
actionListener.onFailure(new IllegalArgumentException("test"));
actionListener.onFailure(new IllegalArgumentException("Failed to register local model in step test-node-id"));
return null;
}).when(machineLearningNodeClient).register(any(MLRegisterModelInput.class), any());

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -222,7 +222,7 @@ public void testRegisterLocalSparseEncodingModelFailure() {

doAnswer(invocation -> {
ActionListener<MLRegisterModelResponse> actionListener = invocation.getArgument(1);
actionListener.onFailure(new IllegalArgumentException("test"));
actionListener.onFailure(new IllegalArgumentException("Failed to register local model in step test-node-id"));
return null;
}).when(machineLearningNodeClient).register(any(MLRegisterModelInput.class), any());

Expand Down
Loading

0 comments on commit 56e4a78

Please sign in to comment.