From 0832fc3d7336e7c54ab7617621bf14286c48e34d Mon Sep 17 00:00:00 2001 From: Fons van der Plas Date: Wed, 27 Sep 2023 12:47:21 +0200 Subject: [PATCH] tweakz --- src/Malt.jl | 18 ++++++++++++------ test/basic.jl | 22 ++++++++++++++++++++-- 2 files changed, 32 insertions(+), 8 deletions(-) diff --git a/src/Malt.jl b/src/Malt.jl index 84bfce6..986dfdd 100644 --- a/src/Malt.jl +++ b/src/Malt.jl @@ -123,6 +123,7 @@ mutable struct Worker <: AbstractWorker ) atexit(() -> stop(w)) + _exit_loop(w) _receive_loop(w) return w @@ -132,12 +133,12 @@ end Base.summary(io::IO, w::Worker) = write(io, "Malt.Worker on port $(w.port)") -function _receive_loop(worker::Worker) - io = worker.current_socket - - exit_handler_task = @async for _i in Iterators.countfrom(1) + +function _exit_loop(worker::Worker) + @async for _i in Iterators.countfrom(1) try if !isrunning(worker) + # the worker got shut down, which means that we will never receive one of the expected_replies. So let's give all of them a special_worker_terminated reply. for c in values(worker.expected_replies) isready(c) || put!(c, WorkerResult(MsgType.special_worker_terminated, nothing)) end @@ -145,16 +146,21 @@ function _receive_loop(worker::Worker) end sleep(1) catch e - @error "asdfdfs" exception=(e,catch_backtrace()) + @error "Unexpection error inside the exit loop" worker exception=(e,catch_backtrace()) end end +end + +function _receive_loop(worker::Worker) + io = worker.current_socket + # Here we use: # `for _i in Iterators.countfrom(1)` # instead of # `while true` # as a workaround for https://github.com/JuliaLang/julia/issues/37154 - listen_task = @async for _i in Iterators.countfrom(1) + @async for _i in Iterators.countfrom(1) try if !isopen(io) @debug("HOST: io closed.") diff --git a/test/basic.jl b/test/basic.jl index bc5a1a5..28b67fe 100644 --- a/test/basic.jl +++ b/test/basic.jl @@ -9,10 +9,28 @@ @testset "Worker management" begin w = W() @test m.isrunning(w) === true + @test m.remote_call_fetch(&, w, true, true) W === m.Worker && @test length(m.__iNtErNaL_get_running_procs()) == 1 - # Terminating workers takes about 0.5s - m.stop(w) + + if W === m.InProcessWorker + m.stop(w) + else + start = time() + task = m.remote_call(sleep, w, 10) + + m.stop(w) + + @test try + wait(task) + catch e + e + end isa TaskFailedException + stop = time() + @test stop - start < 8 + end + + @test m.isrunning(w) === false W === m.Worker && @test length(m.__iNtErNaL_get_running_procs()) == 0 end