Skip to content

Commit

Permalink
Merge pull request #347 from dwnusbaum/infinite-loop-memory-leak
Browse files Browse the repository at this point in the history
Prevent `StepExecutionIterator` from leaking memory in cases where a single processed execution has a stuck CPS VM thread
  • Loading branch information
jglick authored Aug 9, 2024
2 parents e986fb0 + b15fde4 commit c211223
Show file tree
Hide file tree
Showing 3 changed files with 110 additions and 26 deletions.
6 changes: 6 additions & 0 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,12 @@
<groupId>org.jenkins-ci.plugins</groupId>
<artifactId>scm-api</artifactId>
</dependency>
<dependency>
<groupId>org.awaitility</groupId>
<artifactId>awaitility</artifactId>
<version>4.2.2</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.jenkins-ci.plugins.workflow</groupId>
<artifactId>workflow-job</artifactId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -253,24 +253,25 @@ public ListenableFuture<?> apply(final Function<StepExecution, Void> f) {

for (FlowExecution e : FlowExecutionList.get()) {
ListenableFuture<List<StepExecution>> execs = e.getCurrentExecutions(false);
all.add(execs);
Futures.addCallback(execs,new FutureCallback<List<StepExecution>>() {
@Override
public void onSuccess(@NonNull List<StepExecution> result) {
for (StepExecution e : result) {
try {
f.apply(e);
} catch (RuntimeException x) {
LOGGER.log(Level.WARNING, null, x);
}
// It is important that the combined future's return values do not reference the individual step
// executions, so we use transform instead of addCallback. Otherwise, it is possible to leak references
// to the WorkflowRun for each processed StepExecution in the case where a single live FlowExecution
// has a stuck CpsVmExecutorService that prevents the getCurrentExecutions future from completing.
ListenableFuture<Void> results = Futures.transform(execs, (List<StepExecution> result) -> {
for (StepExecution se : result) {
try {
f.apply(se);
} catch (RuntimeException x) {
LOGGER.log(Level.WARNING, null, x);
}
}

@Override
public void onFailure(@NonNull Throwable t) {
LOGGER.log(Level.WARNING, null, t);
}
return null;
}, MoreExecutors.directExecutor());
ListenableFuture<Void> resultsWithWarningsLogged = Futures.catching(results, Throwable.class, t -> {
LOGGER.log(Level.WARNING, null, t);
return null;

Check warning on line 272 in src/main/java/org/jenkinsci/plugins/workflow/flow/FlowExecutionList.java

View check run for this annotation

ci.jenkins.io / Code Coverage

Not covered lines

Lines 271-272 are not covered by tests
}, MoreExecutors.directExecutor());
all.add(resultsWithWarningsLogged);
}

return Futures.allAsList(all);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@

package org.jenkinsci.plugins.workflow.flow;

import static org.awaitility.Awaitility.await;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.containsString;
import static org.hamcrest.Matchers.hasItem;
Expand All @@ -38,20 +39,22 @@
import hudson.model.TaskListener;
import hudson.model.queue.QueueTaskFuture;
import java.io.Serializable;
import java.time.Duration;
import java.time.Instant;
import java.lang.ref.WeakReference;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Set;
import java.util.function.Supplier;
import java.util.concurrent.TimeUnit;
import java.util.logging.Level;
import org.hamcrest.Matcher;
import jenkins.model.Jenkins;
import org.jenkinsci.plugins.workflow.cps.CpsFlowDefinition;
import org.jenkinsci.plugins.workflow.job.WorkflowJob;
import org.jenkinsci.plugins.workflow.job.WorkflowRun;
import org.jenkinsci.plugins.workflow.steps.Step;
import org.jenkinsci.plugins.workflow.steps.StepContext;
import org.jenkinsci.plugins.workflow.steps.StepDescriptor;
import org.jenkinsci.plugins.workflow.steps.StepExecution;
import org.jenkinsci.plugins.workflow.steps.StepExecutions;
import org.jenkinsci.plugins.workflow.test.steps.SemaphoreStep;
import org.junit.ClassRule;
import org.junit.Test;
Expand All @@ -60,6 +63,7 @@
import org.jvnet.hudson.test.Issue;
import org.jvnet.hudson.test.LoggerRule;
import org.jvnet.hudson.test.JenkinsSessionRule;
import org.jvnet.hudson.test.MemoryAssert;
import org.jvnet.hudson.test.TestExtension;
import org.kohsuke.stapler.DataBoundConstructor;

Expand Down Expand Up @@ -132,7 +136,7 @@ public class FlowExecutionListTest {
at org.jenkinsci.plugins.workflow.flow.FlowExecutionList$ItemListenerImpl.onLoaded(FlowExecutionList.java:175)
at jenkins.model.Jenkins.<init>(Jenkins.java:1019)
*/
waitFor(logging::getMessages, hasItem(containsString("Will resume [org.jenkinsci.plugins.workflow.test.steps.SemaphoreStep")));
await().atMost(5, TimeUnit.SECONDS).until(logging::getMessages, hasItem(containsString("Will resume [org.jenkinsci.plugins.workflow.test.steps.SemaphoreStep")));
WorkflowJob p = r.jenkins.getItemByFullName("p", WorkflowJob.class);
SemaphoreStep.success("wait/1", null);
WorkflowRun b = p.getBuildByNumber(1);
Expand Down Expand Up @@ -160,6 +164,34 @@ public class FlowExecutionListTest {
});
}

/**
 * Checks that a finished build can still be garbage collected after {@code StepExecution.applyAll}
 * has been invoked while a different build is stuck inside a synchronously blocking step.
 * <p>
 * Sequence: start a build parked on a semaphore, start a second build that blocks via
 * {@code blockSynchronously}, call {@code applyAll}, let the first build finish, drop the local
 * strong reference to it, and assert that the weak reference to it can be cleared. The stuck build is
 * released at the end so that the test can finish.
 */
@Test public void stepExecutionIteratorDoesNotLeakBuildsWhenOneIsStuck() throws Throwable {
sessions.then(r -> {
var notStuck = r.createProject(WorkflowJob.class, "not-stuck");
notStuck.setDefinition(new CpsFlowDefinition("semaphore 'wait'", true));
var notStuckBuild = notStuck.scheduleBuild2(0).waitForStart();
SemaphoreStep.waitForStart("wait/1", notStuckBuild);
// Only a weak reference is kept so that the GC assertion below is meaningful.
WeakReference<Object> notStuckBuildRef = new WeakReference<>(notStuckBuild);
// Create a Pipeline that runs a long-lived task on its CpsVmExecutorService, causing it to get stuck.
var stuck = r.createProject(WorkflowJob.class, "stuck");
stuck.setDefinition(new CpsFlowDefinition("blockSynchronously 'stuck'", false));
var stuckBuild = stuck.scheduleBuild2(0).waitForStart();
// Wait (at most 5 seconds) until the blocking step body has actually begun running.
await().atMost(5, TimeUnit.SECONDS).until(() -> SynchronousBlockingStep.isStarted("stuck"));
// Make FlowExecutionList$StepExecutionIteratorImpl.applyAll submit a task to the CpsVmExecutorService
// for stuck #1 that will never complete, so the resulting future will never complete.
StepExecution.applyAll(e -> null);
// Let notStuckBuild complete and clean up all references.
SemaphoreStep.success("wait/1", null);
r.waitForCompletion(notStuckBuild);
notStuckBuild = null; // Clear out the local variable in this thread.
Jenkins.get().getQueue().clearLeftItems(); // Otherwise we'd have to wait 5 minutes for the cache to be cleared.
// Make sure that the reference can be GC'd.
MemoryAssert.assertGC(notStuckBuildRef, true);
// Allow stuck #1 to complete so the test can be cleaned up promptly.
SynchronousBlockingStep.unblock("stuck");
r.waitForCompletion(stuckBuild);
});
}

public static class NonResumableStep extends Step implements Serializable {
public static final long serialVersionUID = 1L;
@DataBoundConstructor
Expand Down Expand Up @@ -198,14 +230,59 @@ public String getFunctionName() {
}

/**
* Wait up to 5 seconds for the given supplier to return a matching value.
* Blocks the CPS VM thread synchronously (bad!) to test related problems.
*/
private static <T> void waitFor(Supplier<T> valueSupplier, Matcher<T> matcher) throws InterruptedException {
Instant end = Instant.now().plus(Duration.ofSeconds(5));
while (!matcher.matches(valueSupplier.get()) && Instant.now().isBefore(end)) {
Thread.sleep(100L);
public static class SynchronousBlockingStep extends Step implements Serializable {
private static final long serialVersionUID = 1L;
private static final Map<String, State> blocked = new HashMap<>();
private final String id;

/**
 * Creates a blocking step and registers its ID in the shared state map as {@code NOT_STARTED}.
 *
 * @param id identifier used to query ({@code isStarted}) and release ({@code unblock}) this step
 * @throws IllegalArgumentException if a state is already registered for {@code id}
 */
@DataBoundConstructor
public SynchronousBlockingStep(String id) {
    this.id = id;
    // putIfAbsent rather than put: a rejected duplicate must not overwrite the state of the step that
    // already owns this ID (resetting BLOCKED to NOT_STARTED would silently release its blocking loop).
    if (blocked.putIfAbsent(id, State.NOT_STARTED) != null) {
        throw new IllegalArgumentException("Attempting to reuse ID: " + id);
    }
}

/**
 * Returns a synchronous execution whose body marks this step's ID as {@code BLOCKED}, logs that fact to the
 * build log, and then polls the shared state map until the ID is no longer {@code BLOCKED}.
 *
 * @param context the step context handed to {@code StepExecutions.synchronous}
 * @return the synchronous step execution wrapping the blocking body
 */
@Override
public StepExecution start(StepContext context) throws Exception {
    return StepExecutions.synchronous(context, stepContext -> {
        // Record the BLOCKED state before logging; isStarted(id) reports true from this point on.
        blocked.put(id, State.BLOCKED);
        stepContext.get(TaskListener.class).getLogger().println(id + " blocked");
        // Check every 100 ms whether the state for this ID has been replaced.
        for (;;) {
            if (blocked.get(id) != State.BLOCKED) {
                break;
            }
            Thread.sleep(100L);
        }
        stepContext.get(TaskListener.class).getLogger().println(id + " unblocked ");
        return null;
    });
}

/**
 * Reports whether the step registered under {@code id} has moved past {@code NOT_STARTED}.
 *
 * @param id the step identifier to look up
 * @return {@code false} if the ID is unknown or still {@code NOT_STARTED}; {@code true} otherwise
 */
public static boolean isStarted(String id) {
    State current = blocked.get(id);
    if (current == null) {
        return false;
    }
    return current != State.NOT_STARTED;
}

/**
 * Marks the given ID as {@code UNBLOCKED} in the shared state map, which ends the polling loop of a step
 * body that is currently waiting on that ID.
 *
 * @param id the step identifier to release
 */
public static void unblock(String id) {
blocked.put(id, State.UNBLOCKED);
}

/** Lifecycle of a blocking step, tracked per ID in the shared state map. */
private enum State {
// Set by the constructor when the step is instantiated.
NOT_STARTED,
// Set by the step body when it begins running; the body keeps polling while this is the state.
BLOCKED,
// Set by unblock(String); lets the step body finish.
UNBLOCKED,
}

/**
 * Descriptor exposing this step to Pipeline scripts as {@code blockSynchronously}.
 * NOTE(review): the {@code @TestExtension} value presumably limits registration to the named test method;
 * confirm against the test harness documentation.
 */
@TestExtension("stepExecutionIteratorDoesNotLeakBuildsWhenOneIsStuck") public static class DescriptorImpl extends StepDescriptor {
// The step body only looks up TaskListener from its context, so that is the sole requirement declared.
@Override
public Set<? extends Class<?>> getRequiredContext() {
return Collections.singleton(TaskListener.class);
}
// Name under which the step is invoked from a Pipeline script.
@Override
public String getFunctionName() {
return "blockSynchronously";
}
}
assertThat("Matcher should have matched after 5s", valueSupplier.get(), matcher);
}

}

0 comments on commit c211223

Please sign in to comment.