From c127c5a7165747b2bdbb4948e4a3913b5569793e Mon Sep 17 00:00:00 2001 From: "David M. Lloyd" Date: Wed, 1 May 2024 16:01:47 -0500 Subject: [PATCH 1/2] Move unshared fields to arrays The arrays can then be sized based on the run time cache line size, allowing better parallelism on CPUs with long cache lines. --- pom.xml | 16 ++ .../jboss/threads/EnhancedQueueExecutor.java | 191 +++++++++++++++--- .../threads/EnhancedQueueExecutorBase0.java | 55 ----- .../threads/EnhancedQueueExecutorBase1.java | 40 ---- .../threads/EnhancedQueueExecutorBase2.java | 17 -- .../threads/EnhancedQueueExecutorBase3.java | 44 ---- .../threads/EnhancedQueueExecutorBase4.java | 17 -- .../threads/EnhancedQueueExecutorBase5.java | 49 ----- .../threads/EnhancedQueueExecutorBase6.java | 17 -- 9 files changed, 179 insertions(+), 267 deletions(-) delete mode 100644 src/main/java/org/jboss/threads/EnhancedQueueExecutorBase0.java delete mode 100644 src/main/java/org/jboss/threads/EnhancedQueueExecutorBase1.java delete mode 100644 src/main/java/org/jboss/threads/EnhancedQueueExecutorBase2.java delete mode 100644 src/main/java/org/jboss/threads/EnhancedQueueExecutorBase3.java delete mode 100644 src/main/java/org/jboss/threads/EnhancedQueueExecutorBase4.java delete mode 100644 src/main/java/org/jboss/threads/EnhancedQueueExecutorBase5.java delete mode 100644 src/main/java/org/jboss/threads/EnhancedQueueExecutorBase6.java diff --git a/pom.xml b/pom.xml index 3a57a0f..4904b1e 100644 --- a/pom.xml +++ b/pom.xml @@ -66,6 +66,18 @@ 11 + + + + io.smallrye.common + smallrye-common-bom + 2.4.0 + pom + import + + + + org.graalvm.sdk @@ -102,6 +114,10 @@ wildfly-common 1.7.0.Final + + io.smallrye.common + smallrye-common-cpu + org.junit.jupiter junit-jupiter diff --git a/src/main/java/org/jboss/threads/EnhancedQueueExecutor.java b/src/main/java/org/jboss/threads/EnhancedQueueExecutor.java index 9d270ef..4e5b56e 100644 --- a/src/main/java/org/jboss/threads/EnhancedQueueExecutor.java +++ b/src/main/java/org/jboss/threads/EnhancedQueueExecutor.java @@ -22,6 +22,7 @@ import java.util.NoSuchElementException; import java.util.Set; import java.util.TreeSet; +import java.util.concurrent.AbstractExecutorService; import java.util.concurrent.Callable; import java.util.concurrent.CancellationException; import java.util.concurrent.ConcurrentHashMap; @@ -46,8 +47,10 @@ import javax.management.ObjectInstance; import javax.management.ObjectName; +import io.smallrye.common.cpu.CacheInfo; import org.jboss.threads.management.ManageableThreadPoolExecutorService; import org.jboss.threads.management.StandardThreadPoolMXBean; + import org.wildfly.common.Assert; import org.wildfly.common.cpu.ProcessorInfo; @@ -66,7 +69,7 @@ * * @author David M. Lloyd */ -public final class EnhancedQueueExecutor extends EnhancedQueueExecutorBase6 implements ManageableThreadPoolExecutorService, ScheduledExecutorService { +public final class EnhancedQueueExecutor extends AbstractExecutorService implements ManageableThreadPoolExecutorService, ScheduledExecutorService { private static final Thread[] NO_THREADS = new Thread[0]; static { @@ -205,6 +208,35 @@ public final class EnhancedQueueExecutor extends EnhancedQueueExecutorBase6 impl // Current state fields // ======================================================= + /** + * Unshared object fields (indexes are relative to units of cache line size): + *
    + *
  • {@code 0} {@code tail}: The node preceding the tail node; this field is not {@code null}. + * This is the insertion point for tasks (and the removal point for waiting threads).
  • + *
  • {@code 1} {@code head}: The node preceding the head node; this field is not {@code null}. + * This is the removal point for tasks (and the insertion point for waiting threads).
  • + *
+ */ + final Object[] unsharedObjects = new Object[RuntimeFields.unsharedObjectsSize]; + + /** + * Unshared long fields (indexes are relative to units of cache line size): + *
    + *
  • {@code 0} {@code threadStatus}: Information about the current pool status: + *
      + *
    • Bit 00..19: current number of running threads
    • + *
    • Bit 20..39: core pool size
    • + *
    • Bit 40..59: maximum pool size
    • + *
    • Bit 60: 1 = allow core thread timeout; 0 = disallow core thread timeout
    • + *
    • Bit 61: 1 = shutdown requested; 0 = shutdown not requested
    • + *
    • Bit 62: 1 = shutdown task interrupt requested; 0 = interrupt not requested
    • + *
    • Bit 63: 1 = shutdown complete; 0 = shutdown not complete
    • + *
    + *
  • + *
+ */ + final long[] unsharedLongs = new long[RuntimeFields.unsharedLongsSize]; + /** * The linked list of threads waiting for termination of this thread pool. */ @@ -279,6 +311,9 @@ public final class EnhancedQueueExecutor extends EnhancedQueueExecutorBase6 impl // Updaters // ======================================================= + private static final int numUnsharedLongs = 1; + private static final int numUnsharedObjects = 2; + private static final long terminationWaitersOffset; private static final long queueSizeOffset; @@ -287,6 +322,33 @@ public final class EnhancedQueueExecutor extends EnhancedQueueExecutorBase6 impl private static final long activeCountOffset; private static final long peakQueueSizeOffset; + // GraalVM should initialize this class at run time + private static final class RuntimeFields { + private static final int unsharedObjectsSize; + private static final int unsharedLongsSize; + + private static final long headOffset; + private static final long tailOffset; + private static final long threadStatusOffset; + + static { + int cacheLine = CacheInfo.getSmallestDataCacheLineSize(); + if (cacheLine == 0) { + // guess + cacheLine = 64; + } + int longScale = unsafe.arrayIndexScale(long[].class); + int objScale = unsafe.arrayIndexScale(Object[].class); + // these fields are in units of array scale + unsharedObjectsSize = cacheLine / objScale * (numUnsharedObjects + 1); + unsharedLongsSize = cacheLine / longScale * (numUnsharedLongs + 1); + // these fields are in bytes + headOffset = unsafe.arrayBaseOffset(Object[].class) + cacheLine; + tailOffset = unsafe.arrayBaseOffset(Object[].class) + cacheLine * 2; + threadStatusOffset = unsafe.arrayBaseOffset(long[].class) + cacheLine; + } + } + static { try { terminationWaitersOffset = unsafe.objectFieldOffset(EnhancedQueueExecutor.class.getDeclaredField("terminationWaiters")); @@ -348,7 +410,8 @@ public final class EnhancedQueueExecutor extends EnhancedQueueExecutorBase6 impl static final AtomicInteger sequence = new AtomicInteger(1); EnhancedQueueExecutor(final Builder builder) { - super(); + setHeadPlain(setTailPlain(new EnhancedQueueExecutor.TaskNode(null))); + int maxSize = builder.getMaximumPoolSize(); int coreSize = min(builder.getCorePoolSize(), maxSize); this.handoffExecutor = builder.getHandoffExecutor(); @@ -363,7 +426,7 @@ public final class EnhancedQueueExecutor extends EnhancedQueueExecutorBase6 impl final Duration keepAliveTime = builder.getKeepAliveTime(); // initial dead node // thread stat - threadStatus = withCoreSize(withMaxSize(withAllowCoreTimeout(0L, builder.allowsCoreThreadTimeOut()), maxSize), coreSize); + setThreadStatusPlain(withCoreSize(withMaxSize(withAllowCoreTimeout(0L, builder.allowsCoreThreadTimeOut()), maxSize), coreSize)); timeoutNanos = TimeUtil.clampedPositiveNanos(keepAliveTime); queueSize = withMaxQueueSize(withCurrentQueueSize(0L, 0), builder.getMaximumQueueSize()); mxBean = new MXBeanImpl(); @@ -786,7 +849,7 @@ public void execute(Runnable runnable) { boolean ok = false; if (result == EXE_OK) { // last check to ensure that there is at least one existent thread to avoid rare thread timeout race condition - if (currentSizeOf(threadStatus) == 0 && tryAllocateThread(0.0f) == AT_YES && ! doStartThread(null)) { + if (currentSizeOf(getThreadStatus()) == 0 && tryAllocateThread(0.0f) == AT_YES && ! doStartThread(null)) { deallocateThread(); } if (UPDATE_STATISTICS) submittedTaskCounter.increment(); @@ -824,13 +887,13 @@ public void shutdown() { public List shutdownNow() { shutdown(true); final ArrayList list = new ArrayList<>(); - TaskNode head = this.head; + TaskNode head = getHead(); QNode headNext; for (;;) { headNext = head.getNext(); if (headNext == head) { // a racing consumer has already consumed it (and moved head) - head = this.head; + head = getHead(); continue; } if (headNext instanceof TaskNode) { @@ -856,7 +919,7 @@ public List shutdownNow() { * @return {@code true} if shutdown was requested, {@code false} otherwise */ public boolean isShutdown() { - return isShutdownRequested(threadStatus); + return isShutdownRequested(getThreadStatus()); } /** @@ -865,7 +928,7 @@ public boolean isShutdown() { * @return {@code true} if shutdown has completed, {@code false} otherwise */ public boolean isTerminated() { - return isShutdownComplete(threadStatus); + return isShutdownComplete(getThreadStatus()); } /** @@ -981,7 +1044,7 @@ public void shutdown(boolean interrupt) { // post-actions (fail): // repeat state change until success or return do { - oldStatus = threadStatus; + oldStatus = getThreadStatus(); newStatus = withShutdownRequested(oldStatus); if (interrupt) newStatus = withShutdownInterrupt(newStatus); if (currentSizeOf(oldStatus) == 0) newStatus = withShutdownComplete(newStatus); @@ -994,7 +1057,7 @@ public void shutdown(boolean interrupt) { // terminate the scheduler schedulerTask.shutdown(); // clear out all consumers and append a dummy waiter node - TaskNode tail = this.tail; + TaskNode tail = getTail(); QNode tailNext; // a marker to indicate that termination was requested for (;;) { @@ -1059,7 +1122,7 @@ public void shutdown(boolean interrupt) { * @return {@code true} if the thread pool is terminating, or {@code false} if the thread pool is not terminating or has completed termination */ public boolean isTerminating() { - final long threadStatus = this.threadStatus; + final long threadStatus = getThreadStatus(); return isShutdownRequested(threadStatus) && ! isShutdownComplete(threadStatus); } @@ -1128,7 +1191,7 @@ public void setGrowthResistance(final float growthResistance) { * @see Builder#getCorePoolSize() Builder.getCorePoolSize() */ public int getCorePoolSize() { - return coreSizeOf(threadStatus); + return coreSizeOf(getThreadStatus()); } /** @@ -1143,7 +1206,7 @@ public void setCorePoolSize(final int corePoolSize) { Assert.checkMaximumParameter("corePoolSize", TS_THREAD_CNT_MASK, corePoolSize); long oldVal, newVal; do { - oldVal = threadStatus; + oldVal = getThreadStatus(); if (corePoolSize > maxSizeOf(oldVal)) { // automatically bump up max size to match newVal = withCoreSize(withMaxSize(oldVal, corePoolSize), corePoolSize); @@ -1166,7 +1229,7 @@ public void setCorePoolSize(final int corePoolSize) { * @see Builder#getMaximumPoolSize() Builder.getMaximumPoolSize() */ public int getMaximumPoolSize() { - return maxSizeOf(threadStatus); + return maxSizeOf(getThreadStatus()); } /** @@ -1181,7 +1244,7 @@ public void setMaximumPoolSize(final int maxPoolSize) { Assert.checkMaximumParameter("maxPoolSize", TS_THREAD_CNT_MASK, maxPoolSize); long oldVal, newVal; do { - oldVal = threadStatus; + oldVal = getThreadStatus(); if (maxPoolSize < coreSizeOf(oldVal)) { // automatically bump down core size to match newVal = withCoreSize(withMaxSize(oldVal, maxPoolSize), maxPoolSize); @@ -1205,7 +1268,7 @@ public void setMaximumPoolSize(final int maxPoolSize) { * @see Builder#allowsCoreThreadTimeOut() Builder.allowsCoreThreadTimeOut() */ public boolean allowsCoreThreadTimeOut() { - return isAllowCoreTimeout(threadStatus); + return isAllowCoreTimeout(getThreadStatus()); } /** @@ -1218,7 +1281,7 @@ public boolean allowsCoreThreadTimeOut() { public void allowCoreThreadTimeOut(boolean value) { long oldVal, newVal; do { - oldVal = threadStatus; + oldVal = getThreadStatus(); newVal = withAllowCoreTimeout(oldVal, value); if (oldVal == newVal) return; } while (! compareAndSetThreadStatus(oldVal, newVal)); @@ -1439,7 +1502,7 @@ public long getCompletedTaskCount() { * @return an estimate of the current number of active threads in the pool */ public int getPoolSize() { - return currentSizeOf(threadStatus); + return currentSizeOf(getThreadStatus()); } /** @@ -1527,7 +1590,7 @@ public void run() { continue waitingForTask; } else { final long timeoutNanos = EnhancedQueueExecutor.this.timeoutNanos; - long oldVal = threadStatus; + long oldVal = getThreadStatus(); if (elapsed >= timeoutNanos || task == EXIT || currentSizeOf(oldVal) > maxSizeOf(oldVal)) { // try to exit this thread, if we are allowed if (task == EXIT || @@ -1543,7 +1606,7 @@ public void run() { return; } if (UPDATE_STATISTICS) spinMisses.increment(); - oldVal = threadStatus; + oldVal = getThreadStatus(); } //throw Assert.unreachableCode(); } @@ -1589,7 +1652,7 @@ private QNode getOrAddNode(PoolThreadNode nextPoolThreadNode) { TaskNode head; QNode headNext; for (;;) { - head = EnhancedQueueExecutor.this.head; + head = getHead(); headNext = head.getNext(); // headNext == head can happen if another consumer has already consumed head: // retry with a fresh head @@ -1645,7 +1708,7 @@ int tryAllocateThread(final float growthResistance) { int oldSize; long oldStat; for (;;) { - oldStat = threadStatus; + oldStat = getThreadStatus(); if (isShutdownRequested(oldStat)) { return AT_SHUTDOWN; } @@ -1694,7 +1757,7 @@ int tryAllocateThread(final float growthResistance) { void deallocateThread() { long oldStat; do { - oldStat = threadStatus; + oldStat = getThreadStatus(); } while (! tryDeallocateThread(oldStat)); } @@ -1766,7 +1829,7 @@ boolean doStartThread(Task runnable) throws RejectedExecutionException { private int tryExecute(final Task runnable) { QNode tailNext; - TaskNode tail = this.tail; + TaskNode tail = getTail(); TaskNode node = null; for (;;) { tailNext = tail.getNext(); @@ -1873,7 +1936,7 @@ private int tryExecute(final Task runnable) { } // retry with new tail(snapshot) if (UPDATE_STATISTICS) spinMisses.increment(); - tail = this.tail; + tail = getTail(); } // not reached } @@ -1903,7 +1966,7 @@ void completeTermination() { unpark(waiters.getThread()); waiters = waiters.getNext(); } - tail.setNext(TERMINATE_COMPLETE); + getTail().setNext(TERMINATE_COMPLETE); if (!DISABLE_MBEAN) { //The check for DISABLE_MBEAN is redundant as acc would be null, //but GraalVM needs the hint so to not make JMX reachable. @@ -1927,6 +1990,45 @@ void completeTermination() { // Compare-and-set operations // ======================================================= + TaskNode getTail() { + return (TaskNode) unsafe.getObjectVolatile(unsharedObjects, RuntimeFields.tailOffset); + } + + TaskNode setTailPlain(TaskNode tail) { + unsafe.putObject(unsharedObjects, RuntimeFields.tailOffset, tail); + return tail; + } + + boolean compareAndSetTail(final EnhancedQueueExecutor.TaskNode expect, final EnhancedQueueExecutor.TaskNode update) { + return getTail() == expect && unsafe.compareAndSwapObject(unsharedObjects, RuntimeFields.tailOffset, expect, update); + } + + TaskNode getHead() { + return (TaskNode) unsafe.getObjectVolatile(unsharedObjects, RuntimeFields.headOffset); + } + + TaskNode setHeadPlain(TaskNode head) { + unsafe.putObject(unsharedObjects, RuntimeFields.headOffset, head); + return head; + } + + boolean compareAndSetHead(final EnhancedQueueExecutor.TaskNode expect, final EnhancedQueueExecutor.TaskNode update) { + return unsafe.compareAndSwapObject(unsharedObjects, RuntimeFields.headOffset, expect, update); + } + + long getThreadStatus() { + return unsafe.getLongVolatile(unsharedLongs, RuntimeFields.threadStatusOffset); + } + + long setThreadStatusPlain(long status) { + unsafe.putLong(unsharedLongs, RuntimeFields.threadStatusOffset, status); + return status; + } + + boolean compareAndSetThreadStatus(final long expect, final long update) { + return unsafe.compareAndSwapLong(unsharedLongs, RuntimeFields.threadStatusOffset, expect, update); + } + void incrementActiveCount() { unsafe.getAndAddInt(this, activeCountOffset, 1); } @@ -2083,6 +2185,39 @@ static boolean isAllowCoreTimeout(final long oldVal) { // Static configuration // ======================================================= + static int readIntPropertyPrefixed(String name, int defVal) { + try { + return Integer.parseInt(readPropertyPrefixed(name, Integer.toString(defVal))); + } catch (NumberFormatException ignored) { + return defVal; + } + } + + static boolean readBooleanPropertyPrefixed(String name, boolean defVal) { + return Boolean.parseBoolean(readPropertyPrefixed(name, Boolean.toString(defVal))); + } + + static String readPropertyPrefixed(String name, String defVal) { + return readProperty("jboss.threads.eqe." + name, defVal); + } + + static String readProperty(String name, String defVal) { + final SecurityManager sm = System.getSecurityManager(); + if (sm != null) { + return doPrivileged(new PrivilegedAction() { + public String run() { + return readPropertyRaw(name, defVal); + } + }); + } else { + return readPropertyRaw(name, defVal); + } + } + + static String readPropertyRaw(final String name, final String defVal) { + return System.getProperty(name, defVal); + } + // ======================================================= // Utilities // ======================================================= @@ -2483,7 +2618,7 @@ final class Task implements Runnable { @Override public void run() { - if (isShutdownInterrupt(threadStatus)) { + if (isShutdownInterrupt(getThreadStatus())) { Thread.currentThread().interrupt(); } else { Thread.interrupted(); @@ -2776,7 +2911,7 @@ void submit() { boolean ok = false; if (result == EXE_OK) { // last check to ensure that there is at least one existent thread to avoid rare thread timeout race condition - if (currentSizeOf(threadStatus) == 0 && tryAllocateThread(0.0f) == AT_YES && ! doStartThread(null)) { + if (currentSizeOf(getThreadStatus()) == 0 && tryAllocateThread(0.0f) == AT_YES && ! doStartThread(null)) { deallocateThread(); } if (UPDATE_STATISTICS) submittedTaskCounter.increment(); diff --git a/src/main/java/org/jboss/threads/EnhancedQueueExecutorBase0.java b/src/main/java/org/jboss/threads/EnhancedQueueExecutorBase0.java deleted file mode 100644 index 9e08570..0000000 --- a/src/main/java/org/jboss/threads/EnhancedQueueExecutorBase0.java +++ /dev/null @@ -1,55 +0,0 @@ -package org.jboss.threads; - -import static java.security.AccessController.doPrivileged; - -import java.security.PrivilegedAction; -import java.util.concurrent.AbstractExecutorService; - -/** - * EQE base class: shared utilities and initial padding. - */ -abstract class EnhancedQueueExecutorBase0 extends AbstractExecutorService { - /** - * Padding fields. - */ - @SuppressWarnings("unused") - int p00, p01, p02, p03, - p04, p05, p06, p07, - p08, p09, p0A, p0B, - p0C, p0D, p0E, p0F; - - EnhancedQueueExecutorBase0() {} - - static int readIntPropertyPrefixed(String name, int defVal) { - try { - return Integer.parseInt(readPropertyPrefixed(name, Integer.toString(defVal))); - } catch (NumberFormatException ignored) { - return defVal; - } - } - - static boolean readBooleanPropertyPrefixed(String name, boolean defVal) { - return Boolean.parseBoolean(readPropertyPrefixed(name, Boolean.toString(defVal))); - } - - static String readPropertyPrefixed(String name, String defVal) { - return readProperty("jboss.threads.eqe." + name, defVal); - } - - static String readProperty(String name, String defVal) { - final SecurityManager sm = System.getSecurityManager(); - if (sm != null) { - return doPrivileged(new PrivilegedAction() { - public String run() { - return readPropertyRaw(name, defVal); - } - }); - } else { - return readPropertyRaw(name, defVal); - } - } - - static String readPropertyRaw(final String name, final String defVal) { - return System.getProperty(name, defVal); - } -} diff --git a/src/main/java/org/jboss/threads/EnhancedQueueExecutorBase1.java b/src/main/java/org/jboss/threads/EnhancedQueueExecutorBase1.java deleted file mode 100644 index f1225dc..0000000 --- a/src/main/java/org/jboss/threads/EnhancedQueueExecutorBase1.java +++ /dev/null @@ -1,40 +0,0 @@ -package org.jboss.threads; - -import static org.jboss.threads.JBossExecutors.unsafe; - -import org.wildfly.common.annotation.NotNull; - -/** - * EQE base class: tail section. - */ -abstract class EnhancedQueueExecutorBase1 extends EnhancedQueueExecutorBase0 { - - static final long tailOffset; - - static { - try { - tailOffset = unsafe.objectFieldOffset(EnhancedQueueExecutorBase1.class.getDeclaredField("tail")); - } catch (NoSuchFieldException e) { - throw new NoSuchFieldError(e.getMessage()); - } - } - - - /** - * The node preceding the tail node; this field is not {@code null}. This - * is the insertion point for tasks (and the removal point for waiting threads). - */ - @NotNull - @SuppressWarnings("unused") // used by field updater - volatile EnhancedQueueExecutor.TaskNode tail; - - EnhancedQueueExecutorBase1() {} - - // ======================================================= - // Compare-and-set operations - // ======================================================= - - boolean compareAndSetTail(final EnhancedQueueExecutor.TaskNode expect, final EnhancedQueueExecutor.TaskNode update) { - return tail == expect && unsafe.compareAndSwapObject(this, tailOffset, expect, update); - } -} diff --git a/src/main/java/org/jboss/threads/EnhancedQueueExecutorBase2.java b/src/main/java/org/jboss/threads/EnhancedQueueExecutorBase2.java deleted file mode 100644 index 49d0665..0000000 --- a/src/main/java/org/jboss/threads/EnhancedQueueExecutorBase2.java +++ /dev/null @@ -1,17 +0,0 @@ -package org.jboss.threads; - -/** - * EQE base class: padding. - */ -abstract class EnhancedQueueExecutorBase2 extends EnhancedQueueExecutorBase1 { - /** - * Padding fields. - */ - @SuppressWarnings("unused") - int p00, p01, p02, p03, - p04, p05, p06, p07, - p08, p09, p0A, p0B, - p0C, p0D, p0E, p0F; - - EnhancedQueueExecutorBase2() {} -} diff --git a/src/main/java/org/jboss/threads/EnhancedQueueExecutorBase3.java b/src/main/java/org/jboss/threads/EnhancedQueueExecutorBase3.java deleted file mode 100644 index 245009f..0000000 --- a/src/main/java/org/jboss/threads/EnhancedQueueExecutorBase3.java +++ /dev/null @@ -1,44 +0,0 @@ -package org.jboss.threads; - -import static org.jboss.threads.JBossExecutors.unsafe; - -import org.wildfly.common.annotation.NotNull; - -/** - * EQE base class: head section. - */ -abstract class EnhancedQueueExecutorBase3 extends EnhancedQueueExecutorBase2 { - static final long headOffset; - - static { - try { - headOffset = unsafe.objectFieldOffset(EnhancedQueueExecutorBase3.class.getDeclaredField("head")); - } catch (NoSuchFieldException e) { - throw new NoSuchFieldError(e.getMessage()); - } - } - - // ======================================================= - // Current state fields - // ======================================================= - - /** - * The node preceding the head node; this field is not {@code null}. This is - * the removal point for tasks (and the insertion point for waiting threads). - */ - @NotNull - @SuppressWarnings("unused") // used by field updater - volatile EnhancedQueueExecutor.TaskNode head; - - EnhancedQueueExecutorBase3() { - head = tail = new EnhancedQueueExecutor.TaskNode(null); - } - - // ======================================================= - // Compare-and-set operations - // ======================================================= - - boolean compareAndSetHead(final EnhancedQueueExecutor.TaskNode expect, final EnhancedQueueExecutor.TaskNode update) { - return unsafe.compareAndSwapObject(this, headOffset, expect, update); - } -} diff --git a/src/main/java/org/jboss/threads/EnhancedQueueExecutorBase4.java b/src/main/java/org/jboss/threads/EnhancedQueueExecutorBase4.java deleted file mode 100644 index 6aa2743..0000000 --- a/src/main/java/org/jboss/threads/EnhancedQueueExecutorBase4.java +++ /dev/null @@ -1,17 +0,0 @@ -package org.jboss.threads; - -/** - * EQE base class: padding. - */ -abstract class EnhancedQueueExecutorBase4 extends EnhancedQueueExecutorBase3 { - /** - * Padding fields. - */ - @SuppressWarnings("unused") - int p00, p01, p02, p03, - p04, p05, p06, p07, - p08, p09, p0A, p0B, - p0C, p0D, p0E, p0F; - - EnhancedQueueExecutorBase4() {} -} diff --git a/src/main/java/org/jboss/threads/EnhancedQueueExecutorBase5.java b/src/main/java/org/jboss/threads/EnhancedQueueExecutorBase5.java deleted file mode 100644 index cace4cc..0000000 --- a/src/main/java/org/jboss/threads/EnhancedQueueExecutorBase5.java +++ /dev/null @@ -1,49 +0,0 @@ -package org.jboss.threads; - -import static org.jboss.threads.JBossExecutors.unsafe; - -/** - * EQE base: thread status - */ -abstract class EnhancedQueueExecutorBase5 extends EnhancedQueueExecutorBase4 { - static final long threadStatusOffset; - - static { - try { - threadStatusOffset = unsafe.objectFieldOffset(EnhancedQueueExecutorBase5.class.getDeclaredField("threadStatus")); - } catch (NoSuchFieldException e) { - throw new NoSuchFieldError(e.getMessage()); - } - } - - // ======================================================= - // Current state fields - // ======================================================= - - /** - * Active consumers: - *
    - *
  • Bit 00..19: current number of running threads
  • - *
  • Bit 20..39: core pool size
  • - *
  • Bit 40..59: maximum pool size
  • - *
  • Bit 60: 1 = allow core thread timeout; 0 = disallow core thread timeout
  • - *
  • Bit 61: 1 = shutdown requested; 0 = shutdown not requested
  • - *
  • Bit 62: 1 = shutdown task interrupt requested; 0 = interrupt not requested
  • - *
  • Bit 63: 1 = shutdown complete; 0 = shutdown not complete
  • - *
- */ - @SuppressWarnings("unused") // used by field updater - volatile long threadStatus; - - EnhancedQueueExecutorBase5() { - super(); - } - - // ======================================================= - // Compare-and-set operations - // ======================================================= - - boolean compareAndSetThreadStatus(final long expect, final long update) { - return unsafe.compareAndSwapLong(this, threadStatusOffset, expect, update); - } -} diff --git a/src/main/java/org/jboss/threads/EnhancedQueueExecutorBase6.java b/src/main/java/org/jboss/threads/EnhancedQueueExecutorBase6.java deleted file mode 100644 index 49c3ba8..0000000 --- a/src/main/java/org/jboss/threads/EnhancedQueueExecutorBase6.java +++ /dev/null @@ -1,17 +0,0 @@ -package org.jboss.threads; - -/** - * EQE base class: padding. - */ -abstract class EnhancedQueueExecutorBase6 extends EnhancedQueueExecutorBase5 { - /** - * Padding fields. - */ - @SuppressWarnings("unused") - int p00, p01, p02, p03, - p04, p05, p06, p07, - p08, p09, p0A, p0B, - p0C, p0D, p0E, p0F; - - EnhancedQueueExecutorBase6() {} -} From 9389f8815c8ca9351d5256e254594a2a5af7d021 Mon Sep 17 00:00:00 2001 From: "David M. Lloyd" Date: Thu, 2 May 2024 08:41:03 -0500 Subject: [PATCH 2/2] Move queue size to unshared longs --- .../jboss/threads/EnhancedQueueExecutor.java | 52 ++++++++++--------- 1 file changed, 28 insertions(+), 24 deletions(-) diff --git a/src/main/java/org/jboss/threads/EnhancedQueueExecutor.java b/src/main/java/org/jboss/threads/EnhancedQueueExecutor.java index 4e5b56e..54d1a30 100644 --- a/src/main/java/org/jboss/threads/EnhancedQueueExecutor.java +++ b/src/main/java/org/jboss/threads/EnhancedQueueExecutor.java @@ -233,6 +233,12 @@ public final class EnhancedQueueExecutor extends AbstractExecutorService impleme *
  • Bit 63: 1 = shutdown complete; 0 = shutdown not complete
  • * * + *
  • {@code 1} {@code queueSize}: Information about the current queue size: + *
      + *
    • Bit 00..1F: current queue length
    • + *
    • Bit 20..3F: queue limit
    • + *
    + *
  • * */ final long[] unsharedLongs = new long[RuntimeFields.unsharedLongsSize]; @@ -243,16 +249,6 @@ public final class EnhancedQueueExecutor extends AbstractExecutorService impleme @SuppressWarnings("unused") // used by field updater volatile Waiter terminationWaiters; - /** - * Queue size: - *
      - *
    • Bit 00..1F: current queue length
    • - *
    • Bit 20..3F: queue limit
    • - *
    - */ - @SuppressWarnings("unused") // used by field updater - volatile long queueSize; - /** * The thread keep-alive timeout value. */ @@ -311,13 +307,11 @@ public final class EnhancedQueueExecutor extends AbstractExecutorService impleme // Updaters // ======================================================= - private static final int numUnsharedLongs = 1; + private static final int numUnsharedLongs = 2; private static final int numUnsharedObjects = 2; private static final long terminationWaitersOffset; - private static final long queueSizeOffset; - private static final long peakThreadCountOffset; private static final long activeCountOffset; private static final long peakQueueSizeOffset; @@ -329,7 +323,9 @@ private static final class RuntimeFields { private static final long headOffset; private static final long tailOffset; + private static final long threadStatusOffset; + private static final long queueSizeOffset; static { int cacheLine = CacheInfo.getSmallestDataCacheLineSize(); @@ -346,6 +342,7 @@ private static final class RuntimeFields { headOffset = unsafe.arrayBaseOffset(Object[].class) + cacheLine; tailOffset = unsafe.arrayBaseOffset(Object[].class) + cacheLine * 2; threadStatusOffset = unsafe.arrayBaseOffset(long[].class) + cacheLine; + queueSizeOffset = unsafe.arrayBaseOffset(long[].class) + cacheLine * 2; } } @@ -353,8 +350,6 @@ private static final class RuntimeFields { try { terminationWaitersOffset = unsafe.objectFieldOffset(EnhancedQueueExecutor.class.getDeclaredField("terminationWaiters")); - queueSizeOffset = unsafe.objectFieldOffset(EnhancedQueueExecutor.class.getDeclaredField("queueSize")); - peakThreadCountOffset = unsafe.objectFieldOffset(EnhancedQueueExecutor.class.getDeclaredField("peakThreadCount")); activeCountOffset = unsafe.objectFieldOffset(EnhancedQueueExecutor.class.getDeclaredField("activeCount")); peakQueueSizeOffset = unsafe.objectFieldOffset(EnhancedQueueExecutor.class.getDeclaredField("peakQueueSize")); @@ -428,7 +423,7 @@ private static final class RuntimeFields { // thread stat setThreadStatusPlain(withCoreSize(withMaxSize(withAllowCoreTimeout(0L, builder.allowsCoreThreadTimeOut()), maxSize), coreSize)); timeoutNanos = TimeUtil.clampedPositiveNanos(keepAliveTime); - queueSize = withMaxQueueSize(withCurrentQueueSize(0L, 0), builder.getMaximumQueueSize()); + setQueueSizePlain(withMaxQueueSize(withCurrentQueueSize(0L, 0), builder.getMaximumQueueSize())); mxBean = new MXBeanImpl(); if (! DISABLE_MBEAN && builder.isRegisterMBean()) { this.acc = getContext(); @@ -1358,7 +1353,7 @@ public void setKeepAliveTime(final Duration keepAliveTime) { * @see Builder#getMaximumQueueSize() Builder.getMaximumQueueSize() */ public int getMaximumQueueSize() { - return maxQueueSizeOf(queueSize); + return maxQueueSizeOf(getQueueSizeVolatile()); } /** @@ -1374,7 +1369,7 @@ public void setMaximumQueueSize(final int maxQueueSize) { if (NO_QUEUE_LIMIT) return; long oldVal; do { - oldVal = queueSize; + oldVal = getQueueSizeVolatile(); } while (! compareAndSetQueueSize(oldVal, withMaxQueueSize(oldVal, maxQueueSize))); } @@ -1435,7 +1430,7 @@ public void setTerminationTask(final Runnable terminationTask) { * @return an estimate of the current queue size or -1 when {@code jboss.threads.eqe.unlimited-queue} is enabled */ public int getQueueSize() { - return NO_QUEUE_LIMIT ? -1 : currentQueueSizeOf(queueSize); + return NO_QUEUE_LIMIT ? -1 : currentQueueSizeOf(getQueueSizeVolatile()); } /** @@ -2045,8 +2040,17 @@ boolean compareAndSetPeakQueueSize(final int expect, final int update) { return unsafe.compareAndSwapInt(this, peakQueueSizeOffset, expect, update); } + long getQueueSizeVolatile() { + return unsafe.getLongVolatile(unsharedLongs, RuntimeFields.queueSizeOffset); + } + + long setQueueSizePlain(long queueSize) { + unsafe.putLong(unsharedLongs, RuntimeFields.queueSizeOffset, queueSize); + return queueSize; + } + boolean compareAndSetQueueSize(final long expect, final long update) { - return unsafe.compareAndSwapLong(this, queueSizeOffset, expect, update); + return unsafe.compareAndSwapLong(unsharedLongs, RuntimeFields.queueSizeOffset, expect, update); } boolean compareAndSetTerminationWaiters(final Waiter expect, final Waiter update) { @@ -2062,7 +2066,7 @@ Waiter getAndSetTerminationWaiters(final Waiter update) { // ======================================================= boolean increaseQueueSize() { - long oldVal = queueSize; + long oldVal = getQueueSizeVolatile(); int oldSize = currentQueueSizeOf(oldVal); if (oldSize >= maxQueueSizeOf(oldVal)) { // reject @@ -2071,7 +2075,7 @@ boolean increaseQueueSize() { int newSize = oldSize + 1; while (! compareAndSetQueueSize(oldVal, withCurrentQueueSize(oldVal, newSize))) { if (UPDATE_STATISTICS) spinMisses.increment(); - oldVal = queueSize; + oldVal = getQueueSizeVolatile(); oldSize = currentQueueSizeOf(oldVal); if (oldSize >= maxQueueSizeOf(oldVal)) { // reject @@ -2091,11 +2095,11 @@ boolean increaseQueueSize() { void decreaseQueueSize() { long oldVal; - oldVal = queueSize; + oldVal = getQueueSizeVolatile(); assert currentQueueSizeOf(oldVal) > 0; while (! compareAndSetQueueSize(oldVal, withCurrentQueueSize(oldVal, currentQueueSizeOf(oldVal) - 1))) { if (UPDATE_STATISTICS) spinMisses.increment(); - oldVal = queueSize; + oldVal = getQueueSizeVolatile(); assert currentQueueSizeOf(oldVal) > 0; } }