Skip to content

Commit

Permalink
BUG: fix common arg handling (#45)
Browse files Browse the repository at this point in the history
* BUG: fix common arg passing
* exclude logging from CI testing
* Fixed test_function_to_parse.py working with various versions of argparse
  • Loading branch information
cpelley authored Jul 11, 2024
1 parent 0571bea commit d795acc
Show file tree
Hide file tree
Showing 4 changed files with 85 additions and 53 deletions.
4 changes: 3 additions & 1 deletion .github/workflows/tests.yml
Original file line number Diff line number Diff line change
Expand Up @@ -41,8 +41,10 @@ jobs:
pip uninstall dagrunner -y
# TESTS (inc. test coverage)
# excluded logging as per https://github.com/MetOffice/dagrunner/issues/39
- name: Run pytest + coverage report gen
run: pytest --cov=dagrunner --cov-report=term --cov-report=html | tee coverage_output.txt
run: pytest --cov=dagrunner --cov-report=term --cov-report=html --ignore=dagrunner/tests/utils/logging/test_integration.py | tee coverage_output.txt; test ${PIPESTATUS[0]} -eq 0


# TESTS (main branch)
- name: Cache ref branch coverage report
Expand Down
65 changes: 38 additions & 27 deletions dagrunner/execute_graph.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,25 +3,25 @@
#
# This file is part of 'dagrunner' and is released under the BSD 3-Clause license.
# See LICENSE in the root of the repository for full licensing details.
import importlib
import inspect
import logging
import warnings
from functools import partial

import importlib
import networkx as nx

import dask
import networkx as nx
from dask.base import tokenize
from dask.utils import apply

from dagrunner.plugin_framework import NodeAwarePlugin
from dagrunner.runner.schedulers import SCHEDULERS
from dagrunner.utils import (
TimeIt,
function_to_argparse,
logger,
)
from dagrunner.plugin_framework import NodeAwarePlugin
from dagrunner.runner.schedulers import SCHEDULERS
from dagrunner.utils.visualisation import visualise_graph
from dagrunner.utils import logger


class SkipBranch(Exception):
Expand All @@ -38,12 +38,17 @@ class SkipBranch(Exception):
pass


def _get_common_args_matching_signature(callable_obj, common_kwargs):
"""Get subset of arguments which match the callable signature."""
def _get_common_args_matching_signature(callable_obj, common_kwargs, keys=None):
"""
Get subset of arguments which match the callable signature.
Also additionally include those 'keys' provided
"""
keys = [] if keys is None else keys
return {
key: value
for key, value in common_kwargs.items()
if key in inspect.signature(callable_obj).parameters
if key in inspect.signature(callable_obj).parameters or key in keys
}


Expand All @@ -56,19 +61,22 @@ def plugin_executor(
**node_properties,
):
"""
Executes a plugin function or method with the provided arguments and keyword arguments.
Executes a plugin callable with the provided arguments and keyword arguments.
Args:
- `*args`: Positional arguments to be passed to the plugin function or method.
- `call`: A tuple containing the callable object or python dot path to one, keyword arguments
to instantiate this class (optional and where this callable is a class) and finally the keyword
arguments to be passed to this callable.
- `*args`: Positional arguments to be passed to the plugin callable.
- `call`: A tuple containing the callable object or python dot path to one, keyword
arguments to instantiate this class (optional and where this callable is a class)
and finally the keyword arguments to be passed to this callable.
- `verbose`: A boolean indicating whether to print verbose output.
- `dry_run`: A boolean indicating whether to perform a dry run without executing the plugin.
- `common_kwargs`: A dictionary of optional keyword arguments to apply to all applicable plugins.
That is, being passed to the plugin initialisation and or call if such keywords are expected
from the plugin. This is a useful alternative to global or environment variable usage.
- `**node_properties`: Node properties. These will be passed to 'node-aware' plugins.
- `dry_run`: A boolean indicating whether to perform a dry run without executing
the plugin.
- `common_kwargs`: A dictionary of optional keyword arguments to apply to all
applicable plugins. That is, being passed to the plugin initialisation and or
call if such keywords are expected from the plugin. This is a useful alternative
to global or environment variable usage.
- `**node_properties`: Node properties. These will be passed to 'node-aware'
plugins.
Returns:
- The result of executing the plugin function or method.
Expand Down Expand Up @@ -124,9 +132,9 @@ def plugin_executor(
callable_obj = callable_obj(**callable_kwargs_init)
call_msg = f"(**{callable_kwargs_init})"

callable_kwargs = callable_kwargs | {
key: value for key, value in common_kwargs.items() if key in callable_kwargs
} # based on overriding arguments
callable_kwargs = callable_kwargs | _get_common_args_matching_signature(
callable_obj, common_kwargs, callable_kwargs.keys()
) # based on overriding arguments

msg = f"{obj_name}{call_msg}(*{args}, **{callable_kwargs})"
if verbose:
Expand Down Expand Up @@ -223,8 +231,8 @@ def __init__(
Keyword arguments to pass to the networkx graph callable. Optional.
- `plugin_executor` (callable):
A callable object that executes a plugin function or method with the provided
arguments and keyword arguments. By default, uses the `plugin_executor` function.
Optional.
arguments and keyword arguments. By default, uses the `plugin_executor`
function. Optional.
- `scheduler` (str):
Accepted values include "ray", "multiprocessing" and those recognised
by dask: "threads", "processes" and "single-threaded" (useful for debugging).
Expand All @@ -249,7 +257,8 @@ def __init__(
self._plugin_executor = plugin_executor
if scheduler not in SCHEDULERS:
raise ValueError(
f"scheduler '{scheduler}' not recognised, please choose from {list(SCHEDULERS.keys())}"
f"scheduler '{scheduler}' not recognised, please choose from "
f"{list(SCHEDULERS.keys())}"
)
self._scheduler = SCHEDULERS[scheduler]
self._num_workers = num_workers
Expand All @@ -264,11 +273,13 @@ def nxgraph(self):

def _process_graph(self):
"""
Create flattened dictionary describing the relationship between each of our nodes.
Create flattened dictionary describing the relationship between nodes.
Here we wrap our nodes to ensure common parameters are share across all
executed nodes (e.g. dry-run, verbose).
TODO: Potentially support 'clobber' i.e. partial graph execution from a graph failure recovery.
TODO: Potentially support 'clobber' i.e. partial graph execution from a graph
failure recovery.
"""
executor = partial(
self._plugin_executor,
Expand Down
30 changes: 23 additions & 7 deletions dagrunner/tests/utils/test_function_to_parse.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,24 @@
CALLING_MOD = os.path.basename(sys.argv[0])


def assert_help_str(help_str, tar):
# Remove line wraps for easier comparison
formatted_help_str = []
help_strs = help_str.split("\n")
for line in help_strs:
if line.startswith(" "):
# more than 2 space indent
formatted_help_str[-1] += f" {line.lstrip()}"
else:
formatted_help_str.append(line)
help_str = "\n".join(formatted_help_str)

if "optional arguments:" in help_str:
# older versions of argparse use "optional arguments: instead of options:"
tar = tar.replace("options:", "optional arguments:")
assert help_str == tar


def get_parser_help_string(parser):
buffer = StringIO()
parser.print_help(file=buffer)
Expand Down Expand Up @@ -56,7 +74,7 @@ def test_basic_args_kwargs_optional():
--arg2 ARG2 Description of arg2.
--arg3 Description of arg3. Optional.
"""
assert help_str == tar
assert_help_str(help_str, tar)

args = parser.parse_args(["3", "--arg2", "arg2", "--arg3"])
assert args.arg1 == 3
Expand Down Expand Up @@ -93,10 +111,9 @@ def test_kwargs_param_expand():
options:
-h, --help show this help message and exit
--kwargs key value Optional global keyword arguments to apply to all
applicable plugins. Key-value pair argument.
--kwargs key value Optional global keyword arguments to apply to all applicable plugins. Key-value pair argument.
"""
assert help_str == tar
assert_help_str(help_str, tar)

args = parser.parse_args(["--kwargs", "key1", "val1", "--kwargs", "key2", "val2"])
assert "kwargs" in args
Expand Down Expand Up @@ -133,11 +150,10 @@ def test_dict_param():
options:
-h, --help show this help message and exit
--dkwarg1 key value Description of kwarg1. Optional. Key-value pair
argument.
--dkwarg1 key value Description of kwarg1. Optional. Key-value pair argument.
--dkwarg2 key value Description of kwarg2. Key-value pair argument.
"""
assert help_str == tar
assert_help_str(help_str, tar)
args = parser.parse_args(
[
"--dkwarg1",
Expand Down
39 changes: 21 additions & 18 deletions docs/dagrunner.execute_graph.md
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ see [function: dagrunner.utils.visualisation.visualise_graph](dagrunner.utils.vi

## class: `ExecuteGraph`

[Source](../dagrunner/execute_graph.py#L200)
[Source](../dagrunner/execute_graph.py#L208)

### Call Signature:

Expand All @@ -24,7 +24,7 @@ ExecuteGraph(networkx_graph: str, networkx_graph_kwargs: dict = None, <function

### function: `__call__`

[Source](../dagrunner/execute_graph.py#L298)
[Source](../dagrunner/execute_graph.py#L309)

#### Call Signature:

Expand All @@ -36,7 +36,7 @@ Call self as a function.

### function: `__init__`

[Source](../dagrunner/execute_graph.py#L201)
[Source](../dagrunner/execute_graph.py#L209)

#### Call Signature:

Expand All @@ -55,8 +55,8 @@ Args:
Keyword arguments to pass to the networkx graph callable. Optional.
- `plugin_executor` (callable):
A callable object that executes a plugin function or method with the provided
arguments and keyword arguments. By default, uses the `plugin_executor` function.
Optional.
arguments and keyword arguments. By default, uses the `plugin_executor`
function. Optional.
- `scheduler` (str):
Accepted values include "ray", "multiprocessing" and those recognised
by dask: "threads", "processes" and "single-threaded" (useful for debugging).
Expand All @@ -78,7 +78,7 @@ Args:

### function: `visualise`

[Source](../dagrunner/execute_graph.py#L295)
[Source](../dagrunner/execute_graph.py#L306)

#### Call Signature:

Expand All @@ -101,7 +101,7 @@ subsequent tasks.

## function: `main`

[Source](../dagrunner/execute_graph.py#L311)
[Source](../dagrunner/execute_graph.py#L322)

### Call Signature:

Expand All @@ -114,27 +114,30 @@ Parses command line arguments and executes the graph using the ExecuteGraph clas

## function: `plugin_executor`

[Source](../dagrunner/execute_graph.py#L50)
[Source](../dagrunner/execute_graph.py#L55)

### Call Signature:

```python
plugin_executor(*args, call=None, verbose=False, dry_run=False, common_kwargs=None, **node_properties)
```

Executes a plugin function or method with the provided arguments and keyword arguments.
Executes a plugin callable with the provided arguments and keyword arguments.

Args:
- `*args`: Positional arguments to be passed to the plugin function or method.
- `call`: A tuple containing the callable object or python dot path to one, keyword arguments
to instantiate this class (optional and where this callable is a class) and finally the keyword
arguments to be passed to this callable.
- `*args`: Positional arguments to be passed to the plugin callable.
- `call`: A tuple containing the callable object or python dot path to one, keyword
arguments to instantiate this class (optional and where this callable is a class)
and finally the keyword arguments to be passed to this callable.
- `verbose`: A boolean indicating whether to print verbose output.
- `dry_run`: A boolean indicating whether to perform a dry run without executing the plugin.
- `common_kwargs`: A dictionary of optional keyword arguments to apply to all applicable plugins.
That is, being passed to the plugin initialisation and or call if such keywords are expected
from the plugin. This is a useful alternative to global or environment variable usage.
- `**node_properties`: Node properties. These will be passed to 'node-aware' plugins.
- `dry_run`: A boolean indicating whether to perform a dry run without executing
the plugin.
- `common_kwargs`: A dictionary of optional keyword arguments to apply to all
applicable plugins. That is, being passed to the plugin initialisation and or
call if such keywords are expected from the plugin. This is a useful alternative
to global or environment variable usage.
- `**node_properties`: Node properties. These will be passed to 'node-aware'
plugins.

Returns:
- The result of executing the plugin function or method.
Expand Down

0 comments on commit d795acc

Please sign in to comment.