From 9e08097d333e28c5c894c57bd9e02f2e821cd25a Mon Sep 17 00:00:00 2001 From: Daniel Widdis Date: Fri, 8 Sep 2023 12:37:09 -0700 Subject: [PATCH] Add javadocs Signed-off-by: Daniel Widdis --- .../flowframework/template/Demo.java | 89 +++++++++++++++++++ .../flowframework/template/ProcessNode.java | 44 +++++++-- .../template/ProcessSequenceEdge.java | 19 ++++ .../template/TemplateParser.java | 80 +++-------------- 4 files changed, 160 insertions(+), 72 deletions(-) create mode 100644 src/main/java/org/opensearch/flowframework/template/Demo.java diff --git a/src/main/java/org/opensearch/flowframework/template/Demo.java b/src/main/java/org/opensearch/flowframework/template/Demo.java new file mode 100644 index 000000000..8c9afadd9 --- /dev/null +++ b/src/main/java/org/opensearch/flowframework/template/Demo.java @@ -0,0 +1,89 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ +package org.opensearch.flowframework.template; + +import java.util.ArrayList; +import java.util.List; +import java.util.Set; +import java.util.concurrent.CompletableFuture; +import java.util.stream.Collectors; + +/** + * Demo class exercising {@link TemplateParser}. This will be moved to a unit test. + */ +public class Demo { + + /** + * Demonstrate parsing a JSON graph. + * + * @param args unused + */ + public static void main(String[] args) { + String json = "{\n" + + " \"sequence\": {\n" + + " \"nodes\": [\n" + + " {\n" + + " \"id\": \"fetch_model\"\n" + + " },\n" + + " {\n" + + " \"id\": \"create_ingest_pipeline\"\n" + + " },\n" + + " {\n" + + " \"id\": \"create_search_pipeline\"\n" + + " },\n" + + " {\n" + + " \"id\": \"create_neural_search_index\"\n" + + " }\n" + + " ],\n" + + " \"edges\": [\n" + + " {\n" + + " \"source\": \"fetch_model\",\n" + + " \"dest\": \"create_ingest_pipeline\"\n" + + " },\n" + + " {\n" + + " \"source\": \"fetch_model\",\n" + + " \"dest\": \"create_search_pipeline\"\n" + + " },\n" + + " {\n" + + " \"source\": \"create_ingest_pipeline\",\n" + + " \"dest\": \"create_neural_search_index\"\n" + + " },\n" + + " {\n" + + " \"source\": \"create_search_pipeline\",\n" + + " \"dest\": \"create_neural_search_index\"\n" + + " }\n" + + " ]\n" + + " }\n" + + "}"; + + System.out.println(json); + + System.out.println("Parsing graph to sequence..."); + List processSequence = TemplateParser.parseJsonGraphToSequence(json); + List> futureList = new ArrayList<>(); + + for (ProcessNode n : processSequence) { + Set predecessors = n.getPredecessors(); + System.out.format( + "Queueing process [%s]. %s.%n", + n.getId(), + predecessors.isEmpty() + ? "Can start immediately!" + : String.format( + "Must wait for [%s] to complete first.", + predecessors.stream().map(p -> p.getId()).collect(Collectors.joining(", ")) + ) + ); + futureList.add(n.execute()); + } + futureList.forEach(CompletableFuture::join); + System.out.println("All done!"); + } + +} diff --git a/src/main/java/org/opensearch/flowframework/template/ProcessNode.java b/src/main/java/org/opensearch/flowframework/template/ProcessNode.java index e1b57fc51..88c0bfb74 100644 --- a/src/main/java/org/opensearch/flowframework/template/ProcessNode.java +++ b/src/main/java/org/opensearch/flowframework/template/ProcessNode.java @@ -17,33 +17,66 @@ import java.util.concurrent.TimeUnit; import java.util.stream.Collectors; +/** + * Representation of a process node in a workflow graph. Tracks predecessor nodes which must be completed before it can start execution. + */ public class ProcessNode { private final String id; - private CompletableFuture future; + private CompletableFuture future = null; // will be populated during graph parsing private Set predecessors = Collections.emptySet(); + /** + * Create this node with a unique id. + */ ProcessNode(String id) { this.id = id; } + /** + * Returns the node's id. + * @return the node id. + */ public String getId() { return id; } + /** + * Returns a {@link CompletableFuture} if this process is executing. + * Relies on the node having been sorted and executed in an order such that all predecessor nodes have begun execution first (and thus populated this value). + * + * @return A future indicating the processing state of this node. + * Returns {@code null} if it has not begun executing, should not happen if a workflow is sorted and executed topologically. + */ public CompletableFuture getFuture() { return future; } + /** + * Returns the predecessors of this node in the workflow. + * The predecessor's {@link #getFuture()} must complete before execution begins on this node. + * + * @return a set of predecessor nodes, if any. At least one node in the graph must have no predecessors and serve as a start node. + */ public Set getPredecessors() { return predecessors; } - public void setPredecessors(Set predecessors) { + /** + * Sets the predecessor node. Called by {@link TemplateParser}. + * + * @param predecessors The predecessors of this node. + */ + void setPredecessors(Set predecessors) { this.predecessors = Set.copyOf(predecessors); } + /** + * Execute this node in the sequence. Initializes the node's {@link CompletableFuture} and completes it when the process completes. + * + * @return this node's future. This is returned immediately, while process execution continues asynchronously. + */ public CompletableFuture execute() { this.future = new CompletableFuture<>(); CompletableFuture.runAsync(() -> { @@ -60,16 +93,17 @@ public CompletableFuture execute() { return; } System.out.println(">>> Starting " + this.id); - sleep(id.contains("ingest") ? 8000 : 4000); + // TODO: Here is where we would call out to workflow step API + workflowExecute(this.id); System.out.println("<<< Finished " + this.id); future.complete(this.id); }); return this.future; } - private void sleep(long i) { + private void workflowExecute(String s) { try { - Thread.sleep(i); + Thread.sleep(s.contains("ingest") ? 8000 : 4000); } catch (InterruptedException e) {} } diff --git a/src/main/java/org/opensearch/flowframework/template/ProcessSequenceEdge.java b/src/main/java/org/opensearch/flowframework/template/ProcessSequenceEdge.java index 4d768958f..9544620fb 100644 --- a/src/main/java/org/opensearch/flowframework/template/ProcessSequenceEdge.java +++ b/src/main/java/org/opensearch/flowframework/template/ProcessSequenceEdge.java @@ -10,19 +10,38 @@ import java.util.Objects; +/** + * Representation of an edge between process nodes in a workflow graph. + */ public class ProcessSequenceEdge { private final String source; private final String destination; + /** + * Create this edge with the id's of the source and destination nodes. + * + * @param source The source node id. + * @param destination The destination node id. + */ ProcessSequenceEdge(String source, String destination) { this.source = source; this.destination = destination; } + /** + * Gets the source node id. + * + * @return the source node id. + */ public String getSource() { return source; } + /** + * Gets the destination node id. + * + * @return the destination node id. + */ public String getDestination() { return destination; } diff --git a/src/main/java/org/opensearch/flowframework/template/TemplateParser.java b/src/main/java/org/opensearch/flowframework/template/TemplateParser.java index 9d125d118..0448fafd3 100644 --- a/src/main/java/org/opensearch/flowframework/template/TemplateParser.java +++ b/src/main/java/org/opensearch/flowframework/template/TemplateParser.java @@ -21,79 +21,25 @@ import java.util.Map; import java.util.Queue; import java.util.Set; -import java.util.concurrent.CompletableFuture; import java.util.function.Function; import java.util.stream.Collectors; +/** + * Utility class for parsing templates. + */ public class TemplateParser { - public static void main(String[] args) { - String json = "{\n" - + " \"sequence\": {\n" - + " \"nodes\": [\n" - + " {\n" - + " \"id\": \"fetch_model\"\n" - + " },\n" - + " {\n" - + " \"id\": \"create_ingest_pipeline\"\n" - + " },\n" - + " {\n" - + " \"id\": \"create_search_pipeline\"\n" - + " },\n" - + " {\n" - + " \"id\": \"create_neural_search_index\"\n" - + " }\n" - + " ],\n" - + " \"edges\": [\n" - + " {\n" - + " \"source\": \"fetch_model\",\n" - + " \"dest\": \"create_ingest_pipeline\"\n" - + " },\n" - + " {\n" - + " \"source\": \"fetch_model\",\n" - + " \"dest\": \"create_search_pipeline\"\n" - + " },\n" - + " {\n" - + " \"source\": \"create_ingest_pipeline\",\n" - + " \"dest\": \"create_neural_search_index\"\n" - + " },\n" - + " {\n" - + " \"source\": \"create_search_pipeline\",\n" - + " \"dest\": \"create_neural_search_index\"\n" - // + " }\n," - // + " {\n" - // + " \"source\": \"create_neural_search_index\",\n" - // + " \"dest\": \"fetch_model\"\n" - + " }\n" - + " ]\n" - + " }\n" - + "}"; - - System.out.println(json); - - System.out.println("Parsing graph to sequence..."); - List processSequence = parseJsonGraphToSequence(json); - List> futureList = new ArrayList<>(); - - for (ProcessNode n : processSequence) { - Set predecessors = n.getPredecessors(); - System.out.format( - "Queueing process [%s]. %s.%n", - n.getId(), - predecessors.isEmpty() - ? "Can start immediately!" - : String.format( - "Must wait for [%s] to complete first.", - predecessors.stream().map(p -> p.getId()).collect(Collectors.joining(", ")) - ) - ); - futureList.add(n.execute()); - } - futureList.forEach(CompletableFuture::join); - System.out.println("All done!"); - } + /** + * Prevent instantiating this class. + */ + private TemplateParser() {} - private static List parseJsonGraphToSequence(String json) { + /** + * Parse a JSON representation of nodes and edges into a topologically sorted list of process nodes. + * @param json A string containing a JSON representation of nodes and edges + * @return A list of Process Nodes sorted topologically. All predecessors of any node will occur prior to it in the list. + */ + public static List parseJsonGraphToSequence(String json) { Gson gson = new Gson(); JsonObject jsonObject = gson.fromJson(json, JsonObject.class);