Explicit data type conversion for univariate drift #404

Merged: 5 commits, Jul 5, 2024
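This PR adds a `treat_as_numerical` argument next to the existing `treat_as_categorical` one, so callers can override the dtype-based feature type inference in both directions, and it splits the Jensen-Shannon and Hellinger methods into explicit continuous/categorical classes instead of relying on their internal cardinality heuristic (more than 50 unique values, or a unique ratio above 0.1, previously meant "continuous"). A minimal usage sketch; the dataset and column names below are hypothetical, and the constructor arguments are assumed to mirror the signature shown in this diff:

```python
import pandas as pd
import nannyml as nml

# Hypothetical reference/analysis frames; column names are illustrative only.
reference_df = pd.DataFrame({
    'tenure': [1, 2, 3, 4, 5] * 200,            # low-cardinality ints
    'plan': ['basic', 'premium'] * 500,         # strings -> categorical anyway
    'region_code': [10, 20, 30, 40, 50] * 200,  # ints that actually encode categories
})
analysis_df = reference_df.sample(frac=1.0, random_state=0)

calc = nml.UnivariateDriftCalculator(
    column_names=['tenure', 'plan', 'region_code'],
    treat_as_numerical='tenure',          # force continuous handling
    treat_as_categorical='region_code',   # force categorical handling
    continuous_methods=['jensen_shannon'],
    categorical_methods=['jensen_shannon'],
)
calc.fit(reference_df)
results = calc.calculate(analysis_df)
print(results.filter(period='analysis').to_df().head())
```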
6 changes: 3 additions & 3 deletions nannyml/base.py
@@ -520,10 +520,10 @@ def _estimate(self, data: pd.DataFrame, *args, **kwargs) -> Result:
raise NotImplementedError(f"'{self.__class__.__name__}' must implement the '_calculate' method")


def _split_features_by_type(data: pd.DataFrame, feature_column_names: List[str]) -> Tuple[List[str], List[str]]:
continuous_column_names = [col for col in feature_column_names if _column_is_continuous(data[col])]
def _split_features_by_type(data: pd.DataFrame, feature_column_names: Iterable[str]) -> Tuple[List[str], List[str]]:
continuous_column_names = [col for col in sorted(feature_column_names) if _column_is_continuous(data[col])]

categorical_column_names = [col for col in feature_column_names if _column_is_categorical(data[col])]
categorical_column_names = [col for col in sorted(feature_column_names) if _column_is_categorical(data[col])]

return continuous_column_names, categorical_column_names

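The only functional change in `base.py` is that `_split_features_by_type` now accepts any `Iterable[str]` and sorts it before splitting, so the output ordering stays deterministic when the calculator hands it a `set` (as the new `_split_continuous_and_categorical` below does). A small illustration against this private helper, with made-up column names and assuming plain pandas dtype inference:

```python
import pandas as pd
from nannyml.base import _split_features_by_type  # private helper, shown only for illustration

df = pd.DataFrame({
    'age': [25, 31, 47],                       # int64   -> continuous
    'country': ['BE', 'NL', 'DE'],             # object  -> categorical
    'income': [30_000.0, 42_500.0, 51_200.0],  # float64 -> continuous
})

# A set has no stable iteration order; the sorted() call makes the result
# reproducible across runs and Python hash seeds.
continuous, categorical = _split_features_by_type(df, {'age', 'country', 'income'})
assert continuous == ['age', 'income']
assert categorical == ['country']
```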
56 changes: 41 additions & 15 deletions nannyml/drift/univariate/calculator.py
@@ -31,7 +31,7 @@

import warnings
from logging import Logger
from typing import Any, Dict, List, Optional, Union
from typing import Any, Dict, List, Optional, Tuple, Union

import numpy as np
import pandas as pd
@@ -61,6 +61,7 @@ class UnivariateDriftCalculator(AbstractCalculator):
def __init__(
self,
column_names: Union[str, List[str]],
treat_as_numerical: Optional[Union[str, List[str]]] = None,
treat_as_categorical: Optional[Union[str, List[str]]] = None,
timestamp_column_name: Optional[str] = None,
categorical_methods: Optional[Union[str, List[str]]] = None,
@@ -79,6 +80,8 @@ def __init__(
column_names: Union[str, List[str]]
A string or list containing the names of features in the provided data set.
A drift score will be calculated for each entry in this list.
treat_as_numerical: Union[str, List[str]]
A single column name or list of column names to be treated as numerical by the calculator.
treat_as_categorical: Union[str, List[str]]
A single column name or list of column names to be treated as categorical by the calculator.
timestamp_column_name: str
@@ -204,6 +207,12 @@ def __init__(
column_names = [column_names]
self.column_names = column_names

if not treat_as_numerical:
treat_as_numerical = []
if isinstance(treat_as_numerical, str):
treat_as_numerical = [treat_as_numerical]
self.treat_as_numerical = treat_as_numerical

if not treat_as_categorical:
treat_as_categorical = []
if isinstance(treat_as_categorical, str):
@@ -255,22 +264,10 @@ def _fit(self, reference_data: pd.DataFrame, *args, **kwargs) -> UnivariateDrift

_list_missing(self.column_names, reference_data)

self.continuous_column_names, self.categorical_column_names = _split_features_by_type(
reference_data, self.column_names
self.continuous_column_names, self.categorical_column_names = self._split_continuous_and_categorical(
reference_data
)

for column_name in self.treat_as_categorical:
if column_name not in self.column_names:
self._logger.info(
f"ignoring 'treat_as_categorical' value '{column_name}' because it was not in "
f"listed column names"
)
break
if column_name in self.continuous_column_names:
self.continuous_column_names.remove(column_name)
if column_name not in self.categorical_column_names:
self.categorical_column_names.append(column_name)

timestamps = reference_data[self.timestamp_column_name] if self.timestamp_column_name else None
for column_name in self.continuous_column_names:
methods = []
@@ -399,6 +396,35 @@ def _calculate(self, data: pd.DataFrame, *args, **kwargs) -> Result:

return self.result

def _split_continuous_and_categorical(self, data: pd.DataFrame) -> Tuple[List[str], List[str]]:
"""Splits the features in the data set into continuous and categorical features."""
treat_as_numerical_set, treat_as_categorical_set = set(self.treat_as_numerical), set(self.treat_as_categorical)
column_names_set = set(self.column_names)

invalid_continuous_column_names = treat_as_numerical_set - column_names_set
treat_as_numerical_set = treat_as_numerical_set - invalid_continuous_column_names
if invalid_continuous_column_names:
self._logger.info(
f"ignoring 'treat_as_numerical' values {list(invalid_continuous_column_names)} because "
f"they were not in listed column names"
)

invalid_categorical_column_names = treat_as_categorical_set - column_names_set
treat_as_categorical_set = treat_as_categorical_set - invalid_categorical_column_names
if invalid_categorical_column_names:
self._logger.info(
f"ignoring 'treat_as_categorical' values {list(invalid_categorical_column_names)} because "
f"they were not in listed column names"
)

unspecified_columns = column_names_set - treat_as_numerical_set - treat_as_categorical_set
continuous_column_names, categorical_column_names = _split_features_by_type(data, unspecified_columns)

continuous_column_names = continuous_column_names + list(treat_as_numerical_set)
categorical_column_names = categorical_column_names + list(treat_as_categorical_set)

return continuous_column_names, categorical_column_names


def _calculate_for_column(
data: pd.DataFrame, column_name: str, method: Method, logger: Optional[Logger] = None
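The set-based `_split_continuous_and_categorical` also replaces the old validation loop, which used `break` instead of `continue` and therefore stopped checking overrides after the first unknown name. With the new logic, unknown names in either `treat_as_numerical` or `treat_as_categorical` are logged and dropped, explicit overrides win, and only the remaining columns go through dtype inference. A rough sketch of the expected behaviour, with hypothetical column names:

```python
import pandas as pd
import nannyml as nml

reference_df = pd.DataFrame({
    'tenure': [1, 2, 3, 4, 5] * 200,
    'plan': ['basic', 'premium'] * 500,
    'region_code': [10, 20, 30, 40, 50] * 200,
})

calc = nml.UnivariateDriftCalculator(
    column_names=['tenure', 'plan', 'region_code'],
    # 'typo_column' is not in column_names, so it is logged at INFO level and ignored;
    # 'region_code' is numeric but is forced into the categorical branch.
    treat_as_categorical=['region_code', 'typo_column'],
)
calc.fit(reference_df)

assert 'region_code' in calc.categorical_column_names
assert 'region_code' not in calc.continuous_column_names
```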
195 changes: 128 additions & 67 deletions nannyml/drift/univariate/methods.py
@@ -29,7 +29,7 @@
from scipy.stats import chi2_contingency, ks_2samp, wasserstein_distance

from nannyml._typing import Self
from nannyml.base import _column_is_categorical, _remove_nans
from nannyml.base import _remove_nans
from nannyml.chunk import Chunker
from nannyml.exceptions import InvalidArgumentsException, NotFittedException
from nannyml.thresholds import Threshold, calculate_threshold_values
@@ -247,8 +247,7 @@


@MethodFactory.register(key='jensen_shannon', feature_type=FeatureType.CONTINUOUS)
@MethodFactory.register(key='jensen_shannon', feature_type=FeatureType.CATEGORICAL)
class JensenShannonDistance(Method):
class ContinuousJensenShannonDistance(Method):
"""Calculates Jensen-Shannon distance.

By default an alert will be raised if `distance > 0.1`.
@@ -272,34 +271,17 @@
lower_threshold_limit : float, default=0
An optional lower threshold for the performance metric.
"""
self._treat_as_type: str
self._bins: np.ndarray
self._reference_proba_in_bins: np.ndarray

def _fit(self, reference_data: pd.Series, timestamps: Optional[pd.Series] = None):
reference_data = _remove_nans(reference_data)
if _column_is_categorical(reference_data):
treat_as_type = 'cat'
else:
n_unique_values = len(np.unique(reference_data))
len_reference = len(reference_data)
if n_unique_values > 50 or n_unique_values / len_reference > 0.1:
treat_as_type = 'cont'
else:
treat_as_type = 'cat'
len_reference = len(reference_data)

if treat_as_type == 'cont':
bins = np.histogram_bin_edges(reference_data, bins='doane')
reference_proba_in_bins = np.histogram(reference_data, bins=bins)[0] / len_reference
self._bins = bins
self._reference_proba_in_bins = reference_proba_in_bins
else:
reference_unique, reference_counts = np.unique(reference_data, return_counts=True)
reference_proba_per_unique = reference_counts / len(reference_data)
self._bins = reference_unique
self._reference_proba_in_bins = reference_proba_per_unique

self._treat_as_type = treat_as_type
bins = np.histogram_bin_edges(reference_data, bins='doane')
reference_proba_in_bins = np.histogram(reference_data, bins=bins)[0] / len_reference
self._bins = bins
self._reference_proba_in_bins = reference_proba_in_bins

return self

@@ -308,15 +290,9 @@
data = _remove_nans(data)
if data.empty:
return np.nan
if self._treat_as_type == 'cont':
len_data = len(data)
data_proba_in_bins = np.histogram(data, bins=self._bins)[0] / len_data

else:
data_unique, data_counts = np.unique(data, return_counts=True)
data_counts_dic = dict(zip(data_unique, data_counts))
data_count_on_ref_bins = [data_counts_dic[key] if key in data_counts_dic else 0 for key in self._bins]
data_proba_in_bins = np.array(data_count_on_ref_bins) / len(data)
len_data = len(data)
data_proba_in_bins = np.histogram(data, bins=self._bins)[0] / len_data

leftover = 1 - np.sum(data_proba_in_bins)
if leftover > 0:
@@ -325,7 +301,63 @@

distance = jensenshannon(reference_proba_in_bins, data_proba_in_bins, base=2)

del reference_proba_in_bins
return distance


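For reference, here is a standalone sketch of the binned computation the continuous class performs, on synthetic data (the data and the printed value are illustrative only):

```python
import numpy as np
from scipy.spatial.distance import jensenshannon

rng = np.random.default_rng(42)
reference = rng.normal(loc=0.0, scale=1.0, size=5_000)
analysis = rng.normal(loc=0.5, scale=1.0, size=5_000)  # shifted mean -> drift

# Fit: bin edges come from the reference sample only (Doane's rule).
bins = np.histogram_bin_edges(reference, bins='doane')
reference_proba = np.histogram(reference, bins=bins)[0] / len(reference)

# Calculate: the analysis chunk is binned with the *reference* edges; any mass
# falling outside those edges is collected in an extra "leftover" bin.
analysis_proba = np.histogram(analysis, bins=bins)[0] / len(analysis)
leftover = 1 - analysis_proba.sum()
if leftover > 0:
    analysis_proba = np.append(analysis_proba, leftover)
    reference_proba = np.append(reference_proba, 0)

print(jensenshannon(reference_proba, analysis_proba, base=2))  # clearly above 0, flagging the shift
```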
@MethodFactory.register(key='jensen_shannon', feature_type=FeatureType.CATEGORICAL)
class CategoricalJensenShannonDistance(Method):
"""Calculates Jensen-Shannon distance.

By default an alert will be raised if `distance > 0.1`.
"""

def __init__(self, **kwargs) -> None:
"""Initialize Jensen-Shannon method."""
super().__init__(
display_name='Jensen-Shannon distance',
column_name='jensen_shannon',
lower_threshold_limit=0,
**kwargs,
)
"""
Parameters
----------
display_name : str, default='Jensen-Shannon distance'
The name of the metric. Used to display in plots.
column_name: str, default='jensen-shannon'
The name used to indicate the metric in columns of a DataFrame.
lower_threshold_limit : float, default=0
An optional lower threshold for the performance metric.
"""
self._bins: np.ndarray
self._reference_proba_in_bins: np.ndarray

def _fit(self, reference_data: pd.Series, timestamps: Optional[pd.Series] = None):
reference_data = _remove_nans(reference_data)
reference_unique, reference_counts = np.unique(reference_data, return_counts=True)
reference_proba_per_unique = reference_counts / len(reference_data)
self._bins = reference_unique
self._reference_proba_in_bins = reference_proba_per_unique

return self

def _calculate(self, data: pd.Series):
reference_proba_in_bins = copy(self._reference_proba_in_bins)
data = _remove_nans(data)
if data.empty:
return np.nan

(Codecov: added line nannyml/drift/univariate/methods.py#L348 was not covered by tests)

data_unique, data_counts = np.unique(data, return_counts=True)
data_counts_dic = dict(zip(data_unique, data_counts))
data_count_on_ref_bins = [data_counts_dic[key] if key in data_counts_dic else 0 for key in self._bins]
data_proba_in_bins = np.array(data_count_on_ref_bins) / len(data)

leftover = 1 - np.sum(data_proba_in_bins)
if leftover > 0:
data_proba_in_bins = np.append(data_proba_in_bins, leftover)
reference_proba_in_bins = np.append(reference_proba_in_bins, 0)

distance = jensenshannon(reference_proba_in_bins, data_proba_in_bins, base=2)

return distance

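The categorical variant keeps the same leftover-bin trick but aligns the analysis frequencies to the categories observed in reference, so categories that only appear in the analysis chunk contribute all of their mass to the leftover bin. A sketch with made-up category labels:

```python
import numpy as np
from scipy.spatial.distance import jensenshannon

reference = np.array(['basic'] * 700 + ['premium'] * 300)
analysis = np.array(['basic'] * 500 + ['premium'] * 300 + ['trial'] * 200)  # new category

ref_unique, ref_counts = np.unique(reference, return_counts=True)
reference_proba = ref_counts / len(reference)

# Frequencies are looked up against the reference categories; 'trial' is not
# among them, so its 20% mass becomes the leftover bin (with 0 reference mass).
counts = dict(zip(*np.unique(analysis, return_counts=True)))
analysis_proba = np.array([counts.get(cat, 0) for cat in ref_unique]) / len(analysis)

leftover = 1 - analysis_proba.sum()
if leftover > 0:
    analysis_proba = np.append(analysis_proba, leftover)
    reference_proba = np.append(reference_proba, 0)

print(jensenshannon(reference_proba, analysis_proba, base=2))
```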
@@ -670,8 +702,7 @@


@MethodFactory.register(key='hellinger', feature_type=FeatureType.CONTINUOUS)
@MethodFactory.register(key='hellinger', feature_type=FeatureType.CATEGORICAL)
class HellingerDistance(Method):
class ContinuousHellingerDistance(Method):
"""Calculates the Hellinger Distance between two distributions."""

def __init__(self, **kwargs) -> None:
@@ -693,34 +724,70 @@
An optional lower threshold for the performance metric.
"""

self._treat_as_type: str
self._bins: np.ndarray
self._reference_proba_in_bins: np.ndarray

def _fit(self, reference_data: pd.Series, timestamps: Optional[pd.Series] = None) -> Self:
reference_data = _remove_nans(reference_data)
if _column_is_categorical(reference_data):
treat_as_type = 'cat'
else:
n_unique_values = len(np.unique(reference_data))
len_reference = len(reference_data)
if n_unique_values > 50 or n_unique_values / len_reference > 0.1:
treat_as_type = 'cont'
else:
treat_as_type = 'cat'
len_reference = len(reference_data)

if treat_as_type == 'cont':
bins = np.histogram_bin_edges(reference_data, bins='doane')
reference_proba_in_bins = np.histogram(reference_data, bins=bins)[0] / len_reference
self._bins = bins
self._reference_proba_in_bins = reference_proba_in_bins
else:
reference_unique, reference_counts = np.unique(reference_data, return_counts=True)
reference_proba_per_unique = reference_counts / len(reference_data)
self._bins = reference_unique
self._reference_proba_in_bins = reference_proba_per_unique
bins = np.histogram_bin_edges(reference_data, bins='doane')
reference_proba_in_bins = np.histogram(reference_data, bins=bins)[0] / len_reference
self._bins = bins
self._reference_proba_in_bins = reference_proba_in_bins

return self

self._treat_as_type = treat_as_type
def _calculate(self, data: pd.Series):
data = _remove_nans(data)
if data.empty:
return np.nan

(Codecov: added line nannyml/drift/univariate/methods.py#L744 was not covered by tests)
reference_proba_in_bins = copy(self._reference_proba_in_bins)
data_proba_in_bins = np.histogram(data, bins=self._bins)[0] / len(data)

leftover = 1 - np.sum(data_proba_in_bins)
if leftover > 0:
data_proba_in_bins = np.append(data_proba_in_bins, leftover)
reference_proba_in_bins = np.append(reference_proba_in_bins, 0)

distance = np.sqrt(np.sum((np.sqrt(reference_proba_in_bins) - np.sqrt(data_proba_in_bins)) ** 2)) / np.sqrt(2)

return distance

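The Hellinger classes share the binning and leftover handling with the Jensen-Shannon ones; only the final formula differs. As a quick reference, a minimal helper equivalent to the expression used in both `_calculate` methods:

```python
import numpy as np

def hellinger(p: np.ndarray, q: np.ndarray) -> float:
    """Hellinger distance between two discrete distributions defined on the same bins."""
    return float(np.sqrt(np.sum((np.sqrt(p) - np.sqrt(q)) ** 2)) / np.sqrt(2))

# Identical distributions give 0, fully disjoint ones give 1.
assert hellinger(np.array([0.5, 0.5]), np.array([0.5, 0.5])) == 0.0
assert np.isclose(hellinger(np.array([1.0, 0.0]), np.array([0.0, 1.0])), 1.0)
```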

@MethodFactory.register(key='hellinger', feature_type=FeatureType.CATEGORICAL)
class CategoricalHellingerDistance(Method):
"""Calculates the Hellinger Distance between two distributions."""

def __init__(self, **kwargs) -> None:
"""Initialize Hellinger Distance method."""
super().__init__(
display_name='Hellinger distance',
column_name='hellinger',
lower_threshold_limit=0,
**kwargs,
)
"""
Parameters
----------
display_name : str, default='Hellinger distance'
The name of the metric. Used to display in plots.
column_name: str, default='hellinger'
The name used to indicate the metric in columns of a DataFrame.
lower_threshold_limit : float, default=0
An optional lower threshold for the performance metric.
"""

self._bins: np.ndarray
self._reference_proba_in_bins: np.ndarray

def _fit(self, reference_data: pd.Series, timestamps: Optional[pd.Series] = None) -> Self:
reference_data = _remove_nans(reference_data)

reference_unique, reference_counts = np.unique(reference_data, return_counts=True)
reference_proba_per_unique = reference_counts / len(reference_data)
self._bins = reference_unique
self._reference_proba_in_bins = reference_proba_per_unique

return self

@@ -729,15 +796,11 @@
if data.empty:
return np.nan
reference_proba_in_bins = copy(self._reference_proba_in_bins)
if self._treat_as_type == 'cont':
len_data = len(data)
data_proba_in_bins = np.histogram(data, bins=self._bins)[0] / len_data

else:
data_unique, data_counts = np.unique(data, return_counts=True)
data_counts_dic = dict(zip(data_unique, data_counts))
data_count_on_ref_bins = [data_counts_dic[key] if key in data_counts_dic else 0 for key in self._bins]
data_proba_in_bins = np.array(data_count_on_ref_bins) / len(data)
data_unique, data_counts = np.unique(data, return_counts=True)
data_counts_dic = dict(zip(data_unique, data_counts))
data_count_on_ref_bins = [data_counts_dic[key] if key in data_counts_dic else 0 for key in self._bins]
data_proba_in_bins = np.array(data_count_on_ref_bins) / len(data)

leftover = 1 - np.sum(data_proba_in_bins)
if leftover > 0:
@@ -746,6 +809,4 @@

distance = np.sqrt(np.sum((np.sqrt(reference_proba_in_bins) - np.sqrt(data_proba_in_bins)) ** 2)) / np.sqrt(2)

del reference_proba_in_bins

return distance
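As a side note, not part of this PR: because both probability vectors still sum to one after the leftover bin is appended, the expression above is algebraically identical to sqrt(1 - BC), where BC = sum_i sqrt(p_i * q_i) is the Bhattacharyya coefficient, which can be handy when cross-checking against other references:

```python
import numpy as np

p = np.array([0.6, 0.3, 0.1])
q = np.array([0.4, 0.4, 0.2])

lhs = np.sqrt(np.sum((np.sqrt(p) - np.sqrt(q)) ** 2)) / np.sqrt(2)
bc = np.sum(np.sqrt(p * q))  # Bhattacharyya coefficient
assert np.isclose(lhs, np.sqrt(1 - bc))
```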