-
Notifications
You must be signed in to change notification settings - Fork 1.6k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[FEATURE] Update dataframe batch.validate workflow #10165
Changes from 17 commits
e5f0d7c
b6b2cef
40db097
d19f2ed
560b6a7
6ff0243
6b5705e
b4b1291
bbef71f
210199e
745f2e4
64faf55
e52bb02
4d26889
20511e6
b54d253
a098801
dcc18fa
b5ce308
8458f77
f68eb0b
806613e
3af8eaa
acdd886
093e3da
81bfa72
04df188
92d252e
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -129,13 +129,7 @@ | |
datasource_name=name, | ||
data_asset_name=asset.name, | ||
) | ||
cached_data_asset = self._in_memory_data_assets.get(in_memory_asset_name) | ||
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. This code block is removed because we no longer store the dataframe on the asset. |
||
if cached_data_asset: | ||
asset.dataframe = cached_data_asset.dataframe | ||
else: | ||
# Asset is loaded into cache here (even without df) to enable loading of df at a later # noqa: E501 | ||
# time when DataframeAsset.build_batch_request(dataframe=df) is called | ||
self._in_memory_data_assets[in_memory_asset_name] = asset | ||
self._in_memory_data_assets[in_memory_asset_name] = asset | ||
return ds | ||
|
||
|
||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -466,7 +466,9 @@ def ensure_batch_metadata_is_not_none(cls, value: Any) -> Union[dict, Any]: | |
return {} | ||
return value | ||
|
||
def _get_batch_metadata_from_batch_request(self, batch_request: BatchRequest) -> BatchMetadata: | ||
def _get_batch_metadata_from_batch_request( | ||
self, batch_request: BatchRequest, ignore_options: Sequence = () | ||
) -> BatchMetadata: | ||
"""Performs config variable substitution and populates batch parameters for | ||
Batch.metadata at runtime. | ||
""" | ||
|
@@ -477,7 +479,11 @@ def _get_batch_metadata_from_batch_request(self, batch_request: BatchRequest) -> | |
batch_metadata = _ConfigurationSubstitutor().substitute_all_config_variables( | ||
data=batch_metadata, replace_variables_dict=config_variables | ||
) | ||
batch_metadata.update(copy.deepcopy(batch_request.options)) | ||
batch_metadata.update( | ||
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. This was added since the dataframe is now always passed in via options and we don't want to copy that to the metadata (Spark will actually die if we try). Previously, for runtime dataframes we forced |
||
copy.deepcopy( | ||
{k: v for k, v in batch_request.options.items() if k not in ignore_options} | ||
) | ||
) | ||
return batch_metadata | ||
|
||
# Sorter methods | ||
|
@@ -994,7 +1000,7 @@ def __init__( # noqa: PLR0913 | |
def _create_id(self) -> str: | ||
options_list = [] | ||
for key, value in self.batch_request.options.items(): | ||
if key != "path": | ||
if key not in ("path", "dataframe"): | ||
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. We don't want the |
||
options_list.append(f"{key}_{value}") | ||
return "-".join([self.datasource.name, self.data_asset.name, *options_list]) | ||
|
||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -20,16 +20,13 @@ | |
Set, | ||
Tuple, | ||
Type, | ||
TypeVar, | ||
Union, | ||
) | ||
|
||
import pandas as pd | ||
|
||
import great_expectations.exceptions as gx_exceptions | ||
from great_expectations._docs_decorators import ( | ||
deprecated_argument, | ||
new_argument, | ||
public_api, | ||
) | ||
from great_expectations.compatibility import pydantic, sqlalchemy | ||
|
@@ -52,6 +49,7 @@ | |
) | ||
from great_expectations.datasource.fluent.signatures import _merge_signatures | ||
from great_expectations.datasource.fluent.sources import DEFAULT_PANDAS_DATA_ASSET_NAME | ||
from great_expectations.exceptions.exceptions import BuildBatchRequestError | ||
|
||
_EXCLUDE_TYPES_FROM_JSON: list[Type] = [sqlite3.Connection] | ||
|
||
|
@@ -78,10 +76,6 @@ | |
logger = logging.getLogger(__name__) | ||
|
||
|
||
# this enables us to include dataframe in the json schema | ||
_PandasDataFrameT = TypeVar("_PandasDataFrameT") | ||
|
||
|
||
class PandasDatasourceError(Exception): | ||
pass | ||
|
||
|
@@ -119,7 +113,9 @@ | |
def get_batch_parameters_keys( | ||
self, partitioner: Optional[ColumnPartitioner] = None | ||
) -> Tuple[str, ...]: | ||
return tuple() | ||
return ("dataframe",) | ||
|
||
@override | ||
def get_batch_list_from_batch_request(self, batch_request: BatchRequest) -> list[Batch]: | ||
|
@@ -154,7 +150,7 @@ | |
) | ||
|
||
batch_metadata: BatchMetadata = self._get_batch_metadata_from_batch_request( | ||
batch_request=batch_request | ||
batch_request=batch_request, ignore_options=("dataframe",) | ||
) | ||
|
||
batch_list.append( | ||
|
@@ -190,18 +186,21 @@ | |
get_batch_list_from_batch_request method. | ||
""" # noqa: E501 | ||
if options: | ||
raise ValueError( # noqa: TRY003 | ||
"options is not currently supported for this DataAssets and must be None or {}." | ||
raise BuildBatchRequestError( | ||
message="options is not currently supported for this DataAsset " | ||
"and must be None or {}." | ||
) | ||
|
||
if batch_slice is not None: | ||
raise ValueError( # noqa: TRY003 | ||
"batch_slice is not currently supported and must be None for this DataAsset." | ||
raise BuildBatchRequestError( | ||
message="batch_slice is not currently supported for this DataAsset " | ||
"and must be None." | ||
) | ||
|
||
if partitioner is not None: | ||
raise ValueError( # noqa: TRY003 | ||
"partitioner is not currently supported and must be None for this DataAsset." | ||
raise BuildBatchRequestError( | ||
message="partitioner is not currently supported for this DataAsset " | ||
"and must be None." | ||
) | ||
|
||
return BatchRequest( | ||
|
@@ -356,21 +355,13 @@ | |
return str(uuid.uuid4()).replace("-", "")[:11] | ||
|
||
|
||
class DataFrameAsset(_PandasDataAsset, Generic[_PandasDataFrameT]): | ||
class DataFrameAsset(_PandasDataAsset): | ||
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. We remove the dataframe from the asset since it now needs to be passed in when making the batch definition. This lets us remove all the dataframe logic and this Generic from the asset. |
||
# instance attributes | ||
type: Literal["dataframe"] = "dataframe" | ||
# TODO: <Alex>05/31/2023: Upon removal of deprecated "dataframe" argument to "PandasDatasource.add_dataframe_asset()", default can be deleted.</Alex> # noqa: E501 | ||
dataframe: Optional[_PandasDataFrameT] = pydantic.Field(default=None, exclude=True, repr=False) | ||
|
||
class Config: | ||
extra = pydantic.Extra.forbid | ||
|
||
@pydantic.validator("dataframe") | ||
def _validate_dataframe(cls, dataframe: pd.DataFrame) -> pd.DataFrame: | ||
if not isinstance(dataframe, pd.DataFrame): | ||
raise ValueError("dataframe must be of type pandas.DataFrame") # noqa: TRY003, TRY004 | ||
return dataframe | ||
|
||
@override | ||
def _get_reader_method(self) -> str: | ||
raise NotImplementedError( | ||
|
@@ -382,64 +373,82 @@ | |
"""Pandas DataFrameAsset does not implement "_get_reader_options_include()" method, because DataFrame is already available.""" # noqa: E501 | ||
) | ||
|
||
# TODO: <Alex>05/31/2023: Upon removal of deprecated "dataframe" argument to "PandasDatasource.add_dataframe_asset()", its validation code must be deleted.</Alex> # noqa: E501 | ||
@new_argument( | ||
argument_name="dataframe", | ||
message='The "dataframe" argument is no longer part of "PandasDatasource.add_dataframe_asset()" method call; instead, "dataframe" is the required argument to "DataFrameAsset.build_batch_request()" method.', # noqa: E501 | ||
version="0.16.15", | ||
) | ||
@override | ||
def build_batch_request( # type: ignore[override] | ||
def build_batch_request( | ||
self, | ||
dataframe: Optional[pd.DataFrame] = None, | ||
options: Optional[BatchParameters] = None, | ||
batch_slice: Optional[BatchSlice] = None, | ||
partitioner: Optional[ColumnPartitioner] = None, | ||
) -> BatchRequest: | ||
"""A batch request that can be used to obtain batches for this DataAsset. | ||
|
||
Args: | ||
dataframe: The Pandas Dataframe containing the data for this DataFrame data asset. | ||
options: This is not currently supported and must be {}/None for this data asset. | ||
options: This should have 1 key, 'dataframe', whose value is the dataframe to validate. | ||
batch_slice: This is not currently supported and must be None for this data asset. | ||
partitioner: This is not currently supported and must be None for this data asset. | ||
|
||
Returns: | ||
A BatchRequest object that can be used to obtain a batch list from a Datasource by calling the | ||
get_batch_list_from_batch_request method. | ||
""" # noqa: E501 | ||
if options: | ||
raise ValueError( # noqa: TRY003 | ||
"options is not currently supported for this DataAssets and must be None or {}." | ||
) | ||
|
||
if batch_slice is not None: | ||
raise ValueError( # noqa: TRY003 | ||
"batch_slice is not currently supported and must be None for this DataAsset." | ||
raise BuildBatchRequestError( | ||
message="batch_slice is not currently supported for this DataAsset " | ||
"and must be None." | ||
) | ||
|
||
if partitioner is not None: | ||
raise ValueError( # noqa: TRY003 | ||
"partitioner is not currently supported and must be None for this DataAsset." | ||
raise BuildBatchRequestError( | ||
message="partitioner is not currently supported for this DataAsset " | ||
"and must be None." | ||
) | ||
|
||
if dataframe is None: | ||
df = self.dataframe | ||
else: | ||
df = dataframe # type: ignore[assignment] | ||
if not (options is not None and "dataframe" in options and len(options) == 1): | ||
raise BuildBatchRequestError(message="options must contain exactly 1 key, 'dataframe'.") | ||
|
||
if df is None: | ||
raise ValueError("Cannot build batch request for dataframe asset without a dataframe") # noqa: TRY003 | ||
if not isinstance(options["dataframe"], pd.DataFrame): | ||
raise BuildBatchRequestError( | ||
message="Cannot build batch request for dataframe asset without a dataframe" | ||
) | ||
|
||
self.dataframe = df | ||
return BatchRequest( | ||
datasource_name=self.datasource.name, | ||
data_asset_name=self.name, | ||
options=options, | ||
) | ||
|
||
@override | ||
def _validate_batch_request(self, batch_request: BatchRequest) -> None: | ||
"""Validates the batch_request has the correct form. | ||
|
||
return super().build_batch_request() | ||
Args: | ||
batch_request: A batch request object to be validated. | ||
""" | ||
if not ( | ||
batch_request.datasource_name == self.datasource.name | ||
and batch_request.data_asset_name == self.name | ||
and batch_request.options | ||
and len(batch_request.options) == 1 | ||
and "dataframe" in batch_request.options | ||
and isinstance(batch_request.options["dataframe"], pd.DataFrame) | ||
): | ||
expect_batch_request_form = BatchRequest[None]( | ||
datasource_name=self.datasource.name, | ||
data_asset_name=self.name, | ||
options={"dataframe": pd.DataFrame()}, | ||
batch_slice=batch_request._batch_slice_input, | ||
) | ||
raise gx_exceptions.InvalidBatchRequestError( # noqa: TRY003 | ||
"BatchRequest should have form:\n" | ||
f"{pf(expect_batch_request_form.dict())}\n" | ||
f"but actually has form:\n{pf(batch_request.dict())}\n" | ||
) | ||
|
||
@override | ||
def get_batch_list_from_batch_request(self, batch_request: BatchRequest) -> list[Batch]: | ||
self._validate_batch_request(batch_request) | ||
|
||
batch_spec = RuntimeDataBatchSpec(batch_data=self.dataframe) | ||
batch_spec = RuntimeDataBatchSpec(batch_data=batch_request.options["dataframe"]) | ||
execution_engine: PandasExecutionEngine = self.datasource.get_execution_engine() | ||
data, markers = execution_engine.get_batch_data_and_markers(batch_spec=batch_spec) | ||
|
||
|
@@ -459,7 +468,7 @@ | |
) | ||
|
||
batch_metadata: BatchMetadata = self._get_batch_metadata_from_batch_request( | ||
batch_request=batch_request | ||
batch_request=batch_request, ignore_options=("dataframe",) | ||
) | ||
|
||
return [ | ||
|
@@ -651,29 +660,22 @@ | |
'Cannot execute "PandasDatasource.read_dataframe()" without a valid "dataframe" argument.' # noqa: E501 | ||
) | ||
|
||
batch_request = asset.build_batch_request(dataframe=dataframe) | ||
batch_request = asset.build_batch_request(options={"dataframe": dataframe}) | ||
else: | ||
batch_request = asset.build_batch_request() | ||
|
||
return asset.get_batch_list_from_batch_request(batch_request)[-1] | ||
|
||
@public_api | ||
@deprecated_argument( | ||
argument_name="dataframe", | ||
message='The "dataframe" argument is no longer part of "PandasDatasource.add_dataframe_asset()" method call; instead, "dataframe" is the required argument to "DataFrameAsset.build_batch_request()" method.', # noqa: E501 | ||
version="0.16.15", | ||
) | ||
def add_dataframe_asset( | ||
self, | ||
name: str, | ||
dataframe: Optional[pd.DataFrame] = None, | ||
batch_metadata: Optional[BatchMetadata] = None, | ||
) -> DataFrameAsset: | ||
"""Adds a Dataframe DataAsset to this PandasDatasource object. | ||
|
||
Args: | ||
name: The name of the Dataframe asset. This can be any arbitrary string. | ||
dataframe: The Pandas Dataframe containing the data for this DataFrame data asset. | ||
batch_metadata: An arbitrary user defined dictionary with string keys which will get inherited by any | ||
batches created from the asset. | ||
|
||
|
@@ -684,7 +686,6 @@ | |
name=name, | ||
batch_metadata=batch_metadata or {}, | ||
) | ||
asset.dataframe = dataframe | ||
return self._add_asset(asset=asset) | ||
|
||
@public_api | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
First impression here: does this interface put us in a place where things are less predictable? Should there just be a different method for building a batch request via dataframe?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Hmm, I like the unified method call; perhaps we can add a specialized method based on need?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We need to have a better runtime story. This is a stopgap for the short term. I think currently
options
and other parameters exist and this throws if you pass them. This adds consistency. It also prevents you from adding dataframes onto the asset, which had some weird consequences. But overall, I agree that this isn't ideal.