Skip to content

Commit

Permalink
ReadManager: reuse reader threads #1937
Browse files Browse the repository at this point in the history
For easier monitoring.
#1937

Also refactored the read-ahead-queue with java.util.concurrent, making
the source smaller.

Tested with org.eclipse.jdt.core.tests.builder.AnnotationDependencyTests
  • Loading branch information
EcljpseB0T authored and jukzi committed Jul 9, 2024
1 parent ba550e2 commit 803b5c8
Show file tree
Hide file tree
Showing 2 changed files with 109 additions and 149 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -824,7 +824,7 @@ private void abortIfPreviewNotAllowed(ICompilationUnit[] sourceUnits, int maxUn
*/
protected void internalBeginToCompile(ICompilationUnit[] sourceUnits, int maxUnits) {
abortIfPreviewNotAllowed(sourceUnits,maxUnits);
if (!this.useSingleThread && maxUnits >= ReadManager.THRESHOLD)
if (!this.useSingleThread)
this.parser.readManager = new ReadManager(sourceUnits, maxUnits);
try {
// Switch the current policy and compilation result for this unit to the requested one.
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*******************************************************************************
* Copyright (c) 2008, 2013 IBM Corporation and others.
* Copyright (c) 2008, 2024 IBM Corporation and others.
*
* This program and the accompanying materials
* are made available under the terms of the Eclipse Public License 2.0
Expand All @@ -14,171 +14,131 @@

package org.eclipse.jdt.internal.compiler;

import java.util.ArrayDeque;
import java.util.Map;
import java.util.Queue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import org.eclipse.jdt.internal.compiler.env.ICompilationUnit;

public class ReadManager implements Runnable {
ICompilationUnit[] units;
int nextFileToRead;
ICompilationUnit[] filesRead;
char[][] contentsRead;
int readyToReadPosition;
int nextAvailablePosition;
Thread[] readingThreads;
char[] readInProcessMarker = new char[0];
int sleepingThreadCount;
private Throwable caughtException;
/** Reads a list of ICompilationUnit before actually needed (ahead) **/
public class ReadManager {
private static final int CACHE_SIZE = 15; // do not waste memory by keeping too many files in memory
/**
* Not more threads then cache size and leave 2 threads for compiler + writer. Executor should process in fifo order
* (first in first out).
*/
private static int THREAD_COUNT = Math.max(0, Math.min(CACHE_SIZE, Runtime.getRuntime().availableProcessors() - 2));
private static final ExecutorService executor = THREAD_COUNT <= 0 ? null
: Executors.newFixedThreadPool(THREAD_COUNT, r -> {
Thread t = new Thread(r, "Compiler Source File Reader"); //$NON-NLS-1$
t.setDaemon(true);
return t;
});

static final int START_CUSHION = 5;
public static final int THRESHOLD = 10;
static final int CACHE_SIZE = 15; // do not waste memory by keeping too many files in memory
private final Queue<ICompilationUnit> unitsToRead;
private final Map<ICompilationUnit, Future<char[]>> cache = new ConcurrentHashMap<>();

public ReadManager(ICompilationUnit[] files, int length) {
// start the background threads to read the file's contents
int threadCount = Runtime.getRuntime().availableProcessors() + 1;
if (threadCount < 2) {
threadCount = 0;
} else if (threadCount > CACHE_SIZE) {
threadCount = CACHE_SIZE;
}

if (threadCount > 0) {
synchronized (this) {
this.units = new ICompilationUnit[length];
System.arraycopy(files, 0, this.units, 0, length);
this.nextFileToRead = START_CUSHION; // skip some files to reduce the number of times we have to wait
this.filesRead = new ICompilationUnit[CACHE_SIZE];
this.contentsRead = new char[CACHE_SIZE][];
this.readyToReadPosition = 0;
this.nextAvailablePosition = 0;
this.sleepingThreadCount = 0;
this.readingThreads = new Thread[threadCount];
for (int i = threadCount; --i >= 0;) {
this.readingThreads[i] = new Thread(this, "Compiler Source File Reader"); //$NON-NLS-1$
this.readingThreads[i].setDaemon(true);
this.readingThreads[i].start();
}
public ReadManager(ICompilationUnit[] files, int length) {
this.unitsToRead = new ArrayDeque<>(length);
if (executor == null) {
return;
}
}
}

public char[] getContents(ICompilationUnit unit) throws Error {
Thread[] rThreads = this.readingThreads;
if (rThreads == null || this.units.length == 0) {
if (this.caughtException != null) {
// rethrow the caught exception from the readingThreads in the main compiler thread
if (this.caughtException instanceof Error)
throw (Error) this.caughtException;
throw (RuntimeException) this.caughtException;
for (int l = 0; l < length; l++) {
ICompilationUnit unit = files[l];
this.unitsToRead.offer(unit);
}
while (queueNextReadAhead()) {
// queued 1 more
}
return unit.getContents();
}

boolean yield = this.sleepingThreadCount == rThreads.length;
char[] result = null;
synchronized (this) {
if (unit == this.filesRead[this.readyToReadPosition]) {
result = this.contentsRead[this.readyToReadPosition];
while (result == this.readInProcessMarker || result == null) {
// let the readingThread know we're waiting
//System.out.print('|');
this.contentsRead[this.readyToReadPosition] = null;
try {
wait(250);
} catch (InterruptedException ignore) { // ignore
}
if (this.caughtException != null) {
// rethrow the caught exception from the readingThreads in the main compiler thread
if (this.caughtException instanceof Error)
throw (Error) this.caughtException;
throw (RuntimeException) this.caughtException;
}
result = this.contentsRead[this.readyToReadPosition];
/** meant to called in the order of the initial supplied files **/
public char[] getContents(ICompilationUnit unit) throws Error {
if (executor == null) {
return getWithoutExecutor(unit);
}
Future<char[]> future;
synchronized (this) { // atomic remove from unitsToRead or cache
future = this.cache.remove(unit);
if (future == null) {
// unit was not already scheduled
// and does not need to be scheduled anymore
this.unitsToRead.remove(unit);
}
// free spot for next file
this.filesRead[this.readyToReadPosition] = null;
this.contentsRead[this.readyToReadPosition] = null;
if (++this.readyToReadPosition >= this.contentsRead.length)
this.readyToReadPosition = 0;
if (this.sleepingThreadCount > 0) {
//System.out.print('+');
//System.out.print(this.nextFileToRead);
notify();
}
if (future == null) {
// should not happen.
return getWithoutFuture(unit);
}
// now: future != null
queueNextReadAhead();
try {
// unit was already scheduled
// in most cases future is already completed
// Otherwise, when read ahead is slower then compiler,
// wait for completion to avoid extra work of reading files multiple times:
return getWithFuture(future);
} catch (InterruptedException ignored) {
return getWhileInterrupted(unit);
} catch (ExecutionException e) {
// rethrow the caught exception from the reading threads in the main compiler thread
if (e.getCause() instanceof Error err) {
throw err;
}
} else {
// must make sure we're reading ahead of the unit
int unitIndex = 0;
for (int l = this.units.length; unitIndex < l; unitIndex++)
if (this.units[unitIndex] == unit) break;
if (unitIndex == this.units.length) {
// attempting to read a unit that was not included in the initial files - should not happen
this.units = new ICompilationUnit[0]; // stop looking for more
} else if (unitIndex >= this.nextFileToRead) {
// start over
//System.out.println(unitIndex + " vs " + this.nextFileToRead);
this.nextFileToRead = unitIndex + START_CUSHION;
this.readyToReadPosition = 0;
this.nextAvailablePosition = 0;
this.filesRead = new ICompilationUnit[CACHE_SIZE];
this.contentsRead = new char[CACHE_SIZE][];
notifyAll();
if (e.getCause() instanceof RuntimeException ex) {
throw ex;
}
throw new RuntimeException(e);
}
}
if (yield)
Thread.yield(); // ensure other threads get a chance
if (result != null)
return result;
//System.out.print('-');
return unit.getContents();
}

@Override
public void run() {
try {
while (this.readingThreads != null && this.nextFileToRead < this.units.length) {
ICompilationUnit unit = null;
int position = -1;
synchronized (this) {
if (this.readingThreads == null) return;
// distinct methods "getW*" with same content to make it possible to observe with method sampler which case took how long:
private char[] getWithFuture(Future<char[]> future) throws InterruptedException, ExecutionException {
// should happen in most cases
return future.get();
}

private char[] getWithoutExecutor(ICompilationUnit unit) {
// THREAD_COUNT==0 => no read ahead
return unit.getContents();
}

private char[] getWithoutFuture(ICompilationUnit unit) {
// should not happen
return unit.getContents();
}

while (this.filesRead[this.nextAvailablePosition] != null) {
this.sleepingThreadCount++;
try {
wait(250); // wait until a spot in contents is available
} catch (InterruptedException e) { // ignore
}
this.sleepingThreadCount--;
if (this.readingThreads == null) return;
}
private char[] getWhileInterrupted(ICompilationUnit unit) {
// should not happen
return unit.getContents();
}

if (this.nextFileToRead >= this.units.length) return;
unit = this.units[this.nextFileToRead++];
position = this.nextAvailablePosition;
if (++this.nextAvailablePosition >= this.contentsRead.length)
this.nextAvailablePosition = 0;
this.filesRead[position] = unit;
this.contentsRead[position] = this.readInProcessMarker; // mark the spot so we know its being read
}
char[] result = unit.getContents();
synchronized (this) {
if (this.filesRead[position] == unit) {
if (this.contentsRead[position] == null) // wake up main thread which is waiting for this file
notifyAll();
this.contentsRead[position] = result;
}
}
private boolean queueNextReadAhead() {
if (this.cache.size() >= CACHE_SIZE) {
return false;
}
} catch (Error | RuntimeException e) {
synchronized (this) {
this.caughtException = e;
shutdown();
synchronized (this) { // atomic move from unitsToRead to cache
ICompilationUnit nextUnit = this.unitsToRead.poll();
if (nextUnit == null) {
return false;
}
Future<char[]> future = executor.submit(() -> readAhead(nextUnit));
this.cache.put(nextUnit, future);
return true;
}
return;
}
}

public synchronized void shutdown() {
this.readingThreads = null; // mark the read manager as shutting down so that the reading threads stop
notifyAll();
}
private char[] readAhead(ICompilationUnit unit) {
queueNextReadAhead();
return unit.getContents();
}

public void shutdown() {
this.unitsToRead.clear();
this.cache.clear();
}
}

0 comments on commit 803b5c8

Please sign in to comment.