Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Feature/agent_framework] Added Retry functionality for Deploy Model #245

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,8 @@ public enum WorkflowResources {
REGISTER_LOCAL_MODEL("register_local_model", "model_id"),
/** official workflow step name for registering a model group and associated created resource */
REGISTER_MODEL_GROUP("register_model_group", "model_group_id"),
/** official workflow step name for deploying a model and associated created resource */
DEPLOY_MODEL("deploy_model", "model_id"),
/** official workflow step name for creating an ingest-pipeline and associated created resource */
CREATE_INGEST_PIPELINE("create_ingest_pipeline", "pipeline_id"),
/** official workflow step name for creating an index and associated created resource */
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ public class WorkflowNode implements ToXContentObject {
/** The field defining the timeout value for this node */
public static final String NODE_TIMEOUT_FIELD = "node_timeout";
/** The default timeout value if the template doesn't override it */
public static final String NODE_TIMEOUT_DEFAULT_VALUE = "10s";
public static final String NODE_TIMEOUT_DEFAULT_VALUE = "15s";
owaiskazi19 marked this conversation as resolved.
Show resolved Hide resolved

private final String id; // unique id
private final String type; // maps to a WorkflowStep
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,27 +8,133 @@
*/
package org.opensearch.flowframework.workflow;

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.opensearch.ExceptionsHelper;
import org.opensearch.cluster.service.ClusterService;
import org.opensearch.common.settings.Settings;
import org.opensearch.common.util.concurrent.FutureUtils;
import org.opensearch.core.action.ActionListener;
import org.opensearch.core.rest.RestStatus;
import org.opensearch.flowframework.common.WorkflowResources;
import org.opensearch.flowframework.exception.FlowFrameworkException;
import org.opensearch.flowframework.indices.FlowFrameworkIndicesHandler;
import org.opensearch.ml.client.MachineLearningNodeClient;
import org.opensearch.ml.common.MLTaskState;

import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.stream.Stream;

import static org.opensearch.flowframework.common.CommonValue.REGISTER_MODEL_STATUS;
import static org.opensearch.flowframework.common.FlowFrameworkSettings.MAX_GET_TASK_REQUEST_RETRY;

/**
* Abstract retryable workflow step
*/
public abstract class AbstractRetryableWorkflowStep implements WorkflowStep {

private static final Logger logger = LogManager.getLogger(AbstractRetryableWorkflowStep.class);
/** The maximum number of transport request retries */
protected volatile Integer maxRetry;
private final MachineLearningNodeClient mlClient;
private final FlowFrameworkIndicesHandler flowFrameworkIndicesHandler;

/**
* Instantiates a new Retryable workflow step
* @param settings Environment settings
* @param clusterService the cluster service
* @param mlClient machine learning client
* @param flowFrameworkIndicesHandler FlowFrameworkIndicesHandler class to update system indices
*/
public AbstractRetryableWorkflowStep(Settings settings, ClusterService clusterService) {
public AbstractRetryableWorkflowStep(
Settings settings,
ClusterService clusterService,
MachineLearningNodeClient mlClient,
FlowFrameworkIndicesHandler flowFrameworkIndicesHandler
) {
this.maxRetry = MAX_GET_TASK_REQUEST_RETRY.get(settings);
clusterService.getClusterSettings().addSettingsUpdateConsumer(MAX_GET_TASK_REQUEST_RETRY, it -> maxRetry = it);
this.mlClient = mlClient;
this.flowFrameworkIndicesHandler = flowFrameworkIndicesHandler;
}

/**
* Retryable get ml task
* @param workflowId the workflow id
* @param nodeId the workflow node id
* @param future the workflow step future
* @param taskId the ml task id
* @param retries the current number of request retries
* @param workflowStep the workflow step which requires a retry get ml task functionality
*/
void retryableGetMlTask(
String workflowId,
String nodeId,
CompletableFuture<WorkflowData> future,
String taskId,
int retries,
String workflowStep
) {
mlClient.getTask(taskId, ActionListener.wrap(response -> {
MLTaskState currentState = response.getState();
if (currentState != MLTaskState.COMPLETED) {
if (Stream.of(MLTaskState.FAILED, MLTaskState.COMPLETED_WITH_ERROR).anyMatch(x -> x == currentState)) {
// Model registration failed or completed with errors
String errorMessage = workflowStep + " failed with error : " + response.getError();
logger.error(errorMessage);
future.completeExceptionally(new FlowFrameworkException(errorMessage, RestStatus.BAD_REQUEST));
} else {
// Task still in progress, attempt retry
throw new IllegalStateException(workflowStep + " is not yet completed");
}
} else {
try {
logger.info(workflowStep + " successful for {} and modelId {}", workflowId, response.getModelId());
String resourceName = WorkflowResources.getResourceByWorkflowStep(getName());
owaiskazi19 marked this conversation as resolved.
Show resolved Hide resolved
flowFrameworkIndicesHandler.updateResourceInStateIndex(
workflowId,
nodeId,
getName(),
response.getTaskId(),
ActionListener.wrap(updateResponse -> {
logger.info("successfully updated resources created in state index: {}", updateResponse.getIndex());
future.complete(
new WorkflowData(
Map.ofEntries(
Map.entry(resourceName, response.getModelId()),
Map.entry(REGISTER_MODEL_STATUS, response.getState().name())
),
workflowId,
nodeId
)
);
}, exception -> {
logger.error("Failed to update new created resource", exception);
future.completeExceptionally(
new FlowFrameworkException(exception.getMessage(), ExceptionsHelper.status(exception))
);
})
);

} catch (Exception e) {
logger.error("Failed to parse and update new created resource", e);
future.completeExceptionally(new FlowFrameworkException(e.getMessage(), ExceptionsHelper.status(e)));
}
}
}, exception -> {
if (retries < maxRetry) {
// Sleep thread prior to retrying request
try {
Thread.sleep(5000);
} catch (Exception e) {
FutureUtils.cancel(future);
}
retryableGetMlTask(workflowId, nodeId, future, taskId, retries + 1, workflowStep);
} else {
logger.error("Failed to retrieve" + workflowStep + ",maximum retries exceeded");
future.completeExceptionally(new FlowFrameworkException(exception.getMessage(), ExceptionsHelper.status(exception)));
}
}));
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,11 @@
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.opensearch.ExceptionsHelper;
import org.opensearch.cluster.service.ClusterService;
import org.opensearch.common.settings.Settings;
import org.opensearch.core.action.ActionListener;
import org.opensearch.flowframework.exception.FlowFrameworkException;
import org.opensearch.flowframework.indices.FlowFrameworkIndicesHandler;
import org.opensearch.flowframework.util.ParseUtils;
import org.opensearch.ml.client.MachineLearningNodeClient;
import org.opensearch.ml.common.transport.deploy.MLDeployModelResponse;
Expand All @@ -27,18 +30,29 @@
/**
* Step to deploy a model
*/
public class DeployModelStep implements WorkflowStep {
public class DeployModelStep extends AbstractRetryableWorkflowStep {
private static final Logger logger = LogManager.getLogger(DeployModelStep.class);

private final MachineLearningNodeClient mlClient;
private final FlowFrameworkIndicesHandler flowFrameworkIndicesHandler;
static final String NAME = "deploy_model";

/**
* Instantiate this class
* @param settings The OpenSearch settings
* @param clusterService The cluster service
* @param mlClient client to instantiate MLClient
* @param flowFrameworkIndicesHandler FlowFrameworkIndicesHandler class to update system indices
*/
public DeployModelStep(MachineLearningNodeClient mlClient) {
public DeployModelStep(
Settings settings,
ClusterService clusterService,
MachineLearningNodeClient mlClient,
FlowFrameworkIndicesHandler flowFrameworkIndicesHandler
) {
super(settings, clusterService, mlClient, flowFrameworkIndicesHandler);
this.mlClient = mlClient;
this.flowFrameworkIndicesHandler = flowFrameworkIndicesHandler;
}

@Override
Expand All @@ -55,13 +69,10 @@ public CompletableFuture<WorkflowData> execute(
@Override
public void onResponse(MLDeployModelResponse mlDeployModelResponse) {
logger.info("Model deployment state {}", mlDeployModelResponse.getStatus());
deployModelFuture.complete(
new WorkflowData(
Map.ofEntries(Map.entry("deploy_model_status", mlDeployModelResponse.getStatus())),
currentNodeInputs.getWorkflowId(),
currentNodeInputs.getNodeId()
)
);
String taskId = mlDeployModelResponse.getTaskId();

// Attempt to retrieve the model ID
retryableGetMlTask(currentNodeInputs.getWorkflowId(), currentNodeId, deployModelFuture, taskId, 0, "Deploy model");
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
import org.apache.logging.log4j.Logger;
import org.opensearch.ExceptionsHelper;
import org.opensearch.core.action.ActionListener;
import org.opensearch.core.common.util.CollectionUtils;
import org.opensearch.flowframework.common.WorkflowResources;
import org.opensearch.flowframework.exception.FlowFrameworkException;
import org.opensearch.flowframework.indices.FlowFrameworkIndicesHandler;
Expand Down Expand Up @@ -72,7 +73,7 @@ public CompletableFuture<WorkflowData> execute(
@Override
public void onResponse(MLRegisterModelGroupResponse mlRegisterModelGroupResponse) {
try {
logger.info("Remote Model registration successful");
logger.info("Model group registration successful");
String resourceName = WorkflowResources.getResourceByWorkflowStep(getName());
flowFrameworkIndicesHandler.updateResourceInStateIndex(
currentNodeInputs.getWorkflowId(),
Expand Down Expand Up @@ -134,7 +135,7 @@ public void onFailure(Exception e) {
if (description != null) {
builder.description(description);
}
if (!backendRoles.isEmpty()) {
if (!CollectionUtils.isEmpty(backendRoles)) {
builder.backendRoles(backendRoles);
}
if (modelAccessMode != null) {
Expand All @@ -160,6 +161,9 @@ public String getName() {

@SuppressWarnings("unchecked")
private List<String> getBackendRoles(Map<String, Object> content) {
return (List<String>) content.get(BACKEND_ROLES_FIELD);
if (content.containsKey(BACKEND_ROLES_FIELD)) {
return (List<String>) content.get(BACKEND_ROLES_FIELD);
}
return null;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -13,15 +13,12 @@
import org.opensearch.ExceptionsHelper;
import org.opensearch.cluster.service.ClusterService;
import org.opensearch.common.settings.Settings;
import org.opensearch.common.util.concurrent.FutureUtils;
import org.opensearch.core.action.ActionListener;
import org.opensearch.core.rest.RestStatus;
import org.opensearch.flowframework.common.WorkflowResources;
import org.opensearch.flowframework.exception.FlowFrameworkException;
import org.opensearch.flowframework.indices.FlowFrameworkIndicesHandler;
import org.opensearch.flowframework.util.ParseUtils;
import org.opensearch.ml.client.MachineLearningNodeClient;
import org.opensearch.ml.common.MLTaskState;
import org.opensearch.ml.common.model.MLModelConfig;
import org.opensearch.ml.common.model.MLModelFormat;
import org.opensearch.ml.common.model.TextEmbeddingModelConfig;
Expand All @@ -34,7 +31,6 @@
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.stream.Stream;

import static org.opensearch.flowframework.common.CommonValue.ALL_CONFIG;
import static org.opensearch.flowframework.common.CommonValue.DESCRIPTION_FIELD;
Expand All @@ -45,7 +41,6 @@
import static org.opensearch.flowframework.common.CommonValue.MODEL_GROUP_ID;
import static org.opensearch.flowframework.common.CommonValue.MODEL_TYPE;
import static org.opensearch.flowframework.common.CommonValue.NAME_FIELD;
import static org.opensearch.flowframework.common.CommonValue.REGISTER_MODEL_STATUS;
import static org.opensearch.flowframework.common.CommonValue.URL;
import static org.opensearch.flowframework.common.CommonValue.VERSION_FIELD;

Expand Down Expand Up @@ -75,7 +70,7 @@ public RegisterLocalModelStep(
MachineLearningNodeClient mlClient,
FlowFrameworkIndicesHandler flowFrameworkIndicesHandler
) {
super(settings, clusterService);
super(settings, clusterService, mlClient, flowFrameworkIndicesHandler);
this.mlClient = mlClient;
this.flowFrameworkIndicesHandler = flowFrameworkIndicesHandler;
}
Expand All @@ -98,7 +93,14 @@ public void onResponse(MLRegisterModelResponse mlRegisterModelResponse) {
String taskId = mlRegisterModelResponse.getTaskId();

// Attempt to retrieve the model ID
retryableGetMlTask(currentNodeInputs.getWorkflowId(), currentNodeId, registerLocalModelFuture, taskId, 0);
retryableGetMlTask(
currentNodeInputs.getWorkflowId(),
currentNodeId,
registerLocalModelFuture,
taskId,
0,
"Local model registration"
);
}

@Override
Expand Down Expand Up @@ -178,84 +180,4 @@ public void onFailure(Exception e) {
public String getName() {
return NAME;
}

/**
* Retryable get ml task
* @param workflowId the workflow id
* @param nodeId the workflow node id
* @param registerLocalModelFuture the workflow step future
* @param taskId the ml task id
* @param retries the current number of request retries
*/
void retryableGetMlTask(
String workflowId,
String nodeId,
CompletableFuture<WorkflowData> registerLocalModelFuture,
String taskId,
int retries
) {
mlClient.getTask(taskId, ActionListener.wrap(response -> {
MLTaskState currentState = response.getState();
if (currentState != MLTaskState.COMPLETED) {
if (Stream.of(MLTaskState.FAILED, MLTaskState.COMPLETED_WITH_ERROR).anyMatch(x -> x == currentState)) {
// Model registration failed or completed with errors
String errorMessage = "Local model registration failed with error : " + response.getError();
logger.error(errorMessage);
registerLocalModelFuture.completeExceptionally(new FlowFrameworkException(errorMessage, RestStatus.BAD_REQUEST));
} else {
// Task still in progress, attempt retry
throw new IllegalStateException("Local model registration is not yet completed");
}
} else {
try {
logger.info("Local Model registration successful");
String resourceName = WorkflowResources.getResourceByWorkflowStep(getName());
flowFrameworkIndicesHandler.updateResourceInStateIndex(
workflowId,
nodeId,
getName(),
response.getTaskId(),
ActionListener.wrap(updateResponse -> {
logger.info("successfully updated resources created in state index: {}", updateResponse.getIndex());
registerLocalModelFuture.complete(
new WorkflowData(
Map.ofEntries(
Map.entry(resourceName, response.getModelId()),
Map.entry(REGISTER_MODEL_STATUS, response.getState().name())
),
workflowId,
nodeId
)
);
}, exception -> {
logger.error("Failed to update new created resource", exception);
registerLocalModelFuture.completeExceptionally(
new FlowFrameworkException(exception.getMessage(), ExceptionsHelper.status(exception))
);
})
);

} catch (Exception e) {
logger.error("Failed to parse and update new created resource", e);
registerLocalModelFuture.completeExceptionally(new FlowFrameworkException(e.getMessage(), ExceptionsHelper.status(e)));
}
}
}, exception -> {
if (retries < maxRetry) {
// Sleep thread prior to retrying request
try {
Thread.sleep(5000);
} catch (Exception e) {
FutureUtils.cancel(registerLocalModelFuture);
}
final int retryAdd = retries + 1;
retryableGetMlTask(workflowId, nodeId, registerLocalModelFuture, taskId, retryAdd);
} else {
logger.error("Failed to retrieve local model registration task, maximum retries exceeded");
registerLocalModelFuture.completeExceptionally(
new FlowFrameworkException(exception.getMessage(), ExceptionsHelper.status(exception))
);
}
}));
}
}
Loading
Loading