diff --git a/src/main/java/org/opensearch/flowframework/exception/WorkflowStepException.java b/src/main/java/org/opensearch/flowframework/exception/WorkflowStepException.java index 1a47cea7a..8434dc848 100644 --- a/src/main/java/org/opensearch/flowframework/exception/WorkflowStepException.java +++ b/src/main/java/org/opensearch/flowframework/exception/WorkflowStepException.java @@ -73,7 +73,7 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws * @param ex exception * @return exception if safe */ - public static Exception getException(Exception ex) { + public static Exception getSafeException(Exception ex) { if (ex instanceof IllegalArgumentException || ex instanceof OpenSearchStatusException || ex instanceof OpenSearchParseException diff --git a/src/main/java/org/opensearch/flowframework/workflow/AbstractCreatePipelineStep.java b/src/main/java/org/opensearch/flowframework/workflow/AbstractCreatePipelineStep.java index f98ce0f9f..e23d88b63 100644 --- a/src/main/java/org/opensearch/flowframework/workflow/AbstractCreatePipelineStep.java +++ b/src/main/java/org/opensearch/flowframework/workflow/AbstractCreatePipelineStep.java @@ -34,7 +34,7 @@ import static org.opensearch.flowframework.common.WorkflowResources.MODEL_ID; import static org.opensearch.flowframework.common.WorkflowResources.PIPELINE_ID; import static org.opensearch.flowframework.common.WorkflowResources.getResourceByWorkflowStep; -import static org.opensearch.flowframework.exception.WorkflowStepException.getException; +import static org.opensearch.flowframework.exception.WorkflowStepException.getSafeException; /** * Step to create either a search or ingest pipeline @@ -139,7 +139,7 @@ public void onResponse(AcknowledgedResponse acknowledgedResponse) { @Override public void onFailure(Exception ex) { - Exception e = getException(ex); + Exception e = getSafeException(ex); String errorMessage = (e == null ? "Failed step " + pipelineToBeCreated : e.getMessage()); logger.error(errorMessage, e); createPipelineFuture.onFailure(new WorkflowStepException(errorMessage, ExceptionsHelper.status(e))); diff --git a/src/main/java/org/opensearch/flowframework/workflow/AbstractRegisterLocalModelStep.java b/src/main/java/org/opensearch/flowframework/workflow/AbstractRegisterLocalModelStep.java index bad376c76..fe4e54b6a 100644 --- a/src/main/java/org/opensearch/flowframework/workflow/AbstractRegisterLocalModelStep.java +++ b/src/main/java/org/opensearch/flowframework/workflow/AbstractRegisterLocalModelStep.java @@ -49,7 +49,7 @@ import static org.opensearch.flowframework.common.CommonValue.VERSION_FIELD; import static org.opensearch.flowframework.common.WorkflowResources.MODEL_GROUP_ID; import static org.opensearch.flowframework.common.WorkflowResources.getResourceByWorkflowStep; -import static org.opensearch.flowframework.exception.WorkflowStepException.getException; +import static org.opensearch.flowframework.exception.WorkflowStepException.getSafeException; /** * Abstract local model registration step @@ -216,7 +216,7 @@ public PlainActionFuture execute( }, exception -> { registerLocalModelFuture.onFailure(exception); }) ); }, exception -> { - Exception e = getException(exception); + Exception e = getSafeException(exception); String errorMessage = (e == null ? "Failed to register local model in step " + currentNodeId : e.getMessage()); logger.error(errorMessage, e); registerLocalModelFuture.onFailure(new WorkflowStepException(errorMessage, ExceptionsHelper.status(e))); diff --git a/src/main/java/org/opensearch/flowframework/workflow/CreateConnectorStep.java b/src/main/java/org/opensearch/flowframework/workflow/CreateConnectorStep.java index e1481852b..484807ce3 100644 --- a/src/main/java/org/opensearch/flowframework/workflow/CreateConnectorStep.java +++ b/src/main/java/org/opensearch/flowframework/workflow/CreateConnectorStep.java @@ -44,7 +44,7 @@ import static org.opensearch.flowframework.common.CommonValue.PROTOCOL_FIELD; import static org.opensearch.flowframework.common.CommonValue.VERSION_FIELD; import static org.opensearch.flowframework.common.WorkflowResources.getResourceByWorkflowStep; -import static org.opensearch.flowframework.exception.WorkflowStepException.getException; +import static org.opensearch.flowframework.exception.WorkflowStepException.getSafeException; import static org.opensearch.flowframework.util.ParseUtils.getStringToStringMap; /** @@ -123,7 +123,7 @@ public void onResponse(MLCreateConnectorResponse mlCreateConnectorResponse) { @Override public void onFailure(Exception ex) { - Exception e = getException(ex); + Exception e = getSafeException(ex); String errorMessage = (e == null ? "Failed to create connector" : ex.getMessage()); logger.error(errorMessage, e); createConnectorFuture.onFailure(new WorkflowStepException(errorMessage, ExceptionsHelper.status(e))); diff --git a/src/main/java/org/opensearch/flowframework/workflow/CreateIndexStep.java b/src/main/java/org/opensearch/flowframework/workflow/CreateIndexStep.java index f28fa64d4..32ca9e9f6 100644 --- a/src/main/java/org/opensearch/flowframework/workflow/CreateIndexStep.java +++ b/src/main/java/org/opensearch/flowframework/workflow/CreateIndexStep.java @@ -38,6 +38,7 @@ import static org.opensearch.flowframework.common.CommonValue.CONFIGURATIONS; import static org.opensearch.flowframework.common.WorkflowResources.INDEX_NAME; import static org.opensearch.flowframework.common.WorkflowResources.getResourceByWorkflowStep; +import static org.opensearch.flowframework.exception.WorkflowStepException.getSafeException; /** * Step to create an index @@ -136,10 +137,11 @@ public PlainActionFuture execute( logger.error(errorMessage, ex); createIndexFuture.onFailure(new FlowFrameworkException(errorMessage, ExceptionsHelper.status(ex))); } - }, e -> { - String errorMessage = "Failed to create the index " + indexName; + }, ex -> { + Exception e = getSafeException(ex); + String errorMessage = (e == null ? "Failed to create the index " + indexName : e.getMessage()); logger.error(errorMessage, e); - createIndexFuture.onFailure(new FlowFrameworkException(errorMessage, ExceptionsHelper.status(e))); + createIndexFuture.onFailure(new WorkflowStepException(errorMessage, ExceptionsHelper.status(e))); })); } catch (Exception e) { createIndexFuture.onFailure(e); diff --git a/src/main/java/org/opensearch/flowframework/workflow/DeleteAgentStep.java b/src/main/java/org/opensearch/flowframework/workflow/DeleteAgentStep.java index 81d943947..b49be90b3 100644 --- a/src/main/java/org/opensearch/flowframework/workflow/DeleteAgentStep.java +++ b/src/main/java/org/opensearch/flowframework/workflow/DeleteAgentStep.java @@ -24,7 +24,7 @@ import java.util.Set; import static org.opensearch.flowframework.common.WorkflowResources.AGENT_ID; -import static org.opensearch.flowframework.exception.WorkflowStepException.getException; +import static org.opensearch.flowframework.exception.WorkflowStepException.getSafeException; /** * Step to delete a agent for a remote model @@ -84,7 +84,7 @@ public void onResponse(DeleteResponse deleteResponse) { @Override public void onFailure(Exception ex) { - Exception e = getException(ex); + Exception e = getSafeException(ex); String errorMessage = (e == null ? "Failed to delete agent " + agentId : e.getMessage()); logger.error(errorMessage, e); deleteAgentFuture.onFailure(new WorkflowStepException(errorMessage, ExceptionsHelper.status(e))); diff --git a/src/main/java/org/opensearch/flowframework/workflow/DeleteConnectorStep.java b/src/main/java/org/opensearch/flowframework/workflow/DeleteConnectorStep.java index a9ffcb549..6a1a5e0c7 100644 --- a/src/main/java/org/opensearch/flowframework/workflow/DeleteConnectorStep.java +++ b/src/main/java/org/opensearch/flowframework/workflow/DeleteConnectorStep.java @@ -24,7 +24,7 @@ import java.util.Set; import static org.opensearch.flowframework.common.WorkflowResources.CONNECTOR_ID; -import static org.opensearch.flowframework.exception.WorkflowStepException.getException; +import static org.opensearch.flowframework.exception.WorkflowStepException.getSafeException; /** * Step to delete a connector for a remote model @@ -84,7 +84,7 @@ public void onResponse(DeleteResponse deleteResponse) { @Override public void onFailure(Exception ex) { - Exception e = getException(ex); + Exception e = getSafeException(ex); String errorMessage = (e == null ? "Failed to delete connector " + connectorId : e.getMessage()); logger.error(errorMessage, e); deleteConnectorFuture.onFailure(new WorkflowStepException(errorMessage, ExceptionsHelper.status(e))); diff --git a/src/main/java/org/opensearch/flowframework/workflow/DeleteModelStep.java b/src/main/java/org/opensearch/flowframework/workflow/DeleteModelStep.java index 4ccb1a48a..66a3e4ec0 100644 --- a/src/main/java/org/opensearch/flowframework/workflow/DeleteModelStep.java +++ b/src/main/java/org/opensearch/flowframework/workflow/DeleteModelStep.java @@ -24,7 +24,7 @@ import java.util.Set; import static org.opensearch.flowframework.common.WorkflowResources.MODEL_ID; -import static org.opensearch.flowframework.exception.WorkflowStepException.getException; +import static org.opensearch.flowframework.exception.WorkflowStepException.getSafeException; /** * Step to delete a model for a remote model @@ -85,7 +85,7 @@ public void onResponse(DeleteResponse deleteResponse) { @Override public void onFailure(Exception ex) { - Exception e = getException(ex); + Exception e = getSafeException(ex); String errorMessage = (e == null ? "Failed to delete model " + modelId : e.getMessage()); logger.error(errorMessage, e); deleteModelFuture.onFailure(new WorkflowStepException(errorMessage, ExceptionsHelper.status(e))); diff --git a/src/main/java/org/opensearch/flowframework/workflow/DeployModelStep.java b/src/main/java/org/opensearch/flowframework/workflow/DeployModelStep.java index 8a48c5d15..56c2a6181 100644 --- a/src/main/java/org/opensearch/flowframework/workflow/DeployModelStep.java +++ b/src/main/java/org/opensearch/flowframework/workflow/DeployModelStep.java @@ -29,7 +29,7 @@ import static org.opensearch.flowframework.common.WorkflowResources.MODEL_ID; import static org.opensearch.flowframework.common.WorkflowResources.getResourceByWorkflowStep; -import static org.opensearch.flowframework.exception.WorkflowStepException.getException; +import static org.opensearch.flowframework.exception.WorkflowStepException.getSafeException; /** * Step to deploy a model @@ -117,7 +117,7 @@ public void onResponse(MLDeployModelResponse mlDeployModelResponse) { @Override public void onFailure(Exception ex) { - Exception e = getException(ex); + Exception e = getSafeException(ex); String errorMessage = (e == null ? "Failed to deploy model " + modelId : e.getMessage()); logger.error(errorMessage, e); deployModelFuture.onFailure(new WorkflowStepException(errorMessage, ExceptionsHelper.status(e))); diff --git a/src/main/java/org/opensearch/flowframework/workflow/RegisterAgentStep.java b/src/main/java/org/opensearch/flowframework/workflow/RegisterAgentStep.java index db66b0419..d485f6f5c 100644 --- a/src/main/java/org/opensearch/flowframework/workflow/RegisterAgentStep.java +++ b/src/main/java/org/opensearch/flowframework/workflow/RegisterAgentStep.java @@ -48,7 +48,7 @@ import static org.opensearch.flowframework.common.CommonValue.TYPE; import static org.opensearch.flowframework.common.WorkflowResources.MODEL_ID; import static org.opensearch.flowframework.common.WorkflowResources.getResourceByWorkflowStep; -import static org.opensearch.flowframework.exception.WorkflowStepException.getException; +import static org.opensearch.flowframework.exception.WorkflowStepException.getSafeException; import static org.opensearch.flowframework.util.ParseUtils.getStringToStringMap; /** @@ -135,7 +135,7 @@ public void onResponse(MLRegisterAgentResponse mlRegisterAgentResponse) { @Override public void onFailure(Exception ex) { - Exception e = getException(ex); + Exception e = getSafeException(ex); String errorMessage = (e == null ? "Failed to register the agent" : e.getMessage()); logger.error(errorMessage, e); registerAgentModelFuture.onFailure(new WorkflowStepException(errorMessage, ExceptionsHelper.status(e))); diff --git a/src/main/java/org/opensearch/flowframework/workflow/RegisterModelGroupStep.java b/src/main/java/org/opensearch/flowframework/workflow/RegisterModelGroupStep.java index 5062d7696..1a2fcebe9 100644 --- a/src/main/java/org/opensearch/flowframework/workflow/RegisterModelGroupStep.java +++ b/src/main/java/org/opensearch/flowframework/workflow/RegisterModelGroupStep.java @@ -38,7 +38,7 @@ import static org.opensearch.flowframework.common.CommonValue.MODEL_GROUP_STATUS; import static org.opensearch.flowframework.common.CommonValue.NAME_FIELD; import static org.opensearch.flowframework.common.WorkflowResources.getResourceByWorkflowStep; -import static org.opensearch.flowframework.exception.WorkflowStepException.getException; +import static org.opensearch.flowframework.exception.WorkflowStepException.getSafeException; /** * Step to register a model group @@ -120,7 +120,7 @@ public void onResponse(MLRegisterModelGroupResponse mlRegisterModelGroupResponse @Override public void onFailure(Exception ex) { - Exception e = getException(ex); + Exception e = getSafeException(ex); String errorMessage = (e == null ? "Failed to register model group" : e.getMessage()); logger.error(errorMessage, e); registerModelGroupFuture.onFailure(new WorkflowStepException(errorMessage, ExceptionsHelper.status(e))); diff --git a/src/main/java/org/opensearch/flowframework/workflow/RegisterRemoteModelStep.java b/src/main/java/org/opensearch/flowframework/workflow/RegisterRemoteModelStep.java index ee203524a..cce5d6ee8 100644 --- a/src/main/java/org/opensearch/flowframework/workflow/RegisterRemoteModelStep.java +++ b/src/main/java/org/opensearch/flowframework/workflow/RegisterRemoteModelStep.java @@ -38,7 +38,7 @@ import static org.opensearch.flowframework.common.WorkflowResources.CONNECTOR_ID; import static org.opensearch.flowframework.common.WorkflowResources.MODEL_GROUP_ID; import static org.opensearch.flowframework.common.WorkflowResources.getResourceByWorkflowStep; -import static org.opensearch.flowframework.exception.WorkflowStepException.getException; +import static org.opensearch.flowframework.exception.WorkflowStepException.getSafeException; /** * Step to register a remote model @@ -186,7 +186,7 @@ void completeRegisterFuture(UpdateResponse response, String resourceName, MLRegi @Override public void onFailure(Exception ex) { - Exception e = getException(ex); + Exception e = getSafeException(ex); String errorMessage = (e == null ? "Failed to register remote model" : e.getMessage()); logger.error(errorMessage, e); registerRemoteModelFuture.onFailure(new WorkflowStepException(errorMessage, ExceptionsHelper.status(e))); diff --git a/src/main/java/org/opensearch/flowframework/workflow/UndeployModelStep.java b/src/main/java/org/opensearch/flowframework/workflow/UndeployModelStep.java index a677b9f5c..ab7bc1f16 100644 --- a/src/main/java/org/opensearch/flowframework/workflow/UndeployModelStep.java +++ b/src/main/java/org/opensearch/flowframework/workflow/UndeployModelStep.java @@ -29,7 +29,7 @@ import static org.opensearch.flowframework.common.CommonValue.SUCCESS; import static org.opensearch.flowframework.common.WorkflowResources.MODEL_ID; -import static org.opensearch.flowframework.exception.WorkflowStepException.getException; +import static org.opensearch.flowframework.exception.WorkflowStepException.getSafeException; /** * Step to undeploy model @@ -98,7 +98,7 @@ public void onResponse(MLUndeployModelsResponse mlUndeployModelsResponse) { @Override public void onFailure(Exception ex) { - Exception e = getException(ex); + Exception e = getSafeException(ex); String errorMessage = (e == null ? "Failed to undeploy model " + modelId : e.getMessage()); logger.error(errorMessage, e); undeployModelFuture.onFailure(new WorkflowStepException(errorMessage, ExceptionsHelper.status(e))); diff --git a/src/test/java/org/opensearch/flowframework/workflow/CreateIndexStepTests.java b/src/test/java/org/opensearch/flowframework/workflow/CreateIndexStepTests.java index 3b6af8ffa..538747b94 100644 --- a/src/test/java/org/opensearch/flowframework/workflow/CreateIndexStepTests.java +++ b/src/test/java/org/opensearch/flowframework/workflow/CreateIndexStepTests.java @@ -8,6 +8,7 @@ */ package org.opensearch.flowframework.workflow; +import org.opensearch.ResourceNotFoundException; import org.opensearch.action.admin.indices.create.CreateIndexRequest; import org.opensearch.action.admin.indices.create.CreateIndexResponse; import org.opensearch.action.support.PlainActionFuture; @@ -24,6 +25,7 @@ import org.opensearch.flowframework.indices.FlowFrameworkIndicesHandler; import org.opensearch.test.OpenSearchTestCase; import org.opensearch.threadpool.ThreadPool; +import org.opensearch.transport.RemoteTransportException; import java.io.IOException; import java.util.Collections; @@ -139,4 +141,25 @@ public void testCreateIndexStepFailure() throws ExecutionException, InterruptedE assertTrue(ex.getCause() instanceof Exception); assertEquals("Failed to create the index demo", ex.getCause().getMessage()); } + + public void testCreateIndexStepUnsafeFailure() throws ExecutionException, InterruptedException, IOException { + @SuppressWarnings({ "unchecked" }) + ArgumentCaptor> actionListenerCaptor = ArgumentCaptor.forClass(ActionListener.class); + PlainActionFuture future = createIndexStep.execute( + inputData.getNodeId(), + inputData, + Collections.emptyMap(), + Collections.emptyMap(), + Collections.emptyMap() + ); + assertFalse(future.isDone()); + verify(indicesAdminClient, times(1)).create(any(CreateIndexRequest.class), actionListenerCaptor.capture()); + + actionListenerCaptor.getValue().onFailure(new RemoteTransportException("test", new ResourceNotFoundException("test"))); + + assertTrue(future.isDone()); + ExecutionException ex = assertThrows(ExecutionException.class, () -> future.get().getContent()); + assertTrue(ex.getCause() instanceof Exception); + assertEquals("Failed to create the index demo", ex.getCause().getMessage()); + } } diff --git a/src/test/java/org/opensearch/flowframework/workflow/RegisterRemoteModelStepTests.java b/src/test/java/org/opensearch/flowframework/workflow/RegisterRemoteModelStepTests.java index 2aa1f11c8..1312d1638 100644 --- a/src/test/java/org/opensearch/flowframework/workflow/RegisterRemoteModelStepTests.java +++ b/src/test/java/org/opensearch/flowframework/workflow/RegisterRemoteModelStepTests.java @@ -10,6 +10,7 @@ import com.carrotsearch.randomizedtesting.annotations.ThreadLeakScope; +import org.opensearch.ResourceNotFoundException; import org.opensearch.action.support.PlainActionFuture; import org.opensearch.action.update.UpdateResponse; import org.opensearch.core.action.ActionListener; @@ -23,6 +24,7 @@ import org.opensearch.ml.common.transport.register.MLRegisterModelInput; import org.opensearch.ml.common.transport.register.MLRegisterModelResponse; import org.opensearch.test.OpenSearchTestCase; +import org.opensearch.transport.RemoteTransportException; import java.io.IOException; import java.util.Collections; @@ -203,6 +205,27 @@ public void testRegisterRemoteModelFailure() { } + public void testRegisterRemoteModelUnSafeFailure() { + doAnswer(invocation -> { + ActionListener actionListener = invocation.getArgument(1); + actionListener.onFailure(new RemoteTransportException("test", new ResourceNotFoundException("test"))); + return null; + }).when(mlNodeClient).register(any(MLRegisterModelInput.class), any()); + + PlainActionFuture future = this.registerRemoteModelStep.execute( + workflowData.getNodeId(), + workflowData, + Collections.emptyMap(), + Collections.emptyMap(), + Collections.emptyMap() + ); + assertTrue(future.isDone()); + ExecutionException ex = expectThrows(ExecutionException.class, () -> future.get().getClass()); + assertTrue(ex.getCause() instanceof FlowFrameworkException); + assertEquals("Failed to register remote model", ex.getCause().getMessage()); + + } + public void testMissingInputs() { PlainActionFuture future = this.registerRemoteModelStep.execute( "nodeId",