diff --git a/great_expectations/datasource/__init__.py b/great_expectations/datasource/__init__.py index 3876b07aca3d..b5f5456f88d7 100644 --- a/great_expectations/datasource/__init__.py +++ b/great_expectations/datasource/__init__.py @@ -1,6 +1,3 @@ from .data_connector.data_connector import DataConnector -from .datasource import LegacyDatasource from .new_datasource import BaseDatasource, Datasource -from .pandas_datasource import PandasDatasource from .simple_sqlalchemy_datasource import SimpleSqlalchemyDatasource -from .sparkdf_datasource import SparkDFDatasource diff --git a/great_expectations/datasource/datasource.py b/great_expectations/datasource/datasource.py deleted file mode 100644 index ee5bde00722a..000000000000 --- a/great_expectations/datasource/datasource.py +++ /dev/null @@ -1,392 +0,0 @@ -import copy -import logging -import warnings - -from great_expectations.data_context.util import ( - instantiate_class_from_config, - load_class, - verify_dynamic_loading_support, -) -from great_expectations.exceptions import ClassInstantiationError - -logger = logging.getLogger(__name__) - - -class LegacyDatasource: - """ - A Datasource connects to a compute environment and one or more storage environments and produces batches of data - that Great Expectations can validate in that compute environment. - - Each Datasource provides Batches connected to a specific compute environment, such as a - SQL database, a Spark cluster, or a local in-memory Pandas DataFrame. - - Datasources use Batch Kwargs to specify instructions for how to access data from relevant sources such as an - existing object from a DAG runner, a SQL database, S3 bucket, or local filesystem. - - To bridge the gap between those worlds, Datasources interact closely with *generators* which - are aware of a source of data and can produce produce identifying information, called - "batch_kwargs" that datasources can use to get individual batches of data. They add flexibility - in how to obtain data such as with time-based partitioning, downsampling, or other techniques - appropriate for the datasource. - - For example, a batch kwargs generator could produce a SQL query that logically represents "rows in the Events - table with a timestamp on February 7, 2012," which a SqlAlchemyDatasource could use to materialize - a SqlAlchemyDataset corresponding to that batch of data and ready for validation. - - Since opinionated DAG managers such as airflow, dbt, prefect.io, dagster can also act as datasources - and/or batch kwargs generators for a more generic datasource. - - When adding custom expectations by subclassing an existing DataAsset type, use the data_asset_type parameter - to configure the datasource to load and return DataAssets of the custom type. - - --ge-feature-maturity-info-- - - id: datasource_s3 - title: Datasource - S3 - icon: - short_description: S3 - description: Support for connecting to Amazon Web Services S3 as an external datasource. - how_to_guide_url: https://docs.greatexpectations.io/en/latest/how_to_guides/configuring_datasources/how_to_configure_a_pandas_s3_datasource.html - maturity: Production - maturity_details: - api_stability: medium - implementation_completeness: Complete - unit_test_coverage:: Complete - integration_infrastructure_test_coverage: None - documentation_completeness: Minimal/Spotty - bug_risk: Low - - id: datasource_filesystem - title: Datasource - Filesystem - icon: - short_description: File-based datsource - description: Support for using a mounted filesystem as an external datasource. 
- how_to_guide_url: https://docs.greatexpectations.io/en/latest/how_to_guides/configuring_datasources/how_to_configure_a_pandas_filesystem_datasource.html - maturity: Production - maturity_details: - api_stability: Medium - implementation_completeness: Complete - unit_test_coverage: Complete - integration_infrastructure_test_coverage: Partial - documentation_completeness: Partial - bug_risk: Low (Moderate for Windows users because of path issues) - - id: datasource_gcs - title: Datasource - GCS - icon: - short_description: GCS - description: Support for Google Cloud Storage as an external datasource - how_to_guide_url: - maturity: Experimental - maturity_details: - api_stability: Medium (supported via native `gs://' syntax in Pandas and Pyspark; medium because we expect configuration to evolve) - implementation_completeness: Medium (works via passthrough, not via CLI) - unit_test_coverage: Minimal - integration_infrastructure_test_coverage: Minimal - documentation_completeness: Minimal - bug_risk: Moderate - - id: datasource_azure_blob_storage - title: Datasource - Azure Blob Storage - icon: - short_description: Azure Blob Storage - description: Support for Microsoft Azure Blob Storage as an external datasource - how_to_guide_url: - maturity: In Roadmap (Sub-Experimental - "Not Impossible") - maturity_details: - api_stability: N/A (Supported on Databricks Spark via `wasb://' / `wasps://' url; requires local download first for Pandas) - implementation_completeness: Minimal - unit_test_coverage: N/A - integration_infrastructure_test_coverage: N/A - documentation_completeness: Minimal - bug_risk: Unknown - --ge-feature-maturity-info-- - """ - - recognized_batch_parameters = {"limit"} - - @classmethod - def from_configuration(cls, **kwargs): - """ - Build a new datasource from a configuration dictionary. - - Args: - **kwargs: configuration key-value pairs - - Returns: - datasource (Datasource): the newly-created datasource - - """ - return cls(**kwargs) - - @classmethod - def build_configuration( - cls, - class_name, - module_name="great_expectations.datasource", - data_asset_type=None, - batch_kwargs_generators=None, - **kwargs, - ): - """ - Build a full configuration object for a datasource, potentially including batch kwargs generators with defaults. - - Args: - class_name: The name of the class for which to build the config - module_name: The name of the module in which the datasource class is located - data_asset_type: A ClassConfig dictionary - batch_kwargs_generators: BatchKwargGenerators configuration dictionary - **kwargs: Additional kwargs to be part of the datasource constructor's initialization - - Returns: - A complete datasource configuration. - - """ - verify_dynamic_loading_support(module_name=module_name) - class_ = load_class(class_name=class_name, module_name=module_name) - configuration = class_.build_configuration( - data_asset_type=data_asset_type, - batch_kwargs_generators=batch_kwargs_generators, - **kwargs, - ) - return configuration - - def __init__( - self, - name, - data_context=None, - data_asset_type=None, - batch_kwargs_generators=None, - **kwargs, - ) -> None: - """ - Build a new datasource. 
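# --- Illustrative sketch (editor's addition, not part of the patch) ---
# build_configuration() in the removed LegacyDatasource resolves a datasource
# class dynamically from a module_name/class_name pair via load_class(). A
# minimal stand-in for that pattern using only the standard library; the name
# load_class_sketch is hypothetical, not a Great Expectations API:
import importlib


def load_class_sketch(class_name: str, module_name: str):
    """Return the class object named by a module_name/class_name config pair."""
    module = importlib.import_module(module_name)
    return getattr(module, class_name)


# Example: load_class_sketch("Datasource", "great_expectations.datasource")
# returns the Datasource class still exported by the trimmed __init__.py above.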
- - Args: - name: the name for the datasource - data_context: data context to which to connect - data_asset_type (ClassConfig): the type of DataAsset to produce - batch_kwargs_generators: BatchKwargGenerators to add to the datasource - """ - self._data_context = data_context - self._name = name - # deprecated-v0.7.11 - if isinstance(data_asset_type, str): - warnings.warn( - "String-only configuration for data_asset_type is deprecated as of v0.7.11. " - "As support will be removed in v0.16, please use module_name and class_name instead.", - DeprecationWarning, - ) - self._data_asset_type = data_asset_type - self._datasource_config = kwargs - self._batch_kwargs_generators: dict = {} - - self._datasource_config["data_asset_type"] = data_asset_type - if batch_kwargs_generators is not None: - self._datasource_config["batch_kwargs_generators"] = batch_kwargs_generators - - # Chetan - 20221103 - This attribute is meant to represent the config args used to instantiate the object (before ${VARIABLE} substitution). - # While downstream logic should override this value, we default to `self._datasource_config` as a backup. - # This is to be removed once substitution logic is migrated from the context to the individual object level. - self._raw_config = self._datasource_config - - @property - def name(self): - """ - Property for datasource name - """ - return self._name - - @property - def config(self): - return copy.deepcopy(self._datasource_config) - - @property - def data_context(self): - """ - Property for attached DataContext - """ - return self._data_context - - def _build_generators(self) -> None: - """ - Build batch kwargs generator objects from the datasource configuration. - - Returns: - None - """ - try: - for generator in self._datasource_config["batch_kwargs_generators"].keys(): - self.get_batch_kwargs_generator(generator) - except KeyError: - pass - - def add_batch_kwargs_generator(self, name, class_name, **kwargs): - """Add a BatchKwargGenerator to the datasource. 
- - Args: - name (str): the name of the new BatchKwargGenerator to add - class_name: class of the BatchKwargGenerator to add - kwargs: additional keyword arguments will be passed directly to the new BatchKwargGenerator's constructor - - Returns: - BatchKwargGenerator (BatchKwargGenerator) - """ - kwargs["class_name"] = class_name - generator = self._build_batch_kwargs_generator(**kwargs) - if "batch_kwargs_generators" not in self._datasource_config: - self._datasource_config["batch_kwargs_generators"] = {} - self._datasource_config["batch_kwargs_generators"][name] = kwargs - - return generator - - def _build_batch_kwargs_generator(self, **kwargs): - """Build a BatchKwargGenerator using the provided configuration and return the newly-built generator.""" - generator = instantiate_class_from_config( - config=kwargs, - runtime_environment={"datasource": self}, - config_defaults={ - "module_name": "great_expectations.datasource.batch_kwargs_generator" - }, - ) - if not generator: - raise ClassInstantiationError( - module_name="great_expectations.datasource.batch_kwargs_generator", - package_name=None, - class_name=kwargs["class_name"], - ) - - return generator - - def get_batch_kwargs_generator(self, name): - """Get the (named) BatchKwargGenerator from a datasource - - Args: - name (str): name of BatchKwargGenerator (default value is 'default') - - Returns: - BatchKwargGenerator (BatchKwargGenerator) - """ - if name in self._batch_kwargs_generators: - return self._batch_kwargs_generators[name] - elif ( - "batch_kwargs_generators" in self._datasource_config - and name in self._datasource_config["batch_kwargs_generators"] - ): - generator_config = copy.deepcopy( - self._datasource_config["batch_kwargs_generators"][name] - ) - else: - raise ValueError( - f"Unable to load batch kwargs generator {name} -- no configuration found or invalid configuration." - ) - generator = self._build_batch_kwargs_generator(**generator_config) - self._batch_kwargs_generators[name] = generator - return generator - - def list_batch_kwargs_generators(self): - """List currently-configured BatchKwargGenerator for this datasource. - - Returns: - List(dict): each dictionary includes "name" and "type" keys - """ - generators = [] - - if "batch_kwargs_generators" in self._datasource_config: - for key, value in self._datasource_config[ - "batch_kwargs_generators" - ].items(): - generators.append({"name": key, "class_name": value["class_name"]}) - - return generators - - # TODO: move to data connector - def process_batch_parameters(self, limit=None, dataset_options=None): - """Use datasource-specific configuration to translate any batch parameters into batch kwargs at the datasource - level. - - Args: - limit (int): a parameter all datasources must accept to allow limiting a batch to a smaller number of rows. - dataset_options (dict): a set of kwargs that will be passed to the constructor of a dataset built using - these batch_kwargs - - Returns: - batch_kwargs: Result will include both parameters passed via argument and configured parameters. 
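# --- Illustrative sketch (editor's addition, not part of the patch) ---
# The Returns note above says the result merges parameters passed by argument
# with configured parameters. A self-contained sketch of that merge, taking the
# datasource-level defaults as an explicit argument instead of reading them
# from self._datasource_config (merge_batch_parameters is a hypothetical name):
def merge_batch_parameters(configured_batch_kwargs, limit=None, dataset_options=None):
    batch_kwargs = dict(configured_batch_kwargs)
    if limit is not None:
        batch_kwargs["limit"] = limit
    if dataset_options is not None:
        merged = dict(batch_kwargs.get("dataset_options", {}))
        merged.update(dataset_options)
        batch_kwargs["dataset_options"] = merged
    return batch_kwargs


# merge_batch_parameters({"reader_options": {"sep": ","}}, limit=100)
# -> {"reader_options": {"sep": ","}, "limit": 100}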
- """ - batch_kwargs = self._datasource_config.get("batch_kwargs", {}) - - if limit is not None: - batch_kwargs["limit"] = limit - - if dataset_options is not None: - # Then update with any locally-specified reader options - if not batch_kwargs.get("dataset_options"): - batch_kwargs["dataset_options"] = {} - batch_kwargs["dataset_options"].update(dataset_options) - - return batch_kwargs - - # TODO: move to execution engine or make a wrapper - def get_batch(self, batch_kwargs, batch_parameters=None) -> None: - """Get a batch of data from the datasource. - - Args: - batch_kwargs: the BatchKwargs to use to construct the batch - batch_parameters: optional parameters to store as the reference description of the batch. They should - reflect parameters that would provide the passed BatchKwargs. - - - Returns: - Batch - - """ - raise NotImplementedError - - def get_available_data_asset_names(self, batch_kwargs_generator_names=None): - """ - Returns a dictionary of data_asset_names that the specified batch kwarg - generator can provide. Note that some batch kwargs generators may not be - capable of describing specific named data assets, and some (such as - filesystem glob batch kwargs generators) require the user to configure - data asset names. - - Args: - batch_kwargs_generator_names: the BatchKwargGenerator for which to get available data asset names. - - Returns: - dictionary consisting of sets of generator assets available for the specified generators: - :: - - { - generator_name: { - names: [ (data_asset_1, data_asset_1_type), (data_asset_2, data_asset_2_type) ... ] - } - ... - } - - """ - available_data_asset_names = {} - if batch_kwargs_generator_names is None: - batch_kwargs_generator_names = [ - generator["name"] for generator in self.list_batch_kwargs_generators() - ] - elif isinstance(batch_kwargs_generator_names, str): - batch_kwargs_generator_names = [batch_kwargs_generator_names] - - for generator_name in batch_kwargs_generator_names: - generator = self.get_batch_kwargs_generator(generator_name) - available_data_asset_names[ - generator_name - ] = generator.get_available_data_asset_names() - return available_data_asset_names - - # TODO: move to dataconnector - def build_batch_kwargs( - self, batch_kwargs_generator, data_asset_name=None, partition_id=None, **kwargs - ): - generator_obj = self.get_batch_kwargs_generator(batch_kwargs_generator) - if partition_id is not None: - kwargs["partition_id"] = partition_id - return generator_obj.build_batch_kwargs( - data_asset_name=data_asset_name, **kwargs - ) diff --git a/great_expectations/datasource/pandas_datasource.py b/great_expectations/datasource/pandas_datasource.py deleted file mode 100644 index 6eabbfa2b425..000000000000 --- a/great_expectations/datasource/pandas_datasource.py +++ /dev/null @@ -1,356 +0,0 @@ -import datetime -import logging -import uuid -import warnings -from collections.abc import Callable -from functools import partial -from io import BytesIO - -import pandas as pd - -from great_expectations.core.batch import Batch, BatchMarkers -from great_expectations.core.util import S3Url -from great_expectations.datasource.datasource import LegacyDatasource -from great_expectations.exceptions import BatchKwargsError -from great_expectations.execution_engine.pandas_execution_engine import ( - hash_pandas_dataframe, -) -from great_expectations.types import ClassConfig -from great_expectations.types.configurations import classConfigSchema - -logger = logging.getLogger(__name__) - -HASH_THRESHOLD = 1e9 - - -class 
PandasDatasource(LegacyDatasource): - """The PandasDatasource produces PandasDataset objects and supports generators capable of - interacting with the local filesystem (the default subdir_reader generator), and from - existing in-memory dataframes. - """ - - recognized_batch_parameters = { - "reader_method", - "reader_options", - "limit", - "dataset_options", - "boto3_options", - } - - @classmethod - def build_configuration( # noqa: PLR0913 - cls, - data_asset_type=None, - batch_kwargs_generators=None, - boto3_options=None, - reader_method=None, - reader_options=None, - limit=None, - **kwargs, - ): - """ - Build a full configuration object for a datasource, potentially including generators with defaults. - - Args: - data_asset_type: A ClassConfig dictionary - batch_kwargs_generators: Generator configuration dictionary - boto3_options: Optional dictionary with key-value pairs to pass to boto3 during instantiation. - reader_method: Optional default reader_method for generated batches - reader_options: Optional default reader_options for generated batches - limit: Optional default limit for generated batches - **kwargs: Additional kwargs to be part of the datasource constructor's initialization - - Returns: - A complete datasource configuration. - - """ - - if data_asset_type is None: - data_asset_type = { - "class_name": "PandasDataset", - "module_name": "great_expectations.dataset", - } - else: - data_asset_type = classConfigSchema.dump(ClassConfig(**data_asset_type)) - - configuration = kwargs - configuration["data_asset_type"] = data_asset_type - if batch_kwargs_generators: - configuration["batch_kwargs_generators"] = batch_kwargs_generators - - if boto3_options is not None: - if isinstance(boto3_options, dict): - configuration.update(boto3_options) - else: - raise ValueError( - "boto3_options must be a dictionary of key-value pairs to pass to boto3 upon " - "initialization." - ) - configuration["boto3_options"] = boto3_options - - if reader_options is not None: - if isinstance(reader_options, dict): - configuration.update(reader_options) - else: - raise ValueError( - "boto3_options must be a dictionary of key-value pairs to pass to boto3 upon " - "initialization." 
- ) - - if reader_method is not None: - configuration["reader_method"] = reader_method - - if limit is not None: - configuration["limit"] = limit - - return configuration - - def __init__( # noqa: PLR0913 - self, - name="pandas", - data_context=None, - data_asset_type=None, - batch_kwargs_generators=None, - boto3_options=None, - reader_method=None, - reader_options=None, - limit=None, - **kwargs, - ) -> None: - configuration_with_defaults = PandasDatasource.build_configuration( - data_asset_type, - batch_kwargs_generators, - boto3_options, - reader_method=reader_method, - reader_options=reader_options, - limit=limit, - **kwargs, - ) - - data_asset_type = configuration_with_defaults.pop("data_asset_type") - batch_kwargs_generators = configuration_with_defaults.pop( - "batch_kwargs_generators", None - ) - super().__init__( - name, - data_context=data_context, - data_asset_type=data_asset_type, - batch_kwargs_generators=batch_kwargs_generators, - **configuration_with_defaults, - ) - - self._build_generators() - self._boto3_options = configuration_with_defaults.get("boto3_options", {}) - self._reader_method = configuration_with_defaults.get("reader_method", None) - self._reader_options = configuration_with_defaults.get("reader_options", None) - self._limit = configuration_with_defaults.get("limit", None) - - # TODO: move to data connector - def process_batch_parameters( - self, - reader_method=None, - reader_options=None, - limit=None, - dataset_options=None, - ): - # Note that we do not pass limit up, since even that will be handled by PandasDatasource - batch_kwargs = super().process_batch_parameters(dataset_options=dataset_options) - - # Apply globally-configured reader options first - if self._reader_options: - # Then update with any locally-specified reader options - if not batch_kwargs.get("reader_options"): - batch_kwargs["reader_options"] = {} - batch_kwargs["reader_options"].update(self._reader_options) - - # Then update with any locally-specified reader options - if reader_options: - if not batch_kwargs.get("reader_options"): - batch_kwargs["reader_options"] = {} - batch_kwargs["reader_options"].update(reader_options) - - if self._limit: - if not batch_kwargs.get("reader_options"): - batch_kwargs["reader_options"] = {} - batch_kwargs["reader_options"]["nrows"] = self._limit - - if limit is not None: - if not batch_kwargs.get("reader_options"): - batch_kwargs["reader_options"] = {} - batch_kwargs["reader_options"]["nrows"] = limit - - if self._reader_method: - batch_kwargs["reader_method"] = self._reader_method - - if reader_method is not None: - batch_kwargs["reader_method"] = reader_method - - return batch_kwargs - - # TODO: move to execution engine or make a wrapper - def get_batch(self, batch_kwargs, batch_parameters=None): - # We will use and manipulate reader_options along the way - reader_options = batch_kwargs.get("reader_options", {}) - - # We need to build a batch_markers to be used in the dataframe - batch_markers = BatchMarkers( - { - "ge_load_time": datetime.datetime.now(datetime.timezone.utc).strftime( - "%Y%m%dT%H%M%S.%fZ" - ) - } - ) - - if "path" in batch_kwargs: - path = batch_kwargs["path"] - reader_method = batch_kwargs.get("reader_method") - reader_fn = self._get_reader_fn(reader_method, path) - df = reader_fn(path, **reader_options) - - elif "s3" in batch_kwargs: - # deprecated-v0.13.0 - warnings.warn( - "Direct GX Support for the s3 BatchKwarg is deprecated as of v0.13.0 and will be removed in v0.16. 
" - "Please use a path including the s3a:// protocol instead.", - DeprecationWarning, - ) - try: - from great_expectations.compatibility import aws - - s3 = aws.boto3.client("s3", **self._boto3_options) - except ImportError: - raise BatchKwargsError( - "Unable to load boto3 client to read s3 asset.", batch_kwargs - ) - raw_url = batch_kwargs["s3"] - reader_method = batch_kwargs.get("reader_method") - url = S3Url(raw_url) - logger.debug(f"Fetching s3 object. Bucket: {url.bucket} Key: {url.key}") - s3_object = s3.get_object(Bucket=url.bucket, Key=url.key) - reader_fn = self._get_reader_fn(reader_method, url.key) - default_reader_options = self._infer_default_options( - reader_fn, reader_options - ) - if not reader_options.get("encoding") and default_reader_options.get( - "encoding" - ): - reader_options["encoding"] = s3_object.get( - "ContentEncoding", default_reader_options.get("encoding") - ) - df = reader_fn(BytesIO(s3_object["Body"].read()), **reader_options) - - elif "dataset" in batch_kwargs and isinstance( - batch_kwargs["dataset"], (pd.DataFrame, pd.Series) - ): - df = batch_kwargs.get("dataset") - # We don't want to store the actual dataframe in kwargs; copy the remaining batch_kwargs - batch_kwargs = {k: batch_kwargs[k] for k in batch_kwargs if k != "dataset"} - batch_kwargs["PandasInMemoryDF"] = True - batch_kwargs["ge_batch_id"] = str(uuid.uuid1()) - - else: - raise BatchKwargsError( - "Invalid batch_kwargs: path, s3, or dataset is required for a PandasDatasource", - batch_kwargs, - ) - - if df.memory_usage().sum() < HASH_THRESHOLD: - batch_markers["pandas_data_fingerprint"] = hash_pandas_dataframe(df) - - return Batch( - datasource_name=self.name, - batch_kwargs=batch_kwargs, - data=df, - batch_parameters=batch_parameters, - batch_markers=batch_markers, - data_context=self._data_context, - ) - - @staticmethod - def guess_reader_method_from_path(path: str): # noqa: PLR0911 - path = path.lower() - if path.endswith(".csv") or path.endswith(".tsv"): - return {"reader_method": "read_csv"} - elif ( - path.endswith(".parquet") or path.endswith(".parq") or path.endswith(".pqt") - ): - return {"reader_method": "read_parquet"} - elif path.endswith(".xlsx") or path.endswith(".xls"): - return {"reader_method": "read_excel"} - elif path.endswith(".json"): - return {"reader_method": "read_json"} - elif path.endswith(".pkl"): - return {"reader_method": "read_pickle"} - elif path.endswith(".feather"): - return {"reader_method": "read_feather"} - elif path.endswith(".csv.gz") or path.endswith(".csv.gz"): - return { - "reader_method": "read_csv", - "reader_options": {"compression": "gzip"}, - } - elif path.endswith(".sas7bdat") or path.endswith(".xpt"): - return {"reader_method": "read_sas"} - - raise BatchKwargsError( - f"Unable to determine reader method from path: {path}", - {"path": path}, - ) - - def _infer_default_options(self, reader_fn: Callable, reader_options: dict) -> dict: - """ - Allows reader options to be customized based on file context before loading to a DataFrame - - Args: - reader_method (str): pandas reader method - reader_options: Current options and defaults set to pass to the reader method - - Returns: - dict: A copy of the reader options post-inference - """ - while isinstance(reader_fn, partial): - # reader_fn might be partial so need to unwrap to get underlying method - reader_fn = reader_fn.func - name = reader_fn.__name__ - if name == "read_parquet": - return {} - if name == "read_excel": - return {} - else: - return {"encoding": "utf-8"} - - def _get_reader_fn(self, 
reader_method=None, path=None): - """Static helper for parsing reader types. If reader_method is not provided, path will be used to guess the - correct reader_method. - - Args: - reader_method (str): the name of the reader method to use, if available. - path (str): the to use to guess - - Returns: - ReaderMethod to use for the filepath - - """ - if reader_method is None and path is None: - raise BatchKwargsError( - "Unable to determine pandas reader function without reader_method or path.", - {"reader_method": reader_method}, - ) - - reader_options = None - if reader_method is None: - path_guess = self.guess_reader_method_from_path(path) - reader_method = path_guess["reader_method"] - reader_options = path_guess.get( - "reader_options" - ) # This may not be there; use None in that case - - try: - reader_fn = getattr(pd, reader_method) - if reader_options: - reader_fn = partial(reader_fn, **reader_options) - return reader_fn - except AttributeError: - raise BatchKwargsError( - f"Unable to find reader_method {reader_method} in pandas.", - {"reader_method": reader_method}, - ) diff --git a/great_expectations/datasource/sparkdf_datasource.py b/great_expectations/datasource/sparkdf_datasource.py deleted file mode 100644 index 6f46c46aa3dc..000000000000 --- a/great_expectations/datasource/sparkdf_datasource.py +++ /dev/null @@ -1,288 +0,0 @@ -import datetime -import logging -import uuid - -from great_expectations.compatibility import pyspark -from great_expectations.core.batch import Batch, BatchMarkers -from great_expectations.core.util import get_or_create_spark_application -from great_expectations.dataset import SparkDFDataset -from great_expectations.datasource.datasource import LegacyDatasource -from great_expectations.exceptions import BatchKwargsError -from great_expectations.types import ClassConfig -from great_expectations.types.configurations import classConfigSchema - -logger = logging.getLogger(__name__) - - -class SparkDFDatasource(LegacyDatasource): - """The SparkDFDatasource produces SparkDFDatasets and supports generators capable of interacting with local - filesystem (the default subdir_reader batch kwargs generator) and databricks notebooks. - - Accepted Batch Kwargs: - - PathBatchKwargs ("path" or "s3" keys) - - InMemoryBatchKwargs ("dataset" key) - - --ge-feature-maturity-info-- - - id: datasource_hdfs_spark - title: Datasource - HDFS - icon: - short_description: HDFS - description: Use HDFS as an external datasource in conjunction with Spark. - how_to_guide_url: - maturity: Experimental - maturity_details: - api_stability: Stable - implementation_completeness: Unknown - unit_test_coverage: Minimal (none) - integration_infrastructure_test_coverage: Minimal (none) - documentation_completeness: Minimal (none) - bug_risk: Unknown - - --ge-feature-maturity-info-- - """ - - recognized_batch_parameters = { - "reader_method", - "reader_options", - "limit", - "dataset_options", - } - - @classmethod - def build_configuration( # noqa: PLR0913 - cls, - data_asset_type=None, - batch_kwargs_generators=None, - spark_config=None, - force_reuse_spark_context=True, - persist=True, - **kwargs, - ): - """ - Build a full configuration object for a datasource, potentially including generators with defaults. 
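# --- Illustrative sketch (editor's addition, not part of the patch) ---
# The removed PandasDatasource._get_reader_fn resolved a pandas reader with
# getattr(pd, reader_method), optionally wrapped in functools.partial to carry
# reader_options such as {"compression": "gzip"}. A compact standalone version
# of that idea, with the extension map trimmed to a few cases:
from functools import partial

import pandas as pd

_READERS = {
    ".csv.gz": ("read_csv", {"compression": "gzip"}),
    ".csv": ("read_csv", {}),
    ".parquet": ("read_parquet", {}),
    ".json": ("read_json", {}),
}


def reader_for(path: str):
    for suffix, (method, options) in _READERS.items():
        if path.lower().endswith(suffix):
            reader_fn = getattr(pd, method)
            return partial(reader_fn, **options) if options else reader_fn
    raise ValueError(f"Unable to determine reader method from path: {path}")


# reader_for("events.csv")("events.csv", nrows=10) reads only the first ten rows
# of a (hypothetical) file, mirroring how the deleted code turned a "limit"
# batch parameter into reader_options["nrows"].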
- - Args: - data_asset_type: A ClassConfig dictionary - batch_kwargs_generators: Generator configuration dictionary - spark_config: dictionary of key-value pairs to pass to the spark builder - force_reuse_spark_context: If True then utilize existing SparkSession if it exists and is active - persist: Whether to persist the Spark Dataframe or not. - **kwargs: Additional kwargs to be part of the datasource constructor's initialization - - Returns: - A complete datasource configuration. - - """ - - if data_asset_type is None: - data_asset_type = { - "class_name": "SparkDFDataset", - "module_name": "great_expectations.dataset", - } - else: - data_asset_type = classConfigSchema.dump(ClassConfig(**data_asset_type)) - - if spark_config is None: - spark_config = {} - - configuration = kwargs - configuration.update( - { - "data_asset_type": data_asset_type, - "spark_config": spark_config, - "force_reuse_spark_context": force_reuse_spark_context, - "persist": persist, - } - ) - - if batch_kwargs_generators: - configuration["batch_kwargs_generators"] = batch_kwargs_generators - - return configuration - - def __init__( # noqa: PLR0913 - self, - name="default", - data_context=None, - data_asset_type=None, - batch_kwargs_generators=None, - spark_config=None, - force_reuse_spark_context=True, - persist=True, - **kwargs, - ) -> None: - """Build a new SparkDFDatasource instance. - - Args: - name: the name of this datasource - data_context: the DataContext to which this datasource is connected - data_asset_type: ClassConfig describing the data_asset type to be constructed by this datasource - batch_kwargs_generators: generator configuration - spark_config: dictionary of key-value pairs to be set on the spark session builder - force_reuse_spark_context: If True then utilize existing SparkSession if it exists and is active - persist: Whether to persist the Spark Dataframe or not. 
- **kwargs: Additional - """ - configuration_with_defaults = SparkDFDatasource.build_configuration( - data_asset_type, - batch_kwargs_generators, - spark_config, - force_reuse_spark_context, - persist, - **kwargs, - ) - data_asset_type = configuration_with_defaults.pop("data_asset_type") - batch_kwargs_generators = configuration_with_defaults.pop( - "batch_kwargs_generators", None - ) - super().__init__( - name, - data_context=data_context, - data_asset_type=data_asset_type, - batch_kwargs_generators=batch_kwargs_generators, - **configuration_with_defaults, - ) - - if spark_config is None: - spark_config = {} - spark = get_or_create_spark_application( - spark_config=spark_config, - force_reuse_spark_context=force_reuse_spark_context, - ) - self.spark = spark - - self._build_generators() - - def process_batch_parameters( - self, reader_method=None, reader_options=None, limit=None, dataset_options=None - ): - batch_kwargs = super().process_batch_parameters( - limit=limit, - dataset_options=dataset_options, - ) - - # Apply globally-configured reader options first - if reader_options: - # Then update with any locally-specified reader options - if not batch_kwargs.get("reader_options"): - batch_kwargs["reader_options"] = {} - batch_kwargs["reader_options"].update(reader_options) - - if reader_method is not None: - batch_kwargs["reader_method"] = reader_method - - return batch_kwargs - - def get_batch(self, batch_kwargs, batch_parameters=None): - """class-private implementation of get_data_asset""" - if self.spark is None: - logger.error("No spark session available") - return None - - reader_options = batch_kwargs.get("reader_options", {}) - - # We need to build batch_markers to be used with the DataFrame - batch_markers = BatchMarkers( - { - "ge_load_time": datetime.datetime.now(datetime.timezone.utc).strftime( - "%Y%m%dT%H%M%S.%fZ" - ) - } - ) - - if "path" in batch_kwargs: - path = batch_kwargs["path"] - reader_method = batch_kwargs.get("reader_method") - reader = self.spark.read - - for option in reader_options.items(): - reader = reader.option(*option) - reader_fn = self._get_reader_fn(reader, reader_method, path) - df = reader_fn(path) - - elif "query" in batch_kwargs: - df = self.spark.sql(batch_kwargs["query"]) - - elif "dataset" in batch_kwargs and ( - ( - pyspark.DataFrame # type: ignore[truthy-function] - and isinstance(batch_kwargs["dataset"], pyspark.DataFrame) - ) - or isinstance(batch_kwargs["dataset"], SparkDFDataset) - ): - df = batch_kwargs.get("dataset") - # We don't want to store the actual dataframe in kwargs; copy the remaining batch_kwargs - batch_kwargs = {k: batch_kwargs[k] for k in batch_kwargs if k != "dataset"} - if isinstance(df, SparkDFDataset): - # Grab just the spark_df reference, since we want to override everything else - df = df.spark_df - # Record this in the kwargs *and* the id - batch_kwargs["SparkDFRef"] = True - batch_kwargs["ge_batch_id"] = str(uuid.uuid1()) - - else: - raise BatchKwargsError( - "Unrecognized batch_kwargs for spark_source", batch_kwargs - ) - - if "limit" in batch_kwargs: - df = df.limit(batch_kwargs["limit"]) - - return Batch( - datasource_name=self.name, - batch_kwargs=batch_kwargs, - data=df, - batch_parameters=batch_parameters, - batch_markers=batch_markers, - data_context=self._data_context, - ) - - @staticmethod - def guess_reader_method_from_path(path: str): - path = path.lower() - if path.endswith(".csv") or path.endswith(".tsv"): - return {"reader_method": "csv"} - elif ( - path.endswith(".parquet") or path.endswith(".parq") or 
path.endswith(".pqt") - ): - return {"reader_method": "parquet"} - - raise BatchKwargsError( - f"Unable to determine reader method from path: {path}", - {"path": path}, - ) - - def _get_reader_fn(self, reader, reader_method=None, path=None): - """Static helper for providing reader_fn - - Args: - reader: the base spark reader to use; this should have had reader_options applied already - reader_method: the name of the reader_method to use, if specified - path (str): the path to use to guess reader_method if it was not specified - - Returns: - ReaderMethod to use for the filepath - - """ - if reader_method is None and path is None: - raise BatchKwargsError( - "Unable to determine spark reader function without reader_method or path.", - {"reader_method": reader_method}, - ) - - if reader_method is None: - reader_method = self.guess_reader_method_from_path(path=path)[ - "reader_method" - ] - - try: - if reader_method.lower() in ["delta", "avro"]: - return reader.format(reader_method.lower()).load - - return getattr(reader, reader_method) - except AttributeError: - raise BatchKwargsError( - f"Unable to find reader_method {reader_method} in spark.", - {"reader_method": reader_method}, - ) diff --git a/tests/data_context/migrator/conftest.py b/tests/data_context/migrator/conftest.py index f15d8ed4be16..ac6561721945 100644 --- a/tests/data_context/migrator/conftest.py +++ b/tests/data_context/migrator/conftest.py @@ -1,4 +1,4 @@ -from typing import Dict, Final, List, Optional, Tuple, Union +from typing import Dict, Final, List, Optional, Tuple import pytest @@ -13,7 +13,7 @@ DataContextConfig, DatasourceConfig, ) -from great_expectations.datasource import BaseDatasource, LegacyDatasource +from great_expectations.datasource import BaseDatasource from great_expectations.rule_based_profiler import RuleBasedProfiler @@ -131,7 +131,7 @@ def _datasource_store(self): return StubDatasourceStore() @property - def datasources(self) -> Dict[str, Union[LegacyDatasource, BaseDatasource]]: + def datasources(self) -> Dict[str, BaseDatasource]: # Datasource is a dummy since we just want the DatasourceConfig from the store, not an # actual initialized datasource. 
return { diff --git a/tests/data_context/test_data_context.py b/tests/data_context/test_data_context.py index edbd519910d5..faa38aa023b1 100644 --- a/tests/data_context/test_data_context.py +++ b/tests/data_context/test_data_context.py @@ -42,7 +42,6 @@ from great_expectations.data_context.util import file_relative_path from great_expectations.datasource import ( Datasource, - LegacyDatasource, SimpleSqlalchemyDatasource, ) from great_expectations.datasource.types.batch_kwargs import PathBatchKwargs @@ -374,11 +373,6 @@ def test_data_context_get_latest_validation_result(titanic_data_context): assert latest_validation_result in validation_results -@pytest.mark.unit -def test_data_context_get_datasource(titanic_data_context): - isinstance(titanic_data_context.get_datasource("mydatasource"), LegacyDatasource) - - @pytest.mark.filesystem def test_data_context_expectation_suite_delete(empty_data_context): assert empty_data_context.add_expectation_suite( diff --git a/tests/data_context/test_pandas_datetime_suites.py b/tests/data_context/test_pandas_datetime_suites.py deleted file mode 100644 index 84b8a9154675..000000000000 --- a/tests/data_context/test_pandas_datetime_suites.py +++ /dev/null @@ -1,98 +0,0 @@ -import datetime -from tempfile import TemporaryDirectory - -import pandas as pd -import pytest - -from great_expectations.core.expectation_suite import ExpectationSuite -from great_expectations.dataset import PandasDataset - - -@pytest.mark.filesystem -def test_save_expectation_suite_with_datetime_objects( - data_context_parameterized_expectation_suite, -): - # create datetime evaluation parameters - evaluation_parameters = { - "now": datetime.datetime.now(), - "now_minus_48h": datetime.datetime.now() - datetime.timedelta(days=2), - } - test_data = { - "data_refresh": [ - datetime.datetime.now(), - datetime.datetime.now() - datetime.timedelta(days=1), - ] - } - test_df = pd.DataFrame(test_data) - dataset_name = "test_pandas_source" - - with TemporaryDirectory(): - context = data_context_parameterized_expectation_suite - ge_path = context.root_directory - - context.add_datasource(dataset_name, class_name="PandasDatasource") - - batch_kwargs = { - "dataset": test_df, - "datasource": dataset_name, - "PandasInMemoryDF": True, - "ge_batch_id": "test_id", - } - - empty_suite = context.add_expectation_suite("test_suite") - - batch = context._get_batch_v2( - batch_kwargs=batch_kwargs, expectation_suite_name=empty_suite - ) - for param in evaluation_parameters: - batch.set_evaluation_parameter(param, evaluation_parameters[param]) - - # Add expectation that will succeed using the datetime in a $PARAMETER - batch.expect_column_max_to_be_between( - column="data_refresh", min_value={"$PARAMETER": "now_minus_48h"} - ) - result = batch.validate() - assert result.success - batch.save_expectation_suite() - assert isinstance(batch, PandasDataset) - - # Check that we can load the saved expectation suite - reloaded_expectation_suite = context.get_expectation_suite("test_suite") - assert isinstance(reloaded_expectation_suite, ExpectationSuite) - - # Run validation via the action_list_operator - run_id = { - "run_name": f"{dataset_name}_{datetime.datetime.now()}", - "run_time": datetime.datetime.now(), - } - results = context.run_validation_operator( - "default", - assets_to_validate=[batch], - run_id=run_id, - evaluation_parameters=evaluation_parameters, - ) - assert results.success - - # Check that we can build Data Docs - index_page_locator_infos = context.build_data_docs() - assert ( - 
index_page_locator_infos["local_site"] - == f"file://{ge_path}/uncommitted/data_docs/local_site/index.html" - ) - - # Check that we can reload the expectation suite and validate - reloaded_batch = context._get_batch_v2( - batch_kwargs=batch_kwargs, expectation_suite_name=reloaded_expectation_suite - ) - - run_id = { - "run_name": f"reloaded_{dataset_name}_{datetime.datetime.now()}", - "run_time": datetime.datetime.now(), - } - reloaded_results = context.run_validation_operator( - "default", - assets_to_validate=[reloaded_batch], - run_id=run_id, - ) - - assert reloaded_results.success diff --git a/tests/datasource/batch_kwarg_generator/__init__.py b/tests/datasource/batch_kwarg_generator/__init__.py deleted file mode 100644 index e69de29bb2d1..000000000000 diff --git a/tests/datasource/batch_kwarg_generator/test_subdir_reader_generator.py b/tests/datasource/batch_kwarg_generator/test_subdir_reader_generator.py deleted file mode 100644 index 58a2d1990a8f..000000000000 --- a/tests/datasource/batch_kwarg_generator/test_subdir_reader_generator.py +++ /dev/null @@ -1,188 +0,0 @@ -import os - -import pytest - -from great_expectations.datasource.batch_kwargs_generator import ( - SubdirReaderBatchKwargsGenerator, -) -from great_expectations.exceptions import BatchKwargsError - - -@pytest.mark.filesystem -def test_subdir_reader_path_partitioning(basic_pandas_datasource, tmp_path_factory): - base_directory = str( - tmp_path_factory.mktemp("test_subdir_reader_path_partitioning") - ) - mock_files = [ - "asset_1/20190101__asset_1.csv", - "asset_1/20190102__asset_1.csv", - "asset_1/20190103__asset_1.csv", - "asset_2/20190101__asset_2.csv", - "asset_2/20190102__asset_2.csv", - ] - for file in mock_files: - os.makedirs( # noqa: PTH103 - os.path.join(base_directory, file.split("/")[0]), # noqa: PTH118 - exist_ok=True, - ) - open(os.path.join(base_directory, file), "w").close() # noqa: PTH118 - - subdir_reader_generator = SubdirReaderBatchKwargsGenerator( - "test_generator", - datasource=basic_pandas_datasource, - base_directory=base_directory, - ) - - # We should see two assets - known_assets = subdir_reader_generator.get_available_data_asset_names()["names"] - # Use set in test to avoid order issues - assert set(known_assets) == {("asset_2", "directory"), ("asset_1", "directory")} - - # We should see three partitions for the first: - known_partitions = subdir_reader_generator.get_available_partition_ids( - data_asset_name="asset_1" - ) - assert set(known_partitions) == { - "20190101__asset_1", - "20190102__asset_1", - "20190103__asset_1", - } - - asset_1_kwargs = [ - kwargs - for kwargs in subdir_reader_generator.get_iterator(data_asset_name="asset_1") - ] - asset_2_kwargs = [ - kwargs - for kwargs in subdir_reader_generator.get_iterator(data_asset_name="asset_2") - ] - with pytest.raises(BatchKwargsError): - not_an_asset_kwargs = [ # noqa: F841 - kwargs - for kwargs in subdir_reader_generator.get_iterator( - data_asset_name="not_an_asset" - ) - ] - - assert len(asset_1_kwargs) == 3 - paths = [kwargs["path"] for kwargs in asset_1_kwargs] - assert set(paths) == { - os.path.join(base_directory, "asset_1/20190101__asset_1.csv"), # noqa: PTH118 - os.path.join(base_directory, "asset_1/20190102__asset_1.csv"), # noqa: PTH118 - os.path.join(base_directory, "asset_1/20190103__asset_1.csv"), # noqa: PTH118 - } - partitions = subdir_reader_generator.get_available_partition_ids( - data_asset_name="asset_1" - ) - - # SubdirReaderBatchKwargsGenerator uses filenames from subdirectories to generate partition names - 
assert set(partitions) == { - "20190101__asset_1", - "20190102__asset_1", - "20190103__asset_1", - } - assert len(asset_1_kwargs[0].keys()) == 2 - - assert len(asset_2_kwargs) == 2 - paths = [kwargs["path"] for kwargs in asset_2_kwargs] - assert set(paths) == { - os.path.join(base_directory, "asset_2/20190101__asset_2.csv"), # noqa: PTH118 - os.path.join(base_directory, "asset_2/20190102__asset_2.csv"), # noqa: PTH118 - } - partitions = subdir_reader_generator.get_available_partition_ids( - data_asset_name="asset_2" - ) - assert set(partitions) == {("20190101__asset_2"), ("20190102__asset_2")} - assert len(asset_2_kwargs[0].keys()) == 2 - - -@pytest.mark.filesystem -def test_subdir_reader_file_partitioning(basic_pandas_datasource, tmp_path_factory): - base_directory = str( - tmp_path_factory.mktemp("test_subdir_reader_file_partitioning") - ) - mock_files = [ - "20190101__asset_1.csv", - "20190102__asset_1.csv", - "20190103__asset_1.csv", - "asset_2/20190101__asset_2.csv", - "asset_2/20190102__asset_2.csv", - ] - for file in mock_files: - if "/" in file: - os.makedirs( # noqa: PTH103 - os.path.join(base_directory, file.split("/")[0]), # noqa: PTH118 - exist_ok=True, - ) - open(os.path.join(base_directory, file), "w").close() # noqa: PTH118 - - # If we have files, we should see them as individual assets - subdir_reader_generator = SubdirReaderBatchKwargsGenerator( - "test_generator", - datasource=basic_pandas_datasource, - base_directory=base_directory, - ) - - known_assets = subdir_reader_generator.get_available_data_asset_names()["names"] - assert set(known_assets) == { - ("20190101__asset_1", "file"), - ("20190102__asset_1", "file"), - ("20190103__asset_1", "file"), - ("asset_2", "directory"), - } - - # SubdirReaderBatchKwargsGenerator uses the filename as partition name for root files - known_partitions = subdir_reader_generator.get_available_partition_ids( - data_asset_name="20190101__asset_1" - ) - assert set(known_partitions) == {"20190101__asset_1"} - - kwargs = subdir_reader_generator.build_batch_kwargs( - data_asset_name="20190101__asset_1", partition_id="20190101__asset_1" - ) - assert kwargs["path"] == os.path.join( # noqa: PTH118 - base_directory, "20190101__asset_1.csv" - ) - - # We should also be able to pass a limit - kwargs = subdir_reader_generator.build_batch_kwargs( - data_asset_name="20190101__asset_1", partition_id="20190101__asset_1", limit=10 - ) - assert kwargs["path"] == os.path.join( # noqa: PTH118 - base_directory, "20190101__asset_1.csv" - ) - assert kwargs["reader_options"]["nrows"] == 10 - - -@pytest.mark.filesystem -def test_subdir_reader_configurable_reader_method( - basic_pandas_datasource, tmp_path_factory -): - base_directory = str( - tmp_path_factory.mktemp("test_subdir_reader_configurable_reader_method") - ) - mock_files = [ - "20190101__asset_1.dat", - "20190102__asset_1.dat", - "20190103__asset_1.dat", - "asset_2/20190101__asset_2.dat", - "asset_2/20190102__asset_2.dat", - ] - for file in mock_files: - if "/" in file: - os.makedirs( # noqa: PTH103 - os.path.join(base_directory, file.split("/")[0]), # noqa: PTH118 - exist_ok=True, - ) - open(os.path.join(base_directory, file), "w").close() # noqa: PTH118 - - # If we have files, we should see them as individual assets - subdir_reader_generator = SubdirReaderBatchKwargsGenerator( - "test_generator", - datasource=basic_pandas_datasource, - base_directory=base_directory, - reader_method="csv", - known_extensions=[".dat"], - ) - batch_kwargs = 
next(subdir_reader_generator.get_iterator(data_asset_name="asset_2")) - assert batch_kwargs["reader_method"] == "csv" diff --git a/tests/datasource/conftest.py b/tests/datasource/conftest.py index 0d717f8ea848..b61259b1b70d 100644 --- a/tests/datasource/conftest.py +++ b/tests/datasource/conftest.py @@ -10,8 +10,6 @@ from great_expectations.data_context.util import file_relative_path from great_expectations.datasource import ( Datasource, - PandasDatasource, - SparkDFDatasource, ) from great_expectations.execution_engine.sparkdf_execution_engine import ( SparkDFExecutionEngine, @@ -120,18 +118,6 @@ def glue_titanic_catalog(): yield client -@pytest.fixture(scope="module") -def basic_pandas_datasource(): - return PandasDatasource("basic_pandas_datasource") - - -@pytest.fixture(scope="module") -def basic_sparkdf_datasource(test_backends): - if "SparkDFDataset" not in test_backends: - pytest.skip("Spark has not been enabled, so this test must be skipped.") - return SparkDFDatasource("basic_sparkdf_datasource") - - @pytest.fixture def mysql_sqlalchemy_datasource(mysql_engine): return Datasource("mysql_sqlalchemy_datasource", engine=mysql_engine) diff --git a/tests/datasource/test_batch_generators.py b/tests/datasource/test_batch_generators.py deleted file mode 100644 index bd922b7b3830..000000000000 --- a/tests/datasource/test_batch_generators.py +++ /dev/null @@ -1,120 +0,0 @@ -import os - -import pytest - -from great_expectations.datasource.batch_kwargs_generator import ( - SubdirReaderBatchKwargsGenerator, -) - -try: - from unittest import mock -except ImportError: - from unittest import mock # noqa: F401 - - -@pytest.mark.big -def test_file_kwargs_generator( - data_context_parameterized_expectation_suite, filesystem_csv -): - base_dir = filesystem_csv - - datasource = data_context_parameterized_expectation_suite.add_datasource( - "default", - module_name="great_expectations.datasource", - class_name="PandasDatasource", - batch_kwargs_generators={ - "subdir_reader": { - "class_name": "SubdirReaderBatchKwargsGenerator", - "base_directory": str(base_dir), - } - }, - ) - - generator = datasource.get_batch_kwargs_generator("subdir_reader") - known_data_asset_names = datasource.get_available_data_asset_names() - - # Use set to avoid order dependency - assert set(known_data_asset_names["subdir_reader"]["names"]) == { - ("f1", "file"), - ("f2", "file"), - ("f3", "directory"), - } - - f1_batches = [ - batch_kwargs["path"] - for batch_kwargs in generator.get_iterator(data_asset_name="f1") - ] - assert len(f1_batches) == 1 - expected_batches = [{"path": os.path.join(base_dir, "f1.csv")}] # noqa: PTH118 - for batch in expected_batches: - assert batch["path"] in f1_batches - - f3_batches = [ - batch_kwargs["path"] - for batch_kwargs in generator.get_iterator(data_asset_name="f3") - ] - assert len(f3_batches) == 2 - expected_batches = [ - {"path": os.path.join(base_dir, "f3", "f3_20190101.csv")}, # noqa: PTH118 - {"path": os.path.join(base_dir, "f3", "f3_20190102.csv")}, # noqa: PTH118 - ] - for batch in expected_batches: - assert batch["path"] in f3_batches - - -@pytest.mark.big -def test_file_kwargs_generator_extensions(tmp_path_factory): - """csv, xls, parquet, json should be recognized file extensions""" - basedir = str(tmp_path_factory.mktemp("test_file_kwargs_generator_extensions")) - - # Do not include: invalid extension - with open(os.path.join(basedir, "f1.blarg"), "w") as outfile: # noqa: PTH118 - outfile.write("\n\n\n") - # Include - with open(os.path.join(basedir, "f2.csv"), "w") as outfile: 
# noqa: PTH118 - outfile.write("\n\n\n") - # Do not include: valid subdir, but no valid files in it - os.mkdir(os.path.join(basedir, "f3")) # noqa: PTH102, PTH118 - with open( - os.path.join(basedir, "f3", "f3_1.blarg"), "w" # noqa: PTH118 - ) as outfile: - outfile.write("\n\n\n") - with open( - os.path.join(basedir, "f3", "f3_2.blarg"), "w" # noqa: PTH118 - ) as outfile: - outfile.write("\n\n\n") - # Include: valid subdir with valid files - os.mkdir(os.path.join(basedir, "f4")) # noqa: PTH102, PTH118 - with open(os.path.join(basedir, "f4", "f4_1.csv"), "w") as outfile: # noqa: PTH118 - outfile.write("\n\n\n") - with open(os.path.join(basedir, "f4", "f4_2.csv"), "w") as outfile: # noqa: PTH118 - outfile.write("\n\n\n") - # Do not include: valid extension, but dot prefix - with open(os.path.join(basedir, ".f5.csv"), "w") as outfile: # noqa: PTH118 - outfile.write("\n\n\n") - - # Include: valid extensions - with open(os.path.join(basedir, "f6.tsv"), "w") as outfile: # noqa: PTH118 - outfile.write("\n\n\n") - with open(os.path.join(basedir, "f7.xls"), "w") as outfile: # noqa: PTH118 - outfile.write("\n\n\n") - with open(os.path.join(basedir, "f8.parquet"), "w") as outfile: # noqa: PTH118 - outfile.write("\n\n\n") - with open(os.path.join(basedir, "f9.xls"), "w") as outfile: # noqa: PTH118 - outfile.write("\n\n\n") - with open(os.path.join(basedir, "f0.json"), "w") as outfile: # noqa: PTH118 - outfile.write("\n\n\n") - - g1 = SubdirReaderBatchKwargsGenerator(datasource="foo", base_directory=basedir) - - g1_assets = g1.get_available_data_asset_names() - # Use set in test to avoid order issues - assert set(g1_assets["names"]) == { - ("f7", "file"), - ("f4", "directory"), - ("f6", "file"), - ("f0", "file"), - ("f2", "file"), - ("f9", "file"), - ("f8", "file"), - } diff --git a/tests/datasource/test_batch_kwargs.py b/tests/datasource/test_batch_kwargs.py deleted file mode 100644 index 191821c75bd5..000000000000 --- a/tests/datasource/test_batch_kwargs.py +++ /dev/null @@ -1,63 +0,0 @@ -import pytest - -from great_expectations.datasource.types import * # noqa: F403 - - -@pytest.mark.unit -def test_batch_kwargs_id(): - test_batch_kwargs = PathBatchKwargs({"path": "/data/test.csv"}) # noqa: F405 - # When there is only a single "important" key used in batch_kwargs, the ID can prominently include it - assert test_batch_kwargs.to_id() == "path=/data/test.csv" - - test_batch_kwargs = PathBatchKwargs( # noqa: F405 - { - "path": "/data/test.csv", - "reader_method": "read_csv", - "reader_options": { - "iterator": True, - "chunksize": 2e7, - "parse_dates": [0, 3], - "names": ["start", "type", "quantity", "end"], - }, - } - ) - # When there are multiple relevant keys we use the hash of the batch_kwargs dictionary - print(test_batch_kwargs.to_id()) - assert test_batch_kwargs.to_id() == "8607e071c6383509c8cd8f4c1ea65518" - - -@pytest.mark.unit -def test_batch_kwargs_attributes_and_keys(): - # When BatchKwargs are typed, the required keys should become accessible via dot notation and immutable - test_batch_kwargs = PathBatchKwargs( # noqa: F405 - { - "path": "/data/test.csv", - "reader_method": "read_csv", - "reader_options": { - "iterator": True, - "chunksize": 2e7, - "parse_dates": [0, 3], - "names": ["start", "type", "quantity", "end"], - }, - } - ) - assert test_batch_kwargs.path == "/data/test.csv" - assert test_batch_kwargs["path"] == test_batch_kwargs.path - - # We do not allow setting the special attributes this way - with pytest.raises(AttributeError): - test_batch_kwargs.path = "/a/new/path.csv" - - # 
Nor do we provide attribute-style access to unreserved names - with pytest.raises(AttributeError): - assert test_batch_kwargs.names == ["start", "type", "quantity", "end"] - - # But we can access and set even protected names using dictionary notation - assert test_batch_kwargs["reader_options"]["names"] == [ - "start", - "type", - "quantity", - "end", - ] - test_batch_kwargs["path"] = "/a/new/path.csv" - assert test_batch_kwargs.path == "/a/new/path.csv" diff --git a/tests/datasource/test_datasource.py b/tests/datasource/test_datasource.py deleted file mode 100644 index c22cedc51535..000000000000 --- a/tests/datasource/test_datasource.py +++ /dev/null @@ -1,11 +0,0 @@ -import pytest - -from great_expectations.datasource import LegacyDatasource - - -@pytest.mark.unit -def test_list_generators_returns_empty_list_if_no_generators_exist(): - datasource = LegacyDatasource(name="foo") - assert isinstance(datasource, LegacyDatasource) - obs = datasource.list_batch_kwargs_generators() - assert obs == [] diff --git a/tests/datasource/test_new_datasource_with_aws_glue_data_connector.py b/tests/datasource/test_new_datasource_with_aws_glue_data_connector.py index 0408e41ad197..d9cc6a40ca62 100644 --- a/tests/datasource/test_new_datasource_with_aws_glue_data_connector.py +++ b/tests/datasource/test_new_datasource_with_aws_glue_data_connector.py @@ -6,7 +6,7 @@ from great_expectations.compatibility import pyarrow, pyspark from great_expectations.core.yaml_handler import YAMLHandler from great_expectations.data_context.util import instantiate_class_from_config -from great_expectations.datasource import BaseDatasource, LegacyDatasource +from great_expectations.datasource import BaseDatasource pytestmark = pytest.mark.filesystem @@ -158,7 +158,7 @@ def test_instantiation_from_datasource( name="my_datasource", **data_source_config_with_aws_glue_catalog_data_connectors, ) - datasource: Union[LegacyDatasource, BaseDatasource, None] = context.get_datasource( + datasource: Union[BaseDatasource, None] = context.get_datasource( datasource_name="my_datasource" ) report: dict = datasource.self_check() diff --git a/tests/datasource/test_new_datasource_with_sql_data_connector.py b/tests/datasource/test_new_datasource_with_sql_data_connector.py index ae86c34d33cb..98e7232c36b4 100644 --- a/tests/datasource/test_new_datasource_with_sql_data_connector.py +++ b/tests/datasource/test_new_datasource_with_sql_data_connector.py @@ -21,7 +21,6 @@ ) from great_expectations.datasource import ( BaseDatasource, - LegacyDatasource, SimpleSqlalchemyDatasource, ) from great_expectations.exceptions.exceptions import ExecutionEngineError @@ -100,7 +99,7 @@ def data_context_with_sql_data_connectors_including_schema_for_testing_get_batch try: # noinspection PyUnusedLocal my_sql_datasource: Optional[ # noqa: F841 - Union[SimpleSqlalchemyDatasource, LegacyDatasource] + SimpleSqlalchemyDatasource ] = context.add_datasource( "test_sqlite_db_datasource", **yaml.load(datasource_config) ) @@ -230,7 +229,7 @@ def test_instantiation_with_ConfiguredAssetSqlDataConnector_round_trip_to_config n: 10 """ context.add_datasource(**yaml.load(config)) - datasource: Union[LegacyDatasource, BaseDatasource, None] = context.get_datasource( + datasource: Union[BaseDatasource, None] = context.get_datasource( datasource_name="my_datasource" ) report: dict = datasource.self_check() @@ -393,7 +392,7 @@ def test_instantiation_with_InferredAssetSqlDataConnector_round_trip_to_config_s n: 10 """ context.add_datasource(**yaml.load(config)) - datasource: 
Union[LegacyDatasource, BaseDatasource, None] = context.get_datasource( + datasource: Union[BaseDatasource, None] = context.get_datasource( datasource_name="my_datasource" ) report: dict = datasource.self_check() diff --git a/tests/datasource/test_pandas_datasource.py b/tests/datasource/test_pandas_datasource.py deleted file mode 100644 index 2fef1d3ede65..000000000000 --- a/tests/datasource/test_pandas_datasource.py +++ /dev/null @@ -1,427 +0,0 @@ -import os -import shutil -from functools import partial - -import pandas as pd -import pytest - -from great_expectations.core.batch import Batch, BatchMarkers -from great_expectations.core.expectation_suite import ExpectationSuite -from great_expectations.core.util import nested_update -from great_expectations.core.yaml_handler import YAMLHandler -from great_expectations.data_context.data_context.file_data_context import ( - FileDataContext, -) -from great_expectations.data_context.types.base import ( - DataContextConfigSchema, - DatasourceConfig, - datasourceConfigSchema, -) -from great_expectations.data_context.util import file_relative_path -from great_expectations.datasource import PandasDatasource -from great_expectations.datasource.datasource_serializer import ( - YAMLReadyDictDatasourceConfigSerializer, -) -from great_expectations.datasource.types import PathBatchKwargs -from great_expectations.exceptions import BatchKwargsError -from great_expectations.validator.validator import BridgeValidator - -yaml = YAMLHandler() - - -@pytest.mark.filesystem -def test_standalone_pandas_datasource(test_folder_connection_path_csv): - datasource = PandasDatasource( - "PandasCSV", - batch_kwargs_generators={ - "subdir_reader": { - "class_name": "SubdirReaderBatchKwargsGenerator", - "base_directory": test_folder_connection_path_csv, - } - }, - ) - - assert datasource.get_available_data_asset_names() == { - "subdir_reader": {"names": [("test", "file")], "is_complete_list": True} - } - manual_batch_kwargs = PathBatchKwargs( - path=os.path.join( # noqa: PTH118 - str(test_folder_connection_path_csv), "test.csv" - ) - ) - - generator = datasource.get_batch_kwargs_generator("subdir_reader") - auto_batch_kwargs = generator.yield_batch_kwargs("test") - - assert manual_batch_kwargs["path"] == auto_batch_kwargs["path"] - - # Include some extra kwargs... 
- # auto_batch_kwargs.update( - # {"reader_options": {"sep": ",", "header": 0, "index_col": 0}} - # ) - auto_batch_kwargs.update({"reader_options": {"sep": ","}}) - batch = datasource.get_batch(batch_kwargs=auto_batch_kwargs) - assert isinstance(batch, Batch) - dataset = batch.data - assert (dataset["col_1"] == [1, 2, 3, 4, 5]).all() - assert len(dataset) == 5 - - # A datasource should always return an object with a typed batch_id - assert isinstance(batch.batch_kwargs, PathBatchKwargs) - assert isinstance(batch.batch_markers, BatchMarkers) - - -@pytest.mark.filesystem -def test_create_pandas_datasource( - data_context_parameterized_expectation_suite, tmp_path_factory -): - basedir = tmp_path_factory.mktemp("test_create_pandas_datasource") - name = "test_pandas_datasource" - class_name = "PandasDatasource" - data_context_parameterized_expectation_suite.add_datasource( - name, - class_name=class_name, - batch_kwargs_generators={ - "subdir_reader": { - "class_name": "SubdirReaderBatchKwargsGenerator", - "base_directory": str(basedir), - } - }, - ) - - data_context_config = data_context_parameterized_expectation_suite.get_config() - - assert name in data_context_config["datasources"] - assert data_context_config["datasources"][name]["class_name"] == class_name - # assert data_context_config["datasources"][name]["type"] == type_ - - # We should now see updated configs - # Finally, we should be able to confirm that the folder structure is as expected - with open( - os.path.join( # noqa: PTH118 - data_context_parameterized_expectation_suite.root_directory, - FileDataContext.GX_YML, - ), - ) as data_context_config_file: - data_context_file_config = yaml.load(data_context_config_file) - - # To match what we expect out of the yaml file, we need to deserialize our config using the same mechanism - serializer = YAMLReadyDictDatasourceConfigSerializer(schema=datasourceConfigSchema) - datasource_config: DatasourceConfig = DataContextConfigSchema().dump( - data_context_config - )["datasources"][name] - expected_serialized_datasource_config: dict = serializer.serialize( - datasource_config - ) - assert ( - data_context_file_config["datasources"][name] - == expected_serialized_datasource_config - ) - - # We should have added a default generator built from the default config - assert ( - data_context_file_config["datasources"][name]["batch_kwargs_generators"][ - "subdir_reader" - ]["class_name"] - == "SubdirReaderBatchKwargsGenerator" - ) - - -@pytest.mark.filesystem -def test_pandas_datasource_custom_data_asset( - data_context_parameterized_expectation_suite, test_folder_connection_path_csv -): - name = "test_pandas_datasource" - class_name = "PandasDatasource" - - data_asset_type_config = { - "module_name": "custom_pandas_dataset", - "class_name": "CustomPandasDataset", - } - data_context_parameterized_expectation_suite.add_datasource( - name, - class_name=class_name, - data_asset_type=data_asset_type_config, - batch_kwargs_generators={ - "subdir_reader": { - "class_name": "SubdirReaderBatchKwargsGenerator", - "base_directory": str(test_folder_connection_path_csv), - } - }, - ) - - # We should now see updated configs - with open( - os.path.join( # noqa: PTH118 - data_context_parameterized_expectation_suite.root_directory, - FileDataContext.GX_YML, - ), - ) as data_context_config_file: - data_context_file_config = yaml.load(data_context_config_file) - - assert ( - data_context_file_config["datasources"][name]["data_asset_type"]["module_name"] - == "custom_pandas_dataset" - ) - assert ( - 
data_context_file_config["datasources"][name]["data_asset_type"]["class_name"] - == "CustomPandasDataset" - ) - - # We should be able to get a dataset of the correct type from the datasource. - data_context_parameterized_expectation_suite.add_expectation_suite( - expectation_suite_name="test" - ) - batch = data_context_parameterized_expectation_suite._get_batch_v2( - expectation_suite_name="test", - batch_kwargs=data_context_parameterized_expectation_suite.build_batch_kwargs( - datasource=name, - batch_kwargs_generator="subdir_reader", - data_asset_name="test", - ), - ) - assert type(batch).__name__ == "CustomPandasDataset" - res = batch.expect_column_values_to_have_odd_lengths("col_2") - assert res.success is True - - -@pytest.mark.filesystem -def test_pandas_source_read_csv( - data_context_parameterized_expectation_suite, tmp_path_factory -): - basedir = tmp_path_factory.mktemp("test_create_pandas_datasource") - shutil.copy(file_relative_path(__file__, "../test_sets/unicode.csv"), basedir) - data_context_parameterized_expectation_suite.add_datasource( - "mysource", - module_name="great_expectations.datasource", - class_name="PandasDatasource", - reader_options={"encoding": "utf-8"}, - batch_kwargs_generators={ - "subdir_reader": { - "class_name": "SubdirReaderBatchKwargsGenerator", - "base_directory": str(basedir), - } - }, - ) - - data_context_parameterized_expectation_suite.add_expectation_suite( - expectation_suite_name="unicode" - ) - batch = data_context_parameterized_expectation_suite._get_batch_v2( - data_context_parameterized_expectation_suite.build_batch_kwargs( - "mysource", "subdir_reader", "unicode" - ), - "unicode", - ) - assert len(batch["Μ"] == 1) # noqa: RUF001 # greek mu - assert "😁" in list(batch["Μ"]) # noqa: RUF001 # greek mu - - data_context_parameterized_expectation_suite.add_datasource( - "mysource2", - module_name="great_expectations.datasource", - class_name="PandasDatasource", - batch_kwargs_generators={ - "subdir_reader": { - "class_name": "SubdirReaderBatchKwargsGenerator", - "base_directory": str(basedir), - } - }, - ) - - batch = data_context_parameterized_expectation_suite._get_batch_v2( - data_context_parameterized_expectation_suite.build_batch_kwargs( - "mysource2", "subdir_reader", "unicode" - ), - "unicode", - ) - assert "😁" in list(batch["Μ"]) # noqa: RUF001 # greek mu - - data_context_parameterized_expectation_suite.add_datasource( - "mysource3", - module_name="great_expectations.datasource", - class_name="PandasDatasource", - batch_kwargs_generators={ - "subdir_reader": { - "class_name": "SubdirReaderBatchKwargsGenerator", - "base_directory": str(basedir), - "reader_options": {"encoding": "utf-16"}, - } - }, - ) - - with pytest.raises(UnicodeError, match="UTF-16 stream does not start with BOM"): - batch = data_context_parameterized_expectation_suite._get_batch_v2( - data_context_parameterized_expectation_suite.build_batch_kwargs( - "mysource3", "subdir_reader", "unicode" - ), - "unicode", - ) - - with pytest.raises(LookupError, match="unknown encoding: blarg"): - batch_kwargs = data_context_parameterized_expectation_suite.build_batch_kwargs( - "mysource3", "subdir_reader", "unicode" - ) - batch_kwargs.update({"reader_options": {"encoding": "blarg"}}) - batch = data_context_parameterized_expectation_suite._get_batch_v2( - batch_kwargs=batch_kwargs, expectation_suite_name="unicode" - ) - - with pytest.raises(LookupError, match="unknown encoding: blarg"): - batch = data_context_parameterized_expectation_suite._get_batch_v2( - 
expectation_suite_name="unicode", - batch_kwargs=data_context_parameterized_expectation_suite.build_batch_kwargs( - "mysource", - "subdir_reader", - "unicode", - reader_options={"encoding": "blarg"}, - ), - ) - - batch = data_context_parameterized_expectation_suite._get_batch_v2( - batch_kwargs=data_context_parameterized_expectation_suite.build_batch_kwargs( - "mysource2", - "subdir_reader", - "unicode", - reader_options={"encoding": "utf-8"}, - ), - expectation_suite_name="unicode", - ) - assert "😁" in list(batch["Μ"]) # noqa: RUF001 # greek mu - - -@pytest.mark.filesystem -def test_invalid_reader_pandas_datasource(tmp_path_factory): - basepath = str(tmp_path_factory.mktemp("test_invalid_reader_pandas_datasource")) - datasource = PandasDatasource( - "mypandassource", - batch_kwargs_generators={ - "subdir_reader": { - "class_name": "SubdirReaderBatchKwargsGenerator", - "base_directory": basepath, - } - }, - ) - - with open( - os.path.join( # noqa: PTH118 - basepath, "idonotlooklikeacsvbutiam.notrecognized" - ), - "w", - ) as newfile: - newfile.write("a,b\n1,2\n3,4\n") - - with pytest.raises(BatchKwargsError) as exc: - datasource.get_batch( - batch_kwargs={ - "path": os.path.join( # noqa: PTH118 - basepath, "idonotlooklikeacsvbutiam.notrecognized" - ) - } - ) - assert "Unable to determine reader for path" in exc.value.message - - with pytest.raises(BatchKwargsError) as exc: - datasource.get_batch( - batch_kwargs={ - "path": os.path.join( # noqa: PTH118 - basepath, "idonotlooklikeacsvbutiam.notrecognized" - ), - "reader_method": "blarg", - } - ) - assert "Unknown reader method: blarg" in exc.value.message - - batch = datasource.get_batch( - batch_kwargs={ - "path": os.path.join( # noqa: PTH118 - basepath, "idonotlooklikeacsvbutiam.notrecognized" - ), - "reader_method": "read_csv", - "reader_options": {"header": 0}, - } - ) - assert batch.data["a"][0] == 1 - - -@pytest.mark.filesystem -def test_read_limit(test_folder_connection_path_csv): - datasource = PandasDatasource( - "PandasCSV", - batch_kwargs_generators={ - "subdir_reader": { - "class_name": "SubdirReaderBatchKwargsGenerator", - "base_directory": test_folder_connection_path_csv, - } - }, - ) - - batch_kwargs = PathBatchKwargs( - { - "path": os.path.join( # noqa: PTH118 - str(test_folder_connection_path_csv), "test.csv" - ), - # "reader_options": {"sep": ",", "header": 0, "index_col": 0}, - "reader_options": {"sep": ","}, - } - ) - nested_update(batch_kwargs, datasource.process_batch_parameters(limit=1)) - - batch = datasource.get_batch(batch_kwargs=batch_kwargs) - assert isinstance(batch, Batch) - dataset = batch.data - assert (dataset["col_1"] == [1]).all() - assert len(dataset) == 1 - - # A datasource should always return an object with a typed batch_id - assert isinstance(batch.batch_kwargs, PathBatchKwargs) - assert isinstance(batch.batch_markers, BatchMarkers) - - -@pytest.mark.unit -def test_process_batch_parameters(): - batch_kwargs = PandasDatasource("test").process_batch_parameters(limit=1) - assert batch_kwargs == {"reader_options": {"nrows": 1}} - - batch_kwargs = PandasDatasource("test").process_batch_parameters( - dataset_options={"caching": False} - ) - assert batch_kwargs == {"dataset_options": {"caching": False}} - - -@pytest.mark.filesystem -def test_pandas_datasource_processes_dataset_options( - test_folder_connection_path_csv, empty_data_context -): - context: DataContext = empty_data_context # noqa: F821 - datasource = PandasDatasource( - "PandasCSV", - batch_kwargs_generators={ - "subdir_reader": { - "class_name": 
"SubdirReaderBatchKwargsGenerator", - "base_directory": test_folder_connection_path_csv, - } - }, - ) - batch_kwargs = datasource.build_batch_kwargs( - "subdir_reader", data_asset_name="test" - ) - batch_kwargs["dataset_options"] = {"caching": False} - batch = datasource.get_batch(batch_kwargs) - validator = BridgeValidator( - batch, ExpectationSuite(expectation_suite_name="foo", data_context=context) - ) - dataset = validator.get_dataset() - assert dataset.caching is False - - -@pytest.mark.unit -@pytest.mark.parametrize( - "reader_fn", - [pd.read_csv, pd.read_excel, pd.read_parquet, pd.read_pickle, pd.read_sas], -) -def test_infer_default_options_partial_functions(reader_fn): - datasource = PandasDatasource() - reader_fn_partial = partial(reader_fn) - assert datasource._infer_default_options( - reader_fn_partial, {} - ) == datasource._infer_default_options(reader_fn, {}) diff --git a/tests/datasource/test_sparkdf_datasource.py b/tests/datasource/test_sparkdf_datasource.py deleted file mode 100644 index 5448d4c5f5d9..000000000000 --- a/tests/datasource/test_sparkdf_datasource.py +++ /dev/null @@ -1,447 +0,0 @@ -import os -import re - -import pandas as pd -import pytest - -from great_expectations import DataContext -from great_expectations.core.batch import Batch -from great_expectations.core.expectation_suite import ExpectationSuite -from great_expectations.core.yaml_handler import YAMLHandler -from great_expectations.data_context.data_context.file_data_context import ( - FileDataContext, -) -from great_expectations.datasource import SparkDFDatasource -from great_expectations.exceptions import BatchKwargsError -from great_expectations.util import is_library_loadable -from great_expectations.validator.validator import BridgeValidator - -yaml = YAMLHandler() - - -@pytest.mark.filesystem -@pytest.fixture(scope="module") -def test_parquet_folder_connection_path(tmp_path_factory): - pandas_version = re.match(r"(\d+)\.(\d+)\..+", pd.__version__) - if pandas_version is None: - raise ValueError("Unrecognized pandas version!") - else: - pandas_major_version = int(pandas_version.group(1)) - pandas_minor_version = int(pandas_version.group(2)) - if pandas_major_version == 0 and pandas_minor_version < 23: - pytest.skip("Pandas version < 23 is no longer compatible with pyarrow") - df1 = pd.DataFrame({"col_1": [1, 2, 3, 4, 5], "col_2": ["a", "b", "c", "d", "e"]}) - basepath = str(tmp_path_factory.mktemp("parquet_context")) - df1.to_parquet(os.path.join(basepath, "test.parquet")) # noqa: PTH118 - - return basepath - - -@pytest.mark.spark -def test_sparkdf_datasource_custom_data_asset( - data_context_parameterized_expectation_suite, - test_folder_connection_path_csv, - spark_session, -): - assert spark_session # Ensure a spark session exists - name = "test_sparkdf_datasource" - # type_ = "spark" - class_name = "SparkDFDatasource" - - data_asset_type_config = { - "module_name": "custom_sparkdf_dataset", - "class_name": "CustomSparkDFDataset", - } - data_context_parameterized_expectation_suite.add_datasource( - name, - class_name=class_name, - data_asset_type=data_asset_type_config, - batch_kwargs_generators={ - "subdir_reader": { - "class_name": "SubdirReaderBatchKwargsGenerator", - "base_directory": test_folder_connection_path_csv, - } - }, - ) - - # We should now see updated configs - with open( - os.path.join( # noqa: PTH118 - data_context_parameterized_expectation_suite.root_directory, - FileDataContext.GX_YML, - ), - ) as data_context_config_file: - data_context_file_config = 
yaml.load(data_context_config_file) - - assert ( - data_context_file_config["datasources"][name]["data_asset_type"]["module_name"] - == "custom_sparkdf_dataset" - ) - assert ( - data_context_file_config["datasources"][name]["data_asset_type"]["class_name"] - == "CustomSparkDFDataset" - ) - - # We should be able to get a dataset of the correct type from the datasource. - data_context_parameterized_expectation_suite.add_expectation_suite( - "test_sparkdf_datasource.default" - ) - batch_kwargs = data_context_parameterized_expectation_suite.build_batch_kwargs( - name, "subdir_reader", "test" - ) - batch_kwargs["reader_options"] = {"header": True, "inferSchema": True} - batch = data_context_parameterized_expectation_suite._get_batch_v2( - batch_kwargs=batch_kwargs, - expectation_suite_name="test_sparkdf_datasource.default", - ) - assert type(batch).__name__ == "CustomSparkDFDataset" - res = batch.expect_column_approx_quantile_values_to_be_between( - "col_1", - quantile_ranges={"quantiles": [0.0, 1.0], "value_ranges": [[1, 1], [5, 5]]}, - ) - assert res.success is True - - -@pytest.mark.spark -def test_force_reuse_spark_context( - data_context_parameterized_expectation_suite, tmp_path_factory, test_backends -): - """ - Ensure that an external sparkSession can be reused by specifying the - force_reuse_spark_context argument. - """ - if "SparkDFDataset" not in test_backends: - pytest.skip("No spark backend selected.") - from pyspark.sql import SparkSession # isort:skip - - dataset_name = "test_spark_dataset" - - spark = SparkSession.builder.appName("local").master("local[1]").getOrCreate() - data = {"col1": [0, 1, 2], "col2": ["a", "b", "c"]} - - spark_df = spark.createDataFrame(pd.DataFrame(data)) - tmp_parquet_filename = os.path.join( # noqa: PTH118 - tmp_path_factory.mktemp(dataset_name).as_posix(), dataset_name - ) - spark_df.write.format("parquet").save(tmp_parquet_filename) - - data_context_parameterized_expectation_suite.add_datasource( - dataset_name, - class_name="SparkDFDatasource", - force_reuse_spark_context=True, - module_name="great_expectations.datasource", - batch_kwargs_generators={}, - ) - - df = spark.read.format("parquet").load(tmp_parquet_filename) - batch_kwargs = {"dataset": df, "datasource": dataset_name} - _ = data_context_parameterized_expectation_suite.add_expectation_suite(dataset_name) - batch = data_context_parameterized_expectation_suite._get_batch_v2( - batch_kwargs=batch_kwargs, expectation_suite_name=dataset_name - ) - results = batch.expect_column_max_to_be_between("col1", min_value=1, max_value=100) - assert results.success, "Failed to use external SparkSession" - spark.stop() - - -@pytest.mark.spark -def test_spark_kwargs_are_passed_through( - data_context_parameterized_expectation_suite, - tmp_path_factory, - test_backends, - spark_session, -): - """ - Ensure that an external SparkSession is not stopped when the spark_config matches - the one specified in the GX Context. 
- """ - if "SparkDFDataset" not in test_backends: - pytest.skip("No spark backend selected.") - dataset_name = "test_spark_dataset" - - spark_config = dict(spark_session.sparkContext.getConf().getAll()) - data_context_parameterized_expectation_suite.add_datasource( - dataset_name, - class_name="SparkDFDatasource", - spark_config=spark_config, - force_reuse_spark_context=False, - persist=False, - module_name="great_expectations.datasource", - batch_kwargs_generators={}, - ) - datasource_config = data_context_parameterized_expectation_suite.get_datasource( - dataset_name - ).config - - actual_spark_config = datasource_config["spark_config"] - expected_spark_config = dict(spark_session.sparkContext.getConf().getAll()) - - # 20220714 - Chetan `spark.sql.warehouse.dir` intermittently shows up in Spark config - # As the rest of the config adheres to expectations, we conditionally pop and assert - # against known values in the payload. - for config in (actual_spark_config, expected_spark_config): - config.pop("spark.sql.warehouse.dir", None) - - assert datasource_config["spark_config"] == expected_spark_config - assert datasource_config["force_reuse_spark_context"] is False - assert datasource_config["persist"] is False - - dataset_name = "test_spark_dataset_2" - data_context_parameterized_expectation_suite.add_datasource( - dataset_name, - class_name="SparkDFDatasource", - spark_config={}, - force_reuse_spark_context=True, - persist=True, - module_name="great_expectations.datasource", - batch_kwargs_generators={}, - ) - datasource_config = data_context_parameterized_expectation_suite.get_datasource( - dataset_name - ).config - assert datasource_config["spark_config"] == {} - assert datasource_config["force_reuse_spark_context"] == True # noqa: E712 - assert datasource_config["persist"] is True - - -@pytest.mark.spark -def test_create_sparkdf_datasource( - data_context_parameterized_expectation_suite, tmp_path_factory, test_backends -): - if "SparkDFDataset" not in test_backends: - pytest.skip("Spark has not been enabled, so this test must be skipped.") - base_dir = tmp_path_factory.mktemp("test_create_sparkdf_datasource") - name = "test_sparkdf_datasource" - # type_ = "spark" - class_name = "SparkDFDatasource" - - data_context_parameterized_expectation_suite.add_datasource( - name, - class_name=class_name, - batch_kwargs_generators={ - "default": { - "class_name": "SubdirReaderBatchKwargsGenerator", - "base_directory": str(base_dir), - } - }, - ) - data_context_config = data_context_parameterized_expectation_suite.get_config() - - assert name in data_context_config["datasources"] - assert data_context_config["datasources"][name]["class_name"] == class_name - assert data_context_config["datasources"][name]["batch_kwargs_generators"][ - "default" - ]["base_directory"] == str(base_dir) - - base_dir = tmp_path_factory.mktemp("test_create_sparkdf_datasource-2") - name = "test_sparkdf_datasource" - - data_context_parameterized_expectation_suite.add_datasource( - name, - class_name=class_name, - batch_kwargs_generators={ - "default": { - "class_name": "SubdirReaderBatchKwargsGenerator", - "reader_options": {"sep": "|", "header": False}, - } - }, - ) - data_context_config = data_context_parameterized_expectation_suite.get_config() - - assert name in data_context_config["datasources"] - assert data_context_config["datasources"][name]["class_name"] == class_name - assert ( - data_context_config["datasources"][name]["batch_kwargs_generators"]["default"][ - "reader_options" - ]["sep"] - == "|" - ) - - # 
Note that pipe is special in yml, so let's also check to see that it was properly serialized - with open( - os.path.join( # noqa: PTH118 - data_context_parameterized_expectation_suite.root_directory, - FileDataContext.GX_YML, - ), - ) as configfile: - lines = configfile.readlines() - assert " sep: '|'\n" in lines - assert " header: false\n" in lines - - -@pytest.mark.spark -@pytest.mark.skipif( - not is_library_loadable(library_name="pyarrow") - and not is_library_loadable(library_name="fastparquet"), - reason="pyarrow and fastparquet are not installed", -) -def test_standalone_spark_parquet_datasource( - test_parquet_folder_connection_path, spark_session -): - assert spark_session # Ensure a sparksession exists - datasource = SparkDFDatasource( - "SparkParquet", - batch_kwargs_generators={ - "subdir_reader": { - "class_name": "SubdirReaderBatchKwargsGenerator", - "base_directory": test_parquet_folder_connection_path, - } - }, - ) - - assert datasource.get_available_data_asset_names()["subdir_reader"]["names"] == [ - ("test", "file") - ] - batch = datasource.get_batch( - batch_kwargs={ - "path": os.path.join( # noqa: PTH118 - test_parquet_folder_connection_path, "test.parquet" - ) - } - ) - assert isinstance(batch, Batch) - # NOTE: below is a great example of CSV vs. Parquet typing: pandas reads content as string, spark as int - assert batch.data.head()["col_1"] == 1 - assert batch.data.count() == 5 - - # Limit should also work - batch = datasource.get_batch( - batch_kwargs={ - "path": os.path.join( # noqa: PTH118 - test_parquet_folder_connection_path, "test.parquet" - ), - "limit": 2, - } - ) - assert isinstance(batch, Batch) - # NOTE: below is a great example of CSV vs. Parquet typing: pandas reads content as string, spark as int - assert batch.data.head()["col_1"] == 1 - assert batch.data.count() == 2 - - -@pytest.mark.spark -def test_standalone_spark_csv_datasource( - test_folder_connection_path_csv, test_backends -): - if "SparkDFDataset" not in test_backends: - pytest.skip("Spark has not been enabled, so this test must be skipped.") - datasource = SparkDFDatasource( - "SparkParquet", - batch_kwargs_generators={ - "subdir_reader": { - "class_name": "SubdirReaderBatchKwargsGenerator", - "base_directory": test_folder_connection_path_csv, - } - }, - ) - - assert datasource.get_available_data_asset_names()["subdir_reader"]["names"] == [ - ("test", "file") - ] - batch = datasource.get_batch( - batch_kwargs={ - "path": os.path.join( # noqa: PTH118 - test_folder_connection_path_csv, "test.csv" - ), - "reader_options": {"header": True}, - } - ) - assert isinstance(batch, Batch) - # NOTE: below is a great example of CSV vs. 
Parquet typing: pandas reads content as string, spark as int - assert batch.data.head()["col_1"] == "1" - - -@pytest.mark.spark -def test_invalid_reader_sparkdf_datasource(tmp_path_factory, test_backends): - if "SparkDFDataset" not in test_backends: - pytest.skip("Spark has not been enabled, so this test must be skipped.") - basepath = str(tmp_path_factory.mktemp("test_invalid_reader_sparkdf_datasource")) - datasource = SparkDFDatasource( - "mysparksource", - batch_kwargs_generators={ - "subdir_reader": { - "class_name": "SubdirReaderBatchKwargsGenerator", - "base_directory": basepath, - } - }, - ) - - with open( - os.path.join( # noqa: PTH118 - basepath, "idonotlooklikeacsvbutiam.notrecognized" - ), - "w", - ) as newfile: - newfile.write("a,b\n1,2\n3,4\n") - - with pytest.raises(BatchKwargsError) as exc: - datasource.get_batch( - batch_kwargs={ - "path": os.path.join( # noqa: PTH118 - basepath, "idonotlooklikeacsvbutiam.notrecognized" - ) - } - ) - assert "Unable to determine reader for path" in exc.value.message - - with pytest.raises(BatchKwargsError) as exc: - datasource.get_batch( - batch_kwargs={ - "path": os.path.join( # noqa: PTH118 - basepath, "idonotlooklikeacsvbutiam.notrecognized" - ), - "reader_method": "blarg", - } - ) - assert "Unknown reader method: blarg" in exc.value.message - - with pytest.raises(BatchKwargsError) as exc: - datasource.get_batch( - batch_kwargs={ - "path": os.path.join( # noqa: PTH118 - basepath, "idonotlooklikeacsvbutiam.notrecognized" - ), - "reader_method": "excel", - } - ) - assert "Unknown reader: excel" in exc.value.message - - batch = datasource.get_batch( - batch_kwargs={ - "path": os.path.join( # noqa: PTH118 - basepath, "idonotlooklikeacsvbutiam.notrecognized" - ), - "reader_method": "csv", - "reader_options": {"header": True}, - } - ) - assert batch.data.head()["a"] == "1" - - -@pytest.mark.spark -def test_spark_datasource_processes_dataset_options( - test_folder_connection_path_csv, test_backends, empty_data_context -): - context: DataContext = empty_data_context - if "SparkDFDataset" not in test_backends: - pytest.skip("Spark has not been enabled, so this test must be skipped.") - datasource = SparkDFDatasource( - "PandasCSV", - batch_kwargs_generators={ - "subdir_reader": { - "class_name": "SubdirReaderBatchKwargsGenerator", - "base_directory": test_folder_connection_path_csv, - } - }, - ) - batch_kwargs = datasource.build_batch_kwargs( - "subdir_reader", data_asset_name="test" - ) - batch_kwargs["dataset_options"] = {"caching": False, "persist": False} - batch = datasource.get_batch(batch_kwargs) - validator = BridgeValidator( - batch, ExpectationSuite(expectation_suite_name="foo", data_context=context) - ) - dataset = validator.get_dataset() - assert dataset.caching is False - assert dataset._persist is False
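With LegacyDatasource removed, the retained tests above narrow the return annotation of context.get_datasource from Union[LegacyDatasource, BaseDatasource, None] to Union[BaseDatasource, None]. A minimal sketch of the pattern downstream callers can follow, assuming only what the updated hunks show (get_datasource may return None and the datasource exposes self_check()); the helper name and the error handling are illustrative, not part of this change:

from typing import Optional

from great_expectations.datasource import BaseDatasource


def self_check_datasource(context, datasource_name: str) -> dict:
    # The retained tests annotate get_datasource as possibly returning None,
    # so guard before calling self_check().
    datasource: Optional[BaseDatasource] = context.get_datasource(
        datasource_name=datasource_name
    )
    if datasource is None:
        raise ValueError(f"No datasource named {datasource_name!r} is configured")
    report: dict = datasource.self_check()
    return report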
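Because the deleted modules were the remaining tests importing LegacyDatasource, PandasDatasource, and SparkDFDatasource, any leftover code that still imports those names from great_expectations.datasource should now fail at import time. A hypothetical pytest guard (not part of this change set) that documents the removal; the surviving imports listed are only the ones exercised by the retained tests above:

import pytest


@pytest.mark.unit
@pytest.mark.parametrize(
    "removed_name",
    ["LegacyDatasource", "PandasDatasource", "SparkDFDatasource"],
)
def test_v2_datasource_classes_are_removed(removed_name):
    # The v2 "batch kwargs" datasource classes are no longer importable.
    with pytest.raises(ImportError):
        exec(f"from great_expectations.datasource import {removed_name}")


@pytest.mark.unit
def test_retained_datasource_classes_still_import():
    # These names are still imported by the tests kept in this change.
    from great_expectations.datasource import (  # noqa: F401
        BaseDatasource,
        SimpleSqlalchemyDatasource,
    )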
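The deleted test_read_limit and test_process_batch_parameters tests show that PandasDatasource mapped a batch limit onto pandas' nrows reader option (process_batch_parameters(limit=1) == {"reader_options": {"nrows": 1}}). Code that relied on that translation can pass nrows to pandas directly; a self-contained sketch with illustrative inline CSV data:

import io

import pandas as pd

csv_text = "col_1,col_2\n1,a\n2,b\n3,c\n4,d\n5,e\n"

# The removed datasource turned limit=1 into reader_options={"nrows": 1};
# passing nrows straight to pandas yields the same single-row batch.
df = pd.read_csv(io.StringIO(csv_text), nrows=1)
assert len(df) == 1
assert df["col_1"].tolist() == [1]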
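The deleted Spark tests compared a datasource's stored spark_config against dict(spark_session.sparkContext.getConf().getAll()) after popping spark.sql.warehouse.dir, because that key shows up intermittently. If that comparison is still needed outside these tests, here is a hedged sketch of the same idea; the helper name and the __main__ smoke check are ours, not Great Expectations':

from pyspark.sql import SparkSession

# Keys Spark may inject on its own; the deleted tests dropped them before comparing.
_VOLATILE_SPARK_KEYS = ("spark.sql.warehouse.dir",)


def normalized_spark_config(spark: SparkSession) -> dict:
    """Return the session's configuration minus keys that vary between runs."""
    config = dict(spark.sparkContext.getConf().getAll())
    for key in _VOLATILE_SPARK_KEYS:
        config.pop(key, None)
    return config


if __name__ == "__main__":
    spark = SparkSession.builder.appName("local").master("local[1]").getOrCreate()
    assert "spark.sql.warehouse.dir" not in normalized_spark_config(spark)
    spark.stop()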
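The deleted test_create_sparkdf_datasource also asserts that a pipe separator survives YAML round-tripping, since a bare | would otherwise start a YAML block scalar (the config file must contain sep: '|' and header: false). A quick illustration of that quoting behaviour using PyYAML rather than the project's YAMLHandler (an assumption made for a self-contained example):

import yaml  # PyYAML, used here only to illustrate the quoting behaviour

reader_options = {"sep": "|", "header": False}

dumped = yaml.safe_dump(reader_options, default_flow_style=False)
# A lone pipe must be quoted, otherwise it would begin a block scalar;
# booleans are lower-cased, matching the assertions in the deleted test.
assert "sep: '|'" in dumped
assert "header: false" in dumped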