From f640e2e8af2beda306fd4f74e3ec3166a84df59e Mon Sep 17 00:00:00 2001 From: zzstoatzz Date: Tue, 10 Dec 2024 10:00:02 -0600 Subject: [PATCH] more type completeness --- src/prefect/runner/runner.py | 62 ++++++++++++++++++++++++------------ 1 file changed, 42 insertions(+), 20 deletions(-) diff --git a/src/prefect/runner/runner.py b/src/prefect/runner/runner.py index 71622028bbd6..ff64be942e1c 100644 --- a/src/prefect/runner/runner.py +++ b/src/prefect/runner/runner.py @@ -43,7 +43,17 @@ def fast_flow(): from copy import deepcopy from functools import partial from pathlib import Path -from typing import TYPE_CHECKING, Callable, Dict, Iterable, List, Optional, Set, Union +from typing import ( + TYPE_CHECKING, + Any, + Callable, + Dict, + Iterable, + List, + Optional, + Set, + Union, +) from uuid import UUID, uuid4 import anyio @@ -432,10 +442,14 @@ def goodbye_flow(name): ) ) - def execute_in_background(self, func, *args, **kwargs): + def execute_in_background( + self, func: Callable[..., Any], *args: Any, **kwargs: Any + ): """ Executes a function in the background. """ + if TYPE_CHECKING: + assert self._loop is not None return asyncio.run_coroutine_threadsafe(func(*args, **kwargs), self._loop) @@ -536,7 +550,7 @@ def _get_flow_run_logger(self, flow_run: "FlowRun") -> PrefectLogAdapter: async def _run_process( self, flow_run: "FlowRun", - task_status: Optional[anyio.abc.TaskStatus] = None, + task_status: Optional[anyio.abc.TaskStatus[Any]] = None, entrypoint: Optional[str] = None, ): """ @@ -723,7 +737,9 @@ async def _get_and_submit_flow_runs(self): return await self._submit_scheduled_flow_runs(flow_run_response=runs_response) async def _check_for_cancelled_flow_runs( - self, should_stop: Callable = lambda: False, on_stop: Callable = lambda: None + self, + should_stop: Callable[[], bool] = lambda: False, + on_stop: Callable[[], None] = lambda: None, ): """ Checks for flow runs with CANCELLING a cancelling state and attempts to @@ -862,31 +878,37 @@ def _emit_flow_run_cancelled_event( flow: "Optional[APIFlow]", deployment: "Optional[Deployment]", ): - related = [] - tags = [] + related: list[RelatedResource] = [] + tags: list[str] = [] if deployment: related.append( - { - "prefect.resource.id": f"prefect.deployment.{deployment.id}", - "prefect.resource.role": "deployment", - "prefect.resource.name": deployment.name, - } + RelatedResource( + { + "prefect.resource.id": f"prefect.deployment.{deployment.id}", + "prefect.resource.role": "deployment", + "prefect.resource.name": deployment.name, + } + ) ) tags.extend(deployment.tags) if flow: related.append( + RelatedResource( + { + "prefect.resource.id": f"prefect.flow.{flow.id}", + "prefect.resource.role": "flow", + "prefect.resource.name": flow.name, + } + ) + ) + related.append( + RelatedResource( { - "prefect.resource.id": f"prefect.flow.{flow.id}", - "prefect.resource.role": "flow", - "prefect.resource.name": flow.name, + "prefect.resource.id": f"prefect.flow-run.{flow_run.id}", + "prefect.resource.role": "flow-run", + "prefect.resource.name": flow_run.name, } ) - related.append( - { - "prefect.resource.id": f"prefect.flow-run.{flow_run.id}", - "prefect.resource.role": "flow-run", - "prefect.resource.name": flow_run.name, - } ) tags.extend(flow_run.tags)