
Topological Sorting and Sequenced Execution #26

Merged
12 commits merged on Sep 19, 2023
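The sequencing itself comes from TemplateParser.parseJsonGraphToSequence, which is not included in this diff; presumably it performs a topological sort of the workflow graph before the demos below execute the nodes in order. For reference only, here is a minimal sketch of Kahn's algorithm over node IDs; the class and method names are illustrative, not the plugin's actual API.

import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class TopoSortSketch {

    /**
     * Orders node IDs so that every edge's source precedes its destination.
     * Throws if the graph contains a cycle.
     *
     * @param edges map of node ID to the IDs of its successors
     * @return node IDs in a valid execution order
     */
    public static List<String> topologicalSort(Map<String, List<String>> edges) {
        // Count incoming edges for every node that appears in the graph
        Map<String, Integer> inDegree = new HashMap<>();
        edges.forEach((source, targets) -> {
            inDegree.putIfAbsent(source, 0);
            for (String target : targets) {
                inDegree.merge(target, 1, Integer::sum);
            }
        });
        // Nodes with no predecessors are ready immediately
        Deque<String> ready = new ArrayDeque<>();
        inDegree.forEach((node, degree) -> {
            if (degree == 0) {
                ready.add(node);
            }
        });
        List<String> ordered = new ArrayList<>();
        while (!ready.isEmpty()) {
            String node = ready.poll();
            ordered.add(node);
            // Removing this node may free up its successors
            for (String target : edges.getOrDefault(node, List.of())) {
                if (inDegree.merge(target, -1, Integer::sum) == 0) {
                    ready.add(target);
                }
            }
        }
        if (ordered.size() < inDegree.size()) {
            throw new IllegalStateException("Workflow graph has a cycle");
        }
        return ordered;
    }
}

Nodes with no remaining predecessors are always ready to run, which is the same property the demo classes below log before queueing each process.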
1 change: 1 addition & 0 deletions build.gradle
@@ -105,6 +105,7 @@ repositories {
dependencies {
    implementation "org.opensearch:opensearch:${opensearch_version}"
    implementation 'org.junit.jupiter:junit-jupiter:5.10.0'
    implementation "com.google.code.gson:gson:2.10.1"
    compileOnly "com.google.guava:guava:32.1.2-jre"
    api group: 'org.opensearch', name: 'opensearch-ml-client', version: "${opensearch_build}"

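The only build change is the new gson dependency, presumably used to parse the JSON workflow template; the parser itself is not part of this diff. A hedged sketch of reading a graph-shaped string with Gson, where the field names are placeholders rather than the real template schema:

import com.google.gson.JsonArray;
import com.google.gson.JsonObject;
import com.google.gson.JsonParser;

public class GsonParseSketch {
    public static void main(String[] args) {
        // Hypothetical graph snippet; the real files live under src/test/resources/template
        String json = "{\"nodes\":[{\"id\":\"create_index\"},{\"id\":\"create_another_index\"}],\"edges\":[]}";
        JsonObject graph = JsonParser.parseString(json).getAsJsonObject();
        JsonArray nodes = graph.getAsJsonArray("nodes");
        for (int i = 0; i < nodes.size(); i++) {
            System.out.println("node id: " + nodes.get(i).getAsJsonObject().get("id").getAsString());
        }
    }
}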
75 changes: 75 additions & 0 deletions src/main/java/demo/CreateIndexWorkflowStep.java
@@ -0,0 +1,75 @@
/*
 * Copyright OpenSearch Contributors
 * SPDX-License-Identifier: Apache-2.0
 *
 * The OpenSearch Contributors require contributions made to
 * this file be licensed under the Apache-2.0 license or a
 * compatible open source license.
 */
package demo;

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.opensearch.action.admin.indices.create.CreateIndexResponse;
import org.opensearch.flowframework.workflow.WorkflowData;
import org.opensearch.flowframework.workflow.WorkflowStep;

import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;

/**
 * Sample workflow step that simulates creating an index and passes the index name on as workflow data.
 */
public class CreateIndexWorkflowStep implements WorkflowStep {

    private static final Logger logger = LogManager.getLogger(CreateIndexWorkflowStep.class);

    private final String name;

    public CreateIndexWorkflowStep() {
        this.name = "CREATE_INDEX";
    }

    @Override
    public CompletableFuture<WorkflowData> execute(List<WorkflowData> data) {
        CompletableFuture<WorkflowData> future = new CompletableFuture<>();
        CompletableFuture.runAsync(() -> {
            String inputIndex = null;
            boolean first = true;
            for (WorkflowData wfData : data) {
                logger.debug(
                    "{} sent params: {}, content: {}",
                    first ? "Initialization" : "Previous step",
                    wfData.getParams(),
                    wfData.getContent()
                );
                if (first) {
                    Map<String, String> params = data.get(0).getParams();
                    if (params.containsKey("index")) {
                        inputIndex = params.get("index");
                    }
                    first = false;
                }
            }
            // do some work, simulating a REST API call
            try {
                Thread.sleep(2000);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
            // Simulate response of created index
            CreateIndexResponse response = new CreateIndexResponse(true, true, inputIndex);
            // OLD UNSCALABLE WAY: future.complete(new CreateIndexResponseData(response));
            // Better way with an anonymous class:
            future.complete(new WorkflowData() {
                @Override
                public Map<String, Object> getContent() {
                    return Map.of("index", response.index());
                }
            });
        });

        return future;
    }

    @Override
    public String getName() {
        return name;
    }
}
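To show how the step above would be driven, here is a hypothetical caller. It assumes WorkflowData can be satisfied by overriding just the two accessors used in this file (the anonymous class above overrides only getContent(), which suggests the interface supplies defaults), and the index name is made up for illustration.

package demo;

import org.opensearch.flowframework.workflow.WorkflowData;

import java.util.List;
import java.util.Map;

public class CreateIndexStepDemo {
    public static void main(String[] args) {
        CreateIndexWorkflowStep step = new CreateIndexWorkflowStep();
        // Seed the step with the "index" param it looks for in the first WorkflowData
        WorkflowData input = new WorkflowData() {
            @Override
            public Map<String, Object> getContent() {
                return Map.of();
            }

            @Override
            public Map<String, String> getParams() {
                return Map.of("index", "demo-index");
            }
        };
        // Blocks until the simulated REST call completes (about 2 seconds)
        WorkflowData result = step.execute(List.of(input)).join();
        System.out.println("Created index: " + result.getContent().get("index"));
    }
}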
80 changes: 80 additions & 0 deletions src/main/java/demo/DataDemo.java
@@ -0,0 +1,80 @@
/*
 * Copyright OpenSearch Contributors
 * SPDX-License-Identifier: Apache-2.0
 *
 * The OpenSearch Contributors require contributions made to
 * this file be licensed under the Apache-2.0 license or a
 * compatible open source license.
 */
package demo;

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.opensearch.flowframework.template.ProcessNode;
import org.opensearch.flowframework.template.TemplateParser;
import org.opensearch.flowframework.workflow.WorkflowStep;

import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.stream.Collectors;

/**
 * Demo class exercising {@link TemplateParser}. This will be moved to a unit test.
 */
public class DataDemo {

    private static final Logger logger = LogManager.getLogger(DataDemo.class);

    // This is temporary. We need a factory class to generate these workflow steps
    // based on a field in the JSON.
    private static Map<String, WorkflowStep> workflowMap = new HashMap<>();
    static {
        workflowMap.put("create_index", new CreateIndexWorkflowStep());
        workflowMap.put("create_another_index", new CreateIndexWorkflowStep());
    }

    /**
     * Demonstrate parsing a JSON graph.
     *
     * @param args unused
     */
    public static void main(String[] args) {
        String path = "src/test/resources/template/datademo.json";
        String json;
        try {
            json = new String(Files.readAllBytes(Paths.get(path)));
        } catch (IOException e) {
            logger.error("Failed to read JSON at path {}", path);
            return;
        }

        logger.info("Parsing graph to sequence...");
        List<ProcessNode> processSequence = TemplateParser.parseJsonGraphToSequence(json, workflowMap);
        List<CompletableFuture<?>> futureList = new ArrayList<>();

        for (ProcessNode n : processSequence) {
            Set<ProcessNode> predecessors = n.getPredecessors();
            logger.info(
                "Queueing process [{}].{}",
                n.id(),
                predecessors.isEmpty()
                    ? " Can start immediately!"
                    : String.format(
                        " Must wait for [%s] to complete first.",
                        predecessors.stream().map(p -> p.id()).collect(Collectors.joining(", "))
                    )
            );
            futureList.add(n.execute());
        }
        futureList.forEach(CompletableFuture::join);
        logger.info("All done!");
    }

}
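The comment in DataDemo notes that the hard-coded workflowMap is temporary and that a factory should build steps from a field in the JSON. One possible, purely hypothetical shape for that factory, keyed on the step type string:

package demo;

import org.opensearch.flowframework.workflow.WorkflowStep;

import java.util.Map;
import java.util.function.Supplier;

public class WorkflowStepFactory {

    // Registry of step type name (as it would appear in the template JSON) to a supplier of the step
    private final Map<String, Supplier<WorkflowStep>> registry = Map.of(
        "create_index", CreateIndexWorkflowStep::new,
        "create_another_index", CreateIndexWorkflowStep::new
    );

    /**
     * Creates the step registered under the given type, or throws if the type is unknown.
     */
    public WorkflowStep createStep(String type) {
        Supplier<WorkflowStep> supplier = registry.get(type);
        if (supplier == null) {
            throw new IllegalArgumentException("No workflow step registered for type: " + type);
        }
        return supplier.get();
    }
}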
83 changes: 83 additions & 0 deletions src/main/java/demo/Demo.java
@@ -0,0 +1,83 @@
/*
 * Copyright OpenSearch Contributors
 * SPDX-License-Identifier: Apache-2.0
 *
 * The OpenSearch Contributors require contributions made to
 * this file be licensed under the Apache-2.0 license or a
 * compatible open source license.
 */
package demo;

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.opensearch.flowframework.template.ProcessNode;
import org.opensearch.flowframework.template.TemplateParser;
import org.opensearch.flowframework.workflow.WorkflowStep;

import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.stream.Collectors;

/**
 * Demo class exercising {@link TemplateParser}. This will be moved to a unit test.
 */
public class Demo {

    private static final Logger logger = LogManager.getLogger(Demo.class);

    // This is temporary. We need a factory class to generate these workflow steps
    // based on a field in the JSON.
    private static Map<String, WorkflowStep> workflowMap = new HashMap<>();
    static {
        workflowMap.put("fetch_model", new DemoWorkflowStep(3000));
        workflowMap.put("create_ingest_pipeline", new DemoWorkflowStep(3000));
        workflowMap.put("create_search_pipeline", new DemoWorkflowStep(5000));
        workflowMap.put("create_neural_search_index", new DemoWorkflowStep(2000));
    }

    /**
     * Demonstrate parsing a JSON graph.
     *
     * @param args unused
     */
    public static void main(String[] args) {
        String path = "src/test/resources/template/demo.json";
        String json;
        try {
            json = new String(Files.readAllBytes(Paths.get(path)));
        } catch (IOException e) {
            logger.error("Failed to read JSON at path {}", path);
            return;
        }

        logger.info("Parsing graph to sequence...");
        List<ProcessNode> processSequence = TemplateParser.parseJsonGraphToSequence(json, workflowMap);
        List<CompletableFuture<?>> futureList = new ArrayList<>();

        for (ProcessNode n : processSequence) {
            Set<ProcessNode> predecessors = n.getPredecessors();
            logger.info(
                "Queueing process [{}].{}",
                n.id(),
                predecessors.isEmpty()
                    ? " Can start immediately!"
                    : String.format(
                        " Must wait for [%s] to complete first.",
                        predecessors.stream().map(p -> p.id()).collect(Collectors.joining(", "))
                    )
            );
            // TODO need to handle this better, passing an argument when we start them all at the beginning is silly
            futureList.add(n.execute());
        }
        futureList.forEach(CompletableFuture::join);
        logger.info("All done!");
    }

}
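The TODO in the loop above points out that kicking off every node as soon as it is queued is a stopgap. One hedged alternative, assuming each node's predecessor futures were exposed (they are not in this diff), is to gate each step on CompletableFuture.allOf over its predecessors; the node IDs and executeStep hook below are stand-ins for ProcessNode:

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;

public class SequencedExecutionSketch {

    /**
     * Starts each step only after all of its predecessors have completed.
     */
    public static void runInDependencyOrder(List<String> topologicalOrder, Map<String, List<String>> predecessors) {
        Map<String, CompletableFuture<Void>> futures = new HashMap<>();
        for (String node : topologicalOrder) {
            List<CompletableFuture<Void>> waitFor = new ArrayList<>();
            for (String predecessor : predecessors.getOrDefault(node, List.of())) {
                // Predecessors appear earlier in topological order, so their futures already exist
                waitFor.add(futures.get(predecessor));
            }
            // allOf of an empty array is already complete, so root nodes start immediately
            CompletableFuture<Void> gate = CompletableFuture.allOf(waitFor.toArray(new CompletableFuture[0]));
            futures.put(node, gate.thenRunAsync(() -> executeStep(node)));
        }
        // Wait for the whole graph to finish
        CompletableFuture.allOf(futures.values().toArray(new CompletableFuture[0])).join();
    }

    private static void executeStep(String node) {
        System.out.println("Executing " + node);
    }
}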
45 changes: 45 additions & 0 deletions src/main/java/demo/DemoWorkflowStep.java
@@ -0,0 +1,45 @@
/*
 * Copyright OpenSearch Contributors
 * SPDX-License-Identifier: Apache-2.0
 *
 * The OpenSearch Contributors require contributions made to
 * this file be licensed under the Apache-2.0 license or a
 * compatible open source license.
 */
package demo;

import org.opensearch.flowframework.workflow.WorkflowData;
import org.opensearch.flowframework.workflow.WorkflowStep;

import java.util.List;
import java.util.concurrent.CompletableFuture;

/**
 * Demo workflow step that simply completes after a configurable delay.
 */
public class DemoWorkflowStep implements WorkflowStep {

    private final long delay;
    private final String name;

    public DemoWorkflowStep(long delay) {
        this.delay = delay;
        this.name = "DEMO_DELAY_" + delay;
    }

    @Override
    public CompletableFuture<WorkflowData> execute(List<WorkflowData> data) {
        CompletableFuture<WorkflowData> future = new CompletableFuture<>();
        CompletableFuture.runAsync(() -> {
            try {
                Thread.sleep(this.delay);
                // Demo steps carry no data, so completing with null just signals the delay elapsed
                future.complete(null);
            } catch (InterruptedException e) {
                future.completeExceptionally(e);
            }
        });
        return future;
    }

    @Override
    public String getName() {
        return name;
    }
}
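Because the sleep above ties up a common-pool thread for the whole delay, a non-blocking variant is worth noting. This sketch, which is not what the PR does, uses CompletableFuture.delayedExecutor (available since Java 9) to complete after the delay without occupying a worker thread:

import org.opensearch.flowframework.workflow.WorkflowData;

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;

public class DelayedExecutorSketch {

    /**
     * Completes with null after the given delay without blocking a worker thread.
     */
    public static CompletableFuture<WorkflowData> delayedCompletion(long delayMillis) {
        return CompletableFuture.supplyAsync(
            () -> (WorkflowData) null,
            CompletableFuture.delayedExecutor(delayMillis, TimeUnit.MILLISECONDS)
        );
    }
}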
13 changes: 13 additions & 0 deletions src/main/java/demo/README.txt
@@ -0,0 +1,13 @@

DO NOT DEPEND ON CLASSES IN THIS PACKAGE.

The contents of this folder are for demo/proof-of-concept use.

Feel free to look at the classes in this folder for potential "how could I" scenarios.

Tests will not be written against them.
Documentation may be incomplete, wrong, or outdated.
These are not for production use.
They will be deleted without notice at some point, and altered without notice at other points.

DO NOT DEPEND ON CLASSES IN THIS PACKAGE.