Skip to content

Commit

Permalink
Hold off reloads until after events are delivered.
Browse files Browse the repository at this point in the history
  • Loading branch information
sdedic committed Aug 29, 2024
1 parent 71d6cf4 commit cdd7f17
Showing 1 changed file with 114 additions and 74 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -135,6 +135,7 @@ public class ProjectReloadInternal {
* events, state releases and other stuff that should not be processed during project reload.
*/
private final Map<Project, ProjectOperations> pendingOperations = new WeakHashMap<>();
private final Set<ProjectOperations> terminatingOperations = new HashSet<>();

/**
* Identity map for ProjectStateData. Each is assigned a special Object held in the handle
Expand Down Expand Up @@ -532,7 +533,7 @@ public Pair<StateRef, ProjectState> getProjectState0(Project p, Lookup context,
oldS.notify();
}
}
endOperation(p, null).run();
endOperation(p, null, null);
}
}

Expand Down Expand Up @@ -649,7 +650,7 @@ public CompletableFuture<ProjectState> withProjectState2(StateRef refCurrent, Pr
return reload.clientFuture;
} finally {
LOG.log(Level.FINE, "Failed to load project {0} with request {1}", new Object[] { p, stateRequest });
endOperation(p, null).run();
endOperation(p, null, null);
}
}

Expand Down Expand Up @@ -743,23 +744,50 @@ public void runProjectAction(Project p, Runnable r) {

public void assertNoOperations() {
synchronized (this) {
if (!pendingOperations.isEmpty()) {
Map<Project, ProjectOperations> ops = new HashMap<>(this.pendingOperations);
ops.values().removeAll(this.terminatingOperations);
if (!ops.isEmpty()) {
System.err.println("Pending operations detected");
for (Map.Entry<Project, ProjectOperations> en : pendingOperations.entrySet()) {
for (Map.Entry<Project, ProjectOperations> en : ops.entrySet()) {
ProjectOperations op = en.getValue();
System.err.println(en.getKey() + ": usage " + op.usage + ", pendingReloads: " + op.pendingReloads.size() + ", actions: " + op.postponedActions.size());
for (Reloader r : op.pendingReloads) {
r.getOriginTrace().printStackTrace();
}
}
}
if (!pendingOperations.isEmpty() || this.loaderProcessors.size() != PROJECT_RELOAD_CONCURRENCY) {
if (!ops.isEmpty() || this.loaderProcessors.size() != PROJECT_RELOAD_CONCURRENCY) {
throw new IllegalStateException();
}
}
}

private static final Runnable EMPTY = () -> {};
private Collection<IdentityHolder> collectRelaeases(ProjectOperations op) {
Collection<IdentityHolder> releases = op.releases;
// this is copied from postCleanup, but can be done in batch without
// checking the project operation is in progress for each reference. These are already removed
// from stateIdentity, so just check they did not obtain another one:
for (Iterator<IdentityHolder> it = releases.iterator(); it.hasNext(); ) {
IdentityHolder expired = it.next();
ProjectStateData d = expired.state.get();
if (d == null) {
it.remove();
}
IdentityHolder h = stateIdentity.get(d);
if (h != null && h != expired) {
it.remove();
}
}
return releases;
}

private void notityReleased(IdentityHolder h) {
ProjectStateData d = h.state.get();
if (d != null) {
h.impl.projectDataReleased(d);
ReloadSpiAccessor.get().release(d);
}
}

/**
* Ends the project operation. Optionally unregisters a reload. If more reloads are pending,
Expand All @@ -773,7 +801,7 @@ public void assertNoOperations() {
* @param reload if not null, specifies the ending reload.
* @return runnable that sends out events and potentially continues next reload.
*/
private Runnable endOperation(Project p, Reloader reload) {
private void endOperation(Project p, Reloader reload, Runnable futureCompleter) {
Reloader nextReloader;
Collection<IdentityHolder> releases = Collections.emptyList();
Collection<Runnable> postponedActions;
Expand All @@ -785,101 +813,113 @@ private Runnable endOperation(Project p, Reloader reload) {
throw new IllegalArgumentException();
}
if (reload != null && !op.removeReloader(reload)) {
return EMPTY;
return;
}
--op.usage;
if (op.usage > 0) {
return EMPTY;
return;
}

// temporary increment
++op.usage;
postponedActions = op.postponedActions;
op.postponedActions = new ArrayList<>();
nextReloader = op.nextReloader();

if (nextReloader != null) {
// schedule the next reload from the same project.
++op.usage;
op.postponedActions = new ArrayList<>();
// will not be releasing ProjectStateData in op.releases now, since they will only queue up again.
} else {
releases = op.releases;
pendingOperations.remove(p);

// this is copied from postCleanup, but can be done in batch without
// checking the project operation is in progress for each reference. These are already removed
// from stateIdentity, so just check they did not obtain another one:
for (Iterator<IdentityHolder> it = releases.iterator(); it.hasNext(); ) {
IdentityHolder expired = it.next();
ProjectStateData d = expired.state.get();
if (d == null) {
it.remove();
}
IdentityHolder h = stateIdentity.get(d);
if (h != null && h != expired) {
it.remove();
}
}
// do not remove from the pendingOperations YET, we want to capture potential reload requests
// until after the events are fired off, so they do not interleave.
releases = collectRelaeases(op);
op.releases = new ArrayList<>();
}
terminatingOperations.add(op);
}
LOG.log(Level.FINE, "Project {0}: releasing postponed actions", p);


// note: this will eventually queue the cleanup again, if the project enter locked operation in the meantime.
releases.forEach(h -> {
ProjectStateData d = h.state.get();
if (d != null) {
h.impl.projectDataReleased(d);
ReloadSpiAccessor.get().release(d);
}

});
releases.forEach(this::notityReleased);

if (futureCompleter != null) {
futureCompleter.run();
}

postponedActions.forEach(Runnable::run);

return () -> {
postponedActions.forEach(Runnable::run);
releases = null;
postponedActions = null;

synchronized (this) {
terminatingOperations.remove(op);
if (nextReloader == null) {
return;
}

// start (first or next) project reload
LOG.log(Level.FINE, "Project {0}: starting reload", nextReloader);
dispatcher.post(() -> {
RequestProcessor loader = null;
while (loader == null) {
try {
// will block if no RPs from loaderThreads are available
loader = loaderProcessors.take();
} catch (InterruptedException ex) {
nextReloader = op.nextReloader();
// if a reload magically appeared, do NOT decrement the usage, as we didn't go through the usage++ in nextReloader != null above.
if (nextReloader == null) {
if (--op.usage == 0) {
// finally remove, but still must process leftovers again
pendingOperations.remove(p);
releases = op.releases;
postponedActions = op.postponedActions;
}
}
} else {
// decrement the temporary inc
op.usage--;
}
}
if (releases != null) {
releases.forEach(this::notityReleased);
postponedActions.forEach(Runnable::run);
}

if (nextReloader == null) {
return;
}

Reloader fNextReloader = nextReloader;

// start (first or next) project reload
LOG.log(Level.FINE, "Project {0}: starting reload", nextReloader);
dispatcher.post(() -> {
RequestProcessor loader = null;
while (loader == null) {
try {
// will block if no RPs from loaderThreads are available
loader = loaderProcessors.take();
} catch (InterruptedException ex) {
}
}

RequestProcessor floader = loader;
RequestProcessor floader = loader;

CompletableFuture<ProjectState> f = CompletableFuture.runAsync(() -> nextReloader.initRound(), loader).
thenCompose((v) -> nextReloader.start(floader));
f.whenComplete((a, b) -> {
try {
LOG.log(Level.FINE, "Load end project {0} with request {1}", new Object[] { nextReloader.project, nextReloader.request });
// postpone event delivery so that the events observers see the Future as completed.
Runnable postActions = endOperation(nextReloader.project, nextReloader);
CompletableFuture<ProjectState> f = CompletableFuture.runAsync(() -> fNextReloader.initRound(), loader).
thenCompose((v) -> fNextReloader.start(floader));
// run this cleanup in the dispatcher thread
f.whenCompleteAsync((a, b) -> {
LOG.log(Level.FINER, "Return RP to the pool", new Object[] { floader });
loaderProcessors.offer(floader);
try {
LOG.log(Level.FINE, "Load end project {0} with request {1}", new Object[] { fNextReloader.project, fNextReloader.request });
// postpone event delivery so that the events observers see the Future as completed.
endOperation(fNextReloader.project, fNextReloader, () -> {;
if (b == null) {
nextReloader.completePending.completeAsync(() -> a, RELOAD_RP);
fNextReloader.completePending.completeAsync(() -> a, RELOAD_RP);
} else {
RELOAD_RP.post(() -> {
nextReloader.completePending.completeExceptionally(b);
fNextReloader.completePending.completeExceptionally(b);
});
}
// this will eventually fire postponed events.
postActions.run();
LOG.log(Level.FINER, "Return RP to the pool", new Object[] { floader });
} catch (ThreadDeath td) {
throw td;
} catch (Throwable t) {
Exceptions.printStackTrace(t);
} finally {
loaderProcessors.offer(floader);
}
});
});
};
});
} catch (ThreadDeath td) {
throw td;
} catch (Throwable t) {
Exceptions.printStackTrace(t);
} finally {
}
}, floader);
});
}

private void postCleanup(IdentityHolder expired) {
Expand Down

0 comments on commit cdd7f17

Please sign in to comment.