From 7eb416aac91dec64f624e78604c1d028824f909e Mon Sep 17 00:00:00 2001 From: "github-actions[bot]" Date: Thu, 4 Jan 2024 19:45:36 +0000 Subject: [PATCH] Iterate collection for unregistered Writeable in streams (#359) * Iterate collection for unregistered Writeable in streams Signed-off-by: Daniel Widdis * Add dependencies so User class compiles Signed-off-by: Daniel Widdis * Make ResourceCreated writable for Script class Signed-off-by: Daniel Widdis --------- Signed-off-by: Daniel Widdis (cherry picked from commit 8216cb93d6378d458502b8e29b627311bb64d06a) Signed-off-by: github-actions[bot] --- build.gradle | 2 + .../indices/FlowFrameworkIndicesHandler.java | 4 +- .../flowframework/model/ResourceCreated.java | 114 +++++------------- .../flowframework/model/WorkflowState.java | 19 ++- .../model/ResourceCreatedTests.java | 28 +---- .../model/WorkflowStateTests.java | 93 ++++++++++++++ 6 files changed, 147 insertions(+), 113 deletions(-) create mode 100644 src/test/java/org/opensearch/flowframework/model/WorkflowStateTests.java diff --git a/build.gradle b/build.gradle index ea87b5a56..9ed6e87ed 100644 --- a/build.gradle +++ b/build.gradle @@ -151,10 +151,12 @@ dependencies { implementation 'org.junit.jupiter:junit-jupiter:5.10.1' implementation "com.google.guava:guava:33.0.0-jre" api group: 'org.opensearch', name:'opensearch-ml-client', version: "${opensearch_build}" + api group: 'org.opensearch.client', name: 'opensearch-rest-client', version: "${opensearch_version}" implementation group: 'org.apache.commons', name: 'commons-lang3', version: '3.14.0' implementation "org.opensearch:common-utils:${common_utils_version}" implementation 'com.amazonaws:aws-encryption-sdk-java:2.4.1' implementation 'org.bouncycastle:bcprov-jdk18on:1.77' + api "org.apache.httpcomponents.core5:httpcore5:5.2.2" // ZipArchive dependencies used for integration tests zipArchive group: 'org.opensearch.plugin', name:'opensearch-ml-plugin', version: "${opensearch_build}" diff --git a/src/main/java/org/opensearch/flowframework/indices/FlowFrameworkIndicesHandler.java b/src/main/java/org/opensearch/flowframework/indices/FlowFrameworkIndicesHandler.java index 2a842c159..989c0da44 100644 --- a/src/main/java/org/opensearch/flowframework/indices/FlowFrameworkIndicesHandler.java +++ b/src/main/java/org/opensearch/flowframework/indices/FlowFrameworkIndicesHandler.java @@ -506,15 +506,13 @@ public void updateResourceInStateIndex( getResourceByWorkflowStep(workflowStepName), resourceId ); - XContentBuilder builder = XContentBuilder.builder(XContentType.JSON.xContent()); - newResource.toXContent(builder, ToXContent.EMPTY_PARAMS); // The script to append a new object to the resources_created array Script script = new Script( ScriptType.INLINE, "painless", "ctx._source.resources_created.add(params.newResource)", - Collections.singletonMap("newResource", newResource) + Collections.singletonMap("newResource", newResource.resourceMap()) ); updateFlowFrameworkSystemIndexDocWithScript(WORKFLOW_STATE_INDEX, workflowId, script, ActionListener.wrap(updateResponse -> { diff --git a/src/main/java/org/opensearch/flowframework/model/ResourceCreated.java b/src/main/java/org/opensearch/flowframework/model/ResourceCreated.java index c402b9e06..1d57fb148 100644 --- a/src/main/java/org/opensearch/flowframework/model/ResourceCreated.java +++ b/src/main/java/org/opensearch/flowframework/model/ResourceCreated.java @@ -13,15 +13,13 @@ import org.opensearch.core.common.io.stream.StreamInput; import org.opensearch.core.common.io.stream.StreamOutput; import org.opensearch.core.common.io.stream.Writeable; -import org.opensearch.core.rest.RestStatus; import org.opensearch.core.xcontent.ToXContentObject; import org.opensearch.core.xcontent.XContentBuilder; import org.opensearch.core.xcontent.XContentParser; -import org.opensearch.flowframework.exception.FlowFrameworkException; import java.io.IOException; +import java.util.Map; -import static org.opensearch.core.xcontent.XContentParserUtils.ensureExpectedToken; import static org.opensearch.flowframework.common.CommonValue.RESOURCE_ID; import static org.opensearch.flowframework.common.CommonValue.RESOURCE_TYPE; import static org.opensearch.flowframework.common.CommonValue.WORKFLOW_STEP_ID; @@ -34,10 +32,7 @@ public class ResourceCreated implements ToXContentObject, Writeable { private static final Logger logger = LogManager.getLogger(ResourceCreated.class); - private final String workflowStepName; - private final String workflowStepId; - private final String resourceType; - private final String resourceId; + private final Map resourceMap; /** * Create this resources created object with given workflow step name, ID and resource ID. @@ -47,10 +42,14 @@ public class ResourceCreated implements ToXContentObject, Writeable { * @param resourceId The resources ID for relating to the created resource */ public ResourceCreated(String workflowStepName, String workflowStepId, String resourceType, String resourceId) { - this.workflowStepName = workflowStepName; - this.workflowStepId = workflowStepId; - this.resourceType = resourceType; - this.resourceId = resourceId; + this( + Map.ofEntries( + Map.entry(WORKFLOW_STEP_NAME, workflowStepName), + Map.entry(WORKFLOW_STEP_ID, workflowStepId), + Map.entry(RESOURCE_TYPE, resourceType), + Map.entry(RESOURCE_ID, resourceId) + ) + ); } /** @@ -59,28 +58,21 @@ public ResourceCreated(String workflowStepName, String workflowStepId, String re * @throws IOException if failed to read input stream */ public ResourceCreated(StreamInput input) throws IOException { - this.workflowStepName = input.readString(); - this.workflowStepId = input.readString(); - this.resourceType = input.readString(); - this.resourceId = input.readString(); + this(input.readMap(StreamInput::readString, StreamInput::readString)); + } + + private ResourceCreated(Map map) { + this.resourceMap = Map.copyOf(map); } @Override public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { - XContentBuilder xContentBuilder = builder.startObject() - .field(WORKFLOW_STEP_NAME, workflowStepName) - .field(WORKFLOW_STEP_ID, workflowStepId) - .field(RESOURCE_TYPE, resourceType) - .field(RESOURCE_ID, resourceId); - return xContentBuilder.endObject(); + return builder.map(resourceMap); } @Override public void writeTo(StreamOutput out) throws IOException { - out.writeString(workflowStepName); - out.writeString(workflowStepId); - out.writeString(resourceType); - out.writeString(resourceId); + out.writeMap(resourceMap, StreamOutput::writeString, StreamOutput::writeString); } /** @@ -89,7 +81,7 @@ public void writeTo(StreamOutput out) throws IOException { * @return the resourceId. */ public String resourceId() { - return resourceId; + return resourceMap.get(RESOURCE_ID); } /** @@ -98,7 +90,7 @@ public String resourceId() { * @return the resource type. */ public String resourceType() { - return resourceType; + return resourceMap.get(RESOURCE_TYPE); } /** @@ -107,7 +99,7 @@ public String resourceType() { * @return the workflowStepName. */ public String workflowStepName() { - return workflowStepName; + return resourceMap.get(WORKFLOW_STEP_NAME); } /** @@ -116,7 +108,16 @@ public String workflowStepName() { * @return the workflowStepId. */ public String workflowStepId() { - return workflowStepId; + return resourceMap.get(WORKFLOW_STEP_ID); + } + + /** + * Gets the map of resource values + * + * @return a map with the resource values + */ + public Map resourceMap() { + return resourceMap; } /** @@ -127,62 +128,11 @@ public String workflowStepId() { * @throws IOException if content can't be parsed correctly */ public static ResourceCreated parse(XContentParser parser) throws IOException { - String workflowStepName = null; - String workflowStepId = null; - String resourceType = null; - String resourceId = null; - - ensureExpectedToken(XContentParser.Token.START_OBJECT, parser.currentToken(), parser); - while (parser.nextToken() != XContentParser.Token.END_OBJECT) { - String fieldName = parser.currentName(); - parser.nextToken(); - - switch (fieldName) { - case WORKFLOW_STEP_NAME: - workflowStepName = parser.text(); - break; - case WORKFLOW_STEP_ID: - workflowStepId = parser.text(); - break; - case RESOURCE_TYPE: - resourceType = parser.text(); - break; - case RESOURCE_ID: - resourceId = parser.text(); - break; - default: - throw new IOException("Unable to parse field [" + fieldName + "] in a resources_created object."); - } - } - if (workflowStepName == null) { - logger.error("Resource created object failed parsing: workflowStepName: {}", workflowStepName); - throw new FlowFrameworkException("A ResourceCreated object requires workflowStepName", RestStatus.BAD_REQUEST); - } - if (workflowStepId == null) { - logger.error("Resource created object failed parsing: workflowStepId: {}", workflowStepId); - throw new FlowFrameworkException("A ResourceCreated object requires workflowStepId", RestStatus.BAD_REQUEST); - } - if (resourceType == null) { - logger.error("Resource created object failed parsing: resourceType: {}", resourceType); - throw new FlowFrameworkException("A ResourceCreated object requires resourceType", RestStatus.BAD_REQUEST); - } - if (resourceId == null) { - logger.error("Resource created object failed parsing: resourceId: {}", resourceId); - throw new FlowFrameworkException("A ResourceCreated object requires resourceId", RestStatus.BAD_REQUEST); - } - return new ResourceCreated(workflowStepName, workflowStepId, resourceType, resourceId); + return new ResourceCreated(parser.mapStrings()); } @Override public String toString() { - return "resources_Created [workflow_step_name=" - + workflowStepName - + ", workflow_step_id=" - + workflowStepId - + ", resource_type=" - + resourceType - + ", resource_id=" - + resourceId - + "]"; + return "resources_Created [" + resourceMap + "]"; } } diff --git a/src/main/java/org/opensearch/flowframework/model/WorkflowState.java b/src/main/java/org/opensearch/flowframework/model/WorkflowState.java index e962f5758..ca1fc8855 100644 --- a/src/main/java/org/opensearch/flowframework/model/WorkflowState.java +++ b/src/main/java/org/opensearch/flowframework/model/WorkflowState.java @@ -97,7 +97,7 @@ public WorkflowState( private WorkflowState() {} /** - * Instatiates a new WorkflowState from an input stream + * Instantiates a new WorkflowState from an input stream * @param input the input stream to read from * @throws IOException if the workflowId cannot be read from the input stream */ @@ -108,10 +108,14 @@ public WorkflowState(StreamInput input) throws IOException { this.provisioningProgress = input.readOptionalString(); this.provisionStartTime = input.readOptionalInstant(); this.provisionEndTime = input.readOptionalInstant(); - // TODO: fix error: cannot access Response issue when integrating with access control - // this.user = input.readBoolean() ? new User(input) : null; + this.user = input.readBoolean() ? new User(input) : null; this.userOutputs = input.readBoolean() ? input.readMap() : null; - this.resourcesCreated = input.readList(ResourceCreated::new); + + int resourceCount = input.readVInt(); + this.resourcesCreated = new ArrayList<>(resourceCount); + for (int r = 0; r < resourceCount; r++) { + resourcesCreated.add(new ResourceCreated(input)); + } } /** @@ -293,6 +297,7 @@ public void writeTo(StreamOutput output) throws IOException { output.writeOptionalInstant(provisionEndTime); if (user != null) { + output.writeBoolean(true); user.writeTo(output); } else { output.writeBoolean(false); @@ -304,7 +309,11 @@ public void writeTo(StreamOutput output) throws IOException { } else { output.writeBoolean(false); } - output.writeList(resourcesCreated); + + output.writeVInt(resourcesCreated.size()); + for (ResourceCreated resource : resourcesCreated) { + resource.writeTo(output); + } } /** diff --git a/src/test/java/org/opensearch/flowframework/model/ResourceCreatedTests.java b/src/test/java/org/opensearch/flowframework/model/ResourceCreatedTests.java index 38607e75b..dadfd6d24 100644 --- a/src/test/java/org/opensearch/flowframework/model/ResourceCreatedTests.java +++ b/src/test/java/org/opensearch/flowframework/model/ResourceCreatedTests.java @@ -8,12 +8,10 @@ */ package org.opensearch.flowframework.model; -import org.opensearch.flowframework.exception.FlowFrameworkException; import org.opensearch.test.OpenSearchTestCase; import java.io.IOException; -import static org.opensearch.flowframework.common.WorkflowResources.CONNECTOR_ID; import static org.opensearch.flowframework.common.WorkflowResources.CREATE_CONNECTOR; import static org.opensearch.flowframework.common.WorkflowResources.getResourceByWorkflowStep; @@ -30,34 +28,18 @@ public void testParseFeature() throws IOException { ResourceCreated resourceCreated = new ResourceCreated(workflowStepName, "workflow_step_1", resourceType, "L85p1IsBbfF"); assertEquals(workflowStepName, resourceCreated.workflowStepName()); assertEquals("workflow_step_1", resourceCreated.workflowStepId()); - assertEquals(CONNECTOR_ID, resourceCreated.resourceType()); + assertEquals("connector_id", resourceCreated.resourceType()); assertEquals("L85p1IsBbfF", resourceCreated.resourceId()); - String expectedJson = - "{\"workflow_step_name\":\"create_connector\",\"workflow_step_id\":\"workflow_step_1\",\"resource_type\":\"connector_id\",\"resource_id\":\"L85p1IsBbfF\"}"; String json = TemplateTestJsonUtil.parseToJson(resourceCreated); - assertEquals(expectedJson, json); + assertTrue(json.contains("\"workflow_step_name\":\"create_connector\"")); + assertTrue(json.contains("\"workflow_step_id\":\"workflow_step_1\"")); + assertTrue(json.contains("\"resource_type\":\"connector_id\"")); + assertTrue(json.contains("\"resource_id\":\"L85p1IsBbfF\"")); ResourceCreated resourceCreatedTwo = ResourceCreated.parse(TemplateTestJsonUtil.jsonToParser(json)); assertEquals(workflowStepName, resourceCreatedTwo.workflowStepName()); assertEquals("workflow_step_1", resourceCreatedTwo.workflowStepId()); assertEquals("L85p1IsBbfF", resourceCreatedTwo.resourceId()); } - - public void testExceptions() throws IOException { - String badJson = "{\"wrong\":\"A\",\"resource_id\":\"B\"}"; - IOException badJsonException = assertThrows( - IOException.class, - () -> ResourceCreated.parse(TemplateTestJsonUtil.jsonToParser(badJson)) - ); - assertEquals("Unable to parse field [wrong] in a resources_created object.", badJsonException.getMessage()); - - String missingJson = "{\"resource_id\":\"B\"}"; - FlowFrameworkException missingJsonException = assertThrows( - FlowFrameworkException.class, - () -> ResourceCreated.parse(TemplateTestJsonUtil.jsonToParser(missingJson)) - ); - assertEquals("A ResourceCreated object requires workflowStepName", missingJsonException.getMessage()); - } - } diff --git a/src/test/java/org/opensearch/flowframework/model/WorkflowStateTests.java b/src/test/java/org/opensearch/flowframework/model/WorkflowStateTests.java new file mode 100644 index 000000000..04c3655d3 --- /dev/null +++ b/src/test/java/org/opensearch/flowframework/model/WorkflowStateTests.java @@ -0,0 +1,93 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ +package org.opensearch.flowframework.model; + +import org.opensearch.common.io.stream.BytesStreamOutput; +import org.opensearch.commons.authuser.User; +import org.opensearch.core.common.bytes.BytesReference; +import org.opensearch.core.common.io.stream.BytesStreamInput; +import org.opensearch.test.OpenSearchTestCase; + +import java.io.IOException; +import java.time.Instant; +import java.util.Collections; +import java.util.List; +import java.util.Map; + +public class WorkflowStateTests extends OpenSearchTestCase { + + @Override + public void setUp() throws Exception { + super.setUp(); + } + + public void testWorkflowState() throws IOException { + String workflowId = "id"; + String error = "error"; + String state = "state"; + String provisioningProgress = "progress"; + Instant provisionStartTime = Instant.now().minusSeconds(2); + Instant provisionEndTime = Instant.now(); + User user = new User("user", Collections.emptyList(), Collections.emptyList(), Collections.emptyList()); + Map userOutputs = Map.of("foo", Map.of("bar", "baz")); + List resourcesCreated = List.of(new ResourceCreated("name", "stepId", "type", "id")); + + WorkflowState wfs = WorkflowState.builder() + .workflowId(workflowId) + .error(error) + .state(state) + .provisioningProgress(provisioningProgress) + .provisionStartTime(provisionStartTime) + .provisionEndTime(provisionEndTime) + .user(user) + .userOutputs(userOutputs) + .resourcesCreated(resourcesCreated) + .build(); + + assertEquals(workflowId, wfs.getWorkflowId()); + assertEquals(error, wfs.getError()); + assertEquals(state, wfs.getState()); + assertEquals(provisioningProgress, wfs.getProvisioningProgress()); + assertEquals(provisionStartTime, wfs.getProvisionStartTime()); + assertEquals(provisionEndTime, wfs.getProvisionEndTime()); + assertEquals("user", wfs.getUser().getName()); + assertEquals(1, wfs.userOutputs().size()); + assertEquals("baz", ((Map) wfs.userOutputs().get("foo")).get("bar")); + assertEquals(1, wfs.resourcesCreated().size()); + ResourceCreated rc = wfs.resourcesCreated().get(0); + assertEquals("name", rc.workflowStepName()); + assertEquals("stepId", rc.workflowStepId()); + assertEquals("type", rc.resourceType()); + assertEquals("id", rc.resourceId()); + + try (BytesStreamOutput out = new BytesStreamOutput()) { + wfs.writeTo(out); + try (BytesStreamInput in = new BytesStreamInput(BytesReference.toBytes(out.bytes()))) { + wfs = new WorkflowState(in); + + assertEquals(workflowId, wfs.getWorkflowId()); + assertEquals(error, wfs.getError()); + assertEquals(state, wfs.getState()); + assertEquals(provisioningProgress, wfs.getProvisioningProgress()); + assertEquals(provisionStartTime, wfs.getProvisionStartTime()); + assertEquals(provisionEndTime, wfs.getProvisionEndTime()); + assertEquals("user", wfs.getUser().getName()); + assertEquals(1, wfs.userOutputs().size()); + assertEquals("baz", ((Map) wfs.userOutputs().get("foo")).get("bar")); + assertEquals(1, wfs.resourcesCreated().size()); + rc = wfs.resourcesCreated().get(0); + assertEquals("name", rc.workflowStepName()); + assertEquals("stepId", rc.workflowStepId()); + assertEquals("type", rc.resourceType()); + assertEquals("id", rc.resourceId()); + } + } + } + +}