From c5df2d90f9b97ba5d88ceabe281885bdf63aded0 Mon Sep 17 00:00:00 2001 From: Fons van der Plas Date: Tue, 12 Sep 2023 16:31:27 +0200 Subject: [PATCH 1/4] Wrap remote exceptions in Malt.RemoteException --- src/DistributedStdlibWorker.jl | 18 +++- src/Malt.jl | 43 +++++++-- src/worker.jl | 4 +- test/exceptions.jl | 165 ++++++++++++++++++++++++--------- 4 files changed, 172 insertions(+), 58 deletions(-) diff --git a/src/DistributedStdlibWorker.jl b/src/DistributedStdlibWorker.jl index 163632e..73d3102 100644 --- a/src/DistributedStdlibWorker.jl +++ b/src/DistributedStdlibWorker.jl @@ -39,16 +39,28 @@ end Base.summary(io::IO, w::DistributedStdlibWorker) = write(io, "Malt.DistributedStdlibWorker with pid $(w.pid)") +macro transform_exception(worker, ex) + :(try + $(esc(ex)) + catch e + if e isa Distributed.RemoteException + throw($(RemoteException)($(esc(worker)), sprint(showerror, e.captured))) + else + rethrow(e) + end + end) +end + function remotecall(f, w::DistributedStdlibWorker, args...; kwargs...) - Distributed.remotecall(f, w.pid, args...; kwargs...) + @async Distributed.remotecall_fetch(f, w.pid, args...; kwargs...) end function remotecall_fetch(f, w::DistributedStdlibWorker, args...; kwargs...) - Distributed.remotecall_fetch(f, w.pid, args...; kwargs...) + @transform_exception w Distributed.remotecall_fetch(f, w.pid, args...; kwargs...) end function remotecall_wait(f, w::DistributedStdlibWorker, args...; kwargs...) - Distributed.remotecall_wait(f, w.pid, args...; kwargs...) + @transform_exception w Distributed.remotecall_wait(f, w.pid, args...; kwargs...) nothing end diff --git a/src/Malt.jl b/src/Malt.jl index 3c0e990..169a4d7 100644 --- a/src/Malt.jl +++ b/src/Malt.jl @@ -27,13 +27,29 @@ that has already been terminated. """ struct TerminatedWorkerException <: Exception end +struct RemoteException <: Exception + worker::AbstractWorker + message::String +end + +function Base.showerror(io::IO, e::RemoteException) + print(io, "Remote exception from $(summary(e.worker)):\n\n$(e.message)") +end struct WorkerResult - should_throw::Bool + msg_type::UInt8 value::Any end -unwrap_worker_result(result::WorkerResult) = result.should_throw ? throw(result.value) : result.value +function unwrap_worker_result(worker::AbstractWorker, result::WorkerResult) + if result.msg_type == MsgType.special_serialization_failure + throw(ErrorException("Error deserializing data from $(summary(worker)):\n\n$(sprint(Base.showerror, result.value))")) + elseif result.msg_type == MsgType.from_worker_call_failure + throw(RemoteException(worker, result.value)) + else + result.value + end +end include("DistributedStdlibWorker.jl") @@ -160,10 +176,7 @@ function _receive_loop(worker::Worker) c = get(worker.expected_replies, msg_id, nothing) if c isa Channel{WorkerResult} - put!(c, WorkerResult( - msg_type == MsgType.special_serialization_failure, - msg_data - )) + put!(c, WorkerResult(msg_type, msg_data)) else @error "HOST: Received a response, but I didn't ask for anything" msg_type msg_id msg_data end @@ -267,7 +280,7 @@ function _wait_for_response(worker::Worker, msg_id::MsgID) @debug("HOST: waiting for response of", msg_id) response = take!(c) delete!(worker.expected_replies, msg_id) - return unwrap_worker_result(response) + return unwrap_worker_result(worker, response) else error("HOST: No response expected for message id $msg_id") end @@ -320,12 +333,24 @@ function remotecall(f, w::Worker, args...; kwargs...) ) end function remotecall(f, w::InProcessWorker, args...; kwargs...) - w.latest_request_task = @async try + w.latest_request_task = @async remotecall_fetch(f, w, args...; kwargs...) +end +function remotecall_fetch(f, w::InProcessWorker, args...; kwargs...) + try f(args...; kwargs...) catch ex - ex + throw(RemoteException( + w, + sprint() do io + Base.invokelatest(showerror, io, ex, catch_backtrace()) + end + )) end end +function remotecall_wait(f, w::InProcessWorker, args...; kwargs...) + remotecall_fetch(f, w, args...; kwargs...) + nothing +end """ Malt.remotecall_fetch(f, w::Worker, args...; kwargs...) diff --git a/src/worker.jl b/src/worker.jl index 35273c9..3619096 100644 --- a/src/worker.jl +++ b/src/worker.jl @@ -121,7 +121,9 @@ function handle(::Val{MsgType.from_host_call_with_response}, socket, msg, msg_id (true, respond_with_nothing ? nothing : result) catch e # @debug("WORKER: Got exception!", e) - (false, e) + (false, sprint() do io + Base.invokelatest(showerror, io, e, catch_backtrace()) + end) end _serialize_msg( diff --git a/test/exceptions.jl b/test/exceptions.jl index b489afe..51909c9 100644 --- a/test/exceptions.jl +++ b/test/exceptions.jl @@ -1,64 +1,139 @@ +macro catcherror(ex) + return quote + local success + e = try + $(esc(ex)) + success = true + catch e + success = false + e + end + @assert !success "Expression did not throw :(" + e + end +end -@testset "Serialization Exceptions" begin - ## Serializing values of unknown types will cause an exception. - w = m.Worker() # does not apply to Malt.InProcessWorker - - stub_type_name = gensym(:NonLocalType) - - m.remote_eval_wait(w, quote - struct $(stub_type_name) end - end) - - @test_throws( - Exception, - m.remote_eval_fetch(w, quote - $stub_type_name() - end), +# @testset "Exceptions" begin +@testset "Exceptions: $W" for W in ( + m.DistributedStdlibWorker, + m.Worker, + m.InProcessWorker, ) - @test m.remotecall_fetch(&, w, true, true) - + + CallFailedException = m.RemoteException + CallFailedAndDeserializationOfExceptionFailedException = m.RemoteException + # Distributed cannot easily distinguish between a call that failed and a call that returned something that could not be deserialized by the host. + DeserializationFailedException = W === m.DistributedStdlibWorker ? Exception : ErrorException + + + + w = W() # does not apply to Malt.InProcessWorker + + + @testset "Remote failure" begin + # m.remote_eval_wait(w, :(sqrt(-1))) + + @test_throws( + CallFailedException, + m.remote_eval_wait(w, :(sqrt(-1))), + ) + @test_throws( + ["Remote exception", "DomainError", "math.jl"], + m.remote_eval_wait(w, :(sqrt(-1))), + ) + # TODO + # @test_throws( + # m.RemoteException, + # wait(m.remote_eval(w, :(sqrt(-1)))), + # ) + # @test_throws( + # TaskFailedException, + # wait(m.remote_eval(w, :(sqrt(-1)))), + # ) + + @test_nowarn m.remote_do(sqrt, w, -1) + + @test m.remotecall_fetch(&, w, true, true) + end + + W === m.InProcessWorker || @testset "Deserializing values of unknown types" begin + stub_type_name = gensym(:NonLocalType) - ## Throwing unknown exceptions will definitely cause an exception. + m.remote_eval_wait(w, quote + struct $(stub_type_name) end + end) + # TODO + m.remote_eval_wait(w, :($stub_type_name())) + @test_throws( + DeserializationFailedException, + m.remote_eval_fetch(w, :($(stub_type_name)())), + ) + @test_throws( + TaskFailedException, + fetch(m.remote_eval(w, :($(stub_type_name)()))), + ) + @test m.remotecall_fetch(&, w, true, true) + end stub_type_name2 = gensym(:NonLocalException) m.remote_eval_wait(w, quote struct $stub_type_name2 <: Exception end + Base.showerror(io::IO, e::$stub_type_name2) = print(io, "secretttzz") end) - @test_throws( - Exception, - m.remote_eval_fetch(w, quote - throw($stub_type_name2()) - end), - ) - @test m.remotecall_fetch(&, w, true, true) + @testset "Throwing unknown exception" begin + + @test_throws( + CallFailedAndDeserializationOfExceptionFailedException, + m.remote_eval_fetch(w, :(throw($stub_type_name2()))), + ) + @test_throws( + ["Remote exception", W !== m.DistributedStdlibWorker ? "secretttzz" : "deseriali"], + m.remote_eval_fetch(w, :(throw($stub_type_name2()))), + ) + @test_throws( + TaskFailedException, + fetch(m.remote_eval(w, :(throw($stub_type_name2())))), + ) + @test m.remotecall_fetch(&, w, true, true) + end - ## Catching unknown exceptions and returning them as values also causes an exception. - - @test_throws( - Exception, - m.remote_eval_fetch(w, quote + @testset "Returning an exception" begin + + @test_nowarn m.remote_eval_fetch(w, quote try - throw($stub_type_name2()) + sqrt(-1) catch e e end - end), - ) - @test m.remotecall_fetch(&, w, true, true) - - - # TODO - # @test_throws( - # Exception, - # m.worker_channel(w, :(123)) - # ) - # @test_throws( - # Exception, - # m.worker_channel(w, :(sqrt(-1))) - # ) + end) + + ## Catching unknown exceptions and returning them as values also causes an exception. + W === m.InProcessWorker || @test_throws( + DeserializationFailedException, + m.remote_eval_fetch(w, quote + try + throw($stub_type_name2()) + catch e + e + end + end), + ) + + + # TODO + # @test_throws( + # Exception, + # m.worker_channel(w, :(123)) + # ) + # @test_throws( + # Exception, + # m.worker_channel(w, :(sqrt(-1))) + # ) + @test m.remotecall_fetch(&, w, true, true) + end # The worker should be able to handle all that throwing @test m.isrunning(w) From 5bce4ee082d03ff7a09d60cfc79d202244e145ff Mon Sep 17 00:00:00 2001 From: Fons van der Plas Date: Tue, 12 Sep 2023 16:37:16 +0200 Subject: [PATCH 2/4] fix old tests (and then removed them hehe) --- test/basic.jl | 22 ---------------------- 1 file changed, 22 deletions(-) diff --git a/test/basic.jl b/test/basic.jl index dd72b4c..1a04708 100644 --- a/test/basic.jl +++ b/test/basic.jl @@ -139,26 +139,4 @@ m.stop(w) @test m.isrunning(w) === false end - - (W === m.DistributedStdlibWorker) || @testset "Regular Exceptions" begin - w = W() - - ## Mutually Known errors are not thrown, but returned as values. - - @test isa( - m.remote_eval_fetch(Main, w, quote - sqrt(-1) - end), - DomainError, - ) - @test m.remotecall_fetch(&, w, true, true) - - @test isa( - m.remote_eval_fetch(Main, w, quote - error("Julia stack traces are bad. GL 😉") - end), - ErrorException, - ) - @test m.remotecall_fetch(&, w, true, true) - end end \ No newline at end of file From 32495e235a125ea3b2cc58eafa4dadbce6aedcb3 Mon Sep 17 00:00:00 2001 From: Fons van der Plas Date: Tue, 12 Sep 2023 16:43:24 +0200 Subject: [PATCH 3/4] fix old julia tests --- test/exceptions.jl | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/test/exceptions.jl b/test/exceptions.jl index 51909c9..66b9145 100644 --- a/test/exceptions.jl +++ b/test/exceptions.jl @@ -37,7 +37,8 @@ end CallFailedException, m.remote_eval_wait(w, :(sqrt(-1))), ) - @test_throws( + # searching for strings requires Julia 1.8 + VERSION >= v"1.8.0" && @test_throws( ["Remote exception", "DomainError", "math.jl"], m.remote_eval_wait(w, :(sqrt(-1))), ) @@ -88,7 +89,8 @@ end CallFailedAndDeserializationOfExceptionFailedException, m.remote_eval_fetch(w, :(throw($stub_type_name2()))), ) - @test_throws( + # searching for strings requires Julia 1.8 + VERSION >= v"1.8.0" && @test_throws( ["Remote exception", W !== m.DistributedStdlibWorker ? "secretttzz" : "deseriali"], m.remote_eval_fetch(w, :(throw($stub_type_name2()))), ) From 44d69cf956958b695209640c3a0e16d5d20d54fb Mon Sep 17 00:00:00 2001 From: Fons van der Plas Date: Tue, 12 Sep 2023 17:11:00 +0200 Subject: [PATCH 4/4] docs --- doc/src/index.md | 38 +++++++++++++++++++++++++++++++--- src/DistributedStdlibWorker.jl | 6 ------ test/exceptions.jl | 13 ++++-------- 3 files changed, 39 insertions(+), 18 deletions(-) diff --git a/doc/src/index.md b/doc/src/index.md index 3b0b0a2..d7f488b 100644 --- a/doc/src/index.md +++ b/doc/src/index.md @@ -51,10 +51,10 @@ The following table lists each function according to its scheduling and return v ```@docs -Malt.remotecall -Malt.remote_do Malt.remotecall_fetch Malt.remotecall_wait +Malt.remotecall +Malt.remote_do ``` ## Evaluating expressions @@ -65,14 +65,46 @@ For situations like this, you can evaluate code using the `remote_eval*` functio Like the `remotecall*` functions, there's different a `remote_eval*` depending on the scheduling and return value. +| Function | Scheduling | Return value | +|:--------------------------------|:-----------|:----------------| +| [`Malt.remote_eval_fetch`](@ref) | Blocking | | +| [`Malt.remote_eval_wait`](@ref) | Blocking | `nothing` | +| [`Malt.remote_eval`](@ref) | Async | `Task` that resolves to | ```@docs -Malt.remote_eval Malt.remote_eval_fetch Malt.remote_eval_wait +Malt.remote_eval Malt.worker_channel ``` +## Exceptions + +If an exception occurs on the worker while calling a function or evaluating an expression, this exception is rethrown to the host. For example: + +```julia-repl +julia> Malt.remotecall_fetch(m1, :(sqrt(-1))) +ERROR: Remote exception from Malt.Worker on port 9115: + +DomainError with -1.0: +sqrt will only return a complex result if called with a complex argument. Try sqrt(Complex(x)). +Stacktrace: + [1] throw_complex_domainerror(f::Symbol, x::Float64) + @ Base.Math ./math.jl:33 + [2] sqrt + @ ./math.jl:591 [inlined] + ... +``` + +The thrown exception is of the type `Malt.RemoteException`, and contains two fields: `worker` and `message::String`. The original exception object (`DomainError` in the example above) is not availabale to the host. + +!!! note + + When using the async scheduling functions (`remotecall`, `remote_eval`), calling `wait` or `fetch` on the returned (failed) `Task` will throw a `Base.TaskFailedException`, not a `Malt.RemoteException`. + + (The `Malt.RemoteException` is available with `task_failed_exception.task.exception`.) + + ## Signals and Termination Once you're done computing with a worker, or if you find yourself in an unrecoverable situation diff --git a/src/DistributedStdlibWorker.jl b/src/DistributedStdlibWorker.jl index 73d3102..37ec869 100644 --- a/src/DistributedStdlibWorker.jl +++ b/src/DistributedStdlibWorker.jl @@ -94,9 +94,3 @@ function interrupt(w::DistributedStdlibWorker) Distributed.interrupt(w.pid) end end - - - - - -# TODO: wrap exceptions \ No newline at end of file diff --git a/test/exceptions.jl b/test/exceptions.jl index 66b9145..2e9f9a0 100644 --- a/test/exceptions.jl +++ b/test/exceptions.jl @@ -42,15 +42,10 @@ end ["Remote exception", "DomainError", "math.jl"], m.remote_eval_wait(w, :(sqrt(-1))), ) - # TODO - # @test_throws( - # m.RemoteException, - # wait(m.remote_eval(w, :(sqrt(-1)))), - # ) - # @test_throws( - # TaskFailedException, - # wait(m.remote_eval(w, :(sqrt(-1)))), - # ) + @test_throws( + TaskFailedException, + wait(m.remote_eval(w, :(sqrt(-1)))), + ) @test_nowarn m.remote_do(sqrt, w, -1)