Skip to content

Commit

Permalink
Move queue size to unshared longs
Browse files Browse the repository at this point in the history
  • Loading branch information
dmlloyd committed May 2, 2024
1 parent c127c5a commit 9389f88
Showing 1 changed file with 28 additions and 24 deletions.
52 changes: 28 additions & 24 deletions src/main/java/org/jboss/threads/EnhancedQueueExecutor.java
Original file line number Diff line number Diff line change
Expand Up @@ -233,6 +233,12 @@ public final class EnhancedQueueExecutor extends AbstractExecutorService impleme
* <li>Bit 63: 1 = shutdown complete; 0 = shutdown not complete</li>
* </ul>
* </li>
* <li>{@code 1} {@code queueSize}: Information about the current queue size:
* <ul>
* <li>Bit 00..1F: current queue length</li>
* <li>Bit 20..3F: queue limit</li>
* </ul>
* </li>
* </ul>
*/
final long[] unsharedLongs = new long[RuntimeFields.unsharedLongsSize];
Expand All @@ -243,16 +249,6 @@ public final class EnhancedQueueExecutor extends AbstractExecutorService impleme
@SuppressWarnings("unused") // used by field updater
volatile Waiter terminationWaiters;

/**
* Queue size:
* <ul>
* <li>Bit 00..1F: current queue length</li>
* <li>Bit 20..3F: queue limit</li>
* </ul>
*/
@SuppressWarnings("unused") // used by field updater
volatile long queueSize;

/**
* The thread keep-alive timeout value.
*/
Expand Down Expand Up @@ -311,13 +307,11 @@ public final class EnhancedQueueExecutor extends AbstractExecutorService impleme
// Updaters
// =======================================================

private static final int numUnsharedLongs = 1;
private static final int numUnsharedLongs = 2;
private static final int numUnsharedObjects = 2;

private static final long terminationWaitersOffset;

private static final long queueSizeOffset;

private static final long peakThreadCountOffset;
private static final long activeCountOffset;
private static final long peakQueueSizeOffset;
Expand All @@ -329,7 +323,9 @@ private static final class RuntimeFields {

private static final long headOffset;
private static final long tailOffset;

private static final long threadStatusOffset;
private static final long queueSizeOffset;

static {
int cacheLine = CacheInfo.getSmallestDataCacheLineSize();
Expand All @@ -346,15 +342,14 @@ private static final class RuntimeFields {
headOffset = unsafe.arrayBaseOffset(Object[].class) + cacheLine;
tailOffset = unsafe.arrayBaseOffset(Object[].class) + cacheLine * 2;
threadStatusOffset = unsafe.arrayBaseOffset(long[].class) + cacheLine;
queueSizeOffset = unsafe.arrayBaseOffset(long[].class) + cacheLine * 2;
}
}

static {
try {
terminationWaitersOffset = unsafe.objectFieldOffset(EnhancedQueueExecutor.class.getDeclaredField("terminationWaiters"));

queueSizeOffset = unsafe.objectFieldOffset(EnhancedQueueExecutor.class.getDeclaredField("queueSize"));

peakThreadCountOffset = unsafe.objectFieldOffset(EnhancedQueueExecutor.class.getDeclaredField("peakThreadCount"));
activeCountOffset = unsafe.objectFieldOffset(EnhancedQueueExecutor.class.getDeclaredField("activeCount"));
peakQueueSizeOffset = unsafe.objectFieldOffset(EnhancedQueueExecutor.class.getDeclaredField("peakQueueSize"));
Expand Down Expand Up @@ -428,7 +423,7 @@ private static final class RuntimeFields {
// thread stat
setThreadStatusPlain(withCoreSize(withMaxSize(withAllowCoreTimeout(0L, builder.allowsCoreThreadTimeOut()), maxSize), coreSize));
timeoutNanos = TimeUtil.clampedPositiveNanos(keepAliveTime);
queueSize = withMaxQueueSize(withCurrentQueueSize(0L, 0), builder.getMaximumQueueSize());
setQueueSizePlain(withMaxQueueSize(withCurrentQueueSize(0L, 0), builder.getMaximumQueueSize()));
mxBean = new MXBeanImpl();
if (! DISABLE_MBEAN && builder.isRegisterMBean()) {
this.acc = getContext();
Expand Down Expand Up @@ -1358,7 +1353,7 @@ public void setKeepAliveTime(final Duration keepAliveTime) {
* @see Builder#getMaximumQueueSize() Builder.getMaximumQueueSize()
*/
public int getMaximumQueueSize() {
return maxQueueSizeOf(queueSize);
return maxQueueSizeOf(getQueueSizeVolatile());
}

/**
Expand All @@ -1374,7 +1369,7 @@ public void setMaximumQueueSize(final int maxQueueSize) {
if (NO_QUEUE_LIMIT) return;
long oldVal;
do {
oldVal = queueSize;
oldVal = getQueueSizeVolatile();
} while (! compareAndSetQueueSize(oldVal, withMaxQueueSize(oldVal, maxQueueSize)));
}

Expand Down Expand Up @@ -1435,7 +1430,7 @@ public void setTerminationTask(final Runnable terminationTask) {
* @return an estimate of the current queue size or -1 when {@code jboss.threads.eqe.unlimited-queue} is enabled
*/
public int getQueueSize() {
return NO_QUEUE_LIMIT ? -1 : currentQueueSizeOf(queueSize);
return NO_QUEUE_LIMIT ? -1 : currentQueueSizeOf(getQueueSizeVolatile());
}

/**
Expand Down Expand Up @@ -2045,8 +2040,17 @@ boolean compareAndSetPeakQueueSize(final int expect, final int update) {
return unsafe.compareAndSwapInt(this, peakQueueSizeOffset, expect, update);
}

long getQueueSizeVolatile() {
return unsafe.getLongVolatile(unsharedLongs, RuntimeFields.queueSizeOffset);
}

long setQueueSizePlain(long queueSize) {
unsafe.putLong(unsharedLongs, RuntimeFields.queueSizeOffset, queueSize);
return queueSize;
}

boolean compareAndSetQueueSize(final long expect, final long update) {
return unsafe.compareAndSwapLong(this, queueSizeOffset, expect, update);
return unsafe.compareAndSwapLong(unsharedLongs, RuntimeFields.queueSizeOffset, expect, update);
}

boolean compareAndSetTerminationWaiters(final Waiter expect, final Waiter update) {
Expand All @@ -2062,7 +2066,7 @@ Waiter getAndSetTerminationWaiters(final Waiter update) {
// =======================================================

boolean increaseQueueSize() {
long oldVal = queueSize;
long oldVal = getQueueSizeVolatile();
int oldSize = currentQueueSizeOf(oldVal);
if (oldSize >= maxQueueSizeOf(oldVal)) {
// reject
Expand All @@ -2071,7 +2075,7 @@ boolean increaseQueueSize() {
int newSize = oldSize + 1;
while (! compareAndSetQueueSize(oldVal, withCurrentQueueSize(oldVal, newSize))) {
if (UPDATE_STATISTICS) spinMisses.increment();
oldVal = queueSize;
oldVal = getQueueSizeVolatile();
oldSize = currentQueueSizeOf(oldVal);
if (oldSize >= maxQueueSizeOf(oldVal)) {
// reject
Expand All @@ -2091,11 +2095,11 @@ boolean increaseQueueSize() {

void decreaseQueueSize() {
long oldVal;
oldVal = queueSize;
oldVal = getQueueSizeVolatile();
assert currentQueueSizeOf(oldVal) > 0;
while (! compareAndSetQueueSize(oldVal, withCurrentQueueSize(oldVal, currentQueueSizeOf(oldVal) - 1))) {
if (UPDATE_STATISTICS) spinMisses.increment();
oldVal = queueSize;
oldVal = getQueueSizeVolatile();
assert currentQueueSizeOf(oldVal) > 0;
}
}
Expand Down

0 comments on commit 9389f88

Please sign in to comment.