Skip to content

Commit

Permalink
wip
Browse files Browse the repository at this point in the history
  • Loading branch information
zzstoatzz committed Dec 9, 2024
1 parent 9463bd1 commit a8c81e0
Show file tree
Hide file tree
Showing 3 changed files with 87 additions and 64 deletions.
142 changes: 82 additions & 60 deletions src/prefect/client/_adapters.py
Original file line number Diff line number Diff line change
@@ -1,68 +1,90 @@
"""Creating type adapters to avoid recreation of the pydantic core schemas when possible."""

from typing import List
from typing import TYPE_CHECKING, List

from pydantic import TypeAdapter
from pydantic import ConfigDict, TypeAdapter

from prefect.client.schemas.objects import (
Artifact,
ArtifactCollection,
BlockDocument,
BlockSchema,
BlockType,
ConcurrencyLimit,
DeploymentSchedule,
Flow,
FlowRun,
FlowRunInput,
FlowRunNotificationPolicy,
GlobalConcurrencyLimit,
Log,
State,
TaskRun,
Variable,
Worker,
WorkPool,
WorkQueue,
if TYPE_CHECKING:
from prefect.client.schemas.objects import (
Artifact,
ArtifactCollection,
BlockDocument,
BlockSchema,
BlockType,
ConcurrencyLimit,
DeploymentSchedule,
Flow,
FlowRun,
FlowRunInput,
FlowRunNotificationPolicy,
GlobalConcurrencyLimit,
Log,
State,
TaskRun,
Variable,
Worker,
WorkPool,
WorkQueue,
)
from prefect.client.schemas.responses import (
DeploymentResponse,
FlowRunResponse,
GlobalConcurrencyLimitResponse,
WorkerFlowRunResponse,
)
from prefect.events.schemas.automations import Automation


defer_build_cfg = ConfigDict(defer_build=True)

BlockTypeAdapter = TypeAdapter("BlockType", config=defer_build_cfg)
BlockSchemaAdapter = TypeAdapter(List["BlockSchema"], config=defer_build_cfg)
ConcurrencyLimitAdapter = TypeAdapter("ConcurrencyLimit", config=defer_build_cfg)
ConcurrencyLimitListAdapter = TypeAdapter(
List["ConcurrencyLimit"], config=defer_build_cfg
)
from prefect.client.schemas.responses import (
DeploymentResponse,
FlowRunResponse,
GlobalConcurrencyLimitResponse,
WorkerFlowRunResponse,
FlowAdapter = TypeAdapter("Flow", config=defer_build_cfg)
FlowRunAdapter = TypeAdapter("FlowRun", config=defer_build_cfg)
ArtifactListAdapter = TypeAdapter(List["Artifact"], config=defer_build_cfg)
ArtifactCollectionListAdapter = TypeAdapter(
List["ArtifactCollection"], config=defer_build_cfg
)
AutomationListAdapter = TypeAdapter(List["Automation"], config=defer_build_cfg)
BlockDocumentListAdapter = TypeAdapter(List["BlockDocument"], config=defer_build_cfg)
BlockSchemaListAdapter = TypeAdapter(List["BlockSchema"], config=defer_build_cfg)
BlockTypeListAdapter = TypeAdapter(List["BlockType"], config=defer_build_cfg)
ConcurrencyLimitListAdapter = TypeAdapter(
List["ConcurrencyLimit"], config=defer_build_cfg
)
DeploymentResponseListAdapter = TypeAdapter(
List["DeploymentResponse"], config=defer_build_cfg
)
DeploymentScheduleListAdapter = TypeAdapter(
List["DeploymentSchedule"], config=defer_build_cfg
)
FlowListAdapter = TypeAdapter(List["Flow"], config=defer_build_cfg)
FlowRunListAdapter = TypeAdapter(List["FlowRun"], config=defer_build_cfg)
FlowRunInputListAdapter = TypeAdapter(List["FlowRunInput"], config=defer_build_cfg)
FlowRunNotificationPolicyListAdapter = TypeAdapter(
List["FlowRunNotificationPolicy"], config=defer_build_cfg
)
FlowRunResponseListAdapter = TypeAdapter(
List["FlowRunResponse"], config=defer_build_cfg
)
GlobalConcurrencyLimitListAdapter = TypeAdapter(
List["GlobalConcurrencyLimit"], config=defer_build_cfg
)
from prefect.events.schemas.automations import Automation

BlockTypeAdapter = TypeAdapter(BlockType)
BlockSchemaAdapter = TypeAdapter(List[BlockSchema])
ConcurrencyLimitAdapter = TypeAdapter(ConcurrencyLimit)
ConcurrencyLimitListAdapter = TypeAdapter(List[ConcurrencyLimit])
FlowAdapter = TypeAdapter(Flow)
FlowRunAdapter = TypeAdapter(FlowRun)
ArtifactListAdapter = TypeAdapter(List[Artifact])
ArtifactCollectionListAdapter = TypeAdapter(List[ArtifactCollection])
AutomationListAdapter = TypeAdapter(List[Automation])
BlockDocumentListAdapter = TypeAdapter(List[BlockDocument])
BlockSchemaListAdapter = TypeAdapter(List[BlockSchema])
BlockTypeListAdapter = TypeAdapter(List[BlockType])
ConcurrencyLimitListAdapter = TypeAdapter(List[ConcurrencyLimit])
DeploymentResponseListAdapter = TypeAdapter(List[DeploymentResponse])
DeploymentScheduleListAdapter = TypeAdapter(List[DeploymentSchedule])
FlowListAdapter = TypeAdapter(List[Flow])
FlowRunListAdapter = TypeAdapter(List[FlowRun])
FlowRunInputListAdapter = TypeAdapter(List[FlowRunInput])
FlowRunNotificationPolicyListAdapter = TypeAdapter(List[FlowRunNotificationPolicy])
FlowRunResponseListAdapter = TypeAdapter(List[FlowRunResponse])
GlobalConcurrencyLimitListAdapter = TypeAdapter(List[GlobalConcurrencyLimit])
GlobalConcurrencyLimitResponseListAdapter = TypeAdapter(
List[GlobalConcurrencyLimitResponse]
List["GlobalConcurrencyLimitResponse"], config=defer_build_cfg
)
LogListAdapter = TypeAdapter(List["Log"], config=defer_build_cfg)
StateListAdapter = TypeAdapter(List["State"], config=defer_build_cfg)
TaskRunListAdapter = TypeAdapter(List["TaskRun"], config=defer_build_cfg)
WorkerListAdapter = TypeAdapter(List["Worker"], config=defer_build_cfg)
WorkPoolListAdapter = TypeAdapter(List["WorkPool"], config=defer_build_cfg)
WorkQueueListAdapter = TypeAdapter(List["WorkQueue"], config=defer_build_cfg)
WorkerFlowRunResponseListAdapter = TypeAdapter(
List["WorkerFlowRunResponse"], config=defer_build_cfg
)
LogListAdapter = TypeAdapter(List[Log])
StateListAdapter = TypeAdapter(List[State])
TaskRunListAdapter = TypeAdapter(List[TaskRun])
WorkerListAdapter = TypeAdapter(List[Worker])
WorkPoolListAdapter = TypeAdapter(List[WorkPool])
WorkQueueListAdapter = TypeAdapter(List[WorkQueue])
WorkerFlowRunResponseListAdapter = TypeAdapter(List[WorkerFlowRunResponse])
VariableListAdapter = TypeAdapter(List[Variable])
StateAdapter = TypeAdapter(State)
VariableListAdapter = TypeAdapter(List["Variable"], config=defer_build_cfg)
StateAdapter = TypeAdapter("State", config=defer_build_cfg)
1 change: 1 addition & 0 deletions src/prefect/client/orchestration.py
Original file line number Diff line number Diff line change
Expand Up @@ -2226,6 +2226,7 @@ async def read_flow_run_states(
response = await self._client.get(
"/flow_run_states/", params=dict(flow_run_id=str(flow_run_id))
)
StateListAdapter.rebuild()
return StateListAdapter.validate_python(response.json())

async def set_flow_run_name(self, flow_run_id: UUID, name: str) -> httpx.Response:
Expand Down
8 changes: 4 additions & 4 deletions src/prefect/client/schemas/objects.py
Original file line number Diff line number Diff line change
Expand Up @@ -207,7 +207,7 @@ class State(ObjectBaseModel, Generic[R]):

type: StateType
name: Optional[str] = Field(default=None)
timestamp: DateTime = Field(default_factory=lambda: pendulum.now("UTC"))
timestamp: DateTime = Field(default_factory=lambda: DateTime.now("UTC"))
message: Optional[str] = Field(default=None, examples=["Run started"])
state_details: StateDetails = Field(default_factory=StateDetails)
data: Annotated[
Expand Down Expand Up @@ -429,9 +429,9 @@ def fresh_copy(self, **kwargs: Any) -> Self:
return self.model_copy(
update={
"id": uuid4(),
"created": pendulum.now("utc"),
"updated": pendulum.now("utc"),
"timestamp": pendulum.now("utc"),
"created": DateTime.now("utc"),
"updated": DateTime.now("utc"),
"timestamp": DateTime.now("utc"),
},
**kwargs,
)
Expand Down

0 comments on commit a8c81e0

Please sign in to comment.