Skip to content

Commit

Permalink
tweakz
Browse files Browse the repository at this point in the history
  • Loading branch information
fonsp committed Sep 27, 2023
1 parent def0188 commit 0832fc3
Show file tree
Hide file tree
Showing 2 changed files with 32 additions and 8 deletions.
18 changes: 12 additions & 6 deletions src/Malt.jl
Original file line number Diff line number Diff line change
Expand Up @@ -123,6 +123,7 @@ mutable struct Worker <: AbstractWorker
)
atexit(() -> stop(w))

_exit_loop(w)
_receive_loop(w)

return w
Expand All @@ -132,29 +133,34 @@ end
Base.summary(io::IO, w::Worker) = write(io, "Malt.Worker on port $(w.port)")


function _receive_loop(worker::Worker)
io = worker.current_socket

exit_handler_task = @async for _i in Iterators.countfrom(1)

function _exit_loop(worker::Worker)
@async for _i in Iterators.countfrom(1)
try
if !isrunning(worker)
# the worker got shut down, which means that we will never receive one of the expected_replies. So let's give all of them a special_worker_terminated reply.
for c in values(worker.expected_replies)
isready(c) || put!(c, WorkerResult(MsgType.special_worker_terminated, nothing))
end
break
end
sleep(1)
catch e
@error "asdfdfs" exception=(e,catch_backtrace())
@error "Unexpection error inside the exit loop" worker exception=(e,catch_backtrace())
end
end
end

function _receive_loop(worker::Worker)
io = worker.current_socket


# Here we use:
# `for _i in Iterators.countfrom(1)`
# instead of
# `while true`
# as a workaround for https://github.com/JuliaLang/julia/issues/37154
listen_task = @async for _i in Iterators.countfrom(1)
@async for _i in Iterators.countfrom(1)
try
if !isopen(io)
@debug("HOST: io closed.")
Expand Down
22 changes: 20 additions & 2 deletions test/basic.jl
Original file line number Diff line number Diff line change
Expand Up @@ -9,10 +9,28 @@
@testset "Worker management" begin
w = W()
@test m.isrunning(w) === true
@test m.remote_call_fetch(&, w, true, true)

W === m.Worker && @test length(m.__iNtErNaL_get_running_procs()) == 1
# Terminating workers takes about 0.5s
m.stop(w)

if W === m.InProcessWorker
m.stop(w)
else
start = time()
task = m.remote_call(sleep, w, 10)

m.stop(w)

@test try
wait(task)
catch e
e
end isa TaskFailedException
stop = time()
@test stop - start < 8
end


@test m.isrunning(w) === false
W === m.Worker && @test length(m.__iNtErNaL_get_running_procs()) == 0
end
Expand Down

0 comments on commit 0832fc3

Please sign in to comment.