Skip to content

Commit

Permalink
Improve error messages for workflow states other than NOT_STARTED
Browse files Browse the repository at this point in the history
Signed-off-by: Daniel Widdis <[email protected]>
  • Loading branch information
dbwiddis committed Apr 3, 2024
1 parent 33ea800 commit deb7b65
Show file tree
Hide file tree
Showing 6 changed files with 44 additions and 23 deletions.
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -21,5 +21,7 @@ Inspired from [Keep a Changelog](https://keepachangelog.com/en/1.1.0/)

### Infrastructure
### Documentation
- Improve error messages for workflow states other than NOT_STARTED ([#642](https://github.com/opensearch-project/flow-framework/pull/642))

### Maintenance
### Refactoring
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Consumer;

Expand Down Expand Up @@ -419,8 +420,8 @@ public void updateTemplateInGlobalContext(
}
doesTemplateExist(documentId, templateExists -> {
if (templateExists) {
isWorkflowNotStarted(documentId, workflowIsNotStarted -> {
if (workflowIsNotStarted || ignoreNotStartedCheck) {
getProvisioningProgress(documentId, progress -> {

Check warning on line 423 in src/main/java/org/opensearch/flowframework/indices/FlowFrameworkIndicesHandler.java

View check run for this annotation

Codecov / codecov/patch

src/main/java/org/opensearch/flowframework/indices/FlowFrameworkIndicesHandler.java#L423

Added line #L423 was not covered by tests
if (ignoreNotStartedCheck || ProvisioningProgress.NOT_STARTED.equals(progress.orElse(null))) {
IndexRequest request = new IndexRequest(GLOBAL_CONTEXT_INDEX).id(documentId);
try (
XContentBuilder builder = XContentFactory.jsonBuilder();
Expand All @@ -436,7 +437,9 @@ public void updateTemplateInGlobalContext(
listener.onFailure(new FlowFrameworkException(errorMessage, ExceptionsHelper.status(e)));
}
} else {
String errorMessage = "The template has already been provisioned so it can't be updated: " + documentId;
String errorMessage = "The template can not be updated unless its provisioning state is NOT_STARTED: "

Check warning on line 440 in src/main/java/org/opensearch/flowframework/indices/FlowFrameworkIndicesHandler.java

View check run for this annotation

Codecov / codecov/patch

src/main/java/org/opensearch/flowframework/indices/FlowFrameworkIndicesHandler.java#L440

Added line #L440 was not covered by tests
+ documentId
+ ". Deprovision the workflow to reset the state.";
logger.error(errorMessage);
listener.onFailure(new FlowFrameworkException(errorMessage, RestStatus.BAD_REQUEST));
}
Expand Down Expand Up @@ -474,36 +477,40 @@ public <T> void doesTemplateExist(String documentId, Consumer<Boolean> booleanRe
}

/**
* Check if the workflow has been provisioned and executes the consumer by passing a boolean
* Check workflow provisioning state and executes the consumer
*
* @param documentId document id
* @param booleanResultConsumer boolean consumer function based on if workflow is provisioned or not
* @param provisioningProgressConsumer consumer function based on if workflow is provisioned.
* @param listener action listener
* @param <T> action listener response type
*/
public <T> void isWorkflowNotStarted(String documentId, Consumer<Boolean> booleanResultConsumer, ActionListener<T> listener) {
public <T> void getProvisioningProgress(
String documentId,
Consumer<Optional<ProvisioningProgress>> provisioningProgressConsumer,
ActionListener<T> listener
) {
GetRequest getRequest = new GetRequest(WORKFLOW_STATE_INDEX, documentId);
try (ThreadContext.StoredContext context = client.threadPool().getThreadContext().stashContext()) {
client.get(getRequest, ActionListener.wrap(response -> {
context.restore();
if (!response.isExists()) {
booleanResultConsumer.accept(false);
provisioningProgressConsumer.accept(Optional.empty());

Check warning on line 497 in src/main/java/org/opensearch/flowframework/indices/FlowFrameworkIndicesHandler.java

View check run for this annotation

Codecov / codecov/patch

src/main/java/org/opensearch/flowframework/indices/FlowFrameworkIndicesHandler.java#L497

Added line #L497 was not covered by tests
return;
}
try (
XContentParser parser = ParseUtils.createXContentParserFromRegistry(xContentRegistry, response.getSourceAsBytesRef())
) {
ensureExpectedToken(XContentParser.Token.START_OBJECT, parser.nextToken(), parser);
WorkflowState workflowState = WorkflowState.parse(parser);
booleanResultConsumer.accept(workflowState.getProvisioningProgress().equals(ProvisioningProgress.NOT_STARTED.name()));
provisioningProgressConsumer.accept(Optional.of(ProvisioningProgress.valueOf(workflowState.getProvisioningProgress())));

Check warning on line 505 in src/main/java/org/opensearch/flowframework/indices/FlowFrameworkIndicesHandler.java

View check run for this annotation

Codecov / codecov/patch

src/main/java/org/opensearch/flowframework/indices/FlowFrameworkIndicesHandler.java#L505

Added line #L505 was not covered by tests
} catch (Exception e) {
String errorMessage = "Failed to parse workflow state " + documentId;
logger.error(errorMessage, e);
listener.onFailure(new FlowFrameworkException(errorMessage, RestStatus.INTERNAL_SERVER_ERROR));
}
}, exception -> {
logger.error("Failed to get workflow state for {} ", documentId);
booleanResultConsumer.accept(false);
provisioningProgressConsumer.accept(Optional.empty());

Check warning on line 513 in src/main/java/org/opensearch/flowframework/indices/FlowFrameworkIndicesHandler.java

View check run for this annotation

Codecov / codecov/patch

src/main/java/org/opensearch/flowframework/indices/FlowFrameworkIndicesHandler.java#L513

Added line #L513 was not covered by tests
}));
} catch (Exception e) {
String errorMessage = "Failed to retrieve workflow state to check provisioning status";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,6 @@
import java.util.Map;
import java.util.stream.Collectors;

import static java.lang.Boolean.TRUE;
import static org.opensearch.flowframework.common.CommonValue.ERROR_FIELD;
import static org.opensearch.flowframework.common.CommonValue.GLOBAL_CONTEXT_INDEX;
import static org.opensearch.flowframework.common.CommonValue.PROVISIONING_PROGRESS_FIELD;
Expand Down Expand Up @@ -132,8 +131,8 @@ protected void doExecute(Task task, WorkflowRequest request, ActionListener<Work
);
workflowProcessSorter.validate(provisionProcessSequence, pluginsService);

flowFrameworkIndicesHandler.isWorkflowNotStarted(workflowId, workflowIsNotStarted -> {
if (TRUE.equals(workflowIsNotStarted)) {
flowFrameworkIndicesHandler.getProvisioningProgress(workflowId, progress -> {
if (ProvisioningProgress.NOT_STARTED.equals(progress.orElse(null))) {
// update state index
flowFrameworkIndicesHandler.updateFlowFrameworkSystemIndexDoc(
workflowId,
Expand Down Expand Up @@ -174,7 +173,11 @@ protected void doExecute(Task task, WorkflowRequest request, ActionListener<Work
})
);
} else {
String errorMessage = "The template has already been provisioned: " + workflowId;
String errorMessage = "The workflow provisioning state is "
+ (progress.isPresent() ? progress.get().toString() : "unknown")
+ " and can not be provisioned unless its state is NOT_STARTED: "
+ workflowId
+ ". Deprovision the workflow to reset the state.";
logger.info(errorMessage);
listener.onFailure(new FlowFrameworkException(errorMessage, RestStatus.BAD_REQUEST));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
import org.opensearch.core.common.bytes.BytesReference;
import org.opensearch.core.xcontent.XContentBuilder;
import org.opensearch.flowframework.TestHelpers;
import org.opensearch.flowframework.model.ProvisioningProgress;
import org.opensearch.flowframework.model.Template;
import org.opensearch.flowframework.model.Workflow;
import org.opensearch.flowframework.model.WorkflowState;
Expand All @@ -46,6 +47,7 @@
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Consumer;

Expand Down Expand Up @@ -253,7 +255,7 @@ public void testInitIndexIfAbsent_IndexNotPresent() {

public void testIsWorkflowProvisionedFailedParsing() {
String documentId = randomAlphaOfLength(5);
Consumer<Boolean> function = mock(Consumer.class);
Consumer<Optional<ProvisioningProgress>> function = mock(Consumer.class);
ActionListener<GetResponse> listener = mock(ActionListener.class);
WorkflowState workFlowState = new WorkflowState(
documentId,
Expand All @@ -277,7 +279,7 @@ public void testIsWorkflowProvisionedFailedParsing() {
responseListener.onResponse(new GetResponse(getResult));
return null;
}).when(client).get(any(GetRequest.class), any());
flowFrameworkIndicesHandler.isWorkflowNotStarted(documentId, function, listener);
flowFrameworkIndicesHandler.getProvisioningProgress(documentId, function, listener);
ArgumentCaptor<Exception> exceptionCaptor = ArgumentCaptor.forClass(Exception.class);
verify(listener, times(1)).onFailure(exceptionCaptor.capture());
assertTrue(exceptionCaptor.getValue().getMessage().contains("Failed to parse workflow state"));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -110,7 +110,9 @@ public void testFailedUpdateWorkflow() throws Exception {
ResponseException.class,
() -> updateWorkflow(client(), workflowId, template)
);
assertTrue(exceptionProvisioned.getMessage().contains("The template has already been provisioned so it can't be updated"));
assertTrue(
exceptionProvisioned.getMessage().contains("The template can not be updated unless its provisioning state is NOT_STARTED")
);

}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import org.opensearch.core.xcontent.XContentBuilder;
import org.opensearch.flowframework.TestHelpers;
import org.opensearch.flowframework.indices.FlowFrameworkIndicesHandler;
import org.opensearch.flowframework.model.ProvisioningProgress;
import org.opensearch.flowframework.model.Template;
import org.opensearch.flowframework.model.Workflow;
import org.opensearch.flowframework.model.WorkflowEdge;
Expand All @@ -40,6 +41,7 @@
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.function.Consumer;

import org.mockito.ArgumentCaptor;
Expand Down Expand Up @@ -138,10 +140,10 @@ public void testProvisionWorkflow() {

// Bypass isWorkflowNotStarted and force true response
doAnswer(invocation -> {
Consumer<Boolean> boolConsumer = invocation.getArgument(1);
boolConsumer.accept(true);
Consumer<Optional<ProvisioningProgress>> progressConsumer = invocation.getArgument(1);
progressConsumer.accept(Optional.of(ProvisioningProgress.NOT_STARTED));
return null;
}).when(flowFrameworkIndicesHandler).isWorkflowNotStarted(any(), any(), any());
}).when(flowFrameworkIndicesHandler).getProvisioningProgress(any(), any(), any());

// Bypass updateFlowFrameworkSystemIndexDoc and stub on response
doAnswer(invocation -> {
Expand Down Expand Up @@ -185,10 +187,10 @@ public void testProvisionWorkflowTwice() {

// Bypass isWorkflowNotStarted and force false response
doAnswer(invocation -> {
Consumer<Boolean> boolConsumer = invocation.getArgument(1);
boolConsumer.accept(false);
Consumer<Optional<ProvisioningProgress>> progressConsumer = invocation.getArgument(1);
progressConsumer.accept(Optional.of(ProvisioningProgress.DONE));
return null;
}).when(flowFrameworkIndicesHandler).isWorkflowNotStarted(any(), any(), any());
}).when(flowFrameworkIndicesHandler).getProvisioningProgress(any(), any(), any());

// Bypass updateFlowFrameworkSystemIndexDoc and stub on response
doAnswer(invocation -> {
Expand All @@ -200,7 +202,10 @@ public void testProvisionWorkflowTwice() {
provisionWorkflowTransportAction.doExecute(mock(Task.class), workflowRequest, listener);
ArgumentCaptor<Exception> exceptionCaptor = ArgumentCaptor.forClass(Exception.class);
verify(listener, times(1)).onFailure(exceptionCaptor.capture());
assertEquals("The template has already been provisioned: 2", exceptionCaptor.getValue().getMessage());
assertEquals(
"The workflow provisioning state is DONE and can not be provisioned unless its state is NOT_STARTED: 2. Deprovision the workflow to reset the state.",
exceptionCaptor.getValue().getMessage()
);
}

public void testFailedToRetrieveTemplateFromGlobalContext() {
Expand Down

0 comments on commit deb7b65

Please sign in to comment.