Skip to content

Commit

Permalink
Iterate collection for unregistered Writeable in streams (#359)
Browse files Browse the repository at this point in the history
* Iterate collection for unregistered Writeable in streams

Signed-off-by: Daniel Widdis <[email protected]>

* Add dependencies so User class compiles

Signed-off-by: Daniel Widdis <[email protected]>

* Make ResourceCreated writable for Script class

Signed-off-by: Daniel Widdis <[email protected]>

---------

Signed-off-by: Daniel Widdis <[email protected]>
(cherry picked from commit 8216cb9)
Signed-off-by: github-actions[bot] <github-actions[bot]@users.noreply.github.com>
  • Loading branch information
github-actions[bot] committed Jan 4, 2024
1 parent f7d96e0 commit 7eb416a
Show file tree
Hide file tree
Showing 6 changed files with 147 additions and 113 deletions.
2 changes: 2 additions & 0 deletions build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -151,10 +151,12 @@ dependencies {
implementation 'org.junit.jupiter:junit-jupiter:5.10.1'
implementation "com.google.guava:guava:33.0.0-jre"
api group: 'org.opensearch', name:'opensearch-ml-client', version: "${opensearch_build}"
api group: 'org.opensearch.client', name: 'opensearch-rest-client', version: "${opensearch_version}"
implementation group: 'org.apache.commons', name: 'commons-lang3', version: '3.14.0'
implementation "org.opensearch:common-utils:${common_utils_version}"
implementation 'com.amazonaws:aws-encryption-sdk-java:2.4.1'
implementation 'org.bouncycastle:bcprov-jdk18on:1.77'
api "org.apache.httpcomponents.core5:httpcore5:5.2.2"

// ZipArchive dependencies used for integration tests
zipArchive group: 'org.opensearch.plugin', name:'opensearch-ml-plugin', version: "${opensearch_build}"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -506,15 +506,13 @@ public void updateResourceInStateIndex(
getResourceByWorkflowStep(workflowStepName),
resourceId
);
XContentBuilder builder = XContentBuilder.builder(XContentType.JSON.xContent());
newResource.toXContent(builder, ToXContent.EMPTY_PARAMS);

// The script to append a new object to the resources_created array
Script script = new Script(
ScriptType.INLINE,
"painless",
"ctx._source.resources_created.add(params.newResource)",
Collections.singletonMap("newResource", newResource)
Collections.singletonMap("newResource", newResource.resourceMap())

Check warning on line 515 in src/main/java/org/opensearch/flowframework/indices/FlowFrameworkIndicesHandler.java

View check run for this annotation

Codecov / codecov/patch

src/main/java/org/opensearch/flowframework/indices/FlowFrameworkIndicesHandler.java#L515

Added line #L515 was not covered by tests
);

updateFlowFrameworkSystemIndexDocWithScript(WORKFLOW_STATE_INDEX, workflowId, script, ActionListener.wrap(updateResponse -> {
Expand Down
114 changes: 32 additions & 82 deletions src/main/java/org/opensearch/flowframework/model/ResourceCreated.java
Original file line number Diff line number Diff line change
Expand Up @@ -13,15 +13,13 @@
import org.opensearch.core.common.io.stream.StreamInput;
import org.opensearch.core.common.io.stream.StreamOutput;
import org.opensearch.core.common.io.stream.Writeable;
import org.opensearch.core.rest.RestStatus;
import org.opensearch.core.xcontent.ToXContentObject;
import org.opensearch.core.xcontent.XContentBuilder;
import org.opensearch.core.xcontent.XContentParser;
import org.opensearch.flowframework.exception.FlowFrameworkException;

import java.io.IOException;
import java.util.Map;

import static org.opensearch.core.xcontent.XContentParserUtils.ensureExpectedToken;
import static org.opensearch.flowframework.common.CommonValue.RESOURCE_ID;
import static org.opensearch.flowframework.common.CommonValue.RESOURCE_TYPE;
import static org.opensearch.flowframework.common.CommonValue.WORKFLOW_STEP_ID;
Expand All @@ -34,10 +32,7 @@ public class ResourceCreated implements ToXContentObject, Writeable {

private static final Logger logger = LogManager.getLogger(ResourceCreated.class);

private final String workflowStepName;
private final String workflowStepId;
private final String resourceType;
private final String resourceId;
private final Map<String, String> resourceMap;

/**
* Create this resources created object with given workflow step name, ID and resource ID.
Expand All @@ -47,10 +42,14 @@ public class ResourceCreated implements ToXContentObject, Writeable {
* @param resourceId The resources ID for relating to the created resource
*/
public ResourceCreated(String workflowStepName, String workflowStepId, String resourceType, String resourceId) {
this.workflowStepName = workflowStepName;
this.workflowStepId = workflowStepId;
this.resourceType = resourceType;
this.resourceId = resourceId;
this(
Map.ofEntries(
Map.entry(WORKFLOW_STEP_NAME, workflowStepName),
Map.entry(WORKFLOW_STEP_ID, workflowStepId),
Map.entry(RESOURCE_TYPE, resourceType),
Map.entry(RESOURCE_ID, resourceId)
)
);
}

/**
Expand All @@ -59,28 +58,21 @@ public ResourceCreated(String workflowStepName, String workflowStepId, String re
* @throws IOException if failed to read input stream
*/
public ResourceCreated(StreamInput input) throws IOException {
this.workflowStepName = input.readString();
this.workflowStepId = input.readString();
this.resourceType = input.readString();
this.resourceId = input.readString();
this(input.readMap(StreamInput::readString, StreamInput::readString));
}

private ResourceCreated(Map<String, String> map) {
this.resourceMap = Map.copyOf(map);
}

@Override
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
XContentBuilder xContentBuilder = builder.startObject()
.field(WORKFLOW_STEP_NAME, workflowStepName)
.field(WORKFLOW_STEP_ID, workflowStepId)
.field(RESOURCE_TYPE, resourceType)
.field(RESOURCE_ID, resourceId);
return xContentBuilder.endObject();
return builder.map(resourceMap);
}

@Override
public void writeTo(StreamOutput out) throws IOException {
out.writeString(workflowStepName);
out.writeString(workflowStepId);
out.writeString(resourceType);
out.writeString(resourceId);
out.writeMap(resourceMap, StreamOutput::writeString, StreamOutput::writeString);
}

/**
Expand All @@ -89,7 +81,7 @@ public void writeTo(StreamOutput out) throws IOException {
* @return the resourceId.
*/
public String resourceId() {
return resourceId;
return resourceMap.get(RESOURCE_ID);
}

/**
Expand All @@ -98,7 +90,7 @@ public String resourceId() {
* @return the resource type.
*/
public String resourceType() {
return resourceType;
return resourceMap.get(RESOURCE_TYPE);
}

/**
Expand All @@ -107,7 +99,7 @@ public String resourceType() {
* @return the workflowStepName.
*/
public String workflowStepName() {
return workflowStepName;
return resourceMap.get(WORKFLOW_STEP_NAME);
}

/**
Expand All @@ -116,7 +108,16 @@ public String workflowStepName() {
* @return the workflowStepId.
*/
public String workflowStepId() {
return workflowStepId;
return resourceMap.get(WORKFLOW_STEP_ID);
}

/**
* Gets the map of resource values
*
* @return a map with the resource values
*/
public Map<String, String> resourceMap() {
return resourceMap;

Check warning on line 120 in src/main/java/org/opensearch/flowframework/model/ResourceCreated.java

View check run for this annotation

Codecov / codecov/patch

src/main/java/org/opensearch/flowframework/model/ResourceCreated.java#L120

Added line #L120 was not covered by tests
}

/**
Expand All @@ -127,62 +128,11 @@ public String workflowStepId() {
* @throws IOException if content can't be parsed correctly
*/
public static ResourceCreated parse(XContentParser parser) throws IOException {
String workflowStepName = null;
String workflowStepId = null;
String resourceType = null;
String resourceId = null;

ensureExpectedToken(XContentParser.Token.START_OBJECT, parser.currentToken(), parser);
while (parser.nextToken() != XContentParser.Token.END_OBJECT) {
String fieldName = parser.currentName();
parser.nextToken();

switch (fieldName) {
case WORKFLOW_STEP_NAME:
workflowStepName = parser.text();
break;
case WORKFLOW_STEP_ID:
workflowStepId = parser.text();
break;
case RESOURCE_TYPE:
resourceType = parser.text();
break;
case RESOURCE_ID:
resourceId = parser.text();
break;
default:
throw new IOException("Unable to parse field [" + fieldName + "] in a resources_created object.");
}
}
if (workflowStepName == null) {
logger.error("Resource created object failed parsing: workflowStepName: {}", workflowStepName);
throw new FlowFrameworkException("A ResourceCreated object requires workflowStepName", RestStatus.BAD_REQUEST);
}
if (workflowStepId == null) {
logger.error("Resource created object failed parsing: workflowStepId: {}", workflowStepId);
throw new FlowFrameworkException("A ResourceCreated object requires workflowStepId", RestStatus.BAD_REQUEST);
}
if (resourceType == null) {
logger.error("Resource created object failed parsing: resourceType: {}", resourceType);
throw new FlowFrameworkException("A ResourceCreated object requires resourceType", RestStatus.BAD_REQUEST);
}
if (resourceId == null) {
logger.error("Resource created object failed parsing: resourceId: {}", resourceId);
throw new FlowFrameworkException("A ResourceCreated object requires resourceId", RestStatus.BAD_REQUEST);
}
return new ResourceCreated(workflowStepName, workflowStepId, resourceType, resourceId);
return new ResourceCreated(parser.mapStrings());
}

@Override
public String toString() {
return "resources_Created [workflow_step_name="
+ workflowStepName
+ ", workflow_step_id="
+ workflowStepId
+ ", resource_type="
+ resourceType
+ ", resource_id="
+ resourceId
+ "]";
return "resources_Created [" + resourceMap + "]";
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,7 @@ public WorkflowState(
private WorkflowState() {}

/**
* Instatiates a new WorkflowState from an input stream
* Instantiates a new WorkflowState from an input stream
* @param input the input stream to read from
* @throws IOException if the workflowId cannot be read from the input stream
*/
Expand All @@ -108,10 +108,14 @@ public WorkflowState(StreamInput input) throws IOException {
this.provisioningProgress = input.readOptionalString();
this.provisionStartTime = input.readOptionalInstant();
this.provisionEndTime = input.readOptionalInstant();
// TODO: fix error: cannot access Response issue when integrating with access control
// this.user = input.readBoolean() ? new User(input) : null;
this.user = input.readBoolean() ? new User(input) : null;
this.userOutputs = input.readBoolean() ? input.readMap() : null;
this.resourcesCreated = input.readList(ResourceCreated::new);

int resourceCount = input.readVInt();
this.resourcesCreated = new ArrayList<>(resourceCount);
for (int r = 0; r < resourceCount; r++) {
resourcesCreated.add(new ResourceCreated(input));
}
}

/**
Expand Down Expand Up @@ -293,6 +297,7 @@ public void writeTo(StreamOutput output) throws IOException {
output.writeOptionalInstant(provisionEndTime);

if (user != null) {
output.writeBoolean(true);
user.writeTo(output);
} else {
output.writeBoolean(false);
Expand All @@ -304,7 +309,11 @@ public void writeTo(StreamOutput output) throws IOException {
} else {
output.writeBoolean(false);
}
output.writeList(resourcesCreated);

output.writeVInt(resourcesCreated.size());
for (ResourceCreated resource : resourcesCreated) {
resource.writeTo(output);
}
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,12 +8,10 @@
*/
package org.opensearch.flowframework.model;

import org.opensearch.flowframework.exception.FlowFrameworkException;
import org.opensearch.test.OpenSearchTestCase;

import java.io.IOException;

import static org.opensearch.flowframework.common.WorkflowResources.CONNECTOR_ID;
import static org.opensearch.flowframework.common.WorkflowResources.CREATE_CONNECTOR;
import static org.opensearch.flowframework.common.WorkflowResources.getResourceByWorkflowStep;

Expand All @@ -30,34 +28,18 @@ public void testParseFeature() throws IOException {
ResourceCreated resourceCreated = new ResourceCreated(workflowStepName, "workflow_step_1", resourceType, "L85p1IsBbfF");
assertEquals(workflowStepName, resourceCreated.workflowStepName());
assertEquals("workflow_step_1", resourceCreated.workflowStepId());
assertEquals(CONNECTOR_ID, resourceCreated.resourceType());
assertEquals("connector_id", resourceCreated.resourceType());
assertEquals("L85p1IsBbfF", resourceCreated.resourceId());

String expectedJson =
"{\"workflow_step_name\":\"create_connector\",\"workflow_step_id\":\"workflow_step_1\",\"resource_type\":\"connector_id\",\"resource_id\":\"L85p1IsBbfF\"}";
String json = TemplateTestJsonUtil.parseToJson(resourceCreated);
assertEquals(expectedJson, json);
assertTrue(json.contains("\"workflow_step_name\":\"create_connector\""));
assertTrue(json.contains("\"workflow_step_id\":\"workflow_step_1\""));
assertTrue(json.contains("\"resource_type\":\"connector_id\""));
assertTrue(json.contains("\"resource_id\":\"L85p1IsBbfF\""));

ResourceCreated resourceCreatedTwo = ResourceCreated.parse(TemplateTestJsonUtil.jsonToParser(json));
assertEquals(workflowStepName, resourceCreatedTwo.workflowStepName());
assertEquals("workflow_step_1", resourceCreatedTwo.workflowStepId());
assertEquals("L85p1IsBbfF", resourceCreatedTwo.resourceId());
}

public void testExceptions() throws IOException {
String badJson = "{\"wrong\":\"A\",\"resource_id\":\"B\"}";
IOException badJsonException = assertThrows(
IOException.class,
() -> ResourceCreated.parse(TemplateTestJsonUtil.jsonToParser(badJson))
);
assertEquals("Unable to parse field [wrong] in a resources_created object.", badJsonException.getMessage());

String missingJson = "{\"resource_id\":\"B\"}";
FlowFrameworkException missingJsonException = assertThrows(
FlowFrameworkException.class,
() -> ResourceCreated.parse(TemplateTestJsonUtil.jsonToParser(missingJson))
);
assertEquals("A ResourceCreated object requires workflowStepName", missingJsonException.getMessage());
}

}
Loading

0 comments on commit 7eb416a

Please sign in to comment.