Skip to content

Commit

Permalink
Only update state index on register agent (opensearch-project#253)
Browse files Browse the repository at this point in the history
* fixing status api bug with new deploy task

Signed-off-by: Amit Galitzky <[email protected]>

* changed to getName()

Signed-off-by: Amit Galitzky <[email protected]>

---------

Signed-off-by: Amit Galitzky <[email protected]>
  • Loading branch information
amitgalitz committed Dec 5, 2023
1 parent 9387167 commit 6974eaf
Show file tree
Hide file tree
Showing 3 changed files with 40 additions and 37 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -234,7 +234,7 @@ private void executeWorkflow(List<ProcessNode> workflowSequence, String workflow
Instant.now().toEpochMilli()
),
ActionListener.wrap(updateResponse -> {
logger.info("updated workflow {} state to {}", workflowId, State.COMPLETED);
logger.info("updated workflow {} state to {}", workflowId, State.FAILED);
}, exceptionState -> { logger.error("Failed to update workflow state : {}", exceptionState.getMessage(), ex); })
);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import org.opensearch.flowframework.exception.FlowFrameworkException;
import org.opensearch.flowframework.indices.FlowFrameworkIndicesHandler;
import org.opensearch.ml.client.MachineLearningNodeClient;
import org.opensearch.ml.common.MLTask;
import org.opensearch.ml.common.MLTaskState;

import java.util.Map;
Expand Down Expand Up @@ -58,6 +59,24 @@ public AbstractRetryableWorkflowStep(
this.flowFrameworkIndicesHandler = flowFrameworkIndicesHandler;
}

/**
* Completes the future for either deploy or register local model step
* @param resourceName resource name for the given step
* @param nodeId node ID of the given step
* @param workflowId workflow ID of the given workflow
* @param response Response from ml commons get Task API
* @param future CompletableFuture of the given step
*/
public void completeFuture(String resourceName, String nodeId, String workflowId, MLTask response, CompletableFuture future) {
future.complete(
new WorkflowData(
Map.ofEntries(Map.entry(resourceName, response.getModelId()), Map.entry(REGISTER_MODEL_STATUS, response.getState().name())),
workflowId,
nodeId
)
);
}

/**
* Retryable get ml task
* @param workflowId the workflow id
Expand Down Expand Up @@ -91,31 +110,25 @@ void retryableGetMlTask(
try {
logger.info(workflowStep + " successful for {} and modelId {}", workflowId, response.getModelId());
String resourceName = WorkflowResources.getResourceByWorkflowStep(getName());
flowFrameworkIndicesHandler.updateResourceInStateIndex(
workflowId,
nodeId,
getName(),
response.getTaskId(),
ActionListener.wrap(updateResponse -> {
logger.info("successfully updated resources created in state index: {}", updateResponse.getIndex());
future.complete(
new WorkflowData(
Map.ofEntries(
Map.entry(resourceName, response.getModelId()),
Map.entry(REGISTER_MODEL_STATUS, response.getState().name())
),
workflowId,
nodeId
)
);
}, exception -> {
logger.error("Failed to update new created resource", exception);
future.completeExceptionally(
new FlowFrameworkException(exception.getMessage(), ExceptionsHelper.status(exception))
);
})
);

if (getName().equals(WorkflowResources.DEPLOY_MODEL.getWorkflowStep())) {
completeFuture(resourceName, nodeId, workflowId, response, future);
} else {
flowFrameworkIndicesHandler.updateResourceInStateIndex(
workflowId,
nodeId,
getName(),
response.getTaskId(),
ActionListener.wrap(updateResponse -> {
logger.info("successfully updated resources created in state index: {}", updateResponse.getIndex());
completeFuture(resourceName, nodeId, workflowId, response, future);
}, exception -> {
logger.error("Failed to update new created resource", exception);
future.completeExceptionally(
new FlowFrameworkException(exception.getMessage(), ExceptionsHelper.status(exception))
);
})
);
}
} catch (Exception e) {
logger.error("Failed to parse and update new created resource", e);
future.completeExceptionally(new FlowFrameworkException(e.getMessage(), ExceptionsHelper.status(e)));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,6 @@
import java.util.concurrent.CompletableFuture;
import java.util.stream.Collectors;

import static org.opensearch.flowframework.common.CommonValue.AGENT_ID;
import static org.opensearch.flowframework.common.CommonValue.APP_TYPE_FIELD;
import static org.opensearch.flowframework.common.CommonValue.CREATED_TIME;
import static org.opensearch.flowframework.common.CommonValue.DESCRIPTION_FIELD;
Expand Down Expand Up @@ -89,18 +88,9 @@ public CompletableFuture<WorkflowData> execute(
ActionListener<MLRegisterAgentResponse> actionListener = new ActionListener<>() {
@Override
public void onResponse(MLRegisterAgentResponse mlRegisterAgentResponse) {
logger.info("Agent registration successful for the agent {}", mlRegisterAgentResponse.getAgentId());
registerAgentModelFuture.complete(
new WorkflowData(
Map.ofEntries(Map.entry(AGENT_ID, mlRegisterAgentResponse.getAgentId())),
currentNodeInputs.getWorkflowId(),
currentNodeInputs.getNodeId()
)
);

try {
String resourceName = WorkflowResources.getResourceByWorkflowStep(getName());
logger.info("Created connector successfully");
logger.info("Agent registration successful for the agent {}", mlRegisterAgentResponse.getAgentId());
flowFrameworkIndicesHandler.updateResourceInStateIndex(
currentNodeInputs.getWorkflowId(),
currentNodeId,
Expand Down

0 comments on commit 6974eaf

Please sign in to comment.