
Commit

Merge branch 'main' into fix-terminal-state-timings-for-timedout-tasks
GalLadislav authored Dec 11, 2024
2 parents 1c716c3 + 33339e1 commit ff09759
Showing 78 changed files with 2,882 additions and 1,437 deletions.
5 changes: 5 additions & 0 deletions .github/workflows/static-analysis.yaml
@@ -102,6 +102,10 @@ jobs:
BASE_SCORE=$(jq -r '.typeCompleteness.completenessScore' prefect-analysis-base.json)
echo "base_score=$BASE_SCORE" >> $GITHUB_OUTPUT
- name: Checkout current branch
run: |
git checkout ${{ github.head_ref || github.ref_name }}
- name: Compare scores
run: |
CURRENT_SCORE=$(echo ${{ steps.calculate_current_score.outputs.current_score }})
@@ -110,6 +114,7 @@ jobs:
if (( $(echo "$BASE_SCORE > $CURRENT_SCORE" | bc -l) )); then
echo "❌ Type completeness score has decreased from $BASE_SCORE to $CURRENT_SCORE" >> $GITHUB_STEP_SUMMARY
echo "Please add type annotations to your code to increase the type completeness score." >> $GITHUB_STEP_SUMMARY
uv run scripts/pyright_diff.py prefect-analysis-base.json prefect-analysis.json >> $GITHUB_STEP_SUMMARY
exit 1
elif (( $(echo "$BASE_SCORE < $CURRENT_SCORE" | bc -l) )); then
echo "✅ Type completeness score has increased from $BASE_SCORE to $CURRENT_SCORE" >> $GITHUB_STEP_SUMMARY
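
For reference, a minimal Python sketch (not part of the workflow) of the score comparison this step performs, assuming the JSON reports produced earlier in the job:

```python
import json

# Read the completeness scores that the workflow extracts with
# `jq -r '.typeCompleteness.completenessScore'` from each pyright report.
with open("prefect-analysis-base.json") as f:
    base_score = json.load(f)["typeCompleteness"]["completenessScore"]
with open("prefect-analysis.json") as f:
    current_score = json.load(f)["typeCompleteness"]["completenessScore"]

if base_score > current_score:
    print(f"❌ Type completeness score has decreased from {base_score} to {current_score}")
    raise SystemExit(1)
elif base_score < current_score:
    print(f"✅ Type completeness score has increased from {base_score} to {current_score}")
```
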
2 changes: 1 addition & 1 deletion compat-tests
5 changes: 5 additions & 0 deletions docs/v3/deploy/infrastructure-examples/serverless.mdx
@@ -678,6 +678,11 @@ With your deployment created, navigate to its detail page and create a new flow
You'll see the flow start running without polling the work pool, because Prefect Cloud securely connected
to your serverless infrastructure, created a job, ran the job, and reported on its execution.

## Usage Limits

Push work pool usage is unlimited. However, push work pools only support flow runs with
a maximum run time of 24 hours.

## Next steps

Learn more about [work pools](/v3/deploy/infrastructure-concepts/work-pools/) and [workers](/v3/deploy/infrastructure-concepts/workers/).
88 changes: 88 additions & 0 deletions scripts/pyright_diff.py
@@ -0,0 +1,88 @@
import json
import sys
from typing import Any, Dict, NamedTuple


class Diagnostic(NamedTuple):
    """Structured representation of a diagnostic for easier table formatting."""

    file: str
    line: int
    character: int
    severity: str
    message: str


def normalize_diagnostic(diagnostic: Dict[Any, Any]) -> Dict[Any, Any]:
    """Normalize a diagnostic by removing or standardizing volatile fields."""
    normalized = diagnostic.copy()
    normalized.pop("time", None)
    normalized.pop("version", None)
    return normalized


def load_and_normalize_file(file_path: str) -> Dict[Any, Any]:
    """Load a JSON file and normalize its contents."""
    with open(file_path, "r") as f:
        data = json.load(f)
    return normalize_diagnostic(data)


def parse_diagnostic(diag: Dict[Any, Any]) -> Diagnostic:
    """Convert a diagnostic dict into a Diagnostic object."""
    file = diag.get("file", "unknown_file")
    message = diag.get("message", "no message")
    range_info = diag.get("range", {})
    start = range_info.get("start", {})
    line = start.get("line", 0)
    char = start.get("character", 0)
    severity = diag.get("severity", "unknown")

    return Diagnostic(file, line, char, severity, message)


def format_markdown_table(diagnostics: list[Diagnostic]) -> str:
    """Format list of diagnostics as a markdown table."""
    if not diagnostics:
        return "\nNo new errors found!"

    table = ["| File | Location | Message |", "|------|----------|---------|"]

    for diag in sorted(diagnostics, key=lambda x: (x.file, x.line, x.character)):
        # Escape pipe characters and replace newlines with HTML breaks
        message = diag.message.replace("|", "\\|").replace("\n", "<br>")
        location = f"L{diag.line}:{diag.character}"
        table.append(f"| {diag.file} | {location} | {message} |")

    return "\n".join(table)


def compare_pyright_outputs(base_file: str, new_file: str) -> None:
    """Compare two pyright JSON output files and display only new errors."""
    base_data = load_and_normalize_file(base_file)
    new_data = load_and_normalize_file(new_file)

    # Group diagnostics by file
    base_diags = set()
    new_diags = set()

    # Process diagnostics from type completeness symbols
    for data, diag_set in [(base_data, base_diags), (new_data, new_diags)]:
        for symbol in data.get("typeCompleteness", {}).get("symbols", []):
            for diag in symbol.get("diagnostics", []):
                if diag.get("severity", "") == "error":
                    diag_set.add(parse_diagnostic(diag))

    # Find new errors
    new_errors = list(new_diags - base_diags)

    print("\n## New Pyright Errors\n")
    print(format_markdown_table(new_errors))


if __name__ == "__main__":
    if len(sys.argv) != 3:
        print("Usage: python pyright_diff.py <base.json> <new.json>")
        sys.exit(1)

    compare_pyright_outputs(sys.argv[1], sys.argv[2])
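
As a rough illustration of the input this script expects (the symbol name, file path, and values below are illustrative, not taken from the repository): diagnostics are read from `typeCompleteness.symbols[*].diagnostics`, and only entries with severity `"error"` are compared. A small sketch using the helpers defined above, assuming `scripts/` is on the import path:

```python
# Hedged sketch of the report shape the script expects and what the
# helpers produce for it; assumes scripts/ is on sys.path.
from pyright_diff import format_markdown_table, parse_diagnostic

sample_report = {
    "typeCompleteness": {
        "symbols": [
            {
                "name": "prefect.some_module.some_function",  # illustrative symbol
                "diagnostics": [
                    {
                        "file": "src/prefect/some_module.py",  # illustrative path
                        "severity": "error",
                        "message": "Return type is partially unknown",
                        "range": {"start": {"line": 41, "character": 4}},
                    }
                ],
            }
        ]
    }
}

errors = [
    parse_diagnostic(diag)
    for symbol in sample_report["typeCompleteness"]["symbols"]
    for diag in symbol.get("diagnostics", [])
    if diag.get("severity") == "error"
]
print(format_markdown_table(errors))
# | File | Location | Message |
# |------|----------|---------|
# | src/prefect/some_module.py | L41:4 | Return type is partially unknown |
```
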
93 changes: 46 additions & 47 deletions src/prefect/_internal/concurrency/api.py
@@ -6,50 +6,46 @@
import asyncio
import concurrent.futures
import contextlib
from typing import (
Any,
Awaitable,
Callable,
ContextManager,
Iterable,
Optional,
TypeVar,
Union,
)
from collections.abc import Awaitable, Iterable
from contextlib import AbstractContextManager
from typing import Any, Callable, Optional, Union, cast

from typing_extensions import ParamSpec
from typing_extensions import ParamSpec, TypeAlias, TypeVar

from prefect._internal.concurrency.threads import (
WorkerThread,
get_global_loop,
in_global_loop,
)
from prefect._internal.concurrency.waiters import (
AsyncWaiter,
Call,
SyncWaiter,
)
from prefect._internal.concurrency.waiters import AsyncWaiter, Call, SyncWaiter

P = ParamSpec("P")
T = TypeVar("T")
T = TypeVar("T", infer_variance=True)
Future = Union[concurrent.futures.Future[T], asyncio.Future[T]]

_SyncOrAsyncCallable: TypeAlias = Callable[P, Union[T, Awaitable[T]]]

def create_call(__fn: Callable[P, T], *args: P.args, **kwargs: P.kwargs) -> Call[T]:

def create_call(
__fn: _SyncOrAsyncCallable[P, T], *args: P.args, **kwargs: P.kwargs
) -> Call[T]:
return Call[T].new(__fn, *args, **kwargs)


def _cast_to_call(call_like: Union[Callable[[], T], Call[T]]) -> Call[T]:
def cast_to_call(
call_like: Union["_SyncOrAsyncCallable[[], T]", Call[T]],
) -> Call[T]:
if isinstance(call_like, Call):
return call_like
return cast(Call[T], call_like)
else:
return create_call(call_like)


class _base(abc.ABC):
@abc.abstractstaticmethod
@staticmethod
@abc.abstractmethod
def wait_for_call_in_loop_thread(
__call: Union[Callable[[], T], Call[T]], # type: ignore[reportGeneralTypeIssues]
__call: Union["_SyncOrAsyncCallable[[], Any]", Call[T]],
timeout: Optional[float] = None,
done_callbacks: Optional[Iterable[Call[Any]]] = None,
) -> T:
@@ -60,9 +56,10 @@ def wait_for_call_in_loop_thread(
"""
raise NotImplementedError()

@abc.abstractstaticmethod
@staticmethod
@abc.abstractmethod
def wait_for_call_in_new_thread(
__call: Union[Callable[[], T], Call[T]], # type: ignore[reportGeneralTypeIssues]
__call: Union["_SyncOrAsyncCallable[[], T]", Call[T]],
timeout: Optional[float] = None,
done_callbacks: Optional[Iterable[Call[Any]]] = None,
) -> T:
@@ -75,30 +72,31 @@ def wait_for_call_in_new_thread(

@staticmethod
def call_soon_in_new_thread(
__call: Union[Callable[[], T], Call[T]], timeout: Optional[float] = None
__call: Union["_SyncOrAsyncCallable[[], T]", Call[T]],
timeout: Optional[float] = None,
) -> Call[T]:
"""
Schedule a call for execution in a new worker thread.
Returns the submitted call.
"""
call = _cast_to_call(__call)
call = cast_to_call(__call)
runner = WorkerThread(run_once=True)
call.set_timeout(timeout)
runner.submit(call)
return call

@staticmethod
def call_soon_in_loop_thread(
__call: Union[Callable[[], Awaitable[T]], Call[Awaitable[T]]],
__call: Union["_SyncOrAsyncCallable[[], T]", Call[T]],
timeout: Optional[float] = None,
) -> Call[T]:
"""
Schedule a call for execution in the global event loop thread.
Returns the submitted call.
"""
call = _cast_to_call(__call)
call = cast_to_call(__call)
runner = get_global_loop()
call.set_timeout(timeout)
runner.submit(call)
@@ -117,7 +115,7 @@ def call_in_new_thread(

@staticmethod
def call_in_loop_thread(
__call: Union[Callable[[], Awaitable[T]], Call[Awaitable[T]]],
__call: Union[Callable[[], Awaitable[T]], Call[T]],
timeout: Optional[float] = None,
) -> T:
"""
@@ -131,12 +129,12 @@ def call_in_loop_thread(
class from_async(_base):
@staticmethod
async def wait_for_call_in_loop_thread(
__call: Union[Callable[[], Awaitable[T]], Call[Awaitable[T]]],
__call: Union[Callable[[], Awaitable[T]], Call[T]],
timeout: Optional[float] = None,
done_callbacks: Optional[Iterable[Call[Any]]] = None,
contexts: Optional[Iterable[ContextManager[Any]]] = None,
) -> Awaitable[T]:
call = _cast_to_call(__call)
contexts: Optional[Iterable[AbstractContextManager[Any]]] = None,
) -> T:
call = cast_to_call(__call)
waiter = AsyncWaiter(call)
for callback in done_callbacks or []:
waiter.add_done_callback(callback)
@@ -153,7 +151,7 @@ async def wait_for_call_in_new_thread(
timeout: Optional[float] = None,
done_callbacks: Optional[Iterable[Call[Any]]] = None,
) -> T:
call = _cast_to_call(__call)
call = cast_to_call(__call)
waiter = AsyncWaiter(call=call)
for callback in done_callbacks or []:
waiter.add_done_callback(callback)
@@ -170,7 +168,7 @@ def call_in_new_thread(

@staticmethod
def call_in_loop_thread(
__call: Union[Callable[[], Awaitable[T]], Call[Awaitable[T]]],
__call: Union[Callable[[], Awaitable[T]], Call[T]],
timeout: Optional[float] = None,
) -> Awaitable[T]:
call = _base.call_soon_in_loop_thread(__call, timeout=timeout)
@@ -182,13 +180,13 @@ class from_sync(_base):
def wait_for_call_in_loop_thread(
__call: Union[
Callable[[], Awaitable[T]],
Union[Callable[[], Awaitable[T]], Call[Awaitable[T]]],
Call[T],
],
timeout: Optional[float] = None,
done_callbacks: Optional[Iterable[Call]] = None,
contexts: Optional[Iterable[ContextManager]] = None,
) -> Awaitable[T]:
call = _cast_to_call(__call)
done_callbacks: Optional[Iterable[Call[T]]] = None,
contexts: Optional[Iterable[AbstractContextManager[Any]]] = None,
) -> T:
call = cast_to_call(__call)
waiter = SyncWaiter(call)
_base.call_soon_in_loop_thread(call, timeout=timeout)
for callback in done_callbacks or []:
@@ -203,9 +201,9 @@ def wait_for_call_in_loop_thread(
def wait_for_call_in_new_thread(
__call: Union[Callable[[], T], Call[T]],
timeout: Optional[float] = None,
done_callbacks: Optional[Iterable[Call]] = None,
) -> Call[T]:
call = _cast_to_call(__call)
done_callbacks: Optional[Iterable[Call[T]]] = None,
) -> T:
call = cast_to_call(__call)
waiter = SyncWaiter(call=call)
for callback in done_callbacks or []:
waiter.add_done_callback(callback)
@@ -215,20 +213,21 @@ def wait_for_call_in_new_thread(

@staticmethod
def call_in_new_thread(
__call: Union[Callable[[], T], Call[T]], timeout: Optional[float] = None
__call: Union["_SyncOrAsyncCallable[[], T]", Call[T]],
timeout: Optional[float] = None,
) -> T:
call = _base.call_soon_in_new_thread(__call, timeout=timeout)
return call.result()

@staticmethod
def call_in_loop_thread(
__call: Union[Callable[[], Awaitable[T]], Call[Awaitable[T]]],
__call: Union["_SyncOrAsyncCallable[[], T]", Call[T]],
timeout: Optional[float] = None,
) -> T:
) -> Union[Awaitable[T], T]:
if in_global_loop():
# Avoid deadlock where the call is submitted to the loop then the loop is
# blocked waiting for the call
call = _cast_to_call(__call)
call = cast_to_call(__call)
return call()

call = _base.call_soon_in_loop_thread(__call, timeout=timeout)
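
For context on the annotation changes above, a brief, hedged usage sketch based only on the signatures shown in this diff; this is internal Prefect machinery rather than a public API, and the coroutine is hypothetical:

```python
import asyncio

from prefect._internal.concurrency.api import create_call, from_sync


async def double(x: int) -> int:
    # Hypothetical coroutine used only to exercise the helpers.
    await asyncio.sleep(0)
    return x * 2


# Wrap the callable with its arguments in a Call, then submit it to the
# global event loop thread from synchronous code and wait for the result.
call = create_call(double, 21)
result = from_sync.call_in_loop_thread(call)
print(result)  # expected: 42, assuming the call completes
```
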