diff --git a/Common/src/main/java/dev/lukebemish/defaultresources/impl/ParallelExecutor.java b/Common/src/main/java/dev/lukebemish/defaultresources/impl/ParallelExecutor.java index 69bfb0e..98f436c 100644 --- a/Common/src/main/java/dev/lukebemish/defaultresources/impl/ParallelExecutor.java +++ b/Common/src/main/java/dev/lukebemish/defaultresources/impl/ParallelExecutor.java @@ -5,29 +5,56 @@ package dev.lukebemish.defaultresources.impl; -import java.util.List; -import java.util.concurrent.*; +import net.minecraft.util.Unit; + +import java.util.Objects; +import java.util.concurrent.ForkJoinPool; +import java.util.concurrent.ForkJoinTask; +import java.util.concurrent.ForkJoinWorkerThread; +import java.util.concurrent.atomic.AtomicInteger; import java.util.function.Consumer; import java.util.stream.Stream; public final class ParallelExecutor { private ParallelExecutor() {} + private static final AtomicInteger POOL_THREAD_COUNTER = new AtomicInteger(0); + private static final ForkJoinPool POOL; + + static { + final ClassLoader classLoader = ParallelExecutor.class.getClassLoader(); + POOL = new ForkJoinPool( + Math.max(4, Runtime.getRuntime().availableProcessors() - 4), + forkJoinPool -> { + final ForkJoinWorkerThread thread = new ForkJoinWorkerThread(forkJoinPool) {}; + thread.setContextClassLoader(classLoader); + thread.setName(String.format("DefaultResources parallel executor: %s", POOL_THREAD_COUNTER.incrementAndGet())); + return thread; + }, null, true + ); + } + public static void execute(Stream stream, Consumer task) { - var threads = Math.max(4, Runtime.getRuntime().availableProcessors() - 4); - ExecutorService exec = Executors.newFixedThreadPool(threads); - List> tasks = stream.map(t -> (Callable) () -> { - task.accept(t); - return null; - }).toList(); - try { - for (var f : exec.invokeAll(tasks)) { - f.get(); - } - } catch (InterruptedException | ExecutionException e) { - throw new RuntimeException(e); - } finally { - exec.shutdown(); + POOL.invoke(new RunnableExecuteAction(() -> stream.parallel().forEach(task))); + } + + private static final class RunnableExecuteAction extends ForkJoinTask { + final Runnable runnable; + + private RunnableExecuteAction(Runnable runnable) { + this.runnable = Objects.requireNonNull(runnable); + } + + public Unit getRawResult() { + return Unit.INSTANCE; + } + + public void setRawResult(Unit v) { + } + + public boolean exec() { + this.runnable.run(); + return true; } } }