Skip to content

Commit

Permalink
[Backport 2.x] Improve Template and WorkflowState builders (#793)
Browse files Browse the repository at this point in the history
Improve Template and WorkflowState builders (#778)

* Simplify Template builder instantiation



* Add ability to more easily update WorkflowState



* Changelog, initialize template parser



---------


(cherry picked from commit f40890c)

Signed-off-by: Daniel Widdis <[email protected]>
Signed-off-by: github-actions[bot] <github-actions[bot]@users.noreply.github.com>
Co-authored-by: github-actions[bot] <github-actions[bot]@users.noreply.github.com>
  • Loading branch information
1 parent 81b199f commit 7506139
Show file tree
Hide file tree
Showing 12 changed files with 197 additions and 20 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -26,3 +26,4 @@ Inspired from [Keep a Changelog](https://keepachangelog.com/en/1.1.0/)
### Documentation
### Maintenance
### Refactoring
- Improve Template and WorkflowState builders ([#778](https://github.com/opensearch-project/flow-framework/pull/778))
Original file line number Diff line number Diff line change
Expand Up @@ -357,7 +357,8 @@ public void initializeConfigIndex(ActionListener<Boolean> listener) {
* @param listener action listener
*/
public void putInitialStateToWorkflowState(String workflowId, User user, ActionListener<IndexResponse> listener) {
WorkflowState state = new WorkflowState.Builder().workflowId(workflowId)
WorkflowState state = WorkflowState.builder()
.workflowId(workflowId)
.state(State.NOT_STARTED.name())
.provisioningProgress(ProvisioningProgress.NOT_STARTED.name())
.user(user)
Expand Down
24 changes: 20 additions & 4 deletions src/main/java/org/opensearch/flowframework/model/Template.java
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@
import org.opensearch.core.xcontent.XContentBuilder;
import org.opensearch.core.xcontent.XContentParser;
import org.opensearch.flowframework.exception.FlowFrameworkException;
import org.opensearch.flowframework.model.Template.Builder;
import org.opensearch.flowframework.util.ParseUtils;

import java.io.IOException;
Expand Down Expand Up @@ -137,13 +136,13 @@ public static class Builder {
/**
* Empty Constructor for the Builder object
*/
public Builder() {}
private Builder() {}

/**
* Construct a Builder from an existing template
* @param t The existing template to copy
*/
public Builder(Template t) {
private Builder(Template t) {
this.name = t.name();
this.description = t.description();
this.useCase = t.useCase();
Expand Down Expand Up @@ -294,6 +293,23 @@ public Template build() {
}
}

/**
* Instantiate a new Template builder
* @return a new builder instance
*/
public static Builder builder() {
return new Builder();
}

/**
* Instantiate a new Template builder initialized from an existing template
* @param t The existing template to use as the source
* @return a new builder instance initialized from the existing template
*/
public static Builder builder(Template t) {
return new Builder(t);
}

@Override
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
XContentBuilder xContentBuilder = builder.startObject();
Expand Down Expand Up @@ -352,7 +368,7 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws
* @return the updated template.
*/
public static Template updateExistingTemplate(Template existingTemplate, Template templateWithNewFields) {
Builder builder = new Template.Builder(existingTemplate).lastUpdatedTime(Instant.now());
Builder builder = Template.builder(existingTemplate).lastUpdatedTime(Instant.now());
if (templateWithNewFields.name() != null) {
builder.name(templateWithNewFields.name());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -126,6 +126,15 @@ public static Builder builder() {
return new Builder();
}

/**
* Constructs a builder object for workflowState from an existing state
* @param existingState a WorkflowState object to initialize the builder with
* @return Builder Object initialized with existing state
*/
public static Builder builder(WorkflowState existingState) {
return new Builder(existingState);
}

/**
* Class for constructing a Builder for WorkflowState
*/
Expand All @@ -143,7 +152,23 @@ public static class Builder {
/**
* Empty Constructor for the Builder object
*/
public Builder() {}
private Builder() {}

/**
* Builder from existing state
* @param existingState a WorkflowState object to initialize the builder with
*/
private Builder(WorkflowState existingState) {
this.workflowId = existingState.getWorkflowId();
this.error = existingState.getError();
this.state = existingState.getState();
this.provisioningProgress = existingState.getProvisioningProgress();
this.provisionStartTime = existingState.getProvisionStartTime();
this.provisionEndTime = existingState.getProvisionEndTime();
this.user = existingState.getUser();
this.userOutputs = existingState.userOutputs();
this.resourcesCreated = existingState.resourcesCreated();
}

/**
* Builder method for adding workflowID
Expand Down Expand Up @@ -254,6 +279,44 @@ public WorkflowState build() {
}
}

/**
* Merges two workflow states by updating the fields from an existing state with the (non-null) fields of another one.
* @param existingState An existing Workflow state.
* @param stateWithNewFields A workflow state containing only fields to update.
* @return the updated workflow state.
*/
public static WorkflowState updateExistingWorkflowState(WorkflowState existingState, WorkflowState stateWithNewFields) {
Builder builder = WorkflowState.builder(existingState);
if (stateWithNewFields.getWorkflowId() != null) {
builder.workflowId(stateWithNewFields.getWorkflowId());
}
if (stateWithNewFields.getError() != null) {
builder.error(stateWithNewFields.getError());
}
if (stateWithNewFields.getState() != null) {
builder.state(stateWithNewFields.getState());
}
if (stateWithNewFields.getProvisioningProgress() != null) {
builder.provisioningProgress(stateWithNewFields.getProvisioningProgress());
}
if (stateWithNewFields.getProvisionStartTime() != null) {
builder.provisionStartTime(stateWithNewFields.getProvisionStartTime());
}
if (stateWithNewFields.getProvisionEndTime() != null) {
builder.provisionEndTime(stateWithNewFields.getProvisionEndTime());
}
if (stateWithNewFields.getUser() != null) {
builder.user(stateWithNewFields.getUser());
}
if (stateWithNewFields.userOutputs() != null) {
builder.userOutputs(stateWithNewFields.userOutputs());
}
if (stateWithNewFields.resourcesCreated() != null) {
builder.resourcesCreated(stateWithNewFields.resourcesCreated());
}
return builder.build();
}

@Override
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
XContentBuilder xContentBuilder = builder.startObject();
Expand Down Expand Up @@ -492,7 +555,7 @@ public Map<String, Object> userOutputs() {
}

/**
* A map of all the resources created
* A list of all the resources created
* @return the resources created
*/
public List<ResourceCreated> resourcesCreated() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -243,7 +243,8 @@ protected void doExecute(Task task, WorkflowRequest request, ActionListener<Work
Template existingTemplate = Template.parse(getResponse.getSourceAsString());
Template template = isFieldUpdate
? Template.updateExistingTemplate(existingTemplate, templateWithUser)
: new Template.Builder(templateWithUser).createdTime(existingTemplate.createdTime())
: Template.builder(templateWithUser)
.createdTime(existingTemplate.createdTime())
.lastUpdatedTime(Instant.now())
.lastProvisionedTime(existingTemplate.lastProvisionedTime())
.build();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,8 @@ public GetWorkflowStateResponse(WorkflowState workflowState, boolean allStatus)
if (allStatus) {
this.workflowState = workflowState;
} else {
this.workflowState = new WorkflowState.Builder().workflowId(workflowState.getWorkflowId())
this.workflowState = WorkflowState.builder()
.workflowId(workflowState.getWorkflowId())
.error(workflowState.getError())
.state(workflowState.getState())
.resourcesCreated(workflowState.resourcesCreated())
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -146,7 +146,7 @@ protected void doExecute(Task task, WorkflowRequest request, ActionListener<Work
logger.info("updated workflow {} state to {}", request.getWorkflowId(), State.PROVISIONING);
executeWorkflowAsync(workflowId, provisionProcessSequence, listener);
// update last provisioned field in template
Template newTemplate = new Template.Builder(template).lastProvisionedTime(Instant.now()).build();
Template newTemplate = Template.builder(template).lastProvisionedTime(Instant.now()).build();
flowFrameworkIndicesHandler.updateTemplateInGlobalContext(
request.getWorkflowId(),
newTemplate,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -169,7 +169,7 @@ private Template processTemplateCredentials(Template template, Function<String,
processedWorkflows.put(entry.getKey(), new Workflow(entry.getValue().userParams(), processedNodes, entry.getValue().edges()));
}

return new Template.Builder(template).workflows(processedWorkflows).build();
return Template.builder(template).workflows(processedWorkflows).build();
}

/**
Expand Down Expand Up @@ -240,10 +240,10 @@ public Template redactTemplateSecuredFields(User user, Template template) {
}

if (ParseUtils.isAdmin(user)) {
return new Template.Builder(template).workflows(processedWorkflows).build();
return Template.builder(template).workflows(processedWorkflows).build();
}

return new Template.Builder(template).user(null).workflows(processedWorkflows).build();
return Template.builder(template).user(null).workflows(processedWorkflows).build();
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -118,7 +118,7 @@ public void testUpdateExistingTemplate() {
now,
null
);
Template updated = new Template.Builder().name("name two").description("description two").useCase("use case two").build();
Template updated = Template.builder().name("name two").description("description two").useCase("use case two").build();
Template merged = Template.updateExistingTemplate(original, updated);
assertEquals("name two", merged.name());
assertEquals("description two", merged.description());
Expand All @@ -128,7 +128,8 @@ public void testUpdateExistingTemplate() {
assertEquals("1.1.1", merged.compatibilityVersion().get(1).toString());
assertEquals("one", merged.getUiMetadata().get("uiMetadata"));

updated = new Template.Builder().templateVersion(Version.fromString("2.2.2"))
updated = Template.builder()
.templateVersion(Version.fromString("2.2.2"))
.compatibilityVersion(List.of(Version.fromString("2.2.2"), Version.fromString("2.2.2")))
.uiMetadata(Map.of("uiMetadata", "two"))
.build();
Expand Down Expand Up @@ -164,4 +165,23 @@ public void testStrings() throws IOException {
assertTrue(t.toYaml().contains("a test template"));
assertTrue(t.toString().contains("a test template"));
}

public void testNullToEmptyString() throws IOException {
Template t = Template.parse("{\"name\":\"test\"}");
assertEquals("test", t.name());
assertEquals("", t.description());
assertEquals("", t.useCase());

XContentParser parser = JsonXContent.jsonXContent.createParser(
NamedXContentRegistry.EMPTY,
DeprecationHandler.IGNORE_DEPRECATIONS,
"{\"name\":\"test\"}"
);
ensureExpectedToken(XContentParser.Token.START_OBJECT, parser.nextToken(), parser);
t = Template.parse(parser, true);
String json = t.toJson();
assertTrue(json.contains("\"name\":\"test\""));
assertTrue(json.contains("\"description\":\"\""));
assertTrue(json.contains("\"use_case\":\"\""));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -90,4 +90,77 @@ public void testWorkflowState() throws IOException {
}
}

public void testWorkflowStateUpdate() {
// Time travel to guarantee update increments
Instant now = Instant.now().minusMillis(100);

WorkflowState wfs = WorkflowState.builder()
.workflowId("1")
.error("error one")
.state("state one")
.provisioningProgress("progress one")
.provisionStartTime(now)
.provisionEndTime(now)
.user(new User("one", Collections.emptyList(), Collections.emptyList(), Collections.emptyList()))
.userOutputs(Map.of("output", "one"))
.resourcesCreated(List.of(new ResourceCreated("", "", "", "id one")))
.build();

assertEquals("1", wfs.getWorkflowId());
assertEquals("error one", wfs.getError());
assertEquals("state one", wfs.getState());
assertEquals("progress one", wfs.getProvisioningProgress());
assertEquals(now, wfs.getProvisionStartTime());
assertEquals(now, wfs.getProvisionEndTime());
assertEquals("one", wfs.getUser().getName());
assertEquals(1, wfs.userOutputs().size());
assertEquals("one", wfs.userOutputs().get("output"));
assertEquals(1, wfs.resourcesCreated().size());
ResourceCreated rc = wfs.resourcesCreated().get(0);
assertEquals("id one", rc.resourceId());

WorkflowState update = WorkflowState.builder()
.workflowId("2")
.error("error two")
.state("state two")
.provisioningProgress("progress two")
.user(new User("two", Collections.emptyList(), Collections.emptyList(), Collections.emptyList()))
.build();

wfs = WorkflowState.updateExistingWorkflowState(wfs, update);
assertEquals("2", wfs.getWorkflowId());
assertEquals("error two", wfs.getError());
assertEquals("state two", wfs.getState());
assertEquals("progress two", wfs.getProvisioningProgress());
assertEquals(now, wfs.getProvisionStartTime());
assertEquals(now, wfs.getProvisionEndTime());
assertEquals("two", wfs.getUser().getName());
assertEquals(1, wfs.userOutputs().size());
assertEquals("one", wfs.userOutputs().get("output"));
assertEquals(1, wfs.resourcesCreated().size());
rc = wfs.resourcesCreated().get(0);
assertEquals("id one", rc.resourceId());

now = Instant.now().minusMillis(100);
update = WorkflowState.builder()
.provisionStartTime(now)
.provisionEndTime(now)
.userOutputs(Map.of("output", "two"))
.resourcesCreated(List.of(wfs.resourcesCreated().get(0), new ResourceCreated("", "", "", "id two")))
.build();

wfs = WorkflowState.updateExistingWorkflowState(wfs, update);
assertEquals("2", wfs.getWorkflowId());
assertEquals("error two", wfs.getError());
assertEquals("state two", wfs.getState());
assertEquals("progress two", wfs.getProvisioningProgress());
assertEquals(now, wfs.getProvisionStartTime());
assertEquals(now, wfs.getProvisionEndTime());
assertEquals("two", wfs.getUser().getName());
assertEquals(1, wfs.userOutputs().size());
assertEquals("two", wfs.userOutputs().get("output"));
assertEquals(2, wfs.resourcesCreated().size());
rc = wfs.resourcesCreated().get(1);
assertEquals("id two", rc.resourceId());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -173,7 +173,7 @@ public void testCreateAndProvisionLocalModelWorkflow() throws Exception {
)
.collect(Collectors.toList());
Workflow missingInputs = new Workflow(originalWorkflow.userParams(), modifiednodes, originalWorkflow.edges());
Template templateWithMissingInputs = new Template.Builder(template).workflows(Map.of(PROVISION_WORKFLOW, missingInputs)).build();
Template templateWithMissingInputs = Template.builder(template).workflows(Map.of(PROVISION_WORKFLOW, missingInputs)).build();

// Hit Create Workflow API with invalid template
Response response = createWorkflow(client(), templateWithMissingInputs);
Expand Down Expand Up @@ -236,7 +236,7 @@ public void testCreateAndProvisionCyclicalTemplate() throws Exception {
List.of(new WorkflowEdge("workflow_step_2", "workflow_step_3"), new WorkflowEdge("workflow_step_3", "workflow_step_2"))
);

Template cyclicalTemplate = new Template.Builder(template).workflows(Map.of(PROVISION_WORKFLOW, cyclicalWorkflow)).build();
Template cyclicalTemplate = Template.builder(template).workflows(Map.of(PROVISION_WORKFLOW, cyclicalWorkflow)).build();

// Hit dry run
ResponseException exception = expectThrows(ResponseException.class, () -> createWorkflowValidation(client(), cyclicalTemplate));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -396,7 +396,7 @@ public void testUpdateWorkflow() {
ActionListener<GetResponse> getListener = invocation.getArgument(1);
GetResponse getResponse = mock(GetResponse.class);
when(getResponse.isExists()).thenReturn(true);
when(getResponse.getSourceAsString()).thenReturn(new Template.Builder().name("test").build().toJson());
when(getResponse.getSourceAsString()).thenReturn(Template.builder().name("test").build().toJson());
getListener.onResponse(getResponse);
return null;
}).when(client).get(any(GetRequest.class), any());
Expand Down Expand Up @@ -425,7 +425,7 @@ public void testUpdateWorkflowWithField() {
ActionListener<WorkflowResponse> listener = mock(ActionListener.class);
WorkflowRequest updateWorkflow = new WorkflowRequest(
"1",
new Template.Builder().name("new name").description("test").useCase(null).uiMetadata(Map.of("foo", "bar")).build(),
Template.builder().name("new name").description("test").useCase(null).uiMetadata(Map.of("foo", "bar")).build(),
Map.of(UPDATE_WORKFLOW_FIELDS, "true")
);

Expand Down Expand Up @@ -463,7 +463,8 @@ public void testUpdateWorkflowWithField() {

updateWorkflow = new WorkflowRequest(
"1",
new Template.Builder().useCase("foo")
Template.builder()
.useCase("foo")
.templateVersion(Version.CURRENT)
.compatibilityVersion(List.of(Version.V_2_0_0, Version.CURRENT))
.build(),
Expand Down

0 comments on commit 7506139

Please sign in to comment.