Skip to content

Commit

Permalink
Addressed PR Comments
Browse files Browse the repository at this point in the history
Signed-off-by: Owais Kazi <[email protected]>
  • Loading branch information
owaiskazi19 committed Apr 25, 2024
1 parent 13348ae commit 246a5a1
Show file tree
Hide file tree
Showing 15 changed files with 74 additions and 26 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws
* @param ex exception
* @return exception if safe
*/
public static Exception getException(Exception ex) {
public static Exception getSafeException(Exception ex) {
if (ex instanceof IllegalArgumentException
|| ex instanceof OpenSearchStatusException
|| ex instanceof OpenSearchParseException
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@
import static org.opensearch.flowframework.common.WorkflowResources.MODEL_ID;
import static org.opensearch.flowframework.common.WorkflowResources.PIPELINE_ID;
import static org.opensearch.flowframework.common.WorkflowResources.getResourceByWorkflowStep;
import static org.opensearch.flowframework.exception.WorkflowStepException.getException;
import static org.opensearch.flowframework.exception.WorkflowStepException.getSafeException;

/**
* Step to create either a search or ingest pipeline
Expand Down Expand Up @@ -139,7 +139,7 @@ public void onResponse(AcknowledgedResponse acknowledgedResponse) {

@Override
public void onFailure(Exception ex) {
Exception e = getException(ex);
Exception e = getSafeException(ex);
String errorMessage = (e == null ? "Failed step " + pipelineToBeCreated : e.getMessage());
logger.error(errorMessage, e);
createPipelineFuture.onFailure(new WorkflowStepException(errorMessage, ExceptionsHelper.status(e)));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@
import static org.opensearch.flowframework.common.CommonValue.VERSION_FIELD;
import static org.opensearch.flowframework.common.WorkflowResources.MODEL_GROUP_ID;
import static org.opensearch.flowframework.common.WorkflowResources.getResourceByWorkflowStep;
import static org.opensearch.flowframework.exception.WorkflowStepException.getException;
import static org.opensearch.flowframework.exception.WorkflowStepException.getSafeException;

/**
* Abstract local model registration step
Expand Down Expand Up @@ -216,7 +216,7 @@ public PlainActionFuture<WorkflowData> execute(
}, exception -> { registerLocalModelFuture.onFailure(exception); })
);
}, exception -> {
Exception e = getException(exception);
Exception e = getSafeException(exception);
String errorMessage = (e == null ? "Failed to register local model in step " + currentNodeId : e.getMessage());
logger.error(errorMessage, e);
registerLocalModelFuture.onFailure(new WorkflowStepException(errorMessage, ExceptionsHelper.status(e)));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@
import static org.opensearch.flowframework.common.CommonValue.PROTOCOL_FIELD;
import static org.opensearch.flowframework.common.CommonValue.VERSION_FIELD;
import static org.opensearch.flowframework.common.WorkflowResources.getResourceByWorkflowStep;
import static org.opensearch.flowframework.exception.WorkflowStepException.getException;
import static org.opensearch.flowframework.exception.WorkflowStepException.getSafeException;
import static org.opensearch.flowframework.util.ParseUtils.getStringToStringMap;

/**
Expand Down Expand Up @@ -123,7 +123,7 @@ public void onResponse(MLCreateConnectorResponse mlCreateConnectorResponse) {

@Override
public void onFailure(Exception ex) {
Exception e = getException(ex);
Exception e = getSafeException(ex);
String errorMessage = (e == null ? "Failed to create connector" : ex.getMessage());
logger.error(errorMessage, e);
createConnectorFuture.onFailure(new WorkflowStepException(errorMessage, ExceptionsHelper.status(e)));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@
import static org.opensearch.flowframework.common.CommonValue.CONFIGURATIONS;
import static org.opensearch.flowframework.common.WorkflowResources.INDEX_NAME;
import static org.opensearch.flowframework.common.WorkflowResources.getResourceByWorkflowStep;
import static org.opensearch.flowframework.exception.WorkflowStepException.getSafeException;

/**
* Step to create an index
Expand Down Expand Up @@ -136,10 +137,11 @@ public PlainActionFuture<WorkflowData> execute(
logger.error(errorMessage, ex);
createIndexFuture.onFailure(new FlowFrameworkException(errorMessage, ExceptionsHelper.status(ex)));
}
}, e -> {
String errorMessage = "Failed to create the index " + indexName;
}, ex -> {
Exception e = getSafeException(ex);
String errorMessage = (e == null ? "Failed to create the index " + indexName : e.getMessage());
logger.error(errorMessage, e);
createIndexFuture.onFailure(new FlowFrameworkException(errorMessage, ExceptionsHelper.status(e)));
createIndexFuture.onFailure(new WorkflowStepException(errorMessage, ExceptionsHelper.status(e)));
}));
} catch (Exception e) {
createIndexFuture.onFailure(e);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@
import java.util.Set;

import static org.opensearch.flowframework.common.WorkflowResources.AGENT_ID;
import static org.opensearch.flowframework.exception.WorkflowStepException.getException;
import static org.opensearch.flowframework.exception.WorkflowStepException.getSafeException;

/**
* Step to delete a agent for a remote model
Expand Down Expand Up @@ -84,7 +84,7 @@ public void onResponse(DeleteResponse deleteResponse) {

@Override
public void onFailure(Exception ex) {
Exception e = getException(ex);
Exception e = getSafeException(ex);
String errorMessage = (e == null ? "Failed to delete agent " + agentId : e.getMessage());
logger.error(errorMessage, e);
deleteAgentFuture.onFailure(new WorkflowStepException(errorMessage, ExceptionsHelper.status(e)));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@
import java.util.Set;

import static org.opensearch.flowframework.common.WorkflowResources.CONNECTOR_ID;
import static org.opensearch.flowframework.exception.WorkflowStepException.getException;
import static org.opensearch.flowframework.exception.WorkflowStepException.getSafeException;

/**
* Step to delete a connector for a remote model
Expand Down Expand Up @@ -84,7 +84,7 @@ public void onResponse(DeleteResponse deleteResponse) {

@Override
public void onFailure(Exception ex) {
Exception e = getException(ex);
Exception e = getSafeException(ex);
String errorMessage = (e == null ? "Failed to delete connector " + connectorId : e.getMessage());
logger.error(errorMessage, e);
deleteConnectorFuture.onFailure(new WorkflowStepException(errorMessage, ExceptionsHelper.status(e)));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@
import java.util.Set;

import static org.opensearch.flowframework.common.WorkflowResources.MODEL_ID;
import static org.opensearch.flowframework.exception.WorkflowStepException.getException;
import static org.opensearch.flowframework.exception.WorkflowStepException.getSafeException;

/**
* Step to delete a model for a remote model
Expand Down Expand Up @@ -85,7 +85,7 @@ public void onResponse(DeleteResponse deleteResponse) {

@Override
public void onFailure(Exception ex) {
Exception e = getException(ex);
Exception e = getSafeException(ex);
String errorMessage = (e == null ? "Failed to delete model " + modelId : e.getMessage());
logger.error(errorMessage, e);
deleteModelFuture.onFailure(new WorkflowStepException(errorMessage, ExceptionsHelper.status(e)));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@

import static org.opensearch.flowframework.common.WorkflowResources.MODEL_ID;
import static org.opensearch.flowframework.common.WorkflowResources.getResourceByWorkflowStep;
import static org.opensearch.flowframework.exception.WorkflowStepException.getException;
import static org.opensearch.flowframework.exception.WorkflowStepException.getSafeException;

/**
* Step to deploy a model
Expand Down Expand Up @@ -117,7 +117,7 @@ public void onResponse(MLDeployModelResponse mlDeployModelResponse) {

@Override
public void onFailure(Exception ex) {
Exception e = getException(ex);
Exception e = getSafeException(ex);
String errorMessage = (e == null ? "Failed to deploy model " + modelId : e.getMessage());
logger.error(errorMessage, e);
deployModelFuture.onFailure(new WorkflowStepException(errorMessage, ExceptionsHelper.status(e)));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@
import static org.opensearch.flowframework.common.CommonValue.TYPE;
import static org.opensearch.flowframework.common.WorkflowResources.MODEL_ID;
import static org.opensearch.flowframework.common.WorkflowResources.getResourceByWorkflowStep;
import static org.opensearch.flowframework.exception.WorkflowStepException.getException;
import static org.opensearch.flowframework.exception.WorkflowStepException.getSafeException;
import static org.opensearch.flowframework.util.ParseUtils.getStringToStringMap;

/**
Expand Down Expand Up @@ -135,7 +135,7 @@ public void onResponse(MLRegisterAgentResponse mlRegisterAgentResponse) {

@Override
public void onFailure(Exception ex) {
Exception e = getException(ex);
Exception e = getSafeException(ex);
String errorMessage = (e == null ? "Failed to register the agent" : e.getMessage());
logger.error(errorMessage, e);
registerAgentModelFuture.onFailure(new WorkflowStepException(errorMessage, ExceptionsHelper.status(e)));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@
import static org.opensearch.flowframework.common.CommonValue.MODEL_GROUP_STATUS;
import static org.opensearch.flowframework.common.CommonValue.NAME_FIELD;
import static org.opensearch.flowframework.common.WorkflowResources.getResourceByWorkflowStep;
import static org.opensearch.flowframework.exception.WorkflowStepException.getException;
import static org.opensearch.flowframework.exception.WorkflowStepException.getSafeException;

/**
* Step to register a model group
Expand Down Expand Up @@ -120,7 +120,7 @@ public void onResponse(MLRegisterModelGroupResponse mlRegisterModelGroupResponse

@Override
public void onFailure(Exception ex) {
Exception e = getException(ex);
Exception e = getSafeException(ex);
String errorMessage = (e == null ? "Failed to register model group" : e.getMessage());
logger.error(errorMessage, e);
registerModelGroupFuture.onFailure(new WorkflowStepException(errorMessage, ExceptionsHelper.status(e)));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@
import static org.opensearch.flowframework.common.WorkflowResources.CONNECTOR_ID;
import static org.opensearch.flowframework.common.WorkflowResources.MODEL_GROUP_ID;
import static org.opensearch.flowframework.common.WorkflowResources.getResourceByWorkflowStep;
import static org.opensearch.flowframework.exception.WorkflowStepException.getException;
import static org.opensearch.flowframework.exception.WorkflowStepException.getSafeException;

/**
* Step to register a remote model
Expand Down Expand Up @@ -186,7 +186,7 @@ void completeRegisterFuture(UpdateResponse response, String resourceName, MLRegi

@Override
public void onFailure(Exception ex) {
Exception e = getException(ex);
Exception e = getSafeException(ex);
String errorMessage = (e == null ? "Failed to register remote model" : e.getMessage());
logger.error(errorMessage, e);
registerRemoteModelFuture.onFailure(new WorkflowStepException(errorMessage, ExceptionsHelper.status(e)));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@

import static org.opensearch.flowframework.common.CommonValue.SUCCESS;
import static org.opensearch.flowframework.common.WorkflowResources.MODEL_ID;
import static org.opensearch.flowframework.exception.WorkflowStepException.getException;
import static org.opensearch.flowframework.exception.WorkflowStepException.getSafeException;

/**
* Step to undeploy model
Expand Down Expand Up @@ -98,7 +98,7 @@ public void onResponse(MLUndeployModelsResponse mlUndeployModelsResponse) {

@Override
public void onFailure(Exception ex) {
Exception e = getException(ex);
Exception e = getSafeException(ex);
String errorMessage = (e == null ? "Failed to undeploy model " + modelId : e.getMessage());
logger.error(errorMessage, e);
undeployModelFuture.onFailure(new WorkflowStepException(errorMessage, ExceptionsHelper.status(e)));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
*/
package org.opensearch.flowframework.workflow;

import org.opensearch.ResourceNotFoundException;
import org.opensearch.action.admin.indices.create.CreateIndexRequest;
import org.opensearch.action.admin.indices.create.CreateIndexResponse;
import org.opensearch.action.support.PlainActionFuture;
Expand All @@ -24,6 +25,7 @@
import org.opensearch.flowframework.indices.FlowFrameworkIndicesHandler;
import org.opensearch.test.OpenSearchTestCase;
import org.opensearch.threadpool.ThreadPool;
import org.opensearch.transport.RemoteTransportException;

import java.io.IOException;
import java.util.Collections;
Expand Down Expand Up @@ -139,4 +141,25 @@ public void testCreateIndexStepFailure() throws ExecutionException, InterruptedE
assertTrue(ex.getCause() instanceof Exception);
assertEquals("Failed to create the index demo", ex.getCause().getMessage());
}

public void testCreateIndexStepUnsafeFailure() throws ExecutionException, InterruptedException, IOException {
@SuppressWarnings({ "unchecked" })
ArgumentCaptor<ActionListener<CreateIndexResponse>> actionListenerCaptor = ArgumentCaptor.forClass(ActionListener.class);
PlainActionFuture<WorkflowData> future = createIndexStep.execute(
inputData.getNodeId(),
inputData,
Collections.emptyMap(),
Collections.emptyMap(),
Collections.emptyMap()
);
assertFalse(future.isDone());
verify(indicesAdminClient, times(1)).create(any(CreateIndexRequest.class), actionListenerCaptor.capture());

actionListenerCaptor.getValue().onFailure(new RemoteTransportException("test", new ResourceNotFoundException("test")));

assertTrue(future.isDone());
ExecutionException ex = assertThrows(ExecutionException.class, () -> future.get().getContent());
assertTrue(ex.getCause() instanceof Exception);
assertEquals("Failed to create the index demo", ex.getCause().getMessage());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@

import com.carrotsearch.randomizedtesting.annotations.ThreadLeakScope;

import org.opensearch.ResourceNotFoundException;
import org.opensearch.action.support.PlainActionFuture;
import org.opensearch.action.update.UpdateResponse;
import org.opensearch.core.action.ActionListener;
Expand All @@ -23,6 +24,7 @@
import org.opensearch.ml.common.transport.register.MLRegisterModelInput;
import org.opensearch.ml.common.transport.register.MLRegisterModelResponse;
import org.opensearch.test.OpenSearchTestCase;
import org.opensearch.transport.RemoteTransportException;

import java.io.IOException;
import java.util.Collections;
Expand Down Expand Up @@ -203,6 +205,27 @@ public void testRegisterRemoteModelFailure() {

}

public void testRegisterRemoteModelUnSafeFailure() {
doAnswer(invocation -> {
ActionListener<MLRegisterModelResponse> actionListener = invocation.getArgument(1);
actionListener.onFailure(new RemoteTransportException("test", new ResourceNotFoundException("test")));
return null;
}).when(mlNodeClient).register(any(MLRegisterModelInput.class), any());

PlainActionFuture<WorkflowData> future = this.registerRemoteModelStep.execute(
workflowData.getNodeId(),
workflowData,
Collections.emptyMap(),
Collections.emptyMap(),
Collections.emptyMap()
);
assertTrue(future.isDone());
ExecutionException ex = expectThrows(ExecutionException.class, () -> future.get().getClass());
assertTrue(ex.getCause() instanceof FlowFrameworkException);
assertEquals("Failed to register remote model", ex.getCause().getMessage());

}

public void testMissingInputs() {
PlainActionFuture<WorkflowData> future = this.registerRemoteModelStep.execute(
"nodeId",
Expand Down

0 comments on commit 246a5a1

Please sign in to comment.