Skip to content

Commit

Permalink
Add javadocs
Browse files Browse the repository at this point in the history
Signed-off-by: Daniel Widdis <[email protected]>
  • Loading branch information
dbwiddis committed Sep 11, 2023
1 parent 5216478 commit 9e08097
Show file tree
Hide file tree
Showing 4 changed files with 160 additions and 72 deletions.
89 changes: 89 additions & 0 deletions src/main/java/org/opensearch/flowframework/template/Demo.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,89 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/
package org.opensearch.flowframework.template;

import java.util.ArrayList;
import java.util.List;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.stream.Collectors;

/**
* Demo class exercising {@link TemplateParser}. This will be moved to a unit test.
*/
public class Demo {

/**
* Demonstrate parsing a JSON graph.
*
* @param args unused
*/
public static void main(String[] args) {
String json = "{\n"
+ " \"sequence\": {\n"
+ " \"nodes\": [\n"
+ " {\n"
+ " \"id\": \"fetch_model\"\n"
+ " },\n"
+ " {\n"
+ " \"id\": \"create_ingest_pipeline\"\n"
+ " },\n"
+ " {\n"
+ " \"id\": \"create_search_pipeline\"\n"
+ " },\n"
+ " {\n"
+ " \"id\": \"create_neural_search_index\"\n"
+ " }\n"
+ " ],\n"
+ " \"edges\": [\n"
+ " {\n"
+ " \"source\": \"fetch_model\",\n"
+ " \"dest\": \"create_ingest_pipeline\"\n"
+ " },\n"
+ " {\n"
+ " \"source\": \"fetch_model\",\n"
+ " \"dest\": \"create_search_pipeline\"\n"
+ " },\n"
+ " {\n"
+ " \"source\": \"create_ingest_pipeline\",\n"
+ " \"dest\": \"create_neural_search_index\"\n"
+ " },\n"
+ " {\n"
+ " \"source\": \"create_search_pipeline\",\n"
+ " \"dest\": \"create_neural_search_index\"\n"
+ " }\n"
+ " ]\n"
+ " }\n"
+ "}";

System.out.println(json);

System.out.println("Parsing graph to sequence...");
List<ProcessNode> processSequence = TemplateParser.parseJsonGraphToSequence(json);
List<CompletableFuture<String>> futureList = new ArrayList<>();

for (ProcessNode n : processSequence) {
Set<ProcessNode> predecessors = n.getPredecessors();
System.out.format(
"Queueing process [%s]. %s.%n",
n.getId(),
predecessors.isEmpty()
? "Can start immediately!"
: String.format(
"Must wait for [%s] to complete first.",
predecessors.stream().map(p -> p.getId()).collect(Collectors.joining(", "))
)
);
futureList.add(n.execute());
}
futureList.forEach(CompletableFuture::join);
System.out.println("All done!");
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -17,33 +17,66 @@
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;

/**
* Representation of a process node in a workflow graph. Tracks predecessor nodes which must be completed before it can start execution.
*/
public class ProcessNode {
private final String id;
private CompletableFuture<String> future;
private CompletableFuture<String> future = null;

// will be populated during graph parsing
private Set<ProcessNode> predecessors = Collections.emptySet();

/**
* Create this node with a unique id.
*/
ProcessNode(String id) {
this.id = id;
}

/**
* Returns the node's id.
* @return the node id.
*/
public String getId() {
return id;
}

/**
* Returns a {@link CompletableFuture} if this process is executing.
* Relies on the node having been sorted and executed in an order such that all predecessor nodes have begun execution first (and thus populated this value).
*
* @return A future indicating the processing state of this node.
* Returns {@code null} if it has not begun executing, should not happen if a workflow is sorted and executed topologically.
*/
public CompletableFuture<String> getFuture() {
return future;
}

/**
* Returns the predecessors of this node in the workflow.
* The predecessor's {@link #getFuture()} must complete before execution begins on this node.
*
* @return a set of predecessor nodes, if any. At least one node in the graph must have no predecessors and serve as a start node.
*/
public Set<ProcessNode> getPredecessors() {
return predecessors;
}

public void setPredecessors(Set<ProcessNode> predecessors) {
/**
* Sets the predecessor node. Called by {@link TemplateParser}.
*
* @param predecessors The predecessors of this node.
*/
void setPredecessors(Set<ProcessNode> predecessors) {
this.predecessors = Set.copyOf(predecessors);
}

/**
* Execute this node in the sequence. Initializes the node's {@link CompletableFuture} and completes it when the process completes.
*
* @return this node's future. This is returned immediately, while process execution continues asynchronously.
*/
public CompletableFuture<String> execute() {
this.future = new CompletableFuture<>();
CompletableFuture.runAsync(() -> {
Expand All @@ -60,16 +93,17 @@ public CompletableFuture<String> execute() {
return;
}
System.out.println(">>> Starting " + this.id);
sleep(id.contains("ingest") ? 8000 : 4000);
// TODO: Here is where we would call out to workflow step API
workflowExecute(this.id);
System.out.println("<<< Finished " + this.id);
future.complete(this.id);
});
return this.future;
}

private void sleep(long i) {
private void workflowExecute(String s) {
try {
Thread.sleep(i);
Thread.sleep(s.contains("ingest") ? 8000 : 4000);
} catch (InterruptedException e) {}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,19 +10,38 @@

import java.util.Objects;

/**
* Representation of an edge between process nodes in a workflow graph.
*/
public class ProcessSequenceEdge {
private final String source;
private final String destination;

/**
* Create this edge with the id's of the source and destination nodes.
*
* @param source The source node id.
* @param destination The destination node id.
*/
ProcessSequenceEdge(String source, String destination) {
this.source = source;
this.destination = destination;
}

/**
* Gets the source node id.
*
* @return the source node id.
*/
public String getSource() {
return source;
}

/**
* Gets the destination node id.
*
* @return the destination node id.
*/
public String getDestination() {
return destination;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,79 +21,25 @@
import java.util.Map;
import java.util.Queue;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.function.Function;
import java.util.stream.Collectors;

/**
* Utility class for parsing templates.
*/
public class TemplateParser {

public static void main(String[] args) {
String json = "{\n"
+ " \"sequence\": {\n"
+ " \"nodes\": [\n"
+ " {\n"
+ " \"id\": \"fetch_model\"\n"
+ " },\n"
+ " {\n"
+ " \"id\": \"create_ingest_pipeline\"\n"
+ " },\n"
+ " {\n"
+ " \"id\": \"create_search_pipeline\"\n"
+ " },\n"
+ " {\n"
+ " \"id\": \"create_neural_search_index\"\n"
+ " }\n"
+ " ],\n"
+ " \"edges\": [\n"
+ " {\n"
+ " \"source\": \"fetch_model\",\n"
+ " \"dest\": \"create_ingest_pipeline\"\n"
+ " },\n"
+ " {\n"
+ " \"source\": \"fetch_model\",\n"
+ " \"dest\": \"create_search_pipeline\"\n"
+ " },\n"
+ " {\n"
+ " \"source\": \"create_ingest_pipeline\",\n"
+ " \"dest\": \"create_neural_search_index\"\n"
+ " },\n"
+ " {\n"
+ " \"source\": \"create_search_pipeline\",\n"
+ " \"dest\": \"create_neural_search_index\"\n"
// + " }\n,"
// + " {\n"
// + " \"source\": \"create_neural_search_index\",\n"
// + " \"dest\": \"fetch_model\"\n"
+ " }\n"
+ " ]\n"
+ " }\n"
+ "}";

System.out.println(json);

System.out.println("Parsing graph to sequence...");
List<ProcessNode> processSequence = parseJsonGraphToSequence(json);
List<CompletableFuture<String>> futureList = new ArrayList<>();

for (ProcessNode n : processSequence) {
Set<ProcessNode> predecessors = n.getPredecessors();
System.out.format(
"Queueing process [%s]. %s.%n",
n.getId(),
predecessors.isEmpty()
? "Can start immediately!"
: String.format(
"Must wait for [%s] to complete first.",
predecessors.stream().map(p -> p.getId()).collect(Collectors.joining(", "))
)
);
futureList.add(n.execute());
}
futureList.forEach(CompletableFuture::join);
System.out.println("All done!");
}
/**
* Prevent instantiating this class.
*/
private TemplateParser() {}

private static List<ProcessNode> parseJsonGraphToSequence(String json) {
/**
* Parse a JSON representation of nodes and edges into a topologically sorted list of process nodes.
* @param json A string containing a JSON representation of nodes and edges
* @return A list of Process Nodes sorted topologically. All predecessors of any node will occur prior to it in the list.
*/
public static List<ProcessNode> parseJsonGraphToSequence(String json) {
Gson gson = new Gson();
JsonObject jsonObject = gson.fromJson(json, JsonObject.class);

Expand Down

0 comments on commit 9e08097

Please sign in to comment.