Skip to content

Commit

Permalink
Change last provisioned timestamp when provisioning workflow
Browse files Browse the repository at this point in the history
Signed-off-by: Daniel Widdis <[email protected]>
  • Loading branch information
dbwiddis committed Mar 5, 2024
1 parent ae36573 commit 1fa040e
Show file tree
Hide file tree
Showing 3 changed files with 43 additions and 3 deletions.
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@ Inspired from [Keep a Changelog](https://keepachangelog.com/en/1.1.0/)
- Added an optional workflow_step param to the get workflow steps API ([#538](https://github.com/opensearch-project/flow-framework/pull/538))

### Enhancements
- Add created, updated, and provisioned timestamps to saved template ([#551](https://github.com/opensearch-project/flow-framework/pull/551))

### Bug Fixes
### Infrastructure
### Documentation
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -118,10 +118,10 @@ protected void doExecute(Task task, WorkflowRequest request, ActionListener<Work
}

// Parse template from document source
Template template = Template.parse(response.getSourceAsString());
Template parsedTemplate = Template.parse(response.getSourceAsString());

// Decrypt template
template = encryptorUtils.decryptTemplateCredentials(template);
final Template template = encryptorUtils.decryptTemplateCredentials(parsedTemplate);

// Sort and validate graph
Workflow provisionWorkflow = template.workflows().get(PROVISION_WORKFLOW);
Expand All @@ -134,6 +134,7 @@ protected void doExecute(Task task, WorkflowRequest request, ActionListener<Work

flowFrameworkIndicesHandler.isWorkflowNotStarted(workflowId, workflowIsNotStarted -> {
if (TRUE.equals(workflowIsNotStarted)) {
// update state index
flowFrameworkIndicesHandler.updateFlowFrameworkSystemIndexDoc(
workflowId,
Map.ofEntries(
Expand All @@ -145,7 +146,36 @@ protected void doExecute(Task task, WorkflowRequest request, ActionListener<Work
ActionListener.wrap(updateResponse -> {
logger.info("updated workflow {} state to {}", request.getWorkflowId(), State.PROVISIONING);
executeWorkflowAsync(workflowId, provisionProcessSequence, listener);
listener.onResponse(new WorkflowResponse(workflowId));
// update last provisioned field in template
Template newTemplate = new Template.Builder().name(template.name())
.description(template.description())
.useCase(template.useCase())
.templateVersion(template.templateVersion())
.compatibilityVersion(template.compatibilityVersion())
.workflows(template.workflows())
.uiMetadata(template.getUiMetadata())
.user(template.getUser()) // Should we care about old user here?
.createdTime(template.createdTime())
.lastUpdatedTime(template.lastUpdatedTime())
.lastProvisionedTime(Instant.now().toEpochMilli())
.build();
flowFrameworkIndicesHandler.updateTemplateInGlobalContext(
request.getWorkflowId(),
newTemplate,
ActionListener.wrap(templateResponse -> {
listener.onResponse(new WorkflowResponse(request.getWorkflowId()));
}, exception -> {
String errorMessage = "Failed to update use case template " + request.getWorkflowId();
logger.error(errorMessage, exception);

Check warning on line 169 in src/main/java/org/opensearch/flowframework/transport/ProvisionWorkflowTransportAction.java

View check run for this annotation

Codecov / codecov/patch

src/main/java/org/opensearch/flowframework/transport/ProvisionWorkflowTransportAction.java#L168-L169

Added lines #L168 - L169 were not covered by tests
if (exception instanceof FlowFrameworkException) {
listener.onFailure(exception);

Check warning on line 171 in src/main/java/org/opensearch/flowframework/transport/ProvisionWorkflowTransportAction.java

View check run for this annotation

Codecov / codecov/patch

src/main/java/org/opensearch/flowframework/transport/ProvisionWorkflowTransportAction.java#L171

Added line #L171 was not covered by tests
} else {
listener.onFailure(
new FlowFrameworkException(errorMessage, ExceptionsHelper.status(exception))

Check warning on line 174 in src/main/java/org/opensearch/flowframework/transport/ProvisionWorkflowTransportAction.java

View check run for this annotation

Codecov / codecov/patch

src/main/java/org/opensearch/flowframework/transport/ProvisionWorkflowTransportAction.java#L173-L174

Added lines #L173 - L174 were not covered by tests
);
}
})

Check warning on line 177 in src/main/java/org/opensearch/flowframework/transport/ProvisionWorkflowTransportAction.java

View check run for this annotation

Codecov / codecov/patch

src/main/java/org/opensearch/flowframework/transport/ProvisionWorkflowTransportAction.java#L177

Added line #L177 was not covered by tests
);
}, exception -> {
String errorMessage = "Failed to update workflow state: " + workflowId;
logger.error(errorMessage, exception);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
import org.opensearch.Version;
import org.opensearch.action.get.GetRequest;
import org.opensearch.action.get.GetResponse;
import org.opensearch.action.index.IndexResponse;
import org.opensearch.action.support.ActionFilters;
import org.opensearch.action.update.UpdateResponse;
import org.opensearch.client.Client;
Expand All @@ -19,6 +20,7 @@
import org.opensearch.common.xcontent.XContentFactory;
import org.opensearch.core.action.ActionListener;
import org.opensearch.core.common.bytes.BytesReference;
import org.opensearch.core.index.shard.ShardId;
import org.opensearch.core.xcontent.XContentBuilder;
import org.opensearch.flowframework.TestHelpers;
import org.opensearch.flowframework.indices.FlowFrameworkIndicesHandler;
Expand Down Expand Up @@ -147,6 +149,12 @@ public void testProvisionWorkflow() {
return null;
}).when(flowFrameworkIndicesHandler).updateFlowFrameworkSystemIndexDoc(any(), any(), any());

doAnswer(invocation -> {
ActionListener<IndexResponse> responseListener = invocation.getArgument(2);
responseListener.onResponse(new IndexResponse(new ShardId(GLOBAL_CONTEXT_INDEX, "", 1), "1", 1L, 1L, 1L, true));
return null;
}).when(flowFrameworkIndicesHandler).updateTemplateInGlobalContext(any(), any(Template.class), any());

provisionWorkflowTransportAction.doExecute(mock(Task.class), workflowRequest, listener);
ArgumentCaptor<WorkflowResponse> responseCaptor = ArgumentCaptor.forClass(WorkflowResponse.class);
verify(listener, times(1)).onResponse(responseCaptor.capture());
Expand Down

0 comments on commit 1fa040e

Please sign in to comment.