Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Wrap remote exceptions in Malt.RemoteException #57

Merged
merged 4 commits into from
Sep 12, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
38 changes: 35 additions & 3 deletions doc/src/index.md
Original file line number Diff line number Diff line change
Expand Up @@ -51,10 +51,10 @@ The following table lists each function according to its scheduling and return v


```@docs
Malt.remotecall
Malt.remote_do
Malt.remotecall_fetch
Malt.remotecall_wait
Malt.remotecall
Malt.remote_do
```

## Evaluating expressions
Expand All @@ -65,14 +65,46 @@ For situations like this, you can evaluate code using the `remote_eval*` functio

Like the `remotecall*` functions, there's a different `remote_eval*` function depending on the scheduling and return value.

| Function | Scheduling | Return value |
|:--------------------------------|:-----------|:----------------|
| [`Malt.remote_eval_fetch`](@ref) | Blocking | <value> |
| [`Malt.remote_eval_wait`](@ref) | Blocking | `nothing` |
| [`Malt.remote_eval`](@ref) | Async | `Task` that resolves to <value> |

```@docs
Malt.remote_eval
Malt.remote_eval_fetch
Malt.remote_eval_wait
Malt.remote_eval
Malt.worker_channel
```

## Exceptions

If an exception occurs on the worker while calling a function or evaluating an expression, this exception is rethrown to the host. For example:

```julia-repl
julia> Malt.remote_eval_fetch(m1, :(sqrt(-1)))
ERROR: Remote exception from Malt.Worker on port 9115:

DomainError with -1.0:
sqrt will only return a complex result if called with a complex argument. Try sqrt(Complex(x)).
Stacktrace:
[1] throw_complex_domainerror(f::Symbol, x::Float64)
@ Base.Math ./math.jl:33
[2] sqrt
@ ./math.jl:591 [inlined]
...
```

The thrown exception is of the type `Malt.RemoteException`, and contains two fields: `worker` and `message::String`. The original exception object (`DomainError` in the example above) is not available to the host.

!!! note

When using the async scheduling functions (`remotecall`, `remote_eval`), calling `wait` or `fetch` on the returned (failed) `Task` will throw a `Base.TaskFailedException`, not a `Malt.RemoteException`.

(The `Malt.RemoteException` is available with `task_failed_exception.task.exception`.)


## Signals and Termination

Once you're done computing with a worker, or if you find yourself in an unrecoverable situation
Expand Down
24 changes: 15 additions & 9 deletions src/DistributedStdlibWorker.jl
Original file line number Diff line number Diff line change
Expand Up @@ -39,16 +39,28 @@ end
# One-line description of a Distributed-backed worker, identified by the pid
# that is passed to the `Distributed` calls.
function Base.summary(io::IO, w::DistributedStdlibWorker)
    return write(io, "Malt.DistributedStdlibWorker with pid $(w.pid)")
end


"""
    @transform_exception worker ex

Evaluate `ex` in the caller's scope. If it throws a `Distributed.RemoteException`,
throw a `RemoteException` for `worker` instead, whose message is the rendered
(`showerror`) form of the captured remote error. Any other exception is rethrown
as-is.
"""
macro transform_exception(worker, ex)
    quote
        try
            $(esc(ex))
        catch err
            # Only errors raised on the remote process are translated.
            err isa Distributed.RemoteException || rethrow(err)
            throw($(RemoteException)($(esc(worker)), sprint(showerror, err.captured)))
        end
    end
end

# Asynchronous call on a Distributed-backed worker.
#
# Returns a `Task` that resolves to the value of `f(args...; kwargs...)` evaluated
# on process `w.pid`. The remote call is issued exactly once, and a remote failure
# makes the task fail with a `RemoteException` (via `@transform_exception`), the
# same exception type the blocking variants throw.
function remotecall(f, w::DistributedStdlibWorker, args...; kwargs...)
    return @async begin
        @transform_exception w Distributed.remotecall_fetch(f, w.pid, args...; kwargs...)
    end
end

# Blocking call on a Distributed-backed worker; returns the remote result.
#
# The remote call is issued exactly once. A `Distributed.RemoteException` raised
# by it is converted to a `RemoteException` for `w`; other exceptions propagate
# unchanged.
function remotecall_fetch(f, w::DistributedStdlibWorker, args...; kwargs...)
    return @transform_exception w Distributed.remotecall_fetch(f, w.pid, args...; kwargs...)
end

# Blocking call on a Distributed-backed worker that discards the result.
#
# The remote call is issued exactly once. A `Distributed.RemoteException` raised
# by it is converted to a `RemoteException` for `w`. Always returns `nothing` on
# success.
function remotecall_wait(f, w::DistributedStdlibWorker, args...; kwargs...)
    @transform_exception w Distributed.remotecall_wait(f, w.pid, args...; kwargs...)
    return nothing
end

Expand Down Expand Up @@ -82,9 +94,3 @@ function interrupt(w::DistributedStdlibWorker)
Distributed.interrupt(w.pid)
end
end





# TODO: wrap exceptions
43 changes: 34 additions & 9 deletions src/Malt.jl
Original file line number Diff line number Diff line change
Expand Up @@ -27,13 +27,29 @@ that has already been terminated.
"""
struct TerminatedWorkerException <: Exception end

"""
    RemoteException(worker, message)

Exception thrown on the host when a call or evaluation failed on a worker.

# Fields
- `worker::AbstractWorker`: the worker on which the failure happened.
- `message::String`: textual description of the failure.
"""
struct RemoteException <: Exception
    worker::AbstractWorker
    message::String
end

# Render a `RemoteException`: a header naming the originating worker (its
# `summary`), a blank line, then the stored message.
function Base.showerror(io::IO, e::RemoteException)
    origin = summary(e.worker)
    return print(io, "Remote exception from $(origin):\n\n$(e.message)")
end

"""
    WorkerResult(msg_type, value)

A reply received from a worker.

# Fields
- `msg_type::UInt8`: the message-type tag that came with the reply; it is compared
  against `MsgType` values when the result is unwrapped.
- `value::Any`: the payload of the reply.
"""
struct WorkerResult
    msg_type::UInt8
    value::Any
end

"""
    unwrap_worker_result(worker::AbstractWorker, result::WorkerResult)

Turn a reply from `worker` into a return value or an exception:

- `MsgType.special_serialization_failure`: throw an `ErrorException` describing the
  deserialization problem (the payload is rendered with `showerror`).
- `MsgType.from_worker_call_failure`: throw a `RemoteException` carrying the payload
  as its message.
- anything else: return the payload.
"""
function unwrap_worker_result(worker::AbstractWorker, result::WorkerResult)
    if result.msg_type == MsgType.special_serialization_failure
        details = sprint(Base.showerror, result.value)
        throw(ErrorException("Error deserializing data from $(summary(worker)):\n\n$(details)"))
    elseif result.msg_type == MsgType.from_worker_call_failure
        throw(RemoteException(worker, result.value))
    end
    return result.value
end

include("DistributedStdlibWorker.jl")

Expand Down Expand Up @@ -160,10 +176,7 @@ function _receive_loop(worker::Worker)

c = get(worker.expected_replies, msg_id, nothing)
if c isa Channel{WorkerResult}
put!(c, WorkerResult(
msg_type == MsgType.special_serialization_failure,
msg_data
))
put!(c, WorkerResult(msg_type, msg_data))
else
@error "HOST: Received a response, but I didn't ask for anything" msg_type msg_id msg_data
end
Expand Down Expand Up @@ -267,7 +280,7 @@ function _wait_for_response(worker::Worker, msg_id::MsgID)
@debug("HOST: waiting for response of", msg_id)
response = take!(c)
delete!(worker.expected_replies, msg_id)
return unwrap_worker_result(response)
return unwrap_worker_result(worker, response)
else
error("HOST: No response expected for message id $msg_id")
end
Expand Down Expand Up @@ -320,12 +333,24 @@ function remotecall(f, w::Worker, args...; kwargs...)
)
end
# Asynchronous call on an in-process worker.
#
# Starts a task that runs the blocking `remotecall_fetch` for this worker, records
# it as the worker's `latest_request_task`, and returns that task. A failure inside
# `f` therefore makes the task fail with the exception thrown by `remotecall_fetch`.
function remotecall(f, w::InProcessWorker, args...; kwargs...)
    task = @async remotecall_fetch(f, w, args...; kwargs...)
    w.latest_request_task = task
    return task
end
# Blocking call on an in-process worker: run `f(args...; kwargs...)` directly and
# return its value.
#
# Any exception thrown by `f` is replaced by a `RemoteException` for `w` whose
# message is the `showerror` rendering of the original exception together with
# its backtrace.
function remotecall_fetch(f, w::InProcessWorker, args...; kwargs...)
    try
        return f(args...; kwargs...)
    catch ex
        message = sprint() do io
            # invokelatest: mirrors the original call, so `showerror` methods
            # defined after this function was compiled are still used.
            Base.invokelatest(showerror, io, ex, catch_backtrace())
        end
        throw(RemoteException(w, message))
    end
end
# Blocking call on an in-process worker that discards the result: delegates to
# `remotecall_fetch` (so failures surface the same way) and returns `nothing`.
function remotecall_wait(f, w::InProcessWorker, args...; kwargs...)
    remotecall_fetch(f, w, args...; kwargs...)
    return nothing
end

"""
Malt.remotecall_fetch(f, w::Worker, args...; kwargs...)
Expand Down
4 changes: 3 additions & 1 deletion src/worker.jl
Original file line number Diff line number Diff line change
Expand Up @@ -121,7 +121,9 @@ function handle(::Val{MsgType.from_host_call_with_response}, socket, msg, msg_id
(true, respond_with_nothing ? nothing : result)
catch e
# @debug("WORKER: Got exception!", e)
(false, e)
(false, sprint() do io
Base.invokelatest(showerror, io, e, catch_backtrace())
end)
end

_serialize_msg(
Expand Down
22 changes: 0 additions & 22 deletions test/basic.jl
Original file line number Diff line number Diff line change
Expand Up @@ -139,26 +139,4 @@
m.stop(w)
@test m.isrunning(w) === false
end

(W === m.DistributedStdlibWorker) || @testset "Regular Exceptions" begin
    w = W()

    ## Errors raised on the worker are rethrown on the host as `RemoteException`.

    @test_throws m.RemoteException m.remote_eval_fetch(Main, w, quote
        sqrt(-1)
    end)
    # The worker must still answer requests after a failed evaluation.
    @test m.remotecall_fetch(&, w, true, true)

    @test_throws m.RemoteException m.remote_eval_fetch(Main, w, quote
        error("Julia stack traces are bad. GL 😉")
    end)
    @test m.remotecall_fetch(&, w, true, true)
end
end
Loading