Skip to content

Commit

Permalink
Merge branch 'main' into dependabot/npm_and_yarn/ui/vitejs/plugin-vue…
Browse files Browse the repository at this point in the history
…-5.1.1
  • Loading branch information
zhen0 authored Jul 29, 2024
2 parents ce1c322 + ff92adc commit a174648
Show file tree
Hide file tree
Showing 38 changed files with 1,987 additions and 205 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/codspeed-benchmarks.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ jobs:
# https://github.com/PrefectHQ/prefect/issues/6990
- name: Run benchmarks
uses: CodSpeedHQ/action@v2
uses: CodSpeedHQ/action@v3
env:
PREFECT_API_URL: "http://127.0.0.1:4200/api"
with:
Expand Down
38 changes: 20 additions & 18 deletions benches/bench_import.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,19 +2,28 @@
import sys

import pytest
from prometheus_client import REGISTRY
from pytest_benchmark.fixture import BenchmarkFixture


def reset_imports():
# Remove the module from sys.modules if it's there
prefect_modules = [key for key in sys.modules if key.startswith("prefect")]
for module in prefect_modules:
del sys.modules[module]

# Clear importlib cache
importlib.invalidate_caches()

# reset the prometheus registry to clear any previously measured metrics
for collector in list(REGISTRY._collector_to_names):
REGISTRY.unregister(collector)


@pytest.mark.benchmark(group="imports")
def bench_import_prefect(benchmark):
def bench_import_prefect(benchmark: BenchmarkFixture):
def import_prefect():
# To get an accurate result, we want to import the module from scratch each time
# Remove the module from sys.modules if it's there
prefect_modules = [key for key in sys.modules if key.startswith("prefect")]
for module in prefect_modules:
del sys.modules[module]

# Clear importlib cache
importlib.invalidate_caches()
reset_imports()

import prefect # noqa

Expand All @@ -23,16 +32,9 @@ def import_prefect():

@pytest.mark.timeout(180)
@pytest.mark.benchmark(group="imports")
def bench_import_prefect_flow(benchmark):
def bench_import_prefect_flow(benchmark: BenchmarkFixture):
def import_prefect_flow():
# To get an accurate result, we want to import the module from scratch each time
# Remove the module from sys.modules if it's there
prefect_modules = [key for key in sys.modules if key.startswith("prefect")]
for module in prefect_modules:
del sys.modules[module]

# Clear importlib cache
importlib.invalidate_caches()
reset_imports()

from prefect import flow # noqa

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
---
openapi: get /api/flow_runs/{id}/logs
---
68 changes: 68 additions & 0 deletions docs/3.0rc/api-ref/rest-api/server/schema.json
Original file line number Diff line number Diff line change
Expand Up @@ -1508,6 +1508,59 @@
}
}
},
"/api/flow_runs/{id}/download-logs-csv": {
"get": {
"tags": [
"Flow Runs"
],
"summary": "Download Logs",
"description": "Download all flow run logs as a CSV file, collecting all logs until there are no more logs to retrieve.",
"operationId": "download_logs_flow_runs__id__download_logs_csv_get",
"parameters": [
{
"name": "id",
"in": "path",
"required": true,
"schema": {
"type": "string",
"format": "uuid",
"description": "The flow run id",
"title": "Id"
},
"description": "The flow run id"
},
{
"name": "x-prefect-api-version",
"in": "header",
"required": false,
"schema": {
"type": "string",
"title": "X-Prefect-Api-Version"
}
}
],
"responses": {
"200": {
"description": "Successful Response",
"content": {
"application/json": {
"schema": {}
}
}
},
"422": {
"description": "Validation Error",
"content": {
"application/json": {
"schema": {
"$ref": "#/components/schemas/HTTPValidationError"
}
}
}
}
}
}
},
"/api/task_runs/": {
"post": {
"tags": [
Expand Down Expand Up @@ -22522,6 +22575,21 @@
"exclusiveMinimum": 0.0,
"title": "Prefect Events Websocket Backfill Page Size",
"default": 250
},
"PREFECT_API_ENABLE_METRICS": {
"type": "boolean",
"title": "Prefect Api Enable Metrics",
"default": false
},
"PREFECT_CLIENT_ENABLE_METRICS": {
"type": "boolean",
"title": "Prefect Client Enable Metrics",
"default": false
},
"PREFECT_CLIENT_METRICS_PORT": {
"type": "integer",
"title": "Prefect Client Metrics Port",
"default": 4201
}
},
"type": "object",
Expand Down
1 change: 1 addition & 0 deletions docs/mint.json
Original file line number Diff line number Diff line change
Expand Up @@ -412,6 +412,7 @@
"3.0rc/api-ref/rest-api/server/flow-runs/read-flow-run-input",
"3.0rc/api-ref/rest-api/server/flow-runs/delete-flow-run-input",
"3.0rc/api-ref/rest-api/server/flow-runs/paginate-flow-runs",
"3.0rc/api-ref/rest-api/server/flow-runs/download-logs",
"3.0rc/api-ref/rest-api/server/flow-runs/read-flow-run-history",
"3.0rc/api-ref/rest-api/server/flow-runs/count-task-runs-by-flow-run"
]
Expand Down
1 change: 1 addition & 0 deletions requirements-client.txt
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ orjson >= 3.7, < 4.0
packaging >= 21.3, < 24.3
pathspec >= 0.8.0
pendulum >= 3.0.0, <4
prometheus-client >= 0.20.0
pydantic >= 2.7, < 3.0.0
pydantic_core >= 2.12.0, < 3.0.0
pydantic_extra_types >= 2.8.2, < 3.0.0
Expand Down
132 changes: 129 additions & 3 deletions src/prefect/blocks/core.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
import html
import inspect
import sys
import uuid
import warnings
from abc import ABC
from functools import partial
Expand Down Expand Up @@ -790,6 +791,33 @@ async def _get_block_document(

return block_document, block_document_name

@classmethod
@sync_compatible
@inject_client
async def _get_block_document_by_id(
cls,
block_document_id: Union[str, uuid.UUID],
client: Optional["PrefectClient"] = None,
):
if isinstance(block_document_id, str):
try:
block_document_id = UUID(block_document_id)
except ValueError:
raise ValueError(
f"Block document ID {block_document_id!r} is not a valid UUID"
)

try:
block_document = await client.read_block_document(
block_document_id=block_document_id
)
except prefect.exceptions.ObjectNotFound:
raise ValueError(
f"Unable to find block document with ID {block_document_id!r}"
)

return block_document, block_document.name

@classmethod
@sync_compatible
@inject_client
Expand Down Expand Up @@ -876,25 +904,123 @@ class Custom(Block):
"""
block_document, block_document_name = await cls._get_block_document(name)

return cls._load_from_block_document(block_document, validate=validate)

@classmethod
@sync_compatible
@inject_client
async def load_from_ref(
cls,
ref: Union[str, UUID, Dict[str, Any]],
validate: bool = True,
client: Optional["PrefectClient"] = None,
) -> "Self":
"""
Retrieves data from the block document by given reference for the block type
that corresponds with the current class and returns an instantiated version of
the current class with the data stored in the block document.
Provided reference can be a block document ID, or a reference data in dictionary format.
Supported dictionary reference formats are:
- {"block_document_id": <block_document_id>}
- {"block_document_slug": <block_document_slug>}
If a block document for a given block type is saved with a different schema
than the current class calling `load`, a warning will be raised.
If the current class schema is a subset of the block document schema, the block
can be loaded as normal using the default `validate = True`.
If the current class schema is a superset of the block document schema, `load`
must be called with `validate` set to False to prevent a validation error. In
this case, the block attributes will default to `None` and must be set manually
and saved to a new block document before the block can be used as expected.
Args:
ref: The reference to the block document. This can be a block document ID,
or one of supported dictionary reference formats.
validate: If False, the block document will be loaded without Pydantic
validating the block schema. This is useful if the block schema has
changed client-side since the block document referred to by `name` was saved.
client: The client to use to load the block document. If not provided, the
default client will be injected.
Raises:
ValueError: If invalid reference format is provided.
ValueError: If the requested block document is not found.
Returns:
An instance of the current class hydrated with the data stored in the
block document with the specified name.
"""
block_document = None
if isinstance(ref, (str, UUID)):
block_document, _ = await cls._get_block_document_by_id(ref)
elif isinstance(ref, dict):
if block_document_id := ref.get("block_document_id"):
block_document, _ = await cls._get_block_document_by_id(
block_document_id
)
elif block_document_slug := ref.get("block_document_slug"):
block_document, _ = await cls._get_block_document(block_document_slug)

if not block_document:
raise ValueError(f"Invalid reference format {ref!r}.")

return cls._load_from_block_document(block_document, validate=validate)

@classmethod
def _load_from_block_document(
cls, block_document: BlockDocument, validate: bool = True
) -> "Self":
"""
Loads a block from a given block document.
If a block document for a given block type is saved with a different schema
than the current class calling `load`, a warning will be raised.
If the current class schema is a subset of the block document schema, the block
can be loaded as normal using the default `validate = True`.
If the current class schema is a superset of the block document schema, `load`
must be called with `validate` set to False to prevent a validation error. In
this case, the block attributes will default to `None` and must be set manually
and saved to a new block document before the block can be used as expected.
Args:
block_document: The block document used to instantiate a block.
validate: If False, the block document will be loaded without Pydantic
validating the block schema. This is useful if the block schema has
changed client-side since the block document referred to by `name` was saved.
Raises:
ValueError: If the requested block document is not found.
Returns:
An instance of the current class hydrated with the data stored in the
block document with the specified name.
"""
try:
return cls._from_block_document(block_document)
except ValidationError as e:
if not validate:
missing_fields = tuple(err["loc"][0] for err in e.errors())
missing_block_data = {field: None for field in missing_fields}
warnings.warn(
f"Could not fully load {block_document_name!r} of block type"
f"Could not fully load {block_document.name!r} of block type"
f" {cls.get_block_type_slug()!r} - this is likely because one or more"
" required fields were added to the schema for"
f" {cls.__name__!r} that did not exist on the class when this block"
" was last saved. Please specify values for new field(s):"
f" {listrepr(missing_fields)}, then run"
f' `{cls.__name__}.save("{block_document_name}", overwrite=True)`,'
f' `{cls.__name__}.save("{block_document.name}", overwrite=True)`,'
" and load this block again before attempting to use it."
)
return cls.model_construct(**block_document.data, **missing_block_data)
raise RuntimeError(
f"Unable to load {block_document_name!r} of block type"
f"Unable to load {block_document.name!r} of block type"
f" {cls.get_block_type_slug()!r} due to failed validation. To load without"
" validation, try loading again with `validate=False`."
) from e
Expand Down
16 changes: 6 additions & 10 deletions src/prefect/cli/deploy.py
Original file line number Diff line number Diff line change
Expand Up @@ -57,14 +57,14 @@
from prefect.deployments.steps.core import run_steps
from prefect.events import DeploymentTriggerTypes, TriggerTypes
from prefect.exceptions import ObjectNotFound, PrefectHTTPStatusError
from prefect.flows import load_flow_arguments_from_entrypoint
from prefect.flows import load_flow_from_entrypoint
from prefect.settings import (
PREFECT_DEFAULT_WORK_POOL_NAME,
PREFECT_UI_URL,
)
from prefect.utilities.annotations import NotSet
from prefect.utilities.callables import (
parameter_schema_from_entrypoint,
parameter_schema,
)
from prefect.utilities.collections import get_from_dict
from prefect.utilities.slugify import slugify
Expand Down Expand Up @@ -471,21 +471,17 @@ async def _run_single_deploy(
)
deploy_config["entrypoint"] = await prompt_entrypoint(app.console)

flow_decorator_arguments = load_flow_arguments_from_entrypoint(
deploy_config["entrypoint"], arguments={"name", "description"}
)
flow = load_flow_from_entrypoint(deploy_config["entrypoint"])

deploy_config["flow_name"] = flow_decorator_arguments["name"]
deploy_config["flow_name"] = flow.name

deployment_name = deploy_config.get("name")
if not deployment_name:
if not is_interactive():
raise ValueError("A deployment name must be provided.")
deploy_config["name"] = prompt("Deployment name", default="default")

deploy_config["parameter_openapi_schema"] = parameter_schema_from_entrypoint(
deploy_config["entrypoint"]
)
deploy_config["parameter_openapi_schema"] = parameter_schema(flow)

deploy_config["schedules"] = _construct_schedules(
deploy_config,
Expand Down Expand Up @@ -654,7 +650,7 @@ async def _run_single_deploy(
deploy_config["work_pool"]["job_variables"]["image"] = "{{ build-image.image }}"

if not deploy_config.get("description"):
deploy_config["description"] = flow_decorator_arguments.get("description")
deploy_config["description"] = flow.description

# save deploy_config before templating
deploy_config_before_templating = deepcopy(deploy_config)
Expand Down
Loading

0 comments on commit a174648

Please sign in to comment.