SNOW-1616989: Create lazy Index from another lazy Index without pulling data to client (#2195)
sfc-gh-nkumar authored Sep 4, 2024
1 parent b68d75f commit 9fbf7b1
Showing 8 changed files with 184 additions and 87 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
@@ -86,6 +86,7 @@

- Refactored `quoted_identifier_to_snowflake_type` to avoid making metadata queries if the types have been cached locally.
- Improved `pd.to_datetime` to handle all local input cases.
- Create a lazy index from another lazy index without pulling data to client.

#### Bug Fixes

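The new behavior in brief — a minimal sketch assuming a configured Snowpark pandas session (mirroring the test added in tests/integ/modin/index/test_index_methods.py below):

    import modin.pandas as pd
    import snowflake.snowpark.modin.plugin  # noqa: F401 -- registers the Snowflake backend

    i1 = pd.Index([1, 2, 3])  # lazy Index backed by a SnowflakeQueryCompiler
    i2 = pd.Index(i1)         # reuses i1's query compiler; no data is pulled to the client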
4 changes: 2 additions & 2 deletions src/snowflake/snowpark/modin/pandas/general.py
@@ -1760,10 +1760,10 @@ def to_datetime(
issued from a timezone with daylight savings, such as Europe/Paris):
>>> pd.to_datetime(['2020-10-25 02:00:00 +0200', '2020-10-25 04:00:00 +0100'])
Index(['2020-10-24 17:00:00-07:00', '2020-10-24 20:00:00-07:00'], dtype='datetime64[ns]')
DatetimeIndex(['2020-10-24 17:00:00-07:00', '2020-10-24 20:00:00-07:00'], dtype='datetime64[ns, America/Los_Angeles]', freq=None)
>>> pd.to_datetime(['2020-10-25 02:00:00 +0200', '2020-10-25 04:00:00 +0100'], format="%Y-%m-%d %H:%M:%S %z")
Index(['2020-10-24 17:00:00-07:00', '2020-10-24 20:00:00-07:00'], dtype='datetime64[ns]')
DatetimeIndex(['2020-10-24 17:00:00-07:00', '2020-10-24 20:00:00-07:00'], dtype='datetime64[ns, America/Los_Angeles]', freq=None)
Setting ``utc=True`` makes sure to always convert to timezone-aware outputs:
104 changes: 82 additions & 22 deletions src/snowflake/snowpark/modin/plugin/extensions/datetime_index.py
@@ -70,20 +70,8 @@ class DatetimeIndex(Index):
# Equivalent index type in native pandas
_NATIVE_INDEX_TYPE = native_pd.DatetimeIndex

def __new__(cls, *args, **kwargs):
"""
Create new instance of DatetimeIndex. This overrides behavior of Index.__new__.
Args:
*args: arguments.
**kwargs: keyword arguments.
Returns:
New instance of DatetimeIndex.
"""
return object.__new__(cls)

def __init__(
self,
def __new__(
cls,
data: ArrayLike | native_pd.Index | modin.pandas.Series | None = None,
freq: Frequency | lib.NoDefault = _CONSTRUCTOR_DEFAULTS["freq"],
tz=_CONSTRUCTOR_DEFAULTS["tz"],
@@ -96,9 +96,9 @@ def __init__(
copy: bool = _CONSTRUCTOR_DEFAULTS["copy"],
name: Hashable | None = _CONSTRUCTOR_DEFAULTS["name"],
query_compiler: SnowflakeQueryCompiler = None,
) -> None:
) -> DatetimeIndex:
"""
Immutable ndarray-like of datetime64 data.
Create new instance of DatetimeIndex. This overrides behavior of Index.__new__.
Parameters
----------
@@ -142,11 +130,8 @@
query_compiler : SnowflakeQueryCompiler, optional
A query compiler object to create the ``Index`` from.
Examples
--------
>>> idx = pd.DatetimeIndex(["1/1/2020 10:00:00+00:00", "2/1/2020 11:00:00+00:00"], tz="America/Los_Angeles")
>>> idx
DatetimeIndex(['2020-01-01 02:00:00-08:00', '2020-02-01 03:00:00-08:00'], dtype='datetime64[ns, America/Los_Angeles]', freq=None)
Returns:
New instance of DatetimeIndex.
"""
if query_compiler:
# Raise error if underlying type is not a TimestampType.
@@ -167,7 +152,82 @@
"copy": copy,
"name": name,
}
self._init_index(data, _CONSTRUCTOR_DEFAULTS, query_compiler, **kwargs)
index = object.__new__(cls)
index._query_compiler = DatetimeIndex._init_query_compiler(
data, _CONSTRUCTOR_DEFAULTS, query_compiler, **kwargs
)
# `_parent` keeps track of any Series or DataFrame that this Index is a part of.
index._parent = None
return index

def __init__(
self,
data: ArrayLike | native_pd.Index | modin.pandas.Series | None = None,
freq: Frequency | lib.NoDefault = _CONSTRUCTOR_DEFAULTS["freq"],
tz=_CONSTRUCTOR_DEFAULTS["tz"],
normalize: bool | lib.NoDefault = _CONSTRUCTOR_DEFAULTS["normalize"],
closed=_CONSTRUCTOR_DEFAULTS["closed"],
ambiguous: TimeAmbiguous = _CONSTRUCTOR_DEFAULTS["ambiguous"],
dayfirst: bool = _CONSTRUCTOR_DEFAULTS["dayfirst"],
yearfirst: bool = _CONSTRUCTOR_DEFAULTS["yearfirst"],
dtype: Dtype | None = _CONSTRUCTOR_DEFAULTS["dtype"],
copy: bool = _CONSTRUCTOR_DEFAULTS["copy"],
name: Hashable | None = _CONSTRUCTOR_DEFAULTS["name"],
query_compiler: SnowflakeQueryCompiler = None,
) -> None:
"""
Immutable ndarray-like of datetime64 data.
Parameters
----------
data : array-like (1-dimensional), pandas.Index, modin.pandas.Series, optional
Datetime-like data to construct index with.
freq : str or pandas offset object, optional
One of pandas date offset strings or corresponding objects. The string
'infer' can be passed in order to set the frequency of the index as the
inferred frequency upon creation.
tz : pytz.timezone or dateutil.tz.tzfile or datetime.tzinfo or str
Set the Timezone of the data.
normalize : bool, default False
Normalize start/end dates to midnight before generating date range.
closed : {'left', 'right'}, optional
Set whether to include `start` and `end` that are on the
boundary. The default includes boundary points on either end.
ambiguous : 'infer', bool-ndarray, 'NaT', default 'raise'
When clocks moved backward due to DST, ambiguous times may arise.
For example in Central European Time (UTC+01), when going from 03:00
DST to 02:00 non-DST, 02:30:00 local time occurs both at 00:30:00 UTC
and at 01:30:00 UTC. In such a situation, the `ambiguous` parameter
dictates how ambiguous times should be handled.
- 'infer' will attempt to infer fall dst-transition hours based on
order
- bool-ndarray where True signifies a DST time, False signifies a
non-DST time (note that this flag is only applicable for ambiguous
times)
- 'NaT' will return NaT where there are ambiguous times
- 'raise' will raise an AmbiguousTimeError if there are ambiguous times.
dayfirst : bool, default False
If True, parse dates in `data` with the day first order.
yearfirst : bool, default False
If True parse dates in `data` with the year first order.
dtype : numpy.dtype or DatetimeTZDtype or str, default None
Note that the only NumPy dtype allowed is `datetime64[ns]`.
copy : bool, default False
Make a copy of input ndarray.
name : label, default None
Name to be stored in the index.
query_compiler : SnowflakeQueryCompiler, optional
A query compiler object to create the ``Index`` from.
Examples
--------
>>> idx = pd.DatetimeIndex(["1/1/2020 10:00:00+00:00", "2/1/2020 11:00:00+00:00"], tz="America/Los_Angeles")
>>> idx
DatetimeIndex(['2020-01-01 02:00:00-08:00', '2020-02-01 03:00:00-08:00'], dtype='datetime64[ns, America/Los_Angeles]', freq=None)
"""
# DatetimeIndex is already initialized in __new__ method. We keep this method
# only for docstring generation.

def _dt_property(self, property_name: str) -> Index:
"""
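The refactor moves all construction into ``__new__`` and leaves ``__init__`` as an empty docstring carrier. A generic sketch of the pattern, with hypothetical class and helper names rather than the library's code:

    class LazyIndex:
        def __new__(cls, data=None, query_compiler=None):
            # All construction happens in __new__ so subclasses can validate
            # inputs and return a fully initialized instance directly.
            obj = object.__new__(cls)
            obj._query_compiler = (
                query_compiler
                if query_compiler is not None
                else cls._build_query_compiler(data)  # hypothetical helper
            )
            obj._parent = None  # tracks any Series/DataFrame this index belongs to
            return obj

        def __init__(self, data=None, query_compiler=None):
            # Intentionally empty: Python still calls __init__ after __new__
            # returns an instance of cls, so it must not redo any work. It is
            # kept only so the user-facing docstring has a home.
            pass

        @classmethod
        def _build_query_compiler(cls, data):
            return data  # stand-in for the real query-compiler construction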
76 changes: 35 additions & 41 deletions src/snowflake/snowpark/modin/plugin/extensions/index.py
@@ -117,27 +117,26 @@ def __new__(
TimedeltaIndex,
)

if query_compiler:
dtype = query_compiler.index_dtypes[0]
if is_datetime64_any_dtype(dtype):
return DatetimeIndex(query_compiler=query_compiler)
if is_timedelta64_dtype(dtype):
return TimedeltaIndex(query_compiler=query_compiler)
elif isinstance(data, BasePandasDataset):
if data.ndim != 1:
raise ValueError("Index data must be 1 - dimensional")
dtype = data.dtype
if is_datetime64_any_dtype(dtype):
return DatetimeIndex(data, dtype=dtype, copy=copy, name=name)
if is_timedelta64_dtype(dtype):
return TimedeltaIndex(data, dtype=dtype, copy=copy, name=name)
else:
index = native_pd.Index(data, dtype, copy, name, tupleize_cols)
if isinstance(index, native_pd.DatetimeIndex):
return DatetimeIndex(data)
if isinstance(index, native_pd.TimedeltaIndex):
return TimedeltaIndex(data)
return object.__new__(cls)
kwargs = {
"dtype": dtype,
"copy": copy,
"name": name,
"tupleize_cols": tupleize_cols,
}
query_compiler = cls._init_query_compiler(
data, _CONSTRUCTOR_DEFAULTS, query_compiler, **kwargs
)
dtype = query_compiler.index_dtypes[0]
if is_datetime64_any_dtype(dtype):
return DatetimeIndex(query_compiler=query_compiler)
if is_timedelta64_dtype(dtype):
return TimedeltaIndex(query_compiler=query_compiler)
index = object.__new__(cls)
# Initialize the Index
index._query_compiler = query_compiler
# `_parent` keeps track of any Series or DataFrame that this Index is a part of.
index._parent = None
return index

def __init__(
self,
@@ -185,30 +184,23 @@ def __init__(
>>> pd.Index([1, 2, 3], dtype="uint8")
Index([1, 2, 3], dtype='int64')
"""
kwargs = {
"dtype": dtype,
"copy": copy,
"name": name,
"tupleize_cols": tupleize_cols,
}
self._init_index(data, _CONSTRUCTOR_DEFAULTS, query_compiler, **kwargs)
# Index is already initialized in __new__ method. We keep this method only for
# docstring generation.

def _init_index(
self,
@classmethod
def _init_query_compiler(
cls,
data: ArrayLike | native_pd.Index | Series | None,
ctor_defaults: dict,
query_compiler: SnowflakeQueryCompiler = None,
**kwargs: Any,
):
# `_parent` keeps track of any Series or DataFrame that this Index is a part of.
self._parent = None
) -> SnowflakeQueryCompiler:
if query_compiler:
# Raise warning if `data` is query compiler with non-default arguments.
for arg_name, arg_value in kwargs.items():
assert (
arg_value == ctor_defaults[arg_name]
), f"Non-default argument '{arg_name}={arg_value}' when constructing Index with query compiler"
self._query_compiler = query_compiler
elif isinstance(data, BasePandasDataset):
if data.ndim != 1:
raise ValueError("Index data must be 1 - dimensional")
@@ -218,15 +210,17 @@ def _init_index(
)
if series_has_no_name:
idx.name = None
self._query_compiler = idx._query_compiler
query_compiler = idx._query_compiler
elif isinstance(data, Index):
query_compiler = data._query_compiler
else:
self._query_compiler = DataFrame(
index=self._NATIVE_INDEX_TYPE(data=data, **kwargs)
query_compiler = DataFrame(
index=cls._NATIVE_INDEX_TYPE(data=data, **kwargs)
)._query_compiler
if len(self._query_compiler.columns):
self._query_compiler = self._query_compiler.drop(
columns=self._query_compiler.columns
)

if len(query_compiler.columns):
query_compiler = query_compiler.drop(columns=query_compiler.columns)
return query_compiler

def __getattr__(self, key: str) -> Any:
"""
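Since ``Index.__new__`` now builds the query compiler first and then dispatches on its index dtype, datetime- and timedelta-typed inputs should come back as the specialized subclasses without extra queries. A hedged example of the expected behavior, assuming a configured session:

    import modin.pandas as pd
    import snowflake.snowpark.modin.plugin  # noqa: F401

    dt_idx = pd.Index(["2021-01-01", "2021-01-02"], dtype="datetime64[ns]")
    print(type(dt_idx).__name__)  # expected: DatetimeIndex

    td_idx = pd.Index(pd.TimedeltaIndex(["1 days", "2 days"]))
    print(type(td_idx).__name__)  # expected: TimedeltaIndex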
75 changes: 55 additions & 20 deletions src/snowflake/snowpark/modin/plugin/extensions/timedelta_index.py
@@ -55,17 +55,65 @@ class TimedeltaIndex(Index):
# Equivalent index type in native pandas
_NATIVE_INDEX_TYPE = native_pd.TimedeltaIndex

def __new__(cls, *args, **kwargs):
def __new__(
cls,
data: ArrayLike | native_pd.Index | Series | None = None,
unit: str | lib.NoDefault = _CONSTRUCTOR_DEFAULTS["unit"],
freq: Frequency | lib.NoDefault = _CONSTRUCTOR_DEFAULTS["freq"],
dtype: Dtype | None = _CONSTRUCTOR_DEFAULTS["dtype"],
copy: bool = _CONSTRUCTOR_DEFAULTS["copy"],
name: Hashable | None = _CONSTRUCTOR_DEFAULTS["name"],
query_compiler: SnowflakeQueryCompiler = None,
) -> TimedeltaIndex:
"""
Create new instance of TimedeltaIndex. This overrides behavior of Index.__new__.
Args:
*args: arguments.
**kwargs: keyword arguments.
Parameters
----------
data : array-like (1-dimensional), optional
Optional timedelta-like data to construct index with.
unit : {'D', 'h', 'm', 's', 'ms', 'us', 'ns'}, optional
The unit of ``data``.
.. deprecated:: 2.2.0
Use ``pd.to_timedelta`` instead.
freq : str or pandas offset object, optional
One of pandas date offset strings or corresponding objects. The string
``'infer'`` can be passed in order to set the frequency of the index as
the inferred frequency upon creation.
dtype : numpy.dtype or str, default None
Valid ``numpy`` dtypes are ``timedelta64[ns]``, ``timedelta64[us]``,
``timedelta64[ms]``, and ``timedelta64[s]``.
copy : bool
Make a copy of input array.
name : object
Name to be stored in the index.
Returns:
New instance of TimedeltaIndex.
"""
return object.__new__(cls)
if query_compiler:
# Raise error if underlying type is not a Timedelta type.
current_dtype = query_compiler.index_dtypes[0]
if not is_timedelta64_dtype(current_dtype):
raise ValueError(
f"TimedeltaIndex can only be created from a query compiler with TimedeltaType, found {current_dtype}"
)
kwargs = {
"unit": unit,
"freq": freq,
"dtype": dtype,
"copy": copy,
"name": name,
}
tdi = object.__new__(cls)
tdi._query_compiler = TimedeltaIndex._init_query_compiler(
data, _CONSTRUCTOR_DEFAULTS, query_compiler, **kwargs
)
# `_parent` keeps track of any Series or DataFrame that this Index is a part of.
tdi._parent = None
return tdi

def __init__(
self,
@@ -114,21 +162,8 @@ def __init__(
>>> pd.TimedeltaIndex(np.arange(5) * 24 * 3600 * 1e9, freq='infer')
TimedeltaIndex(['0 days', '1 days', '2 days', '3 days', '4 days'], dtype='timedelta64[ns]', freq=None)
"""
if query_compiler:
# Raise error if underlying type is not a Timedelta type.
current_dtype = query_compiler.index_dtypes[0]
if not is_timedelta64_dtype(current_dtype):
raise ValueError(
f"TimedeltaIndex can only be created from a query compiler with TimedeltaType, found {current_dtype}"
)
kwargs = {
"unit": unit,
"freq": freq,
"dtype": dtype,
"copy": copy,
"name": name,
}
self._init_index(data, _CONSTRUCTOR_DEFAULTS, query_compiler, **kwargs)
# TimedeltaIndex is already initialized in __new__ method. We keep this method
# only for docstring generation.

@property
def days(self) -> Index:
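Consistent with the test updates below (construction query counts dropping from 3 to 0), building a lazy ``TimedeltaIndex`` from a native pandas index should issue no SQL; only a query compiler with a non-timedelta index dtype is rejected. A sketch:

    import pandas as native_pd
    import modin.pandas as pd
    import snowflake.snowpark.modin.plugin  # noqa: F401

    native_index = native_pd.TimedeltaIndex(["1 days", "2 days", "3 days"])
    snow_index = pd.TimedeltaIndex(native_index)  # constructed lazily; no queries issued here

    # Passing a query compiler whose index dtype is not timedelta64 raises:
    #   ValueError: TimedeltaIndex can only be created from a query compiler
    #   with TimedeltaType, found <dtype>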
2 changes: 1 addition & 1 deletion tests/integ/modin/index/test_datetime_index_methods.py
@@ -16,7 +16,7 @@
)


@sql_count_checker(query_count=3)
@sql_count_checker(query_count=0)
def test_datetime_index_construction():
# create from native pandas datetime index.
index = native_pd.DatetimeIndex(["2021-01-01", "2021-01-02", "2021-01-03"])
7 changes: 7 additions & 0 deletions tests/integ/modin/index/test_index_methods.py
@@ -35,6 +35,13 @@ def test_index_copy(native_index):
assert_index_equal(snow_index, new_index)


@sql_count_checker(query_count=2)
def test_index_creation_from_lazy_index():
i1 = pd.Index([1, 2, 3])
i2 = pd.Index(i1)
assert_index_equal(i1, i2)


@pytest.mark.parametrize("native_df", TEST_DFS)
@sql_count_checker(query_count=2)
def test_df_index_copy(native_df):
2 changes: 1 addition & 1 deletion tests/integ/modin/index/test_timedelta_index_methods.py
@@ -11,7 +11,7 @@
from tests.integ.modin.utils import assert_index_equal


@sql_count_checker(query_count=3)
@sql_count_checker(query_count=0)
def test_timedelta_index_construction():
# create from native pandas timedelta index.
index = native_pd.TimedeltaIndex(["1 days", "2 days", "3 days"])
