Skip to content

Commit

Permalink
Update test_scheduler
Browse files Browse the repository at this point in the history
It still all passes 🎉
  • Loading branch information
headius committed Oct 20, 2023
1 parent 2da6689 commit 5fa2ff7
Show file tree
Hide file tree
Showing 3 changed files with 248 additions and 58 deletions.
3 changes: 3 additions & 0 deletions test/mri/fiber/autoload.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
# Pause for 10ms while this file is being loaded, then define the (empty)
# module that the file provides.
# NOTE(review): presumably the sleep gives a fiber scheduler a blocking point
# during autoload — confirm against the test that loads this file.
sleep(0.01)

module TestFiberSchedulerAutoload; end
225 changes: 167 additions & 58 deletions test/mri/fiber/scheduler.rb
Original file line number Diff line number Diff line change
@@ -1,8 +1,13 @@
# frozen_string_literal: true

# This is an example and simplified scheduler for test purposes.
# It is not efficient for a large number of file descriptors as it uses IO.select().
# Production Fiber schedulers should use epoll/kqueue/etc.
# - It is not efficient for a large number of file descriptors as it uses
# IO.select().
# - It does not correctly handle multiple calls to `wait` with the same file
# descriptor and overlapping events.
# - Production fiber schedulers should use epoll/kqueue/etc. Consider using the
# [`io-event`](https://github.com/socketry/io-event) gem instead of this
# scheduler if you want something simple to build on.

require 'fiber'
require 'socket'
Expand Down Expand Up @@ -30,7 +35,7 @@ def initialize
@closed = false

@lock = Thread::Mutex.new
@blocking = 0
@blocking = Hash.new.compare_by_identity
@ready = []

@urgent = IO.pipe
Expand All @@ -57,8 +62,8 @@ def next_timeout
def run
# $stderr.puts [__method__, Fiber.current].inspect

while @readable.any? or @writable.any? or @waiting.any? or @blocking.positive?
# Can only handle file descriptors up to 1024...
while @readable.any? or @writable.any? or @waiting.any? or @blocking.any?
# May only handle file descriptors up to 1024...
readable, writable = IO.select(@readable.keys + [@urgent.first], @writable.keys, [], next_timeout)

# puts "readable: #{readable}" if readable&.any?
Expand Down Expand Up @@ -115,10 +120,16 @@ def run
end
end

# A fiber scheduler hook, invoked when the scheduler goes out of scope.
def scheduler_close
  # Delegate to `close`, passing `true` for its `internal` argument.
  internal = true
  close(internal)
end

# If the `scheduler_close` hook does not exist, this method `close` will be
# invoked instead when the fiber scheduler goes out of scope. This is legacy
# behaviour; you should almost certainly use `scheduler_close`. The reason is
# that `scheduler_close` is called when the scheduler goes out of scope,
# while `close` may be called by the user.
def close(internal = false)
# $stderr.puts [__method__, Fiber.current].inspect

Expand Down Expand Up @@ -153,6 +164,7 @@ def current_time
Process.clock_gettime(Process::CLOCK_MONOTONIC)
end

# This hook is invoked by `Timeout.timeout` and related code.
def timeout_after(duration, klass, message, &block)
fiber = Fiber.current

Expand All @@ -171,6 +183,7 @@ def timeout_after(duration, klass, message, &block)
end
end

# This hook is invoked by `Process.wait`, `system`, and backticks.
def process_wait(pid, flags)
# $stderr.puts [__method__, pid, flags, Fiber.current].inspect

Expand All @@ -180,24 +193,47 @@ def process_wait(pid, flags)
end.value
end

# This hook is invoked by `IO#read` and `IO#write` in the case that `io_read`
# and `io_write` hooks are not available. This implementation is not
# completely general, in the sense that calling `io_wait` multiple times with
# the same `io` and `events` will not work, which is okay for tests but not
# for real code. Correct fiber schedulers should not have this limitation.
def io_wait(io, events, duration)
  # $stderr.puts [__method__, io, events, duration, Fiber.current].inspect

  # Suspends the current fiber until it is resumed, after registering it as
  # waiting on `io` for the requested `events` (a bitmask of IO::READABLE and
  # IO::WRITABLE) and, when `duration` is given, in `@waiting` with a deadline
  # of `current_time + duration`.
  fiber = Fiber.current

  # Remember which tables this call registered in, so that the cleanup below
  # only removes entries that this call created.
  unless (events & IO::READABLE).zero?
    @readable[io] = fiber
    readable = true
  end

  unless (events & IO::WRITABLE).zero?
    @writable[io] = fiber
    writable = true
  end

  if duration
    @waiting[fiber] = current_time + duration
  end

  Fiber.yield
ensure
  # Only undo the registrations made above; an unconditional delete would
  # drop an entry that a different wait on the same `io` registered.
  @waiting.delete(fiber) if duration
  @readable.delete(io) if readable
  @writable.delete(io) if writable
end

# This hook is invoked by `IO.select`. Using a thread ensures that the
# operation does not block the fiber scheduler.
def io_select(...)
  # Run the real `IO.select` on a separate thread, forwarding every argument
  # unchanged, and hand back whatever that thread produces. `Thread#value`
  # joins the thread and returns the block's result.
  selector = Thread.new { IO.select(...) }
  selector.value
end

# Used for Kernel#sleep and Thread::Mutex#sleep
# This hook is invoked by `Kernel#sleep` and `Thread::Mutex#sleep`.
def kernel_sleep(duration = nil)
# $stderr.puts [__method__, duration, Fiber.current].inspect

Expand All @@ -206,32 +242,35 @@ def kernel_sleep(duration = nil)
return true
end

# Used when blocking on synchronization (Thread::Mutex#lock,
# Thread::Queue#pop, Thread::SizedQueue#push, ...)
# This hook is invoked by blocking operations such as `Thread::Mutex#lock`,
# `Thread::Queue#pop` and `Thread::SizedQueue#push`, which are unblocked by
# other threads/fibers. To unblock a blocked fiber, you should call `unblock`
# with the same `blocker` and `fiber` arguments.
def block(blocker, timeout = nil)
  # $stderr.puts [__method__, blocker, timeout].inspect

  # Suspends the current fiber until it is resumed. With a `timeout`, the
  # fiber is recorded in `@waiting` with a deadline of
  # `current_time + timeout`; without one, it is recorded in `@blocking`
  # (keyed by the fiber itself). Either record is removed again when the
  # fiber resumes or unwinds.
  fiber = Fiber.current

  if timeout
    @waiting[fiber] = current_time + timeout
    begin
      Fiber.yield
    ensure
      # Remove from @waiting in the case #unblock was called before the timeout expired:
      @waiting.delete(fiber)
    end
  else
    # `@blocking` is used as a table of fibers here, not as a counter.
    @blocking[fiber] = true
    begin
      Fiber.yield
    ensure
      @blocking.delete(fiber)
    end
  end
end

# Used when synchronization wakes up a previously-blocked fiber
# (Thread::Mutex#unlock, Thread::Queue#push, ...).
# This might be called from another thread.
# This method is invoked from a thread or fiber to unblock a fiber that is
# blocked by `block`. It is expected to be thread safe.
def unblock(blocker, fiber)
# $stderr.puts [__method__, blocker, fiber].inspect
# $stderr.puts blocker.backtrace.inspect
Expand All @@ -245,6 +284,9 @@ def unblock(blocker, fiber)
io.write_nonblock('.')
end

# This hook is invoked by `Fiber.schedule`. Strictly speaking, you should use
# it to create scheduled fibers, but it is not required in practice;
# `Fiber.new` is usually sufficient.
def fiber(&block)
fiber = Fiber.new(blocking: false, &block)

Expand All @@ -253,92 +295,143 @@ def fiber(&block)
return fiber
end

# This hook is invoked by `Addrinfo.getaddrinfo`. Using a thread ensures that
# the operation does not block the fiber scheduler, since `getaddrinfo` is
# usually provided by `libc` and is blocking.
def address_resolve(hostname)
  # Perform the lookup on a separate thread and return its result: the
  # distinct IP address strings reported by `Addrinfo.getaddrinfo`.
  resolver = Thread.new do
    addresses = Addrinfo.getaddrinfo(hostname, nil)
    addresses.map { |address| address.ip_address }.uniq
  end

  resolver.value
end
end

# This scheduler class implements `io_read` and `io_write` hooks which require
# `IO::Buffer`.
class IOBufferScheduler < Scheduler
# The negated errno value of EAGAIN. The `io_*` hooks compare the integer
# results of the buffer operations against this value.
EAGAIN = -Errno::EAGAIN::Errno

def io_read(io, buffer, length, offset)
  # Reads from `io` into `buffer`, starting at `offset`, by repeatedly calling
  # `buffer.read` inside `blocking`. Returns the total number of bytes read, or
  # the negative result of `buffer.read` when it reports a failure.
  #
  # The loop stops when at least `length` bytes have been read, when
  # `buffer.read` returns 0, or when it returns a negative value other than
  # EAGAIN. On EAGAIN it waits for readability if `length > 0`, otherwise it
  # returns the EAGAIN result to the caller.
  total = 0
  io.nonblock = true

  while true
    # Never ask for more than the space remaining after `offset`.
    maximum_size = buffer.size - offset
    result = blocking{buffer.read(io, maximum_size, offset)}

    if result > 0
      total += result
      offset += result
      break if total >= length
    elsif result == 0
      break
    elsif result == EAGAIN
      if length > 0
        self.io_wait(io, IO::READABLE, nil)
      else
        return result
      end
    elsif result < 0
      return result
    end
  end

  return total
end

def io_write(io, buffer, length, offset)
  # Writes the contents of `buffer`, starting at `offset`, to `io` by
  # repeatedly calling `buffer.write` inside `blocking`. Returns the total
  # number of bytes written, or the negative result of `buffer.write` when it
  # reports a failure.
  #
  # The loop stops when at least `length` bytes have been written, when
  # `buffer.write` returns 0, or when it returns a negative value other than
  # EAGAIN. On EAGAIN it waits for writability if `length > 0`, otherwise it
  # returns the EAGAIN result to the caller.
  total = 0
  io.nonblock = true

  while true
    # Never offer more than the data remaining after `offset`.
    maximum_size = buffer.size - offset
    result = blocking{buffer.write(io, maximum_size, offset)}

    if result > 0
      total += result
      offset += result
      break if total >= length
    elsif result == 0
      break
    elsif result == EAGAIN
      if length > 0
        self.io_wait(io, IO::WRITABLE, nil)
      else
        return result
      end
    elsif result < 0
      return result
    end
  end

  return total
end

def io_pread(io, buffer, from, length, offset)
  # Positional read: reads from `io` at file position `from` into `buffer`,
  # starting at `offset`, by repeatedly calling `buffer.pread` inside
  # `blocking`. Returns the total number of bytes read, or the negative result
  # of `buffer.pread` when it reports a failure.
  #
  # The loop stops when at least `length` bytes have been read, when
  # `buffer.pread` returns 0, or when it returns a negative value other than
  # EAGAIN. On EAGAIN it waits for readability if `length > 0`, otherwise it
  # returns the EAGAIN result to the caller.
  total = 0
  io.nonblock = true

  while true
    # Never ask for more than the space remaining after `offset`.
    maximum_size = buffer.size - offset
    result = blocking{buffer.pread(io, from, maximum_size, offset)}

    if result > 0
      total += result
      offset += result
      # Advance the file position in step with the buffer offset.
      from += result
      break if total >= length
    elsif result == 0
      break
    elsif result == EAGAIN
      if length > 0
        self.io_wait(io, IO::READABLE, nil)
      else
        return result
      end
    elsif result < 0
      return result
    end
  end

  return total
end

def io_pwrite(io, buffer, from, length, offset)
  # Positional write: writes the contents of `buffer`, starting at `offset`,
  # to `io` at file position `from` by repeatedly calling `buffer.pwrite`
  # inside `blocking`. Returns the total number of bytes written, or the
  # negative result of `buffer.pwrite` when it reports a failure.
  #
  # The loop stops when at least `length` bytes have been written, when
  # `buffer.pwrite` returns 0, or when it returns a negative value other than
  # EAGAIN. On EAGAIN it waits for writability if `length > 0`, otherwise it
  # returns the EAGAIN result to the caller.
  total = 0
  io.nonblock = true

  while true
    # Never offer more than the data remaining after `offset`.
    maximum_size = buffer.size - offset
    result = blocking{buffer.pwrite(io, from, maximum_size, offset)}

    if result > 0
      total += result
      offset += result
      # Advance the file position in step with the buffer offset.
      from += result
      break if total >= length
    elsif result == 0
      break
    elsif result == EAGAIN
      if length > 0
        self.io_wait(io, IO::WRITABLE, nil)
      else
        return result
      end
    elsif result < 0
      return result
    end
  end

  return total
end

def blocking(&block)
  # Runs `block` exactly once via `Fiber.blocking` and returns its result.
  Fiber.blocking(&block)
end
end

# This scheduler has a broken implementation of `unblock` in the sense that it
# raises an exception. This is used to test the behavior of the scheduler when
# unblock raises an exception.
class BrokenUnblockScheduler < Scheduler
def unblock(blocker, fiber)
super
Expand All @@ -347,6 +440,9 @@ def unblock(blocker, fiber)
end
end

# This scheduler has a broken implementation of `unblock` in the sense that it
# sleeps. This is used to test the behavior of the scheduler when unblock
# messes with the internal thread state in an unexpected way.
class SleepingUnblockScheduler < Scheduler
# This method is invoked when the thread is exiting.
def unblock(blocker, fiber)
Expand All @@ -356,3 +452,16 @@ def unblock(blocker, fiber)
sleep(0.1)
end
end

# This scheduler has a broken implementation of `kernel_sleep` in the sense that
# it invokes a blocking sleep which can cause a deadlock in some cases.
class SleepingBlockingScheduler < Scheduler
  # Replacement for the `kernel_sleep` hook: performs a short sleep inside
  # `Fiber.blocking`, then blocks via `block` with the requested duration.
  # Always returns true.
  def kernel_sleep(duration = nil)
    # Deliberately sleep in a blocking state which can trigger a deadlock if the implementation is not correct.
    Fiber.blocking { sleep(0.0001) }

    block(:sleep, duration)

    true
  end
end
Loading

0 comments on commit 5fa2ff7

Please sign in to comment.