Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Backport 2.x] Fix process node zero timeout and handle not found exceptions #412

Merged
merged 1 commit into from
Jan 17, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -11,14 +11,15 @@
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.opensearch.ExceptionsHelper;
import org.opensearch.OpenSearchStatusException;
import org.opensearch.action.support.ActionFilters;
import org.opensearch.action.support.HandledTransportAction;
import org.opensearch.client.Client;
import org.opensearch.common.inject.Inject;
import org.opensearch.common.unit.TimeValue;
import org.opensearch.common.util.concurrent.ThreadContext;
import org.opensearch.core.action.ActionListener;
import org.opensearch.core.rest.RestStatus;
import org.opensearch.flowframework.common.FlowFrameworkSettings;
import org.opensearch.flowframework.exception.FlowFrameworkException;
import org.opensearch.flowframework.indices.FlowFrameworkIndicesHandler;
import org.opensearch.flowframework.model.ProvisioningProgress;
Expand Down Expand Up @@ -53,14 +54,13 @@
*/
public class DeprovisionWorkflowTransportAction extends HandledTransportAction<WorkflowRequest, WorkflowResponse> {

private static final String DEPROVISION_SUFFIX = "_deprovision";

private final Logger logger = LogManager.getLogger(DeprovisionWorkflowTransportAction.class);

private final ThreadPool threadPool;
private final Client client;
private final WorkflowStepFactory workflowStepFactory;
private final FlowFrameworkIndicesHandler flowFrameworkIndicesHandler;
private final FlowFrameworkSettings flowFrameworkSettings;

/**
* Instantiates a new ProvisionWorkflowTransportAction
Expand All @@ -70,6 +70,7 @@
* @param client The node client to retrieve a stored use case template
* @param workflowStepFactory The factory instantiating workflow steps
* @param flowFrameworkIndicesHandler Class to handle all internal system indices actions
* @param flowFrameworkSettings The plugin settings
*/
@Inject
public DeprovisionWorkflowTransportAction(
Expand All @@ -78,13 +79,15 @@
ThreadPool threadPool,
Client client,
WorkflowStepFactory workflowStepFactory,
FlowFrameworkIndicesHandler flowFrameworkIndicesHandler
FlowFrameworkIndicesHandler flowFrameworkIndicesHandler,
FlowFrameworkSettings flowFrameworkSettings
) {
super(DeprovisionWorkflowAction.NAME, transportService, actionFilters, WorkflowRequest::new);
this.threadPool = threadPool;
this.client = client;
this.workflowStepFactory = workflowStepFactory;
this.flowFrameworkIndicesHandler = flowFrameworkIndicesHandler;
this.flowFrameworkSettings = flowFrameworkSettings;
}

@Override
Expand Down Expand Up @@ -128,8 +131,8 @@
if (deprovisionStep == null) {
continue;
}
// New ID is old ID with deprovision added
String deprovisionStepId = workflowStepId + DEPROVISION_SUFFIX;
// New ID is old ID with (deprovision step type) prepended
String deprovisionStepId = "(deprovision_" + stepName + ") " + workflowStepId;
deprovisionProcessSequence.add(
new ProcessNode(
deprovisionStepId,
Expand All @@ -138,7 +141,7 @@
new WorkflowData(Map.of(getResourceByWorkflowStep(stepName), resource.resourceId()), workflowId, deprovisionStepId),
Collections.emptyList(),
this.threadPool,
TimeValue.ZERO
flowFrameworkSettings.getRequestTimeout()
)
);
}
Expand All @@ -164,12 +167,20 @@
// Pause briefly before next step
Thread.sleep(100);
} catch (Throwable t) {
logger.info(
"Failed {} for {}: {}",
deprovisionNode.id(),
resourceNameAndId,
t.getCause() == null ? t.getMessage() : t.getCause().getMessage()
);
// If any deprovision fails due to not found, it's a success
if (t.getCause() instanceof OpenSearchStatusException
&& ((OpenSearchStatusException) t.getCause()).status() == RestStatus.NOT_FOUND) {
logger.info("Successful (not found) {} for {}", deprovisionNode.id(), resourceNameAndId);

Check warning on line 173 in src/main/java/org/opensearch/flowframework/transport/DeprovisionWorkflowTransportAction.java

View check run for this annotation

Codecov / codecov/patch

src/main/java/org/opensearch/flowframework/transport/DeprovisionWorkflowTransportAction.java#L173

Added line #L173 was not covered by tests
// Remove from list so we don't try again
iter.remove();

Check warning on line 175 in src/main/java/org/opensearch/flowframework/transport/DeprovisionWorkflowTransportAction.java

View check run for this annotation

Codecov / codecov/patch

src/main/java/org/opensearch/flowframework/transport/DeprovisionWorkflowTransportAction.java#L175

Added line #L175 was not covered by tests
} else {
logger.info(
"Failed {} for {}: {}",
deprovisionNode.id(),
resourceNameAndId,
t.getCause() == null ? t.getMessage() : t.getCause().getMessage()
);
}
}
}
if (deprovisionProcessSequence.size() < resourceCount) {
Expand Down Expand Up @@ -257,17 +268,10 @@
}

private static ResourceCreated getResourceFromDeprovisionNode(ProcessNode deprovisionNode, List<ResourceCreated> resourcesCreated) {
String deprovisionId = deprovisionNode.id();
int pos = deprovisionId.indexOf(DEPROVISION_SUFFIX);
ResourceCreated resource = null;
if (pos > 0) {
for (ResourceCreated resourceCreated : resourcesCreated) {
if (resourceCreated.workflowStepId().equals(deprovisionId.substring(0, pos))) {
resource = resourceCreated;
}
}
}
return resource;
return resourcesCreated.stream()
.filter(r -> deprovisionNode.id().equals("(deprovision_" + r.workflowStepName() + ") " + r.workflowStepId()))
.findFirst()
.orElse(null);
}

private static String getResourceNameAndId(ResourceCreated resource) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
import org.opensearch.common.util.concurrent.OpenSearchExecutors;
import org.opensearch.common.util.concurrent.ThreadContext;
import org.opensearch.core.action.ActionListener;
import org.opensearch.flowframework.common.FlowFrameworkSettings;
import org.opensearch.flowframework.indices.FlowFrameworkIndicesHandler;
import org.opensearch.flowframework.model.ResourceCreated;
import org.opensearch.flowframework.model.WorkflowState;
Expand Down Expand Up @@ -70,21 +71,25 @@ public class DeprovisionWorkflowTransportActionTests extends OpenSearchTestCase
private DeleteConnectorStep deleteConnectorStep;
private DeprovisionWorkflowTransportAction deprovisionWorkflowTransportAction;
private FlowFrameworkIndicesHandler flowFrameworkIndicesHandler;
private FlowFrameworkSettings flowFrameworkSettings;

@Override
public void setUp() throws Exception {
super.setUp();
this.client = mock(Client.class);
this.workflowStepFactory = mock(WorkflowStepFactory.class);
this.flowFrameworkIndicesHandler = mock(FlowFrameworkIndicesHandler.class);
flowFrameworkSettings = mock(FlowFrameworkSettings.class);
when(flowFrameworkSettings.getRequestTimeout()).thenReturn(TimeValue.timeValueSeconds(10));

this.deprovisionWorkflowTransportAction = new DeprovisionWorkflowTransportAction(
mock(TransportService.class),
mock(ActionFilters.class),
threadPool,
client,
workflowStepFactory,
flowFrameworkIndicesHandler
flowFrameworkIndicesHandler,
flowFrameworkSettings
);

MachineLearningNodeClient mlClient = new MachineLearningNodeClient(client);
Expand Down
Loading