Skip to content

Commit

Permalink
more type completeness
Browse files Browse the repository at this point in the history
  • Loading branch information
zzstoatzz committed Dec 10, 2024
1 parent 28fdf87 commit f640e2e
Showing 1 changed file with 42 additions and 20 deletions.
62 changes: 42 additions & 20 deletions src/prefect/runner/runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,17 @@ def fast_flow():
from copy import deepcopy
from functools import partial
from pathlib import Path
from typing import TYPE_CHECKING, Callable, Dict, Iterable, List, Optional, Set, Union
from typing import (
TYPE_CHECKING,
Any,
Callable,
Dict,
Iterable,
List,
Optional,
Set,
Union,
)
from uuid import UUID, uuid4

import anyio
Expand Down Expand Up @@ -432,10 +442,14 @@ def goodbye_flow(name):
)
)

def execute_in_background(self, func, *args, **kwargs):
def execute_in_background(
self, func: Callable[..., Any], *args: Any, **kwargs: Any
):
"""
Executes a function in the background.
"""
if TYPE_CHECKING:
assert self._loop is not None

return asyncio.run_coroutine_threadsafe(func(*args, **kwargs), self._loop)

Expand Down Expand Up @@ -536,7 +550,7 @@ def _get_flow_run_logger(self, flow_run: "FlowRun") -> PrefectLogAdapter:
async def _run_process(
self,
flow_run: "FlowRun",
task_status: Optional[anyio.abc.TaskStatus] = None,
task_status: Optional[anyio.abc.TaskStatus[Any]] = None,
entrypoint: Optional[str] = None,
):
"""
Expand Down Expand Up @@ -723,7 +737,9 @@ async def _get_and_submit_flow_runs(self):
return await self._submit_scheduled_flow_runs(flow_run_response=runs_response)

async def _check_for_cancelled_flow_runs(
self, should_stop: Callable = lambda: False, on_stop: Callable = lambda: None
self,
should_stop: Callable[[], bool] = lambda: False,
on_stop: Callable[[], None] = lambda: None,
):
"""
Checks for flow runs with CANCELLING a cancelling state and attempts to
Expand Down Expand Up @@ -862,31 +878,37 @@ def _emit_flow_run_cancelled_event(
flow: "Optional[APIFlow]",
deployment: "Optional[Deployment]",
):
related = []
tags = []
related: list[RelatedResource] = []
tags: list[str] = []
if deployment:
related.append(
{
"prefect.resource.id": f"prefect.deployment.{deployment.id}",
"prefect.resource.role": "deployment",
"prefect.resource.name": deployment.name,
}
RelatedResource(
{
"prefect.resource.id": f"prefect.deployment.{deployment.id}",
"prefect.resource.role": "deployment",
"prefect.resource.name": deployment.name,
}
)
)
tags.extend(deployment.tags)
if flow:
related.append(
RelatedResource(
{
"prefect.resource.id": f"prefect.flow.{flow.id}",
"prefect.resource.role": "flow",
"prefect.resource.name": flow.name,
}
)
)
related.append(
RelatedResource(
{
"prefect.resource.id": f"prefect.flow.{flow.id}",
"prefect.resource.role": "flow",
"prefect.resource.name": flow.name,
"prefect.resource.id": f"prefect.flow-run.{flow_run.id}",
"prefect.resource.role": "flow-run",
"prefect.resource.name": flow_run.name,
}
)
related.append(
{
"prefect.resource.id": f"prefect.flow-run.{flow_run.id}",
"prefect.resource.role": "flow-run",
"prefect.resource.name": flow_run.name,
}
)
tags.extend(flow_run.tags)

Expand Down

0 comments on commit f640e2e

Please sign in to comment.