Skip to content

Commit

Permalink
Fixes for scheduler API
Browse files Browse the repository at this point in the history
This cleans up how blocking works and fixes small parts of the
scheduler dispatch logic. Fibers start out nonblocking by default,
which means entering such a fiber puts the system in a schedulable
state (decrements root fiber's default blocking count of 1 to 0).
While schedulable, blocking calls will redispatch into the
scheduler to give it a chance to handle the blocking operation and
optionally transfer to another unblocked fiber.

The handling of blocking and unblocking will require some
additional work, since currently only a fiber yielding can unblock
its parent, but we are getting the pieces wired up.

All of CRuby's test/fiber/test_scheduler.rb passes with these
changes.
  • Loading branch information
headius committed Oct 20, 2023
1 parent ba0d4d7 commit 2da6689
Show file tree
Hide file tree
Showing 4 changed files with 86 additions and 25 deletions.
4 changes: 2 additions & 2 deletions core/src/main/java/org/jruby/FiberScheduler.java
Original file line number Diff line number Diff line change
Expand Up @@ -155,10 +155,10 @@ public static IRubyObject close(ThreadContext context, IRubyObject scheduler) {
IRubyObject result;

result = Helpers.invokeChecked(context, scheduler, "scheduler_close");
if (result != RubyBasicObject.UNDEF) return result;
if (result != null) return result;

result = Helpers.invokeChecked(context, scheduler, "close");
if (result != RubyBasicObject.UNDEF) return result;
if (result != null) return result;

return context.nil;
}
Expand Down
21 changes: 15 additions & 6 deletions core/src/main/java/org/jruby/RubyThread.java
Original file line number Diff line number Diff line change
Expand Up @@ -218,7 +218,7 @@ public enum Status {
private volatile RubyThread fiberCurrentThread;

private IRubyObject scheduler;
private boolean blocking = false;
private volatile int blockingCount = 1;

private static final AtomicIntegerFieldUpdater<RubyThread> INTERRUPT_FLAG_UPDATER =
AtomicIntegerFieldUpdater.newUpdater(RubyThread.class, "interruptFlag");
Expand Down Expand Up @@ -436,6 +436,11 @@ public void dispose() {
// unlock all locked locks
unlockAll();

// close scheduler, if any
if (scheduler != null && !scheduler.isNil()) {
FiberScheduler.close(getContext(), scheduler);
}

// mark thread as DEAD
beDead();
}
Expand Down Expand Up @@ -2620,18 +2625,22 @@ public IRubyObject getScheduler() {

// MRI: rb_fiber_scheduler_current_for_threadptr, rb_fiber_scheduler_current
public IRubyObject getSchedulerCurrent() {
if (!blocking) {
if (!isBlocking()) {
return scheduler;
}

return getRuntime().getNil();
}

public boolean isBlocking() {
return blocking;
public void incrementBlocking() {
blockingCount++;
}

public void setBlocking(boolean blocking) {
this.blocking = blocking;
public void decrementBlocking() {
blockingCount--;
}

public boolean isBlocking() {
return blockingCount > 0;
}
}
82 changes: 67 additions & 15 deletions core/src/main/java/org/jruby/ext/fiber/ThreadFiber.java
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ private static void nativeThreadLauncher(Ruby runtime, Runnable runnable) {
}

public boolean isBlocking() {
return thread.isBlocking();
return data.blocking;
}

private static class VirtualThreadLauncher implements BiConsumer<Ruby, Runnable> {
Expand Down Expand Up @@ -100,7 +100,7 @@ public static void initRootFiber(ThreadContext context, RubyThread currentThread

ThreadFiber rootFiber = new ThreadFiber(runtime, runtime.getFiber(), true);

rootFiber.data = new FiberData(new FiberQueue(runtime), currentThread, rootFiber);
rootFiber.data = new FiberData(new FiberQueue(runtime), currentThread, rootFiber, true);
rootFiber.thread = currentThread;
context.setRootFiber(rootFiber);
}
Expand All @@ -111,11 +111,11 @@ public IRubyObject initialize(ThreadContext context, Block block) {

if (!block.isGiven()) throw runtime.newArgumentError("tried to create Proc object without block");

data = new FiberData(new FiberQueue(runtime), context.getFiberCurrentThread(), this);
data = new FiberData(new FiberQueue(runtime), context.getFiberCurrentThread(), this, false);

FiberData currentFiberData = context.getFiber().data;

thread = createThread(runtime, data, currentFiberData.queue, block, false);
thread = createThread(runtime, data, currentFiberData.queue, block);

return context.nil;
}
Expand All @@ -131,18 +131,23 @@ public IRubyObject initialize(ThreadContext context, IRubyObject _opts, Block bl
boolean blocking = false;

if (!opts.isNil()) {
IRubyObject blockingOpt = ArgsUtil.extractKeywordArg(context, opts, "blocking");
IRubyObject[] blockingPoolOpt = ArgsUtil.extractKeywordArgs(context, opts, "blocking", "pool");

if (!blockingOpt.isNil()) {
blocking = blockingOpt.isTrue();
if (blockingPoolOpt != null) {
IRubyObject blockingOpt = blockingPoolOpt[0];
if (blockingOpt != null && !blockingOpt.isNil()) {
blocking = blockingOpt.isTrue();
}

// TODO: pooling
}
}

data = new FiberData(new FiberQueue(runtime), context.getFiberCurrentThread(), this);
data = new FiberData(new FiberQueue(runtime), context.getFiberCurrentThread(), this, blocking);

FiberData currentFiberData = context.getFiber().data;

thread = createThread(runtime, data, currentFiberData.queue, block, blocking);
thread = createThread(runtime, data, currentFiberData.queue, block);

return context.nil;
}
Expand Down Expand Up @@ -185,6 +190,10 @@ public IRubyObject resume(ThreadContext context, IRubyObject[] values) {
data.prev = null;
}

if (data.blocking) {
context.getFiberCurrentThread().decrementBlocking();
}

if (result.type == RequestType.RAISE) {
throw ((RubyException) result.data).toThrowable();
}
Expand Down Expand Up @@ -225,12 +234,28 @@ private static FiberRequest exchangeWithFiber(ThreadContext context, FiberData c
// interrupted again and must abandon the fiber.

try {
adjustThreadBlocking(context, currentFiberData, targetFiberData);

return currentFiberData.queue.pop(context);
} catch (RaiseException re) {
handleExceptionDuringExchange(context, currentFiberData, targetFiberData, re);

// if we get here, we forwarded exception so try once more
return currentFiberData.queue.pop(context);
} finally {
adjustThreadBlocking(context, targetFiberData, currentFiberData);
}
}

private static void adjustThreadBlocking(ThreadContext context, FiberData currentFiberData, FiberData targetFiberData) {
// if fiber we are leaving is blocking, decrement thread blocking count
if (currentFiberData.blocking) {
context.getFiberCurrentThread().decrementBlocking();
}

// if fiber we are entering is blocking, increment thread blocking count
if (targetFiberData.blocking) {
context.getFiberCurrentThread().incrementBlocking();
}
}

Expand Down Expand Up @@ -454,7 +479,7 @@ final boolean alive() {
return true;
}

static RubyThread createThread(final Ruby runtime, final FiberData data, final FiberQueue queue, final Block block, boolean blocking) {
static RubyThread createThread(final Ruby runtime, final FiberData data, final FiberQueue queue, final Block block) {
final AtomicReference<RubyThread> fiberThread = new AtomicReference();

// retry with GC once
Expand All @@ -469,7 +494,6 @@ static RubyThread createThread(final Ruby runtime, final FiberData data, final F
RubyThread rubyThread = context.getThread();
fiberThread.set(rubyThread);
rubyThread.setFiberCurrentThread(data.parent);
rubyThread.setBlocking(blocking);

Thread thread = Thread.currentThread();
String oldName = thread.getName();
Expand Down Expand Up @@ -592,17 +616,43 @@ protected void finalize() throws Throwable {

@JRubyMethod(name = "blocking?")
public IRubyObject blocking_p(ThreadContext context) {
return RubyBoolean.newBoolean(context, context.getThread().isBlocking());
return RubyBoolean.newBoolean(context, isBlocking());
}

@JRubyMethod(name = "blocking?", meta = true)
public static IRubyObject blocking_p_s(ThreadContext context, IRubyObject self) {
boolean blocking = context.getThread().isBlocking();
boolean blocking = context.getFiber().isBlocking();
if (!blocking) return context.fals;

return RubyFixnum.one(context.runtime);
}

@JRubyMethod(name = "blocking", meta = true)
public static IRubyObject blocking(ThreadContext context, IRubyObject self, Block block) {
ThreadFiber currentFiber = context.getFiber();
boolean blocking = currentFiber.isBlocking();

// If we are already blocking, this is essentially a no-op:
if (currentFiber.isBlocking()) {
return block.yieldSpecific(context, currentFiber);
}

try {
assert !currentFiber.isBlocking() : "fiber was blocking when it should not have been";

currentFiber.data.blocking = true;

// Once the fiber is blocking, and current, we increment the thread blocking state:
context.getFiberCurrentThread().incrementBlocking();

return block.yieldSpecific(context, currentFiber);
} finally {
// We are no longer blocking:
currentFiber.data.blocking = false;
context.getFiberCurrentThread().decrementBlocking();
}
}

@JRubyMethod(name = "backtrace")
public IRubyObject backtrace(ThreadContext context) {
return backtrace(context, null, null);
Expand Down Expand Up @@ -644,7 +694,7 @@ public IRubyObject backtrace_locations(ThreadContext context, IRubyObject level,
// MRI: rb_fiber_s_schedule_kw and rb_fiber_s_schedule, kw passes on context
@JRubyMethod(name = "schedule", meta = true, rest = true, keywords = true)
public static IRubyObject schedule(ThreadContext context, IRubyObject self, IRubyObject[] args, Block block) {
RubyThread thread = context.getFiberCurrentThread();
RubyThread thread = context.getThread();
IRubyObject scheduler = thread.getScheduler();
IRubyObject fiber = context.nil;

Expand Down Expand Up @@ -684,10 +734,11 @@ public RubyThread getThread() {
}

public static class FiberData {
FiberData(FiberQueue queue, RubyThread parent, ThreadFiber fiber) {
FiberData(FiberQueue queue, RubyThread parent, ThreadFiber fiber, boolean blocking) {
this.queue = queue;
this.parent = parent;
this.fiber = new WeakReference<ThreadFiber>(fiber);
this.blocking = blocking;
}

public ThreadFiber getPrev() {
Expand All @@ -699,6 +750,7 @@ public ThreadFiber getPrev() {
final RubyThread parent;
final WeakReference<ThreadFiber> fiber;
volatile boolean transferred;
volatile boolean blocking;
}

volatile FiberData data;
Expand Down
4 changes: 2 additions & 2 deletions core/src/main/java/org/jruby/util/io/OpenFile.java
Original file line number Diff line number Diff line change
Expand Up @@ -1479,7 +1479,7 @@ public static int readInternal(ThreadContext context, OpenFile fptr, ChannelFD f
if (!scheduler.isNil()) {
IRubyObject result = FiberScheduler.ioReadMemory(context, scheduler, fptr.tiedIOForWriting, bufBytes, buf, count);

if (result != RubyBasicObject.UNDEF) {
if (result != null) {
FiberScheduler.resultApply(context, result);
}
}
Expand Down Expand Up @@ -2455,7 +2455,7 @@ public static int writeInternal(ThreadContext context, OpenFile fptr, ByteBuffer
if (!scheduler.isNil()) {
IRubyObject result = FiberScheduler.ioWriteMemory(context, scheduler, fptr.tiedIOForWriting, bufBytes, buf, count);

if (result != RubyBasicObject.UNDEF) {
if (result != null) {
FiberScheduler.resultApply(context, result);
}
}
Expand Down

0 comments on commit 2da6689

Please sign in to comment.