Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Automatically run a server in a subprocess if no API URL is specified #14722

Merged
merged 40 commits into from
Jul 31, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
40 commits
Select commit Hold shift + click to select a range
9d1a9e5
Run a server in a subprocess if no API URL is specified
desertaxle Jul 23, 2024
7fde075
Fixes some server side tests that were failing due to ephemeral API c…
desertaxle Jul 24, 2024
9a11704
Add `atexit` hook to ensure subprocess servers don't hang around
desertaxle Jul 25, 2024
b23a871
Auto use hosted API fixture for filters tests
desertaxle Jul 25, 2024
02d8e97
Fixes fixture scope
desertaxle Jul 25, 2024
8775b12
Update block creation in a couple of tests
desertaxle Jul 25, 2024
c13279f
Get server tests passing
desertaxle Jul 25, 2024
bfe3a84
Tweak how benches are started
desertaxle Jul 25, 2024
9625884
Merge branch 'main' into subprocess-server
desertaxle Jul 25, 2024
379c5af
Merge branch 'main' into subprocess-server
desertaxle Jul 26, 2024
3606b34
Fixes some tests
desertaxle Jul 26, 2024
26de0cb
Fix some more tests
desertaxle Jul 26, 2024
1372bcf
More test fixes
desertaxle Jul 29, 2024
be819da
Fix more tests
desertaxle Jul 29, 2024
415f247
Merge branch 'main' into subprocess-server
desertaxle Jul 29, 2024
e2cbe94
Fixes some more tests
desertaxle Jul 29, 2024
ea3acb3
Fix missing doc generation
desertaxle Jul 29, 2024
2682d58
Fix more tests
desertaxle Jul 29, 2024
2d935c6
Attempt to avoid race conditions between different processes choosing…
desertaxle Jul 29, 2024
896d982
Revert changes
desertaxle Jul 29, 2024
0415d96
Remove file lock and pass through client in `Block.load`
desertaxle Jul 29, 2024
a375fd5
Tinker with subprocess server settings
desertaxle Jul 30, 2024
85bef84
Merge branch 'main' into subprocess-server
desertaxle Jul 30, 2024
5ec99ae
Fix concurrency test and stop ephemeral server after each test that u…
desertaxle Jul 30, 2024
7eff770
Use correct health check endpoint
desertaxle Jul 30, 2024
af5f86b
Fix env var
desertaxle Jul 30, 2024
f2e9dd0
Use `benchmark.pedantic` to only run long benchmarks once
desertaxle Jul 30, 2024
06acf25
Fixes flow bench
desertaxle Jul 30, 2024
a3e9f25
Add tests for new functionality
desertaxle Jul 30, 2024
5bd5aa4
Add setting to adjust subprocess server start up wait time
desertaxle Jul 30, 2024
24fb8e2
Merge branch 'main' into subprocess-server
desertaxle Jul 30, 2024
aee35f0
remove option to pass in api
desertaxle Jul 30, 2024
0163169
Remove unnecessary fixture
desertaxle Jul 30, 2024
ea03fc4
Update src/prefect/server/api/server.py
desertaxle Jul 31, 2024
5c76568
Update tests/server/api/test_server.py
desertaxle Jul 31, 2024
964e5b2
Merge branch 'main' into subprocess-server
desertaxle Jul 31, 2024
77862d8
Add test for start idempotency
desertaxle Jul 31, 2024
28f913e
Update default profiles and tweak setting name
desertaxle Jul 31, 2024
b97429b
Remove database setting from ephemeral profile
desertaxle Jul 31, 2024
cec7998
Fixes failing tests
desertaxle Jul 31, 2024
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions .github/workflows/benchmarks.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -105,8 +105,8 @@ jobs:
# characters with an underscore
sanitized_uniquename="${uniquename//[^a-zA-Z0-9_\-]/_}"

PREFECT_API_URL="http://127.0.0.1:4200/api"
python benches \
PREFECT_API_URL="http://127.0.0.1:4200/api" \
python -m benches \
--ignore=benches/bench_import.py \
--timeout=180 \
--benchmark-save="${sanitized_uniquename}" \
Expand Down
10 changes: 8 additions & 2 deletions benches/bench_flows.py
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,10 @@ def benchmark_flow():
for _ in range(num_tasks):
test_task()

benchmark(benchmark_flow)
if num_tasks > 100:
benchmark.pedantic(benchmark_flow)
else:
benchmark(benchmark_flow)
zzstoatzz marked this conversation as resolved.
Show resolved Hide resolved


@pytest.mark.parametrize("num_tasks", [10, 50, 100, 250])
Expand All @@ -68,7 +71,10 @@ async def benchmark_flow():
for _ in range(num_tasks):
tg.start_soon(test_task)

benchmark(anyio.run, benchmark_flow)
if num_tasks > 100:
benchmark.pedantic(anyio.run, (benchmark_flow,))
else:
benchmark(anyio.run, benchmark_flow)


@pytest.mark.parametrize("num_flows", [5, 10, 20])
Expand Down
10 changes: 10 additions & 0 deletions docs/3.0rc/api-ref/rest-api/server/schema.json
Original file line number Diff line number Diff line change
Expand Up @@ -22271,6 +22271,16 @@
"title": "Prefect Server Csrf Token Expiration",
"default": "PT1H"
},
"PREFECT_SERVER_ALLOW_EPHEMERAL_MODE": {
"type": "boolean",
"title": "Prefect Server Allow Ephemeral Mode",
"default": false
},
"PREFECT_SERVER_EPHEMERAL_STARTUP_TIMEOUT_SECONDS": {
"type": "integer",
"title": "Prefect Server Ephemeral Startup Timeout Seconds",
"default": 10
},
"PREFECT_UI_ENABLED": {
"type": "boolean",
"title": "Prefect Ui Enabled",
Expand Down
4 changes: 3 additions & 1 deletion src/prefect/blocks/core.py
Original file line number Diff line number Diff line change
Expand Up @@ -902,7 +902,9 @@ class Custom(Block):
loaded_block.save("my-custom-message", overwrite=True)
```
"""
block_document, block_document_name = await cls._get_block_document(name)
block_document, block_document_name = await cls._get_block_document(
name, client=client
)

return cls._load_from_block_document(block_document, validate=validate)

Expand Down
24 changes: 21 additions & 3 deletions src/prefect/cli/profile.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
from prefect.cli._utilities import exit_with_error, exit_with_success
from prefect.cli.cloud import CloudUnauthorizedError, get_cloud_client
from prefect.cli.root import app, is_interactive
from prefect.client.base import determine_server_type
from prefect.client.orchestration import ServerType, get_client
from prefect.context import use_profile
from prefect.exceptions import ObjectNotFound
Expand Down Expand Up @@ -138,6 +139,13 @@ async def use(name: str):
" in ephemeral mode."
),
),
ConnectionStatus.UNCONFIGURED: (
exit_with_error,
(
f"Prefect server URL not configured using profile {name!r} - please"
" configure the server URL or enable ephemeral mode."
),
),
ConnectionStatus.INVALID_API: (
exit_with_error,
"Error connecting to Prefect API URL",
Expand Down Expand Up @@ -350,6 +358,7 @@ class ConnectionStatus(AutoEnum):
CLOUD_UNAUTHORIZED = AutoEnum.auto()
SERVER_CONNECTED = AutoEnum.auto()
SERVER_ERROR = AutoEnum.auto()
UNCONFIGURED = AutoEnum.auto()
EPHEMERAL = AutoEnum.auto()
INVALID_API = AutoEnum.auto()

Expand All @@ -373,14 +382,16 @@ async def check_server_connection():
try:
# inform the user if Prefect API endpoints exist, but there are
# connection issues
server_type = determine_server_type()
if server_type == ServerType.EPHEMERAL:
return ConnectionStatus.EPHEMERAL
elif server_type == ServerType.UNCONFIGURED:
return ConnectionStatus.UNCONFIGURED
client = get_client(httpx_settings=httpx_settings)
async with client:
connect_error = await client.api_healthcheck()
if connect_error is not None:
return ConnectionStatus.SERVER_ERROR
elif client.server_type == ServerType.EPHEMERAL:
# if the client is using an ephemeral Prefect app, inform the user
return ConnectionStatus.EPHEMERAL
else:
return ConnectionStatus.SERVER_CONNECTED
except Exception:
Expand All @@ -390,6 +401,13 @@ async def check_server_connection():
except TypeError:
# if no Prefect API URL has been set, httpx will throw a TypeError
try:
# try to connect with the client anyway, it will likely use an
# ephemeral Prefect instance
server_type = determine_server_type()
if server_type == ServerType.EPHEMERAL:
return ConnectionStatus.EPHEMERAL
elif server_type == ServerType.UNCONFIGURED:
return ConnectionStatus.UNCONFIGURED
client = get_client(httpx_settings=httpx_settings)
if client.server_type == ServerType.EPHEMERAL:
return ConnectionStatus.EPHEMERAL
Expand Down
12 changes: 2 additions & 10 deletions src/prefect/cli/root.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
import prefect.settings
from prefect.cli._types import PrefectTyper, SettingsOption
from prefect.cli._utilities import with_cli_exception_handling
from prefect.client.base import determine_server_type
from prefect.client.constants import SERVER_API_VERSION
from prefect.client.orchestration import ServerType
from prefect.logging.configuration import setup_logging
Expand Down Expand Up @@ -117,16 +118,7 @@ async def version(
"OS/Arch": f"{sys.platform}/{platform.machine()}",
"Profile": prefect.context.get_settings_context().profile.name,
}

server_type: str

try:
# We do not context manage the client because when using an ephemeral app we do not
# want to create the database or run migrations
client = prefect.get_client()
server_type = client.server_type.value
except Exception:
server_type = "<client error>"
server_type = determine_server_type()

version_info["Server type"] = server_type.lower()

Expand Down
34 changes: 34 additions & 0 deletions src/prefect/client/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,10 +35,14 @@
from prefect.exceptions import PrefectHTTPStatusError
from prefect.logging import get_logger
from prefect.settings import (
PREFECT_API_URL,
PREFECT_CLIENT_MAX_RETRIES,
PREFECT_CLIENT_RETRY_EXTRA_CODES,
PREFECT_CLIENT_RETRY_JITTER_FACTOR,
PREFECT_CLOUD_API_URL,
PREFECT_SERVER_ALLOW_EPHEMERAL_MODE,
)
from prefect.utilities.collections import AutoEnum
from prefect.utilities.math import bounded_poisson_interval, clamped_poisson_interval

# Datastores for lifespan management, keys should be a tuple of thread and app
Expand Down Expand Up @@ -637,3 +641,33 @@ def __init__(
)

pass


class ServerType(AutoEnum):
EPHEMERAL = AutoEnum.auto()
SERVER = AutoEnum.auto()
CLOUD = AutoEnum.auto()
UNCONFIGURED = AutoEnum.auto()


def determine_server_type() -> ServerType:
"""
Determine the server type based on the current settings.

Returns:
- `ServerType.EPHEMERAL` if the ephemeral server is enabled
- `ServerType.SERVER` if a API URL is configured and it is not a cloud URL
- `ServerType.CLOUD` if an API URL is configured and it is a cloud URL
- `ServerType.UNCONFIGURED` if no API URL is configured and ephemeral mode is
not enabled
"""
api_url = PREFECT_API_URL.value()
if api_url is None:
if PREFECT_SERVER_ALLOW_EPHEMERAL_MODE.value():
return ServerType.EPHEMERAL
else:
return ServerType.UNCONFIGURED
if api_url.startswith(PREFECT_CLOUD_API_URL.value()):
return ServerType.CLOUD
else:
return ServerType.SERVER
84 changes: 62 additions & 22 deletions src/prefect/client/orchestration.py
Original file line number Diff line number Diff line change
Expand Up @@ -132,9 +132,9 @@
PREFECT_API_URL,
PREFECT_CLIENT_CSRF_SUPPORT_ENABLED,
PREFECT_CLOUD_API_URL,
PREFECT_SERVER_ALLOW_EPHEMERAL_MODE,
PREFECT_UNIT_TEST_MODE,
)
from prefect.utilities.collections import AutoEnum

if TYPE_CHECKING:
from prefect.flows import Flow as FlowObject
Expand All @@ -145,19 +145,14 @@
PrefectHttpxAsyncClient,
PrefectHttpxSyncClient,
PrefectHttpxSyncEphemeralClient,
ServerType,
app_lifespan_context,
)

P = ParamSpec("P")
R = TypeVar("R")


class ServerType(AutoEnum):
EPHEMERAL = AutoEnum.auto()
SERVER = AutoEnum.auto()
CLOUD = AutoEnum.auto()


@overload
def get_client(
httpx_settings: Optional[Dict[str, Any]] = None, sync_client: Literal[False] = False
Expand Down Expand Up @@ -194,8 +189,6 @@ def get_client(
"""
import prefect.context

settings_ctx = prefect.context.get_settings_context()

# try to load clients from a client context, if possible
# only load clients that match the provided config / loop
try:
Expand All @@ -217,24 +210,36 @@ def get_client(
return client_ctx.client

api = PREFECT_API_URL.value()
server_type = None

if not api:
if not api and PREFECT_SERVER_ALLOW_EPHEMERAL_MODE:
# create an ephemeral API if none was provided
from prefect.server.api.server import create_app
from prefect.server.api.server import SubprocessASGIServer

api = create_app(settings_ctx.settings, ephemeral=True)
server = SubprocessASGIServer()
server.start()
assert server.server_process is not None, "Server process did not start"

api = f"{server.address()}/api"
server_type = ServerType.EPHEMERAL
elif not api and not PREFECT_SERVER_ALLOW_EPHEMERAL_MODE:
raise ValueError(
"No Prefect API URL provided. Please set PREFECT_API_URL to the address of a running Prefect server."
)

if sync_client:
return SyncPrefectClient(
api,
api_key=PREFECT_API_KEY.value(),
httpx_settings=httpx_settings,
server_type=server_type,
)
else:
return PrefectClient(
api,
api_key=PREFECT_API_KEY.value(),
httpx_settings=httpx_settings,
server_type=server_type,
)


Expand Down Expand Up @@ -271,6 +276,7 @@ def __init__(
api_key: Optional[str] = None,
api_version: Optional[str] = None,
httpx_settings: Optional[Dict[str, Any]] = None,
server_type: Optional[ServerType] = None,
) -> None:
httpx_settings = httpx_settings.copy() if httpx_settings else {}
httpx_settings.setdefault("headers", {})
Expand Down Expand Up @@ -333,11 +339,14 @@ def __init__(
# client will use a standard HTTP/1.1 connection instead.
httpx_settings.setdefault("http2", PREFECT_API_ENABLE_HTTP2.value())

self.server_type = (
ServerType.CLOUD
if api.startswith(PREFECT_CLOUD_API_URL.value())
else ServerType.SERVER
)
if server_type:
self.server_type = server_type
else:
self.server_type = (
ServerType.CLOUD
if api.startswith(PREFECT_CLOUD_API_URL.value())
else ServerType.SERVER
)

# Connect to an in-process application
elif isinstance(api, ASGIApp):
Expand Down Expand Up @@ -3386,6 +3395,7 @@ def __init__(
api_key: Optional[str] = None,
api_version: Optional[str] = None,
httpx_settings: Optional[Dict[str, Any]] = None,
server_type: Optional[ServerType] = None,
) -> None:
httpx_settings = httpx_settings.copy() if httpx_settings else {}
httpx_settings.setdefault("headers", {})
Expand Down Expand Up @@ -3444,11 +3454,14 @@ def __init__(
# client will use a standard HTTP/1.1 connection instead.
httpx_settings.setdefault("http2", PREFECT_API_ENABLE_HTTP2.value())

self.server_type = (
ServerType.CLOUD
if api.startswith(PREFECT_CLOUD_API_URL.value())
else ServerType.SERVER
)
if server_type:
self.server_type = server_type
else:
self.server_type = (
ServerType.CLOUD
if api.startswith(PREFECT_CLOUD_API_URL.value())
else ServerType.SERVER
)

# Connect to an in-process application
elif isinstance(api, ASGIApp):
Expand Down Expand Up @@ -4062,6 +4075,33 @@ def read_deployment(
raise
return DeploymentResponse.model_validate(response.json())

def read_deployment_by_name(
self,
name: str,
) -> DeploymentResponse:
"""
Query the Prefect API for a deployment by name.

Args:
name: A deployed flow's name: <FLOW_NAME>/<DEPLOYMENT_NAME>

Raises:
prefect.exceptions.ObjectNotFound: If request returns 404
httpx.RequestError: If request fails

Returns:
a Deployment model representation of the deployment
"""
try:
response = self._client.get(f"/deployments/name/{name}")
except httpx.HTTPStatusError as e:
if e.response.status_code == status.HTTP_404_NOT_FOUND:
raise prefect.exceptions.ObjectNotFound(http_exc=e) from e
else:
raise

return DeploymentResponse.model_validate(response.json())

def create_artifact(
self,
artifact: ArtifactCreate,
Expand Down
11 changes: 9 additions & 2 deletions src/prefect/events/worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@
EventsClient,
NullEventsClient,
PrefectCloudEventsClient,
PrefectEphemeralEventsClient,
PrefectEventsClient,
)
from .related import related_resources_from_run_context
Expand Down Expand Up @@ -97,7 +96,15 @@ def instance(
elif should_emit_events_to_running_server():
client_type = PrefectEventsClient
elif should_emit_events_to_ephemeral_server():
client_type = PrefectEphemeralEventsClient
# create an ephemeral API if none was provided
from prefect.server.api.server import SubprocessASGIServer

server = SubprocessASGIServer()
server.start()
assert server.server_process is not None, "Server process did not start"

client_kwargs = {"api_url": f"{server.address()}/api"}
client_type = PrefectEventsClient
else:
client_type = NullEventsClient

Expand Down
Loading
Loading