Skip to content

Commit

Permalink
Update demo to link to Workflow interface
Browse files Browse the repository at this point in the history
Signed-off-by: Daniel Widdis <[email protected]>
  • Loading branch information
dbwiddis committed Sep 11, 2023
1 parent 9e08097 commit 73a7978
Show file tree
Hide file tree
Showing 5 changed files with 80 additions and 26 deletions.
20 changes: 16 additions & 4 deletions src/main/java/org/opensearch/flowframework/template/Demo.java
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,12 @@
*/
package org.opensearch.flowframework.template;

import org.opensearch.flowframework.workflow.Workflow;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.stream.Collectors;
Expand All @@ -19,6 +23,14 @@
*/
public class Demo {

// Demo lookup table mapping the JSON node ids used in the demo template to the
// Workflow implementation executed for that node. Populated once at class load.
// Each DemoWorkflowStep simulates work by sleeping for the given milliseconds.
private static Map<String, Workflow> workflowMap = new HashMap<>();
static {
workflowMap.put("fetch_model", new DemoWorkflowStep(3000));
workflowMap.put("create_ingest_pipeline", new DemoWorkflowStep(4000));
workflowMap.put("create_search_pipeline", new DemoWorkflowStep(8000));
workflowMap.put("create_neural_search_index", new DemoWorkflowStep(2000));
}

/**
* Demonstrate parsing a JSON graph.
*
Expand Down Expand Up @@ -65,19 +77,19 @@ public static void main(String[] args) {
System.out.println(json);

System.out.println("Parsing graph to sequence...");
List<ProcessNode> processSequence = TemplateParser.parseJsonGraphToSequence(json);
List<CompletableFuture<String>> futureList = new ArrayList<>();
List<ProcessNode> processSequence = TemplateParser.parseJsonGraphToSequence(json, workflowMap);
List<CompletableFuture<?>> futureList = new ArrayList<>();

for (ProcessNode n : processSequence) {
Set<ProcessNode> predecessors = n.getPredecessors();
System.out.format(
"Queueing process [%s]. %s.%n",
n.getId(),
n.id(),
predecessors.isEmpty()
? "Can start immediately!"
: String.format(
"Must wait for [%s] to complete first.",
predecessors.stream().map(p -> p.getId()).collect(Collectors.joining(", "))
predecessors.stream().map(p -> p.id()).collect(Collectors.joining(", "))
)
);
futureList.add(n.execute());
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/
package org.opensearch.flowframework.template;

import org.opensearch.flowframework.workflow.Workflow;

import java.util.concurrent.CompletableFuture;

/**
 * Demo implementation of {@link Workflow} that simulates a workflow step by
 * sleeping for a fixed delay instead of calling a real step API.
 */
public class DemoWorkflowStep implements Workflow {

    // Milliseconds to sleep when this step executes.
    private final long delay;

    /**
     * Create a step with a simulated execution time.
     *
     * @param delay milliseconds to sleep when {@link #execute()} is called
     */
    public DemoWorkflowStep(long delay) {
        this.delay = delay;
    }

    /**
     * Simulates step execution by blocking for the configured delay.
     *
     * @return an already-completed future with a {@code null} payload; returning
     *         a completed future rather than {@code null} prevents a
     *         NullPointerException in callers that chain on the result
     * @throws Exception if the sleeping thread is interrupted
     */
    @Override
    public CompletableFuture<Workflow> execute() throws Exception {
        Thread.sleep(this.delay);
        return CompletableFuture.completedFuture(null);
    }

}
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@
*/
package org.opensearch.flowframework.template;

import org.opensearch.flowframework.workflow.Workflow;

import java.util.Collections;
import java.util.List;
import java.util.Objects;
Expand All @@ -22,34 +24,47 @@
*/
public class ProcessNode {
private final String id;
private CompletableFuture<String> future = null;
private final Workflow workflow;
private CompletableFuture<?> future = null;

// will be populated during graph parsing
private Set<ProcessNode> predecessors = Collections.emptySet();

/**
* Create this node with a unique id.
* Create this node linked to its executing process.
*
* @param id A string identifying the workflow step
* @param workflow A java class implementing {@link Workflow} to be executed when it's this node's turn.
*/
ProcessNode(String id) {
ProcessNode(String id, Workflow workflow) {
this.id = id;
this.workflow = workflow;
}

/**
* Returns the node's id.
* @return the node id.
*/
public String getId() {
public String id() {
return id;
}

/**
 * Accessor for the {@link Workflow} implementation this node runs.
 *
 * @return the workflow step associated with this node
 */
public Workflow workflow() {
    return this.workflow;
}

/**
* Returns a {@link CompletableFuture} if this process is executing.
* Relies on the node having been sorted and executed in an order such that all predecessor nodes have begun execution first (and thus populated this value).
*
* @return A future indicating the processing state of this node.
* Returns {@code null} if it has not begun executing, should not happen if a workflow is sorted and executed topologically.
*/
public CompletableFuture<String> getFuture() {
public CompletableFuture<?> getFuture() {
return future;
}

Expand Down Expand Up @@ -77,11 +92,11 @@ void setPredecessors(Set<ProcessNode> predecessors) {
*
* @return this node's future. This is returned immediately, while process execution continues asynchronously.
*/
public CompletableFuture<String> execute() {
public CompletableFuture<?> execute() {
this.future = new CompletableFuture<>();
CompletableFuture.runAsync(() -> {
if (!predecessors.isEmpty()) {
List<CompletableFuture<String>> predFutures = predecessors.stream().map(p -> p.getFuture()).collect(Collectors.toList());
List<CompletableFuture<?>> predFutures = predecessors.stream().map(p -> p.getFuture()).collect(Collectors.toList());
CompletableFuture<Void> waitForPredecessors = CompletableFuture.allOf(predFutures.toArray(new CompletableFuture<?>[0]));
try {
waitForPredecessors.orTimeout(30, TimeUnit.SECONDS).get();
Expand All @@ -93,20 +108,18 @@ public CompletableFuture<String> execute() {
return;
}
System.out.println(">>> Starting " + this.id);
// TODO: Here is where we would call out to workflow step API
workflowExecute(this.id);
try {
// TODO collect the future from this step and use it in our own completion
this.workflow.execute();
} catch (Exception e) {
// TODO remove the exception on workflow, instead handle exceptional completion
}
System.out.println("<<< Finished " + this.id);
future.complete(this.id);
future.complete(null);
});
return this.future;
}

private void workflowExecute(String s) {
try {
Thread.sleep(s.contains("ingest") ? 8000 : 4000);
} catch (InterruptedException e) {}
}

@Override
public int hashCode() {
return Objects.hash(id);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
import com.google.gson.Gson;
import com.google.gson.JsonElement;
import com.google.gson.JsonObject;
import org.opensearch.flowframework.workflow.Workflow;

import java.util.ArrayDeque;
import java.util.ArrayList;
Expand All @@ -37,9 +38,10 @@ private TemplateParser() {}
/**
* Parse a JSON representation of nodes and edges into a topologically sorted list of process nodes.
* @param json A string containing a JSON representation of nodes and edges
* @param workflowSteps A map linking JSON node names to Java objects implementing {@link Workflow}
* @return A list of Process Nodes sorted topologically. All predecessors of any node will occur prior to it in the list.
*/
public static List<ProcessNode> parseJsonGraphToSequence(String json) {
public static List<ProcessNode> parseJsonGraphToSequence(String json, Map<String, Workflow> workflowSteps) {
Gson gson = new Gson();
JsonObject jsonObject = gson.fromJson(json, JsonObject.class);

Expand All @@ -51,7 +53,7 @@ public static List<ProcessNode> parseJsonGraphToSequence(String json) {
for (JsonElement nodeJson : graph.getAsJsonArray("nodes")) {
JsonObject nodeObject = nodeJson.getAsJsonObject();
String nodeId = nodeObject.get("id").getAsString();
nodes.add(new ProcessNode(nodeId));
nodes.add(new ProcessNode(nodeId, workflowSteps.get(nodeId)));
}

for (JsonElement edgeJson : graph.getAsJsonArray("edges")) {
Expand All @@ -68,7 +70,7 @@ private static List<ProcessNode> topologicalSort(List<ProcessNode> nodes, List<P
// Define the graph
Set<ProcessSequenceEdge> graph = new HashSet<>(edges);
// Map node id string to object
Map<String, ProcessNode> nodeMap = nodes.stream().collect(Collectors.toMap(ProcessNode::getId, Function.identity()));
Map<String, ProcessNode> nodeMap = nodes.stream().collect(Collectors.toMap(ProcessNode::id, Function.identity()));
// Build predecessor and successor maps
Map<ProcessNode, Set<ProcessSequenceEdge>> predecessorEdges = new HashMap<>();
Map<ProcessNode, Set<ProcessSequenceEdge>> successorEdges = new HashMap<>();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,6 @@
import java.util.Collection;
import java.util.Collections;

import static org.hamcrest.Matchers.containsString;

@ThreadLeakScope(ThreadLeakScope.Scope.NONE)
@OpenSearchIntegTestCase.ClusterScope(scope = OpenSearchIntegTestCase.Scope.SUITE)
public class FlowFrameworkPluginIT extends OpenSearchIntegTestCase {
Expand All @@ -38,6 +36,6 @@ public void testPluginInstalled() throws IOException, ParseException {
String body = EntityUtils.toString(response.getEntity(), StandardCharsets.UTF_8);

logger.info("response body: {}", body);
assertThat(body, containsString("flowframework"));
assertTrue(body.contains("flowframework"));
}
}

0 comments on commit 73a7978

Please sign in to comment.