Skip to content

Commit

Permalink
[MAINTENANCE] Check that BatchDefinitions and ExpectationSuites are up-to-date before saving (#10277)
Browse files Browse the repository at this point in the history
  • Loading branch information
cdkini authored Sep 6, 2024
1 parent 7f4cd71 commit 7021e1a
Show file tree
Hide file tree
Showing 14 changed files with 332 additions and 130 deletions.
12 changes: 6 additions & 6 deletions great_expectations/checkpoint/checkpoint.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,10 +27,10 @@
root_validator,
validator,
)
from great_expectations.core.added_diagnostics import CheckpointAddedDiagnostics
from great_expectations.core.expectation_validation_result import (
ExpectationSuiteValidationResult, # noqa: TCH001
)
from great_expectations.core.freshness_diagnostics import CheckpointFreshnessDiagnostics
from great_expectations.core.result_format import DEFAULT_RESULT_FORMAT, ResultFormatUnion
from great_expectations.core.run_identifier import RunIdentifier
from great_expectations.core.serdes import _IdentifierBundle
Expand Down Expand Up @@ -165,8 +165,8 @@ def run(
if not self.validation_definitions:
raise CheckpointRunWithoutValidationDefinitionError()

diagnostics = self.is_added()
if not diagnostics.is_added:
diagnostics = self.is_fresh()
if not diagnostics.success:
# The checkpoint itself is not added but all children are - we can add it for the user
if not diagnostics.parent_added and diagnostics.children_added:
self._add_to_store()
Expand Down Expand Up @@ -269,11 +269,11 @@ def _sort_actions(self) -> List[CheckpointAction]:

return priority_actions + secondary_actions

def is_added(self) -> CheckpointAddedDiagnostics:
checkpoint_diagnostics = CheckpointAddedDiagnostics(
def is_fresh(self) -> CheckpointFreshnessDiagnostics:
checkpoint_diagnostics = CheckpointFreshnessDiagnostics(
errors=[] if self.id else [CheckpointNotAddedError(name=self.name)]
)
validation_definition_diagnostics = [vd.is_added() for vd in self.validation_definitions]
validation_definition_diagnostics = [vd.is_fresh() for vd in self.validation_definitions]
checkpoint_diagnostics.update_with_children(*validation_definition_diagnostics)

return checkpoint_diagnostics
Expand Down
65 changes: 59 additions & 6 deletions great_expectations/core/batch_definition.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,21 +7,26 @@

# if we move this import into the TYPE_CHECKING block, we need to provide the
# Partitioner class when we update forward refs, so we just import here.
from great_expectations.core.added_diagnostics import (
BatchDefinitionAddedDiagnostics,
from great_expectations.core.freshness_diagnostics import (
BatchDefinitionFreshnessDiagnostics,
)
from great_expectations.core.partitioners import ColumnPartitioner, FileNamePartitioner
from great_expectations.core.serdes import _EncodedValidationData, _IdentifierBundle
from great_expectations.data_context.data_context.context_factory import project_manager
from great_expectations.exceptions.exceptions import (
BatchDefinitionNotAddedError,
BatchDefinitionNotFoundError,
BatchDefinitionNotFreshError,
DataAssetNotFoundError,
DatasourceNotFoundError,
)

if TYPE_CHECKING:
from great_expectations.datasource.fluent.batch_request import (
BatchParameters,
BatchRequest,
)
from great_expectations.datasource.fluent.interfaces import Batch, DataAsset
from great_expectations.datasource.fluent.interfaces import Batch, DataAsset, Datasource

# Depending on the Asset
PartitionerT = TypeVar("PartitionerT", ColumnPartitioner, FileNamePartitioner, None)
Expand Down Expand Up @@ -76,14 +81,62 @@ def get_batch(self, batch_parameters: Optional[BatchParameters] = None) -> Batch
batch_request = self.build_batch_request(batch_parameters=batch_parameters)
return self.data_asset.get_batch(batch_request)

def is_added(self) -> BatchDefinitionAddedDiagnostics:
return BatchDefinitionAddedDiagnostics(
def is_fresh(self) -> BatchDefinitionFreshnessDiagnostics:
    """Report whether this batch definition has been added and is up-to-date.

    The "added" check runs first; when it fails, its diagnostics are returned
    as-is and the freshness comparison is skipped.

    Returns:
        The diagnostics from ``_is_added`` if they contain errors, otherwise
        the diagnostics produced by ``_is_fresh``.
    """
    added_diagnostics = self._is_added()
    return self._is_fresh() if added_diagnostics.success else added_diagnostics

def _is_added(self) -> BatchDefinitionFreshnessDiagnostics:
    """Build diagnostics describing whether this batch definition has an id.

    Returns:
        Diagnostics with no errors when ``self.id`` is truthy; otherwise
        diagnostics holding a single ``BatchDefinitionNotAddedError``.
    """
    if self.id:
        return BatchDefinitionFreshnessDiagnostics(errors=[])
    return BatchDefinitionFreshnessDiagnostics(
        errors=[BatchDefinitionNotAddedError(name=self.name)]
    )

def _is_fresh(self) -> BatchDefinitionFreshnessDiagnostics:
    """Compare this batch definition with the one reachable via ``project_manager``.

    The lookup walks datasource -> asset -> batch definition by name. The first
    level that cannot be resolved short-circuits with a single "not found"
    error; if all levels resolve, the located batch definition is compared to
    ``self`` with ``==``.

    Returns:
        Diagnostics with no errors when the located batch definition equals
        ``self``; otherwise diagnostics holding exactly one error describing
        what was missing or that the definition is not fresh.
    """

    def _failed(error: Exception) -> BatchDefinitionFreshnessDiagnostics:
        # Every failure path reports exactly one error.
        return BatchDefinitionFreshnessDiagnostics(errors=[error])

    known_datasources = project_manager.get_datasources()

    located_datasource: Datasource | None
    try:
        located_datasource = known_datasources[self.data_asset.datasource.name]
    except KeyError:
        located_datasource = None
    if not located_datasource:
        return _failed(
            DatasourceNotFoundError(
                f"Could not find datasource '{self.data_asset.datasource.name}'"
            )
        )

    try:
        located_asset = located_datasource.get_asset(self.data_asset.name)
    except LookupError:
        located_asset = None
    if not located_asset:
        return _failed(
            DataAssetNotFoundError(f"Could not find asset '{self.data_asset.name}'")
        )

    located_batch_def: BatchDefinition | None
    try:
        located_batch_def = located_asset.get_batch_definition(self.name)
    except KeyError:
        located_batch_def = None
    if not located_batch_def:
        return _failed(
            BatchDefinitionNotFoundError(f"Could not find batch definition '{self.name}'")
        )

    if self == located_batch_def:
        return BatchDefinitionFreshnessDiagnostics(errors=[])
    return _failed(BatchDefinitionNotFreshError(name=self.name))

def identifier_bundle(self) -> _EncodedValidationData:
# Utilized as a custom json_encoder
diagnostics = self.is_added()
diagnostics = self.is_fresh()
diagnostics.raise_for_error()

asset = self.data_asset
Expand Down
52 changes: 46 additions & 6 deletions great_expectations/core/expectation_suite.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,14 +28,19 @@
ExpectationSuiteExpectationDeletedEvent,
ExpectationSuiteExpectationUpdatedEvent,
)
from great_expectations.compatibility.pydantic import ValidationError as PydanticValidationError
from great_expectations.compatibility.typing_extensions import override
from great_expectations.core.added_diagnostics import (
ExpectationSuiteAddedDiagnostics,
from great_expectations.core.freshness_diagnostics import (
ExpectationSuiteFreshnessDiagnostics,
)
from great_expectations.core.serdes import _IdentifierBundle
from great_expectations.data_context.data_context.context_factory import project_manager
from great_expectations.exceptions.exceptions import (
ExpectationSuiteError,
ExpectationSuiteNotAddedError,
ExpectationSuiteNotFoundError,
ExpectationSuiteNotFreshError,
StoreBackendError,
)
from great_expectations.types import SerializableDictDot
from great_expectations.util import (
Expand All @@ -45,6 +50,7 @@

if TYPE_CHECKING:
from great_expectations.alias_types import JSONValues
from great_expectations.data_context.store.expectations_store import ExpectationsStore
from great_expectations.expectations.expectation import Expectation
from great_expectations.expectations.expectation_configuration import (
ExpectationConfiguration,
Expand Down Expand Up @@ -105,7 +111,9 @@ def __init__( # noqa: PLR0913
self.meta = meta
self.notes = notes

self._store = project_manager.get_expectations_store()
@property
def _store(self) -> ExpectationsStore:
    """Return the expectations store obtained from ``project_manager`` on each access."""
    expectations_store = project_manager.get_expectations_store()
    return expectations_store

@property
def _include_rendered_content(self) -> bool:
Expand Down Expand Up @@ -248,11 +256,43 @@ def save(self) -> None:
key = self._store.get_key(name=self.name, id=self.id)
self._store.update(key=key, value=self)

def is_added(self) -> ExpectationSuiteAddedDiagnostics:
return ExpectationSuiteAddedDiagnostics(
def is_fresh(self) -> ExpectationSuiteFreshnessDiagnostics:
    """Report whether this suite has been added and is up-to-date.

    The "added" check runs first; when it fails, its diagnostics are returned
    as-is and the freshness comparison is skipped.

    Returns:
        The diagnostics from ``_is_added`` if they contain errors, otherwise
        the diagnostics produced by ``_is_fresh``.
    """
    added_diagnostics = self._is_added()
    return self._is_fresh() if added_diagnostics.success else added_diagnostics

def _is_added(self) -> ExpectationSuiteFreshnessDiagnostics:
    """Build diagnostics describing whether this suite has an id.

    Returns:
        Diagnostics with no errors when ``self.id`` is truthy; otherwise
        diagnostics holding a single ``ExpectationSuiteNotAddedError``.
    """
    if self.id:
        return ExpectationSuiteFreshnessDiagnostics(errors=[])
    return ExpectationSuiteFreshnessDiagnostics(
        errors=[ExpectationSuiteNotAddedError(name=self.name)]
    )

def _is_fresh(self) -> ExpectationSuiteFreshnessDiagnostics:
    """Compare this suite with the copy retrieved from the expectations store.

    The suite is fetched from ``self._store`` using a key built from its name
    and id, deserialized, and compared to ``self`` with ``==``.

    Returns:
        Diagnostics with no errors when the deserialized suite equals ``self``.
        Otherwise diagnostics holding exactly one error:
        ``ExpectationSuiteNotFoundError`` when the store raises
        ``StoreBackendError`` or returns a falsy value,
        ``ExpectationSuiteError`` when deserialization raises a pydantic
        validation error or yields a falsy value, or
        ``ExpectationSuiteNotFreshError`` when the two suites differ.
    """

    def _failed(error: Exception) -> ExpectationSuiteFreshnessDiagnostics:
        # Every failure path reports exactly one error.
        return ExpectationSuiteFreshnessDiagnostics(errors=[error])

    stored_payload: dict | None
    try:
        store_key = self._store.get_key(name=self.name, id=self.id)
        stored_payload = self._store.get(key=store_key)
    except StoreBackendError:
        stored_payload = None
    if not stored_payload:
        return _failed(ExpectationSuiteNotFoundError(name=self.name))

    stored_suite: ExpectationSuite | None
    try:
        stored_suite = self._store.deserialize_suite_dict(suite_dict=stored_payload)
    except PydanticValidationError:
        stored_suite = None
    if not stored_suite:
        return _failed(ExpectationSuiteError(f"Could not deserialize suite '{self.name}'"))

    if self == stored_suite:
        return ExpectationSuiteFreshnessDiagnostics(errors=[])
    return _failed(ExpectationSuiteNotFreshError(name=self.name))

def _has_been_saved(self) -> bool:
"""Has this ExpectationSuite been persisted to a Store?"""
# todo: this should only check local keys instead of potentially querying the remote backend
Expand Down Expand Up @@ -599,7 +639,7 @@ def render(self) -> None:

def identifier_bundle(self) -> _IdentifierBundle:
# Utilized as a custom json_encoder
diagnostics = self.is_added()
diagnostics = self.is_fresh()
diagnostics.raise_for_error()

return _IdentifierBundle(name=self.name, id=self.id)
Expand Down
11 changes: 2 additions & 9 deletions great_expectations/core/factory/suite_factory.py
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,7 @@ def get(self, name: str) -> ExpectationSuite:
if not self._store.has_key(key=key):
raise DataContextError(f"ExpectationSuite with name {name} was not found.") # noqa: TRY003
suite_dict = self._store.get(key=key)
return self._deserialize(suite_dict)
return self._store.deserialize_suite_dict(suite_dict)

@public_api
@override
Expand All @@ -113,18 +113,11 @@ def all(self) -> Iterable[ExpectationSuite]:
bad_dicts: list[Any] = []
for suite_dict in dicts:
try:
deserializable_suites.append(self._deserialize(suite_dict))
deserializable_suites.append(self._store.deserialize_suite_dict(suite_dict))
except PydanticValidationError as e:
bad_dicts.append(suite_dict)
self._store.submit_all_deserialization_event(e)
except Exception as e:
self._store.submit_all_deserialization_event(e)
raise
return deserializable_suites

def _deserialize(self, suite_dict: dict) -> ExpectationSuite:
# TODO: Move this logic to the store
suite = ExpectationSuite(**suite_dict)
if self._include_rendered_content:
suite.render()
return suite
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
from __future__ import annotations

from abc import abstractmethod
from dataclasses import dataclass
from typing import ClassVar, Tuple, Type

Expand All @@ -10,67 +9,67 @@
CheckpointNotAddedError,
CheckpointRelatedResourcesNotAddedError,
ExpectationSuiteNotAddedError,
ResourceNotAddedError,
GreatExpectationsError,
ResourcesNotAddedError,
ValidationDefinitionNotAddedError,
ValidationDefinitionRelatedResourcesNotAddedError,
)


@dataclass
class AddedDiagnostics:
class FreshnessDiagnostics:
"""
Wrapper around a list of errors; used to determine if a resource has been added successfully.
Wrapper around a list of errors; used to determine if a resource has been added successfully
and is "fresh" or up-to-date with its persisted equivalent.
Note that some resources may have dependencies on other resources - in order to be considered
"added", the root resource and all of its dependencies must be added successfully.
"fresh", the root resource and all of its dependencies must be "fresh".
For example, a Checkpoint may have dependencies on ValidationDefinitions, which may have
dependencies on ExpectationSuites and BatchDefinitions.
GX requires that all resources are added successfully before they can be used to prevent
GX requires that all resources are persisted successfully before they can be used to prevent
unexpected behavior.
"""

errors: list[ResourceNotAddedError]
raise_for_error_class: ClassVar[Type[ResourcesNotAddedError]] = ResourcesNotAddedError
errors: list[GreatExpectationsError]

@property
def is_added(self) -> bool:
def success(self) -> bool:
return len(self.errors) == 0

@abstractmethod
def raise_for_error(self) -> None:
"""
Conditionally raises an error if the resource has not been added successfully;
should prescribe the correct action(s) to take.
"""
raise NotImplementedError
if not self.success:
raise self.raise_for_error_class(errors=self.errors)


@dataclass
class _ChildAddedDiagnostics(AddedDiagnostics):
@override
def raise_for_error(self) -> None:
if not self.is_added:
raise self.errors[0] # Child node so only one error
class BatchDefinitionFreshnessDiagnostics(FreshnessDiagnostics):
pass


@dataclass
class BatchDefinitionAddedDiagnostics(_ChildAddedDiagnostics):
class ExpectationSuiteFreshnessDiagnostics(FreshnessDiagnostics):
pass


@dataclass
class ExpectationSuiteAddedDiagnostics(_ChildAddedDiagnostics):
pass
class _ParentFreshnessDiagnostics(FreshnessDiagnostics):
"""
Freshness diagnostics for a class that has a natural parent/child relationship
with other classes.
All errors throughout the hierarchy should be collected in the parent diagnostics object.
"""

@dataclass
class _ParentAddedDiagnostics(AddedDiagnostics):
parent_error_class: ClassVar[Type[ResourceNotAddedError]]
children_error_classes: ClassVar[Tuple[Type[ResourceNotAddedError], ...]]
raise_for_error_class: ClassVar[Type[ResourcesNotAddedError]]
parent_error_class: ClassVar[Type[GreatExpectationsError]]
children_error_classes: ClassVar[Tuple[Type[GreatExpectationsError], ...]]

def update_with_children(self, *children_diagnostics: AddedDiagnostics) -> None:
def update_with_children(self, *children_diagnostics: FreshnessDiagnostics) -> None:
for diagnostics in children_diagnostics:
# Child errors should be prepended to parent errors so diagnostics are in order
self.errors = diagnostics.errors + self.errors
Expand All @@ -85,14 +84,14 @@ def children_added(self) -> bool:

@override
def raise_for_error(self) -> None:
if not self.is_added:
if not self.success:
raise self.raise_for_error_class(errors=self.errors)


@dataclass
class ValidationDefinitionAddedDiagnostics(_ParentAddedDiagnostics):
parent_error_class: ClassVar[Type[ResourceNotAddedError]] = ValidationDefinitionNotAddedError
children_error_classes: ClassVar[Tuple[Type[ResourceNotAddedError], ...]] = (
class ValidationDefinitionFreshnessDiagnostics(_ParentFreshnessDiagnostics):
parent_error_class: ClassVar[Type[GreatExpectationsError]] = ValidationDefinitionNotAddedError
children_error_classes: ClassVar[Tuple[Type[GreatExpectationsError], ...]] = (
ExpectationSuiteNotAddedError,
BatchDefinitionNotAddedError,
)
Expand All @@ -102,9 +101,9 @@ class ValidationDefinitionAddedDiagnostics(_ParentAddedDiagnostics):


@dataclass
class CheckpointAddedDiagnostics(_ParentAddedDiagnostics):
parent_error_class: ClassVar[Type[ResourceNotAddedError]] = CheckpointNotAddedError
children_error_classes: ClassVar[Tuple[Type[ResourceNotAddedError], ...]] = (
class CheckpointFreshnessDiagnostics(_ParentFreshnessDiagnostics):
parent_error_class: ClassVar[Type[GreatExpectationsError]] = CheckpointNotAddedError
children_error_classes: ClassVar[Tuple[Type[GreatExpectationsError], ...]] = (
ValidationDefinitionNotAddedError,
)
raise_for_error_class: ClassVar[Type[ResourcesNotAddedError]] = (
Expand Down
Loading

0 comments on commit 7021e1a

Please sign in to comment.