Skip to content

Commit

Permalink
adding check to init GC index if absent before max workflow check
Browse files Browse the repository at this point in the history
Signed-off-by: Amit Galitzky <[email protected]>
  • Loading branch information
amitgalitz committed Nov 20, 2023
1 parent 2dcfc9c commit eb430be
Showing 1 changed file with 80 additions and 62 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@

import java.util.List;

import static org.opensearch.core.rest.RestStatus.INTERNAL_SERVER_ERROR;
import static org.opensearch.flowframework.common.CommonValue.PROVISIONING_PROGRESS_FIELD;
import static org.opensearch.flowframework.common.CommonValue.STATE_FIELD;
import static org.opensearch.flowframework.util.ParseUtils.getUserContext;
Expand Down Expand Up @@ -109,83 +110,100 @@ protected void doExecute(Task task, WorkflowRequest request, ActionListener<Work
}
}

if (request.getWorkflowId() == null) {
// Throttle incoming requests
checkMaxWorkflows(request.getRequestTimeout(), request.getMaxWorkflows(), ActionListener.wrap(max -> {
if (!max) {
String errorMessage = "Maximum workflows limit reached " + request.getMaxWorkflows();
logger.error(errorMessage);
FlowFrameworkException ffe = new FlowFrameworkException(errorMessage, RestStatus.BAD_REQUEST);
listener.onFailure(ffe);
return;
} else {
// Create new global context and state index entries
flowFrameworkIndicesHandler.putTemplateToGlobalContext(templateWithUser, ActionListener.wrap(globalContextResponse -> {
flowFrameworkIndicesHandler.putInitialStateToWorkflowState(
globalContextResponse.getId(),
user,
ActionListener.wrap(stateResponse -> {
logger.info("create state workflow doc");
listener.onResponse(new WorkflowResponse(globalContextResponse.getId()));
flowFrameworkIndicesHandler.initGlobalContextIndexIfAbsent(ActionListener.wrap(indexCreated -> {
if (!indexCreated) {
listener.onFailure(new FlowFrameworkException("No response to create global_context index", INTERNAL_SERVER_ERROR));
return;
}
if (request.getWorkflowId() == null) {
// Throttle incoming requests
checkMaxWorkflows(request.getRequestTimeout(), request.getMaxWorkflows(), ActionListener.wrap(max -> {
if (!max) {
String errorMessage = "Maximum workflows limit reached " + request.getMaxWorkflows();
logger.error(errorMessage);
FlowFrameworkException ffe = new FlowFrameworkException(errorMessage, RestStatus.BAD_REQUEST);
listener.onFailure(ffe);
return;
} else {
// Create new global context and state index entries
flowFrameworkIndicesHandler.putTemplateToGlobalContext(
templateWithUser,
ActionListener.wrap(globalContextResponse -> {
flowFrameworkIndicesHandler.putInitialStateToWorkflowState(
globalContextResponse.getId(),
user,
ActionListener.wrap(stateResponse -> {
logger.info("create state workflow doc");
listener.onResponse(new WorkflowResponse(globalContextResponse.getId()));
}, exception -> {
logger.error("Failed to save workflow state : {}", exception.getMessage());
if (exception instanceof FlowFrameworkException) {
listener.onFailure(exception);
} else {
listener.onFailure(new FlowFrameworkException(exception.getMessage(), RestStatus.BAD_REQUEST));
}
})
);
}, exception -> {
logger.error("Failed to save workflow state : {}", exception.getMessage());
logger.error("Failed to save use case template : {}", exception.getMessage());
if (exception instanceof FlowFrameworkException) {
listener.onFailure(exception);
} else {
listener.onFailure(new FlowFrameworkException(exception.getMessage(), RestStatus.BAD_REQUEST));
listener.onFailure(
new FlowFrameworkException(exception.getMessage(), ExceptionsHelper.status(exception))
);
}

})
);
}
}, e -> {
logger.error("Failed to updated use case template {} : {}", request.getWorkflowId(), e.getMessage());
if (e instanceof FlowFrameworkException) {
listener.onFailure(e);
} else {
listener.onFailure(new FlowFrameworkException(e.getMessage(), ExceptionsHelper.status(e)));
}
}));
} else {
// Update existing entry, full document replacement
flowFrameworkIndicesHandler.updateTemplateInGlobalContext(
request.getWorkflowId(),
request.getTemplate(),
ActionListener.wrap(response -> {
flowFrameworkIndicesHandler.updateFlowFrameworkSystemIndexDoc(
request.getWorkflowId(),
ImmutableMap.of(STATE_FIELD, State.NOT_STARTED, PROVISIONING_PROGRESS_FIELD, ProvisioningProgress.NOT_STARTED),
ActionListener.wrap(updateResponse -> {
logger.info("updated workflow {} state to {}", request.getWorkflowId(), State.NOT_STARTED.name());
listener.onResponse(new WorkflowResponse(request.getWorkflowId()));
}, exception -> {
logger.error("Failed to update workflow state : {}", exception.getMessage());
if (exception instanceof FlowFrameworkException) {
listener.onFailure(exception);
} else {
listener.onFailure(
new FlowFrameworkException(exception.getMessage(), ExceptionsHelper.status(exception))
);
}
})
);
}, exception -> {
logger.error("Failed to save use case template : {}", exception.getMessage());
logger.error("Failed to updated use case template {} : {}", request.getWorkflowId(), exception.getMessage());
if (exception instanceof FlowFrameworkException) {
listener.onFailure(exception);
} else {
listener.onFailure(new FlowFrameworkException(exception.getMessage(), ExceptionsHelper.status(exception)));
}

}));
}
}, e -> {
logger.error("Failed to updated use case template {} : {}", request.getWorkflowId(), e.getMessage());
if (e instanceof FlowFrameworkException) {
listener.onFailure(e);
} else {
listener.onFailure(new FlowFrameworkException(e.getMessage(), ExceptionsHelper.status(e)));
}
}));
} else {
// Update existing entry, full document replacement
flowFrameworkIndicesHandler.updateTemplateInGlobalContext(
request.getWorkflowId(),
request.getTemplate(),
ActionListener.wrap(response -> {
flowFrameworkIndicesHandler.updateFlowFrameworkSystemIndexDoc(
request.getWorkflowId(),
ImmutableMap.of(STATE_FIELD, State.NOT_STARTED, PROVISIONING_PROGRESS_FIELD, ProvisioningProgress.NOT_STARTED),
ActionListener.wrap(updateResponse -> {
logger.info("updated workflow {} state to {}", request.getWorkflowId(), State.NOT_STARTED.name());
listener.onResponse(new WorkflowResponse(request.getWorkflowId()));
}, exception -> {
logger.error("Failed to update workflow state : {}", exception.getMessage());
if (exception instanceof FlowFrameworkException) {
listener.onFailure(exception);
} else {
listener.onFailure(new FlowFrameworkException(exception.getMessage(), ExceptionsHelper.status(exception)));
}
})
);
}, exception -> {
logger.error("Failed to updated use case template {} : {}", request.getWorkflowId(), exception.getMessage());
if (exception instanceof FlowFrameworkException) {
listener.onFailure(exception);
} else {
listener.onFailure(new FlowFrameworkException(exception.getMessage(), ExceptionsHelper.status(exception)));
}
})
);
}
}, e -> {
logger.error("Failed to create global_context index", e);
listener.onFailure(new FlowFrameworkException("Failed to create global_context index", INTERNAL_SERVER_ERROR));
}));

})
);
}
}

/**
Expand Down

0 comments on commit eb430be

Please sign in to comment.