Skip to content

Commit

Permalink
[Backport 2.x] Support editing of certain workflow fields on a provis…
Browse files Browse the repository at this point in the history
…ioned workflow (#775)

Support editing of certain workflow fields on a provisioned workflow (#757)

* Support editing of certain workflow fields on a provisioned workflow



* Add integ test



* Address review comments



* Refactor field update method to Template class



* Update tests to ensure update timestamp increments



---------


(cherry picked from commit 7d45f92)

Signed-off-by: Daniel Widdis <[email protected]>
Signed-off-by: github-actions[bot] <github-actions[bot]@users.noreply.github.com>
Co-authored-by: github-actions[bot] <github-actions[bot]@users.noreply.github.com>
  • Loading branch information
1 parent ec78e2a commit 7b171f6
Show file tree
Hide file tree
Showing 12 changed files with 515 additions and 76 deletions.
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,8 @@ Inspired from [Keep a Changelog](https://keepachangelog.com/en/1.1.0/)

## [Unreleased 2.x](https://github.com/opensearch-project/flow-framework/compare/2.14...2.x)
### Features
- Support editing of certain workflow fields on a provisioned workflow ([#757](https://github.com/opensearch-project/flow-framework/pull/757))

### Enhancements
- Register system index descriptors through SystemIndexPlugin.getSystemIndexDescriptors ([#750](https://github.com/opensearch-project/flow-framework/pull/750))

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -68,8 +68,10 @@ private CommonValue() {}
public static final String WORKFLOW_ID = "workflow_id";
/** Field name for template validation, the flag to indicate if validation is necessary */
public static final String VALIDATION = "validation";
/** The field name for provision workflow within a use case template*/
/** The param name for provision workflow in create API */
public static final String PROVISION_WORKFLOW = "provision";
/** The param name for update workflow field in create API */
public static final String UPDATE_WORKFLOW_FIELDS = "update_fields";
/** The field name for workflow steps. This field represents the name of the workflow steps to be fetched. */
public static final String WORKFLOW_STEP = "workflow_step";
/** The param name for default use case, used by the create workflow API */
Expand Down
86 changes: 76 additions & 10 deletions src/main/java/org/opensearch/flowframework/model/Template.java
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
*/
package org.opensearch.flowframework.model;

import org.apache.logging.log4j.util.Strings;
import org.opensearch.Version;
import org.opensearch.common.xcontent.LoggingDeprecationHandler;
import org.opensearch.common.xcontent.json.JsonXContent;
Expand All @@ -19,6 +20,7 @@
import org.opensearch.core.xcontent.XContentBuilder;
import org.opensearch.core.xcontent.XContentParser;
import org.opensearch.flowframework.exception.FlowFrameworkException;
import org.opensearch.flowframework.model.Template.Builder;
import org.opensearch.flowframework.util.ParseUtils;

import java.io.IOException;
Expand All @@ -29,6 +31,7 @@
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Set;

import static org.opensearch.core.xcontent.XContentParserUtils.ensureExpectedToken;
import static org.opensearch.flowframework.common.CommonValue.CREATED_TIME;
Expand All @@ -53,6 +56,14 @@ public class Template implements ToXContentObject {
public static final String TEMPLATE_FIELD = "template";
/** The template field name for template use case */
public static final String USE_CASE_FIELD = "use_case";
/** Fields which may be updated in the template even if provisioned */
public static final Set<String> UPDATE_FIELD_ALLOWLIST = Set.of(
NAME_FIELD,
DESCRIPTION_FIELD,
USE_CASE_FIELD,
VERSION_FIELD,
UI_METADATA_FIELD
);

private final String name;
private final String description;
Expand All @@ -77,9 +88,9 @@ public class Template implements ToXContentObject {
* @param workflows Workflow graph definitions corresponding to the defined operations.
* @param uiMetadata The UI metadata related to the given workflow
* @param user The user extracted from the thread context from the request
* @param createdTime Created time in milliseconds since the epoch
* @param lastUpdatedTime Last Updated time in milliseconds since the epoch
* @param lastProvisionedTime Last Provisioned time in milliseconds since the epoch
* @param createdTime Created time as an Instant
* @param lastUpdatedTime Last Updated time as an Instant
* @param lastProvisionedTime Last Provisioned time as an Instant
*/
public Template(
String name,
Expand Down Expand Up @@ -286,9 +297,9 @@ public Template build() {
@Override
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
XContentBuilder xContentBuilder = builder.startObject();
xContentBuilder.field(NAME_FIELD, this.name);
xContentBuilder.field(DESCRIPTION_FIELD, this.description);
xContentBuilder.field(USE_CASE_FIELD, this.useCase);
xContentBuilder.field(NAME_FIELD, this.name.trim());
xContentBuilder.field(DESCRIPTION_FIELD, this.description == null ? "" : this.description.trim());
xContentBuilder.field(USE_CASE_FIELD, this.useCase == null ? "" : this.useCase.trim());

if (this.templateVersion != null || !this.compatibilityVersion.isEmpty()) {
xContentBuilder.startObject(VERSION_FIELD);
Expand Down Expand Up @@ -334,6 +345,35 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws
return xContentBuilder.endObject();
}

/**
* Merges two templates by updating the fields from an existing template with the (non-null) fields of another one.
* @param existingTemplate An existing complete template.
* @param templateWithNewFields A template containing only fields to update. The fields must correspond to the field names in {@link #UPDATE_FIELD_ALLOWLIST}.
* @return the updated template.
*/
public static Template updateExistingTemplate(Template existingTemplate, Template templateWithNewFields) {
Builder builder = new Template.Builder(existingTemplate).lastUpdatedTime(Instant.now());
if (templateWithNewFields.name() != null) {
builder.name(templateWithNewFields.name());
}
if (!Strings.isBlank(templateWithNewFields.description())) {
builder.description(templateWithNewFields.description());
}
if (!Strings.isBlank(templateWithNewFields.useCase())) {
builder.useCase(templateWithNewFields.useCase());
}
if (templateWithNewFields.templateVersion() != null) {
builder.templateVersion(templateWithNewFields.templateVersion());
}
if (!templateWithNewFields.compatibilityVersion().isEmpty()) {
builder.compatibilityVersion(templateWithNewFields.compatibilityVersion());
}
if (templateWithNewFields.getUiMetadata() != null) {
builder.uiMetadata(templateWithNewFields.getUiMetadata());
}
return builder.build();
}

/**
* Parse raw xContent into a Template instance.
*
Expand All @@ -342,9 +382,21 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws
* @throws IOException if content can't be parsed correctly
*/
public static Template parse(XContentParser parser) throws IOException {
return parse(parser, false);
}

/**
* Parse raw xContent into a Template instance.
*
* @param parser xContent based content parser
* @param fieldUpdate if set true, will be used for updating an existing template
* @return an instance of the template
* @throws IOException if content can't be parsed correctly
*/
public static Template parse(XContentParser parser, boolean fieldUpdate) throws IOException {
String name = null;
String description = "";
String useCase = "";
String description = null;
String useCase = null;
Version templateVersion = null;
List<Version> compatibilityVersion = new ArrayList<>();
Map<String, Workflow> workflows = new HashMap<>();
Expand All @@ -357,6 +409,12 @@ public static Template parse(XContentParser parser) throws IOException {
ensureExpectedToken(XContentParser.Token.START_OBJECT, parser.currentToken(), parser);
while (parser.nextToken() != XContentParser.Token.END_OBJECT) {
String fieldName = parser.currentName();
if (fieldUpdate && !UPDATE_FIELD_ALLOWLIST.contains(fieldName)) {
throw new FlowFrameworkException(
"You can not update the field [" + fieldName + "] without updating the whole template.",
RestStatus.BAD_REQUEST
);
}
parser.nextToken();
switch (fieldName) {
case NAME_FIELD:
Expand Down Expand Up @@ -421,8 +479,16 @@ public static Template parse(XContentParser parser) throws IOException {
);
}
}
if (name == null) {
throw new FlowFrameworkException("A template object requires a name.", RestStatus.BAD_REQUEST);
if (!fieldUpdate) {
if (name == null) {
throw new FlowFrameworkException("A template object requires a name.", RestStatus.BAD_REQUEST);
}
if (description == null) {
description = "";
}
if (useCase == null) {
useCase = "";
}
}

return new Builder().name(name)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@

import static org.opensearch.core.xcontent.XContentParserUtils.ensureExpectedToken;
import static org.opensearch.flowframework.common.CommonValue.PROVISION_WORKFLOW;
import static org.opensearch.flowframework.common.CommonValue.UPDATE_WORKFLOW_FIELDS;
import static org.opensearch.flowframework.common.CommonValue.USE_CASE;
import static org.opensearch.flowframework.common.CommonValue.VALIDATION;
import static org.opensearch.flowframework.common.CommonValue.WORKFLOW_ID;
Expand Down Expand Up @@ -83,6 +84,7 @@ protected RestChannelConsumer prepareRequest(RestRequest request, NodeClient cli
String workflowId = request.param(WORKFLOW_ID);
String[] validation = request.paramAsStringArray(VALIDATION, new String[] { "all" });
boolean provision = request.paramAsBoolean(PROVISION_WORKFLOW, false);
boolean updateFields = request.paramAsBoolean(UPDATE_WORKFLOW_FIELDS, false);
String useCase = request.param(USE_CASE);
// If provisioning, consume all other params and pass to provision transport action
Map<String, String> params = provision
Expand Down Expand Up @@ -117,11 +119,23 @@ protected RestChannelConsumer prepareRequest(RestRequest request, NodeClient cli
new BytesRestResponse(ffe.getRestStatus(), ffe.toXContent(channel.newErrorBuilder(), ToXContent.EMPTY_PARAMS))
);
}
if (provision && updateFields) {
// Consume params and content so custom exception is processed
params.keySet().stream().forEach(request::param);
request.content();
FlowFrameworkException ffe = new FlowFrameworkException(
"You can not use both the " + PROVISION_WORKFLOW + " and " + UPDATE_WORKFLOW_FIELDS + " parameters in the same request.",
RestStatus.BAD_REQUEST
);
return channel -> channel.sendResponse(
new BytesRestResponse(ffe.getRestStatus(), ffe.toXContent(channel.newErrorBuilder(), ToXContent.EMPTY_PARAMS))
);
}
try {

Template template;
Map<String, String> useCaseDefaultsMap = Collections.emptyMap();
if (useCase != null) {
// Reconstruct the template from a substitution-ready use case
String useCaseTemplateFileInStringFormat = ParseUtils.resourceToString(
"/" + DefaultUseCases.getSubstitutionReadyFileByUseCaseName(useCase)
);
Expand Down Expand Up @@ -178,21 +192,25 @@ protected RestChannelConsumer prepareRequest(RestRequest request, NodeClient cli
null,
useCaseDefaultsMap
);
XContentParser parserTestJson = ParseUtils.jsonToParser(useCaseTemplateFileInStringFormat);
ensureExpectedToken(XContentParser.Token.START_OBJECT, parserTestJson.currentToken(), parserTestJson);
template = Template.parse(parserTestJson);

XContentParser useCaseParser = ParseUtils.jsonToParser(useCaseTemplateFileInStringFormat);
ensureExpectedToken(XContentParser.Token.START_OBJECT, useCaseParser.currentToken(), useCaseParser);
template = Template.parse(useCaseParser);
} else {
XContentParser parser = request.contentParser();
ensureExpectedToken(XContentParser.Token.START_OBJECT, parser.nextToken(), parser);
template = Template.parse(parser);
template = Template.parse(parser, updateFields);
}

// If not provisioning, params map is empty. Use it to pass updateFields flag to WorkflowRequest
if (updateFields) {
params = Map.of(UPDATE_WORKFLOW_FIELDS, "true");
}

WorkflowRequest workflowRequest = new WorkflowRequest(
workflowId,
template,
validation,
provision,
provision || updateFields,
params,
useCase,
useCaseDefaultsMap
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -233,43 +233,56 @@ protected void doExecute(Task task, WorkflowRequest request, ActionListener<Work
);
} else {
// This is an existing workflow (PUT)
final boolean isFieldUpdate = request.isUpdateFields();
// Fetch existing entry for time stamps
logger.info("Querying existing workflow from global context: {}", workflowId);
try (ThreadContext.StoredContext context = client.threadPool().getThreadContext().stashContext()) {
client.get(new GetRequest(GLOBAL_CONTEXT_INDEX, workflowId), ActionListener.wrap(getResponse -> {
context.restore();
if (getResponse.isExists()) {
Template existingTemplate = Template.parse(getResponse.getSourceAsString());
// Update existing entry, full document replacement
Template template = new Template.Builder(templateWithUser).createdTime(existingTemplate.createdTime())
.lastUpdatedTime(Instant.now())
.lastProvisionedTime(existingTemplate.lastProvisionedTime())
.build();
Template template = isFieldUpdate
? Template.updateExistingTemplate(existingTemplate, templateWithUser)
: new Template.Builder(templateWithUser).createdTime(existingTemplate.createdTime())
.lastUpdatedTime(Instant.now())
.lastProvisionedTime(existingTemplate.lastProvisionedTime())
.build();
flowFrameworkIndicesHandler.updateTemplateInGlobalContext(
request.getWorkflowId(),
template,
ActionListener.wrap(response -> {
flowFrameworkIndicesHandler.updateFlowFrameworkSystemIndexDoc(
request.getWorkflowId(),
Map.ofEntries(
Map.entry(STATE_FIELD, State.NOT_STARTED),
Map.entry(PROVISIONING_PROGRESS_FIELD, ProvisioningProgress.NOT_STARTED)
),
ActionListener.wrap(updateResponse -> {
logger.info("updated workflow {} state to {}", request.getWorkflowId(), State.NOT_STARTED.name());
listener.onResponse(new WorkflowResponse(request.getWorkflowId()));
}, exception -> {
String errorMessage = "Failed to update workflow " + request.getWorkflowId() + " in template index";
logger.error(errorMessage, exception);
if (exception instanceof FlowFrameworkException) {
listener.onFailure(exception);
} else {
listener.onFailure(
new FlowFrameworkException(errorMessage, ExceptionsHelper.status(exception))
// Ignore state index if updating fields
if (!isFieldUpdate) {
flowFrameworkIndicesHandler.updateFlowFrameworkSystemIndexDoc(
request.getWorkflowId(),
Map.ofEntries(
Map.entry(STATE_FIELD, State.NOT_STARTED),
Map.entry(PROVISIONING_PROGRESS_FIELD, ProvisioningProgress.NOT_STARTED)
),
ActionListener.wrap(updateResponse -> {
logger.info(
"updated workflow {} state to {}",
request.getWorkflowId(),
State.NOT_STARTED.name()
);
}
})
);
listener.onResponse(new WorkflowResponse(request.getWorkflowId()));
}, exception -> {
String errorMessage = "Failed to update workflow "
+ request.getWorkflowId()
+ " in template index";
logger.error(errorMessage, exception);
if (exception instanceof FlowFrameworkException) {
listener.onFailure(exception);
} else {
listener.onFailure(
new FlowFrameworkException(errorMessage, ExceptionsHelper.status(exception))
);
}
})
);
} else {
listener.onResponse(new WorkflowResponse(request.getWorkflowId()));
}
}, exception -> {
String errorMessage = "Failed to update use case template " + request.getWorkflowId();
logger.error(errorMessage, exception);
Expand All @@ -278,7 +291,8 @@ protected void doExecute(Task task, WorkflowRequest request, ActionListener<Work
} else {
listener.onFailure(new FlowFrameworkException(errorMessage, ExceptionsHelper.status(exception)));
}
})
}),
isFieldUpdate
);
} else {
String errorMessage = "Failed to retrieve template (" + workflowId + ") from global context.";
Expand Down
Loading

0 comments on commit 7b171f6

Please sign in to comment.