Skip to content

Commit

Permalink
apply same updates to integration tests
Browse files Browse the repository at this point in the history
concede

fix path getting
  • Loading branch information
zzstoatzz committed Dec 11, 2024
1 parent eb6f89b commit 76dab13
Show file tree
Hide file tree
Showing 2 changed files with 41 additions and 36 deletions.
33 changes: 17 additions & 16 deletions flows/client_context_lifespan.py
Original file line number Diff line number Diff line change
@@ -1,26 +1,20 @@
from packaging.version import Version

import prefect

# Only run these tests if the version is at least 2.13.0
if Version(prefect.__version__) < Version("2.13.0"):
raise NotImplementedError()

import asyncio
import random
import threading
from contextlib import asynccontextmanager
from typing import Callable
from unittest.mock import MagicMock

import anyio
from prefect._vendor.fastapi import FastAPI
from fastapi import FastAPI

import prefect
import prefect.context
import prefect.exceptions
from prefect.client.orchestration import PrefectClient


def make_lifespan(startup, shutdown) -> callable:
def make_lifespan(startup, shutdown) -> Callable:
async def lifespan(app):
try:
startup()
Expand All @@ -32,6 +26,7 @@ async def lifespan(app):


def client_context_lifespan_is_robust_to_threaded_concurrency():
print("testing that client context lifespan is robust to threaded concurrency")
startup, shutdown = MagicMock(), MagicMock()
app = FastAPI(lifespan=make_lifespan(startup, shutdown))

Expand Down Expand Up @@ -61,6 +56,7 @@ async def enter_client(context):


async def client_context_lifespan_is_robust_to_high_async_concurrency():
print("testing that client context lifespan is robust to high async concurrency")
startup, shutdown = MagicMock(), MagicMock()
app = FastAPI(lifespan=make_lifespan(startup, shutdown))

Expand All @@ -70,7 +66,7 @@ async def enter_client():
async with PrefectClient(app):
await anyio.sleep(random.random())

with anyio.fail_after(15):
with anyio.fail_after(30):
async with anyio.create_task_group() as tg:
for _ in range(1000):
tg.start_soon(enter_client)
Expand All @@ -80,6 +76,7 @@ async def enter_client():


async def client_context_lifespan_is_robust_to_mixed_concurrency():
print("testing that client context lifespan is robust to mixed concurrency")
startup, shutdown = MagicMock(), MagicMock()
app = FastAPI(lifespan=make_lifespan(startup, shutdown))

Expand All @@ -91,10 +88,14 @@ async def enter_client():

async def enter_client_many_times(context):
# We must re-enter the profile context in the new thread
with context:
async with anyio.create_task_group() as tg:
for _ in range(100):
tg.start_soon(enter_client)
try:
with context:
async with anyio.create_task_group() as tg:
for _ in range(10):
tg.start_soon(enter_client)
except Exception as e:
print(f"Error entering client many times {e}")
raise e

threads = [
threading.Thread(
Expand All @@ -104,7 +105,7 @@ async def enter_client_many_times(context):
prefect.context.SettingsContext.get().copy(),
),
)
for _ in range(100)
for _ in range(10)
]
for thread in threads:
thread.start()
Expand Down
44 changes: 24 additions & 20 deletions scripts/run-integration-flows.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,14 +10,13 @@
Example:
PREFECT_API_URL="http://localhost:4200" ./scripts/run-integration-flows.py
PREFECT_API_URL="http://localhost:4200/api" ./scripts/run-integration-flows.py
"""

import os
import runpy
import subprocess
import sys
from pathlib import Path
from typing import Union
from typing import List, Union

import prefect
from prefect import __version__
Expand All @@ -31,25 +30,30 @@
)


def run_script(script_path: str):
print(f" {script_path} ".center(90, "-"), flush=True)
try:
result = subprocess.run(
["python", script_path], capture_output=True, text=True, check=True
)
return result.stdout, result.stderr, None
except subprocess.CalledProcessError as e:
return e.stdout, e.stderr, e


def run_flows(search_path: Union[str, Path]):
count = 0
print(f"Running integration tests with client version: {__version__}")
server_version = os.environ.get("TEST_SERVER_VERSION")
if server_version:
print(f"and server version: {server_version}")

for file in sorted(Path(search_path).glob("**/*.py")):
print(f" {file.relative_to(search_path)} ".center(90, "-"), flush=True)
scripts = sorted(Path(search_path).glob("**/*.py"))
errors: List[Exception] = []
for script in scripts:
print(f"Running {script}")
try:
runpy.run_path(file, run_name="__main__")
except NotImplementedError:
print(f"Skipping {file}: not supported by this version of Prefect")
print("".center(90, "-") + "\n", flush=True)
count += 1

if not count:
print(f"No Python files found at {search_path}")
exit(1)
run_script(str(script))
except Exception as e:
print(f"Error running {script}: {e}")
errors.append(e)

assert not errors, "Errors occurred while running flows"


if __name__ == "__main__":
Expand Down

0 comments on commit 76dab13

Please sign in to comment.