
[MAINTENANCE] Limit Result Format and QueryMetricProvider total unexpected records #10432

Merged Oct 2, 2024 · 35 commits
1560cf0
Only fetch MAX_IN_MEMORY_RECORDS_ALLOWED
NathanFarmer Sep 20, 2024
a05b09d
Add UnexpetedIndexList type
NathanFarmer Sep 20, 2024
ff01c09
Fix type errors
NathanFarmer Sep 20, 2024
ad43152
Merge branch 'develop' into b/ph-1599/unexpected-rows-query-oom
NathanFarmer Sep 20, 2024
3fd6dbd
Fix type errors
NathanFarmer Sep 20, 2024
15110d7
Add metric return type
NathanFarmer Sep 20, 2024
2f2b458
Add metric return type
NathanFarmer Sep 20, 2024
6719404
Protext against any return type
NathanFarmer Sep 20, 2024
ec8330d
Try to break some tests
NathanFarmer Sep 30, 2024
30d4f36
Type checker
NathanFarmer Sep 30, 2024
1133829
Test change
NathanFarmer Sep 30, 2024
eb5cafd
Fix pandas bug with filtering too early
NathanFarmer Sep 30, 2024
7cbbce5
Experimental change
NathanFarmer Sep 30, 2024
279e857
Add limit to unexpected_rows via changes to map_condition_auxilliary_…
NathanFarmer Sep 30, 2024
0c60d4e
Add limit to column_pair_map_condition_auxilliary_methods
NathanFarmer Sep 30, 2024
1c2ddfa
Add limit to multicolumn_map_condition_auxilliary_methods
NathanFarmer Sep 30, 2024
0b76dda
Revert test change to 200 records
NathanFarmer Sep 30, 2024
dadedb2
Add return types
NathanFarmer Sep 30, 2024
13578fd
Fix return type
NathanFarmer Sep 30, 2024
8b5f81a
Fix return type
NathanFarmer Sep 30, 2024
d49d13f
Fix return type
NathanFarmer Sep 30, 2024
f29a183
Merge branch 'develop' into b/ph-1599/unexpected-rows-query-oom
NathanFarmer Sep 30, 2024
283780c
Add all of the return types
NathanFarmer Oct 1, 2024
16727c2
Refactor out query parameters cleanup logic
NathanFarmer Oct 1, 2024
0183b39
Incorrect return type
NathanFarmer Oct 1, 2024
d25f399
Add test for _get_parameters_dict_from_query_parameters
NathanFarmer Oct 1, 2024
28cfc66
Add test for _get_sqlalchemy_records_from_query_and_batch_selectable
NathanFarmer Oct 1, 2024
a285786
Merge branch 'develop' into b/ph-1599/unexpected-rows-query-oom
NathanFarmer Oct 1, 2024
4f6a6bd
Remove uneccessary sa.select
NathanFarmer Oct 1, 2024
eda5406
Merge branch 'b/ph-1599/unexpected-rows-query-oom' of github.com:grea…
NathanFarmer Oct 1, 2024
9e2cb4a
Fix type errror
NathanFarmer Oct 1, 2024
e9ea554
Make type checker happy
NathanFarmer Oct 1, 2024
c4f37b5
Add fetchmany test
NathanFarmer Oct 1, 2024
83be1bc
Update docs
NathanFarmer Oct 1, 2024
cc9c5c5
Merge branch 'develop' into b/ph-1599/unexpected-rows-query-oom
NathanFarmer Oct 1, 2024
@@ -1,14 +1,10 @@
The following table lists the valid keys for a Result Format dictionary and describes the purpose of each. Not all keys are used at every verbosity level.

| Dictionary key | Purpose |
| --- |--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
|`"result_format"` | Sets the fields to return in Validation Results. Valid values are `"BASIC"`, `"BOOLEAN_ONLY"`, `"COMPLETE"`, and `"SUMMARY"`. The default value is `"SUMMARY"`. |
| `"unexpected_index_column_names"` | Defines the columns that can be used to identify unexpected results. For example, primary key (PK) column(s) or other columns with unique identifiers. Supports multiple column names as a list. |
|`"return_unexpected_index_query"` | When running validations, a query (or a set of indices) is returned that allows you to retrieve the full set of unexpected results as well as the values of the identifying columns specified in `"unexpected_index_column_names"`. Setting this value to `False` suppresses the output (default is `True`). |
| `"partial_unexpected_count"` | Sets the number of results to include in `"partial_unexpected_counts"`, `"partial_unexpected_list"`, and `"partial_unexpected_index_list"` if applicable. Set the value to zero to suppress the unexpected counts. |
| `"exclude_unexpected_values"` | When running validations, a set of unexpected results' indices and values is returned. Setting this value to `True` suppresses values from the output to only have indices (default is `False`). |
| `"include_unexpected_rows"` | When `True` this returns the entire row for each unexpected value in dictionary form. This setting only applies when `"result_format"` has been explicitly set to a value other than `"BOOLEAN_ONLY"`. |

:::note
`include_unexpected_rows` returns EVERY row for each unexpected value. In large tables, this could result in an unmanageable amount of data.
:::
Comment on lines -12 to -14 (NathanFarmer, Contributor Author):
All I did was remove this note. Not sure why the diffs are hard to read.

| `"include_unexpected_rows"` | When `True` this returns up to 200 entire rows for each unexpected value in dictionary form. This setting only applies when `"result_format"` has been explicitly set to a value other than `"BOOLEAN_ONLY"`. |
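For reference, a Result Format dictionary combining the keys documented above might look like the following sketch (the `"pk_column"` name is a hypothetical placeholder, not something from this PR):

```python
# Hypothetical Result Format dictionary using the keys documented above.
# "pk_column" is a placeholder identifier column for illustration only.
result_format = {
    "result_format": "COMPLETE",
    "unexpected_index_column_names": ["pk_column"],
    "return_unexpected_index_query": True,
    "partial_unexpected_count": 20,
    "exclude_unexpected_values": False,
    "include_unexpected_rows": False,  # when True, rows are now capped at 200
}
```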
@@ -14,4 +14,4 @@ The following table lists the fields that can be found in the `result` dictionary
| partial_unexpected_counts | A partial list of values and counts, showing the number of times each of the unexpected values occur. (Up to 20 unexpected value/count pairs by default.) |
| unexpected_index_list | A list of the indices of the unexpected values in the column, as defined by the columns in `unexpected_index_column_names`. |
| unexpected_index_query | A query that can be used to retrieve all unexpected values (SQL and Spark), or the full list of unexpected indices (Pandas). |
| unexpected_list | A list of all values that violate the Expectation. |
| unexpected_list | A list of up to 200 values that violate the Expectation. |
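A sketch of what a `result` dictionary with these fields could contain; the values are invented for illustration, and the `unexpected_index_query` shown is the Pandas-style variant:

```python
# Illustrative `result` payload only; real shapes vary by backend.
result = {
    "partial_unexpected_counts": [{"value": "bad", "count": 3}],
    "unexpected_index_list": [{"pk_column": 2}, {"pk_column": 7}],
    "unexpected_index_query": "df.filter(items=[2, 7], axis=0)",
    "unexpected_list": ["bad", "bad", "bad"],  # now capped at 200 values
}
```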
@@ -5,12 +5,14 @@
TYPE_CHECKING,
Any,
Dict,
Sequence,
Tuple,
Union,
)

import great_expectations.exceptions as gx_exceptions
from great_expectations.expectations.metrics.util import (
MAX_RESULT_RECORDS,
get_dbms_compatible_metric_domain_kwargs,
)

@@ -29,6 +31,8 @@
)

if TYPE_CHECKING:
import pandas as pd

from great_expectations.compatibility import pyspark, sqlalchemy


@@ -42,7 +46,7 @@ def _pandas_column_map_condition_values(
metric_value_kwargs: dict,
metrics: Dict[str, Any],
**kwargs,
):
) -> list[dict]:
"""Return values from the specified domain that match the map-style metric in the metrics dictionary.""" # noqa: E501
(
boolean_mapped_unexpected_values,
@@ -86,84 +90,10 @@ def _pandas_column_map_condition_values(
result_format = metric_value_kwargs["result_format"]

if result_format["result_format"] == "COMPLETE":
return list(domain_values)

return list(domain_values[: result_format["partial_unexpected_count"]])
return list(domain_values[:MAX_RESULT_RECORDS])


# TODO: <Alex>11/15/2022: Please DO_NOT_DELETE this method (even though it is not currently utilized). Thanks.</Alex> # noqa: E501
def _pandas_column_map_series_and_domain_values(
cls,
execution_engine: PandasExecutionEngine,
metric_domain_kwargs: dict,
metric_value_kwargs: dict,
metrics: Dict[str, Any],
**kwargs,
):
"""Return values from the specified domain that match the map-style metric in the metrics dictionary.""" # noqa: E501
(
boolean_mapped_unexpected_values,
compute_domain_kwargs,
accessor_domain_kwargs,
) = metrics["unexpected_condition"]
(
map_series,
compute_domain_kwargs_2,
accessor_domain_kwargs_2,
) = metrics["metric_partial_fn"]
assert (
compute_domain_kwargs == compute_domain_kwargs_2
), "map_series and condition must have the same compute domain"
assert (
accessor_domain_kwargs == accessor_domain_kwargs_2
), "map_series and condition must have the same accessor kwargs"

if "column" not in accessor_domain_kwargs:
raise ValueError( # noqa: TRY003
"""No "column" found in provided metric_domain_kwargs, but it is required for a column map metric
(_pandas_column_map_series_and_domain_values).
""" # noqa: E501
)

accessor_domain_kwargs = get_dbms_compatible_metric_domain_kwargs(
metric_domain_kwargs=accessor_domain_kwargs,
batch_columns_list=metrics["table.columns"],
)

column_name: Union[str, sqlalchemy.quoted_name] = accessor_domain_kwargs["column"]

df = execution_engine.get_domain_records(domain_kwargs=compute_domain_kwargs)

###
# NOTE: 20201111 - JPC - in the map_series / map_condition_series world (pandas), we
# currently handle filter_column_isnull differently than other map_fn / map_condition
# cases.
###
filter_column_isnull = kwargs.get(
"filter_column_isnull", getattr(cls, "filter_column_isnull", False)
)
if filter_column_isnull:
df = df[df[column_name].notnull()]

domain_values = df[column_name]

domain_values = domain_values[
boolean_mapped_unexpected_values == True # noqa: E712
]
map_series = map_series[boolean_mapped_unexpected_values == True] # noqa: E712

result_format = metric_value_kwargs["result_format"]

if result_format["result_format"] == "COMPLETE":
return (
list(domain_values),
list(map_series),
)

return (
list(domain_values[: result_format["partial_unexpected_count"]]),
list(map_series[: result_format["partial_unexpected_count"]]),
)
limit = min(result_format["partial_unexpected_count"], MAX_RESULT_RECORDS)
return list(domain_values[:limit])
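The Pandas-side capping introduced in this hunk can be sketched in isolation as follows, assuming `MAX_RESULT_RECORDS = 200` (matching the 200-record limit stated in the updated docs):

```python
# Standalone sketch of the new capping logic; MAX_RESULT_RECORDS = 200
# stands in for the constant imported from
# great_expectations.expectations.metrics.util.
MAX_RESULT_RECORDS = 200

def cap_unexpected_values(domain_values, result_format: dict) -> list:
    """Cap returned unexpected values at MAX_RESULT_RECORDS."""
    if result_format["result_format"] == "COMPLETE":
        # COMPLETE no longer means unbounded: still cap at the maximum.
        return list(domain_values[:MAX_RESULT_RECORDS])
    # Partial output: honor partial_unexpected_count, but never exceed the cap.
    limit = min(result_format["partial_unexpected_count"], MAX_RESULT_RECORDS)
    return list(domain_values[:limit])
```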


def _pandas_column_map_condition_value_counts(
@@ -173,7 +103,7 @@ def _pandas_column_map_condition_value_counts(
metric_value_kwargs: dict,
metrics: Dict[str, Any],
**kwargs,
):
) -> pd.Series[int]:
"""Returns respective value counts for distinct column values"""
(
boolean_mapped_unexpected_values,
@@ -238,7 +168,7 @@ def _sqlalchemy_column_map_condition_values(
metric_value_kwargs: dict,
metrics: Dict[str, Tuple],
**kwargs,
):
) -> list[dict]:
"""
Particularly for the purpose of finding unexpected values, returns all the metric values which do not meet an
expected Expectation condition for ColumnMapExpectation Expectations.
@@ -284,7 +214,10 @@ def _sqlalchemy_column_map_condition_values(
)
query = query.limit(10000) # BigQuery upper bound on query parameters

return [val.unexpected_values for val in execution_engine.execute_query(query).fetchall()]
return [
val.unexpected_values
for val in execution_engine.execute_query(query).fetchmany(MAX_RESULT_RECORDS)
]
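The switch from `fetchall()` to `fetchmany(MAX_RESULT_RECORDS)` is what bounds client-side memory here: only the first N rows are materialized. A minimal sketch, with an in-memory SQLite database standing in for the SQLAlchemy execution engine and `MAX_RESULT_RECORDS = 200` assumed:

```python
import sqlite3

# fetchmany(N) materializes at most N rows, unlike fetchall(), which
# pulls the entire result set into memory at once.
MAX_RESULT_RECORDS = 200

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE t (unexpected_values INTEGER)")
conn.executemany("INSERT INTO t VALUES (?)", [(i,) for i in range(1000)])

cursor = conn.execute("SELECT unexpected_values FROM t")
rows = cursor.fetchmany(MAX_RESULT_RECORDS)  # at most 200 rows in memory
values = [row[0] for row in rows]
```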


def _sqlalchemy_column_map_condition_value_counts(
@@ -294,7 +227,7 @@ def _sqlalchemy_column_map_condition_value_counts(
metric_value_kwargs: dict,
metrics: Dict[str, Any],
**kwargs,
):
) -> Union[Sequence[sa.Row[Any]], Any]:
"""
Returns value counts for all the metric values which do not meet an expected Expectation condition for instances
of ColumnMapExpectation.
@@ -335,7 +268,7 @@ def _spark_column_map_condition_values(
metric_value_kwargs: dict,
metrics: Dict[str, Any],
**kwargs,
):
) -> list[dict]:
"""Return values from the specified domain that match the map-style metric in the metrics dictionary.""" # noqa: E501
unexpected_condition, compute_domain_kwargs, accessor_domain_kwargs = metrics[
"unexpected_condition"
@@ -365,19 +298,15 @@

result_format = metric_value_kwargs["result_format"]

# note that without an explicit column alias,
# spark will use only the final portion
# of a nested column as the column name
if result_format["result_format"] == "COMPLETE":
rows = filtered.select(
F.col(column_name).alias(column_name)
).collect() # note that without the explicit alias, spark will use only the final portion of a nested column as the column name # noqa: E501
query = filtered.select(F.col(column_name).alias(column_name)).limit(MAX_RESULT_RECORDS)
else:
rows = (
filtered.select(
F.col(column_name).alias(column_name)
) # note that without the explicit alias, spark will use only the final portion of a nested column as the column name # noqa: E501
.limit(result_format["partial_unexpected_count"])
.collect()
)
return [row[column_name] for row in rows]
limit = min(result_format["partial_unexpected_count"], MAX_RESULT_RECORDS)
query = filtered.select(F.col(column_name).alias(column_name)).limit(limit)
return [row[column_name] for row in query.collect()]


def _spark_column_map_condition_value_counts(
@@ -387,7 +316,7 @@
metric_value_kwargs: dict,
metrics: Dict[str, Any],
**kwargs,
):
) -> list[pyspark.Row]:
unexpected_condition, compute_domain_kwargs, accessor_domain_kwargs = metrics[
"unexpected_condition"
]
@@ -15,6 +15,7 @@
_is_sqlalchemy_metric_selectable,
)
from great_expectations.expectations.metrics.util import (
MAX_RESULT_RECORDS,
get_dbms_compatible_metric_domain_kwargs,
)
from great_expectations.util import (
@@ -40,7 +41,7 @@ def _pandas_column_pair_map_condition_values(
metric_value_kwargs: dict,
metrics: Dict[str, Any],
**kwargs,
):
) -> list[tuple[Any, Any]]:
"""Return values from the specified domain that match the map-style metric in the metrics dictionary.""" # noqa: E501
(
boolean_mapped_unexpected_values,
@@ -92,9 +93,10 @@ def _pandas_column_pair_map_condition_values(
)
]
if result_format["result_format"] == "COMPLETE":
return unexpected_list
return unexpected_list[:MAX_RESULT_RECORDS]

return unexpected_list[: result_format["partial_unexpected_count"]]
limit = min(result_format["partial_unexpected_count"], MAX_RESULT_RECORDS)
return unexpected_list[:limit]


def _pandas_column_pair_map_condition_filtered_row_count(
@@ -104,7 +106,7 @@ def _pandas_column_pair_map_condition_filtered_row_count(
metric_value_kwargs: dict,
metrics: Dict[str, Any],
**kwargs,
):
) -> int:
"""Return record counts from the specified domain that match the map-style metric in the metrics dictionary.""" # noqa: E501
_, compute_domain_kwargs, accessor_domain_kwargs = metrics["unexpected_condition"]

@@ -137,7 +139,7 @@ def _sqlalchemy_column_pair_map_condition_values(
metric_value_kwargs: dict,
metrics: Dict[str, Any],
**kwargs,
):
) -> list[tuple[Any, Any]]:
"""Return values from the specified domain that match the map-style metric in the metrics dictionary.""" # noqa: E501
(
boolean_mapped_unexpected_values,
@@ -172,11 +174,12 @@

result_format = metric_value_kwargs["result_format"]
if result_format["result_format"] != "COMPLETE":
query = query.limit(result_format["partial_unexpected_count"])
limit = min(result_format["partial_unexpected_count"], MAX_RESULT_RECORDS)
query = query.limit(limit)

unexpected_list = [
(val.unexpected_values_A, val.unexpected_values_B)
for val in execution_engine.execute_query(query).fetchall()
for val in execution_engine.execute_query(query).fetchmany(MAX_RESULT_RECORDS)
]
return unexpected_list
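The SQLAlchemy column-pair path now applies two caps: a server-side `LIMIT` when a partial result is requested, plus a client-side `fetchmany` so even `COMPLETE` results stay bounded. A sketch of that two-layer pattern, with SQLite and a literal query standing in for the SQLAlchemy machinery:

```python
import sqlite3

# Two-layer cap: SQL LIMIT for partial results, fetchmany() for everything.
MAX_RESULT_RECORDS = 200

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE pairs (unexpected_values_A INTEGER, unexpected_values_B INTEGER)")
conn.executemany("INSERT INTO pairs VALUES (?, ?)", [(i, -i) for i in range(500)])

partial_unexpected_count = 20  # would come from result_format
limit = min(partial_unexpected_count, MAX_RESULT_RECORDS)
cursor = conn.execute(
    "SELECT unexpected_values_A, unexpected_values_B FROM pairs LIMIT ?", (limit,)
)
unexpected_list = [(a, b) for a, b in cursor.fetchmany(MAX_RESULT_RECORDS)]
```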

@@ -188,7 +191,7 @@ def _sqlalchemy_column_pair_map_condition_filtered_row_count(
metric_value_kwargs: dict,
metrics: Dict[str, Any],
**kwargs,
):
) -> Any | None:
"""Return record counts from the specified domain that match the map-style metric in the metrics dictionary.""" # noqa: E501
_, compute_domain_kwargs, accessor_domain_kwargs = metrics["unexpected_condition"]

@@ -216,7 +219,7 @@ def _spark_column_pair_map_condition_values(
metric_value_kwargs: dict,
metrics: Dict[str, Any],
**kwargs,
):
) -> list[tuple[Any, Any]]:
"""Return values from the specified domain that match the map-style metric in the metrics dictionary.""" # noqa: E501
(
unexpected_condition,
@@ -249,25 +252,22 @@

result_format = metric_value_kwargs["result_format"]
if result_format["result_format"] == "COMPLETE":
rows = filtered.select(
query = filtered.select(
[
F.col(column_A_name).alias(column_A_name),
F.col(column_B_name).alias(column_B_name),
]
).collect()
).limit(MAX_RESULT_RECORDS)
else:
rows = (
filtered.select(
[
F.col(column_A_name).alias(column_A_name),
F.col(column_B_name).alias(column_B_name),
]
)
.limit(result_format["partial_unexpected_count"])
.collect()
)
limit = min(result_format["partial_unexpected_count"], MAX_RESULT_RECORDS)
query = filtered.select(
[
F.col(column_A_name).alias(column_A_name),
F.col(column_B_name).alias(column_B_name),
]
).limit(limit)

unexpected_list = [(row[column_A_name], row[column_B_name]) for row in rows]
unexpected_list = [(row[column_A_name], row[column_B_name]) for row in query.collect()]
return unexpected_list


@@ -278,7 +278,7 @@ def _spark_column_pair_map_condition_filtered_row_count(
metric_value_kwargs: dict,
metrics: Dict[str, Any],
**kwargs,
):
) -> int:
"""Return record counts from the specified domain that match the map-style metric in the metrics dictionary.""" # noqa: E501
_, compute_domain_kwargs, accessor_domain_kwargs = metrics["unexpected_condition"]
