From 0d77699c21fb4c3af25817418e12096239ba2312 Mon Sep 17 00:00:00 2001 From: Amit Galitzky Date: Tue, 14 Nov 2023 23:46:22 +0000 Subject: [PATCH] addressing comments Signed-off-by: Amit Galitzky --- .../flowframework/common/CommonValue.java | 2 +- .../flowframework/model/ResourcesCreated.java | 50 ++++++++--- .../rest/RestGetWorkflowAction.java | 13 +-- .../transport/GetWorkflowRequest.java | 84 +++++++++++++++++++ .../transport/GetWorkflowTransportAction.java | 13 +-- .../ProvisionWorkflowTransportAction.java | 8 +- .../transport/WorkflowRequest.java | 29 +------ .../workflow/CreateConnectorStep.java | 19 +++-- .../model/ResourcesCreatedTests.java | 46 ++++++++++ 9 files changed, 201 insertions(+), 63 deletions(-) create mode 100644 src/main/java/org/opensearch/flowframework/transport/GetWorkflowRequest.java create mode 100644 src/test/java/org/opensearch/flowframework/model/ResourcesCreatedTests.java diff --git a/src/main/java/org/opensearch/flowframework/common/CommonValue.java b/src/main/java/org/opensearch/flowframework/common/CommonValue.java index 322be72b0..294c6319c 100644 --- a/src/main/java/org/opensearch/flowframework/common/CommonValue.java +++ b/src/main/java/org/opensearch/flowframework/common/CommonValue.java @@ -135,6 +135,6 @@ private CommonValue() {} /** The field name for the ResourcesCreated's resource ID */ public static final String RESOURCE_ID_FIELD = "resource_id"; /** The field name for the ResourcesCreated's resource name */ - public static final String RESOURCE_NAME_FIELD = "resource_type"; + public static final String WORKFLOW_STEP_NAME = "workflow_step_name"; } diff --git a/src/main/java/org/opensearch/flowframework/model/ResourcesCreated.java b/src/main/java/org/opensearch/flowframework/model/ResourcesCreated.java index 6fe9cfd82..4feb442d5 100644 --- a/src/main/java/org/opensearch/flowframework/model/ResourcesCreated.java +++ b/src/main/java/org/opensearch/flowframework/model/ResourcesCreated.java @@ -19,23 +19,23 @@ import static org.opensearch.core.xcontent.XContentParserUtils.ensureExpectedToken; import static org.opensearch.flowframework.common.CommonValue.RESOURCE_ID_FIELD; -import static org.opensearch.flowframework.common.CommonValue.RESOURCE_NAME_FIELD; +import static org.opensearch.flowframework.common.CommonValue.WORKFLOW_STEP_NAME; /** * This represents an object in the WorkflowState {@link WorkflowState}. */ public class ResourcesCreated implements ToXContentObject, Writeable { - private String resourceName; + private String workflowStepName; private String resourceId; /** * Create this resources created object with given resource name and ID. - * @param resourceName The resource name associating to the step name where it was created + * @param workflowStepName The workflow step name associating to the step where it was created * @param resourceId The resources ID for relating to the created resource */ - public ResourcesCreated(String resourceName, String resourceId) { - this.resourceName = resourceName; + public ResourcesCreated(String workflowStepName, String resourceId) { + this.workflowStepName = workflowStepName; this.resourceId = resourceId; } @@ -45,24 +45,42 @@ public ResourcesCreated(String resourceName, String resourceId) { * @throws IOException if failed to read input stream */ public ResourcesCreated(StreamInput input) throws IOException { - this.resourceName = input.readString(); + this.workflowStepName = input.readString(); this.resourceId = input.readString(); } @Override public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { XContentBuilder xContentBuilder = builder.startObject() - .field(RESOURCE_NAME_FIELD, resourceName) + .field(WORKFLOW_STEP_NAME, workflowStepName) .field(RESOURCE_ID_FIELD, resourceId); return xContentBuilder.endObject(); } @Override public void writeTo(StreamOutput out) throws IOException { - out.writeString(resourceName); + out.writeString(workflowStepName); out.writeString(resourceId); } + /** + * Gets the resource id. + * + * @return the resourceId. + */ + public String resourceId() { + return resourceId; + } + + /** + * Gets the workflow step name associated to the created resource + * + * @return the workflowStepName. + */ + public String workflowStepName() { + return workflowStepName; + } + /** * Parse raw JSON content into a resourcesCreated instance. * @@ -71,7 +89,7 @@ public void writeTo(StreamOutput out) throws IOException { * @throws IOException if content can't be parsed correctly */ public static ResourcesCreated parse(XContentParser parser) throws IOException { - String resourceName = null; + String workflowStepName = null; String resourceId = null; ensureExpectedToken(XContentParser.Token.START_OBJECT, parser.currentToken(), parser); @@ -80,8 +98,8 @@ public static ResourcesCreated parse(XContentParser parser) throws IOException { parser.nextToken(); switch (fieldName) { - case RESOURCE_NAME_FIELD: - resourceName = parser.text(); + case WORKFLOW_STEP_NAME: + workflowStepName = parser.text(); break; case RESOURCE_ID_FIELD: resourceId = parser.text(); @@ -90,7 +108,15 @@ public static ResourcesCreated parse(XContentParser parser) throws IOException { throw new IOException("Unable to parse field [" + fieldName + "] in a resources_created object."); } } - return new ResourcesCreated(resourceName, resourceId); + if (workflowStepName == null || resourceId == null) { + throw new IOException("A resourcesCreated object requires both a workflowStepName and resourceId."); + } + return new ResourcesCreated(workflowStepName, resourceId); + } + + @Override + public String toString() { + return "resources_Created [resource_name=" + workflowStepName + ", id=" + resourceId + "]"; } } diff --git a/src/main/java/org/opensearch/flowframework/rest/RestGetWorkflowAction.java b/src/main/java/org/opensearch/flowframework/rest/RestGetWorkflowAction.java index e3ce20822..81602da9e 100644 --- a/src/main/java/org/opensearch/flowframework/rest/RestGetWorkflowAction.java +++ b/src/main/java/org/opensearch/flowframework/rest/RestGetWorkflowAction.java @@ -11,6 +11,7 @@ import com.google.common.collect.ImmutableList; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; +import org.opensearch.ExceptionsHelper; import org.opensearch.client.node.NodeClient; import org.opensearch.core.action.ActionListener; import org.opensearch.core.rest.RestStatus; @@ -19,7 +20,7 @@ import org.opensearch.flowframework.common.FlowFrameworkFeatureEnabledSetting; import org.opensearch.flowframework.exception.FlowFrameworkException; import org.opensearch.flowframework.transport.GetWorkflowAction; -import org.opensearch.flowframework.transport.WorkflowRequest; +import org.opensearch.flowframework.transport.GetWorkflowRequest; import org.opensearch.rest.BaseRestHandler; import org.opensearch.rest.BytesRestResponse; import org.opensearch.rest.RestRequest; @@ -67,7 +68,7 @@ protected BaseRestHandler.RestChannelConsumer prepareRequest(RestRequest request // Validate content if (request.hasContent()) { - throw new FlowFrameworkException("Invalid request format", RestStatus.BAD_REQUEST); + throw new FlowFrameworkException("No request body present", RestStatus.BAD_REQUEST); } // Validate params String workflowId = request.param(WORKFLOW_ID); @@ -75,14 +76,14 @@ protected BaseRestHandler.RestChannelConsumer prepareRequest(RestRequest request throw new FlowFrameworkException("workflow_id cannot be null", RestStatus.BAD_REQUEST); } - boolean all = request.paramAsBoolean("_all", false); - WorkflowRequest workflowRequest = new WorkflowRequest(workflowId, null, false, all); - return channel -> client.execute(GetWorkflowAction.INSTANCE, workflowRequest, ActionListener.wrap(response -> { + boolean all = request.paramAsBoolean("all", false); + GetWorkflowRequest getWorkflowRequest = new GetWorkflowRequest(workflowId, all); + return channel -> client.execute(GetWorkflowAction.INSTANCE, getWorkflowRequest, ActionListener.wrap(response -> { XContentBuilder builder = response.toXContent(channel.newBuilder(), ToXContent.EMPTY_PARAMS); channel.sendResponse(new BytesRestResponse(RestStatus.OK, builder)); }, exception -> { try { - FlowFrameworkException ex = (FlowFrameworkException) exception; + FlowFrameworkException ex = new FlowFrameworkException(exception.getMessage(), ExceptionsHelper.status(exception)); XContentBuilder exceptionBuilder = ex.toXContent(channel.newErrorBuilder(), ToXContent.EMPTY_PARAMS); channel.sendResponse(new BytesRestResponse(ex.getRestStatus(), exceptionBuilder)); } catch (IOException e) { diff --git a/src/main/java/org/opensearch/flowframework/transport/GetWorkflowRequest.java b/src/main/java/org/opensearch/flowframework/transport/GetWorkflowRequest.java new file mode 100644 index 000000000..c7594eb77 --- /dev/null +++ b/src/main/java/org/opensearch/flowframework/transport/GetWorkflowRequest.java @@ -0,0 +1,84 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ +package org.opensearch.flowframework.transport; + +import org.opensearch.action.ActionRequest; +import org.opensearch.action.ActionRequestValidationException; +import org.opensearch.common.Nullable; +import org.opensearch.core.common.io.stream.StreamInput; +import org.opensearch.core.common.io.stream.StreamOutput; + +import java.io.IOException; + +/** + * Transport Request to get a workflow or workflow status + */ +public class GetWorkflowRequest extends ActionRequest { + + /** + * The documentId of the workflow entry within the Global Context index + */ + @Nullable + private String workflowId; + + /** + * The all parameter on the get request + */ + private boolean all; + + /** + * Instantiates a new GetWorkflowRequest + * @param workflowId the documentId of the workflow + * @param all whether the get request is looking for all fields in status + */ + public GetWorkflowRequest(@Nullable String workflowId, boolean all) { + this.workflowId = workflowId; + this.all = all; + } + + /** + * Instantiates a new GetWorkflowRequest request + * @param in The input stream to read from + * @throws IOException If the stream cannot be read properly + */ + public GetWorkflowRequest(StreamInput in) throws IOException { + super(in); + this.workflowId = in.readString(); + this.all = in.readBoolean(); + } + + /** + * Gets the workflow Id of the request + * @return the workflow Id + */ + @Nullable + public String getWorkflowId() { + return this.workflowId; + } + + /** + * Gets the value of the all parameter + * @return whether the all parameter was present or not in request + */ + public boolean getAll() { + return this.all; + } + + @Override + public void writeTo(StreamOutput out) throws IOException { + super.writeTo(out); + out.writeString(workflowId); + out.writeBoolean(all); + } + + @Override + public ActionRequestValidationException validate() { + return null; + } +} diff --git a/src/main/java/org/opensearch/flowframework/transport/GetWorkflowTransportAction.java b/src/main/java/org/opensearch/flowframework/transport/GetWorkflowTransportAction.java index 3471540cc..1a59d033b 100644 --- a/src/main/java/org/opensearch/flowframework/transport/GetWorkflowTransportAction.java +++ b/src/main/java/org/opensearch/flowframework/transport/GetWorkflowTransportAction.java @@ -34,9 +34,10 @@ //TODO: Currently we only get the workflow status but we should change to be able to get the // full template as well /** - * Transport Action to get status of a current workflow + * Transport Action to get a specific workflow. Currently, we only support the action with _status + * in the API path but will add the ability to get the workflow and not just the status in the future */ -public class GetWorkflowTransportAction extends HandledTransportAction { +public class GetWorkflowTransportAction extends HandledTransportAction { private final Logger logger = LogManager.getLogger(GetWorkflowTransportAction.class); @@ -57,13 +58,13 @@ public GetWorkflowTransportAction( Client client, NamedXContentRegistry xContentRegistry ) { - super(GetWorkflowAction.NAME, transportService, actionFilters, WorkflowRequest::new); + super(GetWorkflowAction.NAME, transportService, actionFilters, GetWorkflowRequest::new); this.client = client; this.xContentRegistry = xContentRegistry; } @Override - protected void doExecute(Task task, WorkflowRequest request, ActionListener listener) { + protected void doExecute(Task task, GetWorkflowRequest request, ActionListener listener) { String workflowId = request.getWorkflowId(); User user = ParseUtils.getUserContext(client); GetRequest getRequest = new GetRequest(WORKFLOW_STATE_INDEX).id(workflowId); @@ -81,11 +82,11 @@ protected void doExecute(Task task, WorkflowRequest request, ActionListener { if (e instanceof IndexNotFoundException) { - listener.onFailure(new FlowFrameworkException("Fail to find workflow", RestStatus.BAD_REQUEST)); + listener.onFailure(new FlowFrameworkException("Fail to find workflow", RestStatus.NOT_FOUND)); } else { logger.error("Failed to get workflow status of " + workflowId, e); listener.onFailure(e); diff --git a/src/main/java/org/opensearch/flowframework/transport/ProvisionWorkflowTransportAction.java b/src/main/java/org/opensearch/flowframework/transport/ProvisionWorkflowTransportAction.java index 830758738..3f4c10525 100644 --- a/src/main/java/org/opensearch/flowframework/transport/ProvisionWorkflowTransportAction.java +++ b/src/main/java/org/opensearch/flowframework/transport/ProvisionWorkflowTransportAction.java @@ -34,6 +34,7 @@ import java.time.Instant; import java.util.ArrayList; +import java.util.Collections; import java.util.List; import java.util.Locale; import java.util.concurrent.CompletableFuture; @@ -46,6 +47,7 @@ import static org.opensearch.flowframework.common.CommonValue.PROVISION_START_TIME_FIELD; import static org.opensearch.flowframework.common.CommonValue.PROVISION_THREAD_POOL; import static org.opensearch.flowframework.common.CommonValue.PROVISION_WORKFLOW; +import static org.opensearch.flowframework.common.CommonValue.RESOURCES_CREATED_FIELD; import static org.opensearch.flowframework.common.CommonValue.STATE_FIELD; import static org.opensearch.flowframework.common.CommonValue.WORKFLOW_STATE_INDEX; @@ -125,7 +127,9 @@ protected void doExecute(Task task, WorkflowRequest request, ActionListener { logger.info("updated workflow {} state to PROVISIONING", request.getWorkflowId()); @@ -233,7 +237,7 @@ private void executeWorkflow(List workflowSequence, ActionListener< // Attempt to join each workflow step future, may throw a CompletionException if any step completes exceptionally workflowFutureList.forEach(CompletableFuture::join); - workflowListener.onResponse("READY"); + // workflowListener.onResponse("READY"); } catch (IllegalArgumentException e) { workflowListener.onFailure(new FlowFrameworkException(e.getMessage(), RestStatus.BAD_REQUEST)); diff --git a/src/main/java/org/opensearch/flowframework/transport/WorkflowRequest.java b/src/main/java/org/opensearch/flowframework/transport/WorkflowRequest.java index 81c3d7a96..2d2046329 100644 --- a/src/main/java/org/opensearch/flowframework/transport/WorkflowRequest.java +++ b/src/main/java/org/opensearch/flowframework/transport/WorkflowRequest.java @@ -43,36 +43,19 @@ public class WorkflowRequest extends ActionRequest { * @param template the use case template which describes the workflow */ public WorkflowRequest(@Nullable String workflowId, @Nullable Template template) { - this(workflowId, template, false, false); + this(workflowId, template, false); } - /** - * The all parameter on the get request - */ - private boolean all; - /** * Instantiates a new WorkflowRequest * @param workflowId the documentId of the workflow * @param template the use case template which describes the workflow * @param dryRun flag to indicate if validation is necessary - * @param all whether the get request is looking for all fields in status */ - public WorkflowRequest(@Nullable String workflowId, @Nullable Template template, boolean dryRun, boolean all) { + public WorkflowRequest(@Nullable String workflowId, @Nullable Template template, boolean dryRun) { this.workflowId = workflowId; this.template = template; this.dryRun = dryRun; - this.all = all; - } - - /** - * Instantiates a new WorkflowRequest - * @param workflowId the documentId of the workflow - * @param template the use case template which describes the workflow - * @param all whether the get request is looking for all fields in status - */ - public WorkflowRequest(@Nullable String workflowId, @Nullable Template template, boolean all) { - this(workflowId, template, all, false); } /** @@ -114,14 +97,6 @@ public boolean isDryRun() { return this.dryRun; } - /** - * Gets the value of the all parameter - * @return whether the all parameter was present or not in request - */ - public boolean getAll() { - return this.all; - } - @Override public void writeTo(StreamOutput out) throws IOException { super.writeTo(out); diff --git a/src/main/java/org/opensearch/flowframework/workflow/CreateConnectorStep.java b/src/main/java/org/opensearch/flowframework/workflow/CreateConnectorStep.java index 5ca0befae..1e067a0bc 100644 --- a/src/main/java/org/opensearch/flowframework/workflow/CreateConnectorStep.java +++ b/src/main/java/org/opensearch/flowframework/workflow/CreateConnectorStep.java @@ -72,6 +72,7 @@ public CreateConnectorStep(MachineLearningNodeClient mlClient, FlowFrameworkIndi this.flowFrameworkIndicesHandler = flowFrameworkIndicesHandler; } + // TODO: need to add retry conflicts here @Override public CompletableFuture execute(List data) throws IOException { CompletableFuture createConnectorFuture = new CompletableFuture<>(); @@ -80,6 +81,9 @@ public CompletableFuture execute(List data) throws I @Override public void onResponse(MLCreateConnectorResponse mlCreateConnectorResponse) { + createConnectorFuture.complete( + new WorkflowData(Map.ofEntries(Map.entry("connector_id", mlCreateConnectorResponse.getConnectorId()))) + ); try { logger.info("Created connector successfully"); String workflowId = data.get(0).getWorkflowId(); @@ -102,19 +106,16 @@ public void onResponse(MLCreateConnectorResponse mlCreateConnectorResponse) { script, ActionListener.wrap(updateResponse -> { logger.info("updated resources craeted of {}", workflowId); - }, - exception -> { - logger.error("Failed to update workflow state with newly created resource: {}", exception.getMessage()); - } - ) + }, exception -> { + createConnectorFuture.completeExceptionally( + new FlowFrameworkException(exception.getMessage(), ExceptionsHelper.status(exception)) + ); + logger.error("Failed to update workflow state with newly created resource: {}", exception); + }) ); } catch (IOException e) { logger.error("Failed to parse new created resource", e); } - - createConnectorFuture.complete( - new WorkflowData(Map.ofEntries(Map.entry("connector_id", mlCreateConnectorResponse.getConnectorId()))) - ); } @Override diff --git a/src/test/java/org/opensearch/flowframework/model/ResourcesCreatedTests.java b/src/test/java/org/opensearch/flowframework/model/ResourcesCreatedTests.java new file mode 100644 index 000000000..864529209 --- /dev/null +++ b/src/test/java/org/opensearch/flowframework/model/ResourcesCreatedTests.java @@ -0,0 +1,46 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ +package org.opensearch.flowframework.model; + +import org.opensearch.test.OpenSearchTestCase; + +import java.io.IOException; + +public class ResourcesCreatedTests extends OpenSearchTestCase { + + @Override + public void setUp() throws Exception { + super.setUp(); + } + + public void testParseFeature() throws IOException { + ResourcesCreated resourcesCreated = new ResourcesCreated("A", "B"); + assertEquals(resourcesCreated.workflowStepName(), "A"); + assertEquals(resourcesCreated.resourceId(), "B"); + + String expectedJson = "{\"workflow_step_name\":\"A\",\"resource_id\":\"B\"}"; + String json = TemplateTestJsonUtil.parseToJson(resourcesCreated); + assertEquals(expectedJson, json); + + ResourcesCreated resourcesCreatedTwo = ResourcesCreated.parse(TemplateTestJsonUtil.jsonToParser(json)); + assertEquals("A", resourcesCreatedTwo.workflowStepName()); + assertEquals("B", resourcesCreatedTwo.resourceId()); + } + + public void testExceptions() throws IOException { + String badJson = "{\"wrong\":\"A\",\"resource_id\":\"B\"}"; + IOException e = assertThrows(IOException.class, () -> ResourcesCreated.parse(TemplateTestJsonUtil.jsonToParser(badJson))); + assertEquals("Unable to parse field [wrong] in a resources_created object.", e.getMessage()); + + String missingJson = "{\"resource_id\":\"B\"}"; + e = assertThrows(IOException.class, () -> ResourcesCreated.parse(TemplateTestJsonUtil.jsonToParser(missingJson))); + assertEquals("A resourcesCreated object requires both a workflowStepName and resourceId.", e.getMessage()); + } + +}