Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add WorkflowStep Factory and implement XContent-based Template Parsing #47

Merged
merged 27 commits into from
Sep 28, 2023
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
Show all changes
27 commits
Select commit Hold shift + click to select a range
24b3542
Add WorkflowStepFactory class
dbwiddis Sep 19, 2023
e70b9e8
Add XContent classes representing Template JSON
dbwiddis Sep 20, 2023
677cc24
Add parse methods for the Template XContent
dbwiddis Sep 21, 2023
57fa740
Cleanup parsing, javadocs, and demo output
dbwiddis Sep 21, 2023
1214ebc
Refactor to use field name constants, get tests working again
dbwiddis Sep 21, 2023
fd72b2d
Separate WorkflowNode and ProcessNode functionality
dbwiddis Sep 22, 2023
f2441b3
Fix demos to align with template field names
dbwiddis Sep 22, 2023
c7a2371
Add workflow, node, and edge tests
dbwiddis Sep 22, 2023
388741c
Add Template tests
dbwiddis Sep 22, 2023
df4a0d8
Refactor TemplateParser to WorkflowProcessSorter
dbwiddis Sep 22, 2023
8589cfa
Test exceptional cases
dbwiddis Sep 23, 2023
309d059
Finish up exceptional cases
dbwiddis Sep 23, 2023
2804a52
Fix a template field name bug in demo
dbwiddis Sep 25, 2023
d846cb9
Rebase with #34
dbwiddis Sep 25, 2023
078059b
Rebase changes from #54
dbwiddis Sep 25, 2023
90e95e3
Integrate thread pool executor service
dbwiddis Sep 26, 2023
3fa4d0a
Fix flaky ProcessNodeTests by removing orTimeout
dbwiddis Sep 26, 2023
62fa53c
Rebase and refactor with #44
dbwiddis Sep 26, 2023
6c86dd1
Fix demos and remove DataDemo
dbwiddis Sep 26, 2023
c11a5bf
Use non-deprecated mapping method for CreateIndexStep
dbwiddis Sep 26, 2023
adc349a
Eliminate casting and deprecation warnings on test classes
dbwiddis Sep 26, 2023
21b530b
Remove unused/leftover demo class
dbwiddis Sep 26, 2023
8d28308
Typo
dbwiddis Sep 26, 2023
555d88b
Don't offer steps as an alternative to nodes
dbwiddis Sep 26, 2023
5c78046
Move Workflow into package with all the other parsing classes
dbwiddis Sep 26, 2023
87685ff
Move process sequencing classes into workflow package
dbwiddis Sep 26, 2023
2fca71b
Add PipelineProcessor class and XContent parsing, rename package
dbwiddis Sep 27, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion src/main/java/demo/Demo.java
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
import org.opensearch.client.node.NodeClient;
import org.opensearch.common.SuppressForbidden;
import org.opensearch.common.io.PathUtils;
import org.opensearch.flowframework.template.Template;
import org.opensearch.flowframework.model.Template;
import org.opensearch.flowframework.workflow.ProcessNode;
import org.opensearch.flowframework.workflow.WorkflowProcessSorter;
import org.opensearch.flowframework.workflow.WorkflowStepFactory;
Expand Down
4 changes: 2 additions & 2 deletions src/main/java/demo/TemplateParseDemo.java
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,8 @@
import org.opensearch.client.node.NodeClient;
import org.opensearch.common.SuppressForbidden;
import org.opensearch.common.io.PathUtils;
import org.opensearch.flowframework.template.Template;
import org.opensearch.flowframework.template.Workflow;
import org.opensearch.flowframework.model.Template;
import org.opensearch.flowframework.model.Workflow;
import org.opensearch.flowframework.workflow.WorkflowProcessSorter;
import org.opensearch.flowframework.workflow.WorkflowStepFactory;

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,101 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/
package org.opensearch.flowframework.model;

import org.opensearch.core.xcontent.ToXContentObject;
import org.opensearch.core.xcontent.XContentBuilder;
import org.opensearch.core.xcontent.XContentParser;

import java.io.IOException;
import java.util.HashMap;
import java.util.Map;

import static org.opensearch.core.xcontent.XContentParserUtils.ensureExpectedToken;

/**
* This represents a processor associated with search and ingest pipelines in the {@link Template}.
*/
public class PipelineProcessor implements ToXContentObject {

/** The type field name for pipeline processors */
public static final String TYPE_FIELD = "type";
/** The params field name for pipeline processors */
public static final String PARAMS_FIELD = "params";

private final String type;
private final Map<String, String> params;

/**
* Create this processor with a type and map of parameters
* @param type the processor type
* @param params a map of params
*/
public PipelineProcessor(String type, Map<String, String> params) {
this.type = type;
this.params = params;
}

@Override
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
XContentBuilder xContentBuilder = builder.startObject();
xContentBuilder.field(TYPE_FIELD, this.type);
xContentBuilder.field(PARAMS_FIELD);
Template.buildStringToStringMap(xContentBuilder, this.params);
return xContentBuilder.endObject();
}

/**
* Parse raw json content into a processor instance.
*
* @param parser json based content parser
* @return the parsed PipelineProcessor instance
* @throws IOException if content can't be parsed correctly
*/
public static PipelineProcessor parse(XContentParser parser) throws IOException {
String type = null;
Map<String, String> params = new HashMap<>();

ensureExpectedToken(XContentParser.Token.START_OBJECT, parser.currentToken(), parser);
while (parser.nextToken() != XContentParser.Token.END_OBJECT) {
String fieldName = parser.currentName();
parser.nextToken();
switch (fieldName) {
case TYPE_FIELD:
type = parser.text();
break;
case PARAMS_FIELD:
params = Template.parseStringToStringMap(parser);
break;
default:
throw new IOException("Unable to parse field [" + fieldName + "] in a pipeline processor object.");
}
}
if (type == null) {
throw new IOException("A processor object requires a type field.");
}

return new PipelineProcessor(type, params);
}

/**
* Get the processor type
* @return the type
*/
public String type() {
return type;
}

/**
* Get the processor params
* @return the params
*/
public Map<String, String> params() {
dbwiddis marked this conversation as resolved.
Show resolved Hide resolved
return params;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/
package org.opensearch.flowframework.template;
package org.opensearch.flowframework.model;

import org.opensearch.Version;
import org.opensearch.common.xcontent.LoggingDeprecationHandler;
Expand Down Expand Up @@ -222,7 +222,7 @@
}
}
if (name == null) {
throw new IOException("An template object requires a name.");

Check warning on line 225 in src/main/java/org/opensearch/flowframework/model/Template.java

View check run for this annotation

Codecov / codecov/patch

src/main/java/org/opensearch/flowframework/model/Template.java#L225

Added line #L225 was not covered by tests
}

return new Template(name, description, useCase, operations, templateVersion, compatibilityVersion, userInputs, workflows);
Expand Down Expand Up @@ -287,8 +287,8 @@
try {
XContentBuilder builder = JsonXContent.contentBuilder();
return this.toXContent(builder, EMPTY_PARAMS).toString();
} catch (IOException e) {
return "{\"error\": \"couldn't create JSON: " + e.getMessage() + "\"}";

Check warning on line 291 in src/main/java/org/opensearch/flowframework/model/Template.java

View check run for this annotation

Codecov / codecov/patch

src/main/java/org/opensearch/flowframework/model/Template.java#L290-L291

Added lines #L290 - L291 were not covered by tests
}
}

Expand All @@ -301,8 +301,8 @@
try {
XContentBuilder builder = YamlXContent.contentBuilder();
return this.toXContent(builder, EMPTY_PARAMS).toString();
} catch (IOException e) {
return "error: couldn't create YAML: " + e.getMessage();

Check warning on line 305 in src/main/java/org/opensearch/flowframework/model/Template.java

View check run for this annotation

Codecov / codecov/patch

src/main/java/org/opensearch/flowframework/model/Template.java#L304-L305

Added lines #L304 - L305 were not covered by tests
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/
package org.opensearch.flowframework.template;
package org.opensearch.flowframework.model;

import org.opensearch.core.xcontent.ToXContentObject;
import org.opensearch.core.xcontent.XContentBuilder;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/
package org.opensearch.flowframework.template;
package org.opensearch.flowframework.model;

import org.opensearch.core.xcontent.ToXContentObject;
import org.opensearch.core.xcontent.XContentBuilder;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/
package org.opensearch.flowframework.template;
package org.opensearch.flowframework.model;

import org.opensearch.core.xcontent.ToXContentObject;
import org.opensearch.core.xcontent.XContentBuilder;
Expand Down Expand Up @@ -39,6 +39,8 @@
public static final String TYPE_FIELD = "type";
/** The template field name for node inputs */
public static final String INPUTS_FIELD = "inputs";
/** The field defining processors in the inputs for search and ingest pipelines */
public static final String PROCESSORS_FIELD = "processors";

private final String id; // unique id
private final String type; // maps to a WorkflowStep
Expand Down Expand Up @@ -71,10 +73,15 @@
} else if (e.getValue() instanceof Map<?, ?>) {
Template.buildStringToStringMap(xContentBuilder, (Map<?, ?>) e.getValue());
} else if (e.getValue() instanceof Object[]) {
// This assumes an array of maps for "processor" key
xContentBuilder.startArray();
for (Map<?, ?> map : (Map<?, ?>[]) e.getValue()) {
Template.buildStringToStringMap(xContentBuilder, map);
if (PROCESSORS_FIELD.equals(e.getKey())) {
for (PipelineProcessor p : (PipelineProcessor[]) e.getValue()) {
xContentBuilder.value(p);
}
} else {
for (Map<?, ?> map : (Map<?, ?>[]) e.getValue()) {
Template.buildStringToStringMap(xContentBuilder, map);
}
}
xContentBuilder.endArray();
}
Expand Down Expand Up @@ -119,14 +126,22 @@
inputs.put(inputFieldName, Template.parseStringToStringMap(parser));
break;
case START_ARRAY:
List<Map<String, String>> mapList = new ArrayList<>();
while (parser.nextToken() != XContentParser.Token.END_ARRAY) {
mapList.add(Template.parseStringToStringMap(parser));
if (PROCESSORS_FIELD.equals(inputFieldName)) {
List<PipelineProcessor> processorList = new ArrayList<>();
while (parser.nextToken() != XContentParser.Token.END_ARRAY) {
processorList.add(PipelineProcessor.parse(parser));
}
inputs.put(inputFieldName, processorList.toArray(new PipelineProcessor[0]));
} else {
List<Map<String, String>> mapList = new ArrayList<>();
while (parser.nextToken() != XContentParser.Token.END_ARRAY) {
mapList.add(Template.parseStringToStringMap(parser));
}
inputs.put(inputFieldName, mapList.toArray(new Map[0]));
}
inputs.put(inputFieldName, mapList.toArray(new Map[0]));
break;
default:
throw new IOException("Unable to parse field [" + inputFieldName + "] in a node object.");

Check warning on line 144 in src/main/java/org/opensearch/flowframework/model/WorkflowNode.java

View check run for this annotation

Codecov / codecov/patch

src/main/java/org/opensearch/flowframework/model/WorkflowNode.java#L144

Added line #L144 was not covered by tests
}
}
break;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,9 +10,9 @@

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.opensearch.flowframework.template.Workflow;
import org.opensearch.flowframework.template.WorkflowEdge;
import org.opensearch.flowframework.template.WorkflowNode;
import org.opensearch.flowframework.model.Workflow;
import org.opensearch.flowframework.model.WorkflowEdge;
import org.opensearch.flowframework.model.WorkflowNode;

import java.util.ArrayDeque;
import java.util.ArrayList;
Expand Down Expand Up @@ -48,7 +48,7 @@
*/
public static synchronized WorkflowProcessSorter create(WorkflowStepFactory workflowStepFactory, Executor executor) {
if (instance != null) {
throw new IllegalStateException("This class was already created.");

Check warning on line 51 in src/main/java/org/opensearch/flowframework/workflow/WorkflowProcessSorter.java

View check run for this annotation

Codecov / codecov/patch

src/main/java/org/opensearch/flowframework/workflow/WorkflowProcessSorter.java#L51

Added line #L51 was not covered by tests
}
instance = new WorkflowProcessSorter(workflowStepFactory, executor);
return instance;
Expand All @@ -61,9 +61,9 @@
*/
public static synchronized WorkflowProcessSorter get() {
if (instance == null) {
throw new IllegalStateException("This factory has not yet been created.");

Check warning on line 64 in src/main/java/org/opensearch/flowframework/workflow/WorkflowProcessSorter.java

View check run for this annotation

Codecov / codecov/patch

src/main/java/org/opensearch/flowframework/workflow/WorkflowProcessSorter.java#L64

Added line #L64 was not covered by tests
}
return instance;

Check warning on line 66 in src/main/java/org/opensearch/flowframework/workflow/WorkflowProcessSorter.java

View check run for this annotation

Codecov / codecov/patch

src/main/java/org/opensearch/flowframework/workflow/WorkflowProcessSorter.java#L66

Added line #L66 was not covered by tests
}

private WorkflowProcessSorter(WorkflowStepFactory workflowStepFactory, Executor executor) {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/
package org.opensearch.flowframework.model;

import org.opensearch.test.OpenSearchTestCase;

import java.io.IOException;
import java.util.Map;

/**
 * Unit tests for {@link PipelineProcessor}: accessors, JSON serialization, parsing, and parse failures.
 */
public class PipelineProcessorTests extends OpenSearchTestCase {

    /**
     * Builds a processor directly, checks its accessors and serialized JSON, then parses that JSON
     * back and checks the parsed instance carries the same values.
     *
     * @throws IOException if serialization or parsing fails unexpectedly
     */
    public void testProcessor() throws IOException {
        PipelineProcessor built = new PipelineProcessor("foo", Map.of("bar", "baz"));
        assertEquals("foo", built.type());
        assertEquals(Map.of("bar", "baz"), built.params());

        // Serialize and compare against the exact expected document.
        String serialized = TemplateTestJsonUtil.parseToJson(built);
        assertEquals("{\"type\":\"foo\",\"params\":{\"bar\":\"baz\"}}", serialized);

        // Parse the serialized form back into a new instance.
        PipelineProcessor reparsed = PipelineProcessor.parse(TemplateTestJsonUtil.jsonToParser(serialized));
        assertEquals("foo", reparsed.type());
        assertEquals(Map.of("bar", "baz"), reparsed.params());
    }

    /**
     * Checks the two parse failures: an unrecognized field name and a missing type field.
     *
     * @throws IOException declared for signature consistency; failures are captured by assertThrows
     */
    public void testExceptions() throws IOException {
        // Unknown field name is rejected with the offending name in the message.
        String badJson = "{\"badField\":\"foo\",\"params\":{\"bar\":\"baz\"}}";
        IOException unknownField = assertThrows(
            IOException.class,
            () -> PipelineProcessor.parse(TemplateTestJsonUtil.jsonToParser(badJson))
        );
        assertEquals("Unable to parse field [badField] in a pipeline processor object.", unknownField.getMessage());

        // An object with params but no type is rejected.
        String noTypeJson = "{\"params\":{\"bar\":\"baz\"}}";
        IOException missingType = assertThrows(
            IOException.class,
            () -> PipelineProcessor.parse(TemplateTestJsonUtil.jsonToParser(noTypeJson))
        );
        assertEquals("A processor object requires a type field.", missingType.getMessage());
    }

}
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/
package org.opensearch.flowframework.template;
package org.opensearch.flowframework.model;

import org.opensearch.common.xcontent.LoggingDeprecationHandler;
import org.opensearch.common.xcontent.json.JsonXContent;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/
package org.opensearch.flowframework.template;
package org.opensearch.flowframework.model;

import org.opensearch.Version;
import org.opensearch.test.OpenSearchTestCase;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/
package org.opensearch.flowframework.template;
package org.opensearch.flowframework.model;

import org.opensearch.test.OpenSearchTestCase;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/
package org.opensearch.flowframework.template;
package org.opensearch.flowframework.model;

import org.opensearch.test.OpenSearchTestCase;

Expand All @@ -27,7 +27,8 @@ public void testNode() throws IOException {
Map.ofEntries(
Map.entry("foo", "a string"),
Map.entry("bar", Map.of("key", "value")),
Map.entry("baz", new Map<?, ?>[] { Map.of("A", "a"), Map.of("B", "b") })
Map.entry("baz", new Map<?, ?>[] { Map.of("A", "a"), Map.of("B", "b") }),
Map.entry("processors", new PipelineProcessor[] { new PipelineProcessor("test-type", Map.of("key2", "value2")) })
)
);
assertEquals("A", nodeA.id());
Expand All @@ -36,6 +37,10 @@ public void testNode() throws IOException {
assertEquals("a string", (String) map.get("foo"));
assertEquals(Map.of("key", "value"), (Map<?, ?>) map.get("bar"));
assertArrayEquals(new Map<?, ?>[] { Map.of("A", "a"), Map.of("B", "b") }, (Map<?, ?>[]) map.get("baz"));
PipelineProcessor[] pp = (PipelineProcessor[]) map.get("processors");
assertEquals(1, pp.length);
assertEquals("test-type", pp[0].type());
assertEquals(Map.of("key2", "value2"), pp[0].params());

// node equality is based only on ID
WorkflowNode nodeA2 = new WorkflowNode("A", "a2-type", Map.of("bar", "baz"));
Expand All @@ -49,6 +54,7 @@ public void testNode() throws IOException {
assertTrue(json.contains("\"foo\":\"a string\""));
assertTrue(json.contains("\"baz\":[{\"A\":\"a\"},{\"B\":\"b\"}]"));
assertTrue(json.contains("\"bar\":{\"key\":\"value\"}"));
assertTrue(json.contains("\"processors\":[{\"type\":\"test-type\",\"params\":{\"key2\":\"value2\"}}]"));

WorkflowNode nodeX = WorkflowNode.parse(TemplateTestJsonUtil.jsonToParser(json));
assertEquals("A", nodeX.id());
Expand All @@ -57,6 +63,10 @@ public void testNode() throws IOException {
assertEquals("a string", mapX.get("foo"));
assertEquals(Map.of("key", "value"), mapX.get("bar"));
assertArrayEquals(new Map<?, ?>[] { Map.of("A", "a"), Map.of("B", "b") }, (Map<?, ?>[]) map.get("baz"));
PipelineProcessor[] ppX = (PipelineProcessor[]) map.get("processors");
assertEquals(1, ppX.length);
assertEquals("test-type", ppX[0].type());
assertEquals(Map.of("key2", "value2"), ppX[0].params());
}

public void testExceptions() throws IOException {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/
package org.opensearch.flowframework.template;
package org.opensearch.flowframework.model;

import org.opensearch.core.xcontent.XContentParser;
import org.opensearch.test.OpenSearchTestCase;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,8 @@
import org.opensearch.client.AdminClient;
import org.opensearch.client.Client;
import org.opensearch.core.xcontent.XContentParser;
import org.opensearch.flowframework.template.TemplateTestJsonUtil;
import org.opensearch.flowframework.template.Workflow;
import org.opensearch.flowframework.model.TemplateTestJsonUtil;
import org.opensearch.flowframework.model.Workflow;
import org.opensearch.test.OpenSearchTestCase;
import org.junit.AfterClass;
import org.junit.BeforeClass;
Expand All @@ -24,9 +24,9 @@
import java.util.concurrent.Executors;
import java.util.stream.Collectors;

import static org.opensearch.flowframework.template.TemplateTestJsonUtil.edge;
import static org.opensearch.flowframework.template.TemplateTestJsonUtil.node;
import static org.opensearch.flowframework.template.TemplateTestJsonUtil.workflow;
import static org.opensearch.flowframework.model.TemplateTestJsonUtil.edge;
import static org.opensearch.flowframework.model.TemplateTestJsonUtil.node;
import static org.opensearch.flowframework.model.TemplateTestJsonUtil.workflow;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;

Expand Down
8 changes: 5 additions & 3 deletions src/test/resources/template/finaltemplate.json
Original file line number Diff line number Diff line change
Expand Up @@ -36,9 +36,11 @@
"description": "some description",
"processors": [{
"type": "text_embedding",
"model_id": "my-existing-model-id",
"input_field": "text_passage",
"output_field": "text_embedding"
"params": {
"model_id": "my-existing-model-id",
"input_field": "text_passage",
"output_field": "text_embedding"
}
}]
}
}
Expand Down
Loading