From 65963a356a67d297802e7f9203fa0e526f8766b0 Mon Sep 17 00:00:00 2001 From: Martijn Pieters Date: Sun, 15 Dec 2024 03:48:02 +0000 Subject: [PATCH] Refactor prefect.flow decorator This makes the `flow.from_source()` callable available to the type checker. A new tests/typesafety suite provides coverage for the type annotations here. --- .github/workflows/python-tests.yaml | 41 ++- requirements-dev.txt | 1 + setup.cfg | 2 +- src/prefect/flows.py | 409 +++++++++++++++------------- tests/test_flows.py | 2 +- tests/typesafety/test_flows.yml | 37 +++ 6 files changed, 305 insertions(+), 187 deletions(-) create mode 100644 tests/typesafety/test_flows.yml diff --git a/.github/workflows/python-tests.yaml b/.github/workflows/python-tests.yaml index 33af87e3b9f46..27ec693936641 100644 --- a/.github/workflows/python-tests.yaml +++ b/.github/workflows/python-tests.yaml @@ -58,7 +58,7 @@ jobs: - name: Server Tests modules: tests/server/ tests/events/server - name: Client Tests - modules: tests/ --ignore=tests/server/ --ignore=tests/events/server --ignore=tests/test_task_runners.py --ignore=tests/runner --ignore=tests/workers + modules: tests/ --ignore=tests/typesafety --ignore=tests/server/ --ignore=tests/events/server --ignore=tests/test_task_runners.py --ignore=tests/runner --ignore=tests/workers - name: Runner and Worker Tests modules: tests/test_task_runners.py tests/runner tests/workers database: @@ -364,7 +364,7 @@ jobs: - name: Run tests run: | echo "Using COVERAGE_FILE=$COVERAGE_FILE" - pytest tests \ + pytest tests --ignore=tests/typesafety \ --numprocesses auto \ --maxprocesses 6 \ --dist worksteal \ @@ -449,3 +449,40 @@ jobs: echo "## Coverage Report" >> $GITHUB_STEP_SUMMARY echo "[Detailed Report](${{ steps.upload_combined_coverage_report.outputs.artifact_url }})" >> $GITHUB_STEP_SUMMARY coverage report --format=markdown >> $GITHUB_STEP_SUMMARY + + run-typesafety-test: + name: typesafety + runs-on: ubuntu-latest + + steps: + + - uses: actions/checkout@v4 + with: + 
persist-credentials: false + fetch-depth: 0 + + - name: Set up Python 3.12 + uses: actions/setup-python@v5 + id: setup_python + with: + python-version: 3.12 + + - name: UV Cache + # Manually cache the uv cache directory + # until setup-python supports it: + # https://github.com/actions/setup-python/issues/822 + uses: actions/cache@v4 + id: cache-uv + with: + path: ~/.cache/uv + key: uvcache-${{ runner.os }}-${{ steps.setup_python.outputs.python-version }}-${{ hashFiles('requirements-client.txt', 'requirements.txt', 'requirements-dev.txt') }} + + - name: Install packages + run: | + python -m pip install -U uv + uv pip install --upgrade --system -e .[dev] + + - name: Run tests + run: | + pytest tests/typesafety \ + --disable-docker-image-builds diff --git a/requirements-dev.txt b/requirements-dev.txt index 3a1439fc2da83..1d7e83c06092c 100644 --- a/requirements-dev.txt +++ b/requirements-dev.txt @@ -15,6 +15,7 @@ pytest-benchmark pytest-cov pytest-env pytest-flakefinder +pytest-mypy-plugins >= 3.1.0 pytest-timeout pytest-xdist >= 3.6.1 pyyaml diff --git a/setup.cfg b/setup.cfg index f32eb182605d5..c19bd5f1d01b4 100644 --- a/setup.cfg +++ b/setup.cfg @@ -1,6 +1,6 @@ [tool:pytest] testpaths = tests -addopts = -rfEs +addopts = -rfEs --mypy-only-local-stub norecursedirs = *.egg-info .git .mypy_cache node_modules .pytest_cache .vscode python_files = diff --git a/src/prefect/flows.py b/src/prefect/flows.py index 4f402499a94ce..bab71b5eaec1f 100644 --- a/src/prefect/flows.py +++ b/src/prefect/flows.py @@ -924,10 +924,10 @@ def my_flow(name): @classmethod @sync_compatible async def from_source( - cls: Type["Flow[P, R]"], + cls, source: Union[str, "RunnerStorage", ReadableDeploymentStorage], entrypoint: str, - ) -> "Flow[P, R]": + ) -> "Flow[..., Any]": """ Loads a flow from a remote source. 
@@ -1025,7 +1025,7 @@ def my_flow(name: str = "world"): full_entrypoint = str(storage.destination / entrypoint) flow = cast( - Flow[P, R], + "Flow[..., Any]", await from_async.wait_for_call_in_new_thread( create_call(load_flow_from_entrypoint, full_entrypoint) ), @@ -1408,194 +1408,200 @@ async def visualize(self, *args: "P.args", **kwargs: "P.kwargs"): raise new_exception -@overload -def flow(__fn: Callable[P, R]) -> Flow[P, R]: - ... - - -@overload -def flow( - *, - name: Optional[str] = None, - version: Optional[str] = None, - flow_run_name: Optional[Union[Callable[[], str], str]] = None, - retries: Optional[int] = None, - retry_delay_seconds: Optional[Union[int, float]] = None, - task_runner: Optional[TaskRunner[PrefectFuture[R]]] = None, - description: Optional[str] = None, - timeout_seconds: Union[int, float, None] = None, - validate_parameters: bool = True, - persist_result: Optional[bool] = None, - result_storage: Optional[ResultStorage] = None, - result_serializer: Optional[ResultSerializer] = None, - cache_result_in_memory: bool = True, - log_prints: Optional[bool] = None, - on_completion: Optional[list[StateHookCallable]] = None, - on_failure: Optional[list[StateHookCallable]] = None, - on_cancellation: Optional[list[StateHookCallable]] = None, - on_crashed: Optional[list[StateHookCallable]] = None, - on_running: Optional[list[StateHookCallable]] = None, -) -> Callable[[Callable[P, R]], Flow[P, R]]: - ... 
- - -def flow( - __fn: Optional[Callable[P, R]] = None, - *, - name: Optional[str] = None, - version: Optional[str] = None, - flow_run_name: Optional[Union[Callable[[], str], str]] = None, - retries: Optional[int] = None, - retry_delay_seconds: Union[int, float, None] = None, - task_runner: Optional[TaskRunner[PrefectFuture[R]]] = None, - description: Optional[str] = None, - timeout_seconds: Union[int, float, None] = None, - validate_parameters: bool = True, - persist_result: Optional[bool] = None, - result_storage: Optional[ResultStorage] = None, - result_serializer: Optional[ResultSerializer] = None, - cache_result_in_memory: bool = True, - log_prints: Optional[bool] = None, - on_completion: Optional[list[StateHookCallable]] = None, - on_failure: Optional[list[StateHookCallable]] = None, - on_cancellation: Optional[list[StateHookCallable]] = None, - on_crashed: Optional[list[StateHookCallable]] = None, - on_running: Optional[list[StateHookCallable]] = None, -): - """ - Decorator to designate a function as a Prefect workflow. +class FlowDecorator: + @overload + def __call__(self, __fn: Callable[P, R]) -> Flow[P, R]: + ... - This decorator may be used for asynchronous or synchronous functions. 
+ @overload + def __call__( + self, + __fn: None = None, + *, + name: Optional[str] = None, + version: Optional[str] = None, + flow_run_name: Optional[Union[Callable[[], str], str]] = None, + retries: Optional[int] = None, + retry_delay_seconds: Optional[Union[int, float]] = None, + task_runner: None = None, + description: Optional[str] = None, + timeout_seconds: Union[int, float, None] = None, + validate_parameters: bool = True, + persist_result: Optional[bool] = None, + result_storage: Optional[ResultStorage] = None, + result_serializer: Optional[ResultSerializer] = None, + cache_result_in_memory: bool = True, + log_prints: Optional[bool] = None, + on_completion: Optional[list[StateHookCallable]] = None, + on_failure: Optional[list[StateHookCallable]] = None, + on_cancellation: Optional[list[StateHookCallable]] = None, + on_crashed: Optional[list[StateHookCallable]] = None, + on_running: Optional[list[StateHookCallable]] = None, + ) -> Callable[[Callable[P, R]], Flow[P, R]]: + ... - Flow parameters must be serializable by Pydantic. 
+ @overload + def __call__( + self, + __fn: None = None, + *, + name: Optional[str] = None, + version: Optional[str] = None, + flow_run_name: Optional[Union[Callable[[], str], str]] = None, + retries: Optional[int] = None, + retry_delay_seconds: Optional[Union[int, float]] = None, + task_runner: Optional[TaskRunner[PrefectFuture[R]]] = None, + description: Optional[str] = None, + timeout_seconds: Union[int, float, None] = None, + validate_parameters: bool = True, + persist_result: Optional[bool] = None, + result_storage: Optional[ResultStorage] = None, + result_serializer: Optional[ResultSerializer] = None, + cache_result_in_memory: bool = True, + log_prints: Optional[bool] = None, + on_completion: Optional[list[StateHookCallable]] = None, + on_failure: Optional[list[StateHookCallable]] = None, + on_cancellation: Optional[list[StateHookCallable]] = None, + on_crashed: Optional[list[StateHookCallable]] = None, + on_running: Optional[list[StateHookCallable]] = None, + ) -> Callable[[Callable[P, R]], Flow[P, R]]: + ... - Args: - name: An optional name for the flow; if not provided, the name will be inferred - from the given function. - version: An optional version string for the flow; if not provided, we will - attempt to create a version string as a hash of the file containing the - wrapped function; if the file cannot be located, the version will be null. - flow_run_name: An optional name to distinguish runs of this flow; this name can - be provided as a string template with the flow's parameters as variables, - or a function that returns a string. - retries: An optional number of times to retry on flow run failure. - retry_delay_seconds: An optional number of seconds to wait before retrying the - flow after failure. This is only applicable if `retries` is nonzero. - task_runner: An optional task runner to use for task execution within the flow; if - not provided, a `ConcurrentTaskRunner` will be instantiated. 
- description: An optional string description for the flow; if not provided, the - description will be pulled from the docstring for the decorated function. - timeout_seconds: An optional number of seconds indicating a maximum runtime for - the flow. If the flow exceeds this runtime, it will be marked as failed. - Flow execution may continue until the next task is called. - validate_parameters: By default, parameters passed to flows are validated by - Pydantic. This will check that input values conform to the annotated types - on the function. Where possible, values will be coerced into the correct - type; for example, if a parameter is defined as `x: int` and "5" is passed, - it will be resolved to `5`. If set to `False`, no validation will be - performed on flow parameters. - persist_result: An optional toggle indicating whether the result of this flow - should be persisted to result storage. Defaults to `None`, which indicates - that Prefect should choose whether the result should be persisted depending on - the features being used. - result_storage: An optional block to use to persist the result of this flow. - This value will be used as the default for any tasks in this flow. - If not provided, the local file system will be used unless called as - a subflow, at which point the default will be loaded from the parent flow. - result_serializer: An optional serializer to use to serialize the result of this - flow for persistence. This value will be used as the default for any tasks - in this flow. If not provided, the value of `PREFECT_RESULTS_DEFAULT_SERIALIZER` - will be used unless called as a subflow, at which point the default will be - loaded from the parent flow. - cache_result_in_memory: An optional toggle indicating whether the cached result of - a running the flow should be stored in memory. Defaults to `True`. - log_prints: If set, `print` statements in the flow will be redirected to the - Prefect logger for the flow run. 
Defaults to `None`, which indicates that - the value from the parent flow should be used. If this is a parent flow, - the default is pulled from the `PREFECT_LOGGING_LOG_PRINTS` setting. - on_completion: An optional list of functions to call when the flow run is - completed. Each function should accept three arguments: the flow, the flow - run, and the final state of the flow run. - on_failure: An optional list of functions to call when the flow run fails. Each - function should accept three arguments: the flow, the flow run, and the - final state of the flow run. - on_cancellation: An optional list of functions to call when the flow run is - cancelled. These functions will be passed the flow, flow run, and final state. - on_crashed: An optional list of functions to call when the flow run crashes. Each - function should accept three arguments: the flow, the flow run, and the - final state of the flow run. - on_running: An optional list of functions to call when the flow run is started. Each - function should accept three arguments: the flow, the flow run, and the current state + def __call__( + self, + __fn: Optional[Callable[P, R]] = None, + *, + name: Optional[str] = None, + version: Optional[str] = None, + flow_run_name: Optional[Union[Callable[[], str], str]] = None, + retries: Optional[int] = None, + retry_delay_seconds: Union[int, float, None] = None, + task_runner: Optional[TaskRunner[PrefectFuture[R]]] = None, + description: Optional[str] = None, + timeout_seconds: Union[int, float, None] = None, + validate_parameters: bool = True, + persist_result: Optional[bool] = None, + result_storage: Optional[ResultStorage] = None, + result_serializer: Optional[ResultSerializer] = None, + cache_result_in_memory: bool = True, + log_prints: Optional[bool] = None, + on_completion: Optional[list[StateHookCallable]] = None, + on_failure: Optional[list[StateHookCallable]] = None, + on_cancellation: Optional[list[StateHookCallable]] = None, + on_crashed: 
Optional[list[StateHookCallable]] = None, + on_running: Optional[list[StateHookCallable]] = None, + ) -> Union[Flow[P, R], Callable[[Callable[P, R]], Flow[P, R]]]: + """ + Decorator to designate a function as a Prefect workflow. - Returns: - A callable `Flow` object which, when called, will run the flow and return its - final state. + This decorator may be used for asynchronous or synchronous functions. - Examples: - Define a simple flow + Flow parameters must be serializable by Pydantic. - >>> from prefect import flow - >>> @flow - >>> def add(x, y): - >>> return x + y + Args: + name: An optional name for the flow; if not provided, the name will be inferred + from the given function. + version: An optional version string for the flow; if not provided, we will + attempt to create a version string as a hash of the file containing the + wrapped function; if the file cannot be located, the version will be null. + flow_run_name: An optional name to distinguish runs of this flow; this name can + be provided as a string template with the flow's parameters as variables, + or a function that returns a string. + retries: An optional number of times to retry on flow run failure. + retry_delay_seconds: An optional number of seconds to wait before retrying the + flow after failure. This is only applicable if `retries` is nonzero. + task_runner: An optional task runner to use for task execution within the flow; if + not provided, a `ConcurrentTaskRunner` will be instantiated. + description: An optional string description for the flow; if not provided, the + description will be pulled from the docstring for the decorated function. + timeout_seconds: An optional number of seconds indicating a maximum runtime for + the flow. If the flow exceeds this runtime, it will be marked as failed. + Flow execution may continue until the next task is called. + validate_parameters: By default, parameters passed to flows are validated by + Pydantic. 
This will check that input values conform to the annotated types + on the function. Where possible, values will be coerced into the correct + type; for example, if a parameter is defined as `x: int` and "5" is passed, + it will be resolved to `5`. If set to `False`, no validation will be + performed on flow parameters. + persist_result: An optional toggle indicating whether the result of this flow + should be persisted to result storage. Defaults to `None`, which indicates + that Prefect should choose whether the result should be persisted depending on + the features being used. + result_storage: An optional block to use to persist the result of this flow. + This value will be used as the default for any tasks in this flow. + If not provided, the local file system will be used unless called as + a subflow, at which point the default will be loaded from the parent flow. + result_serializer: An optional serializer to use to serialize the result of this + flow for persistence. This value will be used as the default for any tasks + in this flow. If not provided, the value of `PREFECT_RESULTS_DEFAULT_SERIALIZER` + will be used unless called as a subflow, at which point the default will be + loaded from the parent flow. + cache_result_in_memory: An optional toggle indicating whether the cached result of + running the flow should be stored in memory. Defaults to `True`. + log_prints: If set, `print` statements in the flow will be redirected to the + Prefect logger for the flow run. Defaults to `None`, which indicates that + the value from the parent flow should be used. If this is a parent flow, + the default is pulled from the `PREFECT_LOGGING_LOG_PRINTS` setting. + on_completion: An optional list of functions to call when the flow run is + completed. Each function should accept three arguments: the flow, the flow + run, and the final state of the flow run. + on_failure: An optional list of functions to call when the flow run fails. 
Each + function should accept three arguments: the flow, the flow run, and the + final state of the flow run. + on_cancellation: An optional list of functions to call when the flow run is + cancelled. These functions will be passed the flow, flow run, and final state. + on_crashed: An optional list of functions to call when the flow run crashes. Each + function should accept three arguments: the flow, the flow run, and the + final state of the flow run. + on_running: An optional list of functions to call when the flow run is started. Each + function should accept three arguments: the flow, the flow run, and the current state - Define an async flow + Returns: + A callable `Flow` object which, when called, will run the flow and return its + final state. - >>> @flow - >>> async def add(x, y): - >>> return x + y + Examples: + Define a simple flow - Define a flow with a version and description + >>> from prefect import flow + >>> @flow + >>> def add(x, y): + >>> return x + y - >>> @flow(version="first-flow", description="This flow is empty!") - >>> def my_flow(): - >>> pass + Define an async flow - Define a flow with a custom name + >>> @flow + >>> async def add(x, y): + >>> return x + y - >>> @flow(name="The Ultimate Flow") - >>> def my_flow(): - >>> pass + Define a flow with a version and description - Define a flow that submits its tasks to dask + >>> @flow(version="first-flow", description="This flow is empty!") + >>> def my_flow(): + >>> pass - >>> from prefect_dask.task_runners import DaskTaskRunner - >>> - >>> @flow(task_runner=DaskTaskRunner) - >>> def my_flow(): - >>> pass - """ - if __fn: - if isinstance(__fn, (classmethod, staticmethod)): - method_decorator = type(__fn).__name__ - raise TypeError(f"@{method_decorator} should be applied on top of @flow") - return Flow( - fn=__fn, - name=name, - version=version, - flow_run_name=flow_run_name, - task_runner=task_runner, - description=description, - timeout_seconds=timeout_seconds, - 
validate_parameters=validate_parameters, - retries=retries, - retry_delay_seconds=retry_delay_seconds, - persist_result=persist_result, - result_storage=result_storage, - result_serializer=result_serializer, - cache_result_in_memory=cache_result_in_memory, - log_prints=log_prints, - on_completion=on_completion, - on_failure=on_failure, - on_cancellation=on_cancellation, - on_crashed=on_crashed, - on_running=on_running, - ) - else: - return cast( - Callable[[Callable[P, R]], Flow[P, R]], - partial( - flow, + Define a flow with a custom name + + >>> @flow(name="The Ultimate Flow") + >>> def my_flow(): + >>> pass + + Define a flow that submits its tasks to dask + + >>> from prefect_dask.task_runners import DaskTaskRunner + >>> + >>> @flow(task_runner=DaskTaskRunner) + >>> def my_flow(): + >>> pass + """ + if __fn: + if isinstance(__fn, (classmethod, staticmethod)): + method_decorator = type(__fn).__name__ + raise TypeError( + f"@{method_decorator} should be applied on top of @flow" + ) + return Flow( + fn=__fn, name=name, version=version, flow_run_name=flow_run_name, @@ -1615,8 +1621,49 @@ def flow( on_cancellation=on_cancellation, on_crashed=on_crashed, on_running=on_running, - ), - ) + ) + else: + return cast( + Callable[[Callable[P, R]], Flow[P, R]], + partial( + flow, + name=name, + version=version, + flow_run_name=flow_run_name, + task_runner=task_runner, + description=description, + timeout_seconds=timeout_seconds, + validate_parameters=validate_parameters, + retries=retries, + retry_delay_seconds=retry_delay_seconds, + persist_result=persist_result, + result_storage=result_storage, + result_serializer=result_serializer, + cache_result_in_memory=cache_result_in_memory, + log_prints=log_prints, + on_completion=on_completion, + on_failure=on_failure, + on_cancellation=on_cancellation, + on_crashed=on_crashed, + on_running=on_running, + ), + ) + + if not TYPE_CHECKING: + # Add from_source so it is available on the flow function we all know and love + from_source = 
staticmethod(Flow.from_source) + else: + # Mypy loses the plot somewhere along the line, so the annotation is reconstructed + # manually here. + @staticmethod + def from_source( + source: Union[str, "RunnerStorage", ReadableDeploymentStorage], + entrypoint: str, + ) -> Union["Flow[..., Any]", Coroutine[Any, Any, "Flow[..., Any]"]]: + ... + + +flow = FlowDecorator() def _raise_on_name_with_banned_characters(name: Optional[str]) -> Optional[str]: @@ -1636,10 +1683,6 @@ def _raise_on_name_with_banned_characters(name: Optional[str]) -> Optional[str]: return name -# Add from_source so it is available on the flow function we all know and love -flow.from_source = Flow.from_source - - def select_flow( flows: Iterable[Flow[P, R]], flow_name: Optional[str] = None, diff --git a/tests/test_flows.py b/tests/test_flows.py index 40793fe38662b..733529e546771 100644 --- a/tests/test_flows.py +++ b/tests/test_flows.py @@ -496,7 +496,7 @@ def test_with_options_signature_aligns_with_flow_signature(self): inspect.signature(Flow.with_options).parameters.keys() ) # `with_options` does not accept a new function - flow_params.remove("__fn") + flow_params.remove("_FlowDecorator__fn") # `self` isn't in flow decorator with_options_params.remove("self") diff --git a/tests/typesafety/test_flows.yml b/tests/typesafety/test_flows.yml new file mode 100644 index 0000000000000..2db73d00e6c5a --- /dev/null +++ b/tests/typesafety/test_flows.yml @@ -0,0 +1,37 @@ +# yaml-language-server: $schema=https://raw.githubusercontent.com/typeddjango/pytest-mypy-plugins/master/pytest_mypy_plugins/schema.json +- case: prefect_flow_from_source + main: | + from prefect import flow + reveal_type(flow.from_source) + regex: yes + # this has to be a regex, because mypy randomly (!) switches between ... and [*Any, **Any] syntax here + out: "main:2: note: Revealed type is \"\ + def \\(\ + source. 
Union\\[builtins\\.str, prefect\\.runner\\.storage\\.RunnerStorage, prefect\\.filesystems\\.ReadableDeploymentStorage\\], + entrypoint. builtins\\.str\ + \\) -> Union\\[\ + prefect\\.flows\\.Flow\\[(\\[\\*Any, \\*\\*Any\\]|\\.\\.\\.), Any\\], + typing\\.Coroutine\\[Any, Any, prefect\\.flows\\.Flow\\[(\\[\\*Any, \\*\\*Any\\]|\\.\\.\\.), Any\\]\\]\\]\ + \"" + +- case: prefect_flow_decorator_no_args + main: | + from prefect import flow + @flow + def foo(bar: str) -> int: + return 42 + reveal_type(foo) + out: "main:5: note: Revealed type is \"\ + prefect.flows.Flow[[bar: builtins.str], builtins.int]\ + \"" + +- case: prefect_flow_decorator_with_name_arg + main: | + from prefect import flow + @flow(name="bar") + def foo(bar: str) -> int: + return 42 + reveal_type(foo) + out: "main:5: note: Revealed type is \"\ + prefect.flows.Flow[[bar: builtins.str], builtins.int]\ + \"" \ No newline at end of file