Skip to content

Commit

Permalink
fix telemetry
Browse files Browse the repository at this point in the history
  • Loading branch information
sfc-gh-joshi committed Aug 29, 2024
1 parent 36f06f6 commit 74594d9
Show file tree
Hide file tree
Showing 6 changed files with 165 additions and 166 deletions.
72 changes: 70 additions & 2 deletions src/snowflake/snowpark/modin/pandas/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -85,10 +85,16 @@
timedelta_range,
)

import modin.pandas

# TODO: SNOW-851745 make sure add all Snowpark pandas API general functions
from modin.pandas import plotting # type: ignore[import]

from snowflake.snowpark.modin.pandas.dataframe import DataFrame
from snowflake.snowpark.modin.pandas.api.extensions import (
register_dataframe_accessor,
register_series_accessor,
)
from snowflake.snowpark.modin.pandas.dataframe import _DATAFRAME_EXTENSIONS_, DataFrame
from snowflake.snowpark.modin.pandas.general import (
concat,
crosstab,
Expand Down Expand Up @@ -140,8 +146,11 @@
read_xml,
to_pickle,
)
from snowflake.snowpark.modin.pandas.series import Series
from snowflake.snowpark.modin.pandas.series import _SERIES_EXTENSIONS_, Series
from snowflake.snowpark.modin.plugin._internal.session import SnowpandasSessionHolder
from snowflake.snowpark.modin.plugin._internal.telemetry import (
try_add_telemetry_to_attribute,
)

# The extensions assigned to this module
_PD_EXTENSIONS_: dict = {}
Expand All @@ -154,12 +163,71 @@
DatetimeIndex,
TimedeltaIndex,
)

# this must occur before overrides are applied
_attrs_defined_on_modin_base = set(dir(modin.pandas.base.BasePandasDataset))
_attrs_defined_on_series = set(
dir(Series)
) # TODO: SNOW-1063347 revisit when series.py is removed
_attrs_defined_on_dataframe = set(
dir(DataFrame)
) # TODO: SNOW-1063346 revisit when dataframe.py is removed

# base overrides occur before subclass overrides in case subclasses override a base method
import snowflake.snowpark.modin.plugin.extensions.base_extensions # isort: skip # noqa: E402,F401
import snowflake.snowpark.modin.plugin.extensions.base_overrides # isort: skip # noqa: E402,F401
import snowflake.snowpark.modin.plugin.extensions.dataframe_extensions # isort: skip # noqa: E402,F401
import snowflake.snowpark.modin.plugin.extensions.dataframe_overrides # isort: skip # noqa: E402,F401
import snowflake.snowpark.modin.plugin.extensions.series_extensions # isort: skip # noqa: E402,F401
import snowflake.snowpark.modin.plugin.extensions.series_overrides # isort: skip # noqa: E402,F401

# For any method defined on Series/DF, add telemetry to it if it meets all of the following conditions:
# 1. The method was defined directly on upstream BasePandasDataset (_attrs_defined_on_modin_base)
# 2. The method is not overridden by a child class (this will change)
# 3. The method is not overridden by an extensions module
# 4. The method name does not start with an _
#
# TODO: SNOW-1063347
# Since we still use the vendored version of Series and the overrides for the top-level
# namespace haven't been performed yet, we need to set properties on the vendored version
_base_telemetry_added_attrs = set()

_series_ext = _SERIES_EXTENSIONS_.copy()
for attr_name in dir(Series):
if (
attr_name in _attrs_defined_on_modin_base
and attr_name in _attrs_defined_on_series
and attr_name not in _series_ext
and not attr_name.startswith("_")
):
register_series_accessor(attr_name)(
try_add_telemetry_to_attribute(attr_name, getattr(Series, attr_name))
)
_base_telemetry_added_attrs.add(attr_name)

# TODO: SNOW-1063346
# Since we still use the vendored version of DataFrame and the overrides for the top-level
# namespace haven't been performed yet, we need to set properties on the vendored version
_dataframe_ext = _DATAFRAME_EXTENSIONS_.copy()
for attr_name in dir(DataFrame):
if (
attr_name in _attrs_defined_on_modin_base
and attr_name in _attrs_defined_on_dataframe
and attr_name not in _dataframe_ext
and not attr_name.startswith("_")
):
# If telemetry was already added via Series, register the override but don't re-wrap
# the method in the telemetry annotation. If we don't do this check, we will end up
# double-reporting telemetry on some methods.
original_attr = getattr(DataFrame, attr_name)
new_attr = (
original_attr
if attr_name in _base_telemetry_added_attrs
else try_add_telemetry_to_attribute(attr_name, original_attr)
)
register_dataframe_accessor(attr_name)(new_attr)
_base_telemetry_added_attrs.add(attr_name)


def __getattr__(name: str) -> Any:
"""
Expand Down
103 changes: 46 additions & 57 deletions src/snowflake/snowpark/modin/plugin/_internal/telemetry.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@
from enum import Enum, unique
from typing import Any, Callable, Optional, TypeVar, Union, cast

import modin
from typing_extensions import ParamSpec

import snowflake.snowpark.session
Expand Down Expand Up @@ -496,6 +495,49 @@ def wrap(*args, **kwargs): # type: ignore
}


def try_add_telemetry_to_attribute(attr_name: str, attr_value: Any) -> Any:
"""
Attempts to add telemetry to an attribute.
If the attribute is callable with name in TELEMETRY_PRIVATE_METHODS, or is a callable that
starts with an underscore, the original attribute will be returned as-is. Otherwise, a version
of the method/property annotated with Snowpark pandas telemetry is returned.
"""
if callable(attr_value) and (
not attr_name.startswith("_") or (attr_name in TELEMETRY_PRIVATE_METHODS)
):
return snowpark_pandas_telemetry_method_decorator(attr_value)
elif isinstance(attr_value, property):
# wrap on getter and setter
return property(
snowpark_pandas_telemetry_method_decorator(
cast(
# add a cast because mypy doesn't recognize that
# non-None fget and __get__ are both callable
# arguments to snowpark_pandas_telemetry_method_decorator.
Callable,
attr_value.__get__ # pragma: no cover: we don't encounter this case in pandas or modin because every property has an fget method.
if attr_value.fget is None
else attr_value.fget,
),
property_name=attr_name,
property_method_type=PropertyMethodType.FGET,
),
snowpark_pandas_telemetry_method_decorator(
attr_value.__set__ if attr_value.fset is None else attr_value.fset,
property_name=attr_name,
property_method_type=PropertyMethodType.FSET,
),
snowpark_pandas_telemetry_method_decorator(
attr_value.__delete__ if attr_value.fdel is None else attr_value.fdel,
property_name=attr_name,
property_method_type=PropertyMethodType.FDEL,
),
doc=attr_value.__doc__,
)
return attr_value


class TelemetryMeta(type):
def __new__(
cls, name: str, bases: tuple, attrs: dict[str, Any]
Expand Down Expand Up @@ -536,59 +578,6 @@ def __new__(
snowflake.snowpark.modin.pandas.window.Rolling]:
The modified class with decorated methods.
"""
attr_dict = dict(attrs.items())
# If BasePandasDataset, defined exclusively by upstream modin, is a parent of this class,
# then apply the telemetry decorator to it.
# https://stackoverflow.com/a/71105206
# TODO figure out solution for dataframe/series when those directly use modin frontend
for base in bases:
if base is modin.pandas.base.BasePandasDataset:
# Newly defined attrs should take precedence over those defined in base,
# so the keys in attr_dict should overwrite those in base_dict
base_dict = dict(vars(base).items())
base_dict.update(attr_dict)
attr_dict = base_dict
new_attrs = {}
for attr_name, attr_value in attr_dict.items():
if callable(attr_value) and (
not attr_name.startswith("_")
or (attr_name in TELEMETRY_PRIVATE_METHODS)
):
new_attrs[attr_name] = snowpark_pandas_telemetry_method_decorator(
attr_value
)
elif isinstance(attr_value, property):
# wrap on getter and setter
new_attrs[attr_name] = property(
snowpark_pandas_telemetry_method_decorator(
cast(
# add a cast because mypy doesn't recognize that
# non-None fget and __get__ are both callable
# arguments to snowpark_pandas_telemetry_method_decorator.
Callable,
attr_value.__get__ # pragma: no cover: we don't encounter this case in pandas or modin because every property has an fget method.
if attr_value.fget is None
else attr_value.fget,
),
property_name=attr_name,
property_method_type=PropertyMethodType.FGET,
),
snowpark_pandas_telemetry_method_decorator(
attr_value.__set__
if attr_value.fset is None
else attr_value.fset,
property_name=attr_name,
property_method_type=PropertyMethodType.FSET,
),
snowpark_pandas_telemetry_method_decorator(
attr_value.__delete__
if attr_value.fdel is None
else attr_value.fdel,
property_name=attr_name,
property_method_type=PropertyMethodType.FDEL,
),
doc=attr_value.__doc__,
)
else:
new_attrs[attr_name] = attr_value
return type.__new__(cls, name, bases, new_attrs)
for attr_name, attr_value in attrs.items():
attrs[attr_name] = try_add_telemetry_to_attribute(attr_name, attr_value)
return type.__new__(cls, name, bases, attrs)
46 changes: 46 additions & 0 deletions src/snowflake/snowpark/modin/plugin/extensions/base_extensions.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
#
# Copyright (c) 2012-2024 Snowflake Computing Inc. All rights reserved.
#

"""
File containing BasePandasDataset APIs defined in Snowpark pandas but not the Modin API layer.
"""

from snowflake.snowpark.modin.plugin._internal.telemetry import (
snowpark_pandas_telemetry_method_decorator,
)

from .base_overrides import register_base_override


@register_base_override("__array_function__")
@snowpark_pandas_telemetry_method_decorator
def __array_function__(self, func: callable, types: tuple, args: tuple, kwargs: dict):
"""
Apply the `func` to the `BasePandasDataset`.
Parameters
----------
func : np.func
The NumPy func to apply.
types : tuple
The types of the args.
args : tuple
The args to the func.
kwargs : dict
Additional keyword arguments.
Returns
-------
BasePandasDataset
The result of the ufunc applied to the `BasePandasDataset`.
"""
from snowflake.snowpark.modin.plugin.utils.numpy_to_pandas import (
numpy_to_pandas_func_map,
)

if func.__name__ in numpy_to_pandas_func_map:
return numpy_to_pandas_func_map[func.__name__](*args, **kwargs)
else:
# per NEP18 we raise NotImplementedError so that numpy can intercept
return NotImplemented # pragma: no cover
44 changes: 3 additions & 41 deletions src/snowflake/snowpark/modin/plugin/extensions/base_overrides.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
import pickle as pkl
import warnings
from collections.abc import Sequence
from typing import Any, Callable, Hashable, Literal, Mapping, cast, get_args
from typing import Any, Callable, Hashable, Literal, Mapping, get_args

import modin.pandas as pd
import numpy as np
Expand Down Expand Up @@ -73,9 +73,8 @@
validate_and_try_convert_agg_func_arg_func_to_str,
)
from snowflake.snowpark.modin.plugin._internal.telemetry import (
TELEMETRY_PRIVATE_METHODS,
PropertyMethodType,
snowpark_pandas_telemetry_method_decorator,
try_add_telemetry_to_attribute,
)
from snowflake.snowpark.modin.plugin._typing import ListLike
from snowflake.snowpark.modin.plugin.utils.error_message import (
Expand All @@ -97,44 +96,7 @@ def register_base_override(method_name: str):
"""

def decorator(base_method: Any):
if callable(base_method) and (
not method_name.startswith("_")
or (method_name in TELEMETRY_PRIVATE_METHODS)
):
base_method = snowpark_pandas_telemetry_method_decorator(base_method)
elif isinstance(base_method, property):
base_method = property(
snowpark_pandas_telemetry_method_decorator(
cast(
# add a cast because mypy doesn't recognize that
# non-None fget and __get__ are both callable
# arguments to snowpark_pandas_telemetry_method_decorator.
Callable,
base_method.fget, # all properties defined in this file have an fget
),
property_name=method_name,
property_method_type=PropertyMethodType.FGET,
),
snowpark_pandas_telemetry_method_decorator(
(
base_method.__set__
if base_method.fset is None
else base_method.fset
),
property_name=method_name,
property_method_type=PropertyMethodType.FSET,
),
snowpark_pandas_telemetry_method_decorator(
(
base_method.__delete__
if base_method.fdel is None
else base_method.fdel
),
property_name=method_name,
property_method_type=PropertyMethodType.FDEL,
),
doc=base_method.__doc__,
)
base_method = try_add_telemetry_to_attribute(method_name, base_method)
parent_method = getattr(BasePandasDataset, method_name, None)
if isinstance(parent_method, property):
parent_method = parent_method.fget
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -254,36 +254,3 @@ def cache_result(self, inplace: bool = False) -> Optional[pd.DataFrame]:
self._update_inplace(new_qc)
else:
return pd.DataFrame(query_compiler=new_qc)


@register_dataframe_accessor("__array_function__")
@snowpark_pandas_telemetry_method_decorator
def __array_function__(self, func: callable, types: tuple, args: tuple, kwargs: dict):
"""
Apply the `func` to the `BasePandasDataset`.
Parameters
----------
func : np.func
The NumPy func to apply.
types : tuple
The types of the args.
args : tuple
The args to the func.
kwargs : dict
Additional keyword arguments.
Returns
-------
BasePandasDataset
The result of the ufunc applied to the `BasePandasDataset`.
"""
from snowflake.snowpark.modin.plugin.utils.numpy_to_pandas import (
numpy_to_pandas_func_map,
)

if func.__name__ in numpy_to_pandas_func_map:
return numpy_to_pandas_func_map[func.__name__](*args, **kwargs)
else:
# per NEP18 we raise NotImplementedError so that numpy can intercept
return NotImplemented # pragma: no cover
Loading

0 comments on commit 74594d9

Please sign in to comment.