diff --git a/src/Malt.jl b/src/Malt.jl index 5eccd74..9e46fcf 100644 --- a/src/Malt.jl +++ b/src/Malt.jl @@ -91,8 +91,6 @@ function _recv(socket) response.result catch e rethrow(e) - finally - close(socket) end end @@ -225,7 +223,6 @@ function worker_channel(w::Worker, expr)::Channel while isopen(channel) && isopen(s) put!(channel, deserialize(s)) end - close(s) return end) end @@ -274,6 +271,13 @@ kill(w::Worker) = Base.kill(w.proc) Send an interrupt signal to the worker process. This will interrupt the latest request (`remotecall*` or `remote_eval*`) that was sent to the worker. """ -interrupt(w::Worker) = Base.kill(w.proc, Base.SIGINT) +function interrupt(w::Worker) + if Sys.iswindows() + isrunning(w) || throw(TerminatedWorkerException()) + _send_msg(w.port, (header=:interrupt,)) + else + Base.kill(w.proc, Base.SIGINT) + end +end end # module diff --git a/src/worker.jl b/src/worker.jl index e08fe14..aaa8c5b 100644 --- a/src/worker.jl +++ b/src/worker.jl @@ -34,8 +34,14 @@ function serve(server::Sockets.TCPServer) # Handle request asynchronously latest = @async begin msg = deserialize(sock) - @debug(msg) - handle(Val(msg.header), sock, msg) + if get(msg, :header, nothing) === :interrupt + if latest isa Task && !istaskdone(latest) + Base.throwto(latest, InterruptException) + end + else + @debug(msg) + handle(Val(msg.header), sock, msg) + end end catch InterruptException # Rethrow interrupt in the latest task @@ -58,16 +64,14 @@ function handle(::Val{:call}, socket, msg) catch e # @debug("Exception!", e) serialize(socket, (status=:err, result=e)) - finally - close(socket) end end function handle(::Val{:remote_do}, socket, msg) try msg.f(msg.args...; msg.kwargs...) - finally - close(socket) + catch e + nothing end end @@ -76,7 +80,7 @@ function handle(::Val{:channel}, socket, msg) while isopen(channel) && isopen(socket) serialize(socket, take!(channel)) end - isopen(socket) && close(socket) + isopen(channel) && close(channel) return end diff --git a/test/runtests.jl b/test/runtests.jl index 3b99a73..ca5ac89 100644 --- a/test/runtests.jl +++ b/test/runtests.jl @@ -13,7 +13,7 @@ using Test # Terminating workers takes about 0.5s m.stop(w) - sleep(1) + sleep(2) @test m.isrunning(w) === false end @@ -74,7 +74,7 @@ end @test m.isrunning(w) === true m.stop(w) - sleep(1) + sleep(2) @test m.isrunning(w) === false end