Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Backport 2.x] Support editing of certain workflow fields on a provisioned workflow #775

Merged
merged 1 commit into from
Jul 10, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,8 @@ Inspired from [Keep a Changelog](https://keepachangelog.com/en/1.1.0/)

## [Unreleased 2.x](https://github.com/opensearch-project/flow-framework/compare/2.14...2.x)
### Features
- Support editing of certain workflow fields on a provisioned workflow ([#757](https://github.com/opensearch-project/flow-framework/pull/757))

### Enhancements
- Register system index descriptors through SystemIndexPlugin.getSystemIndexDescriptors ([#750](https://github.com/opensearch-project/flow-framework/pull/750))

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -68,8 +68,10 @@ private CommonValue() {}
public static final String WORKFLOW_ID = "workflow_id";
/** Field name for template validation, the flag to indicate if validation is necessary */
public static final String VALIDATION = "validation";
/** The field name for provision workflow within a use case template*/
/** The param name for provision workflow in create API */
public static final String PROVISION_WORKFLOW = "provision";
/** The param name for update workflow field in create API */
public static final String UPDATE_WORKFLOW_FIELDS = "update_fields";
/** The field name for workflow steps. This field represents the name of the workflow steps to be fetched. */
public static final String WORKFLOW_STEP = "workflow_step";
/** The param name for default use case, used by the create workflow API */
Expand Down
86 changes: 76 additions & 10 deletions src/main/java/org/opensearch/flowframework/model/Template.java
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
*/
package org.opensearch.flowframework.model;

import org.apache.logging.log4j.util.Strings;
import org.opensearch.Version;
import org.opensearch.common.xcontent.LoggingDeprecationHandler;
import org.opensearch.common.xcontent.json.JsonXContent;
Expand All @@ -19,6 +20,7 @@
import org.opensearch.core.xcontent.XContentBuilder;
import org.opensearch.core.xcontent.XContentParser;
import org.opensearch.flowframework.exception.FlowFrameworkException;
import org.opensearch.flowframework.model.Template.Builder;
import org.opensearch.flowframework.util.ParseUtils;

import java.io.IOException;
Expand All @@ -29,6 +31,7 @@
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Set;

import static org.opensearch.core.xcontent.XContentParserUtils.ensureExpectedToken;
import static org.opensearch.flowframework.common.CommonValue.CREATED_TIME;
Expand All @@ -53,6 +56,14 @@
public static final String TEMPLATE_FIELD = "template";
/** The template field name for template use case */
public static final String USE_CASE_FIELD = "use_case";
/** Fields which may be updated in the template even if provisioned */
public static final Set<String> UPDATE_FIELD_ALLOWLIST = Set.of(
NAME_FIELD,
DESCRIPTION_FIELD,
USE_CASE_FIELD,
VERSION_FIELD,
UI_METADATA_FIELD
);

private final String name;
private final String description;
Expand All @@ -77,9 +88,9 @@
* @param workflows Workflow graph definitions corresponding to the defined operations.
* @param uiMetadata The UI metadata related to the given workflow
* @param user The user extracted from the thread context from the request
* @param createdTime Created time in milliseconds since the epoch
* @param lastUpdatedTime Last Updated time in milliseconds since the epoch
* @param lastProvisionedTime Last Provisioned time in milliseconds since the epoch
* @param createdTime Created time as an Instant
* @param lastUpdatedTime Last Updated time as an Instant
* @param lastProvisionedTime Last Provisioned time as an Instant
*/
public Template(
String name,
Expand Down Expand Up @@ -286,9 +297,9 @@
@Override
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
XContentBuilder xContentBuilder = builder.startObject();
xContentBuilder.field(NAME_FIELD, this.name);
xContentBuilder.field(DESCRIPTION_FIELD, this.description);
xContentBuilder.field(USE_CASE_FIELD, this.useCase);
xContentBuilder.field(NAME_FIELD, this.name.trim());
xContentBuilder.field(DESCRIPTION_FIELD, this.description == null ? "" : this.description.trim());
xContentBuilder.field(USE_CASE_FIELD, this.useCase == null ? "" : this.useCase.trim());

if (this.templateVersion != null || !this.compatibilityVersion.isEmpty()) {
xContentBuilder.startObject(VERSION_FIELD);
Expand Down Expand Up @@ -334,6 +345,35 @@
return xContentBuilder.endObject();
}

/**
* Merges two templates by updating the fields from an existing template with the (non-null) fields of another one.
* @param existingTemplate An existing complete template.
* @param templateWithNewFields A template containing only fields to update. The fields must correspond to the field names in {@link #UPDATE_FIELD_ALLOWLIST}.
* @return the updated template.
*/
public static Template updateExistingTemplate(Template existingTemplate, Template templateWithNewFields) {
Builder builder = new Template.Builder(existingTemplate).lastUpdatedTime(Instant.now());
if (templateWithNewFields.name() != null) {
builder.name(templateWithNewFields.name());
}
if (!Strings.isBlank(templateWithNewFields.description())) {
builder.description(templateWithNewFields.description());
}
if (!Strings.isBlank(templateWithNewFields.useCase())) {
builder.useCase(templateWithNewFields.useCase());
}
if (templateWithNewFields.templateVersion() != null) {
builder.templateVersion(templateWithNewFields.templateVersion());
}
if (!templateWithNewFields.compatibilityVersion().isEmpty()) {
builder.compatibilityVersion(templateWithNewFields.compatibilityVersion());
}
if (templateWithNewFields.getUiMetadata() != null) {
builder.uiMetadata(templateWithNewFields.getUiMetadata());
}
return builder.build();
}

/**
* Parse raw xContent into a Template instance.
*
Expand All @@ -342,9 +382,21 @@
* @throws IOException if content can't be parsed correctly
*/
public static Template parse(XContentParser parser) throws IOException {
return parse(parser, false);
}

/**
* Parse raw xContent into a Template instance.
*
* @param parser xContent based content parser
* @param fieldUpdate if set true, will be used for updating an existing template
* @return an instance of the template
* @throws IOException if content can't be parsed correctly
*/
public static Template parse(XContentParser parser, boolean fieldUpdate) throws IOException {
String name = null;
String description = "";
String useCase = "";
String description = null;
String useCase = null;
Version templateVersion = null;
List<Version> compatibilityVersion = new ArrayList<>();
Map<String, Workflow> workflows = new HashMap<>();
Expand All @@ -357,6 +409,12 @@
ensureExpectedToken(XContentParser.Token.START_OBJECT, parser.currentToken(), parser);
while (parser.nextToken() != XContentParser.Token.END_OBJECT) {
String fieldName = parser.currentName();
if (fieldUpdate && !UPDATE_FIELD_ALLOWLIST.contains(fieldName)) {
throw new FlowFrameworkException(
"You can not update the field [" + fieldName + "] without updating the whole template.",
RestStatus.BAD_REQUEST
);
}
parser.nextToken();
switch (fieldName) {
case NAME_FIELD:
Expand Down Expand Up @@ -421,8 +479,16 @@
);
}
}
if (name == null) {
throw new FlowFrameworkException("A template object requires a name.", RestStatus.BAD_REQUEST);
if (!fieldUpdate) {
if (name == null) {
throw new FlowFrameworkException("A template object requires a name.", RestStatus.BAD_REQUEST);

Check warning on line 484 in src/main/java/org/opensearch/flowframework/model/Template.java

View check run for this annotation

Codecov / codecov/patch

src/main/java/org/opensearch/flowframework/model/Template.java#L484

Added line #L484 was not covered by tests
}
if (description == null) {
description = "";

Check warning on line 487 in src/main/java/org/opensearch/flowframework/model/Template.java

View check run for this annotation

Codecov / codecov/patch

src/main/java/org/opensearch/flowframework/model/Template.java#L487

Added line #L487 was not covered by tests
}
if (useCase == null) {
useCase = "";

Check warning on line 490 in src/main/java/org/opensearch/flowframework/model/Template.java

View check run for this annotation

Codecov / codecov/patch

src/main/java/org/opensearch/flowframework/model/Template.java#L490

Added line #L490 was not covered by tests
}
}

return new Builder().name(name)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@

import static org.opensearch.core.xcontent.XContentParserUtils.ensureExpectedToken;
import static org.opensearch.flowframework.common.CommonValue.PROVISION_WORKFLOW;
import static org.opensearch.flowframework.common.CommonValue.UPDATE_WORKFLOW_FIELDS;
import static org.opensearch.flowframework.common.CommonValue.USE_CASE;
import static org.opensearch.flowframework.common.CommonValue.VALIDATION;
import static org.opensearch.flowframework.common.CommonValue.WORKFLOW_ID;
Expand Down Expand Up @@ -83,6 +84,7 @@ protected RestChannelConsumer prepareRequest(RestRequest request, NodeClient cli
String workflowId = request.param(WORKFLOW_ID);
String[] validation = request.paramAsStringArray(VALIDATION, new String[] { "all" });
boolean provision = request.paramAsBoolean(PROVISION_WORKFLOW, false);
boolean updateFields = request.paramAsBoolean(UPDATE_WORKFLOW_FIELDS, false);
String useCase = request.param(USE_CASE);
// If provisioning, consume all other params and pass to provision transport action
Map<String, String> params = provision
Expand Down Expand Up @@ -117,11 +119,23 @@ protected RestChannelConsumer prepareRequest(RestRequest request, NodeClient cli
new BytesRestResponse(ffe.getRestStatus(), ffe.toXContent(channel.newErrorBuilder(), ToXContent.EMPTY_PARAMS))
);
}
if (provision && updateFields) {
// Consume params and content so custom exception is processed
params.keySet().stream().forEach(request::param);
request.content();
FlowFrameworkException ffe = new FlowFrameworkException(
"You can not use both the " + PROVISION_WORKFLOW + " and " + UPDATE_WORKFLOW_FIELDS + " parameters in the same request.",
RestStatus.BAD_REQUEST
);
return channel -> channel.sendResponse(
new BytesRestResponse(ffe.getRestStatus(), ffe.toXContent(channel.newErrorBuilder(), ToXContent.EMPTY_PARAMS))
);
}
try {

Template template;
Map<String, String> useCaseDefaultsMap = Collections.emptyMap();
if (useCase != null) {
// Reconstruct the template from a substitution-ready use case
String useCaseTemplateFileInStringFormat = ParseUtils.resourceToString(
"/" + DefaultUseCases.getSubstitutionReadyFileByUseCaseName(useCase)
);
Expand Down Expand Up @@ -178,21 +192,25 @@ protected RestChannelConsumer prepareRequest(RestRequest request, NodeClient cli
null,
useCaseDefaultsMap
);
XContentParser parserTestJson = ParseUtils.jsonToParser(useCaseTemplateFileInStringFormat);
ensureExpectedToken(XContentParser.Token.START_OBJECT, parserTestJson.currentToken(), parserTestJson);
template = Template.parse(parserTestJson);

XContentParser useCaseParser = ParseUtils.jsonToParser(useCaseTemplateFileInStringFormat);
ensureExpectedToken(XContentParser.Token.START_OBJECT, useCaseParser.currentToken(), useCaseParser);
template = Template.parse(useCaseParser);
} else {
XContentParser parser = request.contentParser();
ensureExpectedToken(XContentParser.Token.START_OBJECT, parser.nextToken(), parser);
template = Template.parse(parser);
template = Template.parse(parser, updateFields);
}

// If not provisioning, params map is empty. Use it to pass updateFields flag to WorkflowRequest
if (updateFields) {
params = Map.of(UPDATE_WORKFLOW_FIELDS, "true");
}

WorkflowRequest workflowRequest = new WorkflowRequest(
workflowId,
template,
validation,
provision,
provision || updateFields,
params,
useCase,
useCaseDefaultsMap
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -233,43 +233,56 @@
);
} else {
// This is an existing workflow (PUT)
final boolean isFieldUpdate = request.isUpdateFields();
// Fetch existing entry for time stamps
logger.info("Querying existing workflow from global context: {}", workflowId);
try (ThreadContext.StoredContext context = client.threadPool().getThreadContext().stashContext()) {
client.get(new GetRequest(GLOBAL_CONTEXT_INDEX, workflowId), ActionListener.wrap(getResponse -> {
context.restore();
if (getResponse.isExists()) {
Template existingTemplate = Template.parse(getResponse.getSourceAsString());
// Update existing entry, full document replacement
Template template = new Template.Builder(templateWithUser).createdTime(existingTemplate.createdTime())
.lastUpdatedTime(Instant.now())
.lastProvisionedTime(existingTemplate.lastProvisionedTime())
.build();
Template template = isFieldUpdate
? Template.updateExistingTemplate(existingTemplate, templateWithUser)
: new Template.Builder(templateWithUser).createdTime(existingTemplate.createdTime())
.lastUpdatedTime(Instant.now())
.lastProvisionedTime(existingTemplate.lastProvisionedTime())
.build();
flowFrameworkIndicesHandler.updateTemplateInGlobalContext(
request.getWorkflowId(),
template,
ActionListener.wrap(response -> {
flowFrameworkIndicesHandler.updateFlowFrameworkSystemIndexDoc(
request.getWorkflowId(),
Map.ofEntries(
Map.entry(STATE_FIELD, State.NOT_STARTED),
Map.entry(PROVISIONING_PROGRESS_FIELD, ProvisioningProgress.NOT_STARTED)
),
ActionListener.wrap(updateResponse -> {
logger.info("updated workflow {} state to {}", request.getWorkflowId(), State.NOT_STARTED.name());
listener.onResponse(new WorkflowResponse(request.getWorkflowId()));
}, exception -> {
String errorMessage = "Failed to update workflow " + request.getWorkflowId() + " in template index";
logger.error(errorMessage, exception);
if (exception instanceof FlowFrameworkException) {
listener.onFailure(exception);
} else {
listener.onFailure(
new FlowFrameworkException(errorMessage, ExceptionsHelper.status(exception))
// Ignore state index if updating fields
if (!isFieldUpdate) {
flowFrameworkIndicesHandler.updateFlowFrameworkSystemIndexDoc(
request.getWorkflowId(),
Map.ofEntries(
Map.entry(STATE_FIELD, State.NOT_STARTED),
Map.entry(PROVISIONING_PROGRESS_FIELD, ProvisioningProgress.NOT_STARTED)
),
ActionListener.wrap(updateResponse -> {
logger.info(
"updated workflow {} state to {}",
request.getWorkflowId(),
State.NOT_STARTED.name()
);
}
})
);
listener.onResponse(new WorkflowResponse(request.getWorkflowId()));
}, exception -> {
String errorMessage = "Failed to update workflow "
+ request.getWorkflowId()

Check warning on line 271 in src/main/java/org/opensearch/flowframework/transport/CreateWorkflowTransportAction.java

View check run for this annotation

Codecov / codecov/patch

src/main/java/org/opensearch/flowframework/transport/CreateWorkflowTransportAction.java#L270-L271

Added lines #L270 - L271 were not covered by tests
+ " in template index";
logger.error(errorMessage, exception);

Check warning on line 273 in src/main/java/org/opensearch/flowframework/transport/CreateWorkflowTransportAction.java

View check run for this annotation

Codecov / codecov/patch

src/main/java/org/opensearch/flowframework/transport/CreateWorkflowTransportAction.java#L273

Added line #L273 was not covered by tests
if (exception instanceof FlowFrameworkException) {
listener.onFailure(exception);

Check warning on line 275 in src/main/java/org/opensearch/flowframework/transport/CreateWorkflowTransportAction.java

View check run for this annotation

Codecov / codecov/patch

src/main/java/org/opensearch/flowframework/transport/CreateWorkflowTransportAction.java#L275

Added line #L275 was not covered by tests
} else {
listener.onFailure(
new FlowFrameworkException(errorMessage, ExceptionsHelper.status(exception))

Check warning on line 278 in src/main/java/org/opensearch/flowframework/transport/CreateWorkflowTransportAction.java

View check run for this annotation

Codecov / codecov/patch

src/main/java/org/opensearch/flowframework/transport/CreateWorkflowTransportAction.java#L277-L278

Added lines #L277 - L278 were not covered by tests
);
}
})

Check warning on line 281 in src/main/java/org/opensearch/flowframework/transport/CreateWorkflowTransportAction.java

View check run for this annotation

Codecov / codecov/patch

src/main/java/org/opensearch/flowframework/transport/CreateWorkflowTransportAction.java#L281

Added line #L281 was not covered by tests
);
} else {
listener.onResponse(new WorkflowResponse(request.getWorkflowId()));
}
}, exception -> {
String errorMessage = "Failed to update use case template " + request.getWorkflowId();
logger.error(errorMessage, exception);
Expand All @@ -278,7 +291,8 @@
} else {
listener.onFailure(new FlowFrameworkException(errorMessage, ExceptionsHelper.status(exception)));
}
})
}),
isFieldUpdate
);
} else {
String errorMessage = "Failed to retrieve template (" + workflowId + ") from global context.";
Expand Down
Loading
Loading