Skip to content

Commit

Permalink
[Backport 2.x] Add WorkflowStep Factory and implement XContent-based …
Browse files Browse the repository at this point in the history
…Template Parsing (#60)

Add WorkflowStep Factory and implement XContent-based Template Parsing (#47)

* Add WorkflowStepFactory class



* Add XContent classes representing Template JSON



* Add parse methods for the Template XContent



* Cleanup parsing, javadocs, and demo output



* Refactor to use field name constants, get tests working again



* Separate WorkflowNode and ProcessNode functionality



* Fix demos to align with template field names



* Add workflow, node, and edge tests



* Add Template tests



* Refactor TemplateParser to WorkflowProcessSorter



* Test exceptional cases



* Finish up exceptional cases



* Fix a template field name bug in demo



* Rebase with #34



* Rebase changes from #54



* Integrate thread pool executor service



* Fix flaky ProcessNodeTests by removing orTimeout



* Rebase and refactor with #44



* Fix demos and remove DataDemo



* Use non-deprecated mapping method for CreateIndexStep



* Eliminate casting and deprecation warnings on test classes



* Remove unused/leftover demo class



* Typo



* Don't offer steps as an alternative to nodes



* Move Workflow into package with all the other parsing classes



* Move process sequencing classes into workflow package



* Add PipelineProcessor class and XContent parsing, rename package



---------


(cherry picked from commit 734f9c2)

Signed-off-by: Daniel Widdis <[email protected]>
Signed-off-by: github-actions[bot] <github-actions[bot]@users.noreply.github.com>
Co-authored-by: github-actions[bot] <github-actions[bot]@users.noreply.github.com>
  • Loading branch information
1 parent 6f3b720 commit 5161294
Show file tree
Hide file tree
Showing 33 changed files with 2,099 additions and 748 deletions.
1 change: 0 additions & 1 deletion build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -106,7 +106,6 @@ repositories {
dependencies {
implementation "org.opensearch:opensearch:${opensearch_version}"
implementation 'org.junit.jupiter:junit-jupiter:5.10.0'
implementation "com.google.code.gson:gson:2.10.1"
implementation "com.google.guava:guava:32.1.2-jre"
api group: 'org.opensearch', name:'opensearch-ml-client', version: "${opensearch_build}"

Expand Down
78 changes: 0 additions & 78 deletions src/main/java/demo/CreateIndexWorkflowStep.java

This file was deleted.

85 changes: 0 additions & 85 deletions src/main/java/demo/DataDemo.java

This file was deleted.

41 changes: 19 additions & 22 deletions src/main/java/demo/Demo.java
Original file line number Diff line number Diff line change
Expand Up @@ -10,48 +10,41 @@

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.opensearch.client.Client;
import org.opensearch.client.node.NodeClient;
import org.opensearch.common.SuppressForbidden;
import org.opensearch.common.io.PathUtils;
import org.opensearch.flowframework.template.ProcessNode;
import org.opensearch.flowframework.template.TemplateParser;
import org.opensearch.flowframework.workflow.WorkflowStep;
import org.opensearch.flowframework.model.Template;
import org.opensearch.flowframework.workflow.ProcessNode;
import org.opensearch.flowframework.workflow.WorkflowProcessSorter;
import org.opensearch.flowframework.workflow.WorkflowStepFactory;

import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.stream.Collectors;

/**
* Demo class exercising {@link TemplateParser}. This will be moved to a unit test.
* Demo class exercising {@link WorkflowProcessSorter}. This will be moved to a unit test.
*/
public class Demo {

private static final Logger logger = LogManager.getLogger(Demo.class);

// This is temporary. We need a factory class to generate these workflow steps
// based on a field in the JSON.
private static Map<String, WorkflowStep> workflowMap = new HashMap<>();
static {
workflowMap.put("fetch_model", new DemoWorkflowStep(3000));
workflowMap.put("create_ingest_pipeline", new DemoWorkflowStep(3000));
workflowMap.put("create_search_pipeline", new DemoWorkflowStep(5000));
workflowMap.put("create_neural_search_index", new DemoWorkflowStep(2000));
}

/**
* Demonstrate parsing a JSON graph.
*
* @param args unused
* @throws IOException on a failure
*/
@SuppressForbidden(reason = "just a demo class that will be deleted")
public static void main(String[] args) {
public static void main(String[] args) throws IOException {
String path = "src/test/resources/template/demo.json";
String json;
try {
Expand All @@ -60,13 +53,18 @@ public static void main(String[] args) {
logger.error("Failed to read JSON at path {}", path);
return;
}
Client client = new NodeClient(null, null);
WorkflowStepFactory factory = WorkflowStepFactory.create(client);
ExecutorService executor = Executors.newFixedThreadPool(10);
WorkflowProcessSorter.create(factory, executor);

logger.info("Parsing graph to sequence...");
List<ProcessNode> processSequence = TemplateParser.parseJsonGraphToSequence(json, workflowMap);
Template t = Template.parse(json);
List<ProcessNode> processSequence = WorkflowProcessSorter.get().sortProcessNodes(t.workflows().get("demo"));
List<CompletableFuture<?>> futureList = new ArrayList<>();

for (ProcessNode n : processSequence) {
Set<ProcessNode> predecessors = n.getPredecessors();
List<ProcessNode> predecessors = n.predecessors();
logger.info(
"Queueing process [{}].{}",
n.id(),
Expand All @@ -78,11 +76,10 @@ public static void main(String[] args) {
predecessors.stream().map(p -> p.id()).collect(Collectors.joining(", "))
)
);
// TODO need to handle this better, passing an argument when we start them all at the beginning is silly
futureList.add(n.execute());
}
futureList.forEach(CompletableFuture::join);
logger.info("All done!");
executor.shutdown();
}

}
65 changes: 65 additions & 0 deletions src/main/java/demo/TemplateParseDemo.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/
package demo;

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.opensearch.client.Client;
import org.opensearch.client.node.NodeClient;
import org.opensearch.common.SuppressForbidden;
import org.opensearch.common.io.PathUtils;
import org.opensearch.flowframework.model.Template;
import org.opensearch.flowframework.model.Workflow;
import org.opensearch.flowframework.workflow.WorkflowProcessSorter;
import org.opensearch.flowframework.workflow.WorkflowStepFactory;

import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.util.Map.Entry;
import java.util.concurrent.Executors;

/**
* Demo class exercising {@link WorkflowProcessSorter}. This will be moved to a unit test.
*/
public class TemplateParseDemo {

private static final Logger logger = LogManager.getLogger(TemplateParseDemo.class);

/**
* Demonstrate parsing a JSON graph.
*
* @param args unused
* @throws IOException on error.
*/
@SuppressForbidden(reason = "just a demo class that will be deleted")
public static void main(String[] args) throws IOException {
String path = "src/test/resources/template/finaltemplate.json";
String json;
try {
json = new String(Files.readAllBytes(PathUtils.get(path)), StandardCharsets.UTF_8);
} catch (IOException e) {
logger.error("Failed to read JSON at path {}", path);
return;
}
Client client = new NodeClient(null, null);
WorkflowStepFactory factory = WorkflowStepFactory.create(client);
WorkflowProcessSorter.create(factory, Executors.newFixedThreadPool(10));

Template t = Template.parse(json);

System.out.println(t.toJson());
System.out.println(t.toYaml());

for (Entry<String, Workflow> e : t.workflows().entrySet()) {
logger.info("Parsing {} workflow.", e.getKey());
WorkflowProcessSorter.get().sortProcessNodes(e.getValue());
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,8 @@
import org.opensearch.core.xcontent.NamedXContentRegistry;
import org.opensearch.env.Environment;
import org.opensearch.env.NodeEnvironment;
import org.opensearch.flowframework.workflow.CreateIndex.CreateIndexStep;
import org.opensearch.flowframework.workflow.CreateIngestPipelineStep;
import org.opensearch.flowframework.workflow.WorkflowProcessSorter;
import org.opensearch.flowframework.workflow.WorkflowStepFactory;
import org.opensearch.plugins.Plugin;
import org.opensearch.repositories.RepositoriesService;
import org.opensearch.script.ScriptService;
Expand All @@ -32,8 +32,6 @@
*/
public class FlowFrameworkPlugin extends Plugin {

private Client client;

@Override
public Collection<Object> createComponents(
Client client,
Expand All @@ -48,9 +46,9 @@ public Collection<Object> createComponents(
IndexNameExpressionResolver indexNameExpressionResolver,
Supplier<RepositoriesService> repositoriesServiceSupplier
) {
this.client = client;
CreateIngestPipelineStep createIngestPipelineStep = new CreateIngestPipelineStep(client);
CreateIndexStep createIndexStep = new CreateIndexStep(client);
return ImmutableList.of(createIngestPipelineStep, createIndexStep);
WorkflowStepFactory workflowStepFactory = WorkflowStepFactory.create(client);
WorkflowProcessSorter workflowProcessSorter = WorkflowProcessSorter.create(workflowStepFactory, threadPool.generic());

return ImmutableList.of(workflowStepFactory, workflowProcessSorter);
}
}
Loading

0 comments on commit 5161294

Please sign in to comment.