diff --git a/flows/client_context_lifespan.py b/flows/client_context_lifespan.py index d4d6607755d2..543f59f93491 100644 --- a/flows/client_context_lifespan.py +++ b/flows/client_context_lifespan.py @@ -1,26 +1,20 @@ -from packaging.version import Version - -import prefect - -# Only run these tests if the version is at least 2.13.0 -if Version(prefect.__version__) < Version("2.13.0"): - raise NotImplementedError() - import asyncio import random import threading from contextlib import asynccontextmanager +from typing import Callable from unittest.mock import MagicMock import anyio -from prefect._vendor.fastapi import FastAPI +from fastapi import FastAPI +import prefect import prefect.context import prefect.exceptions from prefect.client.orchestration import PrefectClient -def make_lifespan(startup, shutdown) -> callable: +def make_lifespan(startup, shutdown) -> Callable: async def lifespan(app): try: startup() @@ -32,6 +26,7 @@ async def lifespan(app): def client_context_lifespan_is_robust_to_threaded_concurrency(): + print("testing that client context lifespan is robust to threaded concurrency") startup, shutdown = MagicMock(), MagicMock() app = FastAPI(lifespan=make_lifespan(startup, shutdown)) @@ -61,6 +56,7 @@ async def enter_client(context): async def client_context_lifespan_is_robust_to_high_async_concurrency(): + print("testing that client context lifespan is robust to high async concurrency") startup, shutdown = MagicMock(), MagicMock() app = FastAPI(lifespan=make_lifespan(startup, shutdown)) @@ -70,7 +66,7 @@ async def enter_client(): async with PrefectClient(app): await anyio.sleep(random.random()) - with anyio.fail_after(15): + with anyio.fail_after(30): async with anyio.create_task_group() as tg: for _ in range(1000): tg.start_soon(enter_client) @@ -80,6 +76,7 @@ async def enter_client(): async def client_context_lifespan_is_robust_to_mixed_concurrency(): + print("testing that client context lifespan is robust to mixed concurrency") startup, shutdown = MagicMock(), MagicMock() app = FastAPI(lifespan=make_lifespan(startup, shutdown)) @@ -91,10 +88,14 @@ async def enter_client(): async def enter_client_many_times(context): # We must re-enter the profile context in the new thread - with context: - async with anyio.create_task_group() as tg: - for _ in range(100): - tg.start_soon(enter_client) + try: + with context: + async with anyio.create_task_group() as tg: + for _ in range(10): + tg.start_soon(enter_client) + except Exception as e: + print(f"Error entering client many times {e}") + raise e threads = [ threading.Thread( @@ -104,7 +105,7 @@ async def enter_client_many_times(context): prefect.context.SettingsContext.get().copy(), ), ) - for _ in range(100) + for _ in range(10) ] for thread in threads: thread.start() diff --git a/scripts/run-integration-flows.py b/scripts/run-integration-flows.py index 5af0ed1a22de..660291d07eff 100755 --- a/scripts/run-integration-flows.py +++ b/scripts/run-integration-flows.py @@ -10,14 +10,13 @@ Example: - PREFECT_API_URL="http://localhost:4200" ./scripts/run-integration-flows.py + PREFECT_API_URL="http://localhost:4200/api" ./scripts/run-integration-flows.py """ -import os -import runpy +import subprocess import sys from pathlib import Path -from typing import Union +from typing import List, Union import prefect from prefect import __version__ @@ -31,25 +30,30 @@ ) +def run_script(script_path: str): + print(f" {script_path} ".center(90, "-"), flush=True) + try: + result = subprocess.run( + ["python", script_path], capture_output=True, text=True, check=True + ) + return result.stdout, result.stderr, None + except subprocess.CalledProcessError as e: + return e.stdout, e.stderr, e + + def run_flows(search_path: Union[str, Path]): - count = 0 print(f"Running integration tests with client version: {__version__}") - server_version = os.environ.get("TEST_SERVER_VERSION") - if server_version: - print(f"and server version: {server_version}") - - for file in sorted(Path(search_path).glob("**/*.py")): - print(f" {file.relative_to(search_path)} ".center(90, "-"), flush=True) + scripts = sorted(Path(search_path).glob("**/*.py")) + errors: List[Exception] = [] + for script in scripts: + print(f"Running {script}") try: - runpy.run_path(file, run_name="__main__") - except NotImplementedError: - print(f"Skipping {file}: not supported by this version of Prefect") - print("".center(90, "-") + "\n", flush=True) - count += 1 - - if not count: - print(f"No Python files found at {search_path}") - exit(1) + run_script(str(script)) + except Exception as e: + print(f"Error running {script}: {e}") + errors.append(e) + + assert not errors, "Errors occurred while running flows" if __name__ == "__main__":