Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Interrupt on windows! 😯 #60

Merged
merged 20 commits into from
Sep 27, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .github/workflows/test.yml
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ on:
jobs:
test:
runs-on: ${{ matrix.os }}
timeout-minutes: 20
timeout-minutes: 25

strategy:
# Without setting this, a failing test cancels all others
Expand Down
3 changes: 3 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,9 @@ In Distributed, only "process 1 can add or remove workers". Malt.jl does not hav
### Process isolation
Malt.jl worker processes **do not inherit** `ENV` variables, command-line arguments or the Pkg environment from their host.

### Interrupt on Windows
Malt.jl supports **interrupting a worker process on Windows**, not just on UNIX.

### Heterogeneous computing
Malt.jl does not have an API like `@everywhere` or `Distributed.procs`: Malt is **not the right tool for homogeneous computing**.

Expand Down
24 changes: 15 additions & 9 deletions src/Malt.jl
Original file line number Diff line number Diff line change
Expand Up @@ -103,7 +103,11 @@ mutable struct Worker <: AbstractWorker
function Worker(; env=String[], exeflags=[])
# Spawn process
cmd = _get_worker_cmd(; env, exeflags)
proc = open(cmd, "w+")
proc = open(Cmd(
cmd;
detach=true,
windows_hide=true,
), "w+")

# Keep internal list
__iNtErNaL_get_running_procs()
Expand Down Expand Up @@ -612,18 +616,20 @@ Send an interrupt signal to the worker process. This will interrupt the
latest request (`remote_call*` or `remote_eval*`) that was sent to the worker.
"""
function interrupt(w::Worker)
if Sys.iswindows()
# TODO: not yet implemented
@warn "Malt.interrupt is not yet supported on Windows"
# _assert_is_running(w)
# _send_msg(w, MsgType.from_host_fake_interrupt, (), false)
nothing
if !isrunning(w)
@warn "Tried to interrupt a worker that has already shut down." summary(w)
else
Base.kill(w.proc, Base.SIGINT)
if Sys.iswindows()
ccall((:GenerateConsoleCtrlEvent,"Kernel32"), Bool, (UInt32, UInt32), UInt32(1), UInt32(getpid(w.proc)))
fonsp marked this conversation as resolved.
Show resolved Hide resolved
else
Base.kill(w.proc, Base.SIGINT)
end
end
nothing
end
function interrupt(w::InProcessWorker)
schedule(w.latest_request_task, InterruptException(); error=true)
istaskdone(w.latest_request_task) || schedule(w.latest_request_task, InterruptException(); error=true)
nothing
end


Expand Down
146 changes: 146 additions & 0 deletions test/interrupt.jl
Original file line number Diff line number Diff line change
@@ -0,0 +1,146 @@
win = Sys.iswindows()

# Interrupt tests, run once per worker type.
#
# For every worker type this checks that:
#   1. a running request can be interrupted with a single `m.interrupt(w)`,
#   2. the interrupted request finishes noticeably faster than an uninterrupted one,
#   3. the worker is still running and responsive afterwards,
#   4. a request that ignores single interrupts (`while true end`) can be stopped by
#      sending interrupts repeatedly ("hard interrupt").
@testset "Interrupt: $W" for W in (m.DistributedStdlibWorker, m.InProcessWorker, m.Worker)

    # Worker types for which the "request was interrupted" assertions are skipped.
    # `m.interrupt` is still called on them, so it must at least not break the worker.
    no_interrupt_possible =
        (Sys.iswindows() && W === m.DistributedStdlibWorker) || W === m.InProcessWorker

    w = W()

    @test m.isrunning(w)
    @test m.remote_call_fetch(&, w, true, true)

    # Long-running loop that builds a new array on every iteration.
    ex1 = quote
        local x = 0.0
        for i = 1:4000
            k = [sqrt(abs(sin(cos(tan(x))))) ^ (1 / i) for z in 1:i]
            x += sum(k)
        end
        x
    end |> Base.remove_linenums!

    # Long-running scalar loop; only used on VERSION > v"1.10.0-0" (see `exs` below).
    ex2 = quote
        local x = 0.0
        for i in 1:20_000_000
            x += sqrt(abs(sin(cos(tan(x)))))^(1/i)
        end
        x
    end |> Base.remove_linenums!

    # A request that spends its time sleeping.
    ex3 = :(sleep(3)) |> Base.remove_linenums!

    # Expressions in this list can be interrupted with a single Ctrl+C
    # (open a terminal and try this).
    # Some expressions, like `while true end`, need multiple Ctrl+C in short succession
    # to force-throw SIGINT; those are covered by the "hard interrupt" testset below.
    exs = no_interrupt_possible ? [ex1, ex3] : [
        ex1,
        ex3,
        ex1, # second time because interrupts should be reliable
        (
            VERSION > v"1.10.0-0" ? [ex2, ex2] : []
        )...,
    ]

    @testset "single interrupt $ex" for ex in exs

        # Start evaluating `ex` on the worker; returns the task for the request.
        f() = m.remote_eval(w, ex)

        # Start a request, interrupt it after 0.1 s, wait for it to end, and return the
        # elapsed wall-clock time in seconds. The request must still be running right
        # after it was started, and (where interrupting is expected to work) fetching
        # its task must throw a `TaskFailedException`.
        function interrupted_run()
            return @elapsed begin
                t = f()
                @test !istaskdone(t)
                sleep(0.1)
                m.interrupt(w)
                r = try
                    fetch(t)
                catch e
                    e
                end
                no_interrupt_possible || @test r isa TaskFailedException
            end
        end

        # Baseline: two uninterrupted runs.
        t1 = @elapsed wait(f())
        t2 = @elapsed wait(f())

        # Two interrupted runs. Both are logged, but only the second one is compared
        # against the baseline.
        t3 = interrupted_run()
        t4 = interrupted_run()

        @info "test run" ex t1 t2 t3 t4
        no_interrupt_possible || @test t4 < min(t1,t2) * 0.8

        # still running and responsive
        @test m.isrunning(w)
        @test m.remote_call_fetch(&, w, true, true)

    end

    if !no_interrupt_possible
        @testset "hard interrupt" begin

            # Keep sending interrupts to `w` until it either answers a trivial follow-up
            # request or stops running. Interrupts are sent in bursts of up to five,
            # 0.18 s apart, with a 1.5 s pause after each burst.
            function hard_interrupt(w)
                finish_task = m.remote_call(&, w, true, true)

                done() = !m.isrunning(w) || istaskdone(finish_task)

                while !done()
                    for _ in 1:5
                        print(" 🔥 ")
                        m.interrupt(w)
                        sleep(0.18)
                        if done()
                            break
                        end
                    end
                    sleep(1.5)
                end
            end

            # A busy loop that a single interrupt does not reliably stop.
            t = m.remote_eval(w, :(while true end))

            @test !istaskdone(t)
            @test m.isrunning(w)

            hard_interrupt(w)

            @info "xx" istaskdone(t) m.isrunning(w)

            # The busy-loop request must have failed rather than returned a value.
            @test try
                fetch(t)
            catch e
                e
            end isa TaskFailedException

            if Sys.iswindows() && VERSION < v"1.10.0-beta3"
                # fixed by https://github.com/JuliaLang/julia/pull/51307 which will probably land in v1.10.0-beta3
                @test_broken m.isrunning(w)
            else
                # still running and responsive
                @test m.isrunning(w)
                @test m.remote_call_fetch(&, w, true, true)
            end
        end
    end

    m.stop(w)
    @test !m.isrunning(w)
end
2 changes: 2 additions & 0 deletions test/runtests.jl
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@ using Test

v() = @assert isempty(m.__iNtErNaL_get_running_procs())

v()
include("interrupt.jl")
v()
include("basic.jl")
v()
Expand Down