From c0711a340feba4c16cdfc0d5755e3fc4d5977e65 Mon Sep 17 00:00:00 2001 From: Chris Pickett Date: Tue, 9 Jul 2024 11:51:54 -0400 Subject: [PATCH] Fallback to AST parsing if loading flow fails due to a `ScriptError` --- src/prefect/cli/deploy.py | 13 +-- src/prefect/flows.py | 188 +++++++++++++++++++++++++---------- src/prefect/runner/server.py | 2 +- tests/test_flows.py | 66 ++++++++---- 4 files changed, 192 insertions(+), 77 deletions(-) diff --git a/src/prefect/cli/deploy.py b/src/prefect/cli/deploy.py index 6d6074fde8a3..d93576583d08 100644 --- a/src/prefect/cli/deploy.py +++ b/src/prefect/cli/deploy.py @@ -57,7 +57,7 @@ from prefect.deployments.steps.core import run_steps from prefect.events import DeploymentTriggerTypes, TriggerTypes from prefect.exceptions import ObjectNotFound, PrefectHTTPStatusError -from prefect.flows import load_flow_argument_from_entrypoint +from prefect.flows import load_flow_arguments_from_entrypoint from prefect.settings import ( PREFECT_DEFAULT_WORK_POOL_NAME, PREFECT_UI_URL, @@ -471,10 +471,12 @@ async def _run_single_deploy( ) deploy_config["entrypoint"] = await prompt_entrypoint(app.console) - deploy_config["flow_name"] = load_flow_argument_from_entrypoint( - deploy_config["entrypoint"], arg="name" + flow_decorator_arguments = load_flow_arguments_from_entrypoint( + deploy_config["entrypoint"], arguments={"name", "description"} ) + deploy_config["flow_name"] = flow_decorator_arguments["name"] + deployment_name = deploy_config.get("name") if not deployment_name: if not is_interactive(): @@ -652,9 +654,8 @@ async def _run_single_deploy( deploy_config["work_pool"]["job_variables"]["image"] = "{{ build-image.image }}" if not deploy_config.get("description"): - deploy_config["description"] = load_flow_argument_from_entrypoint( - deploy_config["entrypoint"], arg="description" - ) + deploy_config["description"] = flow_decorator_arguments.get("description") + # save deploy_config before templating deploy_config_before_templating = deepcopy(deploy_config) ## apply templating from build and push steps to the final deployment spec diff --git a/src/prefect/flows.py b/src/prefect/flows.py index 0825051ca68a..e5683ea902ce 100644 --- a/src/prefect/flows.py +++ b/src/prefect/flows.py @@ -28,6 +28,8 @@ List, NoReturn, Optional, + Set, + Tuple, Type, TypeVar, Union, @@ -60,6 +62,7 @@ MissingFlowError, ObjectNotFound, ParameterTypeError, + ScriptError, UnspecifiedFlowError, ) from prefect.filesystems import LocalFileSystem, ReadableDeploymentStorage @@ -1712,6 +1715,14 @@ def load_flow_from_entrypoint( raise MissingFlowError( f"Flow function with name {func_name!r} not found in {path!r}. " ) from exc + except ScriptError as exc: + # If the flow has dependencies that are not installed in the current + # environment, fallback to loading the flow via AST parsing. The + # drawback of this approach is that we're unable to actually load the + # function, so we create a placeholder flow that will re-raise this + # exception when called. + + flow = load_placeholder_flow(entrypoint=entrypoint, raises=exc) if not isinstance(flow, Flow): raise MissingFlowError( @@ -1892,24 +1903,138 @@ async def load_flow_from_flow_run( return flow -def load_flow_argument_from_entrypoint( - entrypoint: str, arg: str = "name" -) -> Optional[str]: +def load_placeholder_flow(entrypoint: str, raises: Exception): """ - Extract a flow argument from an entrypoint string. + Load a placeholder flow that is initialized with the same arguments as the + flow specified in the entrypoint. If called the flow will raise `raises`. - Loads the source code of the entrypoint and extracts the flow argument from the - `flow` decorator. + This is useful when a flow can't be loaded due to missing dependencies or + other issues but the base metadata defining the flow is still needed. Args: - entrypoint: a string in the format `:` or a module path - to a flow function + entrypoint: a string in the format `:` + or a module path to a flow function + raises: an exception to raise when the flow is called + """ + + def _base_placeholder(): + raise raises + + def sync_placeholder_flow(*args, **kwargs): + _base_placeholder() + + async def async_placeholder_flow(*args, **kwargs): + _base_placeholder() + + placeholder_flow = ( + async_placeholder_flow + if is_entrypoint_async(entrypoint) + else sync_placeholder_flow + ) + + arguments = load_flow_arguments_from_entrypoint(entrypoint) + arguments["fn"] = placeholder_flow + + return Flow(**arguments) + + +def load_flow_arguments_from_entrypoint( + entrypoint: str, arguments: Optional[Union[List[str], Set[str]]] = None +) -> dict[str, Any]: + """ + Extract flow arguments from an entrypoint string. + + Loads the source code of the entrypoint and extracts the flow arguments + from the `flow` decorator. + + Args: + entrypoint: a string in the format `:` + or a module path to a flow function + """ + + func_def, source_code = _entrypoint_definition_and_source(entrypoint) + + if arguments is None: + # If no arguments are provided default to known arguments that are of + # built-in types. + arguments = { + "name", + "version", + "retries", + "retry_delay_seconds", + "description", + "timeout_seconds", + "validate_parameters", + "persist_result", + "cache_result_in_memory", + "log_prints", + } + + result = {} + + for decorator in func_def.decorator_list: + if ( + isinstance(decorator, ast.Call) + and getattr(decorator.func, "id", "") == "flow" + ): + for keyword in decorator.keywords: + if keyword.arg not in arguments: + continue + + if isinstance(keyword.value, ast.Constant): + # Use the string value of the argument + result[keyword.arg] = str(keyword.value.value) + continue + + # if the arg value is not a raw str (i.e. a variable or expression), + # then attempt to evaluate it + namespace = safe_load_namespace(source_code) + literal_arg_value = ast.get_source_segment(source_code, keyword.value) + cleaned_value = ( + literal_arg_value.replace("\n", "") if literal_arg_value else "" + ) + + try: + evaluated_value = eval(cleaned_value, namespace) # type: ignore + result[keyword.arg] = str(evaluated_value) + except Exception as e: + logger.info( + "Failed to parse @flow argument: `%s=%s` due to the following error. Ignoring and falling back to default behavior.", + keyword.arg, + literal_arg_value, + exc_info=e, + ) + # ignore the decorator arg and fallback to default behavior + continue + + if "name" in arguments and "name" not in result: + # If no matching decorator or keyword argument for `name' is found + # fallback to the function name. + result["name"] = func_def.name.replace("_", "-") + + return result + + +def is_entrypoint_async(entrypoint: str) -> bool: + """ + Determine if the function specified in the entrypoint is asynchronous. + + Args: + entrypoint: A string in the format `:` or + a module path to a function. Returns: - The flow argument value + True if the function is asynchronous, False otherwise. """ + func_def, _ = _entrypoint_definition_and_source(entrypoint) + return isinstance(func_def, ast.AsyncFunctionDef) + + +def _entrypoint_definition_and_source( + entrypoint: str, +) -> Tuple[Union[ast.FunctionDef, ast.AsyncFunctionDef], str]: if ":" in entrypoint: - # split by the last colon once to handle Windows paths with drive letters i.e C:\path\to\file.py:do_stuff + # Split by the last colon once to handle Windows paths with drive letters i.e C:\path\to\file.py:do_stuff path, func_name = entrypoint.rsplit(":", maxsplit=1) source_code = Path(path).read_text() else: @@ -1918,6 +2043,7 @@ def load_flow_argument_from_entrypoint( if not spec or not spec.origin: raise ValueError(f"Could not find module {path!r}") source_code = Path(spec.origin).read_text() + parsed_code = ast.parse(source_code) func_def = next( ( @@ -1934,46 +2060,8 @@ def load_flow_argument_from_entrypoint( ), None, ) + if not func_def: raise ValueError(f"Could not find flow {func_name!r} in {path!r}") - for decorator in func_def.decorator_list: - if ( - isinstance(decorator, ast.Call) - and getattr(decorator.func, "id", "") == "flow" - ): - for keyword in decorator.keywords: - if keyword.arg == arg: - if isinstance(keyword.value, ast.Constant): - return ( - keyword.value.value - ) # Return the string value of the argument - - # if the arg value is not a raw str (i.e. a variable or expression), - # then attempt to evaluate it - namespace = safe_load_namespace(source_code) - literal_arg_value = ast.get_source_segment( - source_code, keyword.value - ) - cleaned_value = ( - literal_arg_value.replace("\n", "") if literal_arg_value else "" - ) - - try: - evaluated_value = eval(cleaned_value, namespace) # type: ignore - except Exception as e: - logger.info( - "Failed to parse @flow argument: `%s=%s` due to the following error. Ignoring and falling back to default behavior.", - arg, - literal_arg_value, - exc_info=e, - ) - # ignore the decorator arg and fallback to default behavior - break - return str(evaluated_value) - - if arg == "name": - return func_name.replace( - "_", "-" - ) # If no matching decorator or keyword argument is found - return None + return func_def, source_code diff --git a/src/prefect/runner/server.py b/src/prefect/runner/server.py index 018558067a6c..464839487f5c 100644 --- a/src/prefect/runner/server.py +++ b/src/prefect/runner/server.py @@ -202,7 +202,7 @@ async def _create_flow_run_for_flow_from_fqn( try: flow = load_flow_from_entrypoint(body.entrypoint) - except (MissingFlowError, ScriptError, ModuleNotFoundError): + except (FileNotFoundError, MissingFlowError, ScriptError, ModuleNotFoundError): return JSONResponse( status_code=status.HTTP_404_NOT_FOUND, content={"message": "Flow not found"}, diff --git a/tests/test_flows.py b/tests/test_flows.py index 7ea57d5e3d33..4a0ddbe852de 100644 --- a/tests/test_flows.py +++ b/tests/test_flows.py @@ -39,11 +39,12 @@ InvalidNameError, ParameterTypeError, ReservedArgumentError, + ScriptError, ) from prefect.filesystems import LocalFileSystem from prefect.flows import ( Flow, - load_flow_argument_from_entrypoint, + load_flow_arguments_from_entrypoint, load_flow_from_entrypoint, load_flow_from_flow_run, ) @@ -2587,6 +2588,31 @@ def pretend_flow(): import_object_mock.assert_called_with("my.module.pretend_flow") +def test_load_flow_from_entrypoint_script_error_loads_placeholder(tmp_path): + flow_code = """ + from not_a_module import not_a_function + from prefect import flow + + @flow(description="Says woof!") + def dog(): + return "woof!" + """ + fpath = tmp_path / "f.py" + fpath.write_text(dedent(flow_code)) + + flow = load_flow_from_entrypoint(f"{fpath}:dog") + + # Since `not_a_module` isn't a real module, loading the flow as python + # should fail, and `load_flow_from_entrypoint` should fallback to + # returning a placeholder flow with the correct name, description, etc. + assert flow.name == "dog" + assert flow.description == "Says woof!" + + # But if the flow is called, it should raise the ScriptError + with pytest.raises(ScriptError): + flow.fn() + + @pytest.mark.skip(reason="Fails with new engine, passed on old engine") async def test_handling_script_with_unprotected_call_in_flow_script( tmp_path, @@ -4469,9 +4495,9 @@ def flow_function(name: str) -> str: entrypoint = f"{tmp_path.joinpath('flow.py')}:flow_function" - result = load_flow_argument_from_entrypoint(entrypoint, "name") + result = load_flow_arguments_from_entrypoint(entrypoint) - assert result == "My custom name" + assert result["name"] == "My custom name" def test_load_flow_name_from_entrypoint_no_name(self, tmp_path: Path): flow_source = dedent( @@ -4489,9 +4515,9 @@ def flow_function(name: str) -> str: entrypoint = f"{tmp_path.joinpath('flow.py')}:flow_function" - result = load_flow_argument_from_entrypoint(entrypoint, "name") + result = load_flow_arguments_from_entrypoint(entrypoint) - assert result == "flow-function" + assert result["name"] == "flow-function" def test_load_flow_name_from_entrypoint_dynamic_name_fstring(self, tmp_path: Path): flow_source = dedent( @@ -4511,9 +4537,9 @@ def flow_function(name: str) -> str: entrypoint = f"{tmp_path.joinpath('flow.py')}:flow_function" - result = load_flow_argument_from_entrypoint(entrypoint, "name") + result = load_flow_arguments_from_entrypoint(entrypoint) - assert result == "flow-function-1.0" + assert result["name"] == "flow-function-1.0" def test_load_flow_name_from_entrypoint_dyanmic_name_function(self, tmp_path: Path): flow_source = dedent( @@ -4534,9 +4560,9 @@ def flow_function(name: str) -> str: entrypoint = f"{tmp_path.joinpath('flow.py')}:flow_function" - result = load_flow_argument_from_entrypoint(entrypoint, "name") + result = load_flow_arguments_from_entrypoint(entrypoint) - assert result == "from-a-function" + assert result["name"] == "from-a-function" def test_load_flow_name_from_entrypoint_dynamic_name_depends_on_missing_import( self, tmp_path: Path, caplog: pytest.LogCaptureFixture @@ -4558,9 +4584,9 @@ def flow_function(name: str) -> str: entrypoint = f"{tmp_path.joinpath('flow.py')}:flow_function" - result = load_flow_argument_from_entrypoint(entrypoint, "name") + result = load_flow_arguments_from_entrypoint(entrypoint) - assert result == "flow-function" + assert result["name"] == "flow-function" assert "Failed to parse @flow argument: `name=get_name()`" in caplog.text def test_load_flow_name_from_entrypoint_dynamic_name_fstring_multiline( @@ -4589,9 +4615,9 @@ def flow_function(name: str) -> str: entrypoint = f"{tmp_path.joinpath('flow.py')}:flow_function" - result = load_flow_argument_from_entrypoint(entrypoint, "name") + result = load_flow_arguments_from_entrypoint(entrypoint) - assert result == "flow-function-1.0" + assert result["name"] == "flow-function-1.0" def test_load_async_flow_from_entrypoint_no_name(self, tmp_path: Path): flow_source = dedent( @@ -4608,9 +4634,9 @@ async def flow_function(name: str) -> str: entrypoint = f"{tmp_path.joinpath('flow.py')}:flow_function" - result = load_flow_argument_from_entrypoint(entrypoint, "name") + result = load_flow_arguments_from_entrypoint(entrypoint) - assert result == "flow-function" + assert result["name"] == "flow-function" def test_load_flow_description_from_entrypoint(self, tmp_path: Path): flow_source = dedent( @@ -4628,9 +4654,9 @@ def flow_function(name: str) -> str: entrypoint = f"{tmp_path.joinpath('flow.py')}:flow_function" - result = load_flow_argument_from_entrypoint(entrypoint, "description") + result = load_flow_arguments_from_entrypoint(entrypoint) - assert result == "My custom description" + assert result["description"] == "My custom description" def test_load_flow_description_from_entrypoint_no_description(self, tmp_path: Path): flow_source = dedent( @@ -4648,9 +4674,9 @@ def flow_function(name: str) -> str: entrypoint = f"{tmp_path.joinpath('flow.py')}:flow_function" - result = load_flow_argument_from_entrypoint(entrypoint, "description") + result = load_flow_arguments_from_entrypoint(entrypoint) - assert result is None + assert "description" not in result def test_load_no_flow(self, tmp_path: Path): flow_source = dedent( @@ -4665,4 +4691,4 @@ def test_load_no_flow(self, tmp_path: Path): entrypoint = f"{tmp_path.joinpath('flow.py')}:flow_function" with pytest.raises(ValueError, match="Could not find flow"): - load_flow_argument_from_entrypoint(entrypoint, "name") + load_flow_arguments_from_entrypoint(entrypoint)