diff --git a/docs/docusaurus/docs/snippets/get_existing_data_asset_from_existing_datasource_pandas_filesystem_example.py b/docs/docusaurus/docs/snippets/get_existing_data_asset_from_existing_datasource_pandas_filesystem_example.py
index 1f60e2f2db5f..721d97021652 100644
--- a/docs/docusaurus/docs/snippets/get_existing_data_asset_from_existing_datasource_pandas_filesystem_example.py
+++ b/docs/docusaurus/docs/snippets/get_existing_data_asset_from_existing_datasource_pandas_filesystem_example.py
@@ -44,7 +44,7 @@
# Python
#
-my_batch_request = my_asset.build_batch_request(dataframe=dataframe)
+my_batch_request = my_asset.build_batch_request(options={"dataframe": dataframe})
#
# Python
diff --git a/docs/docusaurus/docs/snippets/result_format.py b/docs/docusaurus/docs/snippets/result_format.py
index 699a0364ec51..c35257580de1 100644
--- a/docs/docusaurus/docs/snippets/result_format.py
+++ b/docs/docusaurus/docs/snippets/result_format.py
@@ -19,7 +19,7 @@
context = gx.get_context()
datasource = context.data_sources.add_pandas(name="my_pandas_datasource")
data_asset = datasource.add_dataframe_asset(name="my_df")
-my_batch_request = data_asset.build_batch_request(dataframe=dataframe)
+my_batch_request = data_asset.build_batch_request(options={"dataframe": dataframe})
context.suites.add(ExpectationSuite(name="my_expectation_suite"))
my_validator = context.get_validator(
batch_request=my_batch_request,
diff --git a/great_expectations/datasource/datasource_dict.py b/great_expectations/datasource/datasource_dict.py
index 20ada813f606..21a86be69cb3 100644
--- a/great_expectations/datasource/datasource_dict.py
+++ b/great_expectations/datasource/datasource_dict.py
@@ -129,13 +129,7 @@ def _init_fluent_datasource(self, name: str, ds: FluentDatasource) -> FluentData
datasource_name=name,
data_asset_name=asset.name,
)
- cached_data_asset = self._in_memory_data_assets.get(in_memory_asset_name)
- if cached_data_asset:
- asset.dataframe = cached_data_asset.dataframe
- else:
- # Asset is loaded into cache here (even without df) to enable loading of df at a later # noqa: E501
- # time when DataframeAsset.build_batch_request(dataframe=df) is called
- self._in_memory_data_assets[in_memory_asset_name] = asset
+ self._in_memory_data_assets[in_memory_asset_name] = asset
return ds
diff --git a/great_expectations/datasource/fluent/interfaces.py b/great_expectations/datasource/fluent/interfaces.py
index b1e71ccdb543..15b48a86e519 100644
--- a/great_expectations/datasource/fluent/interfaces.py
+++ b/great_expectations/datasource/fluent/interfaces.py
@@ -466,7 +466,9 @@ def ensure_batch_metadata_is_not_none(cls, value: Any) -> Union[dict, Any]:
return {}
return value
- def _get_batch_metadata_from_batch_request(self, batch_request: BatchRequest) -> BatchMetadata:
+ def _get_batch_metadata_from_batch_request(
+ self, batch_request: BatchRequest, ignore_options: Sequence = ()
+ ) -> BatchMetadata:
"""Performs config variable substitution and populates batch parameters for
Batch.metadata at runtime.
"""
@@ -477,7 +479,11 @@ def _get_batch_metadata_from_batch_request(self, batch_request: BatchRequest) ->
batch_metadata = _ConfigurationSubstitutor().substitute_all_config_variables(
data=batch_metadata, replace_variables_dict=config_variables
)
- batch_metadata.update(copy.deepcopy(batch_request.options))
+ batch_metadata.update(
+ copy.deepcopy(
+ {k: v for k, v in batch_request.options.items() if k not in ignore_options}
+ )
+ )
return batch_metadata
# Sorter methods
@@ -994,7 +1000,7 @@ def __init__( # noqa: PLR0913
def _create_id(self) -> str:
options_list = []
for key, value in self.batch_request.options.items():
- if key != "path":
+ if key not in ("path", "dataframe"):
options_list.append(f"{key}_{value}")
return "-".join([self.datasource.name, self.data_asset.name, *options_list])
diff --git a/great_expectations/datasource/fluent/pandas_datasource.py b/great_expectations/datasource/fluent/pandas_datasource.py
index 936f571b4f39..c4cbc6c12d99 100644
--- a/great_expectations/datasource/fluent/pandas_datasource.py
+++ b/great_expectations/datasource/fluent/pandas_datasource.py
@@ -20,7 +20,6 @@
Set,
Tuple,
Type,
- TypeVar,
Union,
)
@@ -28,8 +27,6 @@
import great_expectations.exceptions as gx_exceptions
from great_expectations._docs_decorators import (
- deprecated_argument,
- new_argument,
public_api,
)
from great_expectations.compatibility import pydantic, sqlalchemy
@@ -52,6 +49,7 @@
)
from great_expectations.datasource.fluent.signatures import _merge_signatures
from great_expectations.datasource.fluent.sources import DEFAULT_PANDAS_DATA_ASSET_NAME
+from great_expectations.exceptions.exceptions import BuildBatchRequestError
_EXCLUDE_TYPES_FROM_JSON: list[Type] = [sqlite3.Connection]
@@ -78,10 +76,6 @@
logger = logging.getLogger(__name__)
-# this enables us to include dataframe in the json schema
-_PandasDataFrameT = TypeVar("_PandasDataFrameT")
-
-
class PandasDatasourceError(Exception):
pass
@@ -119,7 +113,9 @@ def test_connection(self) -> None: ...
def get_batch_parameters_keys(
self, partitioner: Optional[ColumnPartitioner] = None
) -> Tuple[str, ...]:
- return tuple()
+        return ("dataframe",)
@override
def get_batch_list_from_batch_request(self, batch_request: BatchRequest) -> list[Batch]:
@@ -154,7 +150,7 @@ def get_batch_list_from_batch_request(self, batch_request: BatchRequest) -> list
)
batch_metadata: BatchMetadata = self._get_batch_metadata_from_batch_request(
- batch_request=batch_request
+ batch_request=batch_request, ignore_options=("dataframe",)
)
batch_list.append(
@@ -190,18 +186,21 @@ def build_batch_request(
get_batch_list_from_batch_request method.
""" # noqa: E501
if options:
- raise ValueError( # noqa: TRY003
- "options is not currently supported for this DataAssets and must be None or {}."
+ raise BuildBatchRequestError(
+ message="options is not currently supported for this DataAsset "
+ "and must be None or {}."
)
if batch_slice is not None:
- raise ValueError( # noqa: TRY003
- "batch_slice is not currently supported and must be None for this DataAsset."
+ raise BuildBatchRequestError(
+ message="batch_slice is not currently supported for this DataAsset "
+ "and must be None."
)
if partitioner is not None:
- raise ValueError( # noqa: TRY003
- "partitioner is not currently supported and must be None for this DataAsset."
+ raise BuildBatchRequestError(
+ message="partitioner is not currently supported for this DataAsset "
+ "and must be None."
)
return BatchRequest(
@@ -356,21 +355,13 @@ def _short_id() -> str:
return str(uuid.uuid4()).replace("-", "")[:11]
-class DataFrameAsset(_PandasDataAsset, Generic[_PandasDataFrameT]):
+class DataFrameAsset(_PandasDataAsset):
# instance attributes
type: Literal["dataframe"] = "dataframe"
- # TODO: 05/31/2023: Upon removal of deprecated "dataframe" argument to "PandasDatasource.add_dataframe_asset()", default can be deleted. # noqa: E501
- dataframe: Optional[_PandasDataFrameT] = pydantic.Field(default=None, exclude=True, repr=False)
class Config:
extra = pydantic.Extra.forbid
- @pydantic.validator("dataframe")
- def _validate_dataframe(cls, dataframe: pd.DataFrame) -> pd.DataFrame:
- if not isinstance(dataframe, pd.DataFrame):
- raise ValueError("dataframe must be of type pandas.DataFrame") # noqa: TRY003, TRY004
- return dataframe
-
@override
def _get_reader_method(self) -> str:
raise NotImplementedError(
@@ -382,16 +373,9 @@ def _get_reader_options_include(self) -> set[str]:
"""Pandas DataFrameAsset does not implement "_get_reader_options_include()" method, because DataFrame is already available.""" # noqa: E501
)
- # TODO: 05/31/2023: Upon removal of deprecated "dataframe" argument to "PandasDatasource.add_dataframe_asset()", its validation code must be deleted. # noqa: E501
- @new_argument(
- argument_name="dataframe",
- message='The "dataframe" argument is no longer part of "PandasDatasource.add_dataframe_asset()" method call; instead, "dataframe" is the required argument to "DataFrameAsset.build_batch_request()" method.', # noqa: E501
- version="0.16.15",
- )
@override
- def build_batch_request( # type: ignore[override]
+ def build_batch_request(
self,
- dataframe: Optional[pd.DataFrame] = None,
options: Optional[BatchParameters] = None,
batch_slice: Optional[BatchSlice] = None,
partitioner: Optional[ColumnPartitioner] = None,
@@ -399,8 +383,7 @@ def build_batch_request( # type: ignore[override]
"""A batch request that can be used to obtain batches for this DataAsset.
Args:
- dataframe: The Pandas Dataframe containing the data for this DataFrame data asset.
- options: This is not currently supported and must be {}/None for this data asset.
+            options: This should contain exactly 1 key, 'dataframe', whose value is the dataframe to validate.
batch_slice: This is not currently supported and must be None for this data asset.
partitioner: This is not currently supported and must be None for this data asset.
@@ -408,38 +391,64 @@ def build_batch_request( # type: ignore[override]
A BatchRequest object that can be used to obtain a batch list from a Datasource by calling the
get_batch_list_from_batch_request method.
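+
+        Example (illustrative; ``my_df`` is any pandas DataFrame to validate):
+            my_batch_request = asset.build_batch_request(options={"dataframe": my_df})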
""" # noqa: E501
- if options:
- raise ValueError( # noqa: TRY003
- "options is not currently supported for this DataAssets and must be None or {}."
- )
-
if batch_slice is not None:
- raise ValueError( # noqa: TRY003
- "batch_slice is not currently supported and must be None for this DataAsset."
+ raise BuildBatchRequestError(
+ message="batch_slice is not currently supported for this DataAsset "
+ "and must be None."
)
if partitioner is not None:
- raise ValueError( # noqa: TRY003
- "partitioner is not currently supported and must be None for this DataAsset."
+ raise BuildBatchRequestError(
+ message="partitioner is not currently supported for this DataAsset"
+ "and must be None."
)
- if dataframe is None:
- df = self.dataframe
- else:
- df = dataframe # type: ignore[assignment]
+        if options is None or "dataframe" not in options or len(options) != 1:
+ raise BuildBatchRequestError(message="options must contain exactly 1 key, 'dataframe'.")
- if df is None:
- raise ValueError("Cannot build batch request for dataframe asset without a dataframe") # noqa: TRY003
+ if not isinstance(options["dataframe"], pd.DataFrame):
+ raise BuildBatchRequestError(
+ message="Cannot build batch request for dataframe asset without a dataframe"
+ )
- self.dataframe = df
+ return BatchRequest(
+ datasource_name=self.datasource.name,
+ data_asset_name=self.name,
+ options=options,
+ )
+
+ @override
+ def _validate_batch_request(self, batch_request: BatchRequest) -> None:
+ """Validates the batch_request has the correct form.
- return super().build_batch_request()
+ Args:
+ batch_request: A batch request object to be validated.
+ """
+ if not (
+ batch_request.datasource_name == self.datasource.name
+ and batch_request.data_asset_name == self.name
+ and batch_request.options
+ and len(batch_request.options) == 1
+ and "dataframe" in batch_request.options
+ and isinstance(batch_request.options["dataframe"], pd.DataFrame)
+ ):
+ expect_batch_request_form = BatchRequest[None](
+ datasource_name=self.datasource.name,
+ data_asset_name=self.name,
+ options={"dataframe": pd.DataFrame()},
+ batch_slice=batch_request._batch_slice_input,
+ )
+ raise gx_exceptions.InvalidBatchRequestError( # noqa: TRY003
+ "BatchRequest should have form:\n"
+ f"{pf(expect_batch_request_form.dict())}\n"
+ f"but actually has form:\n{pf(batch_request.dict())}\n"
+ )
@override
def get_batch_list_from_batch_request(self, batch_request: BatchRequest) -> list[Batch]:
self._validate_batch_request(batch_request)
- batch_spec = RuntimeDataBatchSpec(batch_data=self.dataframe)
+ batch_spec = RuntimeDataBatchSpec(batch_data=batch_request.options["dataframe"])
execution_engine: PandasExecutionEngine = self.datasource.get_execution_engine()
data, markers = execution_engine.get_batch_data_and_markers(batch_spec=batch_spec)
@@ -459,7 +468,7 @@ def get_batch_list_from_batch_request(self, batch_request: BatchRequest) -> list
)
batch_metadata: BatchMetadata = self._get_batch_metadata_from_batch_request(
- batch_request=batch_request
+ batch_request=batch_request, ignore_options=("dataframe",)
)
return [
@@ -651,29 +660,22 @@ def _get_batch(self, asset: _PandasDataAsset, dataframe: pd.DataFrame | None = N
'Cannot execute "PandasDatasource.read_dataframe()" without a valid "dataframe" argument.' # noqa: E501
)
- batch_request = asset.build_batch_request(dataframe=dataframe)
+ batch_request = asset.build_batch_request(options={"dataframe": dataframe})
else:
batch_request = asset.build_batch_request()
return asset.get_batch_list_from_batch_request(batch_request)[-1]
@public_api
- @deprecated_argument(
- argument_name="dataframe",
- message='The "dataframe" argument is no longer part of "PandasDatasource.add_dataframe_asset()" method call; instead, "dataframe" is the required argument to "DataFrameAsset.build_batch_request()" method.', # noqa: E501
- version="0.16.15",
- )
def add_dataframe_asset(
self,
name: str,
- dataframe: Optional[pd.DataFrame] = None,
batch_metadata: Optional[BatchMetadata] = None,
) -> DataFrameAsset:
"""Adds a Dataframe DataAsset to this PandasDatasource object.
Args:
name: The name of the Dataframe asset. This can be any arbitrary string.
- dataframe: The Pandas Dataframe containing the data for this DataFrame data asset.
batch_metadata: An arbitrary user defined dictionary with string keys which will get inherited by any
batches created from the asset.
@@ -684,7 +686,6 @@ def add_dataframe_asset(
name=name,
batch_metadata=batch_metadata or {},
)
- asset.dataframe = dataframe
return self._add_asset(asset=asset)
@public_api
diff --git a/great_expectations/datasource/fluent/pandas_datasource.pyi b/great_expectations/datasource/fluent/pandas_datasource.pyi
index 37b44341529f..1bcd34c39c1e 100644
--- a/great_expectations/datasource/fluent/pandas_datasource.pyi
+++ b/great_expectations/datasource/fluent/pandas_datasource.pyi
@@ -27,7 +27,6 @@ from typing_extensions import TypeAlias
from great_expectations._docs_decorators import (
deprecated_argument,
- new_argument,
)
from great_expectations._docs_decorators import (
public_api as public_api,
@@ -60,7 +59,6 @@ _EXCLUDE_TYPES_FROM_JSON: list[Type]
MappingIntStrAny: TypeAlias = Mapping[Union[int, str], Any]
AbstractSetIntStr: TypeAlias = AbstractSet[Union[int, str]]
logger: Logger
-_PandasDataFrameT = TypeVar("_PandasDataFrameT")
class PandasDatasourceError(Exception): ...
@@ -122,16 +120,13 @@ class XMLAsset(_PandasDataAsset): ...
class DataFrameAsset(_PandasDataAsset):
type: Literal["dataframe"]
- dataframe: _PandasDataFrameT # type: ignore[valid-type]
- @new_argument(
- argument_name="dataframe",
- message='The "dataframe" argument is no longer part of "PandasDatasource.add_dataframe_asset()" method call; instead, "dataframe" is the required argument to "DataFrameAsset.build_batch_request()" method.', # noqa: E501
- version="0.16.15",
- )
@override
- def build_batch_request( # type: ignore[override]
- self, dataframe: Optional[pd.DataFrame] = None
+ def build_batch_request(
+ self,
+ options: Optional[BatchParameters] = ...,
+ batch_slice: Optional[BatchSlice] = ...,
+ partitioner: Optional[ColumnPartitioner] = ...,
) -> BatchRequest: ...
@override
def get_batch_list_from_batch_request(self, batch_request: BatchRequest) -> list[Batch]: ...
diff --git a/great_expectations/datasource/fluent/schemas/PandasDatasource/DataFrameAsset.json b/great_expectations/datasource/fluent/schemas/PandasDatasource/DataFrameAsset.json
index 434be5ce5471..6406014bcfff 100644
--- a/great_expectations/datasource/fluent/schemas/PandasDatasource/DataFrameAsset.json
+++ b/great_expectations/datasource/fluent/schemas/PandasDatasource/DataFrameAsset.json
@@ -38,9 +38,6 @@
"items": {
"$ref": "#/definitions/BatchDefinition"
}
- },
- "dataframe": {
- "title": "Dataframe"
}
},
"required": [
diff --git a/great_expectations/datasource/fluent/schemas/SparkDatasource.json b/great_expectations/datasource/fluent/schemas/SparkDatasource.json
index 0fd0522bb868..dba8d5a861fe 100644
--- a/great_expectations/datasource/fluent/schemas/SparkDatasource.json
+++ b/great_expectations/datasource/fluent/schemas/SparkDatasource.json
@@ -548,9 +548,6 @@
"items": {
"$ref": "#/definitions/BatchDefinition"
}
- },
- "dataframe": {
- "title": "Dataframe"
}
},
"required": [
diff --git a/great_expectations/datasource/fluent/schemas/SparkDatasource/DataFrameAsset.json b/great_expectations/datasource/fluent/schemas/SparkDatasource/DataFrameAsset.json
index 434be5ce5471..6406014bcfff 100644
--- a/great_expectations/datasource/fluent/schemas/SparkDatasource/DataFrameAsset.json
+++ b/great_expectations/datasource/fluent/schemas/SparkDatasource/DataFrameAsset.json
@@ -38,9 +38,6 @@
"items": {
"$ref": "#/definitions/BatchDefinition"
}
- },
- "dataframe": {
- "title": "Dataframe"
}
},
"required": [
diff --git a/great_expectations/datasource/fluent/spark_datasource.py b/great_expectations/datasource/fluent/spark_datasource.py
index 7d70e3e517d5..756e4e9f877b 100644
--- a/great_expectations/datasource/fluent/spark_datasource.py
+++ b/great_expectations/datasource/fluent/spark_datasource.py
@@ -18,8 +18,6 @@
import great_expectations.exceptions as gx_exceptions
from great_expectations._docs_decorators import (
- deprecated_argument,
- new_argument,
public_api,
)
from great_expectations.compatibility import pydantic
@@ -43,11 +41,13 @@
TestConnectionError,
_DataAssetT,
)
+from great_expectations.exceptions.exceptions import BuildBatchRequestError
if TYPE_CHECKING:
from typing_extensions import TypeAlias
from great_expectations.compatibility.pyspark import SparkSession
+ from great_expectations.core.batch_definition import BatchDefinition
from great_expectations.core.partitioners import ColumnPartitioner
from great_expectations.datasource.fluent.data_connector.batch_filter import BatchSlice
from great_expectations.datasource.fluent.interfaces import BatchMetadata
@@ -169,18 +169,10 @@ def test_connection(self, test_assets: bool = True) -> None:
class DataFrameAsset(DataAsset, Generic[_SparkDataFrameT]):
# instance attributes
type: Literal["dataframe"] = "dataframe"
- dataframe: Optional[_SparkDataFrameT] = pydantic.Field(default=None, exclude=True, repr=False)
class Config:
extra = pydantic.Extra.forbid
- @pydantic.validator("dataframe")
- def _validate_dataframe(cls, dataframe: DataFrame) -> DataFrame:
- if not (DataFrame and isinstance(dataframe, DataFrame)): # type: ignore[truthy-function]
- raise ValueError("dataframe must be of type pyspark.sql.DataFrame") # noqa: TRY003
-
- return dataframe
-
@override
def test_connection(self) -> None: ...
@@ -188,7 +180,9 @@ def test_connection(self) -> None: ...
def get_batch_parameters_keys(
self, partitioner: Optional[ColumnPartitioner] = None
) -> tuple[str, ...]:
- return tuple()
+        return ("dataframe",)
def _get_reader_method(self) -> str:
raise NotImplementedError(
@@ -200,15 +194,9 @@ def _get_reader_options_include(self) -> set[str]:
"""Spark DataFrameAsset does not implement "_get_reader_options_include()" method, because DataFrame is already available.""" # noqa: E501
)
- @new_argument(
- argument_name="dataframe",
- message='The "dataframe" argument is no longer part of "PandasDatasource.add_dataframe_asset()" method call; instead, "dataframe" is the required argument to "DataFrameAsset.build_batch_request()" method.', # noqa: E501
- version="0.16.15",
- )
@override
- def build_batch_request( # type: ignore[override]
+ def build_batch_request(
self,
- dataframe: Optional[_SparkDataFrameT] = None,
options: Optional[BatchParameters] = None,
batch_slice: Optional[BatchSlice] = None,
partitioner: Optional[ColumnPartitioner] = None,
@@ -216,8 +204,7 @@ def build_batch_request( # type: ignore[override]
"""A batch request that can be used to obtain batches for this DataAsset.
Args:
- dataframe: The Spark Dataframe containing the data for this DataFrame data asset.
- options: This is not currently supported and must be {}/None for this data asset.
+            options: This should contain exactly 1 key, 'dataframe', whose value is the dataframe to validate.
batch_slice: This is not currently supported and must be None for this data asset.
partitioner: This is not currently supported and must be None for this data asset.
@@ -226,35 +213,30 @@ def build_batch_request( # type: ignore[override]
A BatchRequest object that can be used to obtain a batch list from a Datasource by calling the
get_batch_list_from_batch_request method.
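+
+        Example (illustrative; ``my_spark_df`` is any pyspark DataFrame to validate):
+            my_batch_request = asset.build_batch_request(options={"dataframe": my_spark_df})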
""" # noqa: E501
- if options:
- raise ValueError( # noqa: TRY003
- "options is not currently supported for this DataAssets and must be None or {}."
- )
-
if batch_slice is not None:
- raise ValueError( # noqa: TRY003
- "batch_slice is not currently supported and must be None for this DataAsset."
+ raise BuildBatchRequestError(
+ message="batch_slice is not currently supported for this DataAsset "
+ "and must be None."
)
if partitioner is not None:
- raise ValueError( # noqa: TRY003
- "partitioner is not currently supported and must be None for this DataAsset."
+ raise BuildBatchRequestError(
+ message="partitioner is not currently supported for this DataAsset "
+ "and must be None."
)
- if dataframe is None:
- df = self.dataframe
- else:
- df = dataframe
+        if options is None or "dataframe" not in options or len(options) != 1:
+ raise BuildBatchRequestError(message="options must contain exactly 1 key, 'dataframe'.")
- if df is None:
- raise ValueError("Cannot build batch request for dataframe asset without a dataframe") # noqa: TRY003
-
- self.dataframe = df
+ if not isinstance(options["dataframe"], DataFrame):
+ raise BuildBatchRequestError(
+ message="Can not build batch request for dataframe asset " "without a dataframe."
+ )
return BatchRequest(
datasource_name=self.datasource.name,
data_asset_name=self.name,
- options={},
+ options=options,
)
@override
@@ -267,7 +249,10 @@ def _validate_batch_request(self, batch_request: BatchRequest) -> None:
if not (
batch_request.datasource_name == self.datasource.name
and batch_request.data_asset_name == self.name
- and not batch_request.options
+ and batch_request.options
+ and len(batch_request.options) == 1
+ and "dataframe" in batch_request.options
+ and isinstance(batch_request.options["dataframe"], DataFrame)
):
expect_batch_request_form = BatchRequest[None](
datasource_name=self.datasource.name,
@@ -285,7 +270,7 @@ def _validate_batch_request(self, batch_request: BatchRequest) -> None:
def get_batch_list_from_batch_request(self, batch_request: BatchRequest) -> list[Batch]:
self._validate_batch_request(batch_request)
- batch_spec = RuntimeDataBatchSpec(batch_data=self.dataframe)
+ batch_spec = RuntimeDataBatchSpec(batch_data=batch_request.options["dataframe"])
execution_engine: SparkDFExecutionEngine = self.datasource.get_execution_engine()
data, markers = execution_engine.get_batch_data_and_markers(batch_spec=batch_spec)
@@ -305,7 +290,7 @@ def get_batch_list_from_batch_request(self, batch_request: BatchRequest) -> list
)
batch_metadata: BatchMetadata = self._get_batch_metadata_from_batch_request(
- batch_request=batch_request
+ batch_request=batch_request, ignore_options=("dataframe",)
)
return [
@@ -321,6 +306,13 @@ def get_batch_list_from_batch_request(self, batch_request: BatchRequest) -> list
)
]
+ @public_api
+ def add_batch_definition_whole_dataframe(self, name: str) -> BatchDefinition:
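+        """Add a batch definition that validates the whole dataframe supplied at runtime.
+
+        The dataframe itself is passed later as a batch parameter, e.g.
+        ``batch_definition.get_batch(batch_parameters={"dataframe": df})``.
+
+        Args:
+            name: Name of the new batch definition.
+
+        Returns:
+            The added BatchDefinition object.
+        """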
+ return self.add_batch_definition(
+ name=name,
+ partitioner=None,
+ )
+
@public_api
class SparkDatasource(_SparkDatasource):
@@ -333,15 +325,9 @@ class SparkDatasource(_SparkDatasource):
assets: List[DataFrameAsset] = []
@public_api
- @deprecated_argument(
- argument_name="dataframe",
- message='The "dataframe" argument is no longer part of "PandasDatasource.add_dataframe_asset()" method call; instead, "dataframe" is the required argument to "DataFrameAsset.build_batch_request()" method.', # noqa: E501
- version="0.16.15",
- )
def add_dataframe_asset(
self,
name: str,
- dataframe: Optional[_SparkDataFrameT] = None,
batch_metadata: Optional[BatchMetadata] = None,
) -> DataFrameAsset:
"""Adds a Dataframe DataAsset to this SparkDatasource object.
@@ -359,5 +345,4 @@ def add_dataframe_asset(
name=name,
batch_metadata=batch_metadata or {},
)
- asset.dataframe = dataframe
return self._add_asset(asset=asset)
diff --git a/great_expectations/exceptions/exceptions.py b/great_expectations/exceptions/exceptions.py
index fed9122031e2..08eb1cbb5971 100644
--- a/great_expectations/exceptions/exceptions.py
+++ b/great_expectations/exceptions/exceptions.py
@@ -140,6 +140,11 @@ class InvalidBatchRequestError(GreatExpectationsError):
pass
+class BuildBatchRequestError(GreatExpectationsError):
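+    """Raised when ``build_batch_request`` is called with invalid arguments."""
+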
+ def __init__(self, message: str):
+ super().__init__(f"Bad input to build_batch_request: {message}")
+
+
class InvalidBatchIdError(GreatExpectationsError):
pass
diff --git a/tests/checkpoint/conftest.py b/tests/checkpoint/conftest.py
index 626b80f98f61..d3ad1cfc4ce6 100644
--- a/tests/checkpoint/conftest.py
+++ b/tests/checkpoint/conftest.py
@@ -113,7 +113,7 @@ def titanic_data_context_with_fluent_pandas_datasources_stats_enabled_and_expect
test_df: pd.DataFrame = pd.DataFrame(data={"col1": [1, 2], "col2": [3, 4]})
asset = datasource.add_dataframe_asset(name="my_other_dataframe_asset")
- _ = asset.build_batch_request(dataframe=test_df)
+ _ = asset.build_batch_request(options={"dataframe": test_df})
# create expectation suite
suite = context.suites.add(ExpectationSuite("my_expectation_suite"))
diff --git a/tests/conftest.py b/tests/conftest.py
index 636755891e28..d5b353f5fe63 100644
--- a/tests/conftest.py
+++ b/tests/conftest.py
@@ -943,7 +943,7 @@ def titanic_data_context_with_fluent_pandas_datasources_with_checkpoints_v1_with
dataframe_asset_name = "my_dataframe_asset"
asset = datasource.add_dataframe_asset(name=dataframe_asset_name)
- _ = asset.build_batch_request(dataframe=df)
+ _ = asset.build_batch_request(options={"dataframe": df})
# noinspection PyProtectedMember
context._save_project_config()
@@ -998,7 +998,7 @@ def titanic_data_context_with_fluent_pandas_and_spark_datasources_with_checkpoin
dataframe_asset_name = "my_dataframe_asset"
asset = datasource.add_dataframe_asset(name=dataframe_asset_name)
- _ = asset.build_batch_request(dataframe=spark_df)
+ _ = asset.build_batch_request(options={"dataframe": spark_df})
# noinspection PyProtectedMember
context._save_project_config()
diff --git a/tests/data_context/cloud_data_context/test_include_rendered_content.py b/tests/data_context/cloud_data_context/test_include_rendered_content.py
index b74cbf775e4f..89a8ee122f2f 100644
--- a/tests/data_context/cloud_data_context/test_include_rendered_content.py
+++ b/tests/data_context/cloud_data_context/test_include_rendered_content.py
@@ -29,10 +29,9 @@ def test_cloud_backed_data_context_expectation_validation_result_include_rendere
data_asset = context.data_sources.pandas_default.add_dataframe_asset(
name="my_dataframe_asset",
- dataframe=df,
)
validator: Validator = context.get_validator(
- batch_request=data_asset.build_batch_request(),
+ batch_request=data_asset.build_batch_request(options={"dataframe": df}),
create_expectation_suite_with_name=suite_name,
)
diff --git a/tests/datasource/fluent/integration/test_integration_datasource.py b/tests/datasource/fluent/integration/test_integration_datasource.py
index 3b40a44cf77e..9e21b9fcf8a1 100644
--- a/tests/datasource/fluent/integration/test_integration_datasource.py
+++ b/tests/datasource/fluent/integration/test_integration_datasource.py
@@ -1,7 +1,8 @@
from __future__ import annotations
import datetime
-from typing import TYPE_CHECKING
+from dataclasses import dataclass
+from typing import TYPE_CHECKING, Callable
from unittest import mock
import pandas as pd
@@ -24,6 +25,7 @@
from great_expectations.data_context import (
AbstractDataContext,
CloudDataContext,
+ EphemeralDataContext,
FileDataContext,
)
from great_expectations.datasource.fluent import (
@@ -33,6 +35,8 @@
DataAsset,
Datasource,
)
+from great_expectations.execution_engine.pandas_batch_data import PandasBatchData
+from great_expectations.execution_engine.sparkdf_batch_data import SparkDFBatchData
from great_expectations.validator.v1_validator import Validator
from tests.datasource.fluent.integration.conftest import sqlite_datasource
from tests.datasource.fluent.integration.integration_test_utils import (
@@ -43,12 +47,16 @@
if TYPE_CHECKING:
from responses import RequestsMock
+ from great_expectations.compatibility.pyspark import DataFrame as SparkDataFrame
+ from great_expectations.compatibility.pyspark import SparkSession
from great_expectations.datasource.fluent.pandas_datasource import (
DataFrameAsset as PandasDataFrameAsset,
)
+ from great_expectations.datasource.fluent.pandas_datasource import PandasDatasource
from great_expectations.datasource.fluent.spark_datasource import (
DataFrameAsset as SparkDataFrameAsset,
)
+ from great_expectations.datasource.fluent.spark_datasource import SparkDatasource
# This is marked by the various backend used in testing in the datasource_test_data fixture.
@@ -469,8 +477,10 @@ def test_pandas_data_adding_dataframe_in_cloud_context(
dataframe_asset: PandasDataFrameAsset = context.data_sources.add_or_update_pandas(
name="fluent_pandas_datasource"
).add_dataframe_asset(name="my_df_asset")
- _ = dataframe_asset.build_batch_request(dataframe=df)
- assert dataframe_asset.dataframe.equals(df) # type: ignore[attr-defined] # _PandasDataFrameT
+ batch_def = dataframe_asset.add_batch_definition_whole_dataframe(name="bd")
+ batch = batch_def.get_batch(batch_parameters={"dataframe": df})
+ assert isinstance(batch.data, PandasBatchData)
+ assert batch.data.dataframe.equals(df)
@pytest.mark.filesystem
@@ -483,15 +493,20 @@ def test_pandas_data_adding_dataframe_in_file_reloaded_context(
datasource = context.data_sources.add_or_update_pandas(name="fluent_pandas_datasource")
dataframe_asset: PandasDataFrameAsset = datasource.add_dataframe_asset(name="my_df_asset")
- _ = dataframe_asset.build_batch_request(dataframe=df)
- assert dataframe_asset.dataframe.equals(df) # type: ignore[attr-defined] # _PandasDataFrameT
+ batch_def = dataframe_asset.add_batch_definition_whole_dataframe(name="bd")
+ batch = batch_def.get_batch(batch_parameters={"dataframe": df})
+ assert isinstance(batch.data, PandasBatchData)
+ assert batch.data.dataframe.equals(df)
+    # Reload the context and confirm the dataframe can be passed to the reloaded batch definition
context = gx.get_context(context_root_dir=context.root_directory, cloud_mode=False)
dataframe_asset = context.get_datasource(datasource_name="fluent_pandas_datasource").get_asset(
asset_name="my_df_asset"
)
- _ = dataframe_asset.build_batch_request(dataframe=df)
- assert dataframe_asset.dataframe.equals(df) # type: ignore[attr-defined] # _PandasDataFrameT
+ reloaded_batch_def = dataframe_asset.get_batch_definition(batch_definition_name="bd")
+ batch = reloaded_batch_def.get_batch(batch_parameters={"dataframe": df})
+ assert isinstance(batch.data, PandasBatchData)
+ assert batch.data.dataframe.equals(df)
@pytest.mark.spark
@@ -507,10 +522,12 @@ def test_spark_data_adding_dataframe_in_cloud_context(
context = empty_cloud_context_fluent
dataframe_asset: SparkDataFrameAsset = context.data_sources.add_or_update_spark(
- name="fluent_pandas_datasource"
+ name="fluent_spark_datasource"
).add_dataframe_asset(name="my_df_asset")
- _ = dataframe_asset.build_batch_request(dataframe=spark_df)
- assert dataframe_asset.dataframe.toPandas().equals(df) # type: ignore[union-attr]
+ batch_def = dataframe_asset.add_batch_definition_whole_dataframe(name="bd")
+ batch = batch_def.get_batch(batch_parameters={"dataframe": spark_df})
+ assert isinstance(batch.data, SparkDFBatchData)
+ assert batch.data.dataframe.toPandas().equals(df)
@pytest.mark.spark
@@ -525,19 +542,124 @@ def test_spark_data_adding_dataframe_in_file_reloaded_context(
context = empty_file_context
dataframe_asset: SparkDataFrameAsset = context.data_sources.add_or_update_spark(
- name="fluent_pandas_datasource"
+ name="fluent_spark_datasource"
).add_dataframe_asset(name="my_df_asset")
- _ = dataframe_asset.build_batch_request(dataframe=spark_df)
- assert dataframe_asset.dataframe.toPandas().equals(df) # type: ignore[union-attr]
-
- datasource = context.data_sources.add_or_update_spark(name="fluent_pandas_datasource")
- dataframe_asset = datasource.add_dataframe_asset(name="my_df_asset")
- _ = dataframe_asset.build_batch_request(dataframe=spark_df)
- assert dataframe_asset.dataframe.toPandas().equals(df) # type: ignore[union-attr]
+ batch_def = dataframe_asset.add_batch_definition_whole_dataframe(name="bd")
+ batch = batch_def.get_batch(batch_parameters={"dataframe": spark_df})
+ assert isinstance(batch.data, SparkDFBatchData)
+ assert batch.data.dataframe.toPandas().equals(df)
context = gx.get_context(context_root_dir=context.root_directory, cloud_mode=False)
- dataframe_asset = context.get_datasource(datasource_name="fluent_pandas_datasource").get_asset(
- asset_name="my_df_asset"
+ retrieved_bd = (
+ context.get_datasource(datasource_name="fluent_spark_datasource")
+ .get_asset(asset_name="my_df_asset")
+ .get_batch_definition(batch_definition_name="bd")
+ )
+ new_batch = retrieved_bd.get_batch(batch_parameters={"dataframe": spark_df})
+ assert isinstance(new_batch.data, SparkDFBatchData)
+ assert new_batch.data.dataframe.toPandas().equals(df)
+
+
+@dataclass
+class PandasDataSourceAndFrame:
+ datasource: PandasDatasource
+ dataframe: pd.DataFrame
+
+
+@dataclass
+class SparkDataSourceAndFrame:
+ datasource: SparkDatasource
+ dataframe: SparkDataFrame
+
+
+def _validate_whole_dataframe_batch(
+ source_and_frame: PandasDataSourceAndFrame | SparkDataSourceAndFrame,
+):
+ my_expectation = gxe.ExpectColumnMeanToBeBetween(
+ column="column_name", min_value=2.5, max_value=3.5
+ )
+ asset = source_and_frame.datasource.add_dataframe_asset(name="asset")
+ bd = asset.add_batch_definition_whole_dataframe(name="bd")
+ batch = bd.get_batch(batch_parameters={"dataframe": source_and_frame.dataframe})
+ result = batch.validate(my_expectation)
+ assert result.success
+
+
+@pytest.mark.unit
+def test_validate_pandas_batch():
+ context = gx.get_context(mode="ephemeral")
+ datasource = context.data_sources.add_pandas(name="ds")
+ df = pd.DataFrame({"column_name": [1, 2, 3, 4, 5]})
+ _validate_whole_dataframe_batch(PandasDataSourceAndFrame(datasource=datasource, dataframe=df))
+
+
+@pytest.mark.spark
+def test_validate_spark_batch(
+ spark_session: SparkSession,
+ spark_df_from_pandas_df: Callable[[SparkSession, pd.DataFrame], SparkDataFrame],
+):
+ context = gx.get_context(mode="ephemeral")
+ datasource = context.data_sources.add_spark(name="ds")
+ spark_df = spark_df_from_pandas_df(
+ spark_session, pd.DataFrame({"column_name": [1, 2, 3, 4, 5]})
+ )
+ _validate_whole_dataframe_batch(
+ SparkDataSourceAndFrame(datasource=datasource, dataframe=spark_df)
+ )
+
+
+@dataclass
+class ContextPandasDataSourceAndFrame:
+ context: EphemeralDataContext
+ datasource: PandasDatasource
+ dataframe: pd.DataFrame
+
+
+@dataclass
+class ContextSparkDataSourceAndFrame:
+ context: EphemeralDataContext
+ datasource: SparkDatasource
+ dataframe: SparkDataFrame
+
+
+def _validate_whole_dataframe_batch_definition(
+ context_source_frame: ContextPandasDataSourceAndFrame | ContextSparkDataSourceAndFrame,
+):
+ asset = context_source_frame.datasource.add_dataframe_asset(name="asset")
+ bd = asset.add_batch_definition_whole_dataframe(name="bd")
+ suite = context_source_frame.context.suites.add(gx.ExpectationSuite(name="suite"))
+ suite.add_expectation(
+ gxe.ExpectColumnMeanToBeBetween(column="column_name", min_value=2.5, max_value=3.5)
+ )
+ validation_def = gx.ValidationDefinition(
+ name="vd",
+ data=bd,
+ suite=suite,
+ )
+ result = validation_def.run(batch_parameters={"dataframe": context_source_frame.dataframe})
+ assert result.success
+
+
+@pytest.mark.unit
+def test_validate_pandas_batch_definition():
+ context = gx.get_context(mode="ephemeral")
+ datasource = context.data_sources.add_pandas(name="ds")
+ df = pd.DataFrame({"column_name": [1, 2, 3, 4, 5]})
+ _validate_whole_dataframe_batch_definition(
+ ContextPandasDataSourceAndFrame(context=context, datasource=datasource, dataframe=df)
+ )
+
+
+@pytest.mark.spark
+def test_validate_spark_batch_definition(
+ spark_session: SparkSession,
+ spark_df_from_pandas_df: Callable[[SparkSession, pd.DataFrame], SparkDataFrame],
+):
+ context = gx.get_context(mode="ephemeral")
+ datasource = context.data_sources.add_spark(name="ds")
+ spark_df = spark_df_from_pandas_df(
+ spark_session, pd.DataFrame({"column_name": [1, 2, 3, 4, 5]})
+ )
+ _validate_whole_dataframe_batch_definition(
+ ContextSparkDataSourceAndFrame(context=context, datasource=datasource, dataframe=spark_df)
)
- _ = dataframe_asset.build_batch_request(dataframe=spark_df)
- assert dataframe_asset.dataframe.toPandas().equals(df) # type: ignore[union-attr]
diff --git a/tests/datasource/fluent/test_pandas_datasource.py b/tests/datasource/fluent/test_pandas_datasource.py
index f1a6332a9f34..c0ea4258b3a4 100644
--- a/tests/datasource/fluent/test_pandas_datasource.py
+++ b/tests/datasource/fluent/test_pandas_datasource.py
@@ -30,6 +30,8 @@
DefaultPandasDatasourceError,
_get_field_details,
)
+from great_expectations.exceptions.exceptions import BuildBatchRequestError
+from great_expectations.execution_engine.pandas_batch_data import PandasBatchData
from great_expectations.util import camel_to_snake
if TYPE_CHECKING:
@@ -462,11 +464,11 @@ def test_read_dataframe(empty_data_context: AbstractDataContext, test_df_pandas:
assert isinstance(dataframe_asset, DataFrameAsset)
assert dataframe_asset.name == "my_dataframe_asset"
assert len(empty_data_context.data_sources.pandas_default.assets) == 2
- _ = dataframe_asset.build_batch_request(dataframe=test_df_pandas)
- assert all(
- asset.dataframe.equals(test_df_pandas) # type: ignore[attr-defined]
- for asset in empty_data_context.data_sources.pandas_default.assets
- )
+ bd = dataframe_asset.add_batch_definition_whole_dataframe(name="bd")
+ bd_batch = bd.get_batch(batch_parameters={"dataframe": test_df_pandas})
+ for b in [batch, bd_batch]:
+ assert isinstance(b.data, PandasBatchData)
+        assert b.data.dataframe.equals(test_df_pandas)
@pytest.mark.cloud
@@ -532,7 +534,7 @@ def test_build_batch_request_raises_if_missing_dataframe(
name="fluent_pandas_datasource"
).add_dataframe_asset(name="my_df_asset")
- with pytest.raises(ValueError) as e:
+ with pytest.raises(BuildBatchRequestError) as e:
dataframe_asset.build_batch_request()
- assert "Cannot build batch request for dataframe asset without a dataframe" in str(e.value)
+ assert str(e.value).startswith("Bad input to build_batch_request:")
diff --git a/tests/datasource/fluent/test_spark_datasource.py b/tests/datasource/fluent/test_spark_datasource.py
index e017d65ae0da..42ad508fb9d6 100644
--- a/tests/datasource/fluent/test_spark_datasource.py
+++ b/tests/datasource/fluent/test_spark_datasource.py
@@ -7,12 +7,13 @@
import pytest
-from great_expectations.compatibility import pydantic
from great_expectations.datasource.fluent import TestConnectionError
from great_expectations.datasource.fluent.spark_datasource import (
DataFrameAsset,
SparkConfig,
)
+from great_expectations.exceptions.exceptions import BuildBatchRequestError
+from great_expectations.execution_engine.sparkdf_batch_data import SparkDFBatchData
from great_expectations.util import is_candidate_subset_of_target
if TYPE_CHECKING:
@@ -36,13 +37,6 @@ def test_dataframe_asset(
spark_df_from_pandas_df,
test_df_pandas,
):
- # validates that a dataframe object is passed
- with pytest.raises(pydantic.ValidationError) as exc_info:
- _ = DataFrameAsset(name="malformed_asset", dataframe={})
-
- errors_dict = exc_info.value.errors()[0]
- assert errors_dict["loc"][0] == "dataframe"
-
datasource = empty_data_context.data_sources.add_spark(name="my_spark_datasource")
pandas_df = test_df_pandas
@@ -55,19 +49,16 @@ def test_dataframe_asset(
assert dataframe_asset.name == "my_dataframe_asset"
assert len(datasource.assets) == 1
- _ = dataframe_asset.build_batch_request(dataframe=spark_df)
-
- dataframe_asset = datasource.add_dataframe_asset(
+ datasource.add_dataframe_asset(
name="my_second_dataframe_asset",
)
assert len(datasource.assets) == 2
- _ = dataframe_asset.build_batch_request(dataframe=spark_df)
-
- assert all(
- asset.dataframe is not None and asset.dataframe.toPandas().equals(pandas_df)
- for asset in datasource.assets
- )
+ for i, asset in enumerate(datasource.assets):
+ bd = asset.add_batch_definition_whole_dataframe(name=f"bd_{i}")
+ batch = bd.get_batch(batch_parameters={"dataframe": spark_df})
+ assert isinstance(batch.data, SparkDFBatchData)
+ assert batch.data.dataframe.toPandas().equals(pandas_df)
@pytest.mark.spark
@@ -98,7 +89,7 @@ def test_spark_data_asset_batch_metadata(
assert dataframe_asset.batch_metadata == batch_metadata
batch_list = dataframe_asset.get_batch_list_from_batch_request(
- dataframe_asset.build_batch_request(dataframe=spark_df)
+ dataframe_asset.build_batch_request(options={"dataframe": spark_df})
)
assert len(batch_list) == 1
substituted_batch_metadata = copy.deepcopy(batch_metadata)
@@ -144,10 +135,10 @@ def test_build_batch_request_raises_if_missing_dataframe(
name="my_spark_datasource"
).add_dataframe_asset(name="my_dataframe_asset")
- with pytest.raises(ValueError) as e:
+ with pytest.raises(BuildBatchRequestError) as e:
dataframe_asset.build_batch_request()
- assert "Cannot build batch request for dataframe asset without a dataframe" in str(e.value)
+ assert "options must contain exactly 1 key, 'dataframe'" in str(e.value)
@pytest.mark.spark
diff --git a/tests/expectations/core/test_expect_column_values_to_match_regex_parameterized.py b/tests/expectations/core/test_expect_column_values_to_match_regex_parameterized.py
index 186fb26c5a9f..fc1fe616f316 100644
--- a/tests/expectations/core/test_expect_column_values_to_match_regex_parameterized.py
+++ b/tests/expectations/core/test_expect_column_values_to_match_regex_parameterized.py
@@ -14,10 +14,10 @@ def test_expect_column_values_as_string_to_be_positive_integers_pass(
empty_data_context: AbstractDataContext,
):
df = pd.DataFrame({"a": ["1", "2", "3", "4", "5"]})
- data_asset = empty_data_context.data_sources.pandas_default.add_dataframe_asset(
- "my_dataframe", dataframe=df
+ data_asset = empty_data_context.data_sources.pandas_default.add_dataframe_asset("my_dataframe")
+ batch = data_asset.add_batch_definition_whole_dataframe("my_batch_definition").get_batch(
+ batch_parameters={"dataframe": df}
)
- batch = data_asset.add_batch_definition_whole_dataframe("my_batch_definition").get_batch()
result = batch.validate(ExpectColumnValuesAsStringToBePositiveInteger(column="a"))
@@ -30,10 +30,10 @@ def test_expect_column_values_as_string_to_be_positive_integers_fail(
):
df = pd.DataFrame({"a": ["1", "2", "3", "4", "a"]})
- data_asset = empty_data_context.data_sources.pandas_default.add_dataframe_asset(
- "my_dataframe", dataframe=df
+ data_asset = empty_data_context.data_sources.pandas_default.add_dataframe_asset("my_dataframe")
+ batch = data_asset.add_batch_definition_whole_dataframe("my_batch_definition").get_batch(
+ batch_parameters={"dataframe": df}
)
- batch = data_asset.add_batch_definition_whole_dataframe("my_batch_definition").get_batch()
result = batch.validate(ExpectColumnValuesAsStringToBePositiveInteger(column="a"))
diff --git a/tests/experimental/metric_repository/test_metric_list_metric_retriever_integration.py b/tests/experimental/metric_repository/test_metric_list_metric_retriever_integration.py
index 2a6f690d8a17..2309f3e78362 100644
--- a/tests/experimental/metric_repository/test_metric_list_metric_retriever_integration.py
+++ b/tests/experimental/metric_repository/test_metric_list_metric_retriever_integration.py
@@ -43,7 +43,7 @@ def cloud_context_and_batch_request_with_simple_dataframe(
name = "dataframe"
data_asset = datasource.add_dataframe_asset(name=name)
- batch_request = data_asset.build_batch_request(dataframe=df)
+ batch_request = data_asset.build_batch_request(options={"dataframe": df})
return context, batch_request
diff --git a/tests/integration/cloud/end_to_end/conftest.py b/tests/integration/cloud/end_to_end/conftest.py
index 7b86b6e42a32..7903c9d4e651 100644
--- a/tests/integration/cloud/end_to_end/conftest.py
+++ b/tests/integration/cloud/end_to_end/conftest.py
@@ -16,6 +16,7 @@
from great_expectations.core import ExpectationSuite
from great_expectations.core.validation_definition import ValidationDefinition
from great_expectations.data_context import CloudDataContext
+from great_expectations.exceptions.exceptions import BuildBatchRequestError
from great_expectations.execution_engine import (
SparkDFExecutionEngine,
SqlAlchemyExecutionEngine,
@@ -124,7 +125,7 @@ def get_missing_data_asset_error_type() -> type[Exception]:
@pytest.fixture(scope="package")
def in_memory_batch_request_missing_dataframe_error_type() -> type[Exception]:
- return ValueError
+ return BuildBatchRequestError
class TableFactory(Protocol):
diff --git a/tests/integration/cloud/end_to_end/test_pandas_datasource.py b/tests/integration/cloud/end_to_end/test_pandas_datasource.py
index 549fb55c7f52..f6e254e91370 100644
--- a/tests/integration/cloud/end_to_end/test_pandas_datasource.py
+++ b/tests/integration/cloud/end_to_end/test_pandas_datasource.py
@@ -103,7 +103,7 @@ def batch_request(
if isinstance(data_asset, DataFrameAsset):
with pytest.raises(in_memory_batch_request_missing_dataframe_error_type):
data_asset.build_batch_request()
- batch_request = data_asset.build_batch_request(dataframe=pandas_test_df)
+ batch_request = data_asset.build_batch_request(options={"dataframe": pandas_test_df})
else:
batch_request = data_asset.build_batch_request()
return batch_request
diff --git a/tests/integration/cloud/end_to_end/test_spark_datasource.py b/tests/integration/cloud/end_to_end/test_spark_datasource.py
index 54556be9e1b4..abe7dac09a3d 100644
--- a/tests/integration/cloud/end_to_end/test_spark_datasource.py
+++ b/tests/integration/cloud/end_to_end/test_spark_datasource.py
@@ -101,7 +101,7 @@ def batch_request(
},
)
spark_df: pyspark.DataFrame = spark_df_from_pandas_df(spark_session, pandas_df)
- batch_request = data_asset.build_batch_request(dataframe=spark_df)
+ batch_request = data_asset.build_batch_request(options={"dataframe": spark_df})
else:
batch_request = data_asset.build_batch_request()
return batch_request
diff --git a/tests/validator/test_validator.py b/tests/validator/test_validator.py
index 324083d9dbb4..7cd24df1851d 100644
--- a/tests/validator/test_validator.py
+++ b/tests/validator/test_validator.py
@@ -140,10 +140,15 @@ def yellow_trip_pandas_data_context(
def test_validator_default_expectation_args__pandas(basic_datasource: PandasDatasource):
asset = basic_datasource.add_dataframe_asset(
"my_asset",
- dataframe=pd.DataFrame({"a": [1, 5, 22, 3, 5, 10], "b": [1, 2, 3, 4, 5, None]}),
)
- batch_definition = asset.add_batch_definition_whole_dataframe("my batch definition")
- batch = batch_definition.get_batch()
+ batch_definition = asset.add_batch_definition_whole_dataframe(
+ name="my batch definition",
+ )
+ batch = batch_definition.get_batch(
+ batch_parameters={
+ "dataframe": pd.DataFrame({"a": [1, 5, 22, 3, 5, 10], "b": [1, 2, 3, 4, 5, None]})
+ }
+ )
my_validator = Validator(execution_engine=PandasExecutionEngine(), batches=[batch])
@@ -194,12 +199,13 @@ def multi_batch_taxi_validator_ge_cloud_mode(
@pytest.mark.big
def test_graph_validate(in_memory_runtime_context, basic_datasource: PandasDatasource):
- asset = basic_datasource.add_dataframe_asset(
- "my_asset",
- dataframe=pd.DataFrame({"a": [1, 5, 22, 3, 5, 10], "b": [1, 2, 3, 4, 5, None]}),
- )
+ asset = basic_datasource.add_dataframe_asset("my_asset")
batch_definition = asset.add_batch_definition_whole_dataframe("my batch definition")
- batch = batch_definition.get_batch()
+ batch = batch_definition.get_batch(
+ batch_parameters={
+ "dataframe": pd.DataFrame({"a": [1, 5, 22, 3, 5, 10], "b": [1, 2, 3, 4, 5, None]})
+ },
+ )
expectation_configuration = ExpectationConfiguration(
type="expect_column_value_z_scores_to_be_less_than",
@@ -241,12 +247,15 @@ def test_graph_validate_with_runtime_config(
):
asset = basic_datasource.add_dataframe_asset(
"my_asset",
- dataframe=pd.DataFrame(
- {"a": [1, 5, 22, 3, 5, 10, 2, 3], "b": [97, 332, 3, 4, 5, 6, 7, None]}
- ),
)
batch_definition = asset.add_batch_definition_whole_dataframe("my batch definition")
- batch = batch_definition.get_batch()
+ batch = batch_definition.get_batch(
+        batch_parameters={
+ "dataframe": pd.DataFrame(
+ {"a": [1, 5, 22, 3, 5, 10, 2, 3], "b": [97, 332, 3, 4, 5, 6, 7, None]}
+ )
+ },
+ )
expectation_configuration = ExpectationConfiguration(
type="expect_column_value_z_scores_to_be_less_than",
@@ -297,9 +306,9 @@ def mock_error(*args, **kwargs):
raise Exception("Mock Error") # noqa: TRY002
df = pd.DataFrame({"a": [1, 5, 22, 3, 5, 10], "b": [1, 2, 3, 4, 5, None]})
- asset = basic_datasource.add_dataframe_asset("my_asset", dataframe=df)
+ asset = basic_datasource.add_dataframe_asset("my_asset")
batch_definition = asset.add_batch_definition_whole_dataframe("my batch definition")
- batch = batch_definition.get_batch()
+ batch = batch_definition.get_batch(batch_parameters={"dataframe": df})
expectation_configuration = ExpectationConfiguration(
type="expect_column_value_z_scores_to_be_less_than",
@@ -329,9 +338,9 @@ def test_graph_validate_with_bad_config_catch_exceptions_false(
in_memory_runtime_context, basic_datasource: PandasDatasource
):
df = pd.DataFrame({"a": [1, 5, 22, 3, 5, 10], "b": [1, 2, 3, 4, 5, None]})
- asset = basic_datasource.add_dataframe_asset("my_asset", dataframe=df)
+ asset = basic_datasource.add_dataframe_asset("my_asset")
batch_definition = asset.add_batch_definition_whole_dataframe("my batch definition")
- batch = batch_definition.get_batch()
+ batch = batch_definition.get_batch(batch_parameters={"dataframe": df})
expectation_configuration = ExpectationConfiguration(
type="expect_column_max_to_be_between",
@@ -376,7 +385,7 @@ def test_validator_validate_substitutes_suite_parameters(
column_name = "my_column"
datasource = context.data_sources.add_pandas(name="my_datasource")
asset = datasource.add_dataframe_asset(
- "my_asset", dataframe=pd.DataFrame({column_name: [0, 1, 2, 3, 4]})
+ "my_asset",
)
suite = context.suites.add(ExpectationSuite(suite_name))
suite.add_expectation(
@@ -385,7 +394,9 @@ def test_validator_validate_substitutes_suite_parameters(
)
)
validator = context.get_validator(
- batch_request=asset.build_batch_request(),
+ batch_request=asset.build_batch_request(
+ options={"dataframe": pd.DataFrame({column_name: [0, 1, 2, 3, 4]})}
+ ),
expectation_suite_name=suite_name,
)
@@ -405,12 +416,15 @@ def test_rendered_content_bool_only_respected(result_format: str | dict):
context = get_context(mode="ephemeral")
csv_asset = context.data_sources.pandas_default.add_dataframe_asset(
"df",
- dataframe=pd.DataFrame(
- data=[1, 2, 3],
- columns=["numbers_i_can_count_to"],
- ),
)
- batch_request = csv_asset.build_batch_request()
+ batch_request = csv_asset.build_batch_request(
+ options={
+ "dataframe": pd.DataFrame(
+ data=[1, 2, 3],
+ columns=["numbers_i_can_count_to"],
+ )
+ }
+ )
expectation_suite_name = "test_result_format_suite"
context.suites.add(ExpectationSuite(name=expectation_suite_name))
@@ -530,9 +544,9 @@ def test_graph_validate_with_two_expectations_and_first_expectation_without_addi
],
columns=["var"],
)
- asset = basic_datasource.add_dataframe_asset("my_asset", dataframe=df)
+ asset = basic_datasource.add_dataframe_asset("my_asset")
batch_definition = asset.add_batch_definition_whole_dataframe("my batch definition")
- batch = batch_definition.get_batch()
+ batch = batch_definition.get_batch(batch_parameters={"dataframe": df})
expectation_configuration_expect_column_values_to_be_null = ExpectationConfiguration(
type="expect_column_values_to_be_null",
@@ -679,9 +693,9 @@ def test_graph_validate_with_two_expectations_and_first_expectation_with_result_
columns=["var"],
)
- asset = basic_datasource.add_dataframe_asset("my_asset", dataframe=df)
+ asset = basic_datasource.add_dataframe_asset("my_asset")
batch_definition = asset.add_batch_definition_whole_dataframe("my batch definition")
- batch = batch_definition.get_batch()
+ batch = batch_definition.get_batch(batch_parameters={"dataframe": df})
expectation_configuration_expect_column_values_to_be_null = ExpectationConfiguration(
type="expect_column_values_to_be_null",