Skip to content

Commit

Permalink
Introducing getExecutorBuilders extension point to FlowFramworkPlugin…
Browse files Browse the repository at this point in the history
…, added FixedExecutorBuilder thread pool for provisioning tasks, set up async workflow execution, added TODOs for state/GC index handling

Signed-off-by: Joshua Palis <[email protected]>
  • Loading branch information
joshpalis committed Oct 4, 2023
1 parent 0bceff6 commit cc32079
Show file tree
Hide file tree
Showing 2 changed files with 131 additions and 8 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,8 @@
import org.opensearch.rest.RestController;
import org.opensearch.rest.RestHandler;
import org.opensearch.script.ScriptService;
import org.opensearch.threadpool.ExecutorBuilder;
import org.opensearch.threadpool.FixedExecutorBuilder;
import org.opensearch.threadpool.ThreadPool;
import org.opensearch.watcher.ResourceWatcherService;

Expand All @@ -49,14 +51,23 @@
*/
public class FlowFrameworkPlugin extends Plugin implements ActionPlugin {

// TODO : Move names to common values class
/**
* The base URI for this plugin's rest actions
*/
public static final String AI_FLOW_FRAMEWORK_BASE_URI = "/_plugins/_ai_flow";
public static final String AI_FLOW_FRAMEWORK_BASE_URI = "/_plugins/_flow_framework";
/**
* The URI for this plugin's workflow rest actions
*/
public static final String WORKFLOWS_URI = AI_FLOW_FRAMEWORK_BASE_URI + "/workflows";
/**
* Flow Framework plugin thread pool name prefix
*/
public static final String FLOW_FRAMEWORK_THREAD_POOL_PREFIX = "thread_pool.flow_framework.";
/**
* The provision workflow thread pool name
*/
public static final String PROVISION_THREAD_POOL = "opensearch_workflow_provision";

/**
* Instantiate this plugin.
Expand Down Expand Up @@ -104,4 +115,18 @@ public List<RestHandler> getRestHandlers(
);
}

@Override
public List<ExecutorBuilder<?>> getExecutorBuilders(Settings settings) {
// TODO : Determine final size/queueSize values for the provision thread pool
FixedExecutorBuilder provisionThreadPool = new FixedExecutorBuilder(

Check warning on line 121 in src/main/java/org/opensearch/flowframework/FlowFrameworkPlugin.java

View check run for this annotation

Codecov / codecov/patch

src/main/java/org/opensearch/flowframework/FlowFrameworkPlugin.java#L121

Added line #L121 was not covered by tests
settings,
PROVISION_THREAD_POOL,
1,
10,
FLOW_FRAMEWORK_THREAD_POOL_PREFIX + PROVISION_THREAD_POOL,
false
);
return ImmutableList.of(provisionThreadPool);

Check warning on line 129 in src/main/java/org/opensearch/flowframework/FlowFrameworkPlugin.java

View check run for this annotation

Codecov / codecov/patch

src/main/java/org/opensearch/flowframework/FlowFrameworkPlugin.java#L129

Added line #L129 was not covered by tests
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -16,26 +16,60 @@
import org.opensearch.common.inject.Inject;
import org.opensearch.core.action.ActionListener;
import org.opensearch.flowframework.model.Template;
import org.opensearch.flowframework.model.Workflow;
import org.opensearch.flowframework.workflow.ProcessNode;
import org.opensearch.flowframework.workflow.WorkflowProcessSorter;
import org.opensearch.tasks.Task;
import org.opensearch.threadpool.ThreadPool;
import org.opensearch.transport.TransportService;

import java.util.ArrayList;
import java.util.List;
import java.util.Locale;
import java.util.concurrent.CancellationException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.stream.Collectors;

import static org.opensearch.flowframework.FlowFrameworkPlugin.PROVISION_THREAD_POOL;

/**
* Transport Action to provision a workflow from a stored use case template
*/
public class ProvisionWorkflowTransportAction extends HandledTransportAction<WorkflowRequest, WorkflowResponse> {

private final Logger logger = LogManager.getLogger(ProvisionWorkflowTransportAction.class);

Check warning on line 41 in src/main/java/org/opensearch/flowframework/transport/ProvisionWorkflowTransportAction.java

View check run for this annotation

Codecov / codecov/patch

src/main/java/org/opensearch/flowframework/transport/ProvisionWorkflowTransportAction.java#L41

Added line #L41 was not covered by tests

// TODO : Move to common values class, pending implementation
/**
* The name of the provision workflow within the use case template
*/
private static final String PROVISION_WORKFLOW = "provision";

private final ThreadPool threadPool;
private final Client client;
private final WorkflowProcessSorter workflowProcessSorter;

/**
* Instantiates a new ProvisionWorkflowTransportAction
* @param transportService The TransportService
* @param actionFilters action filters
* @param threadPool The OpenSearch thread pool
* @param client The node client to retrieve a stored use case template
* @param workflowProcessSorter Utility class to generate a togologically sorted list of Process nodes
*/
@Inject
public ProvisionWorkflowTransportAction(TransportService transportService, ActionFilters actionFilters, Client client) {
public ProvisionWorkflowTransportAction(
TransportService transportService,
ActionFilters actionFilters,
ThreadPool threadPool,
Client client,
WorkflowProcessSorter workflowProcessSorter
) {
super(ProvisionWorkflowAction.NAME, transportService, actionFilters, WorkflowRequest::new);
this.threadPool = threadPool;
this.client = client;
this.workflowProcessSorter = workflowProcessSorter;
}

Check warning on line 73 in src/main/java/org/opensearch/flowframework/transport/ProvisionWorkflowTransportAction.java

View check run for this annotation

Codecov / codecov/patch

src/main/java/org/opensearch/flowframework/transport/ProvisionWorkflowTransportAction.java#L69-L73

Added lines #L69 - L73 were not covered by tests

@Override
Expand All @@ -47,22 +81,86 @@ protected void doExecute(Task task, WorkflowRequest request, ActionListener<Work
String workflowId = workflowResponse.getWorkflowId();
Template template = request.getTemplate();

Check warning on line 82 in src/main/java/org/opensearch/flowframework/transport/ProvisionWorkflowTransportAction.java

View check run for this annotation

Codecov / codecov/patch

src/main/java/org/opensearch/flowframework/transport/ProvisionWorkflowTransportAction.java#L80-L82

Added lines #L80 - L82 were not covered by tests

// TODO : Use node client to update state index, given workflowId
// TODO : Pass workflowId and template to execution
// TODO : Use node client to update state index to PROVISIONING, given workflowId

listener.onResponse(new WorkflowResponse(workflowId));

Check warning on line 86 in src/main/java/org/opensearch/flowframework/transport/ProvisionWorkflowTransportAction.java

View check run for this annotation

Codecov / codecov/patch

src/main/java/org/opensearch/flowframework/transport/ProvisionWorkflowTransportAction.java#L86

Added line #L86 was not covered by tests

}, exception -> { listener.onFailure(exception); }));
// Asychronously begin provision workflow excecution
executeWorkflowAsync(workflowId, template.workflows().get(PROVISION_WORKFLOW));

Check warning on line 89 in src/main/java/org/opensearch/flowframework/transport/ProvisionWorkflowTransportAction.java

View check run for this annotation

Codecov / codecov/patch

src/main/java/org/opensearch/flowframework/transport/ProvisionWorkflowTransportAction.java#L89

Added line #L89 was not covered by tests

}, exception -> { listener.onFailure(exception); }));

Check warning on line 91 in src/main/java/org/opensearch/flowframework/transport/ProvisionWorkflowTransportAction.java

View check run for this annotation

Codecov / codecov/patch

src/main/java/org/opensearch/flowframework/transport/ProvisionWorkflowTransportAction.java#L91

Added line #L91 was not covered by tests
} else {

// Use case template has been previously saved, retrieve entry and execute
String workflowId = request.getWorkflowId();

Check warning on line 94 in src/main/java/org/opensearch/flowframework/transport/ProvisionWorkflowTransportAction.java

View check run for this annotation

Codecov / codecov/patch

src/main/java/org/opensearch/flowframework/transport/ProvisionWorkflowTransportAction.java#L94

Added line #L94 was not covered by tests

// TODO : use node client to update state index, given workflowId
// TODO : Retrieve template from global context index using node client execute
// TODO : Retrieve template from global context index using node client
Template template = null; // temporary, remove later

Check warning on line 97 in src/main/java/org/opensearch/flowframework/transport/ProvisionWorkflowTransportAction.java

View check run for this annotation

Codecov / codecov/patch

src/main/java/org/opensearch/flowframework/transport/ProvisionWorkflowTransportAction.java#L97

Added line #L97 was not covered by tests

// TODO : use node client to update state index entry to PROVISIONING, given workflowId

listener.onResponse(new WorkflowResponse(workflowId));
executeWorkflowAsync(workflowId, template.workflows().get(PROVISION_WORKFLOW));

Check warning on line 102 in src/main/java/org/opensearch/flowframework/transport/ProvisionWorkflowTransportAction.java

View check run for this annotation

Codecov / codecov/patch

src/main/java/org/opensearch/flowframework/transport/ProvisionWorkflowTransportAction.java#L101-L102

Added lines #L101 - L102 were not covered by tests
}
}

Check warning on line 104 in src/main/java/org/opensearch/flowframework/transport/ProvisionWorkflowTransportAction.java

View check run for this annotation

Codecov / codecov/patch

src/main/java/org/opensearch/flowframework/transport/ProvisionWorkflowTransportAction.java#L104

Added line #L104 was not covered by tests

/**
* Retrieves a thread from the provision thread pool to execute a workflow
* @param workflowId The id of the workflow
* @param workflow The workflow to execute
*/
private void executeWorkflowAsync(String workflowId, Workflow workflow) {
// TODO : Update Action listener type to State index Request
ActionListener<String> provisionWorkflowListener = ActionListener.wrap(response -> {
logger.info("Provisioning completed successuflly for workflow {}", workflowId);

Check warning on line 114 in src/main/java/org/opensearch/flowframework/transport/ProvisionWorkflowTransportAction.java

View check run for this annotation

Codecov / codecov/patch

src/main/java/org/opensearch/flowframework/transport/ProvisionWorkflowTransportAction.java#L113-L114

Added lines #L113 - L114 were not covered by tests

// TODO : Create State index request to update STATE entry status to READY
}, exception -> {
logger.error("Provisioning failed for workflow {} : {}", workflowId, exception);

Check warning on line 118 in src/main/java/org/opensearch/flowframework/transport/ProvisionWorkflowTransportAction.java

View check run for this annotation

Codecov / codecov/patch

src/main/java/org/opensearch/flowframework/transport/ProvisionWorkflowTransportAction.java#L117-L118

Added lines #L117 - L118 were not covered by tests

// TODO : Create State index request to update STATE entry status to FAILED
});

Check warning on line 121 in src/main/java/org/opensearch/flowframework/transport/ProvisionWorkflowTransportAction.java

View check run for this annotation

Codecov / codecov/patch

src/main/java/org/opensearch/flowframework/transport/ProvisionWorkflowTransportAction.java#L121

Added line #L121 was not covered by tests
try {
threadPool.executor(PROVISION_THREAD_POOL).execute(() -> { executeWorkflow(workflow, provisionWorkflowListener); });
} catch (Exception exception) {
provisionWorkflowListener.onFailure(exception);
}
}

Check warning on line 127 in src/main/java/org/opensearch/flowframework/transport/ProvisionWorkflowTransportAction.java

View check run for this annotation

Codecov / codecov/patch

src/main/java/org/opensearch/flowframework/transport/ProvisionWorkflowTransportAction.java#L123-L127

Added lines #L123 - L127 were not covered by tests

/**
* Topologically sorts a given workflow into a sequence of ProcessNodes and executes the workflow
* @param workflow The workflow to execute
* @param workflowListener The listener that updates the status of a workflow execution
*/
private void executeWorkflow(Workflow workflow, ActionListener<String> workflowListener) {

List<ProcessNode> processSequence = workflowProcessSorter.sortProcessNodes(workflow);
List<CompletableFuture<?>> workflowFutureList = new ArrayList<>();

Check warning on line 137 in src/main/java/org/opensearch/flowframework/transport/ProvisionWorkflowTransportAction.java

View check run for this annotation

Codecov / codecov/patch

src/main/java/org/opensearch/flowframework/transport/ProvisionWorkflowTransportAction.java#L136-L137

Added lines #L136 - L137 were not covered by tests

for (ProcessNode processNode : processSequence) {
List<ProcessNode> predecessors = processNode.predecessors();

Check warning on line 140 in src/main/java/org/opensearch/flowframework/transport/ProvisionWorkflowTransportAction.java

View check run for this annotation

Codecov / codecov/patch

src/main/java/org/opensearch/flowframework/transport/ProvisionWorkflowTransportAction.java#L140

Added line #L140 was not covered by tests

logger.info(

Check warning on line 142 in src/main/java/org/opensearch/flowframework/transport/ProvisionWorkflowTransportAction.java

View check run for this annotation

Codecov / codecov/patch

src/main/java/org/opensearch/flowframework/transport/ProvisionWorkflowTransportAction.java#L142

Added line #L142 was not covered by tests
"Queueing process [{}].{}",
processNode.id(),

Check warning on line 144 in src/main/java/org/opensearch/flowframework/transport/ProvisionWorkflowTransportAction.java

View check run for this annotation

Codecov / codecov/patch

src/main/java/org/opensearch/flowframework/transport/ProvisionWorkflowTransportAction.java#L144

Added line #L144 was not covered by tests
predecessors.isEmpty()
? " Can start immediately!"
: String.format(
Locale.getDefault(),

Check warning on line 148 in src/main/java/org/opensearch/flowframework/transport/ProvisionWorkflowTransportAction.java

View check run for this annotation

Codecov / codecov/patch

src/main/java/org/opensearch/flowframework/transport/ProvisionWorkflowTransportAction.java#L146-L148

Added lines #L146 - L148 were not covered by tests
" Must wait for [%s] to complete first.",
predecessors.stream().map(p -> p.id()).collect(Collectors.joining(", "))

Check warning on line 150 in src/main/java/org/opensearch/flowframework/transport/ProvisionWorkflowTransportAction.java

View check run for this annotation

Codecov / codecov/patch

src/main/java/org/opensearch/flowframework/transport/ProvisionWorkflowTransportAction.java#L150

Added line #L150 was not covered by tests
)
);

workflowFutureList.add(processNode.execute());
}

Check warning on line 155 in src/main/java/org/opensearch/flowframework/transport/ProvisionWorkflowTransportAction.java

View check run for this annotation

Codecov / codecov/patch

src/main/java/org/opensearch/flowframework/transport/ProvisionWorkflowTransportAction.java#L154-L155

Added lines #L154 - L155 were not covered by tests
try {
// Attempt to join each workflow step future, may throw a CompletionException if any step completes exceptionally
workflowFutureList.forEach(CompletableFuture::join);

Check warning on line 158 in src/main/java/org/opensearch/flowframework/transport/ProvisionWorkflowTransportAction.java

View check run for this annotation

Codecov / codecov/patch

src/main/java/org/opensearch/flowframework/transport/ProvisionWorkflowTransportAction.java#L158

Added line #L158 was not covered by tests

// TODO : Create State Index request with provisioning state, start time, end time, etc, pending implementation. String for now
workflowListener.onResponse("READY");
} catch (CancellationException | CompletionException ex) {
workflowListener.onFailure(ex);
}
}

Check warning on line 165 in src/main/java/org/opensearch/flowframework/transport/ProvisionWorkflowTransportAction.java

View check run for this annotation

Codecov / codecov/patch

src/main/java/org/opensearch/flowframework/transport/ProvisionWorkflowTransportAction.java#L161-L165

Added lines #L161 - L165 were not covered by tests

Expand Down

0 comments on commit cc32079

Please sign in to comment.