diff --git a/src/main/java/org/opensearch/flowframework/transport/CreateWorkflowTransportAction.java b/src/main/java/org/opensearch/flowframework/transport/CreateWorkflowTransportAction.java index be100fbce..2e2de0f78 100644 --- a/src/main/java/org/opensearch/flowframework/transport/CreateWorkflowTransportAction.java +++ b/src/main/java/org/opensearch/flowframework/transport/CreateWorkflowTransportAction.java @@ -13,7 +13,6 @@ import org.apache.logging.log4j.Logger; import org.opensearch.ExceptionsHelper; import org.opensearch.action.search.SearchRequest; -import org.opensearch.action.search.SearchResponse; import org.opensearch.action.support.ActionFilters; import org.opensearch.action.support.HandledTransportAction; import org.opensearch.client.Client; @@ -111,21 +110,14 @@ protected void doExecute(Task task, WorkflowRequest request, ActionListener onSearchGlobalContext(response, listener, request.getMaxWorkflows()), - exception -> listener.onFailure(exception) - ) - ); + if (onSearchGlobalContext(request, request.getMaxWorkflows())) { + String errorMessage = "Maximum workflows limit reached " + request.getMaxWorkflows(); + logger.error(errorMessage); + FlowFrameworkException ffe = new FlowFrameworkException(errorMessage, RestStatus.BAD_REQUEST); + listener.onFailure(ffe); + return; + } // Create new global context and state index entries flowFrameworkIndicesHandler.putTemplateToGlobalContext(templateWithUser, ActionListener.wrap(globalContextResponse -> { @@ -190,17 +182,24 @@ protected void doExecute(Task task, WorkflowRequest request, ActionListener= maxWorkflow) { - String errorMessage = "Maximum workflows limit reached " + maxWorkflow; - logger.error(errorMessage); - listener.onFailure(new FlowFrameworkException(errorMessage, RestStatus.BAD_REQUEST)); - } + protected boolean onSearchGlobalContext(WorkflowRequest request, Integer maxWorkflow) { + final boolean[] workflow = { false }; + QueryBuilder query = QueryBuilders.matchAllQuery(); + TimeValue requestTimeOut = request.getRequestTimeout(); + SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder().query(query).size(0).timeout(requestTimeOut); + + SearchRequest searchRequest = new SearchRequest(CommonValue.GLOBAL_CONTEXT_INDEX).source(searchSourceBuilder); + + client.search(searchRequest, ActionListener.wrap(searchResponse -> { + if (searchResponse.getHits().getTotalHits().value >= maxWorkflow) { + workflow[0] = true; + } + }, exception -> { throw new FlowFrameworkException("Unable to fetch the workflows", RestStatus.BAD_REQUEST); })); + return workflow[0]; } private void validateWorkflows(Template template) throws Exception {