Skip to content

Commit

Permalink
Allow transient tasks to exit gracefully. (#337)
Browse files Browse the repository at this point in the history
  • Loading branch information
ioquatix authored Aug 7, 2024
1 parent 0bcaeba commit 77059f1
Show file tree
Hide file tree
Showing 6 changed files with 92 additions and 41 deletions.
26 changes: 26 additions & 0 deletions lib/async/node.rb
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,19 @@ def nil?
empty?
end

# Adjust the number of transient children, assuming it has changed.
#
# Despite being public, this is not intended to be called directly. It is used internally by {Node#transient=}.
#
# @parameter transient [Boolean] Whether to increment or decrement the transient count.
def adjust_transient_count(transient)
if transient
@transient_count += 1
else
@transient_count -= 1
end
end

private

def added(node)
Expand Down Expand Up @@ -110,6 +123,19 @@ def transient?
@transient
end

# Change the transient state of the node.
#
# A transient node is not considered when determining if a node is finished, and propagates up if the parent is consumed.
#
# @parameter value [Boolean] Whether the node is transient.
def transient=(value)
if @transient != value
@transient = value

@parent&.children&.adjust_transient_count(value)
end
end

# Annotate the node with a description.
#
# @parameter annotation [String] The description to annotate the node with.
Expand Down
73 changes: 39 additions & 34 deletions lib/async/scheduler.rb
Original file line number Diff line number Diff line change
Expand Up @@ -75,30 +75,33 @@ def load
# Invoked when the fiber scheduler is being closed.
#
# Executes the run loop until all tasks are finished, then closes the scheduler.
def scheduler_close
def scheduler_close(error = $!)
# If the execution context (thread) was handling an exception, we want to exit as quickly as possible:
unless $!
unless error
self.run
end
ensure
self.close
end

private def shutdown!
# It's critical to stop all tasks. Otherwise they might be holding on to resources which are never closed/released correctly.
self.stop

self.run_loop do
unless @children.nil?
run_once!
end
# Terminate all child tasks.
def terminate
# If that doesn't work, take more serious action:
@children&.each do |child|
child.terminate
end

return @children.nil?
end

# Terminate all child tasks and close the scheduler.
# @public Since `stable-v1`.
def close
self.shutdown!
self.run_loop do
until self.terminate
self.run_once!
end
end

Kernel.raise "Closing scheduler with blocked operations!" if @blocked > 0
ensure
Expand Down Expand Up @@ -288,21 +291,6 @@ def process_wait(pid, flags)
return @selector.process_wait(Fiber.current, pid, flags)
end

# Run one iteration of the event loop.
# Does not handle interrupts.
# @parameter timeout [Float | Nil] The maximum timeout, or if nil, indefinite.
# @returns [Boolean] Whether there is more work to do.
def run_once(timeout = nil)
Kernel.raise "Running scheduler on non-blocking fiber!" unless Fiber.blocking?

# If we are finished, we stop the task tree and exit:
if self.finished?
return false
end

return run_once!(timeout)
end

# Run one iteration of the event loop.
#
# When terminating the event loop, we already know we are finished. So we don't need to check the task tree. This is a logical requirement because `run_once` ignores transient tasks. For example, a single top level transient task is not enough to keep the reactor running, but during termination we must still process it in order to terminate child tasks.
Expand Down Expand Up @@ -346,6 +334,25 @@ def run_once(timeout = nil)
return true
end

# Run one iteration of the event loop.
# Does not handle interrupts.
# @parameter timeout [Float | Nil] The maximum timeout, or if nil, indefinite.
# @returns [Boolean] Whether there is more work to do.
def run_once(timeout = nil)
Kernel.raise "Running scheduler on non-blocking fiber!" unless Fiber.blocking?

if self.finished?
self.stop
end

# If we are finished, we stop the task tree and exit:
if @children.nil?
return false
end

return run_once!(timeout)
end

# Checks and clears the interrupted state of the scheduler.
# @returns [Boolean] Whether the reactor has been interrupted.
private def interrupted?
Expand All @@ -363,10 +370,8 @@ def run_once(timeout = nil)

# Stop all children, including transient children, ignoring any signals.
def stop
Thread.handle_interrupt(::SignalException => :never) do
@children&.each do |child|
child.stop
end
@children&.each do |child|
child.stop
end
end

Expand All @@ -382,7 +387,9 @@ def stop
end
end
rescue Interrupt => interrupt
self.stop
Thread.handle_interrupt(::SignalException => :never) do
self.stop
end

retry
end
Expand All @@ -398,9 +405,7 @@ def run(...)
initial_task = self.async(...) if block_given?

self.run_loop do
unless self.finished?
run_once!
end
run_once
end

return initial_task
Expand Down
7 changes: 5 additions & 2 deletions lib/async/task.rb
Original file line number Diff line number Diff line change
Expand Up @@ -176,7 +176,7 @@ def completed?

alias complete? completed?

# @attribute [Symbol] The status of the execution of the fiber, one of `:initialized`, `:running`, `:complete`, `:stopped` or `:failed`.
# @attribute [Symbol] The status of the execution of the task, one of `:initialized`, `:running`, `:complete`, `:stopped` or `:failed`.
attr :status

# Begin the execution of the task.
Expand Down Expand Up @@ -262,6 +262,9 @@ def stop(later = false)

# If the fiber is alive, we need to stop it:
if @fiber&.alive?
# As the task is now exiting, we want to ensure the event loop continues to execute until the task finishes.
self.transient = false

if self.current?
# If the fiber is current, and later is `true`, we need to schedule the fiber to be stopped later, as it's currently invoking `stop`:
if later
Expand All @@ -276,7 +279,7 @@ def stop(later = false)
begin
# There is a chance that this will stop the fiber that originally called stop. If that happens, the exception handling in `#stopped` will rescue the exception and re-raise it later.
Fiber.scheduler.raise(@fiber, Stop)
rescue FiberError
rescue FiberError => error
# In some cases, this can cause a FiberError (it might be resumed already), so we schedule it to be stopped later:
Fiber.scheduler.push(Stop::Later.new(self))
end
Expand Down
18 changes: 18 additions & 0 deletions test/async/children.rb
Original file line number Diff line number Diff line change
Expand Up @@ -40,4 +40,22 @@
expect{children.remove(child)}.to raise_exception(ArgumentError, message: be =~ /not in a list/)
end
end

with "transient children" do
let(:parent) {Async::Node.new}
let(:children) {parent.children}

it "can add a transient child" do
child = Async::Node.new(parent, transient: true)
expect(children).to be(:transients?)

child.transient = false
expect(children).not.to be(:transients?)
expect(parent).not.to be(:finished?)

child.transient = true
expect(children).to be(:transients?)
expect(parent).to be(:finished?)
end
end
end
5 changes: 1 addition & 4 deletions test/async/reactor.rb
Original file line number Diff line number Diff line change
Expand Up @@ -40,11 +40,8 @@
sleep
end

expect(reactor.run_once).to be == false
expect(reactor).to be(:finished?)

# Kick the task into the ensure block:
reactor.stop
expect(reactor.run_once(0)).to be == true

reactor.close
end
Expand Down
4 changes: 3 additions & 1 deletion test/async/scheduler.rb
Original file line number Diff line number Diff line change
Expand Up @@ -187,9 +187,10 @@
with "transient tasks" do
it "exits gracefully" do
state = nil
child_task = nil

Sync do |task|
task.async(transient: true) do
child_task = task.async(transient: true) do
state = :sleeping
# Never come back:
Fiber.scheduler.transfer
Expand All @@ -205,6 +206,7 @@
end

expect(state).to be == :finished
expect(child_task).not.to be(:transient?)
end
end
end

0 comments on commit 77059f1

Please sign in to comment.