diff --git a/src/main/java/org/jboss/threads/EnhancedQueueExecutor.java b/src/main/java/org/jboss/threads/EnhancedQueueExecutor.java index 4e5b56e..54d1a30 100644 --- a/src/main/java/org/jboss/threads/EnhancedQueueExecutor.java +++ b/src/main/java/org/jboss/threads/EnhancedQueueExecutor.java @@ -233,6 +233,12 @@ public final class EnhancedQueueExecutor extends AbstractExecutorService impleme *
  • Bit 63: 1 = shutdown complete; 0 = shutdown not complete
  • * * + *
  • {@code 1} {@code queueSize}: Information about the current queue size: + * + *
  • * */ final long[] unsharedLongs = new long[RuntimeFields.unsharedLongsSize]; @@ -243,16 +249,6 @@ public final class EnhancedQueueExecutor extends AbstractExecutorService impleme @SuppressWarnings("unused") // used by field updater volatile Waiter terminationWaiters; - /** - * Queue size: - * - */ - @SuppressWarnings("unused") // used by field updater - volatile long queueSize; - /** * The thread keep-alive timeout value. */ @@ -311,13 +307,11 @@ public final class EnhancedQueueExecutor extends AbstractExecutorService impleme // Updaters // ======================================================= - private static final int numUnsharedLongs = 1; + private static final int numUnsharedLongs = 2; private static final int numUnsharedObjects = 2; private static final long terminationWaitersOffset; - private static final long queueSizeOffset; - private static final long peakThreadCountOffset; private static final long activeCountOffset; private static final long peakQueueSizeOffset; @@ -329,7 +323,9 @@ private static final class RuntimeFields { private static final long headOffset; private static final long tailOffset; + private static final long threadStatusOffset; + private static final long queueSizeOffset; static { int cacheLine = CacheInfo.getSmallestDataCacheLineSize(); @@ -346,6 +342,7 @@ private static final class RuntimeFields { headOffset = unsafe.arrayBaseOffset(Object[].class) + cacheLine; tailOffset = unsafe.arrayBaseOffset(Object[].class) + cacheLine * 2; threadStatusOffset = unsafe.arrayBaseOffset(long[].class) + cacheLine; + queueSizeOffset = unsafe.arrayBaseOffset(long[].class) + cacheLine * 2; } } @@ -353,8 +350,6 @@ private static final class RuntimeFields { try { terminationWaitersOffset = unsafe.objectFieldOffset(EnhancedQueueExecutor.class.getDeclaredField("terminationWaiters")); - queueSizeOffset = unsafe.objectFieldOffset(EnhancedQueueExecutor.class.getDeclaredField("queueSize")); - peakThreadCountOffset = unsafe.objectFieldOffset(EnhancedQueueExecutor.class.getDeclaredField("peakThreadCount")); activeCountOffset = unsafe.objectFieldOffset(EnhancedQueueExecutor.class.getDeclaredField("activeCount")); peakQueueSizeOffset = unsafe.objectFieldOffset(EnhancedQueueExecutor.class.getDeclaredField("peakQueueSize")); @@ -428,7 +423,7 @@ private static final class RuntimeFields { // thread stat setThreadStatusPlain(withCoreSize(withMaxSize(withAllowCoreTimeout(0L, builder.allowsCoreThreadTimeOut()), maxSize), coreSize)); timeoutNanos = TimeUtil.clampedPositiveNanos(keepAliveTime); - queueSize = withMaxQueueSize(withCurrentQueueSize(0L, 0), builder.getMaximumQueueSize()); + setQueueSizePlain(withMaxQueueSize(withCurrentQueueSize(0L, 0), builder.getMaximumQueueSize())); mxBean = new MXBeanImpl(); if (! DISABLE_MBEAN && builder.isRegisterMBean()) { this.acc = getContext(); @@ -1358,7 +1353,7 @@ public void setKeepAliveTime(final Duration keepAliveTime) { * @see Builder#getMaximumQueueSize() Builder.getMaximumQueueSize() */ public int getMaximumQueueSize() { - return maxQueueSizeOf(queueSize); + return maxQueueSizeOf(getQueueSizeVolatile()); } /** @@ -1374,7 +1369,7 @@ public void setMaximumQueueSize(final int maxQueueSize) { if (NO_QUEUE_LIMIT) return; long oldVal; do { - oldVal = queueSize; + oldVal = getQueueSizeVolatile(); } while (! compareAndSetQueueSize(oldVal, withMaxQueueSize(oldVal, maxQueueSize))); } @@ -1435,7 +1430,7 @@ public void setTerminationTask(final Runnable terminationTask) { * @return an estimate of the current queue size or -1 when {@code jboss.threads.eqe.unlimited-queue} is enabled */ public int getQueueSize() { - return NO_QUEUE_LIMIT ? -1 : currentQueueSizeOf(queueSize); + return NO_QUEUE_LIMIT ? -1 : currentQueueSizeOf(getQueueSizeVolatile()); } /** @@ -2045,8 +2040,17 @@ boolean compareAndSetPeakQueueSize(final int expect, final int update) { return unsafe.compareAndSwapInt(this, peakQueueSizeOffset, expect, update); } + long getQueueSizeVolatile() { + return unsafe.getLongVolatile(unsharedLongs, RuntimeFields.queueSizeOffset); + } + + long setQueueSizePlain(long queueSize) { + unsafe.putLong(unsharedLongs, RuntimeFields.queueSizeOffset, queueSize); + return queueSize; + } + boolean compareAndSetQueueSize(final long expect, final long update) { - return unsafe.compareAndSwapLong(this, queueSizeOffset, expect, update); + return unsafe.compareAndSwapLong(unsharedLongs, RuntimeFields.queueSizeOffset, expect, update); } boolean compareAndSetTerminationWaiters(final Waiter expect, final Waiter update) { @@ -2062,7 +2066,7 @@ Waiter getAndSetTerminationWaiters(final Waiter update) { // ======================================================= boolean increaseQueueSize() { - long oldVal = queueSize; + long oldVal = getQueueSizeVolatile(); int oldSize = currentQueueSizeOf(oldVal); if (oldSize >= maxQueueSizeOf(oldVal)) { // reject @@ -2071,7 +2075,7 @@ boolean increaseQueueSize() { int newSize = oldSize + 1; while (! compareAndSetQueueSize(oldVal, withCurrentQueueSize(oldVal, newSize))) { if (UPDATE_STATISTICS) spinMisses.increment(); - oldVal = queueSize; + oldVal = getQueueSizeVolatile(); oldSize = currentQueueSizeOf(oldVal); if (oldSize >= maxQueueSizeOf(oldVal)) { // reject @@ -2091,11 +2095,11 @@ boolean increaseQueueSize() { void decreaseQueueSize() { long oldVal; - oldVal = queueSize; + oldVal = getQueueSizeVolatile(); assert currentQueueSizeOf(oldVal) > 0; while (! compareAndSetQueueSize(oldVal, withCurrentQueueSize(oldVal, currentQueueSizeOf(oldVal) - 1))) { if (UPDATE_STATISTICS) spinMisses.increment(); - oldVal = queueSize; + oldVal = getQueueSizeVolatile(); assert currentQueueSizeOf(oldVal) > 0; } }