
Commit

[MAINTENANCE] Limit Result Format and QueryMetricProvider total unexpected records (#10432)
NathanFarmer authored Oct 2, 2024
1 parent 08dffb2 commit 09bdbcc
Showing 13 changed files with 362 additions and 326 deletions.
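The diffs below apply a consistent cap on how many unexpected records the map-condition metric providers return. A minimal sketch of that rule, assuming the `MAX_RESULT_RECORDS` constant imported from `great_expectations.expectations.metrics.util` equals 200 (the figure used in the updated documentation):

```python
# Sketch only: mirrors the limiting logic added across the metric providers below.
# Assumption: MAX_RESULT_RECORDS is 200, matching the "up to 200" wording in the docs.
MAX_RESULT_RECORDS = 200


def capped_limit(result_format: dict) -> int:
    """How many unexpected records to materialize for a given Result Format."""
    if result_format["result_format"] == "COMPLETE":
        return MAX_RESULT_RECORDS
    return min(result_format["partial_unexpected_count"], MAX_RESULT_RECORDS)


# Example: even a "COMPLETE" run is now capped.
assert capped_limit({"result_format": "COMPLETE"}) == 200
assert capped_limit({"result_format": "SUMMARY", "partial_unexpected_count": 20}) == 20
```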
@@ -1,14 +1,10 @@
The following table lists the valid keys for a Result Format dictionary and the purpose of each. Not all keys are used by every verbosity level.

| Dictionary key | Purpose |
| --- | --- |
| `"result_format"` | Sets the fields to return in Validation Results. Valid values are `"BASIC"`, `"BOOLEAN_ONLY"`, `"COMPLETE"`, and `"SUMMARY"`. The default value is `"SUMMARY"`. |
| `"unexpected_index_column_names"` | Defines the columns that can be used to identify unexpected results. For example, primary key (PK) column(s) or other columns with unique identifiers. Supports multiple column names as a list. |
|`"return_unexpected_index_query"` | When running validations, a query (or a set of indices) is returned that allows you to retrieve the full set of unexpected results as well as the values of the identifying columns specified in `"unexpected_index_column_names"`. Setting this value to `False` suppresses the output (default is `True`). |
| `"partial_unexpected_count"` | Sets the number of results to include in `"partial_unexpected_counts"`, `"partial_unexpected_list"`, and `"partial_unexpected_index_list"` if applicable. Set the value to zero to suppress the unexpected counts. |
| `"exclude_unexpected_values"` | When running validations, a set of unexpected results' indices and values is returned. Setting this value to `True` suppresses values from the output to only have indices (default is `False`). |
| `"include_unexpected_rows"` | When `True` this returns the entire row for each unexpected value in dictionary form. This setting only applies when `"result_format"` has been explicitly set to a value other than `"BOOLEAN_ONLY"`. |

:::note
`include_unexpected_rows` returns EVERY row for each unexpected value. In large tables, this could result in an unmanageable amount of data.
:::
| `"include_unexpected_rows"` | When `True` this returns up to 200 entire rows for each unexpected value in dictionary form. This setting only applies when `"result_format"` has been explicitly set to a value other than `"BOOLEAN_ONLY"`. |
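For reference, a hedged sketch of a Result Format dictionary built from the keys documented above. Only the keys come from the table; the values and the identifier column name are illustrative, and the call that consumes the dictionary (a Checkpoint or validation run) is not shown:

```python
# Illustrative Result Format configuration; "pk_column" is a hypothetical identifier column.
result_format: dict = {
    "result_format": "COMPLETE",
    "unexpected_index_column_names": ["pk_column"],
    "return_unexpected_index_query": True,
    "partial_unexpected_count": 20,
    "exclude_unexpected_values": False,
    "include_unexpected_rows": False,
}
```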
@@ -14,4 +14,4 @@ The following table lists the fields that can be found in the `result` dictionary
| partial_unexpected_counts | A partial list of values and counts, showing the number of times each of the unexpected values occur. (Up to 20 unexpected value/count pairs by default.) |
| unexpected_index_list | A list of the indices of the unexpected values in the column, as defined by the columns in `unexpected_index_column_names`. |
| unexpected_index_query | A query that can be used to retrieve all unexpected values (SQL and Spark), or the full list of unexpected indices (Pandas). |
| unexpected_list | A list of all values that violate the Expectation. |
| unexpected_list | A list of up to 200 values that violate the Expectation. |
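An illustrative (not real) shape for the `result` fields listed above, for a column where two values violated the Expectation. The field names come from the table; the sample values, the `pk_column` identifier, and the Pandas-style query string are made up:

```python
# Hypothetical excerpt of a Validation Result's `result` dictionary.
result = {
    "partial_unexpected_counts": [{"value": -1, "count": 1}, {"value": -5, "count": 1}],
    "unexpected_index_list": [{"pk_column": 3}, {"pk_column": 7}],
    "unexpected_index_query": "df.filter(items=[3, 7], axis=0)",  # Pandas-flavored example
    "unexpected_list": [-1, -5],  # now capped at 200 values
}
```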
@@ -5,12 +5,14 @@
TYPE_CHECKING,
Any,
Dict,
Sequence,
Tuple,
Union,
)

import great_expectations.exceptions as gx_exceptions
from great_expectations.expectations.metrics.util import (
MAX_RESULT_RECORDS,
get_dbms_compatible_metric_domain_kwargs,
)

@@ -29,6 +31,8 @@
)

if TYPE_CHECKING:
import pandas as pd

from great_expectations.compatibility import pyspark, sqlalchemy


@@ -42,7 +46,7 @@ def _pandas_column_map_condition_values(
metric_value_kwargs: dict,
metrics: Dict[str, Any],
**kwargs,
):
) -> list[dict]:
"""Return values from the specified domain that match the map-style metric in the metrics dictionary.""" # noqa: E501
(
boolean_mapped_unexpected_values,
@@ -86,84 +90,10 @@ def _pandas_column_map_condition_values(
result_format = metric_value_kwargs["result_format"]

if result_format["result_format"] == "COMPLETE":
return list(domain_values)

return list(domain_values[: result_format["partial_unexpected_count"]])
return list(domain_values[:MAX_RESULT_RECORDS])


# TODO: <Alex>11/15/2022: Please DO_NOT_DELETE this method (even though it is not currently utilized). Thanks.</Alex> # noqa: E501
def _pandas_column_map_series_and_domain_values(
cls,
execution_engine: PandasExecutionEngine,
metric_domain_kwargs: dict,
metric_value_kwargs: dict,
metrics: Dict[str, Any],
**kwargs,
):
"""Return values from the specified domain that match the map-style metric in the metrics dictionary.""" # noqa: E501
(
boolean_mapped_unexpected_values,
compute_domain_kwargs,
accessor_domain_kwargs,
) = metrics["unexpected_condition"]
(
map_series,
compute_domain_kwargs_2,
accessor_domain_kwargs_2,
) = metrics["metric_partial_fn"]
assert (
compute_domain_kwargs == compute_domain_kwargs_2
), "map_series and condition must have the same compute domain"
assert (
accessor_domain_kwargs == accessor_domain_kwargs_2
), "map_series and condition must have the same accessor kwargs"

if "column" not in accessor_domain_kwargs:
raise ValueError( # noqa: TRY003
"""No "column" found in provided metric_domain_kwargs, but it is required for a column map metric
(_pandas_column_map_series_and_domain_values).
""" # noqa: E501
)

accessor_domain_kwargs = get_dbms_compatible_metric_domain_kwargs(
metric_domain_kwargs=accessor_domain_kwargs,
batch_columns_list=metrics["table.columns"],
)

column_name: Union[str, sqlalchemy.quoted_name] = accessor_domain_kwargs["column"]

df = execution_engine.get_domain_records(domain_kwargs=compute_domain_kwargs)

###
# NOTE: 20201111 - JPC - in the map_series / map_condition_series world (pandas), we
# currently handle filter_column_isnull differently than other map_fn / map_condition
# cases.
###
filter_column_isnull = kwargs.get(
"filter_column_isnull", getattr(cls, "filter_column_isnull", False)
)
if filter_column_isnull:
df = df[df[column_name].notnull()]

domain_values = df[column_name]

domain_values = domain_values[
boolean_mapped_unexpected_values == True # noqa: E712
]
map_series = map_series[boolean_mapped_unexpected_values == True] # noqa: E712

result_format = metric_value_kwargs["result_format"]

if result_format["result_format"] == "COMPLETE":
return (
list(domain_values),
list(map_series),
)

return (
list(domain_values[: result_format["partial_unexpected_count"]]),
list(map_series[: result_format["partial_unexpected_count"]]),
)
limit = min(result_format["partial_unexpected_count"], MAX_RESULT_RECORDS)
return list(domain_values[:limit])


def _pandas_column_map_condition_value_counts(
@@ -173,7 +103,7 @@ def _pandas_column_map_condition_value_counts(
metric_value_kwargs: dict,
metrics: Dict[str, Any],
**kwargs,
):
) -> pd.Series[int]:
"""Returns respective value counts for distinct column values"""
(
boolean_mapped_unexpected_values,
@@ -238,7 +168,7 @@ def _sqlalchemy_column_map_condition_values(
metric_value_kwargs: dict,
metrics: Dict[str, Tuple],
**kwargs,
):
) -> list[dict]:
"""
Particularly for the purpose of finding unexpected values, returns all the metric values which do not meet an
expected Expectation condition for ColumnMapExpectation Expectations.
@@ -284,7 +214,10 @@ def _sqlalchemy_column_map_condition_values(
)
query = query.limit(10000) # BigQuery upper bound on query parameters

return [val.unexpected_values for val in execution_engine.execute_query(query).fetchall()]
return [
val.unexpected_values
for val in execution_engine.execute_query(query).fetchmany(MAX_RESULT_RECORDS)
]


def _sqlalchemy_column_map_condition_value_counts(
@@ -294,7 +227,7 @@ def _sqlalchemy_column_map_condition_value_counts(
metric_value_kwargs: dict,
metrics: Dict[str, Any],
**kwargs,
):
) -> Union[Sequence[sa.Row[Any]], Any]:
"""
Returns value counts for all the metric values which do not meet an expected Expectation condition for instances
of ColumnMapExpectation.
@@ -335,7 +268,7 @@ def _spark_column_map_condition_values(
metric_value_kwargs: dict,
metrics: Dict[str, Any],
**kwargs,
):
) -> list[dict]:
"""Return values from the specified domain that match the map-style metric in the metrics dictionary.""" # noqa: E501
unexpected_condition, compute_domain_kwargs, accessor_domain_kwargs = metrics[
"unexpected_condition"
@@ -365,19 +298,15 @@ def _spark_column_map_condition_values(

result_format = metric_value_kwargs["result_format"]

# note that without an explicit column alias,
# spark will use only the final portion
# of a nested column as the column name
if result_format["result_format"] == "COMPLETE":
rows = filtered.select(
F.col(column_name).alias(column_name)
).collect() # note that without the explicit alias, spark will use only the final portion of a nested column as the column name # noqa: E501
query = filtered.select(F.col(column_name).alias(column_name)).limit(MAX_RESULT_RECORDS)
else:
rows = (
filtered.select(
F.col(column_name).alias(column_name)
) # note that without the explicit alias, spark will use only the final portion of a nested column as the column name # noqa: E501
.limit(result_format["partial_unexpected_count"])
.collect()
)
return [row[column_name] for row in rows]
limit = min(result_format["partial_unexpected_count"], MAX_RESULT_RECORDS)
query = filtered.select(F.col(column_name).alias(column_name)).limit(limit)
return [row[column_name] for row in query.collect()]


def _spark_column_map_condition_value_counts(
@@ -387,7 +316,7 @@ def _spark_column_map_condition_value_counts(
metric_value_kwargs: dict,
metrics: Dict[str, Any],
**kwargs,
):
) -> list[pyspark.Row]:
unexpected_condition, compute_domain_kwargs, accessor_domain_kwargs = metrics[
"unexpected_condition"
]
Expand Down
@@ -15,6 +15,7 @@
_is_sqlalchemy_metric_selectable,
)
from great_expectations.expectations.metrics.util import (
MAX_RESULT_RECORDS,
get_dbms_compatible_metric_domain_kwargs,
)
from great_expectations.util import (
@@ -40,7 +41,7 @@ def _pandas_column_pair_map_condition_values(
metric_value_kwargs: dict,
metrics: Dict[str, Any],
**kwargs,
):
) -> list[tuple[Any, Any]]:
"""Return values from the specified domain that match the map-style metric in the metrics dictionary.""" # noqa: E501
(
boolean_mapped_unexpected_values,
@@ -92,9 +93,10 @@ def _pandas_column_pair_map_condition_values(
)
]
if result_format["result_format"] == "COMPLETE":
return unexpected_list
return unexpected_list[:MAX_RESULT_RECORDS]

return unexpected_list[: result_format["partial_unexpected_count"]]
limit = min(result_format["partial_unexpected_count"], MAX_RESULT_RECORDS)
return unexpected_list[:limit]


def _pandas_column_pair_map_condition_filtered_row_count(
@@ -104,7 +106,7 @@ def _pandas_column_pair_map_condition_filtered_row_count(
metric_value_kwargs: dict,
metrics: Dict[str, Any],
**kwargs,
):
) -> int:
"""Return record counts from the specified domain that match the map-style metric in the metrics dictionary.""" # noqa: E501
_, compute_domain_kwargs, accessor_domain_kwargs = metrics["unexpected_condition"]

@@ -137,7 +139,7 @@ def _sqlalchemy_column_pair_map_condition_values(
metric_value_kwargs: dict,
metrics: Dict[str, Any],
**kwargs,
):
) -> list[tuple[Any, Any]]:
"""Return values from the specified domain that match the map-style metric in the metrics dictionary.""" # noqa: E501
(
boolean_mapped_unexpected_values,
@@ -172,11 +174,12 @@ def _sqlalchemy_column_pair_map_condition_values(

result_format = metric_value_kwargs["result_format"]
if result_format["result_format"] != "COMPLETE":
query = query.limit(result_format["partial_unexpected_count"])
limit = min(result_format["partial_unexpected_count"], MAX_RESULT_RECORDS)
query = query.limit(limit)

unexpected_list = [
(val.unexpected_values_A, val.unexpected_values_B)
for val in execution_engine.execute_query(query).fetchall()
for val in execution_engine.execute_query(query).fetchmany(MAX_RESULT_RECORDS)
]
return unexpected_list

@@ -188,7 +191,7 @@ def _sqlalchemy_column_pair_map_condition_filtered_row_count(
metric_value_kwargs: dict,
metrics: Dict[str, Any],
**kwargs,
):
) -> Any | None:
"""Return record counts from the specified domain that match the map-style metric in the metrics dictionary.""" # noqa: E501
_, compute_domain_kwargs, accessor_domain_kwargs = metrics["unexpected_condition"]

@@ -216,7 +219,7 @@ def _spark_column_pair_map_condition_values(
metric_value_kwargs: dict,
metrics: Dict[str, Any],
**kwargs,
):
) -> list[tuple[Any, Any]]:
"""Return values from the specified domain that match the map-style metric in the metrics dictionary.""" # noqa: E501
(
unexpected_condition,
@@ -249,25 +252,22 @@ def _spark_column_pair_map_condition_values(

result_format = metric_value_kwargs["result_format"]
if result_format["result_format"] == "COMPLETE":
rows = filtered.select(
query = filtered.select(
[
F.col(column_A_name).alias(column_A_name),
F.col(column_B_name).alias(column_B_name),
]
).collect()
).limit(MAX_RESULT_RECORDS)
else:
rows = (
filtered.select(
[
F.col(column_A_name).alias(column_A_name),
F.col(column_B_name).alias(column_B_name),
]
)
.limit(result_format["partial_unexpected_count"])
.collect()
)
limit = min(result_format["partial_unexpected_count"], MAX_RESULT_RECORDS)
query = filtered.select(
[
F.col(column_A_name).alias(column_A_name),
F.col(column_B_name).alias(column_B_name),
]
).limit(limit)

unexpected_list = [(row[column_A_name], row[column_B_name]) for row in rows]
unexpected_list = [(row[column_A_name], row[column_B_name]) for row in query.collect()]
return unexpected_list


@@ -278,7 +278,7 @@ def _spark_column_pair_map_condition_filtered_row_count(
metric_value_kwargs: dict,
metrics: Dict[str, Any],
**kwargs,
):
) -> int:
"""Return record counts from the specified domain that match the map-style metric in the metrics dictionary.""" # noqa: E501
_, compute_domain_kwargs, accessor_domain_kwargs = metrics["unexpected_condition"]

(Diffs for the remaining changed files are not shown.)

