-
Notifications
You must be signed in to change notification settings - Fork 36
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Topological Sorting and Sequenced Execution (#26)
* Topological Sorting and Sequenced Execution Signed-off-by: Daniel Widdis <[email protected]> * Add javadocs Signed-off-by: Daniel Widdis <[email protected]> * Update demo to link to Workflow interface Signed-off-by: Daniel Widdis <[email protected]> * Replace System.out with logging Signed-off-by: Daniel Widdis <[email protected]> * Update with new interface signatures Signed-off-by: Daniel Widdis <[email protected]> * Demo passing input data at parse-time Signed-off-by: Daniel Widdis <[email protected]> * Demo passing data in between steps Signed-off-by: Daniel Widdis <[email protected]> * Change execute arg to list and refactor demo classes to own package Signed-off-by: Daniel Widdis <[email protected]> * Significantly simplify input/output data passing Signed-off-by: Daniel Widdis <[email protected]> * Add tests Signed-off-by: Daniel Widdis <[email protected]> * Fix javadocs and forbidden API issues Signed-off-by: Daniel Widdis <[email protected]> * Address code review comments Signed-off-by: Daniel Widdis <[email protected]> --------- Signed-off-by: Daniel Widdis <[email protected]> (cherry picked from commit a574f47) Signed-off-by: github-actions[bot] <github-actions[bot]@users.noreply.github.com>
- Loading branch information
1 parent
58db87b
commit a509214
Showing
21 changed files
with
1,133 additions
and
11 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,83 @@ | ||
/* | ||
* Copyright OpenSearch Contributors | ||
* SPDX-License-Identifier: Apache-2.0 | ||
* | ||
* The OpenSearch Contributors require contributions made to | ||
* this file be licensed under the Apache-2.0 license or a | ||
* compatible open source license. | ||
*/ | ||
package demo; | ||
|
||
import org.apache.logging.log4j.LogManager; | ||
import org.apache.logging.log4j.Logger; | ||
import org.opensearch.action.admin.indices.create.CreateIndexResponse; | ||
import org.opensearch.flowframework.workflow.WorkflowData; | ||
import org.opensearch.flowframework.workflow.WorkflowStep; | ||
|
||
import java.util.List; | ||
import java.util.Map; | ||
import java.util.concurrent.CompletableFuture; | ||
|
||
/** | ||
* Sample to show other devs how to pass data around. Will be deleted once other PRs are merged. | ||
*/ | ||
public class CreateIndexWorkflowStep implements WorkflowStep { | ||
|
||
private static final Logger logger = LogManager.getLogger(CreateIndexWorkflowStep.class); | ||
|
||
private final String name; | ||
|
||
/** | ||
* Instantiate this class. | ||
*/ | ||
public CreateIndexWorkflowStep() { | ||
this.name = "CREATE_INDEX"; | ||
} | ||
|
||
@Override | ||
public CompletableFuture<WorkflowData> execute(List<WorkflowData> data) { | ||
CompletableFuture<WorkflowData> future = new CompletableFuture<>(); | ||
// TODO we will be passing a thread pool to this object when it's instantiated | ||
// we should either add the generic executor from that pool to this call | ||
// or use executorservice.submit or any of various threading options | ||
// https://github.com/opensearch-project/opensearch-ai-flow-framework/issues/42 | ||
CompletableFuture.runAsync(() -> { | ||
String inputIndex = null; | ||
boolean first = true; | ||
for (WorkflowData wfData : data) { | ||
logger.debug( | ||
"{} sent params: {}, content: {}", | ||
first ? "Initialization" : "Previous step", | ||
wfData.getParams(), | ||
wfData.getContent() | ||
); | ||
if (first) { | ||
Map<String, String> params = data.get(0).getParams(); | ||
if (params.containsKey("index")) { | ||
inputIndex = params.get("index"); | ||
} | ||
first = false; | ||
} | ||
} | ||
// do some work, simulating a REST API call | ||
try { | ||
Thread.sleep(2000); | ||
} catch (InterruptedException e) {} | ||
// Simulate response of created index | ||
CreateIndexResponse response = new CreateIndexResponse(true, true, inputIndex); | ||
future.complete(new WorkflowData() { | ||
@Override | ||
public Map<String, Object> getContent() { | ||
return Map.of("index", response.index()); | ||
} | ||
}); | ||
}); | ||
|
||
return future; | ||
} | ||
|
||
@Override | ||
public String getName() { | ||
return name; | ||
} | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,85 @@ | ||
/* | ||
* Copyright OpenSearch Contributors | ||
* SPDX-License-Identifier: Apache-2.0 | ||
* | ||
* The OpenSearch Contributors require contributions made to | ||
* this file be licensed under the Apache-2.0 license or a | ||
* compatible open source license. | ||
*/ | ||
package demo; | ||
|
||
import org.apache.logging.log4j.LogManager; | ||
import org.apache.logging.log4j.Logger; | ||
import org.opensearch.common.SuppressForbidden; | ||
import org.opensearch.common.io.PathUtils; | ||
import org.opensearch.flowframework.template.ProcessNode; | ||
import org.opensearch.flowframework.template.TemplateParser; | ||
import org.opensearch.flowframework.workflow.WorkflowStep; | ||
|
||
import java.io.IOException; | ||
import java.nio.charset.StandardCharsets; | ||
import java.nio.file.Files; | ||
import java.util.ArrayList; | ||
import java.util.HashMap; | ||
import java.util.List; | ||
import java.util.Locale; | ||
import java.util.Map; | ||
import java.util.Set; | ||
import java.util.concurrent.CompletableFuture; | ||
import java.util.stream.Collectors; | ||
|
||
/** | ||
* Demo class exercising {@link TemplateParser}. This will be moved to a unit test. | ||
*/ | ||
public class DataDemo { | ||
|
||
private static final Logger logger = LogManager.getLogger(DataDemo.class); | ||
|
||
// This is temporary. We need a factory class to generate these workflow steps | ||
// based on a field in the JSON. | ||
private static Map<String, WorkflowStep> workflowMap = new HashMap<>(); | ||
static { | ||
workflowMap.put("create_index", new CreateIndexWorkflowStep()); | ||
workflowMap.put("create_another_index", new CreateIndexWorkflowStep()); | ||
} | ||
|
||
/** | ||
* Demonstrate parsing a JSON graph. | ||
* | ||
* @param args unused | ||
*/ | ||
@SuppressForbidden(reason = "just a demo class that will be deleted") | ||
public static void main(String[] args) { | ||
String path = "src/test/resources/template/datademo.json"; | ||
String json; | ||
try { | ||
json = new String(Files.readAllBytes(PathUtils.get(path)), StandardCharsets.UTF_8); | ||
} catch (IOException e) { | ||
logger.error("Failed to read JSON at path {}", path); | ||
return; | ||
} | ||
|
||
logger.info("Parsing graph to sequence..."); | ||
List<ProcessNode> processSequence = TemplateParser.parseJsonGraphToSequence(json, workflowMap); | ||
List<CompletableFuture<?>> futureList = new ArrayList<>(); | ||
|
||
for (ProcessNode n : processSequence) { | ||
Set<ProcessNode> predecessors = n.getPredecessors(); | ||
logger.info( | ||
"Queueing process [{}].{}", | ||
n.id(), | ||
predecessors.isEmpty() | ||
? " Can start immediately!" | ||
: String.format( | ||
Locale.getDefault(), | ||
" Must wait for [%s] to complete first.", | ||
predecessors.stream().map(p -> p.id()).collect(Collectors.joining(", ")) | ||
) | ||
); | ||
futureList.add(n.execute()); | ||
} | ||
futureList.forEach(CompletableFuture::join); | ||
logger.info("All done!"); | ||
} | ||
|
||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,88 @@ | ||
/* | ||
* Copyright OpenSearch Contributors | ||
* SPDX-License-Identifier: Apache-2.0 | ||
* | ||
* The OpenSearch Contributors require contributions made to | ||
* this file be licensed under the Apache-2.0 license or a | ||
* compatible open source license. | ||
*/ | ||
package demo; | ||
|
||
import org.apache.logging.log4j.LogManager; | ||
import org.apache.logging.log4j.Logger; | ||
import org.opensearch.common.SuppressForbidden; | ||
import org.opensearch.common.io.PathUtils; | ||
import org.opensearch.flowframework.template.ProcessNode; | ||
import org.opensearch.flowframework.template.TemplateParser; | ||
import org.opensearch.flowframework.workflow.WorkflowStep; | ||
|
||
import java.io.IOException; | ||
import java.nio.charset.StandardCharsets; | ||
import java.nio.file.Files; | ||
import java.util.ArrayList; | ||
import java.util.HashMap; | ||
import java.util.List; | ||
import java.util.Locale; | ||
import java.util.Map; | ||
import java.util.Set; | ||
import java.util.concurrent.CompletableFuture; | ||
import java.util.stream.Collectors; | ||
|
||
/** | ||
* Demo class exercising {@link TemplateParser}. This will be moved to a unit test. | ||
*/ | ||
public class Demo { | ||
|
||
private static final Logger logger = LogManager.getLogger(Demo.class); | ||
|
||
// This is temporary. We need a factory class to generate these workflow steps | ||
// based on a field in the JSON. | ||
private static Map<String, WorkflowStep> workflowMap = new HashMap<>(); | ||
static { | ||
workflowMap.put("fetch_model", new DemoWorkflowStep(3000)); | ||
workflowMap.put("create_ingest_pipeline", new DemoWorkflowStep(3000)); | ||
workflowMap.put("create_search_pipeline", new DemoWorkflowStep(5000)); | ||
workflowMap.put("create_neural_search_index", new DemoWorkflowStep(2000)); | ||
} | ||
|
||
/** | ||
* Demonstrate parsing a JSON graph. | ||
* | ||
* @param args unused | ||
*/ | ||
@SuppressForbidden(reason = "just a demo class that will be deleted") | ||
public static void main(String[] args) { | ||
String path = "src/test/resources/template/demo.json"; | ||
String json; | ||
try { | ||
json = new String(Files.readAllBytes(PathUtils.get(path)), StandardCharsets.UTF_8); | ||
} catch (IOException e) { | ||
logger.error("Failed to read JSON at path {}", path); | ||
return; | ||
} | ||
|
||
logger.info("Parsing graph to sequence..."); | ||
List<ProcessNode> processSequence = TemplateParser.parseJsonGraphToSequence(json, workflowMap); | ||
List<CompletableFuture<?>> futureList = new ArrayList<>(); | ||
|
||
for (ProcessNode n : processSequence) { | ||
Set<ProcessNode> predecessors = n.getPredecessors(); | ||
logger.info( | ||
"Queueing process [{}].{}", | ||
n.id(), | ||
predecessors.isEmpty() | ||
? " Can start immediately!" | ||
: String.format( | ||
Locale.getDefault(), | ||
" Must wait for [%s] to complete first.", | ||
predecessors.stream().map(p -> p.id()).collect(Collectors.joining(", ")) | ||
) | ||
); | ||
// TODO need to handle this better, passing an argument when we start them all at the beginning is silly | ||
futureList.add(n.execute()); | ||
} | ||
futureList.forEach(CompletableFuture::join); | ||
logger.info("All done!"); | ||
} | ||
|
||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,52 @@ | ||
/* | ||
* Copyright OpenSearch Contributors | ||
* SPDX-License-Identifier: Apache-2.0 | ||
* | ||
* The OpenSearch Contributors require contributions made to | ||
* this file be licensed under the Apache-2.0 license or a | ||
* compatible open source license. | ||
*/ | ||
package demo; | ||
|
||
import org.opensearch.flowframework.workflow.WorkflowData; | ||
import org.opensearch.flowframework.workflow.WorkflowStep; | ||
|
||
import java.util.List; | ||
import java.util.concurrent.CompletableFuture; | ||
|
||
/** | ||
* Demo workflowstep to show sequenced execution | ||
*/ | ||
public class DemoWorkflowStep implements WorkflowStep { | ||
|
||
private final long delay; | ||
private final String name; | ||
|
||
/** | ||
* Instantiate a step with a delay. | ||
* @param delay milliseconds to take pretending to do work while really sleeping | ||
*/ | ||
public DemoWorkflowStep(long delay) { | ||
this.delay = delay; | ||
this.name = "DEMO_DELAY_" + delay; | ||
} | ||
|
||
@Override | ||
public CompletableFuture<WorkflowData> execute(List<WorkflowData> data) { | ||
CompletableFuture<WorkflowData> future = new CompletableFuture<>(); | ||
CompletableFuture.runAsync(() -> { | ||
try { | ||
Thread.sleep(this.delay); | ||
future.complete(null); | ||
} catch (InterruptedException e) { | ||
future.completeExceptionally(e); | ||
} | ||
}); | ||
return future; | ||
} | ||
|
||
@Override | ||
public String getName() { | ||
return name; | ||
} | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,13 @@ | ||
|
||
DO NOT DEPEND ON CLASSES IN THIS PACKAGE. | ||
|
||
The contents of this folder are for demo/proof-of-concept use. | ||
|
||
Feel free to look at the classes in this folder for potential "how could I" scenarios. | ||
|
||
Tests will not be written against them. | ||
Documentation may be incomplete, wrong, or outdated. | ||
These are not for production use. | ||
They will be deleted without notice at some point, and altered without notice at other points. | ||
|
||
DO NOT DEPEND ON CLASSES IN THIS PACKAGE. |
Oops, something went wrong.