Skip to content

Commit

Permalink
Actually fix parallel executor
Browse files Browse the repository at this point in the history
  • Loading branch information
lukebemish committed Mar 8, 2024
1 parent 2c7e4f5 commit 2c9b629
Showing 1 changed file with 43 additions and 16 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -5,29 +5,56 @@

package dev.lukebemish.defaultresources.impl;

import java.util.List;
import java.util.concurrent.*;
import net.minecraft.util.Unit;

import java.util.Objects;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.ForkJoinTask;
import java.util.concurrent.ForkJoinWorkerThread;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Consumer;
import java.util.stream.Stream;

public final class ParallelExecutor {
private ParallelExecutor() {}

private static final AtomicInteger POOL_THREAD_COUNTER = new AtomicInteger(0);
private static final ForkJoinPool POOL;

static {
final ClassLoader classLoader = ParallelExecutor.class.getClassLoader();
POOL = new ForkJoinPool(
Math.max(4, Runtime.getRuntime().availableProcessors() - 4),
forkJoinPool -> {
final ForkJoinWorkerThread thread = new ForkJoinWorkerThread(forkJoinPool) {};
thread.setContextClassLoader(classLoader);
thread.setName(String.format("DefaultResources parallel executor: %s", POOL_THREAD_COUNTER.incrementAndGet()));
return thread;
}, null, true
);
}

public static <T> void execute(Stream<T> stream, Consumer<T> task) {
var threads = Math.max(4, Runtime.getRuntime().availableProcessors() - 4);
ExecutorService exec = Executors.newFixedThreadPool(threads);
List<Callable<Void>> tasks = stream.map(t -> (Callable<Void>) () -> {
task.accept(t);
return null;
}).toList();
try {
for (var f : exec.invokeAll(tasks)) {
f.get();
}
} catch (InterruptedException | ExecutionException e) {
throw new RuntimeException(e);
} finally {
exec.shutdown();
POOL.invoke(new RunnableExecuteAction(() -> stream.parallel().forEach(task)));
}

private static final class RunnableExecuteAction extends ForkJoinTask<Unit> {
final Runnable runnable;

private RunnableExecuteAction(Runnable runnable) {
this.runnable = Objects.requireNonNull(runnable);
}

public Unit getRawResult() {
return Unit.INSTANCE;
}

public void setRawResult(Unit v) {
}

public boolean exec() {
this.runnable.run();
return true;
}
}
}

0 comments on commit 2c9b629

Please sign in to comment.