Skip to content

Commit

Permalink
Work pool for (more) efficient handling of blocking_operation_wait.
Browse files Browse the repository at this point in the history
  • Loading branch information
ioquatix committed Nov 23, 2024
1 parent 34464e5 commit d1fcdd4
Show file tree
Hide file tree
Showing 3 changed files with 202 additions and 10 deletions.
22 changes: 12 additions & 10 deletions lib/async/scheduler.rb
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@

require_relative "clock"
require_relative "task"
require_relative "worker_pool"

require "io/event"

Expand Down Expand Up @@ -49,6 +50,7 @@ def initialize(parent = nil, selector: nil)
@idle_time = 0.0

@timers = ::IO::Event::Timers.new
@worker_pool = WorkerPool.new
end

# Compute the scheduler load according to the busy and idle times that are updated by the run loop.
Expand Down Expand Up @@ -112,6 +114,11 @@ def close

selector&.close

worker_pool = @worker_pool
@worker_pool = nil

worker_pool&.close

consume
end

Expand Down Expand Up @@ -169,8 +176,11 @@ def resume(fiber, *arguments)

# Invoked when a fiber tries to perform a blocking operation which cannot continue. A corresponding call {unblock} must be performed to allow this fiber to continue.
#

# @public Since *Async v2*.
# @asynchronous May only be called on same thread as fiber scheduler.
#
# @parameter blocker [Object] The object that is blocking the fiber.
# @parameter timeout [Float | Nil] The maximum time to block, or if nil, indefinitely.
def block(blocker, timeout)
# $stderr.puts "block(#{blocker}, #{Fiber.current}, #{timeout})"
fiber = Fiber.current
Expand Down Expand Up @@ -346,15 +356,7 @@ def process_wait(pid, flags)
# @parameter work [Proc] The work to execute on a background thread.
# @returns [Object] The result of the work.
def blocking_operation_wait(work)
thread = Thread.new(&work)

result = thread.join

thread = nil

return result
ensure
thread&.kill
@worker_pool.call(work)
end

# Run one iteration of the event loop.
Expand Down
150 changes: 150 additions & 0 deletions lib/async/worker_pool.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,150 @@
# frozen_string_literal: true

# Released under the MIT License.
# Copyright, 2024, by Samuel Williams.

require "etc"

module Async
# A simple work pool that offloads work to a background thread.
#
# @private
class WorkerPool
class Promise
def initialize(work)
@work = work
@state = :pending
@value = nil
@guard = ::Mutex.new
@condition = ::ConditionVariable.new
@thread = nil
end

def call
work = nil

@guard.synchronize do
@thread = ::Thread.current

return unless work = @work
end

resolve(work.call)
rescue Exception => error
reject(error)
end

private def resolve(value)
@guard.synchronize do
@work = nil
@thread = nil
@value = value
@state = :resolved
@condition.broadcast
end
end

private def reject(error)
@guard.synchronize do
@work = nil
@thread = nil
@value = error
@state = :failed
@condition.broadcast
end
end

def cancel
@guard.synchronize do
@work = nil
@state = :cancelled
@thread&.raise(Interrupt)
end
end

def wait
@guard.synchronize do
while @state == :pending
@condition.wait(@guard)
end

if @state == :failed
raise @value
else
return @value
end
end
end
end

# A handle to the work being done.
class Worker
def initialize
@work = ::Thread::Queue.new
@thread = ::Thread.new(&method(:run))
end

def run
while work = @work.pop
work.call
end
end

def close
if thread = @thread
@thread = nil
thread.kill
end
end

# Call the work and notify the scheduler when it is done.
def call(work)
promise = Promise.new(work)

@work.push(promise)

promise.wait
end
end

# Create a new work pool.
#
# @parameter size [Integer] The number of threads to use.
def initialize(size: Etc.nprocessors)
@ready = ::Thread::Queue.new

size.times do
@ready.push(Worker.new)
end
end

# Close the work pool. Kills all outstanding work.
def close
if ready = @ready
@ready = nil
ready.close

while worker = ready.pop
worker.close
end
end
end

# Offload work to a thread.
#
# @parameter work [Proc] The work to be done.
def call(work)
if ready = @ready
worker = ready.pop

begin
worker.call(work)
ensure
ready.push(worker)
end
else
raise RuntimeError, "No worker available!"
end
end
end
end
40 changes: 40 additions & 0 deletions test/async/worker_pool.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
# frozen_string_literal: true

# Released under the MIT License.
# Copyright, 2022-2024, by Samuel Williams.
# Copyright, 2024, by Patrik Wenger.

require "async/worker_pool"
require "sus/fixtures/async"

describe Async::WorkerPool do
include Sus::Fixtures::Async::ReactorContext

let(:worker_pool) {subject.new(size: 1)}

it "offloads work to a thread" do
result = worker_pool.call(proc do
Thread.current
end)

expect(result).not.to be == Thread.current
end

it "gracefully handles errors" do
expect do
worker_pool.call(proc do
raise ArgumentError, "Oops!"
end)
end.to raise_exception(ArgumentError, message: be == "Oops!")
end

with "#close" do
it "can be closed" do
worker_pool.close

expect do
worker_pool.call(proc{})
end.to raise_exception(RuntimeError)
end
end
end

0 comments on commit d1fcdd4

Please sign in to comment.