diff --git a/src/main/java/org/opensearch/flowframework/transport/ProvisionWorkflowTransportAction.java b/src/main/java/org/opensearch/flowframework/transport/ProvisionWorkflowTransportAction.java index da9643cb5..cd4a54a57 100644 --- a/src/main/java/org/opensearch/flowframework/transport/ProvisionWorkflowTransportAction.java +++ b/src/main/java/org/opensearch/flowframework/transport/ProvisionWorkflowTransportAction.java @@ -234,7 +234,7 @@ private void executeWorkflow(List workflowSequence, String workflow Instant.now().toEpochMilli() ), ActionListener.wrap(updateResponse -> { - logger.info("updated workflow {} state to {}", workflowId, State.COMPLETED); + logger.info("updated workflow {} state to {}", workflowId, State.FAILED); }, exceptionState -> { logger.error("Failed to update workflow state : {}", exceptionState.getMessage(), ex); }) ); } diff --git a/src/main/java/org/opensearch/flowframework/workflow/AbstractRetryableWorkflowStep.java b/src/main/java/org/opensearch/flowframework/workflow/AbstractRetryableWorkflowStep.java index 48b7a0042..f807c752a 100644 --- a/src/main/java/org/opensearch/flowframework/workflow/AbstractRetryableWorkflowStep.java +++ b/src/main/java/org/opensearch/flowframework/workflow/AbstractRetryableWorkflowStep.java @@ -20,6 +20,7 @@ import org.opensearch.flowframework.exception.FlowFrameworkException; import org.opensearch.flowframework.indices.FlowFrameworkIndicesHandler; import org.opensearch.ml.client.MachineLearningNodeClient; +import org.opensearch.ml.common.MLTask; import org.opensearch.ml.common.MLTaskState; import java.util.Map; @@ -58,6 +59,24 @@ public AbstractRetryableWorkflowStep( this.flowFrameworkIndicesHandler = flowFrameworkIndicesHandler; } + /** + * Completes the future for either deploy or register local model step + * @param resourceName resource name for the given step + * @param nodeId node ID of the given step + * @param workflowId workflow ID of the given workflow + * @param response Response from ml commons get Task API + * @param future CompletableFuture of the given step + */ + public void completeFuture(String resourceName, String nodeId, String workflowId, MLTask response, CompletableFuture future) { + future.complete( + new WorkflowData( + Map.ofEntries(Map.entry(resourceName, response.getModelId()), Map.entry(REGISTER_MODEL_STATUS, response.getState().name())), + workflowId, + nodeId + ) + ); + } + /** * Retryable get ml task * @param workflowId the workflow id @@ -91,31 +110,25 @@ void retryableGetMlTask( try { logger.info(workflowStep + " successful for {} and modelId {}", workflowId, response.getModelId()); String resourceName = WorkflowResources.getResourceByWorkflowStep(getName()); - flowFrameworkIndicesHandler.updateResourceInStateIndex( - workflowId, - nodeId, - getName(), - response.getTaskId(), - ActionListener.wrap(updateResponse -> { - logger.info("successfully updated resources created in state index: {}", updateResponse.getIndex()); - future.complete( - new WorkflowData( - Map.ofEntries( - Map.entry(resourceName, response.getModelId()), - Map.entry(REGISTER_MODEL_STATUS, response.getState().name()) - ), - workflowId, - nodeId - ) - ); - }, exception -> { - logger.error("Failed to update new created resource", exception); - future.completeExceptionally( - new FlowFrameworkException(exception.getMessage(), ExceptionsHelper.status(exception)) - ); - }) - ); - + if (getName().equals(WorkflowResources.DEPLOY_MODEL.getWorkflowStep())) { + completeFuture(resourceName, nodeId, workflowId, response, future); + } else { + flowFrameworkIndicesHandler.updateResourceInStateIndex( + workflowId, + nodeId, + getName(), + response.getTaskId(), + ActionListener.wrap(updateResponse -> { + logger.info("successfully updated resources created in state index: {}", updateResponse.getIndex()); + completeFuture(resourceName, nodeId, workflowId, response, future); + }, exception -> { + logger.error("Failed to update new created resource", exception); + future.completeExceptionally( + new FlowFrameworkException(exception.getMessage(), ExceptionsHelper.status(exception)) + ); + }) + ); + } } catch (Exception e) { logger.error("Failed to parse and update new created resource", e); future.completeExceptionally(new FlowFrameworkException(e.getMessage(), ExceptionsHelper.status(e))); diff --git a/src/main/java/org/opensearch/flowframework/workflow/RegisterAgentStep.java b/src/main/java/org/opensearch/flowframework/workflow/RegisterAgentStep.java index 06c97f8d4..0e3a1c7c6 100644 --- a/src/main/java/org/opensearch/flowframework/workflow/RegisterAgentStep.java +++ b/src/main/java/org/opensearch/flowframework/workflow/RegisterAgentStep.java @@ -35,7 +35,6 @@ import java.util.concurrent.CompletableFuture; import java.util.stream.Collectors; -import static org.opensearch.flowframework.common.CommonValue.AGENT_ID; import static org.opensearch.flowframework.common.CommonValue.APP_TYPE_FIELD; import static org.opensearch.flowframework.common.CommonValue.CREATED_TIME; import static org.opensearch.flowframework.common.CommonValue.DESCRIPTION_FIELD; @@ -89,18 +88,9 @@ public CompletableFuture execute( ActionListener actionListener = new ActionListener<>() { @Override public void onResponse(MLRegisterAgentResponse mlRegisterAgentResponse) { - logger.info("Agent registration successful for the agent {}", mlRegisterAgentResponse.getAgentId()); - registerAgentModelFuture.complete( - new WorkflowData( - Map.ofEntries(Map.entry(AGENT_ID, mlRegisterAgentResponse.getAgentId())), - currentNodeInputs.getWorkflowId(), - currentNodeInputs.getNodeId() - ) - ); - try { String resourceName = WorkflowResources.getResourceByWorkflowStep(getName()); - logger.info("Created connector successfully"); + logger.info("Agent registration successful for the agent {}", mlRegisterAgentResponse.getAgentId()); flowFrameworkIndicesHandler.updateResourceInStateIndex( currentNodeInputs.getWorkflowId(), currentNodeId,