From 9562827ed757c31dd345c2e7df466e7bf7a9a8ea Mon Sep 17 00:00:00 2001 From: Daniel Widdis Date: Tue, 16 Jan 2024 18:21:19 -0800 Subject: [PATCH] Fix process node zero timeout and handle not found exceptions Signed-off-by: Daniel Widdis --- .../DeprovisionWorkflowTransportAction.java | 52 ++++++++++--------- ...provisionWorkflowTransportActionTests.java | 7 ++- 2 files changed, 34 insertions(+), 25 deletions(-) diff --git a/src/main/java/org/opensearch/flowframework/transport/DeprovisionWorkflowTransportAction.java b/src/main/java/org/opensearch/flowframework/transport/DeprovisionWorkflowTransportAction.java index 068ac7976..b2a5e5028 100644 --- a/src/main/java/org/opensearch/flowframework/transport/DeprovisionWorkflowTransportAction.java +++ b/src/main/java/org/opensearch/flowframework/transport/DeprovisionWorkflowTransportAction.java @@ -11,14 +11,15 @@ import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; import org.opensearch.ExceptionsHelper; +import org.opensearch.OpenSearchStatusException; import org.opensearch.action.support.ActionFilters; import org.opensearch.action.support.HandledTransportAction; import org.opensearch.client.Client; import org.opensearch.common.inject.Inject; -import org.opensearch.common.unit.TimeValue; import org.opensearch.common.util.concurrent.ThreadContext; import org.opensearch.core.action.ActionListener; import org.opensearch.core.rest.RestStatus; +import org.opensearch.flowframework.common.FlowFrameworkSettings; import org.opensearch.flowframework.exception.FlowFrameworkException; import org.opensearch.flowframework.indices.FlowFrameworkIndicesHandler; import org.opensearch.flowframework.model.ProvisioningProgress; @@ -53,14 +54,13 @@ */ public class DeprovisionWorkflowTransportAction extends HandledTransportAction { - private static final String DEPROVISION_SUFFIX = "_deprovision"; - private final Logger logger = LogManager.getLogger(DeprovisionWorkflowTransportAction.class); private final ThreadPool threadPool; private final Client client; private final WorkflowStepFactory workflowStepFactory; private final FlowFrameworkIndicesHandler flowFrameworkIndicesHandler; + private final FlowFrameworkSettings flowFrameworkSettings; /** * Instantiates a new ProvisionWorkflowTransportAction @@ -70,6 +70,7 @@ public class DeprovisionWorkflowTransportAction extends HandledTransportAction resourcesCreated) { - String deprovisionId = deprovisionNode.id(); - int pos = deprovisionId.indexOf(DEPROVISION_SUFFIX); - ResourceCreated resource = null; - if (pos > 0) { - for (ResourceCreated resourceCreated : resourcesCreated) { - if (resourceCreated.workflowStepId().equals(deprovisionId.substring(0, pos))) { - resource = resourceCreated; - } - } - } - return resource; + return resourcesCreated.stream() + .filter(r -> deprovisionNode.id().equals("(deprovision_" + r.workflowStepName() + ") " + r.workflowStepId())) + .findFirst() + .orElse(null); } private static String getResourceNameAndId(ResourceCreated resource) { diff --git a/src/test/java/org/opensearch/flowframework/transport/DeprovisionWorkflowTransportActionTests.java b/src/test/java/org/opensearch/flowframework/transport/DeprovisionWorkflowTransportActionTests.java index fa83264ca..6259394f2 100644 --- a/src/test/java/org/opensearch/flowframework/transport/DeprovisionWorkflowTransportActionTests.java +++ b/src/test/java/org/opensearch/flowframework/transport/DeprovisionWorkflowTransportActionTests.java @@ -15,6 +15,7 @@ import org.opensearch.common.util.concurrent.OpenSearchExecutors; import org.opensearch.common.util.concurrent.ThreadContext; import org.opensearch.core.action.ActionListener; +import org.opensearch.flowframework.common.FlowFrameworkSettings; import org.opensearch.flowframework.indices.FlowFrameworkIndicesHandler; import org.opensearch.flowframework.model.ResourceCreated; import org.opensearch.flowframework.model.WorkflowState; @@ -70,6 +71,7 @@ public class DeprovisionWorkflowTransportActionTests extends OpenSearchTestCase private DeleteConnectorStep deleteConnectorStep; private DeprovisionWorkflowTransportAction deprovisionWorkflowTransportAction; private FlowFrameworkIndicesHandler flowFrameworkIndicesHandler; + private FlowFrameworkSettings flowFrameworkSettings; @Override public void setUp() throws Exception { @@ -77,6 +79,8 @@ public void setUp() throws Exception { this.client = mock(Client.class); this.workflowStepFactory = mock(WorkflowStepFactory.class); this.flowFrameworkIndicesHandler = mock(FlowFrameworkIndicesHandler.class); + flowFrameworkSettings = mock(FlowFrameworkSettings.class); + when(flowFrameworkSettings.getRequestTimeout()).thenReturn(TimeValue.timeValueSeconds(10)); this.deprovisionWorkflowTransportAction = new DeprovisionWorkflowTransportAction( mock(TransportService.class), @@ -84,7 +88,8 @@ public void setUp() throws Exception { threadPool, client, workflowStepFactory, - flowFrameworkIndicesHandler + flowFrameworkIndicesHandler, + flowFrameworkSettings ); MachineLearningNodeClient mlClient = new MachineLearningNodeClient(client);