Skip to content

Commit

Permalink
Add param to delete workflow API to clear status even if resources ex…
Browse files Browse the repository at this point in the history
…ist (opensearch-project#719)

Signed-off-by: Daniel Widdis <[email protected]>
Signed-off-by: martinpkr <[email protected]>
  • Loading branch information
dbwiddis authored and martinpkr committed Jun 2, 2024
1 parent f8eacbc commit 37ce229
Show file tree
Hide file tree
Showing 11 changed files with 84 additions and 27 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ Inspired from [Keep a Changelog](https://keepachangelog.com/en/1.1.0/)
### Features
### Enhancements
- Add Workflow Step for Reindex from source index to destination ([#718](https://github.com/opensearch-project/flow-framework/pull/718))
- Add param to delete workflow API to clear status even if resources exist ([#719](https://github.com/opensearch-project/flow-framework/pull/719))

### Bug Fixes
- Add user mapping to Workflow State index ([#705](https://github.com/opensearch-project/flow-framework/pull/705))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -199,6 +199,8 @@ private CommonValue() {}
public static final String USER_OUTPUTS_FIELD = "user_outputs";
/** The template field name for template resources created */
public static final String RESOURCES_CREATED_FIELD = "resources_created";
/** The parameter to clear workflow state when deleting template */
public static final String CLEAR_STATUS = "clear_status";
/** The field name for the step name where a resource is created */
public static final String WORKFLOW_STEP_NAME = "workflow_step_name";
/** The field name for the step ID where a resource is created */
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -525,11 +525,17 @@ public <T> void getProvisioningProgress(
* Check workflow provisioning state and resources to see if state can be deleted with template
*
* @param documentId document id
* @param canDeleteStateConsumer consumer function which will be true if NOT_STARTED or COMPLETED and no resources
* @param clearStatus if set true, always deletes the state document unless status is IN_PROGRESS
* @param canDeleteStateConsumer consumer function which will be true if workflow state is not IN_PROGRESS and either no resources or true clearStatus
* @param listener action listener from caller to fail on error
* @param <T> action listener response type
*/
public <T> void canDeleteWorkflowStateDoc(String documentId, Consumer<Boolean> canDeleteStateConsumer, ActionListener<T> listener) {
public <T> void canDeleteWorkflowStateDoc(
String documentId,
boolean clearStatus,
Consumer<Boolean> canDeleteStateConsumer,
ActionListener<T> listener
) {
GetRequest getRequest = new GetRequest(WORKFLOW_STATE_INDEX, documentId);
try (ThreadContext.StoredContext context = client.threadPool().getThreadContext().stashContext()) {
client.get(getRequest, ActionListener.wrap(response -> {
Expand All @@ -545,7 +551,7 @@ public <T> void canDeleteWorkflowStateDoc(String documentId, Consumer<Boolean> c
ensureExpectedToken(XContentParser.Token.START_OBJECT, parser.nextToken(), parser);
WorkflowState workflowState = WorkflowState.parse(parser);
canDeleteStateConsumer.accept(
workflowState.resourcesCreated().isEmpty()
(clearStatus || workflowState.resourcesCreated().isEmpty())
&& !ProvisioningProgress.IN_PROGRESS.equals(
ProvisioningProgress.valueOf(workflowState.getProvisioningProgress())
)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ public class WorkflowState implements ToXContentObject, Writeable {
* @param provisionEndTime Indicates the end time of the whole provisioning flow
* @param user The user extracted from the thread context from the request
* @param userOutputs A map of essential API responses for backend to use and lookup.
* @param resourcesCreated A map of all the resources created.
* @param resourcesCreated A list of all the resources created.
*/
public WorkflowState(
String workflowId,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import java.util.List;
import java.util.Locale;

import static org.opensearch.flowframework.common.CommonValue.CLEAR_STATUS;
import static org.opensearch.flowframework.common.CommonValue.WORKFLOW_ID;
import static org.opensearch.flowframework.common.CommonValue.WORKFLOW_URI;
import static org.opensearch.flowframework.common.FlowFrameworkSettings.FLOW_FRAMEWORK_ENABLED;
Expand Down Expand Up @@ -62,6 +63,7 @@ public List<Route> routes() {
@Override
protected BaseRestHandler.RestChannelConsumer prepareRequest(RestRequest request, NodeClient client) throws IOException {
String workflowId = request.param(WORKFLOW_ID);
request.param(CLEAR_STATUS); // consume and ignore, we will pass params to workflow
try {
if (!flowFrameworkFeatureEnabledSetting.isFlowFrameworkEnabled()) {
throw new FlowFrameworkException(
Expand All @@ -78,7 +80,7 @@ protected BaseRestHandler.RestChannelConsumer prepareRequest(RestRequest request
if (workflowId == null) {
throw new FlowFrameworkException("workflow_id cannot be null", RestStatus.BAD_REQUEST);
}
WorkflowRequest workflowRequest = new WorkflowRequest(workflowId, null);
WorkflowRequest workflowRequest = new WorkflowRequest(workflowId, null, request.params());
return channel -> client.execute(DeleteWorkflowAction.INSTANCE, workflowRequest, ActionListener.wrap(response -> {
XContentBuilder builder = response.toXContent(channel.newBuilder(), ToXContent.EMPTY_PARAMS);
channel.sendResponse(new BytesRestResponse(RestStatus.OK, builder));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,7 @@ protected BaseRestHandler.RestChannelConsumer prepareRequest(RestRequest request
try {
FlowFrameworkException ex = exception instanceof FlowFrameworkException
? (FlowFrameworkException) exception
: new FlowFrameworkException("Failed to get workflow.", ExceptionsHelper.status(exception));
: new FlowFrameworkException("Failed to get workflow status.", ExceptionsHelper.status(exception));
XContentBuilder exceptionBuilder = ex.toXContent(channel.newErrorBuilder(), ToXContent.EMPTY_PARAMS);
channel.sendResponse(new BytesRestResponse(ex.getRestStatus(), exceptionBuilder));
} catch (IOException e) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.apache.logging.log4j.core.util.Booleans;
import org.opensearch.action.delete.DeleteRequest;
import org.opensearch.action.delete.DeleteResponse;
import org.opensearch.action.support.ActionFilters;
Expand All @@ -24,6 +25,7 @@
import org.opensearch.tasks.Task;
import org.opensearch.transport.TransportService;

import static org.opensearch.flowframework.common.CommonValue.CLEAR_STATUS;
import static org.opensearch.flowframework.common.CommonValue.GLOBAL_CONTEXT_INDEX;

/**
Expand Down Expand Up @@ -65,10 +67,12 @@ protected void doExecute(Task task, WorkflowRequest request, ActionListener<Dele
logger.info("Deleting workflow doc: {}", workflowId);
client.delete(deleteRequest, ActionListener.runBefore(listener, context::restore));

// Whether to force deletion of corresponding state
final boolean clearStatus = Booleans.parseBoolean(request.getParams().get(CLEAR_STATUS), false);
ActionListener<DeleteResponse> stateListener = ActionListener.wrap(response -> {
logger.info("Deleted workflow state doc: {}", workflowId);
}, exception -> { logger.info("Failed to delete workflow state doc: {}", workflowId, exception); });
flowFrameworkIndicesHandler.canDeleteWorkflowStateDoc(workflowId, canDelete -> {
flowFrameworkIndicesHandler.canDeleteWorkflowStateDoc(workflowId, clearStatus, canDelete -> {
if (Boolean.TRUE.equals(canDelete)) {
flowFrameworkIndicesHandler.deleteFlowFrameworkSystemIndexDoc(workflowId, stateListener);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -83,11 +83,11 @@ protected void doExecute(Task task, GetWorkflowStateRequest request, ActionListe
listener.onFailure(new FlowFrameworkException(errorMessage, RestStatus.BAD_REQUEST));
}
} else {
listener.onFailure(new FlowFrameworkException("Fail to find workflow " + workflowId, RestStatus.NOT_FOUND));
listener.onFailure(new FlowFrameworkException("Fail to find workflow status of " + workflowId, RestStatus.NOT_FOUND));
}
}, e -> {
if (e instanceof IndexNotFoundException) {
listener.onFailure(new FlowFrameworkException("Fail to find workflow " + workflowId, RestStatus.NOT_FOUND));
listener.onFailure(new FlowFrameworkException("Fail to find workflow status of " + workflowId, RestStatus.NOT_FOUND));
} else {
String errorMessage = "Failed to get workflow status of: " + workflowId;
logger.error(errorMessage, e);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import org.opensearch.action.search.SearchResponse;
import org.opensearch.client.Request;
import org.opensearch.client.Response;
import org.opensearch.client.ResponseException;
import org.opensearch.client.RestClient;
import org.opensearch.client.RestClientBuilder;
import org.opensearch.common.settings.Settings;
Expand Down Expand Up @@ -454,10 +455,22 @@ protected Response deprovisionWorkflow(RestClient client, String workflowId) thr
* @throws Exception if the request fails
*/
protected Response deleteWorkflow(RestClient client, String workflowId) throws Exception {
return deleteWorkflow(client, workflowId, "");
}

/**
* Helper method to invoke the Delete Workflow Rest Action
* @param client the rest client
* @param workflowId the workflow ID to delete
* @param params a string adding any rest path params
* @return a rest response
* @throws Exception if the request fails
*/
protected Response deleteWorkflow(RestClient client, String workflowId, String params) throws Exception {
return TestHelpers.makeRequest(
client,
"DELETE",
String.format(Locale.ROOT, "%s/%s", WORKFLOW_URI, workflowId),
String.format(Locale.ROOT, "%s/%s%s", WORKFLOW_URI, workflowId, params),
Collections.emptyMap(),
"",
null
Expand All @@ -481,7 +494,6 @@ protected Response getWorkflowStatus(RestClient client, String workflowId, boole
"",
null
);

}

/**
Expand Down Expand Up @@ -586,7 +598,7 @@ protected SearchResponse searchWorkflowState(RestClient client, String query) th
}

/**
* Helper method to invoke the Get Workflow Rest Action and assert the provisioning and state status
* Helper method to invoke the Get Workflow Status Rest Action and assert the provisioning and state status
* @param client the rest client
* @param workflowId the workflow ID to get the status
* @param stateStatus the state status name
Expand All @@ -607,6 +619,17 @@ protected void getAndAssertWorkflowStatus(
assertEquals(provisioningStatus.name(), (String) responseMap.get(CommonValue.PROVISIONING_PROGRESS_FIELD));
}

/**
* Helper method to invoke the Get Workflow Status Rest Action and assert document is not found
* @param client the rest client
* @param workflowId the workflow ID to get the status
* @throws Exception if the request fails
*/
protected void getAndAssertWorkflowStatusNotFound(RestClient client, String workflowId) throws Exception {
ResponseException ex = assertThrows(ResponseException.class, () -> getWorkflowStatus(client, workflowId, true));
assertEquals(RestStatus.NOT_FOUND.getStatus(), ex.getResponse().getStatusLine().getStatusCode());
}

/**
* Helper method to invoke the Get Workflow status Rest Action and get the error field
* @param client the rest client
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,7 @@
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;

@SuppressWarnings("deprecation")
public class FlowFrameworkIndicesHandlerTests extends OpenSearchTestCase {
@Mock
private Client client;
Expand Down Expand Up @@ -262,19 +263,10 @@ public void testInitIndexIfAbsent_IndexNotPresent() {

public void testIsWorkflowProvisionedFailedParsing() {
String documentId = randomAlphaOfLength(5);
@SuppressWarnings("unchecked")
Consumer<Optional<ProvisioningProgress>> function = mock(Consumer.class);
@SuppressWarnings("unchecked")
ActionListener<GetResponse> listener = mock(ActionListener.class);
WorkflowState workFlowState = new WorkflowState(
documentId,
"test",
"PROVISIONING",
"IN_PROGRESS",
Instant.now(),
Instant.now(),
TestHelpers.randomUser(),
Collections.emptyMap(),
Collections.emptyList()
);
doAnswer(invocation -> {
ActionListener<GetResponse> responseListener = invocation.getArgument(1);

Expand Down Expand Up @@ -318,7 +310,7 @@ public void testCanDeleteWorkflowStateDoc() {
return null;
}).when(client).get(any(GetRequest.class), any());

flowFrameworkIndicesHandler.canDeleteWorkflowStateDoc(documentId, canDelete -> { assertTrue(canDelete); }, listener);
flowFrameworkIndicesHandler.canDeleteWorkflowStateDoc(documentId, false, canDelete -> { assertTrue(canDelete); }, listener);
}

public void testCanNotDeleteWorkflowStateDocInProgress() {
Expand Down Expand Up @@ -347,10 +339,10 @@ public void testCanNotDeleteWorkflowStateDocInProgress() {
return null;
}).when(client).get(any(GetRequest.class), any());

flowFrameworkIndicesHandler.canDeleteWorkflowStateDoc(documentId, canDelete -> { assertFalse(canDelete); }, listener);
flowFrameworkIndicesHandler.canDeleteWorkflowStateDoc(documentId, true, canDelete -> { assertFalse(canDelete); }, listener);
}

public void testCanNotDeleteWorkflowStateDocResourcesExist() {
public void testDeleteWorkflowStateDocResourcesExist() {
String documentId = randomAlphaOfLength(5);
@SuppressWarnings("unchecked")
ActionListener<GetResponse> listener = mock(ActionListener.class);
Expand All @@ -376,12 +368,18 @@ public void testCanNotDeleteWorkflowStateDocResourcesExist() {
return null;
}).when(client).get(any(GetRequest.class), any());

flowFrameworkIndicesHandler.canDeleteWorkflowStateDoc(documentId, canDelete -> { assertFalse(canDelete); }, listener);
// Can't delete because resources exist
flowFrameworkIndicesHandler.canDeleteWorkflowStateDoc(documentId, false, canDelete -> { assertFalse(canDelete); }, listener);

// But can delete if clearStatus set true
flowFrameworkIndicesHandler.canDeleteWorkflowStateDoc(documentId, true, canDelete -> { assertTrue(canDelete); }, listener);
}

public void testDoesTemplateExist() {
String documentId = randomAlphaOfLength(5);
@SuppressWarnings("unchecked")
Consumer<Boolean> function = mock(Consumer.class);
@SuppressWarnings("unchecked")
ActionListener<GetResponse> listener = mock(ActionListener.class);
doAnswer(invocation -> {
ActionListener<GetResponse> responseListener = invocation.getArgument(1);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -175,6 +175,17 @@ public void testCreateAndProvisionLocalModelWorkflow() throws Exception {
assertNotNull(resourcesCreated.get(0).resourceId());
assertEquals("deploy_model", resourcesCreated.get(1).workflowStepName());
assertNotNull(resourcesCreated.get(1).resourceId());

// Delete the workflow without deleting the resources
Response deleteResponse = deleteWorkflow(client(), workflowId);
assertEquals(RestStatus.OK, TestHelpers.restStatus(deleteResponse));

// Verify state doc is not deleted
assertBusy(
() -> { getAndAssertWorkflowStatus(client(), workflowId, State.COMPLETED, ProvisioningProgress.DONE); },
30,
TimeUnit.SECONDS
);
}

public void testCreateAndProvisionCyclicalTemplate() throws Exception {
Expand Down Expand Up @@ -235,6 +246,13 @@ public void testCreateAndProvisionRemoteModelWorkflow() throws Exception {
assertNotNull(resourcesCreated.get(1).resourceId());
assertEquals("deploy_model", resourcesCreated.get(2).workflowStepName());
assertNotNull(resourcesCreated.get(2).resourceId());

// Delete the workflow without deleting the resources
Response deleteResponse = deleteWorkflow(client(), workflowId, "?clear_status=true");
assertEquals(RestStatus.OK, TestHelpers.restStatus(deleteResponse));

// Verify state doc is deleted
assertBusy(() -> { getAndAssertWorkflowStatusNotFound(client(), workflowId); }, 30, TimeUnit.SECONDS);
}

public void testCreateAndProvisionAgentFrameworkWorkflow() throws Exception {
Expand Down Expand Up @@ -305,6 +323,9 @@ public void testCreateAndProvisionAgentFrameworkWorkflow() throws Exception {
// Hit Delete API
Response deleteResponse = deleteWorkflow(client(), workflowId);
assertEquals(RestStatus.OK, TestHelpers.restStatus(deleteResponse));

// Verify state doc is deleted
assertBusy(() -> { getAndAssertWorkflowStatusNotFound(client(), workflowId); }, 30, TimeUnit.SECONDS);
}

public void testTimestamps() throws Exception {
Expand Down

0 comments on commit 37ce229

Please sign in to comment.