Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

SNOW-1617349: Add aliases for format options to dataframe reader and writer #2155

19 changes: 19 additions & 0 deletions src/snowflake/snowpark/_internal/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -793,6 +793,25 @@ def get_copy_into_table_options(
return file_format_type_options, copy_options


def get_aliased_option_name(
    key: str,
    alias_map: Dict[str, str],
) -> str:
    """Return the canonical option name for ``key``.

    The key is normalized (stripped of surrounding whitespace and
    upper-cased) and looked up in ``alias_map``. When an alias match is
    found, the canonical name is returned and a warning is logged so the
    user knows the option was translated; otherwise the normalized key is
    returned unchanged.

    Args:
        key: Option name supplied by the caller (case-insensitive).
        alias_map: Mapping from upper-cased alias names to canonical
            option names.

    Returns:
        The canonical, upper-cased option name.
    """
    upper_key = key.strip().upper()
    aliased_key = alias_map.get(upper_key, upper_key)
    if aliased_key != upper_key:
        # Lazy %-style args: the message is only formatted if the warning
        # is actually emitted (logging best practice for hot paths).
        logger.warning(
            "Option '%s' is aliased to '%s'. You may see unexpected behavior."
            " Please refer to format specific options for more information",
            key,
            aliased_key,
        )

    return aliased_key


def strip_double_quotes_in_like_statement_in_table_name(table_name: str) -> str:
"""
this function is used by method _table_exists to handle double quotes in table name when calling
Expand Down
17 changes: 16 additions & 1 deletion src/snowflake/snowpark/dataframe_reader.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
from snowflake.snowpark._internal.utils import (
INFER_SCHEMA_FORMAT_TYPES,
TempObjectType,
get_aliased_option_name,
get_copy_into_table_options,
random_name_for_temp_object,
)
Expand All @@ -43,6 +44,19 @@
logger = getLogger(__name__)

LOCAL_TESTING_SUPPORTED_FILE_FORMAT = ("JSON",)
# Alternative spellings for reader options (presumably Spark-style names —
# confirm against the aliasing design doc), mapped to the canonical
# Snowflake option names. Keys must be upper-cased: DataFrameReader.option
# normalizes user input via get_aliased_option_name, which upper-cases the
# key before looking it up here. Note that several aliases (e.g. DELIMITER
# and SEP) intentionally map to the same canonical option.
READER_OPTIONS_ALIAS_MAP = {
    "DELIMITER": "FIELD_DELIMITER",
    "HEADER": "PARSE_HEADER",
    "PATHGLOBFILTER": "PATTERN",
    "FILENAMEPATTERN": "PATTERN",
    "INFERSCHEMA": "INFER_SCHEMA",
    "SEP": "FIELD_DELIMITER",
    "LINESEP": "RECORD_DELIMITER",
    "QUOTE": "FIELD_OPTIONALLY_ENCLOSED_BY",
    "NULLVALUE": "NULL_IF",
    "DATEFORMAT": "DATE_FORMAT",
    "TIMESTAMPFORMAT": "TIMESTAMP_FORMAT",
}


class DataFrameReader:
Expand Down Expand Up @@ -569,7 +583,8 @@ def option(self, key: str, value: Any) -> "DataFrameReader":
key: Name of the option (e.g. ``compression``, ``skip_header``, etc.).
value: Value of the option.
"""
self._cur_options[key.upper()] = value
aliased_key = get_aliased_option_name(key, READER_OPTIONS_ALIAS_MAP)
self._cur_options[aliased_key] = value
return self

def options(self, configs: Dict) -> "DataFrameReader":
Expand Down
21 changes: 20 additions & 1 deletion src/snowflake/snowpark/dataframe_writer.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
from snowflake.snowpark._internal.type_utils import ColumnOrName, ColumnOrSqlExpr
from snowflake.snowpark._internal.utils import (
SUPPORTED_TABLE_TYPES,
get_aliased_option_name,
normalize_remote_file_or_dir,
parse_table_name,
str_to_enum,
Expand All @@ -39,6 +40,15 @@
else:
from collections.abc import Iterable

# Alternative spellings for writer format options, mapped to the canonical
# Snowflake option names. Keys must be upper-cased: copy_into_location
# translates each format_type_options key via get_aliased_option_name,
# which upper-cases the key before looking it up here. Kept as a subset of
# the reader aliases — only options meaningful when unloading data.
WRITER_OPTIONS_ALIAS_MAP = {
    "SEP": "FIELD_DELIMITER",
    "LINESEP": "RECORD_DELIMITER",
    "QUOTE": "FIELD_OPTIONALLY_ENCLOSED_BY",
    "NULLVALUE": "NULL_IF",
    "DATEFORMAT": "DATE_FORMAT",
    "TIMESTAMPFORMAT": "TIMESTAMP_FORMAT",
}


class DataFrameWriter:
"""Provides methods for writing data from a :class:`DataFrame` to supported output destinations.
Expand Down Expand Up @@ -352,14 +362,23 @@ def copy_into_location(
raise TypeError( # pragma: no cover
f"'partition_by' is expected to be a column name, a Column object, or a sql expression. Got type {type(partition_by)}"
)

# apply writer option alias mapping
format_type_aliased_options = None
if format_type_options:
format_type_aliased_options = {}
for key, value in format_type_options.items():
aliased_key = get_aliased_option_name(key, WRITER_OPTIONS_ALIAS_MAP)
format_type_aliased_options[aliased_key] = value

df = self._dataframe._with_plan(
CopyIntoLocationNode(
self._dataframe._plan,
stage_location,
partition_by=partition_by,
file_format_name=file_format_name,
file_format_type=file_format_type,
format_type_options=format_type_options,
format_type_options=format_type_aliased_options,
copy_options=copy_options,
header=header,
)
Expand Down
14 changes: 14 additions & 0 deletions tests/integ/scala/test_dataframe_reader_suite.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
METADATA_FILENAME,
METADATA_START_SCAN_TIME,
)
from snowflake.snowpark.dataframe_reader import READER_OPTIONS_ALIAS_MAP
from snowflake.snowpark.exceptions import (
SnowparkDataframeReaderException,
SnowparkPlanException,
Expand Down Expand Up @@ -440,6 +441,19 @@ def mock_run_query(*args, **kwargs):
assert "Could not infer csv schema due to exception:" in caplog.text


@pytest.mark.parametrize("mode", ["select", "copy"])
def test_reader_option_aliases(session, mode, caplog):
    """Setting any aliased option should log a warning naming the canonical option."""
    reader = get_reader(session, mode)
    with caplog.at_level(logging.WARN):
        for alias, canonical in READER_OPTIONS_ALIAS_MAP.items():
            reader.option(alias, "test")
            expected_warning = (
                f"Option '{alias}' is aliased to '{canonical}'. "
                "You may see unexpected behavior"
            )
            assert expected_warning in caplog.text
            caplog.clear()

@pytest.mark.parametrize("mode", ["select", "copy"])
def test_read_csv_incorrect_schema(session, mode):
reader = get_reader(session, mode)
Expand Down
29 changes: 28 additions & 1 deletion tests/integ/scala/test_dataframe_writer_suite.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
#

import copy
import logging

import pytest

Expand Down Expand Up @@ -535,7 +536,7 @@ def create_and_append_check_answer(table_name_input):
"config.getoption('local_testing_mode', default=False)",
reason="BUG: SNOW-1235716 should raise not implemented error not AttributeError: 'MockExecutionPlan' object has no attribute 'replace_repeated_subquery_with_cte'",
)
def test_writer_csv(session, tmpdir_factory):
def test_writer_csv(session, caplog):

"""Tests for df.write.csv()."""
df = session.create_dataframe([[1, 2], [3, 4], [5, 6], [3, 7]], schema=["a", "b"])
Expand Down Expand Up @@ -590,6 +591,32 @@ def test_writer_csv(session, tmpdir_factory):
assert result6[0].rows_unloaded == ROWS_COUNT
data6 = session.read.schema(schema).csv(f"@{path6}")
Utils.assert_rows_count(data6, ROWS_COUNT)

# test option alias case
path7 = f"{temp_stage}/test_csv_example7/my_file.csv.gz"
with caplog.at_level(logging.WARNING):
result7 = df.write.csv(
path7,
format_type_options={"SEP": ":", "quote": '"'},
single=True,
header=True,
)
assert "Option 'SEP' is aliased to 'FIELD_DELIMITER'." in caplog.text
assert (
"Option 'quote' is aliased to 'FIELD_OPTIONALLY_ENCLOSED_BY'."
in caplog.text
)

assert result7[0].rows_unloaded == ROWS_COUNT
data7 = (
session.read.schema(schema)
.option("header", True)
.option("inferSchema", True)
.option("SEP", ":")
.option("quote", '"')
.csv(f"@{path7}")
)
Utils.check_answer(data7, df)
finally:
Utils.drop_stage(session, temp_stage)

Expand Down
Loading