Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Use Threads.@spawn instead of @async #86

Open
wants to merge 1 commit into
base: main
Choose a base branch
from
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Use Threads.@spawn instead of @async
devmotion authored Oct 11, 2024

Verified

This commit was created on GitHub.com and signed with GitHub’s verified signature.
commit fe8b1a8b45b54ce834637f9fc1303eaaf1458b02
4 changes: 2 additions & 2 deletions src/DistributedStdlibWorker.jl
Original file line number Diff line number Diff line change
@@ -27,7 +27,7 @@ mutable struct DistributedStdlibWorker <: AbstractWorker
# TODO: process preamble from Pluto?

# There's no reason to keep the worker process alive after the manager loses its handle.
w = finalizer(w -> @async(stop(w)),
w = finalizer(w -> Threads.@spawn(stop(w)),
new(pid, true)
)
atexit(() -> stop(w))
@@ -52,7 +52,7 @@ macro transform_exception(worker, ex)
end

function remote_call(f, w::DistributedStdlibWorker, args...; kwargs...)
@async Distributed.remotecall_fetch(f, w.pid, args...; kwargs...)
Threads.@spawn Distributed.remotecall_fetch(f, w.pid, args...; kwargs...)
end

function remote_call_fetch(f, w::DistributedStdlibWorker, args...; kwargs...)
18 changes: 9 additions & 9 deletions src/Malt.jl
Original file line number Diff line number Diff line change
@@ -123,7 +123,7 @@ mutable struct Worker <: AbstractWorker


# There's no reason to keep the worker process alive after the manager loses its handle.
w = finalizer(w -> @async(stop(w)),
w = finalizer(w -> Threads.@spawn(stop(w)),
new(
port,
proc,
@@ -147,7 +147,7 @@ Base.summary(io::IO, w::Worker) = write(io, "Malt.Worker on port $(w.port) with


function _exit_loop(worker::Worker)
@async for _i in Iterators.countfrom(1)
Threads.@spawn for _i in Iterators.countfrom(1)
try
if !isrunning(worker)
# the worker got shut down, which means that we will never receive one of the expected_replies. So let's give all of them a special_worker_terminated reply.
@@ -172,7 +172,7 @@ function _receive_loop(worker::Worker)
# instead of
# `while true`
# as a workaround for https://github.com/JuliaLang/julia/issues/37154
@async for _i in Iterators.countfrom(1)
Threads.@spawn for _i in Iterators.countfrom(1)
try
if !isopen(io)
@debug("HOST: io closed.")
@@ -283,7 +283,7 @@ _new_do_msg(f::Function, args, kwargs) = (
# # TODO: `while` instead of `if`?
# if w.current_socket === nothing || !isopen(w.current_socket)
# w.current_socket = connect(w.port)
# @async _receive_loop(w)
# Threads.@spawn _receive_loop(w)
# end
# return w
# end
@@ -337,12 +337,12 @@ function _send_receive(w::Worker, msg_type::UInt8, msg_data)
end

"""
`@async(_wait_for_response) ∘ _send_msg`
`Threads.@spawn(_wait_for_response) ∘ _send_msg`
"""
function _send_receive_async(w::Worker, msg_type::UInt8, msg_data, output_transformation=identity)::Task
# TODO: Unwrap TaskFailedExceptions
msg_id = _send_msg(w, msg_type, msg_data, true)
return @async output_transformation(_wait_for_response(w, msg_id))
return Threads.@spawn output_transformation(_wait_for_response(w, msg_id))
end


@@ -375,7 +375,7 @@ function remote_call(f, w::Worker, args...; kwargs...)
)
end
function remote_call(f, w::InProcessWorker, args...; kwargs...)
w.latest_request_task = @async remote_call_fetch(f, w, args...; kwargs...)
w.latest_request_task = Threads.@spawn remote_call_fetch(f, w, args...; kwargs...)
end
function remote_call_fetch(f, w::InProcessWorker, args...; kwargs...)
try
@@ -446,7 +446,7 @@ function remote_do(f, w::Worker, args...; kwargs...)
nothing
end
function remote_do(f, ::InProcessWorker, args...; kwargs...)
@async f(args...; kwargs...)
Threads.@spawn f(args...; kwargs...)
nothing
end

@@ -645,7 +645,7 @@ function _rethrow_to_repl(e::InterruptException; rethrow_regular::Bool=false)
Base.active_repl_backend.in_eval

@debug "HOST: Rethrowing interrupt to REPL"
@async Base.schedule(Base.active_repl_backend.backend_task, e; error=true)
Threads.@spawn Base.schedule(Base.active_repl_backend.backend_task, e; error=true)
elseif rethrow_regular
@debug "HOST: Don't know what to do with this interrupt, rethrowing" exception = (e, catch_backtrace())
rethrow(e)
4 changes: 2 additions & 2 deletions src/worker.jl
Original file line number Diff line number Diff line change
@@ -115,7 +115,7 @@ interrupt(::Nothing) = nothing
function handle(::Val{MsgType.from_host_call_with_response}, socket, msg, msg_id::MsgID)
f, args, kwargs, respond_with_nothing = msg

@async begin
Threads.@spawn begin
result, success = try
result = f(args...; kwargs...)

@@ -139,7 +139,7 @@ end
function handle(::Val{MsgType.from_host_call_without_response}, socket, msg, msg_id::MsgID)
f, args, kwargs, _ignored = msg

@async try
Threads.@spawn try
f(args...; kwargs...)
catch e
@warn("WORKER: Got exception while running call without response", exception=(e, catch_backtrace()))
2 changes: 1 addition & 1 deletion test/basic.jl
Original file line number Diff line number Diff line change
@@ -128,7 +128,7 @@



t = @async begin
t = Threads.@spawn begin
for i in 1:2*channel_size
@test take!(lc) == i
end