[BUGFIX] Fix Snowflake column identifier issue when running checkpoint #8630

Status: Closed · 98 commits
b3b4795
update tests from branch
Kilo59 Aug 29, 2023
ed2a7df
test sqlite
Kilo59 Aug 29, 2023
5d72e3f
use `tmp_path` so that db file is cleaned up
Kilo59 Aug 29, 2023
04bf890
databricks xfail
Kilo59 Aug 29, 2023
6639f2d
postgres TDD test
Kilo59 Aug 29, 2023
5f090e5
insert data
Kilo59 Aug 29, 2023
6438f7d
parametrize against different column conventions
Kilo59 Aug 29, 2023
f9e058a
snowflake tests
Kilo59 Aug 30, 2023
f57180d
silence warning
Kilo59 Aug 30, 2023
b8a65f8
don't skip snowflake tests
Kilo59 Aug 30, 2023
31ebce1
temp skip other markers
Kilo59 Aug 30, 2023
e3ff122
missing fixture decorator
Kilo59 Aug 30, 2023
7a989f3
more test cases
Kilo59 Aug 30, 2023
9ebbbc5
more cases
Kilo59 Aug 30, 2023
5cfeab3
`add_sql_datasource` parametrized fixture
Kilo59 Aug 31, 2023
2192466
more params
Kilo59 Aug 31, 2023
134dd8f
rename tests
Kilo59 Aug 31, 2023
47128d0
Merge branch 'develop' into b/lakitu-312/sf-col-idtn
Kilo59 Aug 31, 2023
a954842
`Record` -> `Row`
Kilo59 Aug 31, 2023
dffa883
fix bad merge
Kilo59 Aug 31, 2023
5eea355
fail-fast: false
Kilo59 Aug 31, 2023
10f1f58
don't skip databricks
Kilo59 Aug 31, 2023
60d1b78
wrong job
Kilo59 Aug 31, 2023
416d323
show column names on failure
Kilo59 Aug 31, 2023
7ec2c26
upper
Kilo59 Aug 31, 2023
16d15b5
even more params
Kilo59 Aug 31, 2023
bcf9a29
readjust parameters
Kilo59 Aug 31, 2023
1dd6dc6
test multiple expectations
Kilo59 Aug 31, 2023
bd25d11
wait for unit-tests and static-analysis
Kilo59 Aug 31, 2023
c9bae35
add xfails
Kilo59 Aug 31, 2023
af113c7
`TextClause`
Kilo59 Aug 31, 2023
3ced6ea
Merge branch 'develop' into b/lakitu-312/sf-col-idtn
Kilo59 Aug 31, 2023
67ebe28
Merge branch 'develop' into b/lakitu-312/sf-col-idtn
Kilo59 Aug 31, 2023
90035b2
skip trino
Kilo59 Aug 31, 2023
9cde7e4
Merge branch 'develop' into b/lakitu-312/sf-col-idtn
Kilo59 Aug 31, 2023
86a34e2
Merge branch 'develop' into b/lakitu-312/sf-col-idtn
Kilo59 Sep 1, 2023
f9e9df6
Merge remote-tracking branch 'origin/b/lakitu-312/sf-col-idtn' into b…
Kilo59 Sep 1, 2023
36ff7aa
Merge branch 'develop' into b/lakitu-312/sf-col-idtn
Kilo59 Sep 1, 2023
a3d2759
Merge branch 'develop' into b/lakitu-312/sf-col-idtn
Kilo59 Sep 1, 2023
1097718
Merge remote-tracking branch 'origin/b/lakitu-312/sf-col-idtn' into b…
Kilo59 Sep 1, 2023
ad0d7af
fix bad merge
Kilo59 Sep 1, 2023
4101368
remove databricks xfail
Kilo59 Sep 1, 2023
a8deaee
Merge branch 'develop' into b/lakitu-312/sf-col-idtn
Kilo59 Sep 2, 2023
277ce9b
Merge branch 'develop' into b/lakitu-312/sf-col-idtn
Kilo59 Sep 5, 2023
1bbe466
better debug output
Kilo59 Sep 5, 2023
f9b264f
Merge remote-tracking branch 'origin/b/lakitu-312/sf-col-idtn' into b…
Kilo59 Sep 5, 2023
271fdb3
wrap_identifier stub
Kilo59 Sep 5, 2023
52bcde8
test for expectation exception
Kilo59 Sep 5, 2023
f014067
poc hack
Kilo59 Sep 5, 2023
547fa1e
don't just check for exceptions
Kilo59 Sep 5, 2023
22b0f94
add sqlalchemy plugin
Kilo59 Sep 5, 2023
7035a56
disable poc
Kilo59 Sep 5, 2023
a5354cf
disable snowflake normalization
Kilo59 Sep 5, 2023
07bba1e
re-enable tests
Kilo59 Sep 5, 2023
e896c33
logs
Kilo59 Sep 6, 2023
86e2c2c
use quotes when creating tables
Kilo59 Sep 6, 2023
b79d791
use dialect specific quotes for creating tables
Kilo59 Sep 6, 2023
06eced5
use dialect specific quotes for creating tables
Kilo59 Sep 6, 2023
5c56190
Merge branch 'develop' into b/lakitu-312/sf-col-idtn
Kilo59 Sep 11, 2023
6f61043
Merge remote-tracking branch 'origin/b/lakitu-312/sf-col-idtn' into b…
Kilo59 Sep 11, 2023
44426b8
fix merge
Kilo59 Sep 11, 2023
ad9c725
comment out high priority xfail
Kilo59 Sep 11, 2023
212b49d
remove single quote params
Kilo59 Sep 11, 2023
b776f89
add `name` `NAME` params
Kilo59 Sep 11, 2023
c4fb6f7
revert expectations changes
Kilo59 Sep 11, 2023
b1597b8
re-apply defaults
Kilo59 Sep 11, 2023
395e84a
fix debugging
Kilo59 Sep 11, 2023
e37dfea
test settings temp changes
Kilo59 Sep 11, 2023
b73b568
temp TEST changes
Kilo59 Sep 11, 2023
e0d2d54
log `get_sqlalchemy_column_metadata` errors
Kilo59 Sep 11, 2023
761df18
lots of logging
Kilo59 Sep 11, 2023
e480abd
wrap in `sa.text()`
Kilo59 Sep 11, 2023
70d3be0
better test failure output
Kilo59 Sep 12, 2023
f9c63ec
use the `execution_engine().get_connection()` method
Kilo59 Sep 12, 2023
222809a
update table_factory usage
Kilo59 Sep 12, 2023
c3d50f8
run all the tests
Kilo59 Sep 12, 2023
adf9dec
xfail table identifier tests for now
Kilo59 Sep 12, 2023
1bacbad
update `REQUIRE_FIXES`
Kilo59 Sep 12, 2023
665b4c2
add TODO
Kilo59 Sep 12, 2023
7672f6b
remove new params
Kilo59 Sep 12, 2023
fd3f159
conditional `sa.text()`
Kilo59 Sep 12, 2023
ef31a90
cleanup debug logging
Kilo59 Sep 12, 2023
87c7c4b
add `_get_exception_details()` helper
Kilo59 Sep 13, 2023
4963dd0
remove xfail
Kilo59 Sep 13, 2023
7ef9a59
Merge branch 'develop' into b/lakitu-312/sf-col-idtn
Kilo59 Sep 13, 2023
4caa61f
revert local changes
Kilo59 Sep 13, 2023
6953fa1
type ignores
Kilo59 Sep 13, 2023
692c3fd
revert
Kilo59 Sep 13, 2023
84e04f1
simplify `REQUIRES_FIXES`
Kilo59 Sep 13, 2023
f7b2b76
add `TestColumnIdentifiers::test_raw_queries()`
Kilo59 Sep 13, 2023
5f75854
skip unneeded sqlite drop tables
Kilo59 Sep 13, 2023
bf1357c
test columns created without quotes
Kilo59 Sep 13, 2023
40baae6
add `quoted_identifiers_ignore_case` to `SnowflakeDatasource`
Kilo59 Sep 14, 2023
8881d0f
better raw_query
Kilo59 Sep 14, 2023
72b43e8
use query builder too
Kilo59 Sep 14, 2023
c19e6bb
testing changes
Kilo59 Sep 14, 2023
1a60630
`_CaseInsensitiveString`
Kilo59 Sep 14, 2023
aef3220
Merge branch 'develop' into b/lakitu-312/sf-col-idtn
Kilo59 Sep 14, 2023
15 changes: 8 additions & 7 deletions .github/workflows/ci.yml
@@ -306,19 +306,20 @@ jobs:
      SNOWFLAKE_ROLE: ${{secrets.SNOWFLAKE_ROLE}}
    runs-on: ubuntu-latest
    strategy:
      fail-fast: false
      matrix:
        markers:
          - athena or clickhouse or openpyxl or pyarrow or project or sqlite or aws_creds
          - aws_deps
          - big
          - cli
          # - aws_deps
          # - big
          # - cli
          - databricks
          - filesystem
          - mssql
          - mysql
          # - filesystem
          # - mssql
          # - mysql
          - postgresql
          - snowflake
          - spark
          # - spark
          - trino
        python-version: ["3.8", "3.9", "3.10", "3.11"]
        exclude:
20 changes: 20 additions & 0 deletions great_expectations/datasource/fluent/snowflake_datasource.py
@@ -88,6 +88,11 @@ class SnowflakeDatasource(SQLDatasource):
    warehouse: Optional[str] = None
    role: Optional[str] = None
    numpy: bool = False
    quoted_identifiers_ignore_case: Optional[bool] = pydantic.Field(
        default=None,
        description="Specifies whether letters in double-quoted object identifiers are"
        " stored and resolved as uppercase letters.",
    )

    _EXTRA_EXCLUDED_EXEC_ENG_ARGS: ClassVar[set] = {
        "role",
@@ -98,6 +103,7 @@ class SnowflakeDatasource(SQLDatasource):
        "password",
        "numpy",
        "warehouse",
        "quoted_identifiers_ignore_case",
    }

    @pydantic.root_validator
@@ -132,9 +138,22 @@ def get_engine(self) -> sqlalchemy.Engine:
            kwargs = model_dict.pop("kwargs", {})
            connection_string = model_dict.pop("connection_string")

            if self.quoted_identifiers_ignore_case is not None:
                # ensure we don't drop any existing connect_args or session_parameters
                connect_args = kwargs.get("connect_args", {})
                session_parameters: dict[str, str | bool] = connect_args.get(
                    "session_parameters", {}
                )
                session_parameters[
                    "quoted_identifiers_ignore_case"
                ] = self.quoted_identifiers_ignore_case
                connect_args["session_parameters"] = session_parameters
                kwargs["connect_args"] = connect_args

            if connection_string:
                self._engine = sa.create_engine(connection_string, **kwargs)
            else:
                # TODO: this path should accept other kwargs (e.g. connect_args)
                self._engine = self._build_engine_with_connect_args(**kwargs)

        except Exception as e:
@@ -151,6 +170,7 @@ def get_engine(self) -> sqlalchemy.Engine:
        return self._engine

    def _build_engine_with_connect_args(self, **kwargs) -> sqlalchemy.Engine:
        # TODO: rename this method; `connect_args` has a special meaning in sqlalchemy
        connect_args = self._get_connect_args()
        connect_args.update(kwargs)
        url = URL(**connect_args)
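The session-parameter plumbing in `get_engine` above can be sketched as a standalone helper. This is a minimal sketch mirroring the diff's merge logic; `merge_session_parameters` is a hypothetical name for illustration, not a function in Great Expectations:

```python
def merge_session_parameters(kwargs: dict, quoted_identifiers_ignore_case) -> dict:
    """Inject the Snowflake session parameter into engine kwargs without
    clobbering any connect_args or session_parameters the caller already set."""
    if quoted_identifiers_ignore_case is None:
        # option unset: leave kwargs untouched
        return kwargs
    connect_args = kwargs.get("connect_args", {})
    session_parameters = connect_args.get("session_parameters", {})
    session_parameters["quoted_identifiers_ignore_case"] = quoted_identifiers_ignore_case
    connect_args["session_parameters"] = session_parameters
    kwargs["connect_args"] = connect_args
    return kwargs
```

The point of the `get`-with-default chain is that a user who already passed `connect_args={"session_parameters": {...}}` keeps those entries; the new flag is merged in rather than replacing the dict.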
6 changes: 6 additions & 0 deletions great_expectations/datasource/fluent/sources.pyi
@@ -588,6 +588,7 @@ class _SourceFactories:
        warehouse: None = ...,
        role: None = ...,
        numpy: bool = ...,
        quoted_identifiers_ignore_case: bool | None = ...,
    ) -> SnowflakeDatasource: ...
    @overload
    def add_snowflake(  # noqa: PLR0913
@@ -606,6 +607,7 @@ class _SourceFactories:
        warehouse: Optional[str] = ...,
        role: Optional[str] = ...,
        numpy: bool = ...,
        quoted_identifiers_ignore_case: bool | None = ...,
    ) -> SnowflakeDatasource: ...
    @overload
    def update_snowflake(  # noqa: PLR0913
@@ -624,6 +626,7 @@ class _SourceFactories:
        warehouse: None = ...,
        role: None = ...,
        numpy: bool = ...,
        quoted_identifiers_ignore_case: bool | None = ...,
    ) -> SnowflakeDatasource: ...
    @overload
    def update_snowflake(  # noqa: PLR0913
@@ -642,6 +645,7 @@ class _SourceFactories:
        warehouse: Optional[str] = ...,
        role: Optional[str] = ...,
        numpy: bool = ...,
        quoted_identifiers_ignore_case: bool | None = ...,
    ) -> SnowflakeDatasource: ...
    @overload
    def add_or_update_snowflake(  # noqa: PLR0913
@@ -660,6 +664,7 @@ class _SourceFactories:
        warehouse: None = ...,
        role: None = ...,
        numpy: bool = ...,
        quoted_identifiers_ignore_case: bool | None = ...,
    ) -> SnowflakeDatasource: ...
    @overload
    def add_or_update_snowflake(  # noqa: PLR0913
@@ -678,6 +683,7 @@ class _SourceFactories:
        warehouse: Optional[str] = ...,
        role: Optional[str] = ...,
        numpy: bool = ...,
        quoted_identifiers_ignore_case: bool | None = ...,
    ) -> SnowflakeDatasource: ...
    def delete_snowflake(
        self,
@@ -1408,7 +1408,7 @@ def get_inspector(self) -> sqlalchemy.engine.reflection.Inspector:
        return self._inspector

    @contextmanager
    def get_connection(self) -> sqlalchemy.Connection:
    def get_connection(self) -> sqlalchemy.Connection:  # noqa: C901 # TODO: simplify
        """Get a connection for executing queries.

        Some databases sqlite/mssql temp tables only persist within a connection,
@@ -1420,6 +1420,64 @@ def get_connection(self) -> sqlalchemy.Connection:
        Returns:
            Sqlalchemy connection
        """
        from sqlalchemy.sql import quoted_name  # noqa: TID251

        def ibis_normalize_name(name):
            logger.warning(f"ibis_normalize_name - {name}")  # TODO: remove me
            if name is None:
                return None
            elif not name:
                return ""
            elif name.lower() == name:
                return quoted_name(name, quote=True)
            else:
                return name

        def default_normalize_name(name):
            logger.warning(f"default_normalize_name - {name}")  # TODO: remove me
            if name is None:
                return None

            name_lower = name.lower()
            name_upper = name.upper()

            if name_upper == name_lower:
                # name has no upper/lower conversion, e.g. non-european characters.
                # return unchanged
                return name
            elif name_upper == name and not (self.identifier_preparer._requires_quotes)(
                name_lower
            ):
                # name is all uppercase and doesn't require quoting; normalize
                # to all lower case
                return name_lower
            elif name_lower == name:
                # name is all lower case, which if denormalized means we need to
                # force quoting on it
                return quoted_name(name, quote=True)
            else:
                # name is mixed case, meaning it will be quoted in SQL when used
                # later; no normalization needed
                return name

        def default_denormalize_name(name):
            logger.warning(f"default_denormalize_name - {name}")  # TODO: remove me
            if name is None:
                return None

            name_lower = name.lower()
            name_upper = name.upper()

            if name_upper == name_lower:
                # name has no upper/lower conversion, e.g. non-european characters.
                # return unchanged
                return name
            elif name_lower == name and not (self.identifier_preparer._requires_quotes)(
                name_lower
            ):
                name = name_upper
            return name

        if self.dialect_name in _PERSISTED_CONNECTION_DIALECTS:
            try:
                if not self._connection:
@@ -1431,6 +1489,11 @@ def get_connection(self) -> sqlalchemy.Connection:
                pass
        else:
            with self.engine.connect() as connection:
                if connection.dialect.name == "snowflake":
                    logger.warning("snowflake connection")  # TODO: remove me
                    # connection.dialect.normalize_name = default_normalize_name
                    connection.dialect.normalize_name = ibis_normalize_name
                    # connection.dialect.denormalize_name = default_denormalize_name
                yield connection

Author comment (Kilo59): @billdirks here is where the monkeypatching is done.

@public_api
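The ibis-style normalization that gets monkeypatched onto the Snowflake dialect above can be exercised in isolation. In this sketch, `QuotedName` is a hypothetical stand-in for `sqlalchemy.sql.quoted_name`, just enough to observe the quoting decision:

```python
class QuotedName(str):
    """Minimal stand-in for sqlalchemy.sql.quoted_name (for this sketch only)."""

    def __new__(cls, value: str, quote: bool = True):
        obj = super().__new__(cls, value)
        obj.quote = quote
        return obj


def ibis_normalize_name(name):
    """Force quoting on all-lowercase names so a case-folding backend
    (Snowflake uppercases unquoted identifiers) leaves them untouched."""
    if name is None:
        return None
    if not name:
        return ""
    if name.lower() == name:
        return QuotedName(name, quote=True)
    return name
```

The design choice here: an all-lowercase reflected name on Snowflake can only exist if it was created quoted, so re-quoting it is the safe round-trip; uppercase and mixed-case names pass through unchanged.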
88 changes: 78 additions & 10 deletions great_expectations/expectations/metrics/util.py
@@ -2,7 +2,17 @@

import logging
import re
from typing import TYPE_CHECKING, Any, Dict, List, Optional, Sequence, Tuple, overload
from collections import UserDict
from typing import (
    TYPE_CHECKING,
    Any,
    Dict,
    List,
    Optional,
    Sequence,
    Tuple,
    overload,
)

import numpy as np
from dateutil.parser import parse
@@ -13,14 +23,17 @@
from great_expectations.compatibility.sqlalchemy import (
    sqlalchemy as sa,
)
from great_expectations.compatibility.typing_extensions import override
from great_expectations.execution_engine import (
    PandasExecutionEngine,  # noqa: TCH001
    SqlAlchemyExecutionEngine,  # noqa: TCH001
)
from great_expectations.execution_engine.sqlalchemy_batch_data import (
    SqlAlchemyBatchData,
)
from great_expectations.execution_engine.sqlalchemy_dialect import GXSqlDialect
from great_expectations.execution_engine.sqlalchemy_dialect import (
    GXSqlDialect,
)
from great_expectations.execution_engine.util import check_sql_engine_dialect

try:
@@ -308,6 +321,42 @@ def attempt_allowing_relative_error(dialect):
    return detected_redshift or detected_psycopg2


class _CaseInsensitiveString(str):
    def __init__(self, string: str):
        self._original = string
        self._lower = string.lower()
        self._quote_string = '"'

    def __eq__(self, other):
        if self.is_quoted():
            return self._original == str(other)
        if isinstance(other, _CaseInsensitiveString):
            return self._lower == other._lower
        elif isinstance(other, str):
            return self._lower == other.lower()
        else:
            return False

    def __hash__(self):
        return hash(self._lower)

    def is_quoted(self):
        return self._original.startswith(self._quote_string)


class SmartColumnLookup(UserDict):
    def __init__(self, data: Dict[str, Any]):
        self.data = data

    @override
    def __getitem__(self, key: Any) -> Any:
        item = self.data[key]
        if key == "name":
            logger.debug(f"SmartColumnLookup - {key}:{item}")
            return _CaseInsensitiveString(item)
        return item

def get_sqlalchemy_column_metadata(
    execution_engine: SqlAlchemyExecutionEngine,
    table_selectable: sqlalchemy.Select,
@@ -342,7 +391,13 @@ def get_sqlalchemy_column_metadata(
            AttributeError,
            sa.exc.NoSuchTableError,
            sa.exc.ProgrammingError,
        ) as exc:
            logger.debug(
                f"{type(exc).__name__} while introspecting columns", exc_info=exc
            )
            logger.info(
                f"While introspecting columns {exc!r}; attempting reflection fallback"
            )
            # we will get a KeyError for temporary tables, since
            # reflection will not find the temporary schema
            columns = column_reflection_fallback(
@@ -359,9 +414,17 @@
                sqlalchemy_engine=engine,
            )

        dialect_name = execution_engine.dialect.name
        if dialect_name == "snowflake":
            return [
                # TODO: SmartColumn should know the dialect and do lookups based on that
                SmartColumnLookup(column)  # type: ignore[misc]
                for column in columns
            ]

        return columns
    except AttributeError as e:
        logger.debug(f"Error while introspecting columns: {e!r}", exc_info=e)
        return None


@@ -371,7 +434,6 @@ def column_reflection_fallback(  # noqa: PLR0915
    selectable: sqlalchemy.Select,
    sqlalchemy_engine: sqlalchemy.Engine,
) -> List[Dict[str, str]]:
    """If we can't reflect the table, use a query to at least get column names."""
    if isinstance(sqlalchemy_engine.engine, sqlalchemy.Engine):
        connection = sqlalchemy_engine.engine.connect()
    else:
@@ -591,11 +653,17 @@ def column_reflection_fallback(  # noqa: PLR0915
                .limit(1)
            )
        else:
            query = (
                sa.select(sa.text("*"))
                .select_from(sa.text(selectable))
                .limit(1)
            )
            try:
                if isinstance(selectable, sqlalchemy.quoted_name):
                    sub_query = sa.text(selectable)
                else:
                    sub_query = selectable
                query = sa.select(sa.text("*")).select_from(sub_query).limit(1)
            except Exception as exc:
                logger.debug(
                    "Error during column_reflection_fallback()", exc_info=exc
                )
                raise

        result_object = connection.execute(query)
        # noinspection PyProtectedMember
5 changes: 3 additions & 2 deletions pyproject.toml
@@ -334,7 +334,7 @@ If you are working in a configuration file you may use the inline comment \
[tool.pytest.ini_options]
filterwarnings = [
    # Turn all warnings not explicitly filtered below into errors
    "error",
    # "error", # TODO: re-enable this
    # This warning is common during testing where we intentionally use a COMPLETE format even in cases that would
    # be potentially overly resource intensive in standard operation
    "ignore:Setting result format to COMPLETE for a SqlAlchemyDataset:UserWarning",
@@ -569,4 +569,5 @@ testpaths = "tests"
# https://pytest-mock.readthedocs.io/en/latest/configuration.html#use-standalone-mock-package
mock_use_standalone_module = false
# https://docs.pytest.org/en/7.1.x/how-to/logging.html#how-to-manage-logging
log_level = "info"
log_level = "info" # TODO: set this back to info
# log_level = "warning"