Merge branch 'main' into zhan-SNOW-1665697-update-write-api

sfc-gh-zhan authored Sep 17, 2024
2 parents 31f8f6f + 6a3a77b commit 46e586e
Showing 13 changed files with 218 additions and 44 deletions.
7 changes: 6 additions & 1 deletion CHANGELOG.md
@@ -12,8 +12,13 @@

- Added support for `TimedeltaIndex.mean` method.
- Added support for some cases of aggregating `Timedelta` columns on `axis=0` with `agg` or `aggregate`.
- Added support for `by`, `left_by`, and `right_by` for `pd.merge_asof`.
- Added support for `by`, `left_by`, `right_by`, `left_index`, and `right_index` for `pd.merge_asof`.

#### Bug Fixes

- Fixed a bug where an `Index` object created from a `Series`/`DataFrame` incorrectly updates the `Series`/`DataFrame`'s index name after an inplace update has been applied to the original `Series`/`DataFrame`.
- Suppressed an unhelpful `SettingWithCopyWarning` that sometimes appeared when printing `Timedelta` columns.
- Fixed `inplace` argument for `Series` objects derived from other `Series` objects.

## 1.22.1 (2024-09-11)
This is a re-release of 1.22.0. Please refer to the 1.22.0 release notes for detailed release content.
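For reference, the newly supported index parameters follow native pandas `merge_asof` semantics, which Snowpark pandas mirrors; a minimal sketch with illustrative data, shown here with native pandas:

```python
import pandas as pd

quotes = pd.DataFrame(
    {"bid": [720.50, 51.95, 720.77]},
    index=pd.to_datetime(
        ["2016-05-25 13:30:00.023", "2016-05-25 13:30:00.041", "2016-05-25 13:30:00.049"]
    ),
)
trades = pd.DataFrame(
    {"price": [51.95, 720.92]},
    index=pd.to_datetime(["2016-05-25 13:30:00.038", "2016-05-25 13:30:00.048"]),
)

# Join on the sorted indexes instead of an "on" column: each trade is matched
# to the most recent quote at or before its timestamp.
result = pd.merge_asof(trades, quotes, left_index=True, right_index=True)
```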
3 changes: 1 addition & 2 deletions docs/source/modin/supported/general_supported.rst
@@ -38,8 +38,7 @@ Data manipulations
+-----------------------------+---------------------------------+----------------------------------+----------------------------------------------------+
| ``merge`` | P | ``validate`` | ``N`` if param ``validate`` is given |
+-----------------------------+---------------------------------+----------------------------------+----------------------------------------------------+
| ``merge_asof`` | P | ``left_index``, ``right_index``, | ``N`` if param ``direction`` is ``nearest``. |
| | | , ``suffixes``, ``tolerance`` | |
| ``merge_asof`` | P | ``suffixes``, ``tolerance`` | ``N`` if param ``direction`` is ``nearest`` |
+-----------------------------+---------------------------------+----------------------------------+----------------------------------------------------+
| ``merge_ordered`` | N | | |
+-----------------------------+---------------------------------+----------------------------------+----------------------------------------------------+
12 changes: 9 additions & 3 deletions src/snowflake/snowpark/modin/plugin/_internal/utils.py
@@ -1519,9 +1519,15 @@ def convert_str_to_timedelta(x: str) -> pd.Timedelta:
downcast_pandas_df.columns, cached_snowpark_pandas_types
):
if snowpark_pandas_type is not None and snowpark_pandas_type == timedelta_t:
downcast_pandas_df[pandas_label] = pandas_df[pandas_label].apply(
convert_str_to_timedelta
)
# By default, pandas warns, "A value is trying to be set on a
# copy of a slice from a DataFrame" here because we are
# assigning a column to downcast_pandas_df, which is a copy of
# a slice of pandas_df. We don't care what happens to pandas_df,
# so the warning isn't useful to us.
with native_pd.option_context("mode.chained_assignment", None):
downcast_pandas_df[pandas_label] = pandas_df[pandas_label].apply(
convert_str_to_timedelta
)

# Step 7. postprocessing for return types
if index_only:
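To see the warning the `option_context` silences, a minimal native-pandas sketch (under pandas' default, non-copy-on-write mode):

```python
import pandas as native_pd

df = native_pd.DataFrame({"a": ["1 day", "2 days"], "b": [1, 2]})
sliced = df[df["b"] > 0]  # a slice of df; pandas flags it as a copy

# Assigning into the slice would normally emit SettingWithCopyWarning;
# the option_context suppresses it for just this block.
with native_pd.option_context("mode.chained_assignment", None):
    sliced["a"] = sliced["a"].str.upper()
```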
@@ -7381,10 +7381,10 @@ def merge_asof(
SnowflakeQueryCompiler
"""
# TODO: SNOW-1634547: Implement remaining parameters by leveraging `merge` implementation
if left_index or right_index or tolerance or suffixes != ("_x", "_y"):
if tolerance or suffixes != ("_x", "_y"):
ErrorMessage.not_implemented(
"Snowpark pandas merge_asof method does not currently support parameters "
+ "'left_index', 'right_index', 'suffixes', or 'tolerance'"
+ "'suffixes', or 'tolerance'"
)
if direction not in ("backward", "forward"):
ErrorMessage.not_implemented(
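With the guard narrowed, index-based joins go through and only the remaining parameters raise; a hedged sketch with illustrative frames (assumes a configured Snowpark pandas session):

```python
import modin.pandas as pd
import snowflake.snowpark.modin.plugin  # noqa: F401

left = pd.DataFrame({"v": [1, 2]}, index=pd.Index([1, 5], name="t"))
right = pd.DataFrame({"w": [10, 20]}, index=pd.Index([2, 6], name="t"))

pd.merge_asof(left, right, left_index=True, right_index=True)  # now supported

try:
    # Non-default suffixes (and tolerance) still hit the not-implemented path.
    pd.merge_asof(left, right, left_index=True, right_index=True, suffixes=("_l", "_r"))
except NotImplementedError as exc:
    print(exc)
```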
61 changes: 48 additions & 13 deletions src/snowflake/snowpark/modin/plugin/extensions/index.py
@@ -71,6 +71,35 @@
}


class IndexParent:
def __init__(self, parent: DataFrame | Series) -> None:
"""
Initialize the IndexParent object.
IndexParent is used to keep track of the parent object that the Index is a part of.
It tracks the parent object and the parent object's query compiler at the time of creation.
Parameters
----------
parent : DataFrame or Series
The parent object that the Index is a part of.
"""
assert isinstance(parent, (DataFrame, Series))
self._parent = parent
self._parent_qc = parent._query_compiler

def check_and_update_parent_qc_index_names(self, names: list) -> None:
"""
Update the Index and its parent's index names only if the query compiler associated with the parent is
unchanged from the originally recorded query compiler, i.e., no inplace update has been applied to the parent.
"""
if self._parent._query_compiler is self._parent_qc:
new_query_compiler = self._parent_qc.set_index_names(names)
self._parent._update_inplace(new_query_compiler=new_query_compiler)
# Update the query compiler after naming operation.
self._parent_qc = new_query_compiler


class Index(metaclass=TelemetryMeta):

# Equivalent index type in native pandas
@@ -135,7 +164,7 @@ def __new__(
index = object.__new__(cls)
# Initialize the Index
index._query_compiler = query_compiler
# `_parent` keeps track of any Series or DataFrame that this Index is a part of.
# `_parent` keeps track of the parent object that this Index is a part of.
index._parent = None
return index

@@ -252,6 +281,17 @@ def __getattr__(self, key: str) -> Any:
ErrorMessage.not_implemented(f"Index.{key} is not yet implemented")
raise err

def _set_parent(self, parent: Series | DataFrame) -> None:
"""
Set the parent object and its query compiler.
Parameters
----------
parent : Series or DataFrame
The parent object that the Index is a part of.
"""
self._parent = IndexParent(parent)

def _binary_ops(self, method: str, other: Any) -> Index:
if isinstance(other, Index):
other = other.to_series().reset_index(drop=True)
@@ -408,12 +448,6 @@ def __constructor__(self):
"""
return type(self)

def _set_parent(self, parent: Series | DataFrame):
"""
Set the parent object of the current Index to a given Series or DataFrame.
"""
self._parent = parent

@property
def values(self) -> ArrayLike:
"""
@@ -726,10 +760,11 @@ def name(self, value: Hashable) -> None:
if not is_hashable(value):
raise TypeError(f"{type(self).__name__}.name must be a hashable type")
self._query_compiler = self._query_compiler.set_index_names([value])
# Update the name of the parent's index only if no inplace update has been
# performed on the parent object, i.e., the parent's current query compiler
# matches the originally recorded query compiler.
if self._parent is not None:
self._parent._update_inplace(
new_query_compiler=self._parent._query_compiler.set_index_names([value])
)
self._parent.check_and_update_parent_qc_index_names([value])

def _get_names(self) -> list[Hashable]:
"""
@@ -755,10 +790,10 @@ def _set_names(self, values: list) -> None:
if isinstance(values, Index):
values = values.to_list()
self._query_compiler = self._query_compiler.set_index_names(values)
# Update the name of the parent's index only if the parent's current query compiler
# matches the recorded query compiler.
if self._parent is not None:
self._parent._update_inplace(
new_query_compiler=self._parent._query_compiler.set_index_names(values)
)
self._parent.check_and_update_parent_qc_index_names(values)

names = property(fset=_set_names, fget=_get_names)

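Taken together, a sketch of the behavior these `IndexParent` changes produce, mirroring `tests/integ/modin/index/test_name.py` below (assumes a configured Snowpark pandas session):

```python
import modin.pandas as pd
import snowflake.snowpark.modin.plugin  # noqa: F401

df = pd.DataFrame({"A": [1.0, None]}, index=pd.Index([1, 2], name="test"))
idx = df.index           # IndexParent records df and its current query compiler

idx.name = "test2"       # query compiler unchanged -> the rename propagates
assert df.index.name == "test2"

df.dropna(inplace=True)  # inplace update swaps df's query compiler
idx.name = "test3"       # recorded compiler no longer matches -> df unaffected
assert idx.name == "test3"
assert df.index.name == "test2"
```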
19 changes: 19 additions & 0 deletions src/snowflake/snowpark/modin/plugin/extensions/series_overrides.py
@@ -392,6 +392,25 @@ def __init__(
self.name = name


@register_series_accessor("_update_inplace")
def _update_inplace(self, new_query_compiler) -> None:
"""
Update the current Series in-place using `new_query_compiler`.
Parameters
----------
new_query_compiler : BaseQueryCompiler
QueryCompiler to use to manage the data.
"""
super(Series, self)._update_inplace(new_query_compiler=new_query_compiler)
# Propagate changes back to the parent so that the column in the DataFrame has the same contents
if self._parent is not None:
if self._parent_axis == 1 and isinstance(self._parent, DataFrame):
self._parent[self.name] = self
else:
self._parent.loc[self.index] = self


# Since Snowpark pandas leaves all data on the warehouse, memory_usage's report of local memory
# usage isn't meaningful and is set to always return 0.
@_inherit_docstrings(native_pd.Series.memory_usage, apilink="pandas.Series")
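A sketch of what the override enables, mirroring the new `test_inplace_fillna_from_series` test below (assumes a configured Snowpark pandas session):

```python
import numpy as np
import modin.pandas as pd
import snowflake.snowpark.modin.plugin  # noqa: F401

s = pd.Series([np.nan, 1.0, 2.0, 3.0])
head = s.iloc[:2]              # a Series derived from s; s is its parent

# The inplace update now propagates back via s.loc[head.index] = head.
head.fillna(14, inplace=True)
assert s.iloc[0] == 14.0
```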
14 changes: 5 additions & 9 deletions src/snowflake/snowpark/session.py
@@ -601,7 +601,6 @@ def __init__(
)
self._custom_package_usage_config: Dict = {}
self._conf = self.RuntimeConfig(self, options or {})
self._tmpdir_handler: Optional[tempfile.TemporaryDirectory] = None
self._runtime_version_from_requirement: str = None
self._temp_table_auto_cleaner: TempTableAutoCleaner = TempTableAutoCleaner(self)
_logger.info("Snowpark Session information: %s", self._session_info)
@@ -1710,8 +1709,8 @@ def _upload_unsupported_packages(

try:
# Setup a temporary directory and target folder where pip install will take place.
self._tmpdir_handler = tempfile.TemporaryDirectory()
tmpdir = self._tmpdir_handler.name
tmpdir_handler = tempfile.TemporaryDirectory()
tmpdir = tmpdir_handler.name
target = os.path.join(tmpdir, "unsupported_packages")
if not os.path.exists(target):
os.makedirs(target)
@@ -1796,9 +1795,7 @@ def _upload_unsupported_packages(
for requirement in supported_dependencies + new_dependencies
]
)
metadata_local_path = os.path.join(
self._tmpdir_handler.name, metadata_file
)
metadata_local_path = os.path.join(tmpdir_handler.name, metadata_file)
with open(metadata_local_path, "w") as file:
for key, value in metadata.items():
file.write(f"{key},{value}\n")
@@ -1834,9 +1831,8 @@ def _upload_unsupported_packages(
f"-third-party-packages-from-anaconda-in-a-udf."
)
finally:
if self._tmpdir_handler:
self._tmpdir_handler.cleanup()
self._tmpdir_handler = None
if tmpdir_handler:
tmpdir_handler.cleanup()

return supported_dependencies + new_dependencies

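The change replaces session-level temporary-directory state with a function-local handle, so concurrent uploads on one session cannot clobber each other; a minimal sketch of the resulting pattern (the `stage_packages` helper is hypothetical):

```python
import os
import tempfile

def stage_packages() -> None:
    # A local handle, cleaned up in `finally`, replaces self._tmpdir_handler;
    # there is no longer shared state to reset to None afterwards.
    tmpdir_handler = None
    try:
        tmpdir_handler = tempfile.TemporaryDirectory()
        target = os.path.join(tmpdir_handler.name, "unsupported_packages")
        os.makedirs(target, exist_ok=True)
        ...  # pip install into `target`, then upload the artifacts
    finally:
        if tmpdir_handler:
            tmpdir_handler.cleanup()
```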
4 changes: 2 additions & 2 deletions tests/integ/modin/index/test_datetime_index_methods.py
@@ -142,13 +142,13 @@ def test_index_parent():
# DataFrame case.
df = pd.DataFrame({"A": [1]}, index=native_idx1)
snow_idx = df.index
assert_frame_equal(snow_idx._parent, df)
assert_frame_equal(snow_idx._parent._parent, df)
assert_index_equal(snow_idx, native_idx1)

# Series case.
s = pd.Series([1, 2], index=native_idx2, name="zyx")
snow_idx = s.index
assert_series_equal(snow_idx._parent, s)
assert_series_equal(snow_idx._parent._parent, s)
assert_index_equal(snow_idx, native_idx2)


4 changes: 2 additions & 2 deletions tests/integ/modin/index/test_index_methods.py
@@ -393,13 +393,13 @@ def test_index_parent():
# DataFrame case.
df = pd.DataFrame([[1, 2], [3, 4]], index=native_idx1)
snow_idx = df.index
assert_frame_equal(snow_idx._parent, df)
assert_frame_equal(snow_idx._parent._parent, df)
assert_index_equal(snow_idx, native_idx1)

# Series case.
s = pd.Series([1, 2, 4, 5, 6, 7], index=native_idx2, name="zyx")
snow_idx = s.index
assert_series_equal(snow_idx._parent, s)
assert_series_equal(snow_idx._parent._parent, s)
assert_index_equal(snow_idx, native_idx2)


66 changes: 66 additions & 0 deletions tests/integ/modin/index/test_name.py
@@ -351,3 +351,69 @@ def test_index_names_with_lazy_index():
),
inplace=True,
)


@sql_count_checker(query_count=1)
def test_index_names_replace_behavior():
"""
Check that renaming an Index object no longer updates the parent DataFrame's index name after the DataFrame has been modified inplace.
"""
data = {
"A": [0, 1, 2, 3, 4, 4],
"B": ["a", "b", "c", "d", "e", "f"],
}
idx = [1, 2, 3, 4, 5, 6]
native_df = native_pd.DataFrame(data, native_pd.Index(idx, name="test"))
snow_df = pd.DataFrame(data, index=pd.Index(idx, name="test"))

# Get a reference to the index of the DataFrames.
snow_index = snow_df.index
native_index = native_df.index

# Change the names.
snow_index.name = "test2"
native_index.name = "test2"

# Compare the names.
assert snow_index.name == native_index.name == "test2"
assert snow_df.index.name == native_df.index.name == "test2"

# Change the query compiler the DataFrame is referring to, change the names.
snow_df.dropna(inplace=True)
native_df.dropna(inplace=True)
snow_index.name = "test3"
native_index.name = "test3"

# Compare the names. Changing the index name should not change the DataFrame's index name.
assert snow_index.name == native_index.name == "test3"
assert snow_df.index.name == native_df.index.name == "test2"


@sql_count_checker(query_count=1)
def test_index_names_multiple_renames():
"""
Check that the index name of a DataFrame can be renamed any number of times.
"""
data = {
"A": [0, 1, 2, 3, 4, 4],
"B": ["a", "b", "c", "d", "e", "f"],
}
idx = [1, 2, 3, 4, 5, 6]
native_df = native_pd.DataFrame(data, native_pd.Index(idx, name="test"))
snow_df = pd.DataFrame(data, index=pd.Index(idx, name="test"))

# Get a reference to the index of the DataFrames.
snow_index = snow_df.index
native_index = native_df.index

# Change and compare the names.
snow_index.name = "test2"
native_index.name = "test2"
assert snow_index.name == native_index.name == "test2"
assert snow_df.index.name == native_df.index.name == "test2"

# Change the names again and compare.
snow_index.name = "test3"
native_index.name = "test3"
assert snow_index.name == native_index.name == "test3"
assert snow_df.index.name == native_df.index.name == "test3"
16 changes: 16 additions & 0 deletions tests/integ/modin/series/test_fillna.py
@@ -3,6 +3,8 @@
#


import string

import modin.pandas as pd
import numpy as np
import pandas as native_pd
Expand Down Expand Up @@ -201,3 +203,17 @@ def inplace_fillna(df):
native_pd.DataFrame([[1, 2, 3], [4, None, 6]], columns=list("ABC")),
inplace_fillna,
)


@pytest.mark.parametrize("index", [list(range(8)), list(string.ascii_lowercase[:8])])
@sql_count_checker(query_count=1, join_count=4)
def test_inplace_fillna_from_series(index):
def inplace_fillna(series):
series.iloc[:4].fillna(14, inplace=True)
return series

eval_snowpark_pandas_result(
pd.Series([np.nan, 1, 2, 3, 4, 5, 6, 7], index=index),
native_pd.Series([np.nan, 1, 2, 3, 4, 5, 6, 7], index=index),
inplace_fillna,
)