From def0188d211d1ebdbb466e221a58fc8541935ede Mon Sep 17 00:00:00 2001 From: Fons van der Plas Date: Wed, 27 Sep 2023 12:32:11 +0200 Subject: [PATCH 1/2] Avoid infinite wait when process interrupted --- src/Malt.jl | 19 ++++++++++++++++++- src/shared.jl | 1 + 2 files changed, 19 insertions(+), 1 deletion(-) diff --git a/src/Malt.jl b/src/Malt.jl index 57dd56b..84bfce6 100644 --- a/src/Malt.jl +++ b/src/Malt.jl @@ -44,6 +44,8 @@ end function unwrap_worker_result(worker::AbstractWorker, result::WorkerResult) if result.msg_type == MsgType.special_serialization_failure throw(ErrorException("Error deserializing data from $(summary(worker)):\n\n$(sprint(Base.showerror, result.value))")) + elseif result.msg_type == MsgType.special_worker_terminated + throw(TerminatedWorkerException()) elseif result.msg_type == MsgType.from_worker_call_failure throw(RemoteException(worker, result.value)) else @@ -132,12 +134,27 @@ Base.summary(io::IO, w::Worker) = write(io, "Malt.Worker on port $(w.port)") function _receive_loop(worker::Worker) io = worker.current_socket + + exit_handler_task = @async for _i in Iterators.countfrom(1) + try + if !isrunning(worker) + for c in values(worker.expected_replies) + isready(c) || put!(c, WorkerResult(MsgType.special_worker_terminated, nothing)) + end + break + end + sleep(1) + catch e + @error "asdfdfs" exception=(e,catch_backtrace()) + end + end + # Here we use: # `for _i in Iterators.countfrom(1)` # instead of # `while true` # as a workaround for https://github.com/JuliaLang/julia/issues/37154 - @async for _i in Iterators.countfrom(1) + listen_task = @async for _i in Iterators.countfrom(1) try if !isopen(io) @debug("HOST: io closed.") diff --git a/src/shared.jl b/src/shared.jl index 29d55f4..2f9c0a4 100644 --- a/src/shared.jl +++ b/src/shared.jl @@ -8,6 +8,7 @@ const MsgType = ( from_worker_call_failure = UInt8(81), ### special_serialization_failure = UInt8(100), + special_worker_terminated = UInt8(101), ) const MsgID = UInt64 From 
0832fc3d7336e7c54ab7617621bf14286c48e34d Mon Sep 17 00:00:00 2001 From: Fons van der Plas Date: Wed, 27 Sep 2023 12:47:21 +0200 Subject: [PATCH 2/2] tweakz --- src/Malt.jl | 18 ++++++++++++------ test/basic.jl | 22 ++++++++++++++++++++-- 2 files changed, 32 insertions(+), 8 deletions(-) diff --git a/src/Malt.jl b/src/Malt.jl index 84bfce6..986dfdd 100644 --- a/src/Malt.jl +++ b/src/Malt.jl @@ -123,6 +123,7 @@ mutable struct Worker <: AbstractWorker ) atexit(() -> stop(w)) + _exit_loop(w) _receive_loop(w) return w @@ -132,12 +133,12 @@ end Base.summary(io::IO, w::Worker) = write(io, "Malt.Worker on port $(w.port)") -function _receive_loop(worker::Worker) - io = worker.current_socket - - exit_handler_task = @async for _i in Iterators.countfrom(1) + +function _exit_loop(worker::Worker) + @async for _i in Iterators.countfrom(1) try if !isrunning(worker) + # the worker got shut down, which means that we will never receive one of the expected_replies. So let's give all of them a special_worker_terminated reply. 
for c in values(worker.expected_replies) isready(c) || put!(c, WorkerResult(MsgType.special_worker_terminated, nothing)) end @@ -145,16 +146,21 @@ function _receive_loop(worker::Worker) end sleep(1) catch e - @error "asdfdfs" exception=(e,catch_backtrace()) + @error "Unexpected error inside the exit loop" worker exception=(e,catch_backtrace()) end end +end + +function _receive_loop(worker::Worker) + io = worker.current_socket + # Here we use: # `for _i in Iterators.countfrom(1)` # instead of # `while true` # as a workaround for https://github.com/JuliaLang/julia/issues/37154 - listen_task = @async for _i in Iterators.countfrom(1) + @async for _i in Iterators.countfrom(1) try if !isopen(io) @debug("HOST: io closed.") diff --git a/test/basic.jl b/test/basic.jl index bc5a1a5..28b67fe 100644 --- a/test/basic.jl +++ b/test/basic.jl @@ -9,10 +9,28 @@ @testset "Worker management" begin w = W() @test m.isrunning(w) === true + @test m.remote_call_fetch(&, w, true, true) W === m.Worker && @test length(m.__iNtErNaL_get_running_procs()) == 1 - # Terminating workers takes about 0.5s - m.stop(w) + + if W === m.InProcessWorker + m.stop(w) + else + start = time() + task = m.remote_call(sleep, w, 10) + + m.stop(w) + + @test try + wait(task) + catch e + e + end isa TaskFailedException + stop = time() + @test stop - start < 8 + end + + @test m.isrunning(w) === false W === m.Worker && @test length(m.__iNtErNaL_get_running_procs()) == 0 end