Skip to content

Commit

Permalink
Merge pull request #221 from jglick/FlowExecutionList-JENKINS-67164
Browse files Browse the repository at this point in the history
[JENKINS-67164] Call `StepExecution.onResume` in response to `WorkflowRun.onLoad` not `FlowExecutionList.ItemListenerImpl`
  • Loading branch information
jglick authored Jun 3, 2022
2 parents 729a0d4 + e10ac66 commit a1e4906
Show file tree
Hide file tree
Showing 4 changed files with 288 additions and 25 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -322,4 +322,12 @@ public Iterable<BlockStartNode> iterateEnclosingBlocks(@NonNull FlowNode node) {
protected void notifyShutdown() {
// Default is no-op
}

/**
* Called after a restart and any attempts at {@link StepExecution#onResume} have completed.
* This is a signal that it is safe to resume program execution.
* By default, does nothing.
*/
protected void afterStepExecutionsResumed() {}

}
175 changes: 150 additions & 25 deletions src/main/java/org/jenkinsci/plugins/workflow/flow/FlowExecutionList.java
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,9 @@
import hudson.Extension;
import hudson.ExtensionList;
import hudson.XmlFile;
import hudson.init.InitMilestone;
import hudson.init.Terminator;
import hudson.model.Computer;
import hudson.model.listeners.ItemListener;
import hudson.remoting.SingleLaneExecutorService;
import hudson.util.CopyOnWriteList;
Expand All @@ -22,15 +24,23 @@
import java.io.File;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CancellationException;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.logging.Level;
import java.util.logging.Logger;
import org.jenkinsci.plugins.workflow.graph.FlowNode;
import org.jenkinsci.plugins.workflow.graphanalysis.LinearBlockHoppingScanner;

import org.kohsuke.accmod.Restricted;
import org.kohsuke.accmod.restrictions.Beta;
import org.kohsuke.accmod.restrictions.DoNotUse;

/**
Expand All @@ -44,6 +54,8 @@ public class FlowExecutionList implements Iterable<FlowExecution> {
private final SingleLaneExecutorService executor = new SingleLaneExecutorService(Timer.get());
private XmlFile configFile;

private transient volatile boolean resumptionComplete;

public FlowExecutionList() {
load();
}
Expand Down Expand Up @@ -160,11 +172,17 @@ public static FlowExecutionList get() {
}

/**
* @deprecated Only exists for binary compatibility.
* Returns true if all executions that were present in this {@link FlowExecutionList} have been loaded.
*
* <p>This takes place slightly after {@link InitMilestone#COMPLETED} is reached during Jenkins startup.
*
* <p>Useful to avoid resuming Pipelines in contexts that may lead to deadlock.
*
* <p>It is <em>not</em> guaranteed that {@link FlowExecution#afterStepExecutionsResumed} has been called at this point.
*/
@Deprecated
@Restricted(Beta.class)
public boolean isResumptionComplete() {
return false;
return resumptionComplete;
}

/**
Expand All @@ -179,29 +197,8 @@ public void onLoaded() {
for (final FlowExecution e : list) {
// The call to FlowExecutionOwner.get in the implementation of iterator() is sufficent to load the Pipeline.
LOGGER.log(Level.FINE, "Eagerly loaded {0}", e);
Futures.addCallback(e.getCurrentExecutions(false), new FutureCallback<List<StepExecution>>() {
@Override
public void onSuccess(@NonNull List<StepExecution> result) {
LOGGER.log(Level.FINE, "Will resume {0}", result);
for (StepExecution se : result) {
try {
se.onResume();
} catch (Throwable x) {
se.getContext().onFailure(x);
}
}
}

@Override
public void onFailure(@NonNull Throwable t) {
if (t instanceof CancellationException) {
LOGGER.log(Level.FINE, "Cancelled load of " + e, t);
} else {
LOGGER.log(Level.WARNING, "Failed to load " + e, t);
}
}
}, MoreExecutors.directExecutor());
}
list.resumptionComplete = true;
}
}

Expand Down Expand Up @@ -256,4 +253,132 @@ public void onFailure(@NonNull Throwable t) {
executor.shutdown();
executor.awaitTermination(1, TimeUnit.MINUTES);
}

/**
* Whenever a Pipeline resumes, resume all incomplete steps in its {@link FlowExecution}.
*
* <p>Called by {@code WorkflowRun.onLoad}, so guaranteed to run if a Pipeline resumes
* regardless of its presence in {@link FlowExecutionList}.
*/
@Extension
public static class ResumeStepExecutionListener extends FlowExecutionListener {
@Override
public void onResumed(@NonNull FlowExecution e) {
Futures.addCallback(e.getCurrentExecutions(false), new FutureCallback<List<StepExecution>>() {
@Override
public void onSuccess(@NonNull List<StepExecution> result) {
if (e.isComplete()) {
// WorkflowRun.onLoad will not fireResumed if the execution was already complete when loaded,
// and CpsFlowExecution should not then complete until afterStepExecutionsResumed, so this is defensive.
return;
}
FlowExecutionList list = FlowExecutionList.get();
FlowExecutionOwner owner = e.getOwner();
if (!list.runningTasks.contains(owner)) {
LOGGER.log(Level.WARNING, "Resuming {0}, which is missing from FlowExecutionList ({1}), so registering it now.", new Object[] {owner, list.runningTasks.getView()});
list.register(owner);
}
LOGGER.log(Level.FINE, "Will resume {0}", result);
new ParallelResumer(result, e::afterStepExecutionsResumed).run();
}

@Override
public void onFailure(@NonNull Throwable t) {
if (t instanceof CancellationException) {
LOGGER.log(Level.FINE, "Cancelled load of " + e, t);
} else {
LOGGER.log(Level.WARNING, "Failed to load " + e, t);
}
e.afterStepExecutionsResumed();
}

}, MoreExecutors.directExecutor());
}
}

/** Calls {@link StepExecution#onResume} for each step in a running build.
* Does so in parallel, but always completing enclosing blocks before the enclosed step.
* A simplified version of https://stackoverflow.com/a/67449067/12916, since this should be a tree not a general DAG.
*/
private static final class ParallelResumer {

private final Runnable onCompletion;
/** Step nodes mapped to the step execution. Entries removed when they are ready to be resumed. */
private final Map<FlowNode, StepExecution> nodes = new HashMap<>();
/** Step nodes currently being resumed. Removed after resumption completes. */
private final Set<FlowNode> processing = new HashSet<>();
/** Step nodes mapped to the nearest enclosing step node (no entry if at root). */
private final Map<FlowNode, FlowNode> enclosing = new HashMap<>();

ParallelResumer(Collection<StepExecution> executions, Runnable onCompletion) {
this.onCompletion = onCompletion;
// First look up positions in the flow graph, so that we can compute dependencies:
for (StepExecution se : executions) {
try {
FlowNode n = se.getContext().get(FlowNode.class);
if (n != null) {
nodes.put(n, se);
} else {
LOGGER.warning(() -> "Could not find FlowNode for " + se + " so it will not be resumed");
}
} catch (IOException | InterruptedException x) {
LOGGER.log(Level.WARNING, "Could not look up FlowNode for " + se + " so it will not be resumed", x);
}
}
for (FlowNode n : nodes.keySet()) {
LinearBlockHoppingScanner scanner = new LinearBlockHoppingScanner();
scanner.setup(n);
for (FlowNode parent : scanner) {
if (parent != n && nodes.containsKey(parent)) {
enclosing.put(n, parent);
break;
}
}
}
}

synchronized void run() {
LOGGER.fine(() -> "Checking status with nodes=" + nodes + " enclosing=" + enclosing + " processing=" + processing);
if (nodes.isEmpty()) {
if (processing.isEmpty()) {
LOGGER.fine("Done");
onCompletion.run();
}
return;
}
Map<FlowNode, StepExecution> ready = new HashMap<>();
for (Map.Entry<FlowNode, StepExecution> entry : nodes.entrySet()) {
FlowNode n = entry.getKey();
FlowNode parent = enclosing.get(n);
if (parent == null || !nodes.containsKey(parent)) {
ready.put(n, entry.getValue());
}
}
LOGGER.fine(() -> "Ready to resume: " + ready);
nodes.keySet().removeAll(ready.keySet());
for (Map.Entry<FlowNode, StepExecution> entry : ready.entrySet()) {
FlowNode n = entry.getKey();
StepExecution exec = entry.getValue();
processing.add(n);
// Strictly speaking threadPoolForRemoting should be used for agent communications.
// In practice the only onResume impl known to block is in ExecutorStepExecution.
// Avoid jenkins.util.Timer since it is capped at 10 threads and should not be used for long tasks.
Computer.threadPoolForRemoting.submit(() -> {
LOGGER.fine(() -> "About to resume " + n + " ~ " + exec);
try {
exec.onResume();
} catch (Throwable x) {
exec.getContext().onFailure(x);
}
LOGGER.fine(() -> "Finished resuming " + n + " ~ " + exec);
synchronized (ParallelResumer.this) {
processing.remove(n);
run();
}
});
}
}

}

}
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,7 @@ public void onCompleted(@NonNull FlowExecution execution) {
* Fires the {@link #onCreated(FlowExecution)} event.
*/
public static void fireCreated(@NonNull FlowExecution execution) {
// TODO Jenkins 2.325+ use Listeners.notify
for (FlowExecutionListener listener : ExtensionList.lookup(FlowExecutionListener.class)) {
listener.onCreated(execution);
}
Expand Down
Loading

0 comments on commit a1e4906

Please sign in to comment.