Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add timeout parameter to wait(::Condition) #56974

Merged
merged 7 commits into from
Jan 17, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
90 changes: 87 additions & 3 deletions base/condition.jl
Original file line number Diff line number Diff line change
Expand Up @@ -125,20 +125,104 @@ proceeding.
"""
function wait end

# wait with timeout
#
# The behavior of wait changes if a timeout is specified. There are
# three concurrent entities that can interact:
# 1. Task W: the task that calls wait w/timeout.
# 2. Task T: the task created to handle a timeout.
# 3. Task N: the task that notifies the Condition being waited on.
#
# Typical flow:
# - W enters the Condition's wait queue.
# - W creates T and stops running (calls wait()).
# - T, when scheduled, waits on a Timer.
# - Two common outcomes:
# - N notifies the Condition.
# - W starts running, closes the Timer, sets waiter_left and returns
# the notify'ed value.
# - The closed Timer throws an EOFError to T which simply ends.
# - The Timer expires.
# - T starts running and locks the Condition.
# - T confirms that waiter_left is unset and that W is still in the
# Condition's wait queue; it then removes W from the wait queue,
# sets dosched to true and unlocks the Condition.
# - If dosched is true, T schedules W with the special :timed_out
# value.
# - T ends.
# - W runs and returns :timed_out.
#
# Some possible interleavings:
# - N notifies the Condition but the Timer expires and T starts running
# before W:
# - W closing the expired Timer is benign.
# - T will find that W is no longer in the Condition's wait queue
# (which is protected by a lock) and will not schedule W.
# - N notifies the Condition; W runs and calls wait on the Condition
# again before the Timer expires:
# - W sets waiter_left before leaving. When T runs, it will find that
# waiter_left is set and will not schedule W.
#
# The lock on the Condition's wait queue and waiter_left together
# ensure proper synchronization and behavior of the tasks involved.

"""
wait(c::GenericCondition; first::Bool=false)
wait(c::GenericCondition; first::Bool=false, timeout::Real=0.0)

Wait for [`notify`](@ref) on `c` and return the `val` parameter passed to `notify`.

If the keyword `first` is set to `true`, the waiter will be put _first_
in line to wake up on `notify`. Otherwise, `wait` has first-in-first-out (FIFO) behavior.

If `timeout` is specified, cancel the `wait` when it expires and return
`:timed_out`. The minimum value for `timeout` is 0.001 seconds, i.e. 1
millisecond.
"""
function wait(c::GenericCondition; first::Bool=false)
function wait(c::GenericCondition; first::Bool=false, timeout::Real=0.0)
timeout == 0.0 || timeout ≥ 1e-3 || throw(ArgumentError("timeout must be ≥ 1 millisecond"))

ct = current_task()
_wait2(c, ct, first)
token = unlockall(c.lock)

timer::Union{Timer, Nothing} = nothing
waiter_left::Union{Threads.Atomic{Bool}, Nothing} = nothing
if timeout > 0.0
timer = Timer(timeout)
waiter_left = Threads.Atomic{Bool}(false)
# start a task to wait on the timer
t = Task() do
try
wait(timer)
catch e
# if the timer was closed, the waiting task has been scheduled; do nothing
e isa EOFError && return
end
dosched = false
lock(c.lock)
# Confirm that the waiting task is still in the wait queue and remove it. If
# the task is not in the wait queue, it must have been notified already so we
# don't do anything here.
Comment on lines +203 to +205
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This appears to introduce a data race though, so we cannot merge this

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

How's that? We're locking the condition variable here.

Copy link
Member

@vtjnash vtjnash Jan 6, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This Timer runs concurrently with the return from wait, so by the time this code runs, you might have just corrupted some arbitrary subsequent wait on the same condition or by the time you schedule the TimeoutError, it could blow up some completely unrelated wait

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ah, okay. There's an ABA problem. Let me see if I can find a solution for that.

But the waiting task is only scheduled with a TimeoutError if it was in this condition's wait queue, so I'm not sure I understand your "or" case here -- the only subsequent wait that could get blown up is a wait on the same condition, which is the same ABA problem?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It could been in the waitq, then removed before you got around to scheduling it, or vice versa with some other thread scheduling before it got around to removing it from the queue. Those codes are running on other threads, so it could be concurrent. There is potentially no guarantee that you can safely mutate this data-structure concurrently on two threads (#55542)

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Pushed a fix for the ABA problem that relies on happens-before -- if the waiter was scheduled, it sets waiter_left before returning. It can only re-enter the condition's wait queue by another call to wait, for which it must acquire the lock.

We acquire the condition's lock before checking waiter_left and for the task's presence in the wait queue. If the task is present, it can only be because it has not been scheduled, because if it was scheduled, it would have set waiter_left before re-entering the wait queue.

I think the combination of the lock and the atomic assure there is no ABA problem.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It could been in the waitq, then removed before you got around to scheduling it

We acquire the lock, confirm that the waiter did not leave and remove it from the wait queue before scheduling it. If it was not in the wait queue, we do not schedule it and this decision is made while holding the lock.

some other thread scheduling before it got around to removing it from the queue

If the task is scheduled by notify, then it is removed from the condition's wait queue before it is scheduled, which is done while holding the condition's lock. If it is not in the wait queue, then we do not schedule it.

if !waiter_left[] && ct.queue == c.waitq
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Might be useful to note the orderings here in a little diagram?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Added a comment describing the typical flows and some possible interleavings.

dosched = true
Base.list_deletefirst!(c.waitq, ct)
end
unlock(c.lock)
# send the waiting task a timeout
dosched && schedule(ct, :timed_out)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What would you think about throwing an instance of a custom struct here instead?

struct TimeOutEvent end

and returning

            dosched && schedule(ct, TimeOutEvent())

With the current code, i worry about someone having a typo and missing the timeout for that reason :(

    if wait(cond) === :time_out  # oops, this will never be reached
         # handle timeout
    end

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Oh, wow, i just read your commit comment:

Return :timed_out instead of :timeout like Base.timedwait

.... :/ I don't love that decision for base..... But i also don't love the idea of doing something different than timedwait....
I think my preference is still to use a custom struct here, but i feel a bit less confident about it now.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't love the special symbol either, but I didn't want to have two different timed waits in Base with different interfaces.

Would be good to get some more reviews on this PR.

end
t.sticky = false
Threads._spawn_set_thrpool(t, :interactive)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What happens if we have no threads in the interactive threadpool?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Threads._spawn_set_thrpool will set the threadpool to :default if there are no interactive threads.

schedule(t)
end

try
return wait()
res = wait()
if timer !== nothing
close(timer)
waiter_left[] = true
end
return res
NHDaly marked this conversation as resolved.
Show resolved Hide resolved
catch
q = ct.queue; q === nothing || Base.list_deletefirst!(q::IntrusiveLinkedList{Task}, ct)
rethrow()
Expand Down
17 changes: 17 additions & 0 deletions test/channels.jl
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
# This file is a part of Julia. License is MIT: https://julialang.org/license

using Random
using Base.Threads
using Base: Experimental
using Base: n_avail

Expand Down Expand Up @@ -39,6 +40,22 @@ end
@test fetch(t) == "finished"
end

@testset "timed wait on Condition" begin
a = Threads.Condition()
@test_throws ArgumentError @lock a wait(a; timeout=0.0005)
@test @lock a wait(a; timeout=0.1)==:timed_out
lock(a)
@spawn begin
@lock a notify(a)
end
@test try
wait(a; timeout=2)
true
finally
unlock(a)
end
end

@testset "various constructors" begin
c = Channel()
@test eltype(c) == Any
Expand Down
Loading