diff --git a/src/prefect/client/orchestration/_deployments/client.py b/src/prefect/client/orchestration/_deployments/client.py index dbf5b5d31536..eb808b42c19a 100644 --- a/src/prefect/client/orchestration/_deployments/client.py +++ b/src/prefect/client/orchestration/_deployments/client.py @@ -7,7 +7,6 @@ from prefect.client.orchestration.base import BaseAsyncClient, BaseClient from prefect.exceptions import ObjectNotFound -from prefect.states import Scheduled if TYPE_CHECKING: import datetime @@ -551,6 +550,7 @@ def create_flow_run_from_deployment( """ from prefect.client.schemas.actions import DeploymentFlowRunCreate from prefect.client.schemas.objects import FlowRun + from prefect.states import Scheduled parameters = parameters or {} context = context or {} @@ -1094,6 +1094,7 @@ async def create_flow_run_from_deployment( """ from prefect.client.schemas.actions import DeploymentFlowRunCreate from prefect.client.schemas.objects import FlowRun + from prefect.states import Scheduled parameters = parameters or {} context = context or {} diff --git a/src/prefect/client/orchestration/_logs/client.py b/src/prefect/client/orchestration/_logs/client.py index 3e05d980a16c..da7ac104aaf2 100644 --- a/src/prefect/client/orchestration/_logs/client.py +++ b/src/prefect/client/orchestration/_logs/client.py @@ -1,11 +1,8 @@ from __future__ import annotations -from typing import TYPE_CHECKING, Any, Iterable, Optional, Union +from typing import TYPE_CHECKING, Any, Iterable, Union from prefect.client.orchestration.base import BaseAsyncClient, BaseClient -from prefect.client.schemas.sorting import ( - LogSort, -) if TYPE_CHECKING: from prefect.client.schemas.actions import ( @@ -17,6 +14,7 @@ from prefect.client.schemas.objects import ( Log, ) + from prefect.client.schemas.sorting import LogSort class LogClient(BaseClient): @@ -34,19 +32,21 @@ def create_logs(self, logs: Iterable[Union["LogCreate", dict[str, Any]]]) -> Non def read_logs( self, - log_filter: Optional["LogFilter"] = None, - limit: Optional[int] = None, - offset: Optional[int] = None, - sort: "LogSort" = LogSort.TIMESTAMP_ASC, + log_filter: "LogFilter | None" = None, + limit: int | None = None, + offset: int | None = None, + sort: "LogSort | None" = None, ) -> list["Log"]: """ Read flow and task run logs. """ + from prefect.client.schemas.sorting import LogSort + body: dict[str, Any] = { "logs": log_filter.model_dump(mode="json") if log_filter else None, "limit": limit, "offset": offset, - "sort": sort, + "sort": sort or LogSort.TIMESTAMP_ASC, } response = self.request("POST", "/logs/filter", json=body) from prefect.client.schemas.objects import Log @@ -74,19 +74,21 @@ async def create_logs( async def read_logs( self, - log_filter: Optional["LogFilter"] = None, - limit: Optional[int] = None, - offset: Optional[int] = None, - sort: "LogSort" = LogSort.TIMESTAMP_ASC, + log_filter: "LogFilter | None" = None, + limit: int | None = None, + offset: int | None = None, + sort: "LogSort | None" = None, ) -> list[Log]: """ Read flow and task run logs. """ + from prefect.client.schemas.sorting import LogSort + body: dict[str, Any] = { "logs": log_filter.model_dump(mode="json") if log_filter else None, "limit": limit, "offset": offset, - "sort": sort, + "sort": sort or LogSort.TIMESTAMP_ASC, } response = await self.request("POST", "/logs/filter", json=body) diff --git a/src/prefect/client/schemas/__init__.py b/src/prefect/client/schemas/__init__.py index 2a35e6a1f3c0..c686cacac531 100644 --- a/src/prefect/client/schemas/__init__.py +++ b/src/prefect/client/schemas/__init__.py @@ -1,32 +1,58 @@ -# Some objects are exported here for backwards compatibility. -# In general, it is recommended to import schemas from their respective modules. +import importlib +import sys +from typing import Any, TYPE_CHECKING -from .actions import BlockTypeUpdate, StateCreate -from .objects import ( - DEFAULT_BLOCK_SCHEMA_VERSION, - BlockDocument, - BlockSchema, - BlockType, - FlowRun, - FlowRunPolicy, - State, - StateDetails, - StateType, - TaskRun, - TaskRunInput, - TaskRunPolicy, - TaskRunResult, - Workspace, -) -from .responses import ( - OrchestrationResult, - SetStateStatus, - StateAbortDetails, - StateAcceptDetails, - StateRejectDetails, -) +if TYPE_CHECKING: + from .actions import BlockTypeUpdate, StateCreate + from .objects import ( + DEFAULT_BLOCK_SCHEMA_VERSION, + BlockDocument, + BlockSchema, + BlockType, + FlowRun, + FlowRunPolicy, + State, + StateDetails, + StateType, + TaskRun, + TaskRunInput, + TaskRunPolicy, + TaskRunResult, + Workspace, + ) + from .responses import ( + OrchestrationResult, + SetStateStatus, + StateAbortDetails, + StateAcceptDetails, + StateRejectDetails, + ) -__all__ = ( +_public_api = { + "BlockDocument": (__package__, ".objects"), + "BlockSchema": (__package__, ".objects"), + "BlockType": (__package__, ".objects"), + "BlockTypeUpdate": (__package__, ".actions"), + "DEFAULT_BLOCK_SCHEMA_VERSION": (__package__, ".objects"), + "FlowRun": (__package__, ".objects"), + "FlowRunPolicy": (__package__, ".objects"), + "OrchestrationResult": (__package__, ".responses"), + "SetStateStatus": (__package__, ".responses"), + "State": (__package__, ".objects"), + "StateAbortDetails": (__package__, ".responses"), + "StateAcceptDetails": (__package__, ".responses"), + "StateCreate": (__package__, ".actions"), + "StateDetails": (__package__, ".objects"), + "StateRejectDetails": (__package__, ".responses"), + "StateType": (__package__, ".objects"), + "TaskRun": (__package__, ".objects"), + "TaskRunInput": (__package__, ".objects"), + "TaskRunPolicy": (__package__, ".objects"), + "TaskRunResult": (__package__, ".objects"), + "Workspace": (__package__, ".objects"), +} + +__all__ = [ "BlockDocument", "BlockSchema", "BlockType", @@ -48,4 +74,18 @@ "TaskRunPolicy", "TaskRunResult", "Workspace", -) +] + + +def __getattr__(attr_name: str) -> Any: + try: + if (dynamic_attr := _public_api.get(attr_name)) is None: + raise AttributeError(f"module {__name__} has no attribute {attr_name}") + + package, mname = dynamic_attr + module = importlib.import_module(mname, package=package) + return getattr(module, attr_name) + except ModuleNotFoundError as ex: + mname, _, attr = (ex.name or "").rpartition(".") + ctx = {"name": mname, "obj": attr} if sys.version_info >= (3, 10) else {} + raise AttributeError(f"module {mname} has no attribute {attr}", **ctx) from ex diff --git a/src/prefect/states.py b/src/prefect/states.py index b2f46f8a40bc..db034e92c066 100644 --- a/src/prefect/states.py +++ b/src/prefect/states.py @@ -6,7 +6,7 @@ import warnings from collections import Counter from types import GeneratorType, TracebackType -from typing import Any, Dict, Iterable, Optional, Type +from typing import TYPE_CHECKING, Any, Dict, Iterable, Optional, Type import anyio import httpx @@ -28,16 +28,16 @@ UnfinishedRun, ) from prefect.logging.loggers import get_logger, get_run_logger -from prefect.results import ( - R, - ResultRecord, - ResultRecordMetadata, - ResultStore, -) from prefect.utilities.annotations import BaseAnnotation from prefect.utilities.asyncutils import in_async_main_thread, sync_compatible from prefect.utilities.collections import ensure_iterable +if TYPE_CHECKING: + from prefect.results import ( + R, + ResultStore, + ) + logger = get_logger("states") @@ -49,11 +49,11 @@ help="Please ensure you are awaiting the call to `result()` when calling in an async context.", ) def get_state_result( - state: State[R], + state: "State[R]", raise_on_failure: bool = True, fetch: bool = True, retry_result_failure: bool = True, -) -> R: +) -> "R": """ Get the result from a state. @@ -86,13 +86,18 @@ def get_state_result( async def _get_state_result_data_with_retries( - state: State[R], retry_result_failure: bool = True -) -> R: + state: "State[R]", retry_result_failure: bool = True +) -> "R": # Results may be written asynchronously, possibly after their corresponding # state has been written and events have been emitted, so we should give some # grace here about missing results. The exception below could come in the form # of a missing file, a short read, or other types of errors depending on the # result storage backend. + from prefect.results import ( + ResultRecord, + ResultRecordMetadata, + ) + if retry_result_failure is False: max_attempts = 1 else: @@ -120,11 +125,16 @@ async def _get_state_result_data_with_retries( @sync_compatible async def _get_state_result( - state: State[R], raise_on_failure: bool, retry_result_failure: bool = True -) -> R: + state: "State[R]", raise_on_failure: bool, retry_result_failure: bool = True +) -> "R": """ Internal implementation for `get_state_result` without async backwards compatibility """ + from prefect.results import ( + ResultRecord, + ResultRecordMetadata, + ) + if state.is_paused(): # Paused states are not truly terminal and do not have results associated with them raise PausedRun("Run is paused, its result is not available.", state=state) @@ -181,7 +191,7 @@ def format_exception(exc: BaseException, tb: TracebackType = None) -> str: async def exception_to_crashed_state( exc: BaseException, - result_store: Optional[ResultStore] = None, + result_store: Optional["ResultStore"] = None, ) -> State: """ Takes an exception that occurs _outside_ of user code and converts it to a @@ -233,7 +243,7 @@ async def exception_to_crashed_state( async def exception_to_failed_state( exc: Optional[BaseException] = None, - result_store: Optional[ResultStore] = None, + result_store: Optional["ResultStore"] = None, write_result: bool = False, **kwargs, ) -> State: @@ -285,12 +295,12 @@ async def exception_to_failed_state( async def return_value_to_state( - retval: R, - result_store: ResultStore, + retval: "R", + result_store: "ResultStore", key: Optional[str] = None, expiration: Optional[datetime.datetime] = None, write_result: bool = False, -) -> State[R]: +) -> "State[R]": """ Given a return value from a user's function, create a `State` the run should be placed in. @@ -311,6 +321,11 @@ async def return_value_to_state( Callers should resolve all futures into states before passing return values to this function. """ + from prefect.results import ( + ResultRecord, + ResultRecordMetadata, + ) + try: local_logger = get_run_logger() except MissingContextError: @@ -443,6 +458,10 @@ async def get_state_exception(state: State) -> BaseException: - `CrashedRun` if the state type is CRASHED. - `CancelledRun` if the state type is CANCELLED. """ + from prefect.results import ( + ResultRecord, + ResultRecordMetadata, + ) if state.is_failed(): wrapper = FailedRun @@ -586,7 +605,7 @@ def __repr__(self) -> str: return f"StateGroup<{self.counts_message()}>" -def _traced(cls: Type[State[R]], **kwargs: Any) -> State[R]: +def _traced(cls: Type["State[R]"], **kwargs: Any) -> "State[R]": state_details = StateDetails.model_validate(kwargs.pop("state_details", {})) carrier = {} @@ -597,10 +616,10 @@ def _traced(cls: Type[State[R]], **kwargs: Any) -> State[R]: def Scheduled( - cls: Type[State[R]] = State, + cls: Type["State[R]"] = State, scheduled_time: Optional[datetime.datetime] = None, **kwargs: Any, -) -> State[R]: +) -> "State[R]": """Convenience function for creating `Scheduled` states. Returns: @@ -616,7 +635,7 @@ def Scheduled( return _traced(cls, type=StateType.SCHEDULED, state_details=state_details, **kwargs) -def Completed(cls: Type[State[R]] = State, **kwargs: Any) -> State[R]: +def Completed(cls: Type["State[R]"] = State, **kwargs: Any) -> "State[R]": """Convenience function for creating `Completed` states. Returns: @@ -626,7 +645,7 @@ def Completed(cls: Type[State[R]] = State, **kwargs: Any) -> State[R]: return _traced(cls, type=StateType.COMPLETED, **kwargs) -def Running(cls: Type[State[R]] = State, **kwargs: Any) -> State[R]: +def Running(cls: Type["State[R]"] = State, **kwargs: Any) -> "State[R]": """Convenience function for creating `Running` states. Returns: @@ -635,7 +654,7 @@ def Running(cls: Type[State[R]] = State, **kwargs: Any) -> State[R]: return _traced(cls, type=StateType.RUNNING, **kwargs) -def Failed(cls: Type[State[R]] = State, **kwargs: Any) -> State[R]: +def Failed(cls: Type["State[R]"] = State, **kwargs: Any) -> "State[R]": """Convenience function for creating `Failed` states. Returns: @@ -644,7 +663,7 @@ def Failed(cls: Type[State[R]] = State, **kwargs: Any) -> State[R]: return _traced(cls, type=StateType.FAILED, **kwargs) -def Crashed(cls: Type[State[R]] = State, **kwargs: Any) -> State[R]: +def Crashed(cls: Type["State[R]"] = State, **kwargs: Any) -> "State[R]": """Convenience function for creating `Crashed` states. Returns: @@ -653,7 +672,7 @@ def Crashed(cls: Type[State[R]] = State, **kwargs: Any) -> State[R]: return _traced(cls, type=StateType.CRASHED, **kwargs) -def Cancelling(cls: Type[State[R]] = State, **kwargs: Any) -> State[R]: +def Cancelling(cls: Type["State[R]"] = State, **kwargs: Any) -> "State[R]": """Convenience function for creating `Cancelling` states. Returns: @@ -662,7 +681,7 @@ def Cancelling(cls: Type[State[R]] = State, **kwargs: Any) -> State[R]: return _traced(cls, type=StateType.CANCELLING, **kwargs) -def Cancelled(cls: Type[State[R]] = State, **kwargs: Any) -> State[R]: +def Cancelled(cls: Type["State[R]"] = State, **kwargs: Any) -> "State[R]": """Convenience function for creating `Cancelled` states. Returns: @@ -671,7 +690,7 @@ def Cancelled(cls: Type[State[R]] = State, **kwargs: Any) -> State[R]: return _traced(cls, type=StateType.CANCELLED, **kwargs) -def Pending(cls: Type[State[R]] = State, **kwargs: Any) -> State[R]: +def Pending(cls: Type["State[R]"] = State, **kwargs: Any) -> "State[R]": """Convenience function for creating `Pending` states. Returns: @@ -681,13 +700,13 @@ def Pending(cls: Type[State[R]] = State, **kwargs: Any) -> State[R]: def Paused( - cls: Type[State[R]] = State, + cls: Type["State[R]"] = State, timeout_seconds: Optional[int] = None, pause_expiration_time: Optional[datetime.datetime] = None, reschedule: bool = False, pause_key: Optional[str] = None, **kwargs: Any, -) -> State[R]: +) -> "State[R]": """Convenience function for creating `Paused` states. Returns: @@ -717,7 +736,7 @@ def Paused( def Suspended( - cls: Type[State[R]] = State, + cls: Type["State[R]"] = State, timeout_seconds: Optional[int] = None, pause_expiration_time: Optional[datetime.datetime] = None, pause_key: Optional[str] = None, @@ -740,10 +759,10 @@ def Suspended( def AwaitingRetry( - cls: Type[State[R]] = State, + cls: Type["State[R]"] = State, scheduled_time: Optional[datetime.datetime] = None, **kwargs: Any, -) -> State[R]: +) -> "State[R]": """Convenience function for creating `AwaitingRetry` states. Returns: @@ -755,10 +774,10 @@ def AwaitingRetry( def AwaitingConcurrencySlot( - cls: Type[State[R]] = State, + cls: Type["State[R]"] = State, scheduled_time: Optional[datetime.datetime] = None, **kwargs: Any, -) -> State[R]: +) -> "State[R]": """Convenience function for creating `AwaitingConcurrencySlot` states. Returns: @@ -769,7 +788,7 @@ def AwaitingConcurrencySlot( ) -def Retrying(cls: Type[State[R]] = State, **kwargs: Any) -> State[R]: +def Retrying(cls: Type["State[R]"] = State, **kwargs: Any) -> "State[R]": """Convenience function for creating `Retrying` states. Returns: @@ -779,10 +798,10 @@ def Retrying(cls: Type[State[R]] = State, **kwargs: Any) -> State[R]: def Late( - cls: Type[State[R]] = State, + cls: Type["State[R]"] = State, scheduled_time: Optional[datetime.datetime] = None, **kwargs: Any, -) -> State[R]: +) -> "State[R]": """Convenience function for creating `Late` states. Returns: