Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add param to delete workflow API to clear status even if resources exist #719

Merged
merged 2 commits into from
May 31, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ Inspired from [Keep a Changelog](https://keepachangelog.com/en/1.1.0/)
### Features
### Enhancements
- Add Workflow Step for Reindex from source index to destination ([#718](https://github.com/opensearch-project/flow-framework/pull/718))
- Add param to delete workflow API to clear status even if resources exist ([#719](https://github.com/opensearch-project/flow-framework/pull/719))

### Bug Fixes
- Add user mapping to Workflow State index ([#705](https://github.com/opensearch-project/flow-framework/pull/705))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -199,6 +199,8 @@ private CommonValue() {}
public static final String USER_OUTPUTS_FIELD = "user_outputs";
/** The template field name for template resources created */
public static final String RESOURCES_CREATED_FIELD = "resources_created";
/** The parameter to clear workflow state when deleting template */
public static final String CLEAR_STATUS = "clear_status";
/** The field name for the step name where a resource is created */
public static final String WORKFLOW_STEP_NAME = "workflow_step_name";
/** The field name for the step ID where a resource is created */
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -525,11 +525,17 @@ public <T> void getProvisioningProgress(
* Check workflow provisioning state and resources to see if state can be deleted with template
*
* @param documentId document id
* @param canDeleteStateConsumer consumer function which will be true if NOT_STARTED or COMPLETED and no resources
* @param clearStatus if set true, always deletes the state document unless status is IN_PROGRESS
* @param canDeleteStateConsumer consumer function which will be true if workflow state is not IN_PROGRESS and either no resources or true clearStatus
* @param listener action listener from caller to fail on error
* @param <T> action listener response type
*/
public <T> void canDeleteWorkflowStateDoc(String documentId, Consumer<Boolean> canDeleteStateConsumer, ActionListener<T> listener) {
public <T> void canDeleteWorkflowStateDoc(
String documentId,
boolean clearStatus,
Consumer<Boolean> canDeleteStateConsumer,
ActionListener<T> listener
) {
GetRequest getRequest = new GetRequest(WORKFLOW_STATE_INDEX, documentId);
try (ThreadContext.StoredContext context = client.threadPool().getThreadContext().stashContext()) {
client.get(getRequest, ActionListener.wrap(response -> {
Expand All @@ -545,7 +551,7 @@ public <T> void canDeleteWorkflowStateDoc(String documentId, Consumer<Boolean> c
ensureExpectedToken(XContentParser.Token.START_OBJECT, parser.nextToken(), parser);
WorkflowState workflowState = WorkflowState.parse(parser);
canDeleteStateConsumer.accept(
workflowState.resourcesCreated().isEmpty()
(clearStatus || workflowState.resourcesCreated().isEmpty())
&& !ProvisioningProgress.IN_PROGRESS.equals(
ProvisioningProgress.valueOf(workflowState.getProvisioningProgress())
)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ public class WorkflowState implements ToXContentObject, Writeable {
* @param provisionEndTime Indicates the end time of the whole provisioning flow
* @param user The user extracted from the thread context from the request
* @param userOutputs A map of essential API responses for backend to use and lookup.
* @param resourcesCreated A map of all the resources created.
* @param resourcesCreated A list of all the resources created.
*/
public WorkflowState(
String workflowId,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import java.util.List;
import java.util.Locale;

import static org.opensearch.flowframework.common.CommonValue.CLEAR_STATUS;
import static org.opensearch.flowframework.common.CommonValue.WORKFLOW_ID;
import static org.opensearch.flowframework.common.CommonValue.WORKFLOW_URI;
import static org.opensearch.flowframework.common.FlowFrameworkSettings.FLOW_FRAMEWORK_ENABLED;
Expand Down Expand Up @@ -62,6 +63,7 @@
@Override
protected BaseRestHandler.RestChannelConsumer prepareRequest(RestRequest request, NodeClient client) throws IOException {
String workflowId = request.param(WORKFLOW_ID);
request.param(CLEAR_STATUS); // consume and ignore, we will pass params to workflow
dbwiddis marked this conversation as resolved.
Show resolved Hide resolved
try {
if (!flowFrameworkFeatureEnabledSetting.isFlowFrameworkEnabled()) {
throw new FlowFrameworkException(
Expand All @@ -78,7 +80,7 @@
if (workflowId == null) {
throw new FlowFrameworkException("workflow_id cannot be null", RestStatus.BAD_REQUEST);
}
WorkflowRequest workflowRequest = new WorkflowRequest(workflowId, null);
WorkflowRequest workflowRequest = new WorkflowRequest(workflowId, null, request.params());

Check warning on line 83 in src/main/java/org/opensearch/flowframework/rest/RestDeleteWorkflowAction.java

View check run for this annotation

Codecov / codecov/patch

src/main/java/org/opensearch/flowframework/rest/RestDeleteWorkflowAction.java#L83

Added line #L83 was not covered by tests
return channel -> client.execute(DeleteWorkflowAction.INSTANCE, workflowRequest, ActionListener.wrap(response -> {
XContentBuilder builder = response.toXContent(channel.newBuilder(), ToXContent.EMPTY_PARAMS);
channel.sendResponse(new BytesRestResponse(RestStatus.OK, builder));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,7 @@
try {
FlowFrameworkException ex = exception instanceof FlowFrameworkException
? (FlowFrameworkException) exception
: new FlowFrameworkException("Failed to get workflow.", ExceptionsHelper.status(exception));
: new FlowFrameworkException("Failed to get workflow status.", ExceptionsHelper.status(exception));

Check warning on line 86 in src/main/java/org/opensearch/flowframework/rest/RestGetWorkflowStateAction.java

View check run for this annotation

Codecov / codecov/patch

src/main/java/org/opensearch/flowframework/rest/RestGetWorkflowStateAction.java#L86

Added line #L86 was not covered by tests
XContentBuilder exceptionBuilder = ex.toXContent(channel.newErrorBuilder(), ToXContent.EMPTY_PARAMS);
channel.sendResponse(new BytesRestResponse(ex.getRestStatus(), exceptionBuilder));
} catch (IOException e) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.apache.logging.log4j.core.util.Booleans;
import org.opensearch.action.delete.DeleteRequest;
import org.opensearch.action.delete.DeleteResponse;
import org.opensearch.action.support.ActionFilters;
Expand All @@ -24,6 +25,7 @@
import org.opensearch.tasks.Task;
import org.opensearch.transport.TransportService;

import static org.opensearch.flowframework.common.CommonValue.CLEAR_STATUS;
import static org.opensearch.flowframework.common.CommonValue.GLOBAL_CONTEXT_INDEX;

/**
Expand Down Expand Up @@ -65,10 +67,12 @@ protected void doExecute(Task task, WorkflowRequest request, ActionListener<Dele
logger.info("Deleting workflow doc: {}", workflowId);
client.delete(deleteRequest, ActionListener.runBefore(listener, context::restore));

// Whether to force deletion of corresponding state
final boolean clearStatus = Booleans.parseBoolean(request.getParams().get(CLEAR_STATUS), false);
ActionListener<DeleteResponse> stateListener = ActionListener.wrap(response -> {
logger.info("Deleted workflow state doc: {}", workflowId);
}, exception -> { logger.info("Failed to delete workflow state doc: {}", workflowId, exception); });
flowFrameworkIndicesHandler.canDeleteWorkflowStateDoc(workflowId, canDelete -> {
flowFrameworkIndicesHandler.canDeleteWorkflowStateDoc(workflowId, clearStatus, canDelete -> {
if (Boolean.TRUE.equals(canDelete)) {
flowFrameworkIndicesHandler.deleteFlowFrameworkSystemIndexDoc(workflowId, stateListener);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -83,11 +83,11 @@
listener.onFailure(new FlowFrameworkException(errorMessage, RestStatus.BAD_REQUEST));
}
} else {
listener.onFailure(new FlowFrameworkException("Fail to find workflow " + workflowId, RestStatus.NOT_FOUND));
listener.onFailure(new FlowFrameworkException("Fail to find workflow status of " + workflowId, RestStatus.NOT_FOUND));

Check warning on line 86 in src/main/java/org/opensearch/flowframework/transport/GetWorkflowStateTransportAction.java

View check run for this annotation

Codecov / codecov/patch

src/main/java/org/opensearch/flowframework/transport/GetWorkflowStateTransportAction.java#L86

Added line #L86 was not covered by tests
}
}, e -> {
if (e instanceof IndexNotFoundException) {
listener.onFailure(new FlowFrameworkException("Fail to find workflow " + workflowId, RestStatus.NOT_FOUND));
listener.onFailure(new FlowFrameworkException("Fail to find workflow status of " + workflowId, RestStatus.NOT_FOUND));

Check warning on line 90 in src/main/java/org/opensearch/flowframework/transport/GetWorkflowStateTransportAction.java

View check run for this annotation

Codecov / codecov/patch

src/main/java/org/opensearch/flowframework/transport/GetWorkflowStateTransportAction.java#L90

Added line #L90 was not covered by tests
} else {
String errorMessage = "Failed to get workflow status of: " + workflowId;
logger.error(errorMessage, e);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import org.opensearch.action.search.SearchResponse;
import org.opensearch.client.Request;
import org.opensearch.client.Response;
import org.opensearch.client.ResponseException;
import org.opensearch.client.RestClient;
import org.opensearch.client.RestClientBuilder;
import org.opensearch.common.settings.Settings;
Expand Down Expand Up @@ -454,10 +455,22 @@ protected Response deprovisionWorkflow(RestClient client, String workflowId) thr
* @throws Exception if the request fails
*/
protected Response deleteWorkflow(RestClient client, String workflowId) throws Exception {
return deleteWorkflow(client, workflowId, "");
}

/**
* Helper method to invoke the Delete Workflow Rest Action
* @param client the rest client
* @param workflowId the workflow ID to delete
* @param params a string adding any rest path params
* @return a rest response
* @throws Exception if the request fails
*/
protected Response deleteWorkflow(RestClient client, String workflowId, String params) throws Exception {
return TestHelpers.makeRequest(
client,
"DELETE",
String.format(Locale.ROOT, "%s/%s", WORKFLOW_URI, workflowId),
String.format(Locale.ROOT, "%s/%s%s", WORKFLOW_URI, workflowId, params),
Collections.emptyMap(),
"",
null
Expand All @@ -481,7 +494,6 @@ protected Response getWorkflowStatus(RestClient client, String workflowId, boole
"",
null
);

}

/**
Expand Down Expand Up @@ -586,7 +598,7 @@ protected SearchResponse searchWorkflowState(RestClient client, String query) th
}

/**
* Helper method to invoke the Get Workflow Rest Action and assert the provisioning and state status
* Helper method to invoke the Get Workflow Status Rest Action and assert the provisioning and state status
* @param client the rest client
* @param workflowId the workflow ID to get the status
* @param stateStatus the state status name
Expand All @@ -607,6 +619,17 @@ protected void getAndAssertWorkflowStatus(
assertEquals(provisioningStatus.name(), (String) responseMap.get(CommonValue.PROVISIONING_PROGRESS_FIELD));
}

/**
* Helper method to invoke the Get Workflow Status Rest Action and assert document is not found
* @param client the rest client
* @param workflowId the workflow ID to get the status
* @throws Exception if the request fails
*/
protected void getAndAssertWorkflowStatusNotFound(RestClient client, String workflowId) throws Exception {
ResponseException ex = assertThrows(ResponseException.class, () -> getWorkflowStatus(client, workflowId, true));
assertEquals(RestStatus.NOT_FOUND.getStatus(), ex.getResponse().getStatusLine().getStatusCode());
}

/**
* Helper method to invoke the Get Workflow status Rest Action and get the error field
* @param client the rest client
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,7 @@
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;

@SuppressWarnings("deprecation")
public class FlowFrameworkIndicesHandlerTests extends OpenSearchTestCase {
@Mock
private Client client;
Expand Down Expand Up @@ -262,19 +263,10 @@ public void testInitIndexIfAbsent_IndexNotPresent() {

public void testIsWorkflowProvisionedFailedParsing() {
String documentId = randomAlphaOfLength(5);
@SuppressWarnings("unchecked")
Consumer<Optional<ProvisioningProgress>> function = mock(Consumer.class);
@SuppressWarnings("unchecked")
ActionListener<GetResponse> listener = mock(ActionListener.class);
WorkflowState workFlowState = new WorkflowState(
documentId,
"test",
"PROVISIONING",
"IN_PROGRESS",
Instant.now(),
Instant.now(),
TestHelpers.randomUser(),
Collections.emptyMap(),
Collections.emptyList()
);
doAnswer(invocation -> {
ActionListener<GetResponse> responseListener = invocation.getArgument(1);

Expand Down Expand Up @@ -318,7 +310,7 @@ public void testCanDeleteWorkflowStateDoc() {
return null;
}).when(client).get(any(GetRequest.class), any());

flowFrameworkIndicesHandler.canDeleteWorkflowStateDoc(documentId, canDelete -> { assertTrue(canDelete); }, listener);
flowFrameworkIndicesHandler.canDeleteWorkflowStateDoc(documentId, false, canDelete -> { assertTrue(canDelete); }, listener);
}

public void testCanNotDeleteWorkflowStateDocInProgress() {
Expand Down Expand Up @@ -347,10 +339,10 @@ public void testCanNotDeleteWorkflowStateDocInProgress() {
return null;
}).when(client).get(any(GetRequest.class), any());

flowFrameworkIndicesHandler.canDeleteWorkflowStateDoc(documentId, canDelete -> { assertFalse(canDelete); }, listener);
flowFrameworkIndicesHandler.canDeleteWorkflowStateDoc(documentId, true, canDelete -> { assertFalse(canDelete); }, listener);
}

public void testCanNotDeleteWorkflowStateDocResourcesExist() {
public void testDeleteWorkflowStateDocResourcesExist() {
String documentId = randomAlphaOfLength(5);
@SuppressWarnings("unchecked")
ActionListener<GetResponse> listener = mock(ActionListener.class);
Expand All @@ -376,12 +368,18 @@ public void testCanNotDeleteWorkflowStateDocResourcesExist() {
return null;
}).when(client).get(any(GetRequest.class), any());

flowFrameworkIndicesHandler.canDeleteWorkflowStateDoc(documentId, canDelete -> { assertFalse(canDelete); }, listener);
// Can't delete because resources exist
flowFrameworkIndicesHandler.canDeleteWorkflowStateDoc(documentId, false, canDelete -> { assertFalse(canDelete); }, listener);

// But can delete if clearStatus set true
flowFrameworkIndicesHandler.canDeleteWorkflowStateDoc(documentId, true, canDelete -> { assertTrue(canDelete); }, listener);
}

public void testDoesTemplateExist() {
String documentId = randomAlphaOfLength(5);
@SuppressWarnings("unchecked")
Consumer<Boolean> function = mock(Consumer.class);
@SuppressWarnings("unchecked")
ActionListener<GetResponse> listener = mock(ActionListener.class);
doAnswer(invocation -> {
ActionListener<GetResponse> responseListener = invocation.getArgument(1);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -175,6 +175,17 @@ public void testCreateAndProvisionLocalModelWorkflow() throws Exception {
assertNotNull(resourcesCreated.get(0).resourceId());
assertEquals("deploy_model", resourcesCreated.get(1).workflowStepName());
assertNotNull(resourcesCreated.get(1).resourceId());

// Delete the workflow without deleting the resources
Response deleteResponse = deleteWorkflow(client(), workflowId);
assertEquals(RestStatus.OK, TestHelpers.restStatus(deleteResponse));

// Verify state doc is not deleted
assertBusy(
() -> { getAndAssertWorkflowStatus(client(), workflowId, State.COMPLETED, ProvisioningProgress.DONE); },
30,
TimeUnit.SECONDS
);
}

public void testCreateAndProvisionCyclicalTemplate() throws Exception {
Expand Down Expand Up @@ -235,6 +246,13 @@ public void testCreateAndProvisionRemoteModelWorkflow() throws Exception {
assertNotNull(resourcesCreated.get(1).resourceId());
assertEquals("deploy_model", resourcesCreated.get(2).workflowStepName());
assertNotNull(resourcesCreated.get(2).resourceId());

// Delete the workflow without deleting the resources
Response deleteResponse = deleteWorkflow(client(), workflowId, "?clear_status=true");
assertEquals(RestStatus.OK, TestHelpers.restStatus(deleteResponse));

// Verify state doc is deleted
assertBusy(() -> { getAndAssertWorkflowStatusNotFound(client(), workflowId); }, 30, TimeUnit.SECONDS);
}

public void testCreateAndProvisionAgentFrameworkWorkflow() throws Exception {
Expand Down Expand Up @@ -305,6 +323,9 @@ public void testCreateAndProvisionAgentFrameworkWorkflow() throws Exception {
// Hit Delete API
Response deleteResponse = deleteWorkflow(client(), workflowId);
assertEquals(RestStatus.OK, TestHelpers.restStatus(deleteResponse));

// Verify state doc is deleted
assertBusy(() -> { getAndAssertWorkflowStatusNotFound(client(), workflowId); }, 30, TimeUnit.SECONDS);
}

public void testTimestamps() throws Exception {
Expand Down
Loading