diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index d2033b2d89c8..1f1d4dd480a1 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -433,6 +433,7 @@ jobs: - postgresql - snowflake - spark + - spark_connect - trino python-version: ["3.8", "3.9", "3.10", "3.11", "3.12"] exclude: diff --git a/assets/docker/spark/docker-compose.yml b/assets/docker/spark/docker-compose.yml index d827ce8c7626..550eeb0caee0 100644 --- a/assets/docker/spark/docker-compose.yml +++ b/assets/docker/spark/docker-compose.yml @@ -4,3 +4,10 @@ services: ports: - "9090:8080" - "7077:7077" + + spark-connect: + image: ${ECR_PULL_THROUGH_REPOSITORY_URL}bitnami/spark:3.5.2 + ports: + - "15002:15002" + # See https://spark.apache.org/docs/latest/spark-connect-overview.html#download-and-start-spark-server-with-spark-connect + command: ./sbin/start-connect-server.sh --packages org.apache.spark:spark-connect_2.12:3.5.2 diff --git a/docs/docusaurus/docs/cloud/connect/connect_databrickssql.md b/docs/docusaurus/docs/cloud/connect/connect_databrickssql.md new file mode 100644 index 000000000000..5a00a0e333c0 --- /dev/null +++ b/docs/docusaurus/docs/cloud/connect/connect_databrickssql.md @@ -0,0 +1,42 @@ +--- +sidebar_label: 'Connect GX Cloud to Databricks SQL' +title: 'Connect GX Cloud to Databricks SQL' +description: Connect GX Cloud to a Databricks SQL Data Source. +--- + +import TabItem from '@theme/TabItem'; +import Tabs from '@theme/Tabs'; + +## Prerequisites + +- You have a [GX Cloud account](https://greatexpectations.io/cloud) with [Admin or Editor permissions](../about_gx.md#roles-and-responsibilities). + +- You have a Databricks SQL catalog, schema, and table. + +- To improve data security, GX recommends creating a separate Databricks SQL [service principal](https://docs.databricks.com/en/admin/users-groups/service-principals.html#manage-service-principals-in-your-account) for your GX Cloud connection. + + +## Connect to a Databricks SQL Data Asset + +1. In GX Cloud, click **Data Assets** > **New Data Asset** > **Databricks SQL**. + +2. Enter a meaningful name for the Data Source in the **Data Source name** field. + +3. Enter a connection string in the **Connection string** field. The connection string format is `databricks://token:{token}@{host}?http_path={http_path}&catalog={catalog}&schema={schema}`. + - For instructions on creating a GX-specific user in your Databricks SQL catalog, click **See instructions**. + +4. Click **Connect**. + +5. Select tables to import as Data Assets: + + - Check the box next to a table name to add that table as an asset. + + - At least one table must be added. + + - To search for a specific table, type the table's name in the **Search** box above the list of tables. + + - To add all of the available tables, check the box for **All Tables**. + +6. Click **Add Asset**. + +7. Create an Expectation. See [Create an Expectation](/cloud/expectations/manage_expectations.md#create-an-expectation).
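The connection string format documented above can be assembled and sanity-checked in plain Python before pasting it into GX Cloud. A minimal sketch, assuming hypothetical placeholder values for the token, host, HTTP path, catalog, and schema (none of these values come from this diff):

```python
# Assemble the documented Databricks SQL connection string.
# All values below are hypothetical placeholders -- substitute your own.
token = "dapi-example-token"                         # PAT or service principal token
host = "adb-1234567890123456.7.azuredatabricks.net"  # workspace hostname
http_path = "/sql/1.0/warehouses/abc123def456"       # SQL warehouse HTTP path
catalog = "my_catalog"
schema = "my_schema"

connection_string = (
    f"databricks://token:{token}@{host}"
    f"?http_path={http_path}&catalog={catalog}&schema={schema}"
)
print(connection_string)
```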
diff --git a/docs/docusaurus/docs/cloud/connect/connect_lp.md b/docs/docusaurus/docs/cloud/connect/connect_lp.md index 1985515cc951..9a544fdc42a4 100644 --- a/docs/docusaurus/docs/cloud/connect/connect_lp.md +++ b/docs/docusaurus/docs/cloud/connect/connect_lp.md @@ -17,6 +17,7 @@ import OverviewCard from '@site/src/components/OverviewCard'; + \ No newline at end of file diff --git a/docs/docusaurus/docs/cloud/expectations/manage_expectations.md b/docs/docusaurus/docs/cloud/expectations/manage_expectations.md index af655684c2c3..c6b2c68901d5 100644 --- a/docs/docusaurus/docs/cloud/expectations/manage_expectations.md +++ b/docs/docusaurus/docs/cloud/expectations/manage_expectations.md @@ -8,12 +8,6 @@ An Expectation is a verifiable assertion about your data. They make implicit ass -:::info Custom SQL Query Expectations - -To create custom SQL query Expectations, you'll need to use the GX API. See [Customize Expectations](/core/customize_expectations/customize_expectations.md). - -::: - ## Prerequisites - You have a [Data Asset](/cloud/data_assets/manage_data_assets.md#create-a-data-asset). @@ -23,7 +17,7 @@ To create custom SQL query Expectations, you'll need to use the GX API. See [Cus The following table lists the available GX Cloud Expectations. | Data Quality Issue | Expectation | Description | -| ------------------ | --------------------------------------------------------- | -------------------------------------------------------------------------------------------------------------------------------------- | +|--------------------|-----------------------------------------------------------|----------------------------------------------------------------------------------------------------------------------------------------| | Cardinality | `expect_column_values_to_be_unique` | Expect each column value to be unique. | | Cardinality | `expect_compound_columns_to_be_unique` | Expect the compound columns to be unique. | | Cardinality | `expect_select_column_values_to_be_unique_within_record` | Expect the values for each record to be unique across the columns listed. Note that records can be duplicated. | @@ -69,6 +63,18 @@ The following table lists the available GX Cloud Expectations. | Volume | `expect_table_row_count_to_equal` | Expect the number of rows to equal a value. | | Volume | `expect_table_row_count_to_equal_other_table` | Expect the number of rows to equal the number in another table within the same database. | +## Custom SQL Expectations + +GX Cloud also offers the ability to write a custom Expectation using SQL. It is designed to fail validation if the provided SQL query returns one or more rows. + +The provided query should be written in the dialect of the Data Source in which a given Data Asset lives. + +:::info Optional `{batch}` named query + +The optional `{batch}` named query references the Batch of data under test. When the Expectation is evaluated, the `{batch}` named query will be replaced with the Batch of data that is validated. + +::: + ## Add an Expectation 1. In GX Cloud, click **Data Assets**. 
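For readers who want the API-side counterpart of the new "Custom SQL Expectations" section, the `UnexpectedRowsExpectation` covered elsewhere in this diff implements the same contract: validation fails when the query returns one or more rows, and `{batch}` is replaced with the Batch under test. A minimal sketch, assuming a preconfigured Data Source, Data Asset, and Batch Definition with hypothetical names:

```python
import great_expectations as gx

context = gx.get_context()

expectation = gx.expectations.UnexpectedRowsExpectation(
    # `{batch}` is substituted with the Batch under test at evaluation time;
    # validation fails if this query returns one or more rows.
    unexpected_rows_query="SELECT * FROM {batch} WHERE passenger_count > 6",
)

batch = (
    context.data_sources.get("my_databricks_sql_ds")  # hypothetical names
    .get_asset("taxi_trips")
    .get_batch_definition("whole_table")
    .get_batch()
)
result = batch.validate(expectation)
print(result.success)
```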
diff --git a/docs/docusaurus/docs/components/_data.jsx b/docs/docusaurus/docs/components/_data.jsx index 5950a4a31a4d..7e9f9d9dfe62 100644 --- a/docs/docusaurus/docs/components/_data.jsx +++ b/docs/docusaurus/docs/components/_data.jsx @@ -1,5 +1,5 @@ export default { - release_version: 'great_expectations, version 1.0.3', + release_version: 'great_expectations, version 1.0.5', min_python: '3.8', max_python: '3.11' } diff --git a/docs/docusaurus/docs/core/customize_expectations/use_sql_to_define_a_custom_expectation.md b/docs/docusaurus/docs/core/customize_expectations/use_sql_to_define_a_custom_expectation.md index 8917bf4d6b6e..91028741bb14 100644 --- a/docs/docusaurus/docs/core/customize_expectations/use_sql_to_define_a_custom_expectation.md +++ b/docs/docusaurus/docs/core/customize_expectations/use_sql_to_define_a_custom_expectation.md @@ -11,7 +11,7 @@ import PrereqPreconfiguredDataSourceAndAsset from '../_core_components/prerequis Among the available Expectations, the `UnexpectedRowsExpectation` is designed to facilitate the execution of SQL or Spark-SQL queries as the core logic for an Expectation. By default, `UnexpectedRowsExpectation` considers validation successful when no rows are returned by the provided SQL query. -You customize an `UnexpectedRowsExpectation` in essentially the same manner as you would [define a custom Expectation](/core/customize_expectations/define_a_custom_expectation_class.md), by subclassing `UnexpectedRowsExpectation` and providing customized default attributes and text for Data Docs. However, there are some caveats around the `UnexpectedRowsExpectation`'s `unexpected_rows_query` attribute that deserve further detail. +Like any other Expectation, you can instantiate the `UnexpectedRowsExpectation` directly. You can also customize an `UnexpectedRowsExpectation` in essentially the same manner as you would [define a custom Expectation](/core/customize_expectations/define_a_custom_expectation_class.md), by subclassing `UnexpectedRowsExpectation` and providing customized default attributes and text for Data Docs. However, there are some caveats around the `UnexpectedRowsExpectation`'s `unexpected_rows_query` attribute that deserve further detail. @@ -48,7 +48,7 @@ You customize an `UnexpectedRowsExpectation` in essentially the same manner as y The `unexpected_rows_query` attribute is a SQL or Spark-SQL query that returns a selection of rows from the Batch of data being validated. By default, rows that are returned have failed the validation check. - Although the `unexpected_rows_query` should be written in standard SQL or Spark-SQL syntax, it must also contain the special `{batch}` placeholder. When the Expectation is evaluated, the `{batch}` placeholder will be replaced with the Batch of data that is validated. + The `unexpected_rows_query` should be written in standard SQL or Spark-SQL syntax, except that it can also contain the special `{batch}` named query. When the Expectation is evaluated, the `{batch}` keyword will be replaced with the Batch of data that is configured for your Data Asset. In this example, `unexpected_rows_query` will select any rows where the passenger count is greater than `6` or less than `0`. 
These rows will fail validation for this Expectation: diff --git a/docs/docusaurus/docs/oss/changelog.md b/docs/docusaurus/docs/oss/changelog.md index faa7fbe3d7ec..91eed0fe9c04 100644 --- a/docs/docusaurus/docs/oss/changelog.md +++ b/docs/docusaurus/docs/oss/changelog.md @@ -14,6 +14,24 @@ When we deprecate our public APIs, we will Before we completely remove the functionality in a new major release, there will be at least one minor release that contains the deprecation so that you can smoothly transition to the new API. +### 1.0.5 +* [BUGFIX] Using `{batch}` keyword in `UnexpectedRowsQuery` ([#10392](https://github.com/great-expectations/great_expectations/pull/10392)) +* [BUGFIX] Fix Databricks SQL Regex and Like based Expectations ([#10406](https://github.com/great-expectations/great_expectations/pull/10406)) +* [BUGFIX] Support Spark connect dataframes ([#10420](https://github.com/great-expectations/great_expectations/pull/10420)) +* [BUGFIX] Handle DatabricksSQL attribute error and update dependency ([#10424](https://github.com/great-expectations/great_expectations/pull/10424)) +* [DOCS] Add Connect to Databricks SQL page in GX Cloud ([#10394](https://github.com/great-expectations/great_expectations/pull/10394)) (thanks @allisongx) +* [DOCS] Changelog updates `0.18.18` -> `0.18.21` ([#10422](https://github.com/great-expectations/great_expectations/pull/10422)) +* [DOCS] Add Connect to Databricks SQL to GX Cloud docs TOC ([#10423](https://github.com/great-expectations/great_expectations/pull/10423)) +* [MAINTENANCE] Fix `SQLAlchemyExectionEngine.get_connection()` typing + update column identifier tests ([#10399](https://github.com/great-expectations/great_expectations/pull/10399)) +* [MAINTENANCE] Move FabricPowerBIDatasource out of experimental dir ([#10419](https://github.com/great-expectations/great_expectations/pull/10419)) + +### 1.0.4 +* [BUGFIX] Fix action equality ([#10393](https://github.com/great-expectations/great_expectations/pull/10393)) +* [BUGFIX] Patch additional issues with data docs page retrieval in checkpoint actions ([#10400](https://github.com/great-expectations/great_expectations/pull/10400)) +* [DOCS] Add CTAs to request a demo ([#10389](https://github.com/great-expectations/great_expectations/pull/10389)) +* [MAINTENANCE] Ensure that all nested validation definition diagnostics are emitted from a parent checkpoint ([#10386](https://github.com/great-expectations/great_expectations/pull/10386)) +* [MAINTENANCE] Fix `SQLAlchemyExectionEngine.get_connection()` typing + update column identifier tests ([#10399](https://github.com/great-expectations/great_expectations/pull/10399)) + ### 1.0.3 * [FEATURE] Replace get_batch_list_from_batch_request with get_batch and get_batch_identifiers_list ([#10295](https://github.com/great-expectations/great_expectations/pull/10295)) * [FEATURE] Add Checkpoint.run analytics ([#10382](https://github.com/great-expectations/great_expectations/pull/10382)) diff --git a/docs/docusaurus/docusaurus.config.js b/docs/docusaurus/docusaurus.config.js index c9a366692afc..28901de84138 100644 --- a/docs/docusaurus/docusaurus.config.js +++ b/docs/docusaurus/docusaurus.config.js @@ -298,10 +298,10 @@ module.exports = { lastVersion: 'current', versions: { current: { - label: '1.0.3', + label: '1.0.5', }, ['0.18']: { - label: '0.18.17', + label: '0.18.21', }, }, admonitions: { @@ -325,6 +325,12 @@ module.exports = { // Optional fields. anonymizeIP: true, // Should IPs be anonymized? 
}, + sitemap: { + ignorePatterns: [ + '**/0.18/oss/templates/**', + '**/0.18/oss/team_templates/**' + ], + } }, ], ], diff --git a/docs/docusaurus/sidebars.js b/docs/docusaurus/sidebars.js index 8911e3abbd01..1f6a1a709386 100644 --- a/docs/docusaurus/sidebars.js +++ b/docs/docusaurus/sidebars.js @@ -190,6 +190,7 @@ module.exports = { items: [ 'cloud/connect/connect_postgresql', 'cloud/connect/connect_snowflake', + 'cloud/connect/connect_databrickssql', 'cloud/connect/connect_airflow', 'cloud/connect/connect_python', ] @@ -251,6 +252,11 @@ module.exports = { label: 'Available Expectations', href: '/docs/cloud/expectations/manage_expectations#available-expectations', }, + { + type: 'link', + label: 'Custom SQL Expectations', + href: '/docs/cloud/expectations/manage_expectations#custom-sql-expectations', + }, { type: 'link', label: 'Add an Expectation', diff --git a/docs/docusaurus/versioned_docs/version-0.18/oss/changelog.md b/docs/docusaurus/versioned_docs/version-0.18/oss/changelog.md index 9db9f0530353..cde0dd568589 100644 --- a/docs/docusaurus/versioned_docs/version-0.18/oss/changelog.md +++ b/docs/docusaurus/versioned_docs/version-0.18/oss/changelog.md @@ -10,6 +10,31 @@ title: Changelog - Deprecation warnings are accompanied by a moniker (as a code comment) indicating when they were deprecated. For example: `# deprecated-v0.13` - Changes to methods and parameters due to deprecation are also noted in the relevant docstrings. +### 0.18.21 +* [BUGFIX] Using `{batch}` keyword in `UnexpectedRowsQuery` (#10392) ([#10411](https://github.com/great-expectations/great_expectations/pull/10411)) +* [BUGFIX] 0.18.x Ignore unsupported INTERVAL type as part of CDM ([#10414](https://github.com/great-expectations/great_expectations/pull/10414)) +* [BUGFIX] 0.18.x Databricks SQL Pattern Expectation Fix ([#10415](https://github.com/great-expectations/great_expectations/pull/10415)) +* [MAINTENANCE] Pass `description` from `Validator` to `ExpectationConfiguration` ([#10388](https://github.com/great-expectations/great_expectations/pull/10388)) + +### 0.18.20 +* [FEATURE] Add `UnexpectedRowsExpectation` ([#10342](https://github.com/great-expectations/great_expectations/pull/10342)) +* [BUGFIX] Remove illegible duplicate local Data Docs link from Slack renderer ([#10129](https://github.com/great-expectations/great_expectations/pull/10129)) +* [MAINTENANCE] Ruff 0.5.3 + PR annotations ([#10128](https://github.com/great-expectations/great_expectations/pull/10128)) +* [MAINTENANCE] Fix 0.18.x CI ([#10199](https://github.com/great-expectations/great_expectations/pull/10199)) +* [MAINTENANCE] Update column identifier tests ([#8783](https://github.com/great-expectations/great_expectations/pull/8783)) + +### 0.18.19 +* [FEATURE] Snowflake test for the presence of a schema in `test_connection()` ([#10100](https://github.com/great-expectations/great_expectations/pull/10100)) +* [BUGFIX] Z-score renderer when `double_sided` ([#10085](https://github.com/great-expectations/great_expectations/pull/10085)) +* [BUGFIX] SQLDatasource - lowercase unquoted `schema_names` for SQLAlchemy case-sensitivity compatibility ([#10107](https://github.com/great-expectations/great_expectations/pull/10107)) +* [MAINTENANCE] Export `great_expectations.compatibility` types ([#10089](https://github.com/great-expectations/great_expectations/pull/10089)) +* [MAINTENANCE] 0.18.x - mypy - `possibly-undefined` ([#10091](https://github.com/great-expectations/great_expectations/pull/10091)) +* [MAINTENANCE] loosen ruamel pin 
([#10081](https://github.com/great-expectations/great_expectations/pull/10081)) + +### 0.18.18 +* [FEATURE] Add atomic renderer for `ExpectMulticolumnSumToEqual` (#10076) ([#10077](https://github.com/great-expectations/great_expectations/pull/10077)) +* [FEATURE] Snowflake - narrow Account Identifier regex ([#10069](https://github.com/great-expectations/great_expectations/pull/10069)) +* [FEATURE] Add missing atomic renderers to Expectations (#10079) ([#10080](https://github.com/great-expectations/great_expectations/pull/10080)) ### 0.18.17 * [FEATURE] Snowflake - Better Account Identifier related TestConnectionErrors ([#10043](https://github.com/great-expectations/great_expectations/pull/10043)) diff --git a/great_expectations/compatibility/databricks.py b/great_expectations/compatibility/databricks.py index e78e5701d500..ad0d5072bf5a 100644 --- a/great_expectations/compatibility/databricks.py +++ b/great_expectations/compatibility/databricks.py @@ -5,6 +5,6 @@ ) try: - from databricks import connect # type: ignore[import-untyped] + from databricks import connect except ImportError: connect = DATABRICKS_CONNECT_NOT_IMPORTED diff --git a/great_expectations/compatibility/pyspark.py b/great_expectations/compatibility/pyspark.py index 1551b90ae65f..0d25017897dc 100644 --- a/great_expectations/compatibility/pyspark.py +++ b/great_expectations/compatibility/pyspark.py @@ -39,6 +39,11 @@ except (ImportError, AttributeError): Column = SPARK_NOT_IMPORTED # type: ignore[assignment,misc] +try: + from pyspark.sql.connect.dataframe import DataFrame as ConnectDataFrame +except (ImportError, AttributeError): + ConnectDataFrame = SPARK_NOT_IMPORTED # type: ignore[assignment,misc] + try: from pyspark.sql import DataFrame except (ImportError, AttributeError): diff --git a/great_expectations/datasource/fluent/__init__.py b/great_expectations/datasource/fluent/__init__.py index 4052427ef418..17dc94b98129 100644 --- a/great_expectations/datasource/fluent/__init__.py +++ b/great_expectations/datasource/fluent/__init__.py @@ -50,7 +50,7 @@ from great_expectations.datasource.fluent.pandas_azure_blob_storage_datasource import ( PandasAzureBlobStorageDatasource, ) -from great_expectations.experimental.datasource.fabric import FabricPowerBIDatasource +from great_expectations.datasource.fluent.fabric import FabricPowerBIDatasource from great_expectations.datasource.fluent.postgres_datasource import ( PostgresDatasource, ) diff --git a/great_expectations/experimental/datasource/fabric.py b/great_expectations/datasource/fluent/fabric.py similarity index 100% rename from great_expectations/experimental/datasource/fabric.py rename to great_expectations/datasource/fluent/fabric.py diff --git a/great_expectations/datasource/fluent/spark_datasource.py b/great_expectations/datasource/fluent/spark_datasource.py index 169ffdfe650d..9c3ad18ef0d2 100644 --- a/great_expectations/datasource/fluent/spark_datasource.py +++ b/great_expectations/datasource/fluent/spark_datasource.py @@ -5,6 +5,7 @@ from pprint import pformat as pf from typing import ( TYPE_CHECKING, + Any, ClassVar, Dict, Generic, @@ -27,7 +28,7 @@ StrictInt, StrictStr, ) -from great_expectations.compatibility.pyspark import DataFrame, pyspark +from great_expectations.compatibility.pyspark import ConnectDataFrame, DataFrame, pyspark from great_expectations.compatibility.typing_extensions import override from great_expectations.core import IDDict from great_expectations.core.batch import LegacyBatchDefinition @@ -47,7 +48,7 @@ from 
great_expectations.exceptions.exceptions import BuildBatchRequestError if TYPE_CHECKING: - from typing_extensions import TypeAlias + from typing_extensions import TypeAlias, TypeGuard from great_expectations.compatibility.pyspark import SparkSession from great_expectations.core.batch_definition import BatchDefinition @@ -231,9 +232,9 @@ def build_batch_request( if not (options is not None and "dataframe" in options and len(options) == 1): raise BuildBatchRequestError(message="options must contain exactly 1 key, 'dataframe'.") - if not isinstance(options["dataframe"], DataFrame): + if not self.is_spark_data_frame(options["dataframe"]): raise BuildBatchRequestError( - message="Can not build batch request for dataframe asset " "without a dataframe." + message="Cannot build batch request without a Spark DataFrame." ) return BatchRequest( @@ -255,7 +256,7 @@ def _validate_batch_request(self, batch_request: BatchRequest) -> None: and batch_request.options and len(batch_request.options) == 1 and "dataframe" in batch_request.options - and isinstance(batch_request.options["dataframe"], DataFrame) + and self.is_spark_data_frame(batch_request.options["dataframe"]) ): expect_batch_request_form = BatchRequest[None]( datasource_name=self.datasource.name, @@ -314,6 +315,14 @@ def add_batch_definition_whole_dataframe(self, name: str) -> BatchDefinition: partitioner=None, ) + @staticmethod + def is_spark_data_frame(df: Any) -> TypeGuard[Union[DataFrame, ConnectDataFrame]]: + """Check that a given object is a Spark DataFrame. + This could either be a regular Spark DataFrame or a Spark Connect DataFrame. + """ + data_frame_types = [DataFrame, ConnectDataFrame] + return any((cls and isinstance(df, cls)) for cls in data_frame_types) + @public_api class SparkDatasource(_SparkDatasource): diff --git a/great_expectations/deployment_version b/great_expectations/deployment_version index 21e8796a09d4..90a27f9cea6e 100644 --- a/great_expectations/deployment_version +++ b/great_expectations/deployment_version @@ -1 +1 @@ -1.0.3 +1.0.5 diff --git a/great_expectations/execution_engine/sqlalchemy_execution_engine.py b/great_expectations/execution_engine/sqlalchemy_execution_engine.py index 736a847d3116..f39176d69f6c 100644 --- a/great_expectations/execution_engine/sqlalchemy_execution_engine.py +++ b/great_expectations/execution_engine/sqlalchemy_execution_engine.py @@ -378,6 +378,8 @@ def __init__( # noqa: C901, PLR0912, PLR0913, PLR0915 self.dialect_module = import_library_module( module_name="clickhouse_sqlalchemy.drivers.base" ) + elif self.dialect_name == GXSqlDialect.DATABRICKS: + self.dialect_module = import_library_module("databricks.sqlalchemy") else: self.dialect_module = None diff --git a/great_expectations/expectations/metrics/query_metric_provider.py b/great_expectations/expectations/metrics/query_metric_provider.py index 396bb1b2eb31..9e35164e5ae0 100644 --- a/great_expectations/expectations/metrics/query_metric_provider.py +++ b/great_expectations/expectations/metrics/query_metric_provider.py @@ -1,13 +1,27 @@ from __future__ import annotations import logging -from typing import ClassVar +from typing import TYPE_CHECKING, ClassVar +from great_expectations.execution_engine.sqlalchemy_dialect import GXSqlDialect from great_expectations.expectations.metrics.metric_provider import MetricProvider +if TYPE_CHECKING: + from great_expectations.compatibility.sqlalchemy import ( + sqlalchemy as sa, + ) + logger = logging.getLogger(__name__) +class MissingElementError(TypeError): + def __init__(self): + 
super().__init__( + "The batch_subquery selectable does not contain an " + "element from which query parameters can be extracted." + ) + + class QueryMetricProvider(MetricProvider): """Base class for all Query Metrics, which define metrics to construct SQL queries. @@ -32,6 +46,10 @@ class QueryMetricProvider(MetricProvider): query_param_name: ClassVar[str] = "query" + dialect_columns_require_subquery_aliases: ClassVar[set[GXSqlDialect]] = { + GXSqlDialect.POSTGRESQL + } + @classmethod def _get_query_from_metric_value_kwargs(cls, metric_value_kwargs: dict) -> str: query_param = cls.query_param_name @@ -42,3 +60,40 @@ def _get_query_from_metric_value_kwargs(cls, metric_value_kwargs: dict) -> str: raise ValueError(f"Must provide `{query_param}` to `{cls.__name__}` metric.") # noqa: TRY003 return query + + @classmethod + def _get_query_string_with_substituted_batch_parameters( + cls, query: str, batch_subquery: sa.sql.Subquery | sa.sql.Alias + ) -> str: + """Specifying a runtime query string returns the active batch as a Subquery or Alias type + There is no object-based way to apply the subquery alias to columns in the SELECT and + WHERE clauses. Instead, we extract the subquery parameters from the batch selectable + and inject them into the SQL string. + + Raises: + MissingElementError if the batch_subquery.selectable does not have an element + for which to extract query parameters. + """ + + try: + froms = batch_subquery.selectable.element.get_final_froms() # type: ignore[attr-defined] # possible AttributeError handled + try: + batch_table = froms[0].name + except AttributeError: + batch_table = str(froms[0]) + batch_filter = str(batch_subquery.selectable.element.whereclause) # type: ignore[attr-defined] # possible AttributeError handled + except (AttributeError, IndexError) as e: + raise MissingElementError() from e + + unfiltered_query = query.format(batch=batch_table) + + if "WHERE" in query.upper(): + query = unfiltered_query.replace("WHERE", f"WHERE {batch_filter} AND") + elif "GROUP BY" in query.upper(): + query = unfiltered_query.replace("GROUP BY", f"WHERE {batch_filter} GROUP BY") + elif "ORDER BY" in query.upper(): + query = unfiltered_query.replace("ORDER BY", f"WHERE {batch_filter} ORDER BY") + else: + query = unfiltered_query + f" WHERE {batch_filter}" + + return query diff --git a/great_expectations/expectations/metrics/query_metrics/query_table.py b/great_expectations/expectations/metrics/query_metrics/query_table.py index 8bf1e7362002..6e19d4ea2014 100644 --- a/great_expectations/expectations/metrics/query_metrics/query_table.py +++ b/great_expectations/expectations/metrics/query_metrics/query_table.py @@ -1,6 +1,6 @@ from __future__ import annotations -from typing import TYPE_CHECKING, Any, Dict, List, Union +from typing import TYPE_CHECKING, Any, Dict, List from great_expectations.compatibility.sqlalchemy import ( sqlalchemy as sa, @@ -12,6 +12,7 @@ ) from great_expectations.expectations.metrics.metric_provider import metric_value from great_expectations.expectations.metrics.query_metric_provider import ( + MissingElementError, QueryMetricProvider, ) from great_expectations.util import get_sqlalchemy_subquery_type @@ -36,25 +37,36 @@ def _sqlalchemy( ) -> List[dict]: query = cls._get_query_from_metric_value_kwargs(metric_value_kwargs) - selectable: Union[sa.sql.Selectable, str] - selectable, _, _ = execution_engine.get_compute_domain( + batch_selectable: sa.sql.Selectable + batch_selectable, _, _ = execution_engine.get_compute_domain( metric_domain_kwargs, 
domain_type=MetricDomainTypes.TABLE ) - if isinstance(selectable, sa.Table): - query = query.format(batch=selectable) + if isinstance(batch_selectable, sa.Table): + query = query.format(batch=batch_selectable) + elif isinstance(batch_selectable, get_sqlalchemy_subquery_type()): + if execution_engine.dialect_name in cls.dialect_columns_require_subquery_aliases: + try: + query = cls._get_query_string_with_substituted_batch_parameters( + query=query, + batch_subquery=batch_selectable, + ) + except MissingElementError: + # if we are unable to extract the subquery parameters, + # we fall back to the default behavior for all dialects + batch = batch_selectable.compile(compile_kwargs={"literal_binds": True}) + query = query.format(batch=f"({batch})") + else: + batch = batch_selectable.compile(compile_kwargs={"literal_binds": True}) + query = query.format(batch=f"({batch})") elif isinstance( - selectable, get_sqlalchemy_subquery_type() - ): # Specifying a runtime query in a RuntimeBatchRequest returns the active batch as a Subquery or Alias; sectioning the active batch off w/ parentheses ensures flow of operations doesn't break # noqa: E501 - query = query.format(batch=f"({selectable})") - elif isinstance( - selectable, sa.sql.Select - ): # Specifying a row_condition returns the active batch as a Select object, requiring compilation & aliasing when formatting the parameterized query # noqa: E501 - query = query.format( - batch=f'({selectable.compile(compile_kwargs={"literal_binds": True})}) AS subselect', # noqa: E501 - ) + batch_selectable, sa.sql.Select + ): # Specifying a row_condition returns the active batch as a Select object + # requiring compilation & aliasing when formatting the parameterized query + batch = batch_selectable.compile(compile_kwargs={"literal_binds": True}) + query = query.format(batch=f"({batch}) AS subselect") else: - query = query.format(batch=f"({selectable})") + query = query.format(batch=f"({batch_selectable})") result: List[sqlalchemy.Row] = execution_engine.execute_query(sa.text(query)).fetchall() # type: ignore[assignment,arg-type] return [element._asdict() for element in result] diff --git a/great_expectations/expectations/metrics/util.py b/great_expectations/expectations/metrics/util.py index 24ca15db4e15..92fc6349706f 100644 --- a/great_expectations/expectations/metrics/util.py +++ b/great_expectations/expectations/metrics/util.py @@ -3,15 +3,18 @@ import logging import re from collections import UserDict +from types import ModuleType from typing import ( TYPE_CHECKING, Any, Dict, + Iterable, List, Mapping, Optional, Sequence, Tuple, + Type, overload, ) @@ -63,6 +66,11 @@ except ImportError: clickhouse_sqlalchemy = None +try: + import databricks.sqlalchemy as sqla_databricks +except (ImportError, AttributeError): + sqla_databricks = None # type: ignore[assignment] + _BIGQUERY_MODULE_NAME = "sqlalchemy_bigquery" from great_expectations.compatibility import bigquery as sqla_bigquery @@ -79,12 +87,33 @@ teradatatypes = None +def _is_databricks_dialect(dialect: ModuleType | sa.Dialect | Type[sa.Dialect]) -> bool: + """ + Check if the Databricks dialect is being provided. 
+ """ + if not sqla_databricks: + return False + try: + if isinstance(dialect, sqla_databricks.DatabricksDialect): + return True + if hasattr(dialect, "DatabricksDialect"): + return True + if issubclass(dialect, sqla_databricks.DatabricksDialect): # type: ignore[arg-type] + return True + except Exception: + pass + return False + + def get_dialect_regex_expression( # noqa: C901, PLR0911, PLR0912, PLR0915 - column, regex, dialect, positive=True -): + column: sa.Column, + regex: str, + dialect: ModuleType | Type[sa.Dialect] | sa.Dialect, + positive: bool = True, +) -> sa.SQLColumnExpression | None: try: # postgres - if issubclass(dialect.dialect, sa.dialects.postgresql.dialect): + if issubclass(dialect.dialect, sa.dialects.postgresql.dialect): # type: ignore[union-attr] if positive: return sqlalchemy.BinaryExpression( column, sqlalchemy.literal(regex), sqlalchemy.custom_op("~") @@ -96,11 +125,18 @@ def get_dialect_regex_expression( # noqa: C901, PLR0911, PLR0912, PLR0915 except AttributeError: pass + # databricks sql + if _is_databricks_dialect(dialect): + if positive: + return sa.func.regexp_like(column, sqlalchemy.literal(regex)) + else: + return sa.not_(sa.func.regexp_like(column, sqlalchemy.literal(regex))) + # redshift # noinspection PyUnresolvedReferences try: if hasattr(dialect, "RedshiftDialect") or ( - aws.redshiftdialect and issubclass(dialect.dialect, aws.redshiftdialect.RedshiftDialect) + aws.redshiftdialect and issubclass(dialect.dialect, aws.redshiftdialect.RedshiftDialect) # type: ignore[union-attr] ): if positive: return sqlalchemy.BinaryExpression( @@ -117,7 +153,7 @@ def get_dialect_regex_expression( # noqa: C901, PLR0911, PLR0912, PLR0915 try: # MySQL - if issubclass(dialect.dialect, sa.dialects.mysql.dialect): + if issubclass(dialect.dialect, sa.dialects.mysql.dialect): # type: ignore[union-attr] if positive: return sqlalchemy.BinaryExpression( column, sqlalchemy.literal(regex), sqlalchemy.custom_op("REGEXP") @@ -134,7 +170,7 @@ def get_dialect_regex_expression( # noqa: C901, PLR0911, PLR0912, PLR0915 try: # Snowflake if issubclass( - dialect.dialect, + dialect.dialect, # type: ignore[union-attr] snowflake.sqlalchemy.snowdialect.SnowflakeDialect, ): if positive: @@ -216,7 +252,7 @@ def get_dialect_regex_expression( # noqa: C901, PLR0911, PLR0912, PLR0915 try: # Teradata - if issubclass(dialect.dialect, teradatasqlalchemy.dialect.TeradataDialect): + if issubclass(dialect.dialect, teradatasqlalchemy.dialect.TeradataDialect): # type: ignore[union-attr] if positive: return ( sa.func.REGEXP_SIMILAR( @@ -237,7 +273,7 @@ def get_dialect_regex_expression( # noqa: C901, PLR0911, PLR0912, PLR0915 try: # sqlite # regex_match for sqlite introduced in sqlalchemy v1.4 - if issubclass(dialect.dialect, sa.dialects.sqlite.dialect) and version.parse( + if issubclass(dialect.dialect, sa.dialects.sqlite.dialect) and version.parse( # type: ignore[union-attr] sa.__version__ ) >= version.parse("1.4"): if positive: @@ -256,7 +292,9 @@ def get_dialect_regex_expression( # noqa: C901, PLR0911, PLR0912, PLR0915 return None -def _get_dialect_type_module(dialect=None): +def _get_dialect_type_module( + dialect: ModuleType | Type[sa.Dialect] | sa.Dialect | None = None, +) -> ModuleType | Type[sa.Dialect] | sa.Dialect: if dialect is None: logger.warning("No sqlalchemy dialect found; relying in top-level sqlalchemy types.") return sa @@ -274,7 +312,7 @@ def _get_dialect_type_module(dialect=None): if ( isinstance( dialect, - sqla_bigquery.BigQueryDialect, + sqla_bigquery.BigQueryDialect, # type: 
ignore[attr-defined] ) and bigquery_types_tuple is not None ): @@ -286,7 +324,7 @@ def _get_dialect_type_module(dialect=None): try: if ( issubclass( - dialect, + dialect, # type: ignore[arg-type] teradatasqlalchemy.dialect.TeradataDialect, ) and teradatatypes is not None @@ -836,14 +874,14 @@ def _get_normalized_column_name_mapping_if_exists( return None if verify_only else normalized_batch_columns_mappings -def parse_value_set(value_set): +def parse_value_set(value_set: Iterable) -> list: parsed_value_set = [parse(value) if isinstance(value, str) else value for value in value_set] return parsed_value_set -def get_dialect_like_pattern_expression( # noqa: C901, PLR0912 - column, dialect, like_pattern, positive=True -): +def get_dialect_like_pattern_expression( # noqa: C901, PLR0912, PLR0915 + column: sa.Column, dialect: ModuleType, like_pattern: str, positive: bool = True +) -> sa.BinaryExpression | None: dialect_supported: bool = False try: @@ -868,6 +906,9 @@ def get_dialect_like_pattern_expression( # noqa: C901, PLR0912 ): dialect_supported = True + if _is_databricks_dialect(dialect): + dialect_supported = True + try: if hasattr(dialect, "RedshiftDialect"): dialect_supported = True diff --git a/great_expectations/expectations/regex_based_column_map_expectation.py b/great_expectations/expectations/regex_based_column_map_expectation.py index 6a4c4eb79f01..4b3d37eacc50 100644 --- a/great_expectations/expectations/regex_based_column_map_expectation.py +++ b/great_expectations/expectations/regex_based_column_map_expectation.py @@ -87,8 +87,9 @@ def _sqlalchemy(cls, column, _dialect, **kwargs): regex_expression = get_dialect_regex_expression(column, cls.regex, _dialect) if regex_expression is None: - logger.warning(f"Regex is not supported for dialect {_dialect.dialect.name!s}") - raise NotImplementedError + msg = f"Regex is not supported for dialect {_dialect.dialect.name!s}" + logger.warning(msg) + raise NotImplementedError(msg) return regex_expression diff --git a/great_expectations/experimental/datasource/__init__.py b/great_expectations/experimental/datasource/__init__.py deleted file mode 100644 index f39881545cbc..000000000000 --- a/great_expectations/experimental/datasource/__init__.py +++ /dev/null @@ -1,13 +0,0 @@ -from great_expectations.experimental.datasource.fabric import ( - FabricPowerBIDatasource, - PowerBIDax, - PowerBIMeasure, - PowerBITable, -) - -__all__ = [ - "FabricPowerBIDatasource", - "PowerBIDax", - "PowerBIMeasure", - "PowerBITable", -] diff --git a/great_expectations/validator/validator.py b/great_expectations/validator/validator.py index 036ea7056f7d..f2cd1d9f9f72 100644 --- a/great_expectations/validator/validator.py +++ b/great_expectations/validator/validator.py @@ -414,7 +414,9 @@ def __getattr__(self, name): if self.active_batch is None: raise TypeError("active_batch cannot be None") # noqa: TRY003 name = name.lower() - if name.startswith("expect_") and get_expectation_impl(name): + if ( + name.startswith("expect_") or name == "unexpected_rows_expectation" + ) and get_expectation_impl(name): return self.validate_expectation(name) elif ( self._expose_dataframe_methods diff --git a/pyproject.toml b/pyproject.toml index 0aefdbd60e8b..ddaee570575b 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -208,6 +208,7 @@ module = [ "boto3.*", "botocore.*", "clickhouse_sqlalchemy.*", + "databricks.*", "google.*", "great_expectations.compatibility.pydantic.*", "ipywidgets.*", @@ -677,6 +678,7 @@ markers = [ "sqlite: mark test requiring sqlite", "slow: mark tests taking 
longer than 1 second.", "spark: mark a test as Spark-dependent.", + "spark_connect: mark a test as Spark Connect-dependent.", "trino: mark a test as trino-dependent.", "unit: mark a test as a unit test.", "v2_api: mark test as specific to the v2 api (e.g. pre Data Connectors).", diff --git a/reqs/requirements-dev-databricks.txt b/reqs/requirements-dev-databricks.txt index 76fc36682bf4..c51024fd7e4e 100644 --- a/reqs/requirements-dev-databricks.txt +++ b/reqs/requirements-dev-databricks.txt @@ -1,2 +1 @@ -databricks-sql-connector>=2.0.0; python_version < "3.12" -databricks-sql-connector[sqlalchemy]>=3.0.0; python_version >= "3.12" +databricks-sql-connector[sqlalchemy]>=3.0.0 diff --git a/reqs/requirements-dev-spark-connect.txt b/reqs/requirements-dev-spark-connect.txt new file mode 100644 index 000000000000..cdd2782d387b --- /dev/null +++ b/reqs/requirements-dev-spark-connect.txt @@ -0,0 +1,8 @@ +# NOTES: +# Spark connect's requirements are here: https://github.com/apache/spark/blob/ed3a9b1aa92957015592b399167a960b68b73beb/dev/requirements.txt#L60 +# grpcio and grpcio-status should be bumped up to match that, but that conflicts with our constraints.txt file. +# TODO: Fix in V1-532 + +googleapis-common-protos>=1.56.4 +grpcio>=1.48.1 +grpcio-status>=1.48.1 diff --git a/tasks.py b/tasks.py index e64fc83f0bfe..c6462960d657 100644 --- a/tasks.py +++ b/tasks.py @@ -933,6 +933,14 @@ class TestDependencies(NamedTuple): services=("spark",), extra_pytest_args=("--spark",), ), + "spark_connect": TestDependencies( + requirement_files=( + "reqs/requirements-dev-spark.txt", + "reqs/requirements-dev-spark-connect.txt", + ), + services=("spark",), + extra_pytest_args=("--spark_connect",), + ), "trino": TestDependencies( ("reqs/requirements-dev-trino.txt",), services=("trino",), diff --git a/tests/conftest.py b/tests/conftest.py index 13806119a73a..6ea046a6bf5c 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -21,6 +21,7 @@ import great_expectations as gx from great_expectations.analytics.config import ENV_CONFIG +from great_expectations.compatibility import pyspark from great_expectations.compatibility.sqlalchemy_compatibility_wrappers import ( add_dataframe_to_db, ) @@ -95,7 +96,6 @@ from pytest_mock import MockerFixture - from great_expectations.compatibility import pyspark from great_expectations.compatibility.sqlalchemy import Engine yaml = YAMLHandler() @@ -130,6 +130,7 @@ "pyarrow", "snowflake", "spark", + "spark_connect", "sqlite", "trino", "unit", @@ -196,6 +197,11 @@ def pytest_addoption(parser): action="store_true", help="If set, execute tests against the spark test suite", ) + parser.addoption( + "--spark_connect", + action="store_true", + help="If set, execute tests against the spark-connect test suite", + ) parser.addoption( "--no-sqlalchemy", action="store_true", @@ -492,6 +498,20 @@ def spark_session(test_backends) -> pyspark.SparkSession: raise ValueError("spark tests are requested, but pyspark is not installed") +@pytest.fixture +def spark_connect_session(test_backends): + from great_expectations.compatibility import pyspark + + if pyspark.SparkConnectSession: # type: ignore[truthy-function] + spark_connect_session = pyspark.SparkSession.builder.remote( + "sc://localhost:15002" + ).getOrCreate() + assert isinstance(spark_connect_session, pyspark.SparkConnectSession) + return spark_connect_session + + raise ValueError("spark tests are requested, but pyspark is not installed") + + @pytest.fixture def basic_spark_df_execution_engine(spark_session): from 
great_expectations.execution_engine import SparkDFExecutionEngine diff --git a/tests/datasource/fluent/integration/test_sql_datasources.py b/tests/datasource/fluent/integration/test_sql_datasources.py index a8bb6dd1f508..9cb4b1f9154a 100644 --- a/tests/datasource/fluent/integration/test_sql_datasources.py +++ b/tests/datasource/fluent/integration/test_sql_datasources.py @@ -10,6 +10,7 @@ from pprint import pformat as pf from typing import ( TYPE_CHECKING, + Any, Final, Generator, Literal, @@ -719,7 +720,7 @@ def _fails_expectation(param_id: str) -> bool: This does not mean that it SHOULD fail, but that it currently does. """ column_name: ColNameParamId - dialect, column_name, _ = param_id.split("-") # type: ignore[assignment] + dialect, column_name, *_ = param_id.split("-") # type: ignore[assignment] dialects_need_fixes: list[DatabaseType] = FAILS_EXPECTATION.get(column_name, []) return dialect in dialects_need_fixes @@ -765,15 +766,25 @@ def _raw_query_check_column_exists( _EXPECTATION_TYPES: Final[tuple[ParameterSet, ...]] = ( - param("expect_column_to_exist"), - param("expect_column_values_to_not_be_null"), + param("expect_column_to_exist", {}, id="expect_column_to_exist"), + param("expect_column_values_to_not_be_null", {}, id="expect_column_values_to_not_be_null"), + param( + "expect_column_values_to_match_regex", + {"regex": r".*"}, + id="expect_column_values_to_match_regex", + ), + param( + "expect_column_values_to_match_like_pattern", + {"like_pattern": r"%"}, + id="expect_column_values_to_match_like_pattern", + ), ) @pytest.mark.filterwarnings( "once::DeprecationWarning" ) # snowflake `add_table_asset` raises warning on passing a schema -@pytest.mark.parametrize("expectation_type", _EXPECTATION_TYPES) +@pytest.mark.parametrize("expectation_type, extra_exp_kwargs", _EXPECTATION_TYPES) class TestColumnExpectations: @pytest.mark.parametrize( "column_name", @@ -806,6 +817,7 @@ def test_unquoted_params( table_factory: TableFactory, column_name: str | quoted_name, expectation_type: str, + extra_exp_kwargs: dict[str, Any], request: pytest.FixtureRequest, ): """ @@ -862,7 +874,7 @@ def test_unquoted_params( suite = context.suites.add(ExpectationSuite(name=f"{datasource.name}-{asset.name}")) suite.add_expectation_configuration( expectation_configuration=ExpectationConfiguration( - type=expectation_type, kwargs={"column": column_name} + type=expectation_type, kwargs={"column": column_name, **extra_exp_kwargs} ) ) suite.save() @@ -908,6 +920,7 @@ def test_quoted_params( table_factory: TableFactory, column_name: str | quoted_name, expectation_type: str, + extra_exp_kwargs: dict[str, Any], request: pytest.FixtureRequest, ): """ @@ -966,7 +979,7 @@ def test_quoted_params( suite = context.suites.add(ExpectationSuite(name=f"{datasource.name}-{asset.name}")) suite.add_expectation_configuration( expectation_configuration=ExpectationConfiguration( - type=expectation_type, kwargs={"column": column_name} + type=expectation_type, kwargs={"column": column_name, **extra_exp_kwargs} ) ) suite.save() @@ -1028,6 +1041,7 @@ def test_desired_state( table_factory: TableFactory, column_name: str | quoted_name, expectation_type: str, + extra_exp_kwargs: dict[str, Any], request: pytest.FixtureRequest, ): """ @@ -1096,7 +1110,7 @@ def test_desired_state( suite = context.suites.add(ExpectationSuite(name=f"{datasource.name}-{asset.name}")) suite.add_expectation_configuration( expectation_configuration=ExpectationConfiguration( - type=expectation_type, kwargs={"column": column_name} + type=expectation_type, 
kwargs={"column": column_name, **extra_exp_kwargs} ) ) suite.save() diff --git a/tests/datasource/fluent/test_fabric.py b/tests/datasource/fluent/test_fabric.py index ce4a52eddeb1..57ea750fc15b 100644 --- a/tests/datasource/fluent/test_fabric.py +++ b/tests/datasource/fluent/test_fabric.py @@ -11,11 +11,11 @@ import great_expectations.core.batch_spec from great_expectations.datasource.fluent import BatchRequest -from great_expectations.exceptions.exceptions import BuildBatchRequestError -from great_expectations.experimental.datasource.fabric import ( +from great_expectations.datasource.fluent.fabric import ( FabricPowerBIDatasource, _PowerBIAsset, ) +from great_expectations.exceptions.exceptions import BuildBatchRequestError if TYPE_CHECKING: from great_expectations.data_context import AbstractDataContext diff --git a/tests/expectations/metrics/test_metric_providers.py b/tests/expectations/metrics/test_metric_providers.py index 903f56bcfd9a..fe2c6cf816fa 100644 --- a/tests/expectations/metrics/test_metric_providers.py +++ b/tests/expectations/metrics/test_metric_providers.py @@ -5,6 +5,9 @@ import pytest +from great_expectations.compatibility.sqlalchemy import ( + sqlalchemy as sa, +) from great_expectations.execution_engine import ( PandasExecutionEngine, SparkDFExecutionEngine, @@ -330,3 +333,41 @@ def _spark( assert len(mock_registry._registered_metrics.keys()) == prev_registered_metric_key_count + 1 assert "query.custom_metric" in mock_registry._registered_metrics + + +@pytest.mark.unit +@pytest.mark.parametrize( + "input_query,expected_query", + [ + ( + "SELECT * FROM {batch}", + "SELECT * FROM iris WHERE datetime_column = '01/12/2024'", + ), + ( + "SELECT * FROM {batch} WHERE passenger_count > 7", + "SELECT * FROM iris WHERE datetime_column = '01/12/2024' AND passenger_count > 7", + ), + ( + "SELECT * FROM {batch} WHERE passenger_count > 7 ORDER BY iris.'PetalLengthCm' DESC", + "SELECT * FROM iris WHERE datetime_column = '01/12/2024' " + "AND passenger_count > 7 ORDER BY iris.'PetalLengthCm' DESC", + ), + ( + "SELECT * FROM {batch} WHERE passenger_count > 7 GROUP BY iris.'Species' DESC", + "SELECT * FROM iris WHERE datetime_column = '01/12/2024' " + "AND passenger_count > 7 GROUP BY iris.'Species' DESC", + ), + ], +) +def test__get_query_string_with_substituted_batch_parameters(input_query: str, expected_query: str): + batch_subquery = ( + sa.select("*") + .select_from(sa.text("iris")) + .where(sa.text("datetime_column = '01/12/2024'")) + .subquery() + ) + actual_query = QueryMetricProvider._get_query_string_with_substituted_batch_parameters( + query=input_query, + batch_subquery=batch_subquery, + ) + assert actual_query == expected_query diff --git a/tests/integration/spark/test_spark_connect.py b/tests/integration/spark/test_spark_connect.py new file mode 100644 index 000000000000..2d1e9368a7f8 --- /dev/null +++ b/tests/integration/spark/test_spark_connect.py @@ -0,0 +1,86 @@ +import logging +from typing import Any + +import pytest + +import great_expectations as gx +from great_expectations.compatibility.pyspark import ConnectDataFrame, Row, SparkConnectSession +from great_expectations.core.validation_definition import ValidationDefinition +from great_expectations.data_context.data_context.abstract_data_context import AbstractDataContext +from great_expectations.exceptions.exceptions import BuildBatchRequestError + +logger = logging.getLogger(__name__) + + +pytestmark = pytest.mark.spark_connect + +DATAFRAME_VALUES = [1, 2, 3] + + +@pytest.fixture +def spark_validation_definition( 
+ ephemeral_context_with_defaults: AbstractDataContext, +) -> ValidationDefinition: + context = ephemeral_context_with_defaults + bd = ( + context.data_sources.add_spark(name="spark-connect-ds") + .add_dataframe_asset(name="spark-connect-asset") + .add_batch_definition_whole_dataframe(name="spark-connect-bd") + ) + suite = context.suites.add( + gx.ExpectationSuite( + name="spark-connect-suite", + expectations=[ + gx.expectations.ExpectColumnValuesToBeInSet( + column="column", value_set=DATAFRAME_VALUES + ), + ], + ) + ) + return context.validation_definitions.add( + gx.ValidationDefinition(name="spark-connect-vd", suite=suite, data=bd) + ) + + +def test_spark_connect( + spark_connect_session: SparkConnectSession, + spark_validation_definition: ValidationDefinition, +): + df = spark_connect_session.createDataFrame( + [Row(column=x) for x in DATAFRAME_VALUES], + ) + assert isinstance(df, ConnectDataFrame) + + results = spark_validation_definition.run(batch_parameters={"dataframe": df}) + + assert results.success + + +@pytest.mark.parametrize("not_a_dataframe", [None, 1, "string", 1.0, True]) +def test_error_messages_if_we_get_an_invalid_dataframe( + not_a_dataframe: Any, + spark_validation_definition: ValidationDefinition, +): + with pytest.raises( + BuildBatchRequestError, match="Cannot build batch request without a Spark DataFrame." + ): + spark_validation_definition.run(batch_parameters={"dataframe": not_a_dataframe}) + + +def test_spark_connect_with_spark_connect_session_factory_method( + spark_validation_definition: ValidationDefinition, +): + """This test demonstrates that SparkConnectSession can be used to create a session. + + This test is being added because in some scenarios, this appeared to fail, but it was + the result of other active spark sessions. + """ + spark_connect_session = SparkConnectSession.builder.remote("sc://localhost:15002").getOrCreate() + assert isinstance(spark_connect_session, SparkConnectSession) + df = spark_connect_session.createDataFrame( + [Row(column=x) for x in DATAFRAME_VALUES], + ) + + results = spark_validation_definition.run(batch_parameters={"dataframe": df}) + + assert results.success diff --git a/tests/test_definitions/query_expectations/unexpected_rows_expectation.json b/tests/test_definitions/query_expectations/unexpected_rows_expectation.json new file mode 100644 index 000000000000..3ca3d76148a1 --- /dev/null +++ b/tests/test_definitions/query_expectations/unexpected_rows_expectation.json @@ -0,0 +1,45 @@ +{ + "expectation_type" : "unexpected_rows_expectation", + "datasets" : [{ + "only_for": ["postgresql", "snowflake", "sqlite"], + "dataset_name": "unexpected_rows_expectation_1", + "data" : { + "c1" : [4,5,6,7], + "c2" : ["a","b","c","d"], + "c3" : [null,null,null,null], + "c4" : [4.0, 3.0, 3.5, 1.2] + }, + "schemas": { + "spark": { + "c1": "IntegerType", + "c2": "StringType", + "c3": "StringType", + "c4": "FloatType" + } + }, + "tests": [{ + "title": "basic_positive_test", + "include_in_gallery": false, + "exact_match_out" : false, + "in":{ + "unexpected_rows_query": "SELECT * FROM {batch} WHERE c1 > 7" + }, + "out":{ + "success":true, + "observed_value": "0 unexpected rows" + } + }, + { + "title": "basic_negative_test", + "include_in_gallery": false, + "exact_match_out" : false, + "in":{ + "unexpected_rows_query": "SELECT * FROM {batch} WHERE c1 > 6" + }, + "out":{ + "success":false, + "observed_value": "1 unexpected row" + } + }] + }] +}
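Taken together, the docker-compose `spark-connect` service, the pytest `--spark_connect` flag, and the integration test above describe the end-to-end Spark Connect flow this PR enables. A condensed, self-contained sketch of that flow, assuming the `spark-connect` service from `assets/docker/spark/docker-compose.yml` is listening locally on port 15002 and using illustrative GX object names:

```python
import great_expectations as gx
from pyspark.sql import Row, SparkSession

# Connect to a local Spark Connect server (see docker-compose.yml in this diff).
spark = SparkSession.builder.remote("sc://localhost:15002").getOrCreate()
df = spark.createDataFrame([Row(column=x) for x in [1, 2, 3]])

context = gx.get_context(mode="ephemeral")
batch_definition = (
    context.data_sources.add_spark(name="spark-connect-ds")
    .add_dataframe_asset(name="spark-connect-asset")
    .add_batch_definition_whole_dataframe(name="spark-connect-bd")
)
suite = context.suites.add(
    gx.ExpectationSuite(
        name="spark-connect-suite",
        expectations=[
            gx.expectations.ExpectColumnValuesToBeInSet(column="column", value_set=[1, 2, 3]),
        ],
    )
)
validation_definition = context.validation_definitions.add(
    gx.ValidationDefinition(name="spark-connect-vd", suite=suite, data=batch_definition)
)

# The Spark Connect DataFrame is accepted thanks to the new is_spark_data_frame TypeGuard.
results = validation_definition.run(batch_parameters={"dataframe": df})
print(results.success)
```

Passing anything other than a Spark or Spark Connect DataFrame as `dataframe` raises the `BuildBatchRequestError` exercised by `test_error_messages_if_we_get_an_invalid_dataframe`.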