Skip to content

Commit

Permalink
Addressed PR Comments
Browse files Browse the repository at this point in the history
Signed-off-by: Owais Kazi <[email protected]>
  • Loading branch information
owaiskazi19 committed Dec 4, 2023
1 parent 116f78e commit 4b498bd
Showing 1 changed file with 7 additions and 7 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -65,30 +65,31 @@ public AbstractRetryableWorkflowStep(
* @param future the workflow step future
* @param taskId the ml task id
* @param retries the current number of request retries
* @param workflowStep the workflow step which requires a retry get ml task functionality
*/
void retryableGetMlTask(
String workflowId,
String nodeId,
CompletableFuture<WorkflowData> future,
String taskId,
int retries,
String message
String workflowStep
) {
mlClient.getTask(taskId, ActionListener.wrap(response -> {
MLTaskState currentState = response.getState();
if (currentState != MLTaskState.COMPLETED) {
if (Stream.of(MLTaskState.FAILED, MLTaskState.COMPLETED_WITH_ERROR).anyMatch(x -> x == currentState)) {
// Model registration failed or completed with errors
String errorMessage = message + " failed with error : " + response.getError();
String errorMessage = workflowStep + " failed with error : " + response.getError();
logger.error(errorMessage);
future.completeExceptionally(new FlowFrameworkException(errorMessage, RestStatus.BAD_REQUEST));
} else {
// Task still in progress, attempt retry
throw new IllegalStateException(message + " is not yet completed");
throw new IllegalStateException(workflowStep + " is not yet completed");
}
} else {
try {
logger.info(message + " successful for {} and modelId {}", workflowId, response.getModelId());
logger.info(workflowStep + " successful for {} and modelId {}", workflowId, response.getModelId());
String resourceName = WorkflowResources.getResourceByWorkflowStep(getName());
flowFrameworkIndicesHandler.updateResourceInStateIndex(
workflowId,
Expand Down Expand Up @@ -128,10 +129,9 @@ void retryableGetMlTask(
} catch (Exception e) {
FutureUtils.cancel(future);
}
final int retryAdd = retries + 1;
retryableGetMlTask(workflowId, nodeId, future, taskId, retryAdd, message);
retryableGetMlTask(workflowId, nodeId, future, taskId, retries + 1, workflowStep);
} else {
logger.error("Failed to retrieve" + message + ",maximum retries exceeded");
logger.error("Failed to retrieve" + workflowStep + ",maximum retries exceeded");
future.completeExceptionally(new FlowFrameworkException(exception.getMessage(), ExceptionsHelper.status(exception)));
}
}));
Expand Down

0 comments on commit 4b498bd

Please sign in to comment.