Skip to content

Commit

Permalink
Add param to delete workflow API to clear status even if resources ex…
Browse files Browse the repository at this point in the history
…ist (#719)

Signed-off-by: Daniel Widdis <[email protected]>
  • Loading branch information
dbwiddis committed May 31, 2024
1 parent 4ee2171 commit 4ea348a
Show file tree
Hide file tree
Showing 11 changed files with 84 additions and 27 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ Inspired from [Keep a Changelog](https://keepachangelog.com/en/1.1.0/)
### Features
### Enhancements
- Add Workflow Step for Reindex from source index to destination ([#718](https://github.com/opensearch-project/flow-framework/pull/718))
- Add param to delete workflow API to clear status even if resources exist ([#719](https://github.com/opensearch-project/flow-framework/pull/719))

### Bug Fixes
- Add user mapping to Workflow State index ([#705](https://github.com/opensearch-project/flow-framework/pull/705))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -199,6 +199,8 @@ private CommonValue() {}
public static final String USER_OUTPUTS_FIELD = "user_outputs";
/** The template field name for template resources created */
public static final String RESOURCES_CREATED_FIELD = "resources_created";
/** The parameter to clear workflow state when deleting template */
public static final String CLEAR_STATUS = "clear_status";
/** The field name for the step name where a resource is created */
public static final String WORKFLOW_STEP_NAME = "workflow_step_name";
/** The field name for the step ID where a resource is created */
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -525,11 +525,17 @@ public <T> void getProvisioningProgress(
* Check workflow provisioning state and resources to see if state can be deleted with template
*
* @param documentId document id
* @param canDeleteStateConsumer consumer function which will be true if NOT_STARTED or COMPLETED and no resources
* @param clearStatus if set true, always deletes the state document unless status is IN_PROGRESS
* @param canDeleteStateConsumer consumer function which will be true if workflow state is not IN_PROGRESS and either no resources or true clearStatus
* @param listener action listener from caller to fail on error
* @param <T> action listener response type
*/
public <T> void canDeleteWorkflowStateDoc(String documentId, Consumer<Boolean> canDeleteStateConsumer, ActionListener<T> listener) {
public <T> void canDeleteWorkflowStateDoc(
String documentId,
boolean clearStatus,
Consumer<Boolean> canDeleteStateConsumer,
ActionListener<T> listener
) {
GetRequest getRequest = new GetRequest(WORKFLOW_STATE_INDEX, documentId);
try (ThreadContext.StoredContext context = client.threadPool().getThreadContext().stashContext()) {
client.get(getRequest, ActionListener.wrap(response -> {
Expand All @@ -545,7 +551,7 @@ public <T> void canDeleteWorkflowStateDoc(String documentId, Consumer<Boolean> c
ensureExpectedToken(XContentParser.Token.START_OBJECT, parser.nextToken(), parser);
WorkflowState workflowState = WorkflowState.parse(parser);
canDeleteStateConsumer.accept(
workflowState.resourcesCreated().isEmpty()
(clearStatus || workflowState.resourcesCreated().isEmpty())
&& !ProvisioningProgress.IN_PROGRESS.equals(
ProvisioningProgress.valueOf(workflowState.getProvisioningProgress())
)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ public class WorkflowState implements ToXContentObject, Writeable {
* @param provisionEndTime Indicates the end time of the whole provisioning flow
* @param user The user extracted from the thread context from the request
* @param userOutputs A map of essential API responses for backend to use and lookup.
* @param resourcesCreated A map of all the resources created.
* @param resourcesCreated A list of all the resources created.
*/
public WorkflowState(
String workflowId,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import java.util.List;
import java.util.Locale;

import static org.opensearch.flowframework.common.CommonValue.CLEAR_STATUS;
import static org.opensearch.flowframework.common.CommonValue.WORKFLOW_ID;
import static org.opensearch.flowframework.common.CommonValue.WORKFLOW_URI;
import static org.opensearch.flowframework.common.FlowFrameworkSettings.FLOW_FRAMEWORK_ENABLED;
Expand Down Expand Up @@ -62,6 +63,7 @@ public List<Route> routes() {
@Override
protected BaseRestHandler.RestChannelConsumer prepareRequest(RestRequest request, NodeClient client) throws IOException {
String workflowId = request.param(WORKFLOW_ID);
request.param(CLEAR_STATUS); // consume and ignore, we will pass params to workflow
try {
if (!flowFrameworkFeatureEnabledSetting.isFlowFrameworkEnabled()) {
throw new FlowFrameworkException(
Expand All @@ -78,7 +80,7 @@ protected BaseRestHandler.RestChannelConsumer prepareRequest(RestRequest request
if (workflowId == null) {
throw new FlowFrameworkException("workflow_id cannot be null", RestStatus.BAD_REQUEST);
}
WorkflowRequest workflowRequest = new WorkflowRequest(workflowId, null);
WorkflowRequest workflowRequest = new WorkflowRequest(workflowId, null, request.params());
return channel -> client.execute(DeleteWorkflowAction.INSTANCE, workflowRequest, ActionListener.wrap(response -> {
XContentBuilder builder = response.toXContent(channel.newBuilder(), ToXContent.EMPTY_PARAMS);
channel.sendResponse(new BytesRestResponse(RestStatus.OK, builder));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,7 @@ protected BaseRestHandler.RestChannelConsumer prepareRequest(RestRequest request
try {
FlowFrameworkException ex = exception instanceof FlowFrameworkException
? (FlowFrameworkException) exception
: new FlowFrameworkException("Failed to get workflow.", ExceptionsHelper.status(exception));
: new FlowFrameworkException("Failed to get workflow status.", ExceptionsHelper.status(exception));
XContentBuilder exceptionBuilder = ex.toXContent(channel.newErrorBuilder(), ToXContent.EMPTY_PARAMS);
channel.sendResponse(new BytesRestResponse(ex.getRestStatus(), exceptionBuilder));
} catch (IOException e) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.apache.logging.log4j.core.util.Booleans;
import org.opensearch.action.delete.DeleteRequest;
import org.opensearch.action.delete.DeleteResponse;
import org.opensearch.action.support.ActionFilters;
Expand All @@ -24,6 +25,7 @@
import org.opensearch.tasks.Task;
import org.opensearch.transport.TransportService;

import static org.opensearch.flowframework.common.CommonValue.CLEAR_STATUS;
import static org.opensearch.flowframework.common.CommonValue.GLOBAL_CONTEXT_INDEX;

/**
Expand Down Expand Up @@ -65,10 +67,12 @@ protected void doExecute(Task task, WorkflowRequest request, ActionListener<Dele
logger.info("Deleting workflow doc: {}", workflowId);
client.delete(deleteRequest, ActionListener.runBefore(listener, context::restore));

// Whether to force deletion of corresponding state
final boolean clearStatus = Booleans.parseBoolean(request.getParams().get(CLEAR_STATUS), false);
ActionListener<DeleteResponse> stateListener = ActionListener.wrap(response -> {
logger.info("Deleted workflow state doc: {}", workflowId);
}, exception -> { logger.info("Failed to delete workflow state doc: {}", workflowId, exception); });
flowFrameworkIndicesHandler.canDeleteWorkflowStateDoc(workflowId, canDelete -> {
flowFrameworkIndicesHandler.canDeleteWorkflowStateDoc(workflowId, clearStatus, canDelete -> {
if (Boolean.TRUE.equals(canDelete)) {
flowFrameworkIndicesHandler.deleteFlowFrameworkSystemIndexDoc(workflowId, stateListener);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -83,11 +83,11 @@ protected void doExecute(Task task, GetWorkflowStateRequest request, ActionListe
listener.onFailure(new FlowFrameworkException(errorMessage, RestStatus.BAD_REQUEST));
}
} else {
listener.onFailure(new FlowFrameworkException("Fail to find workflow " + workflowId, RestStatus.NOT_FOUND));
listener.onFailure(new FlowFrameworkException("Fail to find workflow status of " + workflowId, RestStatus.NOT_FOUND));
}
}, e -> {
if (e instanceof IndexNotFoundException) {
listener.onFailure(new FlowFrameworkException("Fail to find workflow " + workflowId, RestStatus.NOT_FOUND));
listener.onFailure(new FlowFrameworkException("Fail to find workflow status of " + workflowId, RestStatus.NOT_FOUND));
} else {
String errorMessage = "Failed to get workflow status of: " + workflowId;
logger.error(errorMessage, e);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import org.opensearch.action.search.SearchResponse;
import org.opensearch.client.Request;
import org.opensearch.client.Response;
import org.opensearch.client.ResponseException;
import org.opensearch.client.RestClient;
import org.opensearch.client.RestClientBuilder;
import org.opensearch.common.settings.Settings;
Expand Down Expand Up @@ -454,10 +455,22 @@ protected Response deprovisionWorkflow(RestClient client, String workflowId) thr
* @throws Exception if the request fails
*/
protected Response deleteWorkflow(RestClient client, String workflowId) throws Exception {
return deleteWorkflow(client, workflowId, "");
}

/**
* Helper method to invoke the Delete Workflow Rest Action
* @param client the rest client
* @param workflowId the workflow ID to delete
* @param params a string adding any rest path params
* @return a rest response
* @throws Exception if the request fails
*/
protected Response deleteWorkflow(RestClient client, String workflowId, String params) throws Exception {
return TestHelpers.makeRequest(
client,
"DELETE",
String.format(Locale.ROOT, "%s/%s", WORKFLOW_URI, workflowId),
String.format(Locale.ROOT, "%s/%s%s", WORKFLOW_URI, workflowId, params),
Collections.emptyMap(),
"",
null
Expand All @@ -481,7 +494,6 @@ protected Response getWorkflowStatus(RestClient client, String workflowId, boole
"",
null
);

}

/**
Expand Down Expand Up @@ -586,7 +598,7 @@ protected SearchResponse searchWorkflowState(RestClient client, String query) th
}

/**
* Helper method to invoke the Get Workflow Rest Action and assert the provisioning and state status
* Helper method to invoke the Get Workflow Status Rest Action and assert the provisioning and state status
* @param client the rest client
* @param workflowId the workflow ID to get the status
* @param stateStatus the state status name
Expand All @@ -607,6 +619,17 @@ protected void getAndAssertWorkflowStatus(
assertEquals(provisioningStatus.name(), (String) responseMap.get(CommonValue.PROVISIONING_PROGRESS_FIELD));
}

/**
* Helper method to invoke the Get Workflow Status Rest Action and assert document is not found
* @param client the rest client
* @param workflowId the workflow ID to get the status
* @throws Exception if the request fails
*/
protected void getAndAssertWorkflowStatusNotFound(RestClient client, String workflowId) throws Exception {
ResponseException ex = assertThrows(ResponseException.class, () -> getWorkflowStatus(client, workflowId, true));
assertEquals(RestStatus.NOT_FOUND.getStatus(), ex.getResponse().getStatusLine().getStatusCode());
}

/**
* Helper method to invoke the Get Workflow status Rest Action and get the error field
* @param client the rest client
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,7 @@
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;

@SuppressWarnings("deprecation")
public class FlowFrameworkIndicesHandlerTests extends OpenSearchTestCase {
@Mock
private Client client;
Expand Down Expand Up @@ -262,19 +263,10 @@ public void testInitIndexIfAbsent_IndexNotPresent() {

public void testIsWorkflowProvisionedFailedParsing() {
String documentId = randomAlphaOfLength(5);
@SuppressWarnings("unchecked")
Consumer<Optional<ProvisioningProgress>> function = mock(Consumer.class);
@SuppressWarnings("unchecked")
ActionListener<GetResponse> listener = mock(ActionListener.class);
WorkflowState workFlowState = new WorkflowState(
documentId,
"test",
"PROVISIONING",
"IN_PROGRESS",
Instant.now(),
Instant.now(),
TestHelpers.randomUser(),
Collections.emptyMap(),
Collections.emptyList()
);
doAnswer(invocation -> {
ActionListener<GetResponse> responseListener = invocation.getArgument(1);

Expand Down Expand Up @@ -318,7 +310,7 @@ public void testCanDeleteWorkflowStateDoc() {
return null;
}).when(client).get(any(GetRequest.class), any());

flowFrameworkIndicesHandler.canDeleteWorkflowStateDoc(documentId, canDelete -> { assertTrue(canDelete); }, listener);
flowFrameworkIndicesHandler.canDeleteWorkflowStateDoc(documentId, false, canDelete -> { assertTrue(canDelete); }, listener);
}

public void testCanNotDeleteWorkflowStateDocInProgress() {
Expand Down Expand Up @@ -347,10 +339,10 @@ public void testCanNotDeleteWorkflowStateDocInProgress() {
return null;
}).when(client).get(any(GetRequest.class), any());

flowFrameworkIndicesHandler.canDeleteWorkflowStateDoc(documentId, canDelete -> { assertFalse(canDelete); }, listener);
flowFrameworkIndicesHandler.canDeleteWorkflowStateDoc(documentId, true, canDelete -> { assertFalse(canDelete); }, listener);
}

public void testCanNotDeleteWorkflowStateDocResourcesExist() {
public void testDeleteWorkflowStateDocResourcesExist() {
String documentId = randomAlphaOfLength(5);
@SuppressWarnings("unchecked")
ActionListener<GetResponse> listener = mock(ActionListener.class);
Expand All @@ -376,12 +368,18 @@ public void testCanNotDeleteWorkflowStateDocResourcesExist() {
return null;
}).when(client).get(any(GetRequest.class), any());

flowFrameworkIndicesHandler.canDeleteWorkflowStateDoc(documentId, canDelete -> { assertFalse(canDelete); }, listener);
// Can't delete because resources exist
flowFrameworkIndicesHandler.canDeleteWorkflowStateDoc(documentId, false, canDelete -> { assertFalse(canDelete); }, listener);

// But can delete if clearStatus set true
flowFrameworkIndicesHandler.canDeleteWorkflowStateDoc(documentId, true, canDelete -> { assertTrue(canDelete); }, listener);
}

public void testDoesTemplateExist() {
String documentId = randomAlphaOfLength(5);
@SuppressWarnings("unchecked")
Consumer<Boolean> function = mock(Consumer.class);
@SuppressWarnings("unchecked")
ActionListener<GetResponse> listener = mock(ActionListener.class);
doAnswer(invocation -> {
ActionListener<GetResponse> responseListener = invocation.getArgument(1);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -175,6 +175,17 @@ public void testCreateAndProvisionLocalModelWorkflow() throws Exception {
assertNotNull(resourcesCreated.get(0).resourceId());
assertEquals("deploy_model", resourcesCreated.get(1).workflowStepName());
assertNotNull(resourcesCreated.get(1).resourceId());

// Delete the workflow without deleting the resources
Response deleteResponse = deleteWorkflow(client(), workflowId);
assertEquals(RestStatus.OK, TestHelpers.restStatus(deleteResponse));

// Verify state doc is not deleted
assertBusy(
() -> { getAndAssertWorkflowStatus(client(), workflowId, State.COMPLETED, ProvisioningProgress.DONE); },
30,
TimeUnit.SECONDS
);
}

public void testCreateAndProvisionCyclicalTemplate() throws Exception {
Expand Down Expand Up @@ -235,6 +246,13 @@ public void testCreateAndProvisionRemoteModelWorkflow() throws Exception {
assertNotNull(resourcesCreated.get(1).resourceId());
assertEquals("deploy_model", resourcesCreated.get(2).workflowStepName());
assertNotNull(resourcesCreated.get(2).resourceId());

// Delete the workflow without deleting the resources
Response deleteResponse = deleteWorkflow(client(), workflowId, "?clear_status=true");
assertEquals(RestStatus.OK, TestHelpers.restStatus(deleteResponse));

// Verify state doc is deleted
assertBusy(() -> { getAndAssertWorkflowStatusNotFound(client(), workflowId); }, 30, TimeUnit.SECONDS);
}

public void testCreateAndProvisionAgentFrameworkWorkflow() throws Exception {
Expand Down Expand Up @@ -305,6 +323,9 @@ public void testCreateAndProvisionAgentFrameworkWorkflow() throws Exception {
// Hit Delete API
Response deleteResponse = deleteWorkflow(client(), workflowId);
assertEquals(RestStatus.OK, TestHelpers.restStatus(deleteResponse));

// Verify state doc is deleted
assertBusy(() -> { getAndAssertWorkflowStatusNotFound(client(), workflowId); }, 30, TimeUnit.SECONDS);
}

public void testTimestamps() throws Exception {
Expand Down

0 comments on commit 4ea348a

Please sign in to comment.