Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Automatically run a server in a subprocess if no API URL is specified #14722

Merged
merged 40 commits into from
Jul 31, 2024
Merged
Show file tree
Hide file tree
Changes from 33 commits
Commits
Show all changes
40 commits
Select commit Hold shift + click to select a range
9d1a9e5
Run a server in a subprocess if no API URL is specified
desertaxle Jul 23, 2024
7fde075
Fixes some server side tests that were failing due to ephemeral API c…
desertaxle Jul 24, 2024
9a11704
Add `atexit` hook to ensure subprocess servers don't hang around
desertaxle Jul 25, 2024
b23a871
Auto use hosted API fixture for filters tests
desertaxle Jul 25, 2024
02d8e97
Fixes fixture scope
desertaxle Jul 25, 2024
8775b12
Update block creation in a couple of tests
desertaxle Jul 25, 2024
c13279f
Get server tests passing
desertaxle Jul 25, 2024
bfe3a84
Tweak how benches are started
desertaxle Jul 25, 2024
9625884
Merge branch 'main' into subprocess-server
desertaxle Jul 25, 2024
379c5af
Merge branch 'main' into subprocess-server
desertaxle Jul 26, 2024
3606b34
Fixes some tests
desertaxle Jul 26, 2024
26de0cb
Fix some more tests
desertaxle Jul 26, 2024
1372bcf
More test fixes
desertaxle Jul 29, 2024
be819da
Fix more tests
desertaxle Jul 29, 2024
415f247
Merge branch 'main' into subprocess-server
desertaxle Jul 29, 2024
e2cbe94
Fixes some more tests
desertaxle Jul 29, 2024
ea3acb3
Fix missing doc generation
desertaxle Jul 29, 2024
2682d58
Fix more tests
desertaxle Jul 29, 2024
2d935c6
Attempt to avoid race conditions between different processes choosing…
desertaxle Jul 29, 2024
896d982
Revert changes
desertaxle Jul 29, 2024
0415d96
Remove file lock and pass through client in `Block.load`
desertaxle Jul 29, 2024
a375fd5
Tinker with subprocess server settings
desertaxle Jul 30, 2024
85bef84
Merge branch 'main' into subprocess-server
desertaxle Jul 30, 2024
5ec99ae
Fix concurrency test and stop ephemeral server after each test that u…
desertaxle Jul 30, 2024
7eff770
Use correct health check endpoint
desertaxle Jul 30, 2024
af5f86b
Fix env var
desertaxle Jul 30, 2024
f2e9dd0
Use `benchmark.pedantic` to only run long benchmarks once
desertaxle Jul 30, 2024
06acf25
Fixes flow bench
desertaxle Jul 30, 2024
a3e9f25
Add tests for new functionality
desertaxle Jul 30, 2024
5bd5aa4
Add setting to adjust subprocess server start up wait time
desertaxle Jul 30, 2024
24fb8e2
Merge branch 'main' into subprocess-server
desertaxle Jul 30, 2024
aee35f0
remove option to pass in api
desertaxle Jul 30, 2024
0163169
Remove unnecessary fixture
desertaxle Jul 30, 2024
ea03fc4
Update src/prefect/server/api/server.py
desertaxle Jul 31, 2024
5c76568
Update tests/server/api/test_server.py
desertaxle Jul 31, 2024
964e5b2
Merge branch 'main' into subprocess-server
desertaxle Jul 31, 2024
77862d8
Add test for start idempotency
desertaxle Jul 31, 2024
28f913e
Update default profiles and tweak setting name
desertaxle Jul 31, 2024
b97429b
Remove database setting from ephemeral profile
desertaxle Jul 31, 2024
cec7998
Fixes failing tests
desertaxle Jul 31, 2024
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions .github/workflows/benchmarks.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -105,8 +105,8 @@ jobs:
# characters with an underscore
sanitized_uniquename="${uniquename//[^a-zA-Z0-9_\-]/_}"

PREFECT_API_URL="http://127.0.0.1:4200/api"
python benches \
PREFECT_API_URL="http://127.0.0.1:4200/api" \
python -m benches \
--ignore=benches/bench_import.py \
--timeout=180 \
--benchmark-save="${sanitized_uniquename}" \
Expand Down
10 changes: 8 additions & 2 deletions benches/bench_flows.py
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,10 @@ def benchmark_flow():
for _ in range(num_tasks):
test_task()

benchmark(benchmark_flow)
if num_tasks > 100:
benchmark.pedantic(benchmark_flow)
else:
benchmark(benchmark_flow)
zzstoatzz marked this conversation as resolved.
Show resolved Hide resolved


@pytest.mark.parametrize("num_tasks", [10, 50, 100, 250])
Expand All @@ -68,7 +71,10 @@ async def benchmark_flow():
for _ in range(num_tasks):
tg.start_soon(test_task)

benchmark(anyio.run, benchmark_flow)
if num_tasks > 100:
benchmark.pedantic(anyio.run, (benchmark_flow,))
else:
benchmark(anyio.run, benchmark_flow)


@pytest.mark.parametrize("num_flows", [5, 10, 20])
Expand Down
10 changes: 10 additions & 0 deletions docs/3.0rc/api-ref/rest-api/server/schema.json
Original file line number Diff line number Diff line change
Expand Up @@ -22271,6 +22271,16 @@
"title": "Prefect Server Csrf Token Expiration",
"default": "PT1H"
},
"PREFECT_SERVER_ALLOW_EPHEMERAL_MODE": {
"type": "boolean",
"title": "Prefect Server Allow Ephemeral Mode",
"default": false
},
"PREFECT_SERVER_EPHEMERAL_START_UP_WAIT_SECONDS": {
"type": "integer",
"title": "Prefect Server Ephemeral Start Up Wait Seconds",
"default": 10
},
"PREFECT_UI_ENABLED": {
"type": "boolean",
"title": "Prefect Ui Enabled",
Expand Down
4 changes: 3 additions & 1 deletion src/prefect/blocks/core.py
Original file line number Diff line number Diff line change
Expand Up @@ -902,7 +902,9 @@ class Custom(Block):
loaded_block.save("my-custom-message", overwrite=True)
```
"""
block_document, block_document_name = await cls._get_block_document(name)
block_document, block_document_name = await cls._get_block_document(
name, client=client
)

return cls._load_from_block_document(block_document, validate=validate)

Expand Down
44 changes: 29 additions & 15 deletions src/prefect/cli/profile.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
from prefect.cli._utilities import exit_with_error, exit_with_success
from prefect.cli.cloud import CloudUnauthorizedError, get_cloud_client
from prefect.cli.root import app, is_interactive
from prefect.client.base import determine_server_type
from prefect.client.orchestration import ServerType, get_client
from prefect.context import use_profile
from prefect.exceptions import ObjectNotFound
Expand Down Expand Up @@ -122,11 +123,11 @@ async def use(name: str):
exit_with_error,
f"Error authenticating with Prefect Cloud using profile {name!r}",
),
ConnectionStatus.ORION_CONNECTED: (
ConnectionStatus.SERVER_CONNECTED: (
exit_with_success,
f"Connected to Prefect server using profile {name!r}",
),
ConnectionStatus.ORION_ERROR: (
ConnectionStatus.SERVER_ERROR: (
exit_with_error,
f"Error connecting to Prefect server using profile {name!r}",
),
Expand All @@ -137,6 +138,13 @@ async def use(name: str):
" in ephemeral mode."
),
),
ConnectionStatus.UNCONFIGURED: (
exit_with_error,
(
f"Prefect server URL not configured using profile {name!r} - please"
" configure the server URL or enable ephemeral mode."
),
),
ConnectionStatus.INVALID_API: (
exit_with_error,
"Error connecting to Prefect API URL",
Expand Down Expand Up @@ -302,8 +310,9 @@ class ConnectionStatus(AutoEnum):
CLOUD_CONNECTED = AutoEnum.auto()
CLOUD_ERROR = AutoEnum.auto()
CLOUD_UNAUTHORIZED = AutoEnum.auto()
ORION_CONNECTED = AutoEnum.auto()
ORION_ERROR = AutoEnum.auto()
SERVER_CONNECTED = AutoEnum.auto()
SERVER_ERROR = AutoEnum.auto()
UNCONFIGURED = AutoEnum.auto()
EPHEMERAL = AutoEnum.auto()
INVALID_API = AutoEnum.auto()

Expand All @@ -327,35 +336,40 @@ async def check_orion_connection():
try:
# inform the user if Prefect API endpoints exist, but there are
# connection issues
server_type = determine_server_type()
if server_type == ServerType.EPHEMERAL:
return ConnectionStatus.EPHEMERAL
elif server_type == ServerType.UNCONFIGURED:
return ConnectionStatus.UNCONFIGURED
client = get_client(httpx_settings=httpx_settings)
async with client:
connect_error = await client.api_healthcheck()
if connect_error is not None:
return ConnectionStatus.ORION_ERROR
elif client.server_type == ServerType.EPHEMERAL:
# if the client is using an ephemeral Prefect app, inform the user
return ConnectionStatus.EPHEMERAL
return ConnectionStatus.SERVER_ERROR
else:
return ConnectionStatus.ORION_CONNECTED
return ConnectionStatus.SERVER_CONNECTED
except Exception:
return ConnectionStatus.ORION_ERROR
return ConnectionStatus.SERVER_ERROR
except httpx.HTTPStatusError:
return ConnectionStatus.CLOUD_ERROR
except TypeError:
# if no Prefect API URL has been set, httpx will throw a TypeError
try:
# try to connect with the client anyway, it will likely use an
# ephemeral Prefect instance
server_type = determine_server_type()
if server_type == ServerType.EPHEMERAL:
return ConnectionStatus.EPHEMERAL
elif server_type == ServerType.UNCONFIGURED:
return ConnectionStatus.UNCONFIGURED
client = get_client(httpx_settings=httpx_settings)
async with client:
connect_error = await client.api_healthcheck()
if connect_error is not None:
return ConnectionStatus.ORION_ERROR
elif client.server_type == ServerType.EPHEMERAL:
return ConnectionStatus.EPHEMERAL
return ConnectionStatus.SERVER_ERROR
else:
return ConnectionStatus.ORION_CONNECTED
return ConnectionStatus.SERVER_CONNECTED
except Exception:
return ConnectionStatus.ORION_ERROR
return ConnectionStatus.SERVER_ERROR
except (httpx.ConnectError, httpx.UnsupportedProtocol):
return ConnectionStatus.INVALID_API
12 changes: 2 additions & 10 deletions src/prefect/cli/root.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
import prefect.settings
from prefect.cli._types import PrefectTyper, SettingsOption
from prefect.cli._utilities import with_cli_exception_handling
from prefect.client.base import determine_server_type
from prefect.client.constants import SERVER_API_VERSION
from prefect.client.orchestration import ServerType
from prefect.logging.configuration import setup_logging
Expand Down Expand Up @@ -117,16 +118,7 @@ async def version(
"OS/Arch": f"{sys.platform}/{platform.machine()}",
"Profile": prefect.context.get_settings_context().profile.name,
}

server_type: str

try:
# We do not context manage the client because when using an ephemeral app we do not
# want to create the database or run migrations
client = prefect.get_client()
server_type = client.server_type.value
except Exception:
server_type = "<client error>"
server_type = determine_server_type()

version_info["Server type"] = server_type.lower()

Expand Down
34 changes: 34 additions & 0 deletions src/prefect/client/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,10 +35,14 @@
from prefect.exceptions import PrefectHTTPStatusError
from prefect.logging import get_logger
from prefect.settings import (
PREFECT_API_URL,
PREFECT_CLIENT_MAX_RETRIES,
PREFECT_CLIENT_RETRY_EXTRA_CODES,
PREFECT_CLIENT_RETRY_JITTER_FACTOR,
PREFECT_CLOUD_API_URL,
PREFECT_SERVER_ALLOW_EPHEMERAL_MODE,
)
from prefect.utilities.collections import AutoEnum
from prefect.utilities.math import bounded_poisson_interval, clamped_poisson_interval

# Datastores for lifespan management, keys should be a tuple of thread and app
Expand Down Expand Up @@ -637,3 +641,33 @@ def __init__(
)

pass


class ServerType(AutoEnum):
EPHEMERAL = AutoEnum.auto()
SERVER = AutoEnum.auto()
CLOUD = AutoEnum.auto()
UNCONFIGURED = AutoEnum.auto()


def determine_server_type() -> ServerType:
"""
Determine the server type based on the current settings.

Returns:
- `ServerType.EPHEMERAL` if the ephemeral server is enabled
- `ServerType.SERVER` if a API URL is configured and it is not a cloud URL
- `ServerType.CLOUD` if an API URL is configured and it is a cloud URL
- `ServerType.UNCONFIGURED` if no API URL is configured and ephemeral mode is
not enabled
"""
api_url = PREFECT_API_URL.value()
if api_url is None:
if PREFECT_SERVER_ALLOW_EPHEMERAL_MODE.value():
return ServerType.EPHEMERAL
else:
return ServerType.UNCONFIGURED
if api_url.startswith(PREFECT_CLOUD_API_URL.value()):
return ServerType.CLOUD
else:
return ServerType.SERVER
Loading
Loading