Skip to content

Commit

Permalink
Handled failure for throttling
Browse files Browse the repository at this point in the history
Signed-off-by: Owais Kazi <[email protected]>
  • Loading branch information
owaiskazi19 committed Nov 8, 2023
1 parent 722f09d commit aa31ec7
Showing 1 changed file with 23 additions and 24 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@
import org.apache.logging.log4j.Logger;
import org.opensearch.ExceptionsHelper;
import org.opensearch.action.search.SearchRequest;
import org.opensearch.action.search.SearchResponse;
import org.opensearch.action.support.ActionFilters;
import org.opensearch.action.support.HandledTransportAction;
import org.opensearch.client.Client;
Expand Down Expand Up @@ -111,21 +110,14 @@ protected void doExecute(Task task, WorkflowRequest request, ActionListener<Work
}

if (request.getWorkflowId() == null) {

// Throttle incoming requests
QueryBuilder query = QueryBuilders.matchAllQuery();
TimeValue requestTimeOut = request.getRequestTimeout();
SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder().query(query).size(0).timeout(requestTimeOut);

SearchRequest searchRequest = new SearchRequest(CommonValue.GLOBAL_CONTEXT_INDEX).source(searchSourceBuilder);

client.search(
searchRequest,
ActionListener.wrap(
response -> onSearchGlobalContext(response, listener, request.getMaxWorkflows()),
exception -> listener.onFailure(exception)
)
);
if (onSearchGlobalContext(request, request.getMaxWorkflows())) {
String errorMessage = "Maximum workflows limit reached " + request.getMaxWorkflows();
logger.error(errorMessage);
FlowFrameworkException ffe = new FlowFrameworkException(errorMessage, RestStatus.BAD_REQUEST);
listener.onFailure(ffe);
return;
}

// Create new global context and state index entries
flowFrameworkIndicesHandler.putTemplateToGlobalContext(templateWithUser, ActionListener.wrap(globalContextResponse -> {
Expand Down Expand Up @@ -190,17 +182,24 @@ protected void doExecute(Task task, WorkflowRequest request, ActionListener<Work

/**
* Checks if the max workflows limit has been reachesd
*
* @param response response of the GC index SearchRequest
* @param listener ActionListener of the SearchRequest
* @param request WorkflowRequest
* @param maxWorkflow max workflows
* @return
*/
protected void onSearchGlobalContext(SearchResponse response, ActionListener listener, Integer maxWorkflow) {
if (response.getHits().getTotalHits().value >= maxWorkflow) {
String errorMessage = "Maximum workflows limit reached " + maxWorkflow;
logger.error(errorMessage);
listener.onFailure(new FlowFrameworkException(errorMessage, RestStatus.BAD_REQUEST));
}
protected boolean onSearchGlobalContext(WorkflowRequest request, Integer maxWorkflow) {
final boolean[] workflow = { false };
QueryBuilder query = QueryBuilders.matchAllQuery();
TimeValue requestTimeOut = request.getRequestTimeout();
SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder().query(query).size(0).timeout(requestTimeOut);

SearchRequest searchRequest = new SearchRequest(CommonValue.GLOBAL_CONTEXT_INDEX).source(searchSourceBuilder);

client.search(searchRequest, ActionListener.wrap(searchResponse -> {
if (searchResponse.getHits().getTotalHits().value >= maxWorkflow) {
workflow[0] = true;
}
}, exception -> { throw new FlowFrameworkException("Unable to fetch the workflows", RestStatus.BAD_REQUEST); }));
return workflow[0];
}

private void validateWorkflows(Template template) throws Exception {
Expand Down

0 comments on commit aa31ec7

Please sign in to comment.