Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Updates exception handling with FlowFrameworkException AND adds dryrun param to Create Workflow #137

Merged
merged 26 commits into from
Nov 3, 2023
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
Show all changes
26 commits
Select commit Hold shift + click to select a range
5152ad3
Simplifying Template format, removing operations, resources created, …
joshpalis Oct 25, 2023
6ddd3b2
Initial commit, modifies use case template to seperate workflow input…
joshpalis Oct 27, 2023
3a382fe
merge main
joshpalis Oct 27, 2023
e116c10
Adding tests
joshpalis Oct 27, 2023
becc510
Adding validate graph test
joshpalis Oct 27, 2023
52be80c
Added model-group step to workflow-steps.json, rebasing with main
joshpalis Oct 30, 2023
ee8f3cb
Addressing PR comments, moving sorting/validating prior to executing …
joshpalis Oct 31, 2023
290cd06
Merge branch 'main' into validate
joshpalis Oct 31, 2023
67e993f
Adding javadocs
joshpalis Oct 31, 2023
ddf946f
Moving validation prior to updating workflow state to provisioning
joshpalis Oct 31, 2023
63524fa
Addressing PR comments Part 1
joshpalis Oct 31, 2023
56865d2
Addressing PR comments Part 2 : Moving field names to common value cl…
joshpalis Oct 31, 2023
250c4d4
Merge branch 'main' into validate
joshpalis Oct 31, 2023
2c61f59
Merge branch 'main' into validate
joshpalis Oct 31, 2023
eb428de
Adding definition for noop workflow step
joshpalis Oct 31, 2023
b8b01f5
Merge branch 'main' into validate
joshpalis Oct 31, 2023
8ba9d0b
Addressing PR comments Part 3
joshpalis Oct 31, 2023
8163655
Modifies rest actions to throw flow framework exceptions, transport a…
joshpalis Oct 31, 2023
217b018
merge main
joshpalis Oct 31, 2023
98ab980
Fixing credentials field in workflow-step json
joshpalis Nov 1, 2023
b3336b2
Fixing test
joshpalis Nov 1, 2023
515507a
Using ExceptionsHelper.status() to determine the rest status code bas…
joshpalis Nov 2, 2023
eefe602
Adding dryrun param to create workflow API, allows for validation bef…
joshpalis Nov 2, 2023
3021b3d
concatenating log message with exception message on failure
joshpalis Nov 2, 2023
a72904a
Adding dry run test
joshpalis Nov 2, 2023
6b364c6
Simplifying FlowFrameworkException::toXContent
joshpalis Nov 3, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,8 @@ private CommonValue() {}
public static final String WORKFLOW_URI = FLOW_FRAMEWORK_BASE_URI + "/workflow";
/** Field name for workflow Id, the document Id of the indexed use case template */
public static final String WORKFLOW_ID = "workflow_id";
/** Field name for dry run, the flag to indicate if validation is necessary */
public static final String DRY_RUN = "dryrun";
joshpalis marked this conversation as resolved.
Show resolved Hide resolved
/** The field name for provision workflow within a use case template*/
public static final String PROVISION_WORKFLOW = "provision";

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import java.util.List;
import java.util.Locale;

import static org.opensearch.flowframework.common.CommonValue.DRY_RUN;
import static org.opensearch.flowframework.common.CommonValue.WORKFLOW_ID;
import static org.opensearch.flowframework.common.CommonValue.WORKFLOW_URI;

Expand Down Expand Up @@ -62,21 +63,25 @@
@Override
protected RestChannelConsumer prepareRequest(RestRequest request, NodeClient client) throws IOException {
try {

String workflowId = request.param(WORKFLOW_ID);
Template template = Template.parse(request.content().utf8ToString());
WorkflowRequest workflowRequest = new WorkflowRequest(workflowId, template);
boolean dryRun = request.paramAsBoolean(DRY_RUN, false);

Check warning on line 69 in src/main/java/org/opensearch/flowframework/rest/RestCreateWorkflowAction.java

View check run for this annotation

Codecov / codecov/patch

src/main/java/org/opensearch/flowframework/rest/RestCreateWorkflowAction.java#L68-L69

Added lines #L68 - L69 were not covered by tests

WorkflowRequest workflowRequest = new WorkflowRequest(workflowId, template, dryRun);

Check warning on line 71 in src/main/java/org/opensearch/flowframework/rest/RestCreateWorkflowAction.java

View check run for this annotation

Codecov / codecov/patch

src/main/java/org/opensearch/flowframework/rest/RestCreateWorkflowAction.java#L71

Added line #L71 was not covered by tests

return channel -> client.execute(CreateWorkflowAction.INSTANCE, workflowRequest, ActionListener.wrap(response -> {
XContentBuilder builder = response.toXContent(channel.newBuilder(), ToXContent.EMPTY_PARAMS);
channel.sendResponse(new BytesRestResponse(RestStatus.CREATED, builder));
}, exception -> {

Check warning on line 76 in src/main/java/org/opensearch/flowframework/rest/RestCreateWorkflowAction.java

View check run for this annotation

Codecov / codecov/patch

src/main/java/org/opensearch/flowframework/rest/RestCreateWorkflowAction.java#L73-L76

Added lines #L73 - L76 were not covered by tests
try {
FlowFrameworkException ex = (FlowFrameworkException) exception;
XContentBuilder exceptionBuilder = ex.toXContent(channel.newErrorBuilder(), ToXContent.EMPTY_PARAMS);
channel.sendResponse(new BytesRestResponse(ex.getRestStatus(), exceptionBuilder));
} catch (IOException e) {
logger.error("Failed to send back create workflow exception", e);
joshpalis marked this conversation as resolved.
Show resolved Hide resolved
}
}));

Check warning on line 84 in src/main/java/org/opensearch/flowframework/rest/RestCreateWorkflowAction.java

View check run for this annotation

Codecov / codecov/patch

src/main/java/org/opensearch/flowframework/rest/RestCreateWorkflowAction.java#L78-L84

Added lines #L78 - L84 were not covered by tests
} catch (Exception e) {
FlowFrameworkException ex = new FlowFrameworkException(e.getMessage(), RestStatus.BAD_REQUEST);
return channel -> channel.sendResponse(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,9 +24,14 @@
import org.opensearch.flowframework.model.ProvisioningProgress;
import org.opensearch.flowframework.model.State;
import org.opensearch.flowframework.model.Template;
import org.opensearch.flowframework.model.Workflow;
import org.opensearch.flowframework.workflow.ProcessNode;
import org.opensearch.flowframework.workflow.WorkflowProcessSorter;
import org.opensearch.tasks.Task;
import org.opensearch.transport.TransportService;

import java.util.List;

import static org.opensearch.flowframework.common.CommonValue.PROVISIONING_PROGRESS_FIELD;
import static org.opensearch.flowframework.common.CommonValue.STATE_FIELD;
import static org.opensearch.flowframework.common.CommonValue.WORKFLOW_STATE_INDEX;
Expand All @@ -39,30 +44,35 @@

private final Logger logger = LogManager.getLogger(CreateWorkflowTransportAction.class);

private final WorkflowProcessSorter workflowProcessSorter;
private final FlowFrameworkIndicesHandler flowFrameworkIndicesHandler;
private final Client client;

/**
* Intantiates a new CreateWorkflowTransportAction
* @param transportService the TransportService
* @param actionFilters action filters
* @param workflowProcessSorter the workflow process sorter
* @param flowFrameworkIndicesHandler The handler for the global context index
* @param client The client used to make the request to OS
*/
@Inject
public CreateWorkflowTransportAction(
TransportService transportService,
ActionFilters actionFilters,
WorkflowProcessSorter workflowProcessSorter,
FlowFrameworkIndicesHandler flowFrameworkIndicesHandler,
Client client
) {
super(CreateWorkflowAction.NAME, transportService, actionFilters, WorkflowRequest::new);
this.workflowProcessSorter = workflowProcessSorter;
this.flowFrameworkIndicesHandler = flowFrameworkIndicesHandler;
this.client = client;
}

@Override
protected void doExecute(Task task, WorkflowRequest request, ActionListener<WorkflowResponse> listener) {

User user = getUserContext(client);
Template templateWithUser = new Template(
request.getTemplate().name(),
Expand All @@ -73,6 +83,21 @@
request.getTemplate().workflows(),
user
);

if (request.isDryRun()) {
owaiskazi19 marked this conversation as resolved.
Show resolved Hide resolved
try {
validateWorkflows(templateWithUser);
} catch (Exception e) {

Check warning on line 90 in src/main/java/org/opensearch/flowframework/transport/CreateWorkflowTransportAction.java

View check run for this annotation

Codecov / codecov/patch

src/main/java/org/opensearch/flowframework/transport/CreateWorkflowTransportAction.java#L89-L90

Added lines #L89 - L90 were not covered by tests
if (e instanceof FlowFrameworkException) {
logger.error("Workflow validation failed for template : " + templateWithUser.name());
listener.onFailure(e);

Check warning on line 93 in src/main/java/org/opensearch/flowframework/transport/CreateWorkflowTransportAction.java

View check run for this annotation

Codecov / codecov/patch

src/main/java/org/opensearch/flowframework/transport/CreateWorkflowTransportAction.java#L92-L93

Added lines #L92 - L93 were not covered by tests
} else {
listener.onFailure(new FlowFrameworkException(e.getMessage(), ExceptionsHelper.status(e)));

Check warning on line 95 in src/main/java/org/opensearch/flowframework/transport/CreateWorkflowTransportAction.java

View check run for this annotation

Codecov / codecov/patch

src/main/java/org/opensearch/flowframework/transport/CreateWorkflowTransportAction.java#L95

Added line #L95 was not covered by tests
}
return;
}

Check warning on line 98 in src/main/java/org/opensearch/flowframework/transport/CreateWorkflowTransportAction.java

View check run for this annotation

Codecov / codecov/patch

src/main/java/org/opensearch/flowframework/transport/CreateWorkflowTransportAction.java#L97-L98

Added lines #L97 - L98 were not covered by tests
}

if (request.getWorkflowId() == null) {
// Create new global context and state index entries
flowFrameworkIndicesHandler.putTemplateToGlobalContext(templateWithUser, ActionListener.wrap(globalContextResponse -> {
Expand All @@ -85,16 +110,16 @@
}, exception -> {
logger.error("Failed to save workflow state : {}", exception.getMessage());
if (exception instanceof FlowFrameworkException) {
listener.onFailure(exception);

Check warning on line 113 in src/main/java/org/opensearch/flowframework/transport/CreateWorkflowTransportAction.java

View check run for this annotation

Codecov / codecov/patch

src/main/java/org/opensearch/flowframework/transport/CreateWorkflowTransportAction.java#L113

Added line #L113 was not covered by tests
} else {
listener.onFailure(new FlowFrameworkException(exception.getMessage(), RestStatus.BAD_REQUEST));

Check warning on line 115 in src/main/java/org/opensearch/flowframework/transport/CreateWorkflowTransportAction.java

View check run for this annotation

Codecov / codecov/patch

src/main/java/org/opensearch/flowframework/transport/CreateWorkflowTransportAction.java#L115

Added line #L115 was not covered by tests
}
})
);
}, exception -> {
logger.error("Failed to save use case template : {}", exception.getMessage());
if (exception instanceof FlowFrameworkException) {
listener.onFailure(exception);

Check warning on line 122 in src/main/java/org/opensearch/flowframework/transport/CreateWorkflowTransportAction.java

View check run for this annotation

Codecov / codecov/patch

src/main/java/org/opensearch/flowframework/transport/CreateWorkflowTransportAction.java#L122

Added line #L122 was not covered by tests
} else {
listener.onFailure(new FlowFrameworkException(exception.getMessage(), ExceptionsHelper.status(exception)));
}
Expand All @@ -116,16 +141,16 @@
}, exception -> {
logger.error("Failed to update workflow state : {}", exception.getMessage());
if (exception instanceof FlowFrameworkException) {
listener.onFailure(exception);

Check warning on line 144 in src/main/java/org/opensearch/flowframework/transport/CreateWorkflowTransportAction.java

View check run for this annotation

Codecov / codecov/patch

src/main/java/org/opensearch/flowframework/transport/CreateWorkflowTransportAction.java#L144

Added line #L144 was not covered by tests
} else {
listener.onFailure(new FlowFrameworkException(exception.getMessage(), ExceptionsHelper.status(exception)));

Check warning on line 146 in src/main/java/org/opensearch/flowframework/transport/CreateWorkflowTransportAction.java

View check run for this annotation

Codecov / codecov/patch

src/main/java/org/opensearch/flowframework/transport/CreateWorkflowTransportAction.java#L146

Added line #L146 was not covered by tests
}
})
);
}, exception -> {
logger.error("Failed to updated use case template {} : {}", request.getWorkflowId(), exception.getMessage());
if (exception instanceof FlowFrameworkException) {
listener.onFailure(exception);

Check warning on line 153 in src/main/java/org/opensearch/flowframework/transport/CreateWorkflowTransportAction.java

View check run for this annotation

Codecov / codecov/patch

src/main/java/org/opensearch/flowframework/transport/CreateWorkflowTransportAction.java#L153

Added line #L153 was not covered by tests
} else {
listener.onFailure(new FlowFrameworkException(exception.getMessage(), ExceptionsHelper.status(exception)));
}
Expand All @@ -135,4 +160,11 @@
}
}

private void validateWorkflows(Template template) throws Exception {
for (Workflow workflow : template.workflows().values()) {
List<ProcessNode> sortedNodes = workflowProcessSorter.sortProcessNodes(workflow);
workflowProcessSorter.validateGraph(sortedNodes);
}
}

Check warning on line 168 in src/main/java/org/opensearch/flowframework/transport/CreateWorkflowTransportAction.java

View check run for this annotation

Codecov / codecov/patch

src/main/java/org/opensearch/flowframework/transport/CreateWorkflowTransportAction.java#L165-L168

Added lines #L165 - L168 were not covered by tests

}
Original file line number Diff line number Diff line change
Expand Up @@ -32,15 +32,30 @@ public class WorkflowRequest extends ActionRequest {
*/
@Nullable
private Template template;
/**
* Validation flag
*/
private boolean dryRun;

/**
* Instantiates a new WorkflowRequest
* Instantiates a new WorkflowRequest and defaults dry run to false
* @param workflowId the documentId of the workflow
* @param template the use case template which describes the workflow
*/
public WorkflowRequest(@Nullable String workflowId, @Nullable Template template) {
this(workflowId, template, false);
}

/**
* Instantiates a new WorkflowRequest
* @param workflowId the documentId of the workflow
* @param template the use case template which describes the workflow
* @param dryRun flag to indicate if validation is necessary
*/
public WorkflowRequest(@Nullable String workflowId, @Nullable Template template, boolean dryRun) {
this.workflowId = workflowId;
this.template = template;
this.dryRun = dryRun;
}

/**
Expand All @@ -53,6 +68,7 @@ public WorkflowRequest(StreamInput in) throws IOException {
this.workflowId = in.readOptionalString();
String templateJson = in.readOptionalString();
this.template = templateJson == null ? null : Template.parse(templateJson);
this.dryRun = in.readBoolean();
}

/**
Expand All @@ -73,11 +89,20 @@ public Template getTemplate() {
return this.template;
}

/**
* Gets the dry run validation flag
* @return the dry run boolean
*/
public boolean isDryRun() {
return this.dryRun;
}

@Override
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
out.writeOptionalString(workflowId);
out.writeOptionalString(template == null ? null : template.toJson());
out.writeBoolean(dryRun);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import org.opensearch.flowframework.model.WorkflowEdge;
import org.opensearch.flowframework.model.WorkflowNode;
import org.opensearch.flowframework.util.ParseUtils;
import org.opensearch.flowframework.workflow.WorkflowProcessSorter;
import org.opensearch.tasks.Task;
import org.opensearch.test.OpenSearchTestCase;
import org.opensearch.threadpool.ThreadPool;
Expand Down Expand Up @@ -56,6 +57,7 @@ public void setUp() throws Exception {
this.createWorkflowTransportAction = new CreateWorkflowTransportAction(
mock(TransportService.class),
mock(ActionFilters.class),
mock(WorkflowProcessSorter.class),
flowFrameworkIndicesHandler,
client
);
Expand Down
Loading