Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Backport 2.x] Add allow_delete parameter to Deprovision API (#763) #785

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ Inspired from [Keep a Changelog](https://keepachangelog.com/en/1.1.0/)
## [Unreleased 2.x](https://github.com/opensearch-project/flow-framework/compare/2.14...2.x)
### Features
- Support editing of certain workflow fields on a provisioned workflow ([#757](https://github.com/opensearch-project/flow-framework/pull/757))
- Add allow_delete parameter to Deprovision API ([#763](https://github.com/opensearch-project/flow-framework/pull/763))

### Enhancements
- Register system index descriptors through SystemIndexPlugin.getSystemIndexDescriptors ([#750](https://github.com/opensearch-project/flow-framework/pull/750))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -131,13 +131,7 @@ public Collection<Object> createComponents(
flowFrameworkSettings,
client
);
WorkflowProcessSorter workflowProcessSorter = new WorkflowProcessSorter(
workflowStepFactory,
threadPool,
clusterService,
client,
flowFrameworkSettings
);
WorkflowProcessSorter workflowProcessSorter = new WorkflowProcessSorter(workflowStepFactory, threadPool, flowFrameworkSettings);

return List.of(workflowStepFactory, workflowProcessSorter, encryptorUtils, flowFrameworkIndicesHandler, flowFrameworkSettings);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,9 @@ private CommonValue() {}
public static final String WORKFLOW_ID = "workflow_id";
/** Field name for template validation, the flag to indicate if validation is necessary */
public static final String VALIDATION = "validation";
/** The param name for provision workflow in create API */
/** Param name for allow deletion during deprovisioning */
public static final String ALLOW_DELETE = "allow_delete";
/** The field name for provision workflow within a use case template*/
public static final String PROVISION_WORKFLOW = "provision";
/** The param name for update workflow field in create API */
public static final String UPDATE_WORKFLOW_FIELDS = "update_fields";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,10 @@
import org.opensearch.flowframework.workflow.CreateSearchPipelineStep;
import org.opensearch.flowframework.workflow.DeleteAgentStep;
import org.opensearch.flowframework.workflow.DeleteConnectorStep;
import org.opensearch.flowframework.workflow.DeleteIndexStep;
import org.opensearch.flowframework.workflow.DeleteIngestPipelineStep;
import org.opensearch.flowframework.workflow.DeleteModelStep;
import org.opensearch.flowframework.workflow.DeleteSearchPipelineStep;
import org.opensearch.flowframework.workflow.DeployModelStep;
import org.opensearch.flowframework.workflow.NoOpStep;
import org.opensearch.flowframework.workflow.RegisterAgentStep;
Expand Down Expand Up @@ -54,11 +57,11 @@ public enum WorkflowResources {
/** Workflow steps for deploying/undeploying a model and associated created resource */
DEPLOY_MODEL(DeployModelStep.NAME, WorkflowResources.MODEL_ID, UndeployModelStep.NAME),
/** Workflow steps for creating an ingest-pipeline and associated created resource */
CREATE_INGEST_PIPELINE(CreateIngestPipelineStep.NAME, WorkflowResources.PIPELINE_ID, null), // TODO delete step
CREATE_INGEST_PIPELINE(CreateIngestPipelineStep.NAME, WorkflowResources.PIPELINE_ID, DeleteIngestPipelineStep.NAME),
/** Workflow steps for creating an ingest-pipeline and associated created resource */
CREATE_SEARCH_PIPELINE(CreateSearchPipelineStep.NAME, WorkflowResources.PIPELINE_ID, null), // TODO delete step
CREATE_SEARCH_PIPELINE(CreateSearchPipelineStep.NAME, WorkflowResources.PIPELINE_ID, DeleteSearchPipelineStep.NAME),
/** Workflow steps for creating an index and associated created resource */
CREATE_INDEX(CreateIndexStep.NAME, WorkflowResources.INDEX_NAME, NoOpStep.NAME),
CREATE_INDEX(CreateIndexStep.NAME, WorkflowResources.INDEX_NAME, DeleteIndexStep.NAME),
/** Workflow steps for reindex a source index to destination index and associated created resource */
REINDEX(ReindexStep.NAME, WorkflowResources.INDEX_NAME, NoOpStep.NAME),
/** Workflow steps for registering/deleting an agent and the associated created resource */
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,9 +25,12 @@
import org.opensearch.rest.RestRequest;

import java.io.IOException;
import java.util.Collections;
import java.util.List;
import java.util.Locale;
import java.util.Map;

import static org.opensearch.flowframework.common.CommonValue.ALLOW_DELETE;
import static org.opensearch.flowframework.common.CommonValue.WORKFLOW_ID;
import static org.opensearch.flowframework.common.CommonValue.WORKFLOW_URI;
import static org.opensearch.flowframework.common.FlowFrameworkSettings.FLOW_FRAMEWORK_ENABLED;
Expand Down Expand Up @@ -57,6 +60,7 @@ public String getName() {
@Override
protected BaseRestHandler.RestChannelConsumer prepareRequest(RestRequest request, NodeClient client) throws IOException {
String workflowId = request.param(WORKFLOW_ID);
String allowDelete = request.param(ALLOW_DELETE);
try {
if (!flowFrameworkFeatureEnabledSetting.isFlowFrameworkEnabled()) {
throw new FlowFrameworkException(
Expand All @@ -73,7 +77,11 @@ protected BaseRestHandler.RestChannelConsumer prepareRequest(RestRequest request
if (workflowId == null) {
throw new FlowFrameworkException("workflow_id cannot be null", RestStatus.BAD_REQUEST);
}
WorkflowRequest workflowRequest = new WorkflowRequest(workflowId, null);
WorkflowRequest workflowRequest = new WorkflowRequest(
workflowId,
null,
allowDelete == null ? Collections.emptyMap() : Map.of(ALLOW_DELETE, allowDelete)
);

return channel -> client.execute(DeprovisionWorkflowAction.INSTANCE, workflowRequest, ActionListener.wrap(response -> {
XContentBuilder builder = response.toXContent(channel.newBuilder(), ToXContent.EMPTY_PARAMS);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import org.opensearch.common.inject.Inject;
import org.opensearch.common.util.concurrent.ThreadContext;
import org.opensearch.core.action.ActionListener;
import org.opensearch.core.common.Strings;
import org.opensearch.core.rest.RestStatus;
import org.opensearch.flowframework.common.FlowFrameworkSettings;
import org.opensearch.flowframework.exception.FlowFrameworkException;
Expand All @@ -28,6 +29,7 @@
import org.opensearch.flowframework.model.State;
import org.opensearch.flowframework.workflow.ProcessNode;
import org.opensearch.flowframework.workflow.WorkflowData;
import org.opensearch.flowframework.workflow.WorkflowStep;
import org.opensearch.flowframework.workflow.WorkflowStepFactory;
import org.opensearch.tasks.Task;
import org.opensearch.threadpool.ThreadPool;
Expand All @@ -40,8 +42,10 @@
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.stream.Collectors;

import static org.opensearch.flowframework.common.CommonValue.ALLOW_DELETE;
import static org.opensearch.flowframework.common.CommonValue.DEPROVISION_WORKFLOW_THREAD_POOL;
import static org.opensearch.flowframework.common.CommonValue.PROVISIONING_PROGRESS_FIELD;
import static org.opensearch.flowframework.common.CommonValue.PROVISION_END_TIME_FIELD;
Expand Down Expand Up @@ -95,6 +99,7 @@ public DeprovisionWorkflowTransportAction(
@Override
protected void doExecute(Task task, WorkflowRequest request, ActionListener<WorkflowResponse> listener) {
String workflowId = request.getWorkflowId();
String allowDelete = request.getParams().get(ALLOW_DELETE);
GetWorkflowStateRequest getStateRequest = new GetWorkflowStateRequest(workflowId, true);

// Stash thread context to interact with system index
Expand All @@ -103,9 +108,17 @@ protected void doExecute(Task task, WorkflowRequest request, ActionListener<Work
client.execute(GetWorkflowStateAction.INSTANCE, getStateRequest, ActionListener.wrap(response -> {
context.restore();

Set<String> deleteAllowedResources = Strings.tokenizeByCommaToSet(allowDelete);
// Retrieve resources from workflow state and deprovision
threadPool.executor(DEPROVISION_WORKFLOW_THREAD_POOL)
.execute(() -> executeDeprovisionSequence(workflowId, response.getWorkflowState().resourcesCreated(), listener));
.execute(
() -> executeDeprovisionSequence(
workflowId,
response.getWorkflowState().resourcesCreated(),
deleteAllowedResources,
listener
)
);
}, exception -> {
String errorMessage = "Failed to get workflow state for workflow " + workflowId;
logger.error(errorMessage, exception);
Expand All @@ -121,26 +134,28 @@ protected void doExecute(Task task, WorkflowRequest request, ActionListener<Work
private void executeDeprovisionSequence(
String workflowId,
List<ResourceCreated> resourcesCreated,
Set<String> deleteAllowedResources,
ActionListener<WorkflowResponse> listener
) {

List<ResourceCreated> deleteNotAllowed = new ArrayList<>();
// Create a list of ProcessNodes with the corresponding deprovision workflow steps
List<ProcessNode> deprovisionProcessSequence = new ArrayList<>();
for (ResourceCreated resource : resourcesCreated) {
String workflowStepId = resource.workflowStepId();

String stepName = resource.workflowStepName();
String deprovisionStep = getDeprovisionStepByWorkflowStep(stepName);
// Unimplemented steps presently return null, so skip
if (deprovisionStep == null) {
WorkflowStep deprovisionStep = workflowStepFactory.createStep(getDeprovisionStepByWorkflowStep(stepName));
// Skip if the step requires allow_delete but the resourceId isn't included
if (deprovisionStep.allowDeleteRequired() && !deleteAllowedResources.contains(resource.resourceId())) {
deleteNotAllowed.add(resource);
continue;
}
// New ID is old ID with (deprovision step type) prepended
String deprovisionStepId = "(deprovision_" + stepName + ") " + workflowStepId;
deprovisionProcessSequence.add(
new ProcessNode(
deprovisionStepId,
workflowStepFactory.createStep(deprovisionStep),
deprovisionStep,
Collections.emptyMap(),
Collections.emptyMap(),
new WorkflowData(Map.of(getResourceByWorkflowStep(stepName), resource.resourceId()), workflowId, deprovisionStepId),
Expand Down Expand Up @@ -215,17 +230,21 @@ private void executeDeprovisionSequence(
List<ResourceCreated> remainingResources = deprovisionProcessSequence.stream()
.map(pn -> getResourceFromDeprovisionNode(pn, resourcesCreated))
.collect(Collectors.toList());
logger.info("Resources remaining: {}", remainingResources);
updateWorkflowState(workflowId, remainingResources, listener);
logger.info("Resources remaining: {}.", remainingResources);
if (!deleteNotAllowed.isEmpty()) {
logger.info("Resources requiring allow_delete: {}.", deleteNotAllowed);
}
updateWorkflowState(workflowId, remainingResources, deleteNotAllowed, listener);
}

private void updateWorkflowState(
String workflowId,
List<ResourceCreated> remainingResources,
List<ResourceCreated> deleteNotAllowed,
ActionListener<WorkflowResponse> listener
) {
if (remainingResources.isEmpty()) {
// Successful deprovision, reset state to initial
if (remainingResources.isEmpty() && deleteNotAllowed.isEmpty()) {
// Successful deprovision of all resources, reset state to initial
flowFrameworkIndicesHandler.doesTemplateExist(workflowId, templateExists -> {
if (Boolean.TRUE.equals(templateExists)) {
flowFrameworkIndicesHandler.putInitialStateToWorkflowState(
Expand All @@ -244,35 +263,49 @@ private void updateWorkflowState(
listener.onResponse(new WorkflowResponse(workflowId));
}, listener);
} else {
// Failed deprovision
// Remaining resources only includes ones we tried to delete
List<ResourceCreated> stateIndexResources = new ArrayList<>(remainingResources);
// Add in those we skipped
stateIndexResources.addAll(deleteNotAllowed);
flowFrameworkIndicesHandler.updateFlowFrameworkSystemIndexDoc(
workflowId,
Map.ofEntries(
Map.entry(STATE_FIELD, State.COMPLETED),
Map.entry(PROVISIONING_PROGRESS_FIELD, ProvisioningProgress.DONE),
Map.entry(PROVISION_END_TIME_FIELD, Instant.now().toEpochMilli()),
Map.entry(RESOURCES_CREATED_FIELD, remainingResources)
Map.entry(RESOURCES_CREATED_FIELD, stateIndexResources)
),
ActionListener.wrap(updateResponse -> {
logger.info("updated workflow {} state to COMPLETED", workflowId);
}, exception -> { logger.error("Failed to update workflow {} state", workflowId, exception); })
);
// give user list of remaining resources
StringBuilder message = new StringBuilder();
appendResourceInfo(message, "Failed to deprovision some resources: ", remainingResources);
appendResourceInfo(message, "These resources require the " + ALLOW_DELETE + " parameter to deprovision: ", deleteNotAllowed);
listener.onFailure(
new FlowFrameworkException(
"Failed to deprovision some resources: ["
+ remainingResources.stream()
.map(DeprovisionWorkflowTransportAction::getResourceNameAndId)
.filter(Objects::nonNull)
.distinct()
.collect(Collectors.joining(", "))
+ "].",
RestStatus.ACCEPTED
)
new FlowFrameworkException(message.toString(), remainingResources.isEmpty() ? RestStatus.FORBIDDEN : RestStatus.ACCEPTED)
);
}
}

private static void appendResourceInfo(StringBuilder message, String prefix, List<ResourceCreated> resources) {
if (!resources.isEmpty()) {
if (message.length() > 0) {
message.append(" ");
}
message.append(prefix)
.append(
resources.stream()
.map(DeprovisionWorkflowTransportAction::getResourceNameAndId)
.filter(Objects::nonNull)
.distinct()
.collect(Collectors.joining(", ", "[", "]"))
)
.append(".");
}
}

private static ResourceCreated getResourceFromDeprovisionNode(ProcessNode deprovisionNode, List<ResourceCreated> resourcesCreated) {
return resourcesCreated.stream()
.filter(r -> deprovisionNode.id().equals("(deprovision_" + r.workflowStepName() + ") " + r.workflowStepId()))
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,108 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/
package org.opensearch.flowframework.workflow;

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.opensearch.ExceptionsHelper;
import org.opensearch.action.admin.indices.delete.DeleteIndexRequest;
import org.opensearch.action.support.PlainActionFuture;
import org.opensearch.client.Client;
import org.opensearch.core.action.ActionListener;
import org.opensearch.flowframework.exception.WorkflowStepException;
import org.opensearch.flowframework.util.ParseUtils;

import java.util.Collections;
import java.util.Map;
import java.util.Set;

import static org.opensearch.flowframework.common.WorkflowResources.INDEX_NAME;
import static org.opensearch.flowframework.exception.WorkflowStepException.getSafeException;

/**
* Step to delete an index
*/
public class DeleteIndexStep implements WorkflowStep {

private static final Logger logger = LogManager.getLogger(DeleteIndexStep.class);
private final Client client;

/** The name of this step, used as a key in the template and the {@link WorkflowStepFactory} */
public static final String NAME = "delete_index";
/** Required input keys */
public static final Set<String> REQUIRED_INPUTS = Set.of(INDEX_NAME);
/** Optional input keys */
public static final Set<String> OPTIONAL_INPUTS = Collections.emptySet();
/** Provided output keys */
public static final Set<String> PROVIDED_OUTPUTS = Set.of(INDEX_NAME);

/**
* Instantiate this class
*
* @param client Client to delete an index
*/
public DeleteIndexStep(Client client) {
this.client = client;
}

@Override
public PlainActionFuture<WorkflowData> execute(
String currentNodeId,
WorkflowData currentNodeInputs,
Map<String, WorkflowData> outputs,
Map<String, String> previousNodeInputs,
Map<String, String> params
) {
PlainActionFuture<WorkflowData> deleteIndexFuture = PlainActionFuture.newFuture();

try {
Map<String, Object> inputs = ParseUtils.getInputsFromPreviousSteps(
REQUIRED_INPUTS,
OPTIONAL_INPUTS,
currentNodeInputs,
outputs,
previousNodeInputs,
params
);

String indexName = (String) inputs.get(INDEX_NAME);

DeleteIndexRequest deleteIndexRequest = new DeleteIndexRequest(indexName);

client.admin().indices().delete(deleteIndexRequest, ActionListener.wrap(acknowledgedResponse -> {
logger.info("Deleted index: {}", indexName);
deleteIndexFuture.onResponse(
new WorkflowData(
Map.ofEntries(Map.entry(INDEX_NAME, indexName)),
currentNodeInputs.getWorkflowId(),
currentNodeInputs.getNodeId()
)
);
}, ex -> {
Exception e = getSafeException(ex);
String errorMessage = (e == null ? "Failed to delete the index " + indexName : e.getMessage());
logger.error(errorMessage, e);
deleteIndexFuture.onFailure(new WorkflowStepException(errorMessage, ExceptionsHelper.status(e)));
}));
} catch (Exception e) {
deleteIndexFuture.onFailure(e);
}
return deleteIndexFuture;
}

@Override
public String getName() {
return NAME;

Check warning on line 101 in src/main/java/org/opensearch/flowframework/workflow/DeleteIndexStep.java

View check run for this annotation

Codecov / codecov/patch

src/main/java/org/opensearch/flowframework/workflow/DeleteIndexStep.java#L101

Added line #L101 was not covered by tests
}

@Override
public boolean allowDeleteRequired() {
return true;

Check warning on line 106 in src/main/java/org/opensearch/flowframework/workflow/DeleteIndexStep.java

View check run for this annotation

Codecov / codecov/patch

src/main/java/org/opensearch/flowframework/workflow/DeleteIndexStep.java#L106

Added line #L106 was not covered by tests
}
}
Loading
Loading