Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Allow blocks in deployment params #14741

Merged
merged 17 commits into from
Jul 29, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
132 changes: 129 additions & 3 deletions src/prefect/blocks/core.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
import html
import inspect
import sys
import uuid
import warnings
from abc import ABC
from functools import partial
Expand Down Expand Up @@ -790,6 +791,33 @@ async def _get_block_document(

return block_document, block_document_name

@classmethod
@sync_compatible
@inject_client
async def _get_block_document_by_id(
cls,
block_document_id: Union[str, uuid.UUID],
client: Optional["PrefectClient"] = None,
):
if isinstance(block_document_id, str):
try:
block_document_id = UUID(block_document_id)
except ValueError:
raise ValueError(
f"Block document ID {block_document_id!r} is not a valid UUID"
)

try:
block_document = await client.read_block_document(
block_document_id=block_document_id
)
except prefect.exceptions.ObjectNotFound:
raise ValueError(
f"Unable to find block document with ID {block_document_id!r}"
)

return block_document, block_document.name

@classmethod
@sync_compatible
@inject_client
Expand Down Expand Up @@ -876,25 +904,123 @@ class Custom(Block):
"""
block_document, block_document_name = await cls._get_block_document(name)

return cls._load_from_block_document(block_document, validate=validate)

@classmethod
@sync_compatible
@inject_client
async def load_from_ref(
cls,
ref: Union[str, UUID, Dict[str, Any]],
validate: bool = True,
client: Optional["PrefectClient"] = None,
) -> "Self":
"""
Retrieves data from the block document by given reference for the block type
that corresponds with the current class and returns an instantiated version of
the current class with the data stored in the block document.

Provided reference can be a block document ID, or a reference data in dictionary format.
Supported dictionary reference formats are:
- {"block_document_id": <block_document_id>}
- {"block_document_slug": <block_document_slug>}

If a block document for a given block type is saved with a different schema
than the current class calling `load`, a warning will be raised.

If the current class schema is a subset of the block document schema, the block
can be loaded as normal using the default `validate = True`.

If the current class schema is a superset of the block document schema, `load`
must be called with `validate` set to False to prevent a validation error. In
this case, the block attributes will default to `None` and must be set manually
and saved to a new block document before the block can be used as expected.

Args:
ref: The reference to the block document. This can be a block document ID,
or one of supported dictionary reference formats.
validate: If False, the block document will be loaded without Pydantic
validating the block schema. This is useful if the block schema has
changed client-side since the block document referred to by `name` was saved.
client: The client to use to load the block document. If not provided, the
default client will be injected.

Raises:
ValueError: If invalid reference format is provided.
ValueError: If the requested block document is not found.

Returns:
An instance of the current class hydrated with the data stored in the
block document with the specified name.

"""
block_document = None
if isinstance(ref, (str, UUID)):
block_document, _ = await cls._get_block_document_by_id(ref)
elif isinstance(ref, dict):
if block_document_id := ref.get("block_document_id"):
block_document, _ = await cls._get_block_document_by_id(
block_document_id
)
elif block_document_slug := ref.get("block_document_slug"):
block_document, _ = await cls._get_block_document(block_document_slug)

if not block_document:
raise ValueError(f"Invalid reference format {ref!r}.")

return cls._load_from_block_document(block_document, validate=validate)

@classmethod
def _load_from_block_document(
cls, block_document: BlockDocument, validate: bool = True
) -> "Self":
"""
Loads a block from a given block document.

If a block document for a given block type is saved with a different schema
than the current class calling `load`, a warning will be raised.

If the current class schema is a subset of the block document schema, the block
can be loaded as normal using the default `validate = True`.

If the current class schema is a superset of the block document schema, `load`
must be called with `validate` set to False to prevent a validation error. In
this case, the block attributes will default to `None` and must be set manually
and saved to a new block document before the block can be used as expected.

Args:
block_document: The block document used to instantiate a block.
validate: If False, the block document will be loaded without Pydantic
validating the block schema. This is useful if the block schema has
changed client-side since the block document referred to by `name` was saved.

Raises:
ValueError: If the requested block document is not found.

Returns:
An instance of the current class hydrated with the data stored in the
block document with the specified name.

"""
try:
return cls._from_block_document(block_document)
except ValidationError as e:
if not validate:
missing_fields = tuple(err["loc"][0] for err in e.errors())
missing_block_data = {field: None for field in missing_fields}
warnings.warn(
f"Could not fully load {block_document_name!r} of block type"
f"Could not fully load {block_document.name!r} of block type"
f" {cls.get_block_type_slug()!r} - this is likely because one or more"
" required fields were added to the schema for"
f" {cls.__name__!r} that did not exist on the class when this block"
" was last saved. Please specify values for new field(s):"
f" {listrepr(missing_fields)}, then run"
f' `{cls.__name__}.save("{block_document_name}", overwrite=True)`,'
f' `{cls.__name__}.save("{block_document.name}", overwrite=True)`,'
" and load this block again before attempting to use it."
)
return cls.model_construct(**block_document.data, **missing_block_data)
raise RuntimeError(
f"Unable to load {block_document_name!r} of block type"
f"Unable to load {block_document.name!r} of block type"
f" {cls.get_block_type_slug()!r} due to failed validation. To load without"
" validation, try loading again with `validate=False`."
) from e
Expand Down
17 changes: 16 additions & 1 deletion src/prefect/flows.py
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,7 @@
parameters_to_args_kwargs,
raise_for_reserved_arguments,
)
from prefect.utilities.collections import listrepr
from prefect.utilities.collections import listrepr, visit_collection
from prefect.utilities.filesystem import relative_path_to_current_platform
from prefect.utilities.hashing import file_hash
from prefect.utilities.importtools import import_object, safe_load_namespace
Expand Down Expand Up @@ -535,6 +535,21 @@ def validate_parameters(self, parameters: Dict[str, Any]) -> Dict[str, Any]:
Raises:
ParameterTypeError: if the provided parameters are not valid
"""

def resolve_block_reference(data: Any) -> Any:
if isinstance(data, dict) and "$ref" in data:
return Block.load_from_ref(data["$ref"])
return data

try:
parameters = visit_collection(
parameters, resolve_block_reference, return_data=True
)
except (ValueError, RuntimeError) as exc:
raise ParameterTypeError(
"Failed to resolve block references in parameters."
) from exc

args, kwargs = parameters_to_args_kwargs(self.fn, parameters)

with warnings.catch_warnings():
Expand Down
30 changes: 30 additions & 0 deletions src/prefect/utilities/schema_tools/validation.py
Original file line number Diff line number Diff line change
Expand Up @@ -253,5 +253,35 @@ def preprocess_schema(
process_properties(
definition["properties"], required_fields, allow_none_with_default
)
# Allow block types to be referenced by their id
if "block_type_slug" in definition:
schema["definitions"][definition["title"]] = {
"oneOf": [
definition,
{
"type": "object",
"properties": {
"$ref": {
"oneOf": [
{
"type": "string",
"format": "uuid",
},
{
"type": "object",
"additionalProperties": {
"type": "string",
},
"minProperties": 1,
},
]
}
},
"required": [
"$ref",
],
},
]
}

return schema
Loading
Loading