Skip to content

Commit

Permalink
Add coverage and changelog
Browse files Browse the repository at this point in the history
Signed-off-by: Daniel Widdis <[email protected]>
  • Loading branch information
dbwiddis committed Jul 20, 2024
1 parent c78d1d5 commit 68a73ed
Show file tree
Hide file tree
Showing 3 changed files with 146 additions and 0 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -29,3 +29,4 @@ Inspired from [Keep a Changelog](https://keepachangelog.com/en/1.1.0/)
### Maintenance
### Refactoring
- Improve Template and WorkflowState builders ([#778](https://github.com/opensearch-project/flow-framework/pull/778))
- Refactor workflow step resource updates to eliminate duplication ([#796](https://github.com/opensearch-project/flow-framework/pull/796))
Original file line number Diff line number Diff line change
Expand Up @@ -37,13 +37,16 @@
import org.opensearch.core.index.shard.ShardId;
import org.opensearch.core.xcontent.XContentBuilder;
import org.opensearch.flowframework.TestHelpers;
import org.opensearch.flowframework.common.WorkflowResources;
import org.opensearch.flowframework.model.ProvisioningProgress;
import org.opensearch.flowframework.model.ResourceCreated;
import org.opensearch.flowframework.model.Template;
import org.opensearch.flowframework.model.Workflow;
import org.opensearch.flowframework.model.WorkflowState;
import org.opensearch.flowframework.util.EncryptorUtils;
import org.opensearch.flowframework.workflow.CreateConnectorStep;
import org.opensearch.flowframework.workflow.CreateIndexStep;
import org.opensearch.flowframework.workflow.WorkflowData;
import org.opensearch.index.get.GetResult;
import org.opensearch.test.OpenSearchTestCase;
import org.opensearch.threadpool.ThreadPool;
Expand Down Expand Up @@ -488,4 +491,52 @@ public void testDeleteFlowFrameworkSystemIndexDoc() throws IOException {
exceptionCaptor.getValue().getMessage()
);
}

public void testAddResourceToStateIndex() throws IOException {
ClusterState mockClusterState = mock(ClusterState.class);
Metadata mockMetaData = mock(Metadata.class);
when(clusterService.state()).thenReturn(mockClusterState);
when(mockClusterState.metadata()).thenReturn(mockMetaData);
when(mockMetaData.hasIndex(WORKFLOW_STATE_INDEX)).thenReturn(true);

@SuppressWarnings("unchecked")
ActionListener<WorkflowData> listener = mock(ActionListener.class);
// test success
doAnswer(invocation -> {
ActionListener<UpdateResponse> responseListener = invocation.getArgument(1);
responseListener.onResponse(new UpdateResponse(new ShardId(WORKFLOW_STATE_INDEX, "", 1), "this_id", -2, 0, 0, Result.UPDATED));
return null;
}).when(client).update(any(UpdateRequest.class), any());

flowFrameworkIndicesHandler.addResourceToStateIndex(
new WorkflowData(Collections.emptyMap(), null, null),
"node_id",
CreateConnectorStep.NAME,
"this_id",
listener
);

ArgumentCaptor<WorkflowData> responseCaptor = ArgumentCaptor.forClass(WorkflowData.class);
verify(listener, times(1)).onResponse(responseCaptor.capture());
assertEquals("this_id", responseCaptor.getValue().getContent().get(WorkflowResources.CONNECTOR_ID));

// test failure
doAnswer(invocation -> {
ActionListener<UpdateResponse> responseListener = invocation.getArgument(1);
responseListener.onFailure(new Exception("Failed to update state"));
return null;
}).when(client).update(any(UpdateRequest.class), any());

flowFrameworkIndicesHandler.addResourceToStateIndex(
new WorkflowData(Collections.emptyMap(), null, null),
"node_id",
CreateConnectorStep.NAME,
"this_id",
listener
);

ArgumentCaptor<Exception> exceptionCaptor = ArgumentCaptor.forClass(Exception.class);
verify(listener, times(1)).onFailure(exceptionCaptor.capture());
assertEquals("Failed to update new created node_id resource create_connector id this_id", exceptionCaptor.getValue().getMessage());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import java.util.Collections;
import java.util.Map;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.atomic.AtomicInteger;

import org.mockito.Mock;
import org.mockito.MockitoAnnotations;
Expand Down Expand Up @@ -224,6 +225,99 @@ public void testRegisterRemoteModelFailure() {

}

public void testRegisterRemoteModelUpdateFailure() {
String taskId = "abcd";
String modelId = "efgh";
String status = MLTaskState.CREATED.name();

doAnswer(invocation -> {
ActionListener<MLRegisterModelResponse> actionListener = invocation.getArgument(1);
MLRegisterModelResponse output = new MLRegisterModelResponse(taskId, status, modelId);
actionListener.onResponse(output);
return null;
}).when(mlNodeClient).register(any(MLRegisterModelInput.class), any());

doAnswer(invocation -> {
ActionListener<WorkflowData> updateResponseListener = invocation.getArgument(4);
updateResponseListener.onFailure(new RuntimeException("Failed to update register resource"));
return null;
}).when(flowFrameworkIndicesHandler).addResourceToStateIndex(any(WorkflowData.class), anyString(), anyString(), anyString(), any());

WorkflowData deployWorkflowData = new WorkflowData(
Map.ofEntries(
Map.entry("name", "xyz"),
Map.entry("description", "description"),
Map.entry(CONNECTOR_ID, "abcdefg"),
Map.entry(DEPLOY_FIELD, true)
),
"test-id",
"test-node-id"
);

PlainActionFuture<WorkflowData> future = this.registerRemoteModelStep.execute(
deployWorkflowData.getNodeId(),
deployWorkflowData,
Collections.emptyMap(),
Collections.emptyMap(),
Collections.emptyMap()
);

assertTrue(future.isDone());
ExecutionException ex = expectThrows(ExecutionException.class, () -> future.get().getClass());
assertTrue(ex.getCause() instanceof FlowFrameworkException);
assertEquals("Failed to update new created test-node-id resource register_remote_model id efgh", ex.getCause().getMessage());
}

public void testRegisterRemoteModelDeployUpdateFailure() {
String taskId = "abcd";
String modelId = "efgh";
String status = MLTaskState.CREATED.name();

doAnswer(invocation -> {
ActionListener<MLRegisterModelResponse> actionListener = invocation.getArgument(1);
MLRegisterModelResponse output = new MLRegisterModelResponse(taskId, status, modelId);
actionListener.onResponse(output);
return null;
}).when(mlNodeClient).register(any(MLRegisterModelInput.class), any());

AtomicInteger invocationCount = new AtomicInteger(0);
doAnswer(invocation -> {
ActionListener<WorkflowData> updateResponseListener = invocation.getArgument(4);
if (invocationCount.getAndIncrement() == 0) {
// succeed on first call (update register)
updateResponseListener.onResponse(new WorkflowData(Map.of(MODEL_ID, modelId), "test-id", "test-node-id"));
} else {
// fail on second call (update deploy)
updateResponseListener.onFailure(new RuntimeException("Failed to update deploy resource"));
}
return null;
}).when(flowFrameworkIndicesHandler).addResourceToStateIndex(any(WorkflowData.class), anyString(), anyString(), anyString(), any());

WorkflowData deployWorkflowData = new WorkflowData(
Map.ofEntries(
Map.entry("name", "xyz"),
Map.entry("description", "description"),
Map.entry(CONNECTOR_ID, "abcdefg"),
Map.entry(DEPLOY_FIELD, true)
),
"test-id",
"test-node-id"
);

PlainActionFuture<WorkflowData> future = this.registerRemoteModelStep.execute(
deployWorkflowData.getNodeId(),
deployWorkflowData,
Collections.emptyMap(),
Collections.emptyMap(),
Collections.emptyMap()
);

assertTrue(future.isDone());
ExecutionException ex = expectThrows(ExecutionException.class, () -> future.get().getClass());
assertTrue(ex.getCause() instanceof FlowFrameworkException);
assertEquals("Failed to update simulated deploy step resource efgh", ex.getCause().getMessage());
}

public void testReisterRemoteModelInterfaceFailure() {
doAnswer(invocation -> {
ActionListener<MLRegisterModelResponse> actionListener = invocation.getArgument(1);
Expand Down

0 comments on commit 68a73ed

Please sign in to comment.