Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Allow loading flows from an entrypoint when dependencies are missing #14530

Merged
merged 1 commit into from
Jul 9, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 7 additions & 6 deletions src/prefect/cli/deploy.py
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@
from prefect.deployments.steps.core import run_steps
from prefect.events import DeploymentTriggerTypes, TriggerTypes
from prefect.exceptions import ObjectNotFound, PrefectHTTPStatusError
from prefect.flows import load_flow_argument_from_entrypoint
from prefect.flows import load_flow_arguments_from_entrypoint
from prefect.settings import (
PREFECT_DEFAULT_WORK_POOL_NAME,
PREFECT_UI_URL,
Expand Down Expand Up @@ -471,10 +471,12 @@ async def _run_single_deploy(
)
deploy_config["entrypoint"] = await prompt_entrypoint(app.console)

deploy_config["flow_name"] = load_flow_argument_from_entrypoint(
deploy_config["entrypoint"], arg="name"
flow_decorator_arguments = load_flow_arguments_from_entrypoint(
deploy_config["entrypoint"], arguments={"name", "description"}
)

deploy_config["flow_name"] = flow_decorator_arguments["name"]

deployment_name = deploy_config.get("name")
if not deployment_name:
if not is_interactive():
Expand Down Expand Up @@ -652,9 +654,8 @@ async def _run_single_deploy(
deploy_config["work_pool"]["job_variables"]["image"] = "{{ build-image.image }}"

if not deploy_config.get("description"):
deploy_config["description"] = load_flow_argument_from_entrypoint(
deploy_config["entrypoint"], arg="description"
)
deploy_config["description"] = flow_decorator_arguments.get("description")

# save deploy_config before templating
deploy_config_before_templating = deepcopy(deploy_config)
## apply templating from build and push steps to the final deployment spec
Expand Down
188 changes: 138 additions & 50 deletions src/prefect/flows.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,8 @@
List,
NoReturn,
Optional,
Set,
Tuple,
Type,
TypeVar,
Union,
Expand Down Expand Up @@ -60,6 +62,7 @@
MissingFlowError,
ObjectNotFound,
ParameterTypeError,
ScriptError,
UnspecifiedFlowError,
)
from prefect.filesystems import LocalFileSystem, ReadableDeploymentStorage
Expand Down Expand Up @@ -1712,6 +1715,14 @@ def load_flow_from_entrypoint(
raise MissingFlowError(
f"Flow function with name {func_name!r} not found in {path!r}. "
) from exc
except ScriptError as exc:
# If the flow has dependencies that are not installed in the current
# environment, fallback to loading the flow via AST parsing. The
# drawback of this approach is that we're unable to actually load the
# function, so we create a placeholder flow that will re-raise this
# exception when called.

flow = load_placeholder_flow(entrypoint=entrypoint, raises=exc)

if not isinstance(flow, Flow):
raise MissingFlowError(
Expand Down Expand Up @@ -1892,24 +1903,138 @@ async def load_flow_from_flow_run(
return flow


def load_flow_argument_from_entrypoint(
entrypoint: str, arg: str = "name"
) -> Optional[str]:
def load_placeholder_flow(entrypoint: str, raises: Exception):
    """
    Build a stand-in flow that is constructed with the same decorator
    arguments as the flow named by the entrypoint. Calling the stand-in
    raises `raises`.

    Intended for situations where the real flow cannot be loaded (missing
    dependencies or other issues) but its base metadata is still required.

    Args:
        entrypoint: a string in the format `<path_to_script>:<flow_func_name>`
            or a module path to a flow function
        raises: an exception to raise when the flow is called
    """

    def _base_placeholder():
        raise raises

    def sync_placeholder_flow(*args, **kwargs):
        _base_placeholder()

    async def async_placeholder_flow(*args, **kwargs):
        _base_placeholder()

    # Mirror the sync/async nature of the function the entrypoint points at.
    if is_entrypoint_async(entrypoint):
        stand_in = async_placeholder_flow
    else:
        stand_in = sync_placeholder_flow

    # Reuse the arguments statically extracted from the `flow` decorator and
    # attach the stand-in callable as the flow function.
    flow_kwargs = load_flow_arguments_from_entrypoint(entrypoint)
    flow_kwargs["fn"] = stand_in

    return Flow(**flow_kwargs)


def load_flow_arguments_from_entrypoint(
    entrypoint: str, arguments: Optional[Union[List[str], Set[str]]] = None
) -> dict[str, Any]:
    """
    Extract flow arguments from an entrypoint string.

    Loads the source code of the entrypoint and extracts the flow arguments
    from the `flow` decorator without importing the flow's module.

    Args:
        entrypoint: a string in the format `<path_to_script>:<flow_func_name>`
            or a module path to a flow function
        arguments: names of the `flow` decorator keyword arguments to extract.
            When omitted, a default set of arguments whose values are of
            built-in types is used.

    Returns:
        A dict mapping each requested argument found on the `flow` decorator
        to its value. Literal values keep their Python type (e.g. `retries=3`
        yields `3`, `log_prints=False` yields `False`); values given as
        variables or expressions are evaluated and returned as strings.
        Arguments that are absent, or whose expression fails to evaluate, are
        left out. If `name` was requested but not resolved, it falls back to
        the function name with underscores replaced by dashes.
    """

    func_def, source_code = _entrypoint_definition_and_source(entrypoint)

    if arguments is None:
        # If no arguments are provided default to known arguments that are of
        # built-in types.
        arguments = {
            "name",
            "version",
            "retries",
            "retry_delay_seconds",
            "description",
            "timeout_seconds",
            "validate_parameters",
            "persist_result",
            "cache_result_in_memory",
            "log_prints",
        }

    result: dict[str, Any] = {}

    # Namespace used to evaluate non-literal argument values. It depends only
    # on `source_code`, so it is built lazily and at most once instead of once
    # per keyword.
    namespace = None

    for decorator in func_def.decorator_list:
        is_flow_call = (
            isinstance(decorator, ast.Call)
            and getattr(decorator.func, "id", "") == "flow"
        )
        if not is_flow_call:
            continue

        for keyword in decorator.keywords:
            if keyword.arg not in arguments:
                continue

            if isinstance(keyword.value, ast.Constant):
                # Keep the literal's own type. Stringifying here would turn
                # `validate_parameters=False` into the truthy string "False"
                # and `description=None` into the string "None".
                result[keyword.arg] = keyword.value.value
                continue

            # The value is not a literal (i.e. it is a variable or an
            # expression), so attempt to evaluate it against the module's
            # namespace.
            if namespace is None:
                namespace = safe_load_namespace(source_code)

            literal_arg_value = ast.get_source_segment(source_code, keyword.value)
            cleaned_value = (
                literal_arg_value.replace("\n", "") if literal_arg_value else ""
            )

            try:
                # NOTE: this evaluates an expression taken from the
                # entrypoint's source file; the entrypoint must be trusted.
                evaluated_value = eval(cleaned_value, namespace)  # type: ignore
                result[keyword.arg] = str(evaluated_value)
            except Exception as e:
                logger.info(
                    "Failed to parse @flow argument: `%s=%s` due to the following error. Ignoring and falling back to default behavior.",
                    keyword.arg,
                    literal_arg_value,
                    exc_info=e,
                )
                # ignore the decorator arg and fallback to default behavior
                continue

    if "name" in arguments and result.get("name") is None:
        # If no matching decorator or keyword argument for `name` is found
        # (or it was explicitly set to None), fall back to the function name.
        result["name"] = func_def.name.replace("_", "-")

    return result


def is_entrypoint_async(entrypoint: str) -> bool:
    """
    Report whether the function named by the entrypoint is declared with
    `async def`.

    Args:
        entrypoint: A string in the format `<path_to_script>:<func_name>` or
            a module path to a function.

    Returns:
        True if the function is asynchronous, False otherwise.
    """
    # Only the parsed definition is needed; the source text is discarded.
    definition = _entrypoint_definition_and_source(entrypoint)[0]
    return isinstance(definition, ast.AsyncFunctionDef)


def _entrypoint_definition_and_source(
entrypoint: str,
) -> Tuple[Union[ast.FunctionDef, ast.AsyncFunctionDef], str]:
if ":" in entrypoint:
# split by the last colon once to handle Windows paths with drive letters i.e C:\path\to\file.py:do_stuff
# Split by the last colon once to handle Windows paths with drive letters, e.g. C:\path\to\file.py:do_stuff
path, func_name = entrypoint.rsplit(":", maxsplit=1)
source_code = Path(path).read_text()
else:
Expand All @@ -1918,6 +2043,7 @@ def load_flow_argument_from_entrypoint(
if not spec or not spec.origin:
raise ValueError(f"Could not find module {path!r}")
source_code = Path(spec.origin).read_text()

parsed_code = ast.parse(source_code)
func_def = next(
(
Expand All @@ -1934,46 +2060,8 @@ def load_flow_argument_from_entrypoint(
),
None,
)

if not func_def:
raise ValueError(f"Could not find flow {func_name!r} in {path!r}")
for decorator in func_def.decorator_list:
if (
isinstance(decorator, ast.Call)
and getattr(decorator.func, "id", "") == "flow"
):
for keyword in decorator.keywords:
if keyword.arg == arg:
if isinstance(keyword.value, ast.Constant):
return (
keyword.value.value
) # Return the string value of the argument

# if the arg value is not a raw str (i.e. a variable or expression),
# then attempt to evaluate it
namespace = safe_load_namespace(source_code)
literal_arg_value = ast.get_source_segment(
source_code, keyword.value
)
cleaned_value = (
literal_arg_value.replace("\n", "") if literal_arg_value else ""
)

try:
evaluated_value = eval(cleaned_value, namespace) # type: ignore
except Exception as e:
logger.info(
"Failed to parse @flow argument: `%s=%s` due to the following error. Ignoring and falling back to default behavior.",
arg,
literal_arg_value,
exc_info=e,
)
# ignore the decorator arg and fallback to default behavior
break
return str(evaluated_value)

if arg == "name":
return func_name.replace(
"_", "-"
) # If no matching decorator or keyword argument is found

return None
return func_def, source_code
2 changes: 1 addition & 1 deletion src/prefect/runner/server.py
Original file line number Diff line number Diff line change
Expand Up @@ -202,7 +202,7 @@ async def _create_flow_run_for_flow_from_fqn(

try:
flow = load_flow_from_entrypoint(body.entrypoint)
except (MissingFlowError, ScriptError, ModuleNotFoundError):
except (FileNotFoundError, MissingFlowError, ScriptError, ModuleNotFoundError):
return JSONResponse(
status_code=status.HTTP_404_NOT_FOUND,
content={"message": "Flow not found"},
Expand Down
Loading
Loading