Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

sanitize decorators and callable parameters on entrypoint load #15932

Open
wants to merge 3 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
42 changes: 42 additions & 0 deletions src/prefect/flows.py
Original file line number Diff line number Diff line change
Expand Up @@ -2116,6 +2116,48 @@ def _sanitize_and_load_flow(
)
func_def.returns = None

# Remove flow decorator callback hooks

def get_decorator_function_name(func_decorator):
"""Retrieve decorator function name."""
func_decorator = (
func_decorator.func
if isinstance(func_decorator, ast.Call)
else func_decorator
)
return (
func_decorator.attr
if isinstance(func_decorator, ast.Attribute)
else func_decorator.id
)

if func_def.decorator_list:
# Keep only prefect flow decorator only
func_def.decorator_list = [
func_decorator
for func_decorator in func_def.decorator_list
if get_decorator_function_name(func_decorator) == "flow"
]
exclude_keyword_args = (
"on_failure",
"on_completion",
"on_cancellation",
"on_crashed",
"on_running",
)

# Exclude callable type keyword arguments from flow decorator
for func_decorator in func_def.decorator_list:
if not hasattr(func_decorator, "keywords"):
continue

func_decorator.keywords = list(
filter(
lambda keyword_arg: keyword_arg.arg not in exclude_keyword_args,
func_decorator.keywords,
)
)
Comment on lines +2119 to +2159
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'd prefer if this code were encapsulated in a function for readability. Also, the decorator_list is looped over twice, and the code could be condensed to a single loop instead of filtering out all non-flow decorators first.


# Attempt to compile the function without annotations and defaults that
# can't be compiled
try:
Expand Down
27 changes: 27 additions & 0 deletions tests/test_flows.py
Original file line number Diff line number Diff line change
Expand Up @@ -5225,3 +5225,30 @@ def dog():

with pytest.raises(NameError, match="name 'not_a_function' is not defined"):
safe_load_flow_from_entrypoint(entrypoint)()

def test_remove_callback_hooks_for_missing_import(self, tmp_path: Path):
flow_source = dedent(
"""
from prefect import flow

from non_existent import DEFAULT_NAME, DEFAULT_AGE

def test_callback():
print(DEFAULT_NAME)
def wrapper(*args, **kwargs):
pass

return wrapper

@flow(on_running=[test_callback()])
def flow_function(name = DEFAULT_NAME, age = DEFAULT_AGE) -> str:
return name, age
"""
)

tmp_path.joinpath("flow.py").write_text(flow_source)

entrypoint = f"{tmp_path.joinpath('flow.py')}:flow_function"

result = safe_load_flow_from_entrypoint(entrypoint)
assert result is not None
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can you add an additional check validating the flow was loaded as expected? You should be able to call the flow and check the return value is (None, None) since the defaults are removed when loading.

Loading