Skip to content

Commit

Permalink
Add more thread pools (#465)
Browse files Browse the repository at this point in the history
* Add more thread pools

Signed-off-by: Daniel Widdis <[email protected]>

* Increase maxFailures to 4

Signed-off-by: Daniel Widdis <[email protected]>

* Wait before starting tests

Signed-off-by: Daniel Widdis <[email protected]>

* Improve ProcessNode timeout

Signed-off-by: Daniel Widdis <[email protected]>

* Increase minimum thread pool requirement

Signed-off-by: Daniel Widdis <[email protected]>

---------

Signed-off-by: Daniel Widdis <[email protected]>
  • Loading branch information
dbwiddis committed Jan 30, 2024
1 parent 5d452f3 commit a812e51
Show file tree
Hide file tree
Showing 18 changed files with 162 additions and 61 deletions.
2 changes: 1 addition & 1 deletion build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -414,7 +414,7 @@ allprojects {
retry {
if (System.getenv().containsKey("CI")) {
maxRetries = 1
maxFailures = 3
maxFailures = 4
failOnPassedAfterRetry = false
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,9 @@
import java.util.List;
import java.util.function.Supplier;

import static org.opensearch.flowframework.common.CommonValue.DEPROVISION_WORKFLOW_THREAD_POOL;
import static org.opensearch.flowframework.common.CommonValue.FLOW_FRAMEWORK_THREAD_POOL_PREFIX;
import static org.opensearch.flowframework.common.CommonValue.PROVISION_WORKFLOW_THREAD_POOL;
import static org.opensearch.flowframework.common.CommonValue.WORKFLOW_THREAD_POOL;
import static org.opensearch.flowframework.common.FlowFrameworkSettings.FLOW_FRAMEWORK_ENABLED;
import static org.opensearch.flowframework.common.FlowFrameworkSettings.MAX_WORKFLOWS;
Expand Down Expand Up @@ -185,9 +187,23 @@ public List<ExecutorBuilder<?>> getExecutorBuilders(Settings settings) {
new ScalingExecutorBuilder(
WORKFLOW_THREAD_POOL,
1,
OpenSearchExecutors.allocatedProcessors(settings),
TimeValue.timeValueMinutes(5),
Math.max(2, OpenSearchExecutors.allocatedProcessors(settings) - 1),
TimeValue.timeValueMinutes(1),
FLOW_FRAMEWORK_THREAD_POOL_PREFIX + WORKFLOW_THREAD_POOL
),
new ScalingExecutorBuilder(
PROVISION_WORKFLOW_THREAD_POOL,
1,
Math.max(4, OpenSearchExecutors.allocatedProcessors(settings) - 1),
TimeValue.timeValueMinutes(5),
FLOW_FRAMEWORK_THREAD_POOL_PREFIX + PROVISION_WORKFLOW_THREAD_POOL
),
new ScalingExecutorBuilder(
DEPROVISION_WORKFLOW_THREAD_POOL,
1,
Math.max(2, OpenSearchExecutors.allocatedProcessors(settings) - 1),
TimeValue.timeValueMinutes(1),
FLOW_FRAMEWORK_THREAD_POOL_PREFIX + DEPROVISION_WORKFLOW_THREAD_POOL
)
);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -70,8 +70,12 @@ private CommonValue() {}
*/
/** Flow Framework plugin thread pool name prefix */
public static final String FLOW_FRAMEWORK_THREAD_POOL_PREFIX = "thread_pool.flow_framework.";
/** The provision workflow thread pool name */
/** The general workflow thread pool name for most calls */
public static final String WORKFLOW_THREAD_POOL = "opensearch_workflow";
/** The workflow thread pool name for provisioning */
public static final String PROVISION_WORKFLOW_THREAD_POOL = "opensearch_provision_workflow";
/** The workflow thread pool name for deprovisioning */
public static final String DEPROVISION_WORKFLOW_THREAD_POOL = "opensearch_deprovision_workflow";

/*
* Field names common to multiple classes
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,11 +42,11 @@
import java.util.Objects;
import java.util.stream.Collectors;

import static org.opensearch.flowframework.common.CommonValue.DEPROVISION_WORKFLOW_THREAD_POOL;
import static org.opensearch.flowframework.common.CommonValue.PROVISIONING_PROGRESS_FIELD;
import static org.opensearch.flowframework.common.CommonValue.PROVISION_START_TIME_FIELD;
import static org.opensearch.flowframework.common.CommonValue.RESOURCES_CREATED_FIELD;
import static org.opensearch.flowframework.common.CommonValue.STATE_FIELD;
import static org.opensearch.flowframework.common.CommonValue.WORKFLOW_THREAD_POOL;
import static org.opensearch.flowframework.common.WorkflowResources.getDeprovisionStepByWorkflowStep;
import static org.opensearch.flowframework.common.WorkflowResources.getResourceByWorkflowStep;

Expand Down Expand Up @@ -102,7 +102,7 @@ protected void doExecute(Task task, WorkflowRequest request, ActionListener<Work
context.restore();

// Retrieve resources from workflow state and deprovision
threadPool.executor(WORKFLOW_THREAD_POOL)
threadPool.executor(DEPROVISION_WORKFLOW_THREAD_POOL)
.execute(() -> executeDeprovisionSequence(workflowId, response.getWorkflowState().resourcesCreated(), listener));
}, exception -> {
String message = "Failed to get workflow state for workflow " + workflowId;
Expand Down Expand Up @@ -143,6 +143,7 @@ private void executeDeprovisionSequence(
new WorkflowData(Map.of(getResourceByWorkflowStep(stepName), resource.resourceId()), workflowId, deprovisionStepId),
Collections.emptyList(),
this.threadPool,
DEPROVISION_WORKFLOW_THREAD_POOL,
flowFrameworkSettings.getRequestTimeout()
)
);
Expand Down Expand Up @@ -196,6 +197,7 @@ private void executeDeprovisionSequence(
pn.input(),
pn.predecessors(),
this.threadPool,
DEPROVISION_WORKFLOW_THREAD_POOL,
pn.nodeTimeout()
);
}).collect(Collectors.toList());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,9 +49,9 @@
import static org.opensearch.flowframework.common.CommonValue.PROVISION_END_TIME_FIELD;
import static org.opensearch.flowframework.common.CommonValue.PROVISION_START_TIME_FIELD;
import static org.opensearch.flowframework.common.CommonValue.PROVISION_WORKFLOW;
import static org.opensearch.flowframework.common.CommonValue.PROVISION_WORKFLOW_THREAD_POOL;
import static org.opensearch.flowframework.common.CommonValue.RESOURCES_CREATED_FIELD;
import static org.opensearch.flowframework.common.CommonValue.STATE_FIELD;
import static org.opensearch.flowframework.common.CommonValue.WORKFLOW_THREAD_POOL;

/**
* Transport Action to provision a workflow from a stored use case template
Expand Down Expand Up @@ -180,7 +180,7 @@ protected void doExecute(Task task, WorkflowRequest request, ActionListener<Work
*/
private void executeWorkflowAsync(String workflowId, List<ProcessNode> workflowSequence, ActionListener<WorkflowResponse> listener) {
try {
threadPool.executor(WORKFLOW_THREAD_POOL).execute(() -> { executeWorkflow(workflowSequence, workflowId); });
threadPool.executor(PROVISION_WORKFLOW_THREAD_POOL).execute(() -> { executeWorkflow(workflowSequence, workflowId); });
} catch (Exception exception) {
listener.onFailure(new FlowFrameworkException(exception.getMessage(), ExceptionsHelper.status(exception)));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,16 +12,12 @@
import org.apache.logging.log4j.Logger;
import org.opensearch.action.support.PlainActionFuture;
import org.opensearch.common.unit.TimeValue;
import org.opensearch.threadpool.Scheduler.ScheduledCancellable;
import org.opensearch.threadpool.ThreadPool;

import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeoutException;

import static org.opensearch.flowframework.common.CommonValue.WORKFLOW_THREAD_POOL;

/**
* Representation of a process node in a workflow graph.
Expand All @@ -37,6 +33,7 @@ public class ProcessNode {
private final WorkflowData input;
private final List<ProcessNode> predecessors;
private final ThreadPool threadPool;
private final String threadPoolName;
private final TimeValue nodeTimeout;

private final PlainActionFuture<WorkflowData> future = PlainActionFuture.newFuture();
Expand All @@ -50,6 +47,7 @@ public class ProcessNode {
* @param input Input required by the node encoded in a {@link WorkflowData} instance.
* @param predecessors Nodes preceding this one in the workflow
* @param threadPool The OpenSearch thread pool
* @param threadPoolName The thread pool to use
* @param nodeTimeout The timeout value for executing on this node
*/
public ProcessNode(
Expand All @@ -59,6 +57,7 @@ public ProcessNode(
WorkflowData input,
List<ProcessNode> predecessors,
ThreadPool threadPool,
String threadPoolName,
TimeValue nodeTimeout
) {
this.id = id;
Expand All @@ -67,6 +66,7 @@ public ProcessNode(
this.input = input;
this.predecessors = predecessors;
this.threadPool = threadPool;
this.threadPoolName = threadPoolName;
this.nodeTimeout = nodeTimeout;
}

Expand Down Expand Up @@ -152,34 +152,23 @@ public PlainActionFuture<WorkflowData> execute() {
WorkflowData wd = node.future().actionGet();
inputMap.put(wd.getNodeId(), wd);
}
logger.info("Starting {}.", this.id);

ScheduledCancellable delayExec = null;
if (this.nodeTimeout.compareTo(TimeValue.ZERO) > 0) {
delayExec = threadPool.schedule(() -> {
if (!future.isDone()) {
future.onFailure(new TimeoutException("Execute timed out for " + this.id));
}
}, this.nodeTimeout, ThreadPool.Names.SAME);
}
// record start time for this step.
logger.info("Starting {}.", this.id);
PlainActionFuture<WorkflowData> stepFuture = this.workflowStep.execute(
this.id,
this.input,
inputMap,
this.previousNodeInputs
);
// If completed exceptionally, this is a no-op
future.onResponse(stepFuture.get());
future.onResponse(stepFuture.actionGet(this.nodeTimeout));
// record end time passing workflow steps
if (delayExec != null) {
delayExec.cancel();
}
logger.info("Finished {}.", this.id);
} catch (Exception e) {
this.future.onFailure(e);
}
}, threadPool.executor(WORKFLOW_THREAD_POOL));
}, threadPool.executor(this.threadPoolName));
return this.future;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@
import java.util.stream.Collectors;
import java.util.stream.Stream;

import static org.opensearch.flowframework.common.CommonValue.PROVISION_WORKFLOW_THREAD_POOL;
import static org.opensearch.flowframework.common.FlowFrameworkSettings.MAX_WORKFLOW_STEPS;
import static org.opensearch.flowframework.model.WorkflowNode.NODE_TIMEOUT_DEFAULT_VALUE;
import static org.opensearch.flowframework.model.WorkflowNode.NODE_TIMEOUT_FIELD;
Expand Down Expand Up @@ -122,6 +123,7 @@ public List<ProcessNode> sortProcessNodes(Workflow workflow, String workflowId)
data,
predecessorNodes,
threadPool,
PROVISION_WORKFLOW_THREAD_POOL,
nodeTimeout
);
idToNodeMap.put(processNode.id(), processNode);
Expand Down
3 changes: 2 additions & 1 deletion src/main/resources/mappings/workflow-steps.json
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,8 @@
],
"required_plugins":[
"opensearch-ml"
]
],
"timeout": "60s"
},
"delete_connector": {
"inputs": [
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,7 @@ public void testPlugin() throws IOException {
);
assertEquals(9, ffp.getRestHandlers(settings, null, null, null, null, null, null).size());
assertEquals(9, ffp.getActions().size());
assertEquals(1, ffp.getExecutorBuilders(settings).size());
assertEquals(3, ffp.getExecutorBuilders(settings).size());
assertEquals(5, ffp.getSettings().size());
}
}
Expand Down
9 changes: 9 additions & 0 deletions src/test/java/org/opensearch/flowframework/TestHelpers.java
Original file line number Diff line number Diff line change
Expand Up @@ -113,6 +113,15 @@ public static Response makeRequest(
if (entity != null) {
request.setEntity(entity);
}
try {
return client.performRequest(request);
} catch (IOException e) {
// In restricted resource cluster, initialization of REST clients on other nodes takes time
// Wait 10 seconds and try again
try {
Thread.sleep(10000);
} catch (InterruptedException ie) {}
}
return client.performRequest(request);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import org.opensearch.flowframework.model.WorkflowEdge;
import org.opensearch.flowframework.model.WorkflowNode;
import org.opensearch.flowframework.model.WorkflowState;
import org.junit.Before;
import org.junit.ComparisonFailure;

import java.util.Collections;
Expand All @@ -30,7 +31,9 @@
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.stream.Collectors;

import static org.opensearch.flowframework.common.CommonValue.CREDENTIAL_FIELD;
Expand All @@ -39,6 +42,18 @@

public class FlowFrameworkRestApiIT extends FlowFrameworkRestTestCase {

private static AtomicBoolean waitToStart = new AtomicBoolean(true);

@Before
public void waitToStart() throws Exception {
// ML Commons cron job runs every 10 seconds and takes 20+ seconds to initialize .plugins-ml-config index
// Delay on the first attempt for 25 seconds to allow this initialization and prevent flaky tests
if (waitToStart.getAndSet(false)) {
CountDownLatch latch = new CountDownLatch(1);
latch.await(25, TimeUnit.SECONDS);
}
}

public void testSearchWorkflows() throws Exception {

// Create a Workflow that has a credential 12345
Expand Down Expand Up @@ -257,7 +272,7 @@ public void testCreateAndProvisionAgentFrameworkWorkflow() throws Exception {
// wait and ensure state is completed/done
assertBusy(
() -> { getAndAssertWorkflowStatus(client(), workflowId, State.COMPLETED, ProvisioningProgress.DONE); },
30,
120,
TimeUnit.SECONDS
);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,8 +38,8 @@

import org.mockito.ArgumentCaptor;

import static org.opensearch.flowframework.common.CommonValue.DEPROVISION_WORKFLOW_THREAD_POOL;
import static org.opensearch.flowframework.common.CommonValue.FLOW_FRAMEWORK_THREAD_POOL_PREFIX;
import static org.opensearch.flowframework.common.CommonValue.WORKFLOW_THREAD_POOL;
import static org.opensearch.flowframework.common.WorkflowResources.CONNECTOR_ID;
import static org.opensearch.flowframework.common.WorkflowResources.MODEL_ID;
import static org.mockito.ArgumentMatchers.any;
Expand All @@ -57,11 +57,11 @@ public class DeprovisionWorkflowTransportActionTests extends OpenSearchTestCase
private static ThreadPool threadPool = new TestThreadPool(
DeprovisionWorkflowTransportActionTests.class.getName(),
new ScalingExecutorBuilder(
WORKFLOW_THREAD_POOL,
DEPROVISION_WORKFLOW_THREAD_POOL,
1,
OpenSearchExecutors.allocatedProcessors(Settings.EMPTY),
TimeValue.timeValueMinutes(5),
FLOW_FRAMEWORK_THREAD_POOL_PREFIX + WORKFLOW_THREAD_POOL
FLOW_FRAMEWORK_THREAD_POOL_PREFIX + DEPROVISION_WORKFLOW_THREAD_POOL
)
);
private Client client;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@

import static org.opensearch.action.DocWriteResponse.Result.UPDATED;
import static org.opensearch.flowframework.common.CommonValue.FLOW_FRAMEWORK_THREAD_POOL_PREFIX;
import static org.opensearch.flowframework.common.CommonValue.PROVISION_WORKFLOW_THREAD_POOL;
import static org.opensearch.flowframework.common.CommonValue.REGISTER_MODEL_STATUS;
import static org.opensearch.flowframework.common.CommonValue.WORKFLOW_STATE_INDEX;
import static org.opensearch.flowframework.common.CommonValue.WORKFLOW_THREAD_POOL;
Expand Down Expand Up @@ -85,9 +86,16 @@ public void setUp() throws Exception {
new ScalingExecutorBuilder(
WORKFLOW_THREAD_POOL,
1,
OpenSearchExecutors.allocatedProcessors(Settings.EMPTY),
TimeValue.timeValueMinutes(5),
Math.max(1, OpenSearchExecutors.allocatedProcessors(Settings.EMPTY) - 1),
TimeValue.timeValueMinutes(1),
FLOW_FRAMEWORK_THREAD_POOL_PREFIX + WORKFLOW_THREAD_POOL
),
new ScalingExecutorBuilder(
PROVISION_WORKFLOW_THREAD_POOL,
1,
Math.max(1, OpenSearchExecutors.allocatedProcessors(Settings.EMPTY) - 1),
TimeValue.timeValueMinutes(5),
FLOW_FRAMEWORK_THREAD_POOL_PREFIX + PROVISION_WORKFLOW_THREAD_POOL
)
);
this.deployModel = new DeployModelStep(
Expand Down
Loading

0 comments on commit a812e51

Please sign in to comment.