Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Improved exception context #58

Merged
merged 6 commits into from
Oct 14, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
31 changes: 24 additions & 7 deletions dagrunner/execute_graph.py
Original file line number Diff line number Diff line change
Expand Up @@ -130,7 +130,9 @@ def plugin_executor(
arg for arg in args if arg is not None
] # support plugins that have no return value
if call is None:
raise ValueError("call is a required argument")
raise ValueError(
f"call is a required argument\nnode_properties: {node_properties}"
)
if verbose:
print(f"args: {args}")
print(f"call: {call}")
Expand Down Expand Up @@ -162,7 +164,7 @@ def plugin_executor(
else:
raise ValueError(
f"expecting 1, 2 or 3 values to unpack for {callable_obj}, "
f"got {len(call)}"
f"got {len(call)}\nnode_properties: {node_properties}"
)
callable_kwargs_init = (
{} if callable_kwargs_init is None else callable_kwargs_init
Expand All @@ -175,7 +177,7 @@ def plugin_executor(
else:
raise ValueError(
f"expecting 1 or 2 values to unpack for {callable_obj}, got "
f"{len(call)}"
f"{len(call)}\nnode_properties: {node_properties}"
)
callable_kwargs = {} if callable_kwargs is None else callable_kwargs

Expand All @@ -188,7 +190,14 @@ def plugin_executor(
callable_kwargs_init
| _get_common_args_matching_signature(callable_obj, common_kwargs)
)
callable_obj = callable_obj(**callable_kwargs_init)
try:
callable_obj = callable_obj(**callable_kwargs_init)
except Exception as err:
msg = (
f"Failed to initialise {obj_name} with {callable_kwargs_init}"
f"\nnode_properties: {node_properties}"
)
raise RuntimeError(msg) from err
call_msg = f"(**{callable_kwargs_init})"

callable_kwargs = callable_kwargs | _get_common_args_matching_signature(
Expand All @@ -203,7 +212,14 @@ def plugin_executor(
with TimeIt() as timer, dask.config.set(
scheduler="single-threaded"
), CaptureProcMemory() as mem:
res = callable_obj(*args, **callable_kwargs)
try:
res = callable_obj(*args, **callable_kwargs)
except Exception as err:
msg = (
f"Failed to execute {obj_name} with {args}, {callable_kwargs}"
f"\nnode_properties: {node_properties}"
)
raise RuntimeError(msg) from err
msg = f"{str(timer)}; {msg}; {mem.max()}"
logging.info(msg)

Expand Down Expand Up @@ -292,8 +308,9 @@ def __init__(
function. Optional.
- `scheduler` (str):
Accepted values include "ray", "multiprocessing" and those recognised
by dask: "threads", "processes" and "single-threaded" (useful for debugging).
See https://docs.dask.org/en/latest/scheduling.html. Optional.
by dask: "threads", "processes" and "single-threaded" (useful for debugging)
and "distributed". See https://docs.dask.org/en/latest/scheduling.html.
Optional.
- `num_workers` (int):
Number of processes or threads to use. Optional.
- `dry_run` (bool):
Expand Down
15 changes: 13 additions & 2 deletions dagrunner/runner/schedulers/dask.py
Original file line number Diff line number Diff line change
Expand Up @@ -89,16 +89,23 @@ def __init__(self, num_workers, profiler_filepath=None, **kwargs):
self._profiler_output = profiler_filepath
self._kwargs = kwargs
self._cluster = None
self._client = None
# bug: dashboard cannot be disabled
# see https://github.com/dask/distributed/issues/8136
self._dashboard_address = None

def __enter__(self):
"""Create a local cluster and connect a client to it."""
self._cluster = LocalCluster(
n_workers=self._num_workers,
processes=True,
threads_per_worker=1,
dashboard_address=self._dashboard_address,
**self._kwargs,
)
Client(self._cluster)
self._client = Client(self._cluster)
if self._dashboard_address:
print(f"dashboard link: {self._client.dashboard_link}")
return self

def __exit__(self, exc_type, exc_value, exc_traceback):
Expand Down Expand Up @@ -182,6 +189,7 @@ def run(self, dask_graph, verbose=False):
scheduler=self._scheduler,
num_workers=self._num_workers,
chunksize=1,
**self._kwargs,
)
visualize(
[prof, rprof, cprof],
Expand All @@ -193,7 +201,10 @@ def run(self, dask_graph, verbose=False):
print(f"{max([res.mem for res in rprof.results])}MB total memory used")
else:
res = self._dask_container.compute(
scheduler=self._scheduler, num_workers=self._num_workers, chunksize=1
scheduler=self._scheduler,
num_workers=self._num_workers,
chunksize=1,
**self._kwargs,
)
return res

Expand Down
44 changes: 44 additions & 0 deletions dagrunner/tests/execute_graph/test_plugin_executor.py
Original file line number Diff line number Diff line change
Expand Up @@ -160,3 +160,47 @@ def test_pass_common_args_override():
)
res = plugin_executor(*args, call=call, common_kwargs=common_kwargs)
assert res == target


def test_missing_call_args():
    """Raise an error if 'call' arg isn't provided.

    The test expects the keyword arguments handed to ``plugin_executor`` to be
    echoed back in the error message as ``node_properties``.
    """
    import re  # local import so this block does not rely on file-level imports

    kwargs = {"key1": mock.sentinel.value1, "key2": mock.sentinel.value2}
    msg = f"call is a required argument\nnode_properties: {kwargs}"
    # ``match`` is interpreted as a regular expression; the dict repr contains
    # metacharacters ('{', '}', '.'), so escape it to assert the literal text
    # instead of a pattern that only happens to match.
    with pytest.raises(ValueError, match=re.escape(msg)):
        plugin_executor(mock.sentinel.arg1, **kwargs)


def test_class_plugin_unexpected_tuple_unpack():
    """A class plugin 'call' tuple with four entries must be rejected."""
    # Class, init kwargs and call kwargs are the most that is accepted; the
    # extra trailing dict makes the tuple one element too long.
    oversized_call = (DummyPlugin, {}, {}, {})
    expected_pattern = "expecting 1, 2 or 3 values to unpack.*got 4"
    with pytest.raises(ValueError, match=expected_pattern):
        plugin_executor(mock.sentinel.arg1, call=oversized_call)


def test_callable_plugin_unexpected_tuple_unpack():
    """A callable plugin 'call' tuple with three entries must be rejected."""
    expected_pattern = "expecting 1 or 2 values to unpack.*got 3"
    with pytest.raises(ValueError, match=expected_pattern):
        # An already-instantiated plugin accepts call kwargs only, so the
        # second dict is one element too many.
        plugin_instance = DummyPluginNoNamedParam()
        plugin_executor(mock.sentinel.arg1, call=(plugin_instance, {}, {}))


class BadDummyInitPlugin:
    """Test double whose construction always fails with a ValueError."""

    def __init__(self, **kwargs):
        """Reject any keyword arguments by raising unconditionally."""
        failure = ValueError("some error")
        raise failure

    def __call__(self, *args, **kwargs):
        """Accept any arguments and do nothing (returns None)."""


def test_extended_init_failure_context():
    """Plugin initialisation failures are re-raised with added context.

    The wrapper must be a RuntimeError whose message starts the extended
    context, and the plugin's own exception must remain reachable as the
    chained cause so the original traceback is not lost.
    """
    with pytest.raises(RuntimeError, match="Failed to initialise") as excinfo:
        plugin_executor(mock.sentinel.arg1, call=(BadDummyInitPlugin,))
    # The executor raises the RuntimeError ``from`` the caught error, so the
    # original ValueError is expected on ``__cause__``.
    assert isinstance(excinfo.value.__cause__, ValueError)


def bad_call_plugin(*args):
    """Callable plugin stand-in that fails unconditionally when invoked."""
    failure = ValueError("some error")
    raise failure


def test_extended_call_plugin_failure_context():
    """Plugin execution failures are re-raised with added context.

    The wrapper must be a RuntimeError carrying the extended message, and the
    exception raised inside the plugin must remain reachable as the chained
    cause so the original traceback is not lost.
    """
    with pytest.raises(RuntimeError, match="Failed to execute") as excinfo:
        plugin_executor(mock.sentinel.arg1, call=(bad_call_plugin,))
    # The executor raises the RuntimeError ``from`` the caught error, so the
    # plugin's ValueError is expected on ``__cause__``.
    assert isinstance(excinfo.value.__cause__, ValueError)
15 changes: 8 additions & 7 deletions docs/dagrunner.execute_graph.md
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ see [function: dagrunner.utils.visualisation.visualise_graph](dagrunner.utils.vi

## class: `ExecuteGraph`

[Source](../dagrunner/execute_graph.py#L266)
[Source](../dagrunner/execute_graph.py#L282)

### Call Signature:

Expand All @@ -26,7 +26,7 @@ ExecuteGraph(networkx_graph: str, networkx_graph_kwargs: dict = None, <function

### function: `__call__`

[Source](../dagrunner/execute_graph.py#L363)
[Source](../dagrunner/execute_graph.py#L380)

#### Call Signature:

Expand All @@ -38,7 +38,7 @@ Call self as a function.

### function: `__init__`

[Source](../dagrunner/execute_graph.py#L267)
[Source](../dagrunner/execute_graph.py#L283)

#### Call Signature:

Expand All @@ -61,8 +61,9 @@ Args:
function. Optional.
- `scheduler` (str):
Accepted values include "ray", "multiprocessing" and those recognised
by dask: "threads", "processes" and "single-threaded" (useful for debugging).
See https://docs.dask.org/en/latest/scheduling.html. Optional.
by dask: "threads", "processes" and "single-threaded" (useful for debugging)
and "distributed". See https://docs.dask.org/en/latest/scheduling.html.
Optional.
- `num_workers` (int):
Number of processes or threads to use. Optional.
- `dry_run` (bool):
Expand All @@ -78,7 +79,7 @@ Args:

### function: `visualise`

[Source](../dagrunner/execute_graph.py#L360)
[Source](../dagrunner/execute_graph.py#L377)

#### Call Signature:

Expand Down Expand Up @@ -107,7 +108,7 @@ Status: experimental.

## function: `main`

[Source](../dagrunner/execute_graph.py#L374)
[Source](../dagrunner/execute_graph.py#L391)

### Call Signature:

Expand Down
26 changes: 13 additions & 13 deletions docs/dagrunner.runner.schedulers.dask.md
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ See the following useful background reading:

## class: `DaskOnRay`

[Source](../dagrunner/runner/schedulers/dask.py#L204)
[Source](../dagrunner/runner/schedulers/dask.py#L215)

### Call Signature:

Expand All @@ -27,7 +27,7 @@ A class to run dask graphs using the 'dask-on-ray' scheduler.

### function: `__enter__`

[Source](../dagrunner/runner/schedulers/dask.py#L212)
[Source](../dagrunner/runner/schedulers/dask.py#L223)

#### Call Signature:

Expand All @@ -37,7 +37,7 @@ __enter__(self)

### function: `__exit__`

[Source](../dagrunner/runner/schedulers/dask.py#L215)
[Source](../dagrunner/runner/schedulers/dask.py#L226)

#### Call Signature:

Expand All @@ -47,7 +47,7 @@ __exit__(self, exc_type, exc_value, exc_traceback)

### function: `__init__`

[Source](../dagrunner/runner/schedulers/dask.py#L207)
[Source](../dagrunner/runner/schedulers/dask.py#L218)

#### Call Signature:

Expand All @@ -59,7 +59,7 @@ Initialize self. See help(type(self)) for accurate signature.

### function: `run`

[Source](../dagrunner/runner/schedulers/dask.py#L220)
[Source](../dagrunner/runner/schedulers/dask.py#L231)

#### Call Signature:

Expand Down Expand Up @@ -92,7 +92,7 @@ A class to run dask graphs on a distributed cluster.

### function: `__enter__`

[Source](../dagrunner/runner/schedulers/dask.py#L93)
[Source](../dagrunner/runner/schedulers/dask.py#L97)

#### Call Signature:

Expand All @@ -104,7 +104,7 @@ Create a local cluster and connect a client to it.

### function: `__exit__`

[Source](../dagrunner/runner/schedulers/dask.py#L104)
[Source](../dagrunner/runner/schedulers/dask.py#L111)

#### Call Signature:

Expand All @@ -126,7 +126,7 @@ Initialize self. See help(type(self)) for accurate signature.

### function: `run`

[Source](../dagrunner/runner/schedulers/dask.py#L108)
[Source](../dagrunner/runner/schedulers/dask.py#L115)

#### Call Signature:

Expand All @@ -147,7 +147,7 @@ Returns:

## class: `SingleMachine`

[Source](../dagrunner/runner/schedulers/dask.py#L133)
[Source](../dagrunner/runner/schedulers/dask.py#L140)

### Call Signature:

Expand All @@ -159,7 +159,7 @@ A class to run dask graphs on a single machine.

### function: `__enter__`

[Source](../dagrunner/runner/schedulers/dask.py#L145)
[Source](../dagrunner/runner/schedulers/dask.py#L152)

#### Call Signature:

Expand All @@ -169,7 +169,7 @@ __enter__(self)

### function: `__exit__`

[Source](../dagrunner/runner/schedulers/dask.py#L200)
[Source](../dagrunner/runner/schedulers/dask.py#L211)

#### Call Signature:

Expand All @@ -179,7 +179,7 @@ __exit__(self, exc_type, exc_value, exc_traceback)

### function: `__init__`

[Source](../dagrunner/runner/schedulers/dask.py#L136)
[Source](../dagrunner/runner/schedulers/dask.py#L143)

#### Call Signature:

Expand All @@ -191,7 +191,7 @@ Initialize self. See help(type(self)) for accurate signature.

### function: `run`

[Source](../dagrunner/runner/schedulers/dask.py#L148)
[Source](../dagrunner/runner/schedulers/dask.py#L155)

#### Call Signature:

Expand Down