Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Backport 2.x] Delete workflow state when template is deleted and no resources exist #694

Merged
merged 1 commit into from
Apr 30, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ Inspired from [Keep a Changelog](https://keepachangelog.com/en/1.1.0/)
- Silently ignore content on APIs that don't require it ([#639](https://github.com/opensearch-project/flow-framework/pull/639))
- Hide user and credential field from search response ([#680](https://github.com/opensearch-project/flow-framework/pull/680))
- Throw the correct error message in status API for WorkflowSteps ([#676](https://github.com/opensearch-project/flow-framework/pull/676))
- Delete workflow state when template is deleted and no resources exist ([#689](https://github.com/opensearch-project/flow-framework/pull/689))

### Infrastructure
- Switch macos runner to macos-13 from macos-latest since macos-latest is now arm64 ([#686](https://github.com/opensearch-project/flow-framework/pull/686))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@
import org.opensearch.action.admin.indices.create.CreateIndexResponse;
import org.opensearch.action.admin.indices.mapping.put.PutMappingRequest;
import org.opensearch.action.admin.indices.settings.put.UpdateSettingsRequest;
import org.opensearch.action.delete.DeleteRequest;
import org.opensearch.action.delete.DeleteResponse;
import org.opensearch.action.get.GetRequest;
import org.opensearch.action.index.IndexRequest;
import org.opensearch.action.index.IndexResponse;
Expand Down Expand Up @@ -519,6 +521,51 @@
}
}

/**
* Check workflow provisioning state and resources to see if state can be deleted with template
*
* @param documentId document id
* @param canDeleteStateConsumer consumer function which will be true if NOT_STARTED or COMPLETED and no resources
* @param listener action listener from caller to fail on error
* @param <T> action listener response type
*/
public <T> void canDeleteWorkflowStateDoc(String documentId, Consumer<Boolean> canDeleteStateConsumer, ActionListener<T> listener) {
GetRequest getRequest = new GetRequest(WORKFLOW_STATE_INDEX, documentId);
try (ThreadContext.StoredContext context = client.threadPool().getThreadContext().stashContext()) {
client.get(getRequest, ActionListener.wrap(response -> {
context.restore();
if (!response.isExists()) {
// no need to delete if it's not there to start with
canDeleteStateConsumer.accept(Boolean.FALSE);
return;

Check warning on line 540 in src/main/java/org/opensearch/flowframework/indices/FlowFrameworkIndicesHandler.java

View check run for this annotation

Codecov / codecov/patch

src/main/java/org/opensearch/flowframework/indices/FlowFrameworkIndicesHandler.java#L539-L540

Added lines #L539 - L540 were not covered by tests
}
try (
XContentParser parser = ParseUtils.createXContentParserFromRegistry(xContentRegistry, response.getSourceAsBytesRef())
) {
ensureExpectedToken(XContentParser.Token.START_OBJECT, parser.nextToken(), parser);
WorkflowState workflowState = WorkflowState.parse(parser);
canDeleteStateConsumer.accept(
workflowState.resourcesCreated().isEmpty()
&& !ProvisioningProgress.IN_PROGRESS.equals(
ProvisioningProgress.valueOf(workflowState.getProvisioningProgress())
)
);
} catch (Exception e) {
String errorMessage = "Failed to parse workflow state " + documentId;
logger.error(errorMessage, e);
listener.onFailure(new FlowFrameworkException(errorMessage, RestStatus.INTERNAL_SERVER_ERROR));

Check warning on line 556 in src/main/java/org/opensearch/flowframework/indices/FlowFrameworkIndicesHandler.java

View check run for this annotation

Codecov / codecov/patch

src/main/java/org/opensearch/flowframework/indices/FlowFrameworkIndicesHandler.java#L553-L556

Added lines #L553 - L556 were not covered by tests
}
}, exception -> {
logger.error("Failed to get workflow state for {} ", documentId);
canDeleteStateConsumer.accept(Boolean.FALSE);
}));
} catch (Exception e) {
String errorMessage = "Failed to retrieve workflow state to check provisioning status";
logger.error(errorMessage, e);
listener.onFailure(new FlowFrameworkException(errorMessage, ExceptionsHelper.status(e)));

Check warning on line 565 in src/main/java/org/opensearch/flowframework/indices/FlowFrameworkIndicesHandler.java

View check run for this annotation

Codecov / codecov/patch

src/main/java/org/opensearch/flowframework/indices/FlowFrameworkIndicesHandler.java#L559-L565

Added lines #L559 - L565 were not covered by tests
}
}

/**
* Updates a document in the workflow state index
* @param documentId the document ID
Expand All @@ -531,7 +578,7 @@
ActionListener<UpdateResponse> listener
) {
if (!doesIndexExist(WORKFLOW_STATE_INDEX)) {
String errorMessage = "Failed to update document for given workflow due to missing " + WORKFLOW_STATE_INDEX + " index";
String errorMessage = "Failed to update document " + documentId + " due to missing " + WORKFLOW_STATE_INDEX + " index";
logger.error(errorMessage);
listener.onFailure(new FlowFrameworkException(errorMessage, RestStatus.BAD_REQUEST));
} else {
Expand All @@ -551,6 +598,29 @@
}
}

/**
* Deletes a document in the workflow state index
* @param documentId the document ID
* @param listener action listener
*/
public void deleteFlowFrameworkSystemIndexDoc(String documentId, ActionListener<DeleteResponse> listener) {
if (!doesIndexExist(WORKFLOW_STATE_INDEX)) {
String errorMessage = "Failed to delete document " + documentId + " due to missing " + WORKFLOW_STATE_INDEX + " index";
logger.error(errorMessage);
listener.onFailure(new FlowFrameworkException(errorMessage, RestStatus.BAD_REQUEST));
} else {
try (ThreadContext.StoredContext context = client.threadPool().getThreadContext().stashContext()) {
DeleteRequest deleteRequest = new DeleteRequest(WORKFLOW_STATE_INDEX, documentId);
deleteRequest.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE);
client.delete(deleteRequest, ActionListener.runBefore(listener, context::restore));
} catch (Exception e) {
String errorMessage = "Failed to delete " + WORKFLOW_STATE_INDEX + " entry : " + documentId;
logger.error(errorMessage, e);
listener.onFailure(new FlowFrameworkException(errorMessage, ExceptionsHelper.status(e)));

Check warning on line 619 in src/main/java/org/opensearch/flowframework/indices/FlowFrameworkIndicesHandler.java

View check run for this annotation

Codecov / codecov/patch

src/main/java/org/opensearch/flowframework/indices/FlowFrameworkIndicesHandler.java#L616-L619

Added lines #L616 - L619 were not covered by tests
}
}
}

/**
* Updates a document in the workflow state index
* @param indexName the index that we will be updating a document of.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,15 @@
ThreadContext.StoredContext context = client.threadPool().getThreadContext().stashContext();
logger.info("Deleting workflow doc: {}", workflowId);
client.delete(deleteRequest, ActionListener.runBefore(listener, context::restore));

ActionListener<DeleteResponse> stateListener = ActionListener.wrap(response -> {
logger.info("Deleted workflow state doc: {}", workflowId);
}, exception -> { logger.info("Failed to delete workflow state doc: {}", workflowId, exception); });

Check warning on line 70 in src/main/java/org/opensearch/flowframework/transport/DeleteWorkflowTransportAction.java

View check run for this annotation

Codecov / codecov/patch

src/main/java/org/opensearch/flowframework/transport/DeleteWorkflowTransportAction.java#L69-L70

Added lines #L69 - L70 were not covered by tests
flowFrameworkIndicesHandler.canDeleteWorkflowStateDoc(workflowId, canDelete -> {
if (Boolean.TRUE.equals(canDelete)) {
flowFrameworkIndicesHandler.deleteFlowFrameworkSystemIndexDoc(workflowId, stateListener);

Check warning on line 73 in src/main/java/org/opensearch/flowframework/transport/DeleteWorkflowTransportAction.java

View check run for this annotation

Codecov / codecov/patch

src/main/java/org/opensearch/flowframework/transport/DeleteWorkflowTransportAction.java#L73

Added line #L73 was not covered by tests
}
}, stateListener);

Check warning on line 75 in src/main/java/org/opensearch/flowframework/transport/DeleteWorkflowTransportAction.java

View check run for this annotation

Codecov / codecov/patch

src/main/java/org/opensearch/flowframework/transport/DeleteWorkflowTransportAction.java#L75

Added line #L75 was not covered by tests
} else {
String errorMessage = "There are no templates in the global context";
logger.error(errorMessage);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -226,15 +226,23 @@
) {
if (remainingResources.isEmpty()) {
// Successful deprovision, reset state to initial
flowFrameworkIndicesHandler.putInitialStateToWorkflowState(
workflowId,
getUserContext(client),
ActionListener.wrap(indexResponse -> {
logger.info("Reset workflow {} state to NOT_STARTED", workflowId);
}, exception -> { logger.error("Failed to reset to initial workflow state for {}", workflowId, exception); })
);
// return workflow ID
listener.onResponse(new WorkflowResponse(workflowId));
flowFrameworkIndicesHandler.doesTemplateExist(workflowId, templateExists -> {
if (Boolean.TRUE.equals(templateExists)) {
flowFrameworkIndicesHandler.putInitialStateToWorkflowState(
workflowId,
getUserContext(client),
ActionListener.wrap(indexResponse -> {
logger.info("Reset workflow {} state to NOT_STARTED", workflowId);
}, exception -> { logger.error("Failed to reset to initial workflow state for {}", workflowId, exception); })

Check warning on line 236 in src/main/java/org/opensearch/flowframework/transport/DeprovisionWorkflowTransportAction.java

View check run for this annotation

Codecov / codecov/patch

src/main/java/org/opensearch/flowframework/transport/DeprovisionWorkflowTransportAction.java#L235-L236

Added lines #L235 - L236 were not covered by tests
);
} else {
flowFrameworkIndicesHandler.deleteFlowFrameworkSystemIndexDoc(workflowId, ActionListener.wrap(deleteResponse -> {
logger.info("Deleted workflow {} state", workflowId);
}, exception -> { logger.error("Failed to delete workflow state for {}", workflowId, exception); }));

Check warning on line 241 in src/main/java/org/opensearch/flowframework/transport/DeprovisionWorkflowTransportAction.java

View check run for this annotation

Codecov / codecov/patch

src/main/java/org/opensearch/flowframework/transport/DeprovisionWorkflowTransportAction.java#L239-L241

Added lines #L239 - L241 were not covered by tests
}
// return workflow ID
listener.onResponse(new WorkflowResponse(workflowId));
}, listener);
} else {
// Failed deprovision
flowFrameworkIndicesHandler.updateFlowFrameworkSystemIndexDoc(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,12 +9,17 @@
package org.opensearch.flowframework.indices;

import org.opensearch.Version;
import org.opensearch.action.DocWriteResponse.Result;
import org.opensearch.action.admin.indices.create.CreateIndexRequest;
import org.opensearch.action.admin.indices.mapping.put.PutMappingRequest;
import org.opensearch.action.delete.DeleteRequest;
import org.opensearch.action.delete.DeleteResponse;
import org.opensearch.action.get.GetRequest;
import org.opensearch.action.get.GetResponse;
import org.opensearch.action.index.IndexResponse;
import org.opensearch.action.support.master.AcknowledgedResponse;
import org.opensearch.action.update.UpdateRequest;
import org.opensearch.action.update.UpdateResponse;
import org.opensearch.client.AdminClient;
import org.opensearch.client.Client;
import org.opensearch.client.IndicesAdminClient;
Expand All @@ -29,9 +34,11 @@
import org.opensearch.common.xcontent.XContentFactory;
import org.opensearch.core.action.ActionListener;
import org.opensearch.core.common.bytes.BytesReference;
import org.opensearch.core.index.shard.ShardId;
import org.opensearch.core.xcontent.XContentBuilder;
import org.opensearch.flowframework.TestHelpers;
import org.opensearch.flowframework.model.ProvisioningProgress;
import org.opensearch.flowframework.model.ResourceCreated;
import org.opensearch.flowframework.model.Template;
import org.opensearch.flowframework.model.Workflow;
import org.opensearch.flowframework.model.WorkflowState;
Expand Down Expand Up @@ -285,6 +292,93 @@ public void testIsWorkflowProvisionedFailedParsing() {
assertTrue(exceptionCaptor.getValue().getMessage().contains("Failed to parse workflow state"));
}

public void testCanDeleteWorkflowStateDoc() {
String documentId = randomAlphaOfLength(5);
@SuppressWarnings("unchecked")
ActionListener<GetResponse> listener = mock(ActionListener.class);
WorkflowState workFlowState = new WorkflowState(
documentId,
"test",
"PROVISIONING",
"NOT_STARTED",
Instant.now(),
Instant.now(),
TestHelpers.randomUser(),
Collections.emptyMap(),
Collections.emptyList()
);
doAnswer(invocation -> {
ActionListener<GetResponse> responseListener = invocation.getArgument(1);

XContentBuilder builder = XContentFactory.jsonBuilder();
workFlowState.toXContent(builder, null);
BytesReference workflowBytesRef = BytesReference.bytes(builder);
GetResult getResult = new GetResult(WORKFLOW_STATE_INDEX, documentId, 1, 1, 1, true, workflowBytesRef, null, null);
responseListener.onResponse(new GetResponse(getResult));
return null;
}).when(client).get(any(GetRequest.class), any());

flowFrameworkIndicesHandler.canDeleteWorkflowStateDoc(documentId, canDelete -> { assertTrue(canDelete); }, listener);
}

public void testCanNotDeleteWorkflowStateDocInProgress() {
String documentId = randomAlphaOfLength(5);
@SuppressWarnings("unchecked")
ActionListener<GetResponse> listener = mock(ActionListener.class);
WorkflowState workFlowState = new WorkflowState(
documentId,
"test",
"PROVISIONING",
"IN_PROGRESS",
Instant.now(),
Instant.now(),
TestHelpers.randomUser(),
Collections.emptyMap(),
Collections.emptyList()
);
doAnswer(invocation -> {
ActionListener<GetResponse> responseListener = invocation.getArgument(1);

XContentBuilder builder = XContentFactory.jsonBuilder();
workFlowState.toXContent(builder, null);
BytesReference workflowBytesRef = BytesReference.bytes(builder);
GetResult getResult = new GetResult(WORKFLOW_STATE_INDEX, documentId, 1, 1, 1, true, workflowBytesRef, null, null);
responseListener.onResponse(new GetResponse(getResult));
return null;
}).when(client).get(any(GetRequest.class), any());

flowFrameworkIndicesHandler.canDeleteWorkflowStateDoc(documentId, canDelete -> { assertFalse(canDelete); }, listener);
}

public void testCanNotDeleteWorkflowStateDocResourcesExist() {
String documentId = randomAlphaOfLength(5);
@SuppressWarnings("unchecked")
ActionListener<GetResponse> listener = mock(ActionListener.class);
WorkflowState workFlowState = new WorkflowState(
documentId,
"test",
"PROVISIONING",
"DONE",
Instant.now(),
Instant.now(),
TestHelpers.randomUser(),
Collections.emptyMap(),
List.of(new ResourceCreated("w", "x", "y", "z"))
);
doAnswer(invocation -> {
ActionListener<GetResponse> responseListener = invocation.getArgument(1);

XContentBuilder builder = XContentFactory.jsonBuilder();
workFlowState.toXContent(builder, null);
BytesReference workflowBytesRef = BytesReference.bytes(builder);
GetResult getResult = new GetResult(WORKFLOW_STATE_INDEX, documentId, 1, 1, 1, true, workflowBytesRef, null, null);
responseListener.onResponse(new GetResponse(getResult));
return null;
}).when(client).get(any(GetRequest.class), any());

flowFrameworkIndicesHandler.canDeleteWorkflowStateDoc(documentId, canDelete -> { assertFalse(canDelete); }, listener);
}

public void testDoesTemplateExist() {
String documentId = randomAlphaOfLength(5);
Consumer<Boolean> function = mock(Consumer.class);
Expand All @@ -302,4 +396,98 @@ public void testDoesTemplateExist() {
flowFrameworkIndicesHandler.doesTemplateExist(documentId, function, listener);
verify(function).accept(true);
}

public void testUpdateFlowFrameworkSystemIndexDoc() throws IOException {
ClusterState mockClusterState = mock(ClusterState.class);
Metadata mockMetaData = mock(Metadata.class);
when(clusterService.state()).thenReturn(mockClusterState);
when(mockClusterState.metadata()).thenReturn(mockMetaData);
when(mockMetaData.hasIndex(WORKFLOW_STATE_INDEX)).thenReturn(true);

@SuppressWarnings("unchecked")
ActionListener<UpdateResponse> listener = mock(ActionListener.class);

// test success
doAnswer(invocation -> {
ActionListener<UpdateResponse> responseListener = invocation.getArgument(1);
responseListener.onResponse(new UpdateResponse(new ShardId(WORKFLOW_STATE_INDEX, "", 1), "id", -2, 0, 0, Result.UPDATED));
return null;
}).when(client).update(any(UpdateRequest.class), any());

flowFrameworkIndicesHandler.updateFlowFrameworkSystemIndexDoc("1", Map.of("foo", "bar"), listener);

ArgumentCaptor<UpdateResponse> responseCaptor = ArgumentCaptor.forClass(UpdateResponse.class);
verify(listener, times(1)).onResponse(responseCaptor.capture());
assertEquals(Result.UPDATED, responseCaptor.getValue().getResult());

// test failure
doAnswer(invocation -> {
ActionListener<UpdateResponse> responseListener = invocation.getArgument(1);
responseListener.onFailure(new Exception("Failed to update state"));
return null;
}).when(client).update(any(UpdateRequest.class), any());

flowFrameworkIndicesHandler.updateFlowFrameworkSystemIndexDoc("1", Map.of("foo", "bar"), listener);

ArgumentCaptor<Exception> exceptionCaptor = ArgumentCaptor.forClass(Exception.class);
verify(listener, times(1)).onFailure(exceptionCaptor.capture());
assertEquals("Failed to update state", exceptionCaptor.getValue().getMessage());

// test no index
when(mockMetaData.hasIndex(WORKFLOW_STATE_INDEX)).thenReturn(false);
flowFrameworkIndicesHandler.updateFlowFrameworkSystemIndexDoc("1", Map.of("foo", "bar"), listener);

verify(listener, times(2)).onFailure(exceptionCaptor.capture());
assertEquals(
"Failed to update document 1 due to missing .plugins-flow-framework-state index",
exceptionCaptor.getValue().getMessage()
);
}

public void testDeleteFlowFrameworkSystemIndexDoc() throws IOException {
ClusterState mockClusterState = mock(ClusterState.class);
Metadata mockMetaData = mock(Metadata.class);
when(clusterService.state()).thenReturn(mockClusterState);
when(mockClusterState.metadata()).thenReturn(mockMetaData);
when(mockMetaData.hasIndex(WORKFLOW_STATE_INDEX)).thenReturn(true);

@SuppressWarnings("unchecked")
ActionListener<DeleteResponse> listener = mock(ActionListener.class);

// test success
doAnswer(invocation -> {
ActionListener<DeleteResponse> responseListener = invocation.getArgument(1);
responseListener.onResponse(new DeleteResponse(new ShardId(WORKFLOW_STATE_INDEX, "", 1), "id", -2, 0, 0, true));
return null;
}).when(client).delete(any(DeleteRequest.class), any());

flowFrameworkIndicesHandler.deleteFlowFrameworkSystemIndexDoc("1", listener);

ArgumentCaptor<DeleteResponse> responseCaptor = ArgumentCaptor.forClass(DeleteResponse.class);
verify(listener, times(1)).onResponse(responseCaptor.capture());
assertEquals(Result.DELETED, responseCaptor.getValue().getResult());

// test failure
doAnswer(invocation -> {
ActionListener<DeleteResponse> responseListener = invocation.getArgument(1);
responseListener.onFailure(new Exception("Failed to delete state"));
return null;
}).when(client).delete(any(DeleteRequest.class), any());

flowFrameworkIndicesHandler.deleteFlowFrameworkSystemIndexDoc("1", listener);

ArgumentCaptor<Exception> exceptionCaptor = ArgumentCaptor.forClass(Exception.class);
verify(listener, times(1)).onFailure(exceptionCaptor.capture());
assertEquals("Failed to delete state", exceptionCaptor.getValue().getMessage());

// test no index
when(mockMetaData.hasIndex(WORKFLOW_STATE_INDEX)).thenReturn(false);
flowFrameworkIndicesHandler.deleteFlowFrameworkSystemIndexDoc("1", listener);

verify(listener, times(2)).onFailure(exceptionCaptor.capture());
assertEquals(
"Failed to delete document 1 due to missing .plugins-flow-framework-state index",
exceptionCaptor.getValue().getMessage()
);
}
}
Loading
Loading