diff --git a/base/condition.jl b/base/condition.jl index fd771c9be346a..90c53b7ad310d 100644 --- a/base/condition.jl +++ b/base/condition.jl @@ -125,20 +125,104 @@ proceeding. """ function wait end +# wait with timeout +# +# The behavior of wait changes if a timeout is specified. There are +# three concurrent entities that can interact: +# 1. Task W: the task that calls wait w/timeout. +# 2. Task T: the task created to handle a timeout. +# 3. Task N: the task that notifies the Condition being waited on. +# +# Typical flow: +# - W enters the Condition's wait queue. +# - W creates T and stops running (calls wait()). +# - T, when scheduled, waits on a Timer. +# - Two common outcomes: +# - N notifies the Condition. +# - W starts running, closes the Timer, sets waiter_left and returns +# the notify'ed value. +# - The closed Timer throws an EOFError to T which simply ends. +# - The Timer expires. +# - T starts running and locks the Condition. +# - T confirms that waiter_left is unset and that W is still in the +# Condition's wait queue; it then removes W from the wait queue, +# sets dosched to true and unlocks the Condition. +# - If dosched is true, T schedules W with the special :timed_out +# value. +# - T ends. +# - W runs and returns :timed_out. +# +# Some possible interleavings: +# - N notifies the Condition but the Timer expires and T starts running +# before W: +# - W closing the expired Timer is benign. +# - T will find that W is no longer in the Condition's wait queue +# (which is protected by a lock) and will not schedule W. +# - N notifies the Condition; W runs and calls wait on the Condition +# again before the Timer expires: +# - W sets waiter_left before leaving. When T runs, it will find that +# waiter_left is set and will not schedule W. +# +# The lock on the Condition's wait queue and waiter_left together +# ensure proper synchronization and behavior of the tasks involved. + """ - wait(c::GenericCondition; first::Bool=false) + wait(c::GenericCondition; first::Bool=false, timeout::Real=0.0) Wait for [`notify`](@ref) on `c` and return the `val` parameter passed to `notify`. If the keyword `first` is set to `true`, the waiter will be put _first_ in line to wake up on `notify`. Otherwise, `wait` has first-in-first-out (FIFO) behavior. + +If `timeout` is specified, cancel the `wait` when it expires and return +`:timed_out`. The minimum value for `timeout` is 0.001 seconds, i.e. 1 +millisecond. """ -function wait(c::GenericCondition; first::Bool=false) +function wait(c::GenericCondition; first::Bool=false, timeout::Real=0.0) + timeout == 0.0 || timeout ≥ 1e-3 || throw(ArgumentError("timeout must be ≥ 1 millisecond")) + ct = current_task() _wait2(c, ct, first) token = unlockall(c.lock) + + timer::Union{Timer, Nothing} = nothing + waiter_left::Union{Threads.Atomic{Bool}, Nothing} = nothing + if timeout > 0.0 + timer = Timer(timeout) + waiter_left = Threads.Atomic{Bool}(false) + # start a task to wait on the timer + t = Task() do + try + wait(timer) + catch e + # if the timer was closed, the waiting task has been scheduled; do nothing + e isa EOFError && return + end + dosched = false + lock(c.lock) + # Confirm that the waiting task is still in the wait queue and remove it. If + # the task is not in the wait queue, it must have been notified already so we + # don't do anything here. + if !waiter_left[] && ct.queue == c.waitq + dosched = true + Base.list_deletefirst!(c.waitq, ct) + end + unlock(c.lock) + # send the waiting task a timeout + dosched && schedule(ct, :timed_out) + end + t.sticky = false + Threads._spawn_set_thrpool(t, :interactive) + schedule(t) + end + try - return wait() + res = wait() + if timer !== nothing + close(timer) + waiter_left[] = true + end + return res catch q = ct.queue; q === nothing || Base.list_deletefirst!(q::IntrusiveLinkedList{Task}, ct) rethrow() diff --git a/test/channels.jl b/test/channels.jl index 4acf6c94da1b6..6e74a2079234c 100644 --- a/test/channels.jl +++ b/test/channels.jl @@ -1,6 +1,7 @@ # This file is a part of Julia. License is MIT: https://julialang.org/license using Random +using Base.Threads using Base: Experimental using Base: n_avail @@ -39,6 +40,22 @@ end @test fetch(t) == "finished" end +@testset "timed wait on Condition" begin + a = Threads.Condition() + @test_throws ArgumentError @lock a wait(a; timeout=0.0005) + @test @lock a wait(a; timeout=0.1)==:timed_out + lock(a) + @spawn begin + @lock a notify(a) + end + @test try + wait(a; timeout=2) + true + finally + unlock(a) + end +end + @testset "various constructors" begin c = Channel() @test eltype(c) == Any