Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Allow blocks in deployment params #14741

Merged
merged 17 commits into from
Jul 29, 2024
Merged
Show file tree
Hide file tree
Changes from 8 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
70 changes: 70 additions & 0 deletions src/prefect/blocks/core.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
import html
import inspect
import sys
import uuid
import warnings
from abc import ABC
from functools import partial
Expand Down Expand Up @@ -272,6 +273,19 @@ class Block(BaseModel, ABC):
)

def __init__(self, *args, **kwargs):
block_document_id = kwargs.pop("$ref", None)
if block_document_id:
block_document, block_document_name = self._get_block_document_by_id(
GalLadislav marked this conversation as resolved.
Show resolved Hide resolved
block_document_id, _sync=True
)
cicdw marked this conversation as resolved.
Show resolved Hide resolved

self._validate_block_reference(block_document)

kwargs = {
**block_document.data,
**kwargs,
}

super().__init__(*args, **kwargs)
self.block_initialization()

Expand All @@ -285,6 +299,35 @@ def __repr_args__(self):
(key, value) for key, value in repr_args if key is None or key in data_keys
]

def _validate_block_reference(self, block_document: BlockDocument) -> None:
"""
Validates that the provided block document matches the block schema of the current block.

Args:
block_document: The referenced block document to validate.

Raises:
TypeError: If the block instantiation is attempted from the base Block class.
ValueError: If the block reference type or slug is invalid.
"""
if self.__class__ == Block:
raise TypeError(
"Block class cannot be instantiated directly from block reference."
)
block_type_name = self.get_block_type_name()
block_type_slug = self.get_block_type_slug()

if block_document.block_type_name != block_type_name:
raise ValueError(
f"Invalid Block reference type {block_document.block_type_name!r} "
f"for block type {block_type_name!r} initialization"
)
if block_document.block_type.slug != block_type_slug:
raise ValueError(
f"Invalid Block reference slug {block_document.block_type.slug!r} "
f"for block slug {block_type_slug!r} initialization"
)

def block_initialization(self) -> None:
pass

Expand Down Expand Up @@ -790,6 +833,33 @@ async def _get_block_document(

return block_document, block_document_name

@classmethod
@sync_compatible
@inject_client
async def _get_block_document_by_id(
cls,
block_document_id: Union[str, uuid.UUID],
client: Optional["PrefectClient"] = None,
):
if isinstance(block_document_id, str):
try:
block_document_id = UUID(block_document_id)
except ValueError:
raise ValueError(
f"Block document ID {block_document_id!r} is not a valid UUID"
)

try:
block_document = await client.read_block_document(
block_document_id=block_document_id
)
except prefect.exceptions.ObjectNotFound:
raise ValueError(
f"Unable to find block document with ID {block_document_id!r}"
)

return block_document, block_document.name

@classmethod
@sync_compatible
@inject_client
Expand Down
19 changes: 19 additions & 0 deletions src/prefect/utilities/schema_tools/validation.py
Original file line number Diff line number Diff line change
Expand Up @@ -253,5 +253,24 @@ def preprocess_schema(
process_properties(
definition["properties"], required_fields, allow_none_with_default
)
# Allow block types to be referenced by their id
if "block_type_slug" in definition:
schema["definitions"][definition["title"]] = {
"oneOf": [
definition,
{
"type": "object",
"properties": {
"$ref": {
"type": "string",
"format": "uuid",
},
},
"required": [
"$ref",
],
},
]
}
desertaxle marked this conversation as resolved.
Show resolved Hide resolved

return schema
131 changes: 131 additions & 0 deletions tests/blocks/test_block_reference.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,131 @@
import warnings
from typing import Dict, Type
from uuid import uuid4

import pytest
from pydantic import ValidationError

from prefect.blocks.core import Block
from prefect.exceptions import ParameterTypeError
from prefect.flows import flow


class TestBlockReference:
class ReferencedBlock(Block):
a: int
b: str

class OtherReferencedBlock(Block):
a: int
b: str

@pytest.fixture
def block_reference(self, prefect_client) -> Dict[str, str]:
block = self.ReferencedBlock(a=1, b="foo")
block.save("block-reference", client=prefect_client)
return {"$ref": str(block._block_document_id)}

def test_block_initialization_from_reference(
self,
block_reference: Dict[str, str],
):
block = self.ReferencedBlock(**block_reference)
assert block.a == 1
assert block.b == "foo"

def test_block_initialization_from_reference_with_kwargs(
self,
block_reference: Dict[str, str],
):
block = self.ReferencedBlock(**block_reference, a=2)
assert block.a == 2
assert block.b == "foo"

def test_block_initialization_from_bad_reference(self):
with pytest.raises(ValueError, match="is not a valid UUID"):
self.ReferencedBlock(**{"$ref": "non-valid-uuid"})

with pytest.raises(ValueError, match="Unable to find block document with ID"):
self.ReferencedBlock(**{"$ref": str(uuid4())})

def test_block_initialization_from_invalid_block_reference_type(self):
block = self.OtherReferencedBlock(a=1, b="foo")
block.save("other-block")

with pytest.raises(ValueError, match="Invalid Block reference type"):
self.ReferencedBlock(**{"$ref": str(block._block_document_id)})

def test_block_validation_from_reference(
self,
block_reference: Dict[str, str],
):
block = self.ReferencedBlock.model_validate(block_reference)
assert block.a == 1
assert block.b == "foo"

def test_block_validation_from_bad_reference(
self,
block_reference: Dict[str, str],
):
with pytest.raises(ValidationError):
self.ReferencedBlock.model_validate({"$ref": "non-valid-uuid"})

with pytest.raises(ValidationError):
self.ReferencedBlock.model_validate({"$ref": str(uuid4())})

def test_block_validation_from_invalid_block_reference_type(self):
block = self.OtherReferencedBlock(a=1, b="foo")
block.save("other-block")

with pytest.raises(ValidationError):
self.ReferencedBlock.model_validate({"$ref": str(block._block_document_id)})


class TestFlowWithBlockParam:
@pytest.fixture
def ParamBlock(self) -> Type[Block]:
# Ignore warning caused by matching key in registry due to block fixture
warnings.filterwarnings("ignore", category=UserWarning)

class ParamBlock(Block):
a: int
b: str

return ParamBlock

@pytest.fixture
def OtherParamBlock(self) -> Type[Block]:
# Ignore warning caused by matching key in registry due to block fixture
warnings.filterwarnings("ignore", category=UserWarning)

class OtherParamBlock(Block):
a: int
b: str

return OtherParamBlock

def test_flow_with_block_params(self, ParamBlock):
ref_block = ParamBlock(a=10, b="foo")
ref_block.save("param-block")

@flow
def flow_with_block_param(block: ParamBlock) -> int:
return block.a

assert (
flow_with_block_param({"$ref": str(ref_block._block_document_id)})
== ref_block.a
)

def test_flow_with_invalid_block_param_type(self, ParamBlock, OtherParamBlock):
ref_block = OtherParamBlock(a=10, b="foo")
ref_block.save("other-param-block")

@flow
def flow_with_block_param(block: ParamBlock) -> int:
return block.a

with pytest.raises(
ParameterTypeError, match="Flow run received invalid parameters"
):
flow_with_block_param({"$ref": str(ref_block._block_document_id)})
Loading