From eb430be0c0d3994d622fcc63ec1de3d07a5b2d2e Mon Sep 17 00:00:00 2001 From: Amit Galitzky Date: Mon, 20 Nov 2023 19:31:20 +0000 Subject: [PATCH] adding check to init GC index if absent before max workflow check Signed-off-by: Amit Galitzky --- .../CreateWorkflowTransportAction.java | 142 ++++++++++-------- 1 file changed, 80 insertions(+), 62 deletions(-) diff --git a/src/main/java/org/opensearch/flowframework/transport/CreateWorkflowTransportAction.java b/src/main/java/org/opensearch/flowframework/transport/CreateWorkflowTransportAction.java index d4f6b3b3f..1b8220873 100644 --- a/src/main/java/org/opensearch/flowframework/transport/CreateWorkflowTransportAction.java +++ b/src/main/java/org/opensearch/flowframework/transport/CreateWorkflowTransportAction.java @@ -39,6 +39,7 @@ import java.util.List; +import static org.opensearch.core.rest.RestStatus.INTERNAL_SERVER_ERROR; import static org.opensearch.flowframework.common.CommonValue.PROVISIONING_PROGRESS_FIELD; import static org.opensearch.flowframework.common.CommonValue.STATE_FIELD; import static org.opensearch.flowframework.util.ParseUtils.getUserContext; @@ -109,83 +110,100 @@ protected void doExecute(Task task, WorkflowRequest request, ActionListener { - if (!max) { - String errorMessage = "Maximum workflows limit reached " + request.getMaxWorkflows(); - logger.error(errorMessage); - FlowFrameworkException ffe = new FlowFrameworkException(errorMessage, RestStatus.BAD_REQUEST); - listener.onFailure(ffe); - return; - } else { - // Create new global context and state index entries - flowFrameworkIndicesHandler.putTemplateToGlobalContext(templateWithUser, ActionListener.wrap(globalContextResponse -> { - flowFrameworkIndicesHandler.putInitialStateToWorkflowState( - globalContextResponse.getId(), - user, - ActionListener.wrap(stateResponse -> { - logger.info("create state workflow doc"); - listener.onResponse(new WorkflowResponse(globalContextResponse.getId())); + flowFrameworkIndicesHandler.initGlobalContextIndexIfAbsent(ActionListener.wrap(indexCreated -> { + if (!indexCreated) { + listener.onFailure(new FlowFrameworkException("No response to create global_context index", INTERNAL_SERVER_ERROR)); + return; + } + if (request.getWorkflowId() == null) { + // Throttle incoming requests + checkMaxWorkflows(request.getRequestTimeout(), request.getMaxWorkflows(), ActionListener.wrap(max -> { + if (!max) { + String errorMessage = "Maximum workflows limit reached " + request.getMaxWorkflows(); + logger.error(errorMessage); + FlowFrameworkException ffe = new FlowFrameworkException(errorMessage, RestStatus.BAD_REQUEST); + listener.onFailure(ffe); + return; + } else { + // Create new global context and state index entries + flowFrameworkIndicesHandler.putTemplateToGlobalContext( + templateWithUser, + ActionListener.wrap(globalContextResponse -> { + flowFrameworkIndicesHandler.putInitialStateToWorkflowState( + globalContextResponse.getId(), + user, + ActionListener.wrap(stateResponse -> { + logger.info("create state workflow doc"); + listener.onResponse(new WorkflowResponse(globalContextResponse.getId())); + }, exception -> { + logger.error("Failed to save workflow state : {}", exception.getMessage()); + if (exception instanceof FlowFrameworkException) { + listener.onFailure(exception); + } else { + listener.onFailure(new FlowFrameworkException(exception.getMessage(), RestStatus.BAD_REQUEST)); + } + }) + ); }, exception -> { - logger.error("Failed to save workflow state : {}", exception.getMessage()); + logger.error("Failed to save use case template : {}", exception.getMessage()); if (exception instanceof FlowFrameworkException) { listener.onFailure(exception); } else { - listener.onFailure(new FlowFrameworkException(exception.getMessage(), RestStatus.BAD_REQUEST)); + listener.onFailure( + new FlowFrameworkException(exception.getMessage(), ExceptionsHelper.status(exception)) + ); + } + + }) + ); + } + }, e -> { + logger.error("Failed to updated use case template {} : {}", request.getWorkflowId(), e.getMessage()); + if (e instanceof FlowFrameworkException) { + listener.onFailure(e); + } else { + listener.onFailure(new FlowFrameworkException(e.getMessage(), ExceptionsHelper.status(e))); + } + })); + } else { + // Update existing entry, full document replacement + flowFrameworkIndicesHandler.updateTemplateInGlobalContext( + request.getWorkflowId(), + request.getTemplate(), + ActionListener.wrap(response -> { + flowFrameworkIndicesHandler.updateFlowFrameworkSystemIndexDoc( + request.getWorkflowId(), + ImmutableMap.of(STATE_FIELD, State.NOT_STARTED, PROVISIONING_PROGRESS_FIELD, ProvisioningProgress.NOT_STARTED), + ActionListener.wrap(updateResponse -> { + logger.info("updated workflow {} state to {}", request.getWorkflowId(), State.NOT_STARTED.name()); + listener.onResponse(new WorkflowResponse(request.getWorkflowId())); + }, exception -> { + logger.error("Failed to update workflow state : {}", exception.getMessage()); + if (exception instanceof FlowFrameworkException) { + listener.onFailure(exception); + } else { + listener.onFailure( + new FlowFrameworkException(exception.getMessage(), ExceptionsHelper.status(exception)) + ); } }) ); }, exception -> { - logger.error("Failed to save use case template : {}", exception.getMessage()); + logger.error("Failed to updated use case template {} : {}", request.getWorkflowId(), exception.getMessage()); if (exception instanceof FlowFrameworkException) { listener.onFailure(exception); } else { listener.onFailure(new FlowFrameworkException(exception.getMessage(), ExceptionsHelper.status(exception))); } - })); - } - }, e -> { - logger.error("Failed to updated use case template {} : {}", request.getWorkflowId(), e.getMessage()); - if (e instanceof FlowFrameworkException) { - listener.onFailure(e); - } else { - listener.onFailure(new FlowFrameworkException(e.getMessage(), ExceptionsHelper.status(e))); - } - })); - } else { - // Update existing entry, full document replacement - flowFrameworkIndicesHandler.updateTemplateInGlobalContext( - request.getWorkflowId(), - request.getTemplate(), - ActionListener.wrap(response -> { - flowFrameworkIndicesHandler.updateFlowFrameworkSystemIndexDoc( - request.getWorkflowId(), - ImmutableMap.of(STATE_FIELD, State.NOT_STARTED, PROVISIONING_PROGRESS_FIELD, ProvisioningProgress.NOT_STARTED), - ActionListener.wrap(updateResponse -> { - logger.info("updated workflow {} state to {}", request.getWorkflowId(), State.NOT_STARTED.name()); - listener.onResponse(new WorkflowResponse(request.getWorkflowId())); - }, exception -> { - logger.error("Failed to update workflow state : {}", exception.getMessage()); - if (exception instanceof FlowFrameworkException) { - listener.onFailure(exception); - } else { - listener.onFailure(new FlowFrameworkException(exception.getMessage(), ExceptionsHelper.status(exception))); - } - }) - ); - }, exception -> { - logger.error("Failed to updated use case template {} : {}", request.getWorkflowId(), exception.getMessage()); - if (exception instanceof FlowFrameworkException) { - listener.onFailure(exception); - } else { - listener.onFailure(new FlowFrameworkException(exception.getMessage(), ExceptionsHelper.status(exception))); - } + }) + ); + } + }, e -> { + logger.error("Failed to create global_context index", e); + listener.onFailure(new FlowFrameworkException("Failed to create global_context index", INTERNAL_SERVER_ERROR)); + })); - }) - ); - } } /**