Skip to content

Commit

Permalink
[MAINTENANCE] Refactor ColumnDescriptiveMetricsMetricRetriever to parent class (0.18.x) (#9612)
Browse files Browse the repository at this point in the history
  • Loading branch information
Shinnnyshinshin authored Mar 12, 2024
1 parent 07d2328 commit aca1e7c
Show file tree
Hide file tree
Showing 3 changed files with 214 additions and 195 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,7 @@
"File: great_expectations/expectations/regex_based_column_map_expectation.py Name: register_metric",
"File: great_expectations/expectations/set_based_column_map_expectation.py Name: register_metric",
"File: great_expectations/expectations/set_based_column_map_expectation.py Name: validate_configuration",
"File: great_expectations/experimental/metric_repository/metric_retriever.py Name: get_validator",
"File: great_expectations/experimental/datasource/fabric.py Name: build_batch_request",
"File: great_expectations/experimental/datasource/fabric.py Name: get_batch_list_from_batch_request",
"File: great_expectations/profile/base.py Name: validate",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,20 +4,14 @@
from typing import TYPE_CHECKING, Any, List, Sequence

from great_expectations.compatibility.typing_extensions import override
from great_expectations.core.domain import SemanticDomainTypes
from great_expectations.datasource.fluent.interfaces import Batch
from great_expectations.experimental.metric_repository.metric_retriever import (
MetricRetriever,
)
from great_expectations.experimental.metric_repository.metrics import (
ColumnMetric,
Metric,
MetricException,
TableMetric,
)
from great_expectations.rule_based_profiler.domain_builder import ColumnDomainBuilder
from great_expectations.validator.exception_info import ExceptionInfo
from great_expectations.validator.metric_configuration import MetricConfiguration

if TYPE_CHECKING:
from great_expectations.data_context import AbstractDataContext
Expand All @@ -27,20 +21,13 @@
_MetricKey,
_MetricsDict,
)
from great_expectations.validator.validator import Validator


class ColumnDescriptiveMetricsMetricRetriever(MetricRetriever):
"""Compute and retrieve Column Descriptive Metrics for a batch of data."""

def __init__(self, context: AbstractDataContext):
    """Set up the retriever with no validator cached yet.

    Args:
        context: Data context, forwarded unchanged to the parent initializer.
    """
    super().__init__(context=context)
    # Starts empty; get_validator() fills it on first use and reuses it afterwards.
    self._validator: Validator | None = None

def get_validator(self, batch_request: BatchRequest) -> Validator:
    """Return the validator for this retriever, creating it on first use.

    The first call asks the data context for a validator built from
    ``batch_request`` and stores it on the instance. Every later call returns
    that stored validator, whatever ``batch_request`` is passed.

    Args:
        batch_request: Batch request handed to the context when no validator
            has been stored yet.

    Returns:
        The stored validator.
    """
    if self._validator is not None:
        return self._validator
    self._validator = self._context.get_validator(batch_request=batch_request)
    return self._validator

@override
def get_metrics(self, batch_request: BatchRequest) -> Sequence[Metric]:
Expand Down Expand Up @@ -171,52 +158,6 @@ def _get_table_column_types(
exception=exception,
)

def _get_columns_to_exclude(self, table_column_types: Metric) -> List[str]:
columns_to_skip: List[str] = []
for column_type in table_column_types.value:
if not column_type.get("type"):
columns_to_skip.append(column_type["name"])
return columns_to_skip

def _get_column_metrics(
self,
batch_request: BatchRequest,
column_list: List[str],
column_metric_names: List[str],
column_metric_type: type[ColumnMetric[Any]],
) -> Sequence[Metric]:
column_metric_configs = self._generate_column_metric_configurations(
column_list, column_metric_names
)
batch_id, computed_metrics, aborted_metrics = self._compute_metrics(
batch_request, column_metric_configs
)

# Convert computed_metrics
metrics: list[Metric] = []
metric_lookup_key: _MetricKey

for metric_name in column_metric_names:
for column in column_list:
metric_lookup_key = (metric_name, f"column={column}", tuple())
value, exception = self._get_metric_from_computed_metrics(
metric_name=metric_name,
metric_lookup_key=metric_lookup_key,
computed_metrics=computed_metrics,
aborted_metrics=aborted_metrics,
)
metrics.append(
column_metric_type(
batch_id=batch_id,
metric_name=metric_name,
column=column,
value=value,
exception=exception,
)
)

return metrics

def _get_numeric_column_metrics(
self, batch_request: BatchRequest, column_list: List[str]
) -> Sequence[Metric]:
Expand Down Expand Up @@ -290,137 +231,3 @@ def _get_non_numeric_column_metrics(
)

return metrics

def _get_all_column_names(self, metrics: Sequence[Metric]) -> List[str]:
column_list: List[str] = []
for metric in metrics:
if metric.metric_name == "table.columns":
column_list = metric.value
return column_list

def _get_numeric_column_names(
    self,
    batch_request: BatchRequest,
    exclude_column_names: List[str],
) -> list[str]:
    """Get the names of all numeric columns in the batch.

    Args:
        batch_request: Batch request identifying the batch to inspect.
        exclude_column_names: Column names passed on as exclusions.

    Returns:
        Whatever ``_get_column_names_for_semantic_types`` returns for the
        NUMERIC semantic type.
    """
    numeric_only = [SemanticDomainTypes.NUMERIC]
    return self._get_column_names_for_semantic_types(
        batch_request=batch_request,
        include_semantic_types=numeric_only,
        exclude_column_names=exclude_column_names,
    )

def _get_timestamp_column_names(
    self,
    batch_request: BatchRequest,
    exclude_column_names: List[str],
) -> list[str]:
    """Get the names of all timestamp columns in the batch.

    Args:
        batch_request: Batch request identifying the batch to inspect.
        exclude_column_names: Column names passed on as exclusions.

    Returns:
        Whatever ``_get_column_names_for_semantic_types`` returns for the
        DATETIME semantic type.
    """
    datetime_only = [SemanticDomainTypes.DATETIME]
    return self._get_column_names_for_semantic_types(
        batch_request=batch_request,
        include_semantic_types=datetime_only,
        exclude_column_names=exclude_column_names,
    )

def _get_column_names_for_semantic_types(
    self,
    batch_request: BatchRequest,
    include_semantic_types: List[SemanticDomainTypes],
    exclude_column_names: List[str],
) -> list[str]:
    """Get the names of all columns matching semantic types in the batch.

    Args:
        batch_request: Batch request used to obtain the validator.
        include_semantic_types: Semantic types given to the domain builder.
        exclude_column_names: Column names given to the domain builder as
            exclusions.

    Returns:
        The effective column names reported by a ``ColumnDomainBuilder`` for
        the validator's active batch.

    Raises:
        AssertionError: If the validator's active batch is not a ``Batch``.
    """
    validator = self.get_validator(batch_request=batch_request)
    builder = ColumnDomainBuilder(
        include_semantic_types=include_semantic_types,  # type: ignore[arg-type] # ColumnDomainBuilder supports other ways of specifying semantic types
        exclude_column_names=exclude_column_names,
    )
    assert isinstance(
        validator.active_batch, Batch
    ), f"validator.active_batch is type {type(validator.active_batch).__name__} instead of type {Batch.__name__}"
    # The lookup is restricted to the validator's active batch.
    return builder.get_effective_column_names(
        validator=validator,
        batch_ids=[validator.active_batch.id],
    )

def _generate_table_metric_configurations(
    self, table_metric_names: list[str]
) -> list[MetricConfiguration]:
    """Build one ``MetricConfiguration`` per table-level metric name.

    Args:
        table_metric_names: Names of the metrics to configure.

    Returns:
        Configurations in the same order as the names, each with empty domain
        kwargs and empty value kwargs.
    """
    return [
        MetricConfiguration(
            metric_name=name, metric_domain_kwargs={}, metric_value_kwargs={}
        )
        for name in table_metric_names
    ]

def _generate_column_metric_configurations(
    self, column_list: list[str], column_metric_names: list[str]
) -> list[MetricConfiguration]:
    """Build a ``MetricConfiguration`` for every (metric name, column) pair.

    Args:
        column_list: Columns to configure metrics for.
        column_metric_names: Metric names to configure for each column.

    Returns:
        Configurations ordered by metric name first and column second, each
        with ``{"column": <column>}`` as domain kwargs and empty value kwargs.
    """
    return [
        MetricConfiguration(
            metric_name=metric_name,
            metric_domain_kwargs={"column": column},
            metric_value_kwargs={},
        )
        for metric_name in column_metric_names
        for column in column_list
    ]

def _compute_metrics(
    self, batch_request: BatchRequest, metric_configs: list[MetricConfiguration]
) -> tuple[str, _MetricsDict, _AbortedMetricsInfoDict]:
    """Compute the given metric configurations against the validator's batch.

    Args:
        batch_request: Batch request used to obtain the validator.
        metric_configs: Metric configurations to compute.

    Returns:
        A ``(batch_id, computed_metrics, aborted_metrics)`` tuple, where
        ``batch_id`` is the id of the validator's active batch and the other
        two are what ``validator.compute_metrics`` returned.

    Raises:
        AssertionError: If the validator's active batch is not a ``Batch``.
    """
    validator = self.get_validator(batch_request=batch_request)
    # catch_exceptions is explicitly True so that a failure while computing one
    # metric is captured for later surfacing instead of failing the whole run;
    # the remaining metrics are still computed.
    computed_metrics, aborted_metrics = validator.compute_metrics(
        metric_configurations=metric_configs,
        runtime_configuration={"catch_exceptions": True},
    )
    assert isinstance(
        validator.active_batch, Batch
    ), f"validator.active_batch is type {type(validator.active_batch).__name__} instead of type {Batch.__name__}"
    return validator.active_batch.id, computed_metrics, aborted_metrics

def _get_metric_from_computed_metrics(
self,
metric_name: str,
computed_metrics: _MetricsDict,
aborted_metrics: _AbortedMetricsInfoDict,
metric_lookup_key: _MetricKey | None = None,
) -> tuple[Any, MetricException | None]:
"""Resolve one metric into a ``(value, metric_exception)`` pair.

Args:
metric_name: Metric name; used to build the default lookup key.
computed_metrics: Mapping checked first for ``metric_lookup_key``.
aborted_metrics: Mapping checked second; a matching entry is indexed
with ``["exception_info"]``.
metric_lookup_key: Key to look up. When None,
``(metric_name, tuple(), tuple())`` is used instead.

Returns:
``(value, metric_exception)``. Both start as None and at most one of them
is set by the lookups below.
"""
# No explicit key: fall back to the metric name with empty tuples in the
# other two positions.
if metric_lookup_key is None:
metric_lookup_key = (
metric_name,
tuple(),
tuple(),
)
value = None
metric_exception = None
if metric_lookup_key in computed_metrics:
value = computed_metrics[metric_lookup_key]
# Not among the computed metrics: see whether it was recorded as aborted.
elif metric_lookup_key in aborted_metrics:
exception = aborted_metrics[metric_lookup_key]
exception_info = exception["exception_info"]
exception_type = "Unknown" # Note: we currently only capture the message and traceback, not the type
# Only an ExceptionInfo provides the exception_message read here.
if isinstance(exception_info, ExceptionInfo):
exception_message = exception_info.exception_message
metric_exception = MetricException(
type=exception_type, message=exception_message
)
# NOTE(review): indentation is lost in this view, so it is unclear whether this
# else pairs with the if/elif chain above or with the isinstance check — confirm
# against the original file. Whichever case it does not cover returns (None, None).
else:
metric_exception = MetricException(
type="Not found",
message="Metric was not successfully computed but exception was not found.",
)

return value, metric_exception
Loading

0 comments on commit aca1e7c

Please sign in to comment.