Skip to content

Commit

Permalink
Allow loading flows from an entrypoint when dependencies are missing (#…
Browse files Browse the repository at this point in the history
  • Loading branch information
bunchesofdonald authored Jul 9, 2024
1 parent a9fe238 commit 1d00d9e
Show file tree
Hide file tree
Showing 4 changed files with 192 additions and 77 deletions.
13 changes: 7 additions & 6 deletions src/prefect/cli/deploy.py
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@
from prefect.deployments.steps.core import run_steps
from prefect.events import DeploymentTriggerTypes, TriggerTypes
from prefect.exceptions import ObjectNotFound, PrefectHTTPStatusError
from prefect.flows import load_flow_argument_from_entrypoint
from prefect.flows import load_flow_arguments_from_entrypoint
from prefect.settings import (
PREFECT_DEFAULT_WORK_POOL_NAME,
PREFECT_UI_URL,
Expand Down Expand Up @@ -471,10 +471,12 @@ async def _run_single_deploy(
)
deploy_config["entrypoint"] = await prompt_entrypoint(app.console)

deploy_config["flow_name"] = load_flow_argument_from_entrypoint(
deploy_config["entrypoint"], arg="name"
flow_decorator_arguments = load_flow_arguments_from_entrypoint(
deploy_config["entrypoint"], arguments={"name", "description"}
)

deploy_config["flow_name"] = flow_decorator_arguments["name"]

deployment_name = deploy_config.get("name")
if not deployment_name:
if not is_interactive():
Expand Down Expand Up @@ -652,9 +654,8 @@ async def _run_single_deploy(
deploy_config["work_pool"]["job_variables"]["image"] = "{{ build-image.image }}"

if not deploy_config.get("description"):
deploy_config["description"] = load_flow_argument_from_entrypoint(
deploy_config["entrypoint"], arg="description"
)
deploy_config["description"] = flow_decorator_arguments.get("description")

# save deploy_config before templating
deploy_config_before_templating = deepcopy(deploy_config)
## apply templating from build and push steps to the final deployment spec
Expand Down
188 changes: 138 additions & 50 deletions src/prefect/flows.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,8 @@
List,
NoReturn,
Optional,
Set,
Tuple,
Type,
TypeVar,
Union,
Expand Down Expand Up @@ -60,6 +62,7 @@
MissingFlowError,
ObjectNotFound,
ParameterTypeError,
ScriptError,
UnspecifiedFlowError,
)
from prefect.filesystems import LocalFileSystem, ReadableDeploymentStorage
Expand Down Expand Up @@ -1712,6 +1715,14 @@ def load_flow_from_entrypoint(
raise MissingFlowError(
f"Flow function with name {func_name!r} not found in {path!r}. "
) from exc
except ScriptError as exc:
# If the flow has dependencies that are not installed in the current
# environment, fallback to loading the flow via AST parsing. The
# drawback of this approach is that we're unable to actually load the
# function, so we create a placeholder flow that will re-raise this
# exception when called.

flow = load_placeholder_flow(entrypoint=entrypoint, raises=exc)

if not isinstance(flow, Flow):
raise MissingFlowError(
Expand Down Expand Up @@ -1892,24 +1903,138 @@ async def load_flow_from_flow_run(
return flow


def load_flow_argument_from_entrypoint(
entrypoint: str, arg: str = "name"
) -> Optional[str]:
def load_placeholder_flow(entrypoint: str, raises: Exception):
"""
Extract a flow argument from an entrypoint string.
Load a placeholder flow that is initialized with the same arguments as the
flow specified in the entrypoint. If called the flow will raise `raises`.
Loads the source code of the entrypoint and extracts the flow argument from the
`flow` decorator.
This is useful when a flow can't be loaded due to missing dependencies or
other issues but the base metadata defining the flow is still needed.
Args:
entrypoint: a string in the format `<path_to_script>:<flow_func_name>` or a module path
to a flow function
entrypoint: a string in the format `<path_to_script>:<flow_func_name>`
or a module path to a flow function
raises: an exception to raise when the flow is called
"""

def _base_placeholder():
raise raises

def sync_placeholder_flow(*args, **kwargs):
_base_placeholder()

async def async_placeholder_flow(*args, **kwargs):
_base_placeholder()

placeholder_flow = (
async_placeholder_flow
if is_entrypoint_async(entrypoint)
else sync_placeholder_flow
)

arguments = load_flow_arguments_from_entrypoint(entrypoint)
arguments["fn"] = placeholder_flow

return Flow(**arguments)


def load_flow_arguments_from_entrypoint(
entrypoint: str, arguments: Optional[Union[List[str], Set[str]]] = None
) -> dict[str, Any]:
"""
Extract flow arguments from an entrypoint string.
Loads the source code of the entrypoint and extracts the flow arguments
from the `flow` decorator.
Args:
entrypoint: a string in the format `<path_to_script>:<flow_func_name>`
or a module path to a flow function
"""

func_def, source_code = _entrypoint_definition_and_source(entrypoint)

if arguments is None:
# If no arguments are provided default to known arguments that are of
# built-in types.
arguments = {
"name",
"version",
"retries",
"retry_delay_seconds",
"description",
"timeout_seconds",
"validate_parameters",
"persist_result",
"cache_result_in_memory",
"log_prints",
}

result = {}

for decorator in func_def.decorator_list:
if (
isinstance(decorator, ast.Call)
and getattr(decorator.func, "id", "") == "flow"
):
for keyword in decorator.keywords:
if keyword.arg not in arguments:
continue

if isinstance(keyword.value, ast.Constant):
# Use the string value of the argument
result[keyword.arg] = str(keyword.value.value)
continue

# if the arg value is not a raw str (i.e. a variable or expression),
# then attempt to evaluate it
namespace = safe_load_namespace(source_code)
literal_arg_value = ast.get_source_segment(source_code, keyword.value)
cleaned_value = (
literal_arg_value.replace("\n", "") if literal_arg_value else ""
)

try:
evaluated_value = eval(cleaned_value, namespace) # type: ignore
result[keyword.arg] = str(evaluated_value)
except Exception as e:
logger.info(
"Failed to parse @flow argument: `%s=%s` due to the following error. Ignoring and falling back to default behavior.",
keyword.arg,
literal_arg_value,
exc_info=e,
)
# ignore the decorator arg and fallback to default behavior
continue

if "name" in arguments and "name" not in result:
# If no matching decorator or keyword argument for `name' is found
# fallback to the function name.
result["name"] = func_def.name.replace("_", "-")

return result


def is_entrypoint_async(entrypoint: str) -> bool:
"""
Determine if the function specified in the entrypoint is asynchronous.
Args:
entrypoint: A string in the format `<path_to_script>:<func_name>` or
a module path to a function.
Returns:
The flow argument value
True if the function is asynchronous, False otherwise.
"""
func_def, _ = _entrypoint_definition_and_source(entrypoint)
return isinstance(func_def, ast.AsyncFunctionDef)


def _entrypoint_definition_and_source(
entrypoint: str,
) -> Tuple[Union[ast.FunctionDef, ast.AsyncFunctionDef], str]:
if ":" in entrypoint:
# split by the last colon once to handle Windows paths with drive letters i.e C:\path\to\file.py:do_stuff
# Split by the last colon once to handle Windows paths with drive letters i.e C:\path\to\file.py:do_stuff
path, func_name = entrypoint.rsplit(":", maxsplit=1)
source_code = Path(path).read_text()
else:
Expand All @@ -1918,6 +2043,7 @@ def load_flow_argument_from_entrypoint(
if not spec or not spec.origin:
raise ValueError(f"Could not find module {path!r}")
source_code = Path(spec.origin).read_text()

parsed_code = ast.parse(source_code)
func_def = next(
(
Expand All @@ -1934,46 +2060,8 @@ def load_flow_argument_from_entrypoint(
),
None,
)

if not func_def:
raise ValueError(f"Could not find flow {func_name!r} in {path!r}")
for decorator in func_def.decorator_list:
if (
isinstance(decorator, ast.Call)
and getattr(decorator.func, "id", "") == "flow"
):
for keyword in decorator.keywords:
if keyword.arg == arg:
if isinstance(keyword.value, ast.Constant):
return (
keyword.value.value
) # Return the string value of the argument

# if the arg value is not a raw str (i.e. a variable or expression),
# then attempt to evaluate it
namespace = safe_load_namespace(source_code)
literal_arg_value = ast.get_source_segment(
source_code, keyword.value
)
cleaned_value = (
literal_arg_value.replace("\n", "") if literal_arg_value else ""
)

try:
evaluated_value = eval(cleaned_value, namespace) # type: ignore
except Exception as e:
logger.info(
"Failed to parse @flow argument: `%s=%s` due to the following error. Ignoring and falling back to default behavior.",
arg,
literal_arg_value,
exc_info=e,
)
# ignore the decorator arg and fallback to default behavior
break
return str(evaluated_value)

if arg == "name":
return func_name.replace(
"_", "-"
) # If no matching decorator or keyword argument is found

return None
return func_def, source_code
2 changes: 1 addition & 1 deletion src/prefect/runner/server.py
Original file line number Diff line number Diff line change
Expand Up @@ -202,7 +202,7 @@ async def _create_flow_run_for_flow_from_fqn(

try:
flow = load_flow_from_entrypoint(body.entrypoint)
except (MissingFlowError, ScriptError, ModuleNotFoundError):
except (FileNotFoundError, MissingFlowError, ScriptError, ModuleNotFoundError):
return JSONResponse(
status_code=status.HTTP_404_NOT_FOUND,
content={"message": "Flow not found"},
Expand Down
Loading

0 comments on commit 1d00d9e

Please sign in to comment.