Skip to content

Commit

Permalink
Add WorkflowStep Factory and implement XContent-based Template Parsing (
Browse files Browse the repository at this point in the history
#47)

* Add WorkflowStepFactory class

Signed-off-by: Daniel Widdis <[email protected]>

* Add XContent classes representing Template JSON

Signed-off-by: Daniel Widdis <[email protected]>

* Add parse methods for the Template XContent

Signed-off-by: Daniel Widdis <[email protected]>

* Cleanup parsing, javadocs, and demo output

Signed-off-by: Daniel Widdis <[email protected]>

* Refactor to use field name constants, get tests working again

Signed-off-by: Daniel Widdis <[email protected]>

* Separate WorkflowNode and ProcessNode functionality

Signed-off-by: Daniel Widdis <[email protected]>

* Fix demos to align with template field names

Signed-off-by: Daniel Widdis <[email protected]>

* Add workflow, node, and edge tests

Signed-off-by: Daniel Widdis <[email protected]>

* Add Template tests

Signed-off-by: Daniel Widdis <[email protected]>

* Refactor TemplateParser to WorkflowProcessSorter

Signed-off-by: Daniel Widdis <[email protected]>

* Test exceptional cases

Signed-off-by: Daniel Widdis <[email protected]>

* Finish up exceptional cases

Signed-off-by: Daniel Widdis <[email protected]>

* Fix a template field name bug in demo

Signed-off-by: Daniel Widdis <[email protected]>

* Rebase with #34

Signed-off-by: Daniel Widdis <[email protected]>

* Rebase changes from #54

Signed-off-by: Daniel Widdis <[email protected]>

* Integrate thread pool executor service

Signed-off-by: Daniel Widdis <[email protected]>

* Fix flaky ProcessNodeTests by removing orTimeout

Signed-off-by: Daniel Widdis <[email protected]>

* Rebase and refactor with #44

Signed-off-by: Daniel Widdis <[email protected]>

* Fix demos and remove DataDemo

Signed-off-by: Daniel Widdis <[email protected]>

* Use non-deprecated mapping method for CreateIndexStep

Signed-off-by: Daniel Widdis <[email protected]>

* Eliminate casting and deprecation warnings on test classes

Signed-off-by: Daniel Widdis <[email protected]>

* Remove unused/leftover demo class

Signed-off-by: Daniel Widdis <[email protected]>

* Typo

Signed-off-by: Daniel Widdis <[email protected]>

* Don't offer steps as an alternative to nodes

Signed-off-by: Daniel Widdis <[email protected]>

* Move Workflow into package with all the other parsing classes

Signed-off-by: Daniel Widdis <[email protected]>

* Move process sequencing classes into workflow package

Signed-off-by: Daniel Widdis <[email protected]>

* Add PipelineProcessor class and XContent parsing, rename package

Signed-off-by: Daniel Widdis <[email protected]>

---------

Signed-off-by: Daniel Widdis <[email protected]>
(cherry picked from commit 734f9c2)
Signed-off-by: github-actions[bot] <github-actions[bot]@users.noreply.github.com>
  • Loading branch information
github-actions[bot] committed Sep 28, 2023
1 parent 6f3b720 commit a629cba
Show file tree
Hide file tree
Showing 33 changed files with 2,099 additions and 748 deletions.
1 change: 0 additions & 1 deletion build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -106,7 +106,6 @@ repositories {
dependencies {
implementation "org.opensearch:opensearch:${opensearch_version}"
implementation 'org.junit.jupiter:junit-jupiter:5.10.0'
implementation "com.google.code.gson:gson:2.10.1"
implementation "com.google.guava:guava:32.1.2-jre"
api group: 'org.opensearch', name:'opensearch-ml-client', version: "${opensearch_build}"

Expand Down
78 changes: 0 additions & 78 deletions src/main/java/demo/CreateIndexWorkflowStep.java

This file was deleted.

85 changes: 0 additions & 85 deletions src/main/java/demo/DataDemo.java

This file was deleted.

41 changes: 19 additions & 22 deletions src/main/java/demo/Demo.java
Original file line number Diff line number Diff line change
Expand Up @@ -10,48 +10,41 @@

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.opensearch.client.Client;
import org.opensearch.client.node.NodeClient;
import org.opensearch.common.SuppressForbidden;
import org.opensearch.common.io.PathUtils;
import org.opensearch.flowframework.template.ProcessNode;
import org.opensearch.flowframework.template.TemplateParser;
import org.opensearch.flowframework.workflow.WorkflowStep;
import org.opensearch.flowframework.model.Template;
import org.opensearch.flowframework.workflow.ProcessNode;
import org.opensearch.flowframework.workflow.WorkflowProcessSorter;
import org.opensearch.flowframework.workflow.WorkflowStepFactory;

import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.stream.Collectors;

/**
* Demo class exercising {@link TemplateParser}. This will be moved to a unit test.
* Demo class exercising {@link WorkflowProcessSorter}. This will be moved to a unit test.
*/
public class Demo {

private static final Logger logger = LogManager.getLogger(Demo.class);

// This is temporary. We need a factory class to generate these workflow steps
// based on a field in the JSON.
private static Map<String, WorkflowStep> workflowMap = new HashMap<>();
static {
workflowMap.put("fetch_model", new DemoWorkflowStep(3000));
workflowMap.put("create_ingest_pipeline", new DemoWorkflowStep(3000));
workflowMap.put("create_search_pipeline", new DemoWorkflowStep(5000));
workflowMap.put("create_neural_search_index", new DemoWorkflowStep(2000));
}

/**
* Demonstrate parsing a JSON graph.
*
* @param args unused
* @throws IOException on a failure
*/
@SuppressForbidden(reason = "just a demo class that will be deleted")
public static void main(String[] args) {
public static void main(String[] args) throws IOException {
String path = "src/test/resources/template/demo.json";
String json;
try {
Expand All @@ -60,13 +53,18 @@ public static void main(String[] args) {
logger.error("Failed to read JSON at path {}", path);
return;
}
Client client = new NodeClient(null, null);
WorkflowStepFactory factory = WorkflowStepFactory.create(client);
ExecutorService executor = Executors.newFixedThreadPool(10);
WorkflowProcessSorter.create(factory, executor);

logger.info("Parsing graph to sequence...");
List<ProcessNode> processSequence = TemplateParser.parseJsonGraphToSequence(json, workflowMap);
Template t = Template.parse(json);
List<ProcessNode> processSequence = WorkflowProcessSorter.get().sortProcessNodes(t.workflows().get("demo"));
List<CompletableFuture<?>> futureList = new ArrayList<>();

for (ProcessNode n : processSequence) {
Set<ProcessNode> predecessors = n.getPredecessors();
List<ProcessNode> predecessors = n.predecessors();
logger.info(
"Queueing process [{}].{}",
n.id(),
Expand All @@ -78,11 +76,10 @@ public static void main(String[] args) {
predecessors.stream().map(p -> p.id()).collect(Collectors.joining(", "))
)
);
// TODO need to handle this better, passing an argument when we start them all at the beginning is silly
futureList.add(n.execute());
}
futureList.forEach(CompletableFuture::join);
logger.info("All done!");
executor.shutdown();
}

}
65 changes: 65 additions & 0 deletions src/main/java/demo/TemplateParseDemo.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/
package demo;

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.opensearch.client.Client;
import org.opensearch.client.node.NodeClient;
import org.opensearch.common.SuppressForbidden;
import org.opensearch.common.io.PathUtils;
import org.opensearch.flowframework.model.Template;
import org.opensearch.flowframework.model.Workflow;
import org.opensearch.flowframework.workflow.WorkflowProcessSorter;
import org.opensearch.flowframework.workflow.WorkflowStepFactory;

import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.util.Map.Entry;
import java.util.concurrent.Executors;

/**
* Demo class exercising {@link WorkflowProcessSorter}. This will be moved to a unit test.
*/
public class TemplateParseDemo {

private static final Logger logger = LogManager.getLogger(TemplateParseDemo.class);

/**
* Demonstrate parsing a JSON graph.
*
* @param args unused
* @throws IOException on error.
*/
@SuppressForbidden(reason = "just a demo class that will be deleted")
public static void main(String[] args) throws IOException {
String path = "src/test/resources/template/finaltemplate.json";
String json;
try {
json = new String(Files.readAllBytes(PathUtils.get(path)), StandardCharsets.UTF_8);
} catch (IOException e) {
logger.error("Failed to read JSON at path {}", path);
return;
}
Client client = new NodeClient(null, null);
WorkflowStepFactory factory = WorkflowStepFactory.create(client);
WorkflowProcessSorter.create(factory, Executors.newFixedThreadPool(10));

Template t = Template.parse(json);

System.out.println(t.toJson());
System.out.println(t.toYaml());

for (Entry<String, Workflow> e : t.workflows().entrySet()) {
logger.info("Parsing {} workflow.", e.getKey());
WorkflowProcessSorter.get().sortProcessNodes(e.getValue());
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,8 @@
import org.opensearch.core.xcontent.NamedXContentRegistry;
import org.opensearch.env.Environment;
import org.opensearch.env.NodeEnvironment;
import org.opensearch.flowframework.workflow.CreateIndex.CreateIndexStep;
import org.opensearch.flowframework.workflow.CreateIngestPipelineStep;
import org.opensearch.flowframework.workflow.WorkflowProcessSorter;
import org.opensearch.flowframework.workflow.WorkflowStepFactory;
import org.opensearch.plugins.Plugin;
import org.opensearch.repositories.RepositoriesService;
import org.opensearch.script.ScriptService;
Expand All @@ -32,8 +32,6 @@
*/
public class FlowFrameworkPlugin extends Plugin {

private Client client;

@Override
public Collection<Object> createComponents(
Client client,
Expand All @@ -48,9 +46,9 @@ public Collection<Object> createComponents(
IndexNameExpressionResolver indexNameExpressionResolver,
Supplier<RepositoriesService> repositoriesServiceSupplier
) {
this.client = client;
CreateIngestPipelineStep createIngestPipelineStep = new CreateIngestPipelineStep(client);
CreateIndexStep createIndexStep = new CreateIndexStep(client);
return ImmutableList.of(createIngestPipelineStep, createIndexStep);
WorkflowStepFactory workflowStepFactory = WorkflowStepFactory.create(client);
WorkflowProcessSorter workflowProcessSorter = WorkflowProcessSorter.create(workflowStepFactory, threadPool.generic());

Check warning on line 50 in src/main/java/org/opensearch/flowframework/FlowFrameworkPlugin.java

View check run for this annotation

Codecov / codecov/patch

src/main/java/org/opensearch/flowframework/FlowFrameworkPlugin.java#L49-L50

Added lines #L49 - L50 were not covered by tests

return ImmutableList.of(workflowStepFactory, workflowProcessSorter);

Check warning on line 52 in src/main/java/org/opensearch/flowframework/FlowFrameworkPlugin.java

View check run for this annotation

Codecov / codecov/patch

src/main/java/org/opensearch/flowframework/FlowFrameworkPlugin.java#L52

Added line #L52 was not covered by tests
}
}
Loading

0 comments on commit a629cba

Please sign in to comment.