Skip to content

Commit

Permalink
typing
Browse files Browse the repository at this point in the history
  • Loading branch information
JamieDeMaria committed Apr 10, 2023
1 parent fd3ec99 commit 63d1cb4
Show file tree
Hide file tree
Showing 3 changed files with 24 additions and 17 deletions.
12 changes: 7 additions & 5 deletions docs/content/integrations/snowflake/reference.mdx
Original file line number Diff line number Diff line change
Expand Up @@ -332,14 +332,16 @@ In this example, the `iris_dataset` asset will be stored in the `IRIS` schema, a
---

## Storing timestamp data in Pandas DataFrames

Due to a longstanding [issue](https://github.com/snowflakedb/snowflake-connector-python/issues/319) with the Snowflake Pandas connector, loading timestamp data from a Pandas DataFrame to Snowflake sometimes causes the data to be corrupted. In order to store timestamp data properly, it must have a timezone attached. When storing a Pandas DataFrame with the Snowflake I/O manager, the I/O manager will check if timestamp data has a timezone attached, and if not, **it will assign the UTC timezone**. In Snowflake, you will see the timestamp data stored as the TIMESTAMP_NTZ(9) type, as this is the type assigned by the Snowflake Pandas connector.

<Note>
Prior to `dagster-snowflake` version `0.19.0` the Snowflake I/O manager converted all timestamp data to strings before loading the data in Snowflake, and did the opposite conversion when fetching a DataFrame from Snowflake. If you have used a version of `dagster-snowflake` prior to version `0.19.0` please see the{" "}
<a href="/migration#extension-libraries">
Migration Guide
</a>{" "}
for information about migrating your database tables.
Prior to `dagster-snowflake` version `0.19.0` the Snowflake I/O manager
converted all timestamp data to strings before loading the data in Snowflake,
and did the opposite conversion when fetching a DataFrame from Snowflake. If
you have used a version of `dagster-snowflake` prior to version `0.19.0`
please see the <a href="/migration#extension-libraries">Migration Guide</a>{" "}
for information about migrating your database tables.
</Note>

---
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,13 +19,15 @@ def _table_exists(table_slice: TableSlice, connection):
return len(tables) > 0


def _get_table_column_types(table_slice: TableSlice, connection) -> Optional[Mapping[str, str]]:
    """Return a mapping of column name to declared Snowflake type for the table.

    Runs ``DESCRIBE TABLE`` for the table referenced by ``table_slice`` and maps
    each column name (``item[0]``) to its declared type (``item[1]``).

    Args:
        table_slice: Identifies the target table (``table_slice.table`` is
            interpolated into the DESCRIBE statement).
        connection: An open Snowflake connection exposing ``execute``.

    Returns:
        The column-name -> column-type mapping, or ``None`` when the table does
        not exist yet (e.g. on the first materialization).
    """
    if _table_exists(table_slice, connection):
        schema_list = connection.execute(f"DESCRIBE TABLE {table_slice.table}").fetchall()
        return {item[0]: item[1] for item in schema_list}
    # Table absent: signal "no type info" explicitly rather than falling off the
    # end of the function, to match the Optional return annotation.
    return None


def _convert_timestamp_to_string(s: pd.Series, column_types: Mapping[str, str]) -> pd.Series:
def _convert_timestamp_to_string(
s: pd.Series, column_types: Optional[Mapping[str, str]]
) -> pd.Series:
"""Converts columns of data of type pd.Timestamp to string so that it can be stored in
snowflake.
"""
Expand Down Expand Up @@ -61,7 +63,7 @@ def _convert_string_to_timestamp(s: pd.Series) -> pd.Series:
return s


def _add_missing_timezone(s: pd.Series, column_types: Mapping[str, str]) -> pd.Series:
def _add_missing_timezone(s: pd.Series, column_types: Optional[Mapping[str, str]]) -> pd.Series:
column_name = str(s.name)
if pd_core_dtypes_common.is_datetime_or_timedelta_dtype(s):
if column_types:
Expand Down Expand Up @@ -114,7 +116,9 @@ def handle_output(
connector.paramstyle = "pyformat"
with_uppercase_cols = obj.rename(str.upper, copy=False, axis="columns")
column_types = _get_table_column_types(table_slice, connection)
if context.resource_config and context.resource_config["store_timestamps_as_strings"]:
if context.resource_config and context.resource_config.get(
"store_timestamps_as_strings", False
):
with_uppercase_cols = with_uppercase_cols.apply(
lambda x: _convert_timestamp_to_string(x, column_types),
axis="index",
Expand Down Expand Up @@ -151,7 +155,9 @@ def load_input(
result = pd.read_sql(
sql=SnowflakeDbClient.get_select_statement(table_slice), con=connection
)
if context.resource_config and context.resource_config["store_timestamps_as_strings"]:
if context.resource_config and context.resource_config.get(
"store_timestamps_as_strings", False
):
result = result.apply(_convert_string_to_timestamp, axis="index")
result.columns = map(str.lower, result.columns) # type: ignore # (bad stubs)
return result
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,9 +15,8 @@
TablePartitionDimension,
TableSlice,
)
from pydantic import Field
from sqlalchemy.exc import ProgrammingError
from dagster._utils.backcompat import deprecation_warning
from pydantic import Field
from sqlalchemy.exc import ProgrammingError

from .resources import SnowflakeConnection
Expand Down Expand Up @@ -206,13 +205,13 @@ def my_table_a(my_table: pd.DataFrame) -> pd.DataFrame:
" https://docs.snowflake.com/en/user-guide/key-pair-auth.html for details."
),
)
store_timestamps_as_strings: bool = Field(
store_timestamps_as_strings: bool = Field(
default=False,
description=(
"If using Pandas DataFrames, whether to convert time data to strings. If True,"
" time data will be converted to strings when storing the DataFrame and"
" converted back to time data when loading the DataFrame. If False, time data"
" without a timezone will be set to UTC timezone to avoid a Snowflake bug. Defaults to False."
"If using Pandas DataFrames, whether to convert time data to strings. If True, time"
" data will be converted to strings when storing the DataFrame and converted back to"
" time data when loading the DataFrame. If False, time data without a timezone will be"
" set to UTC timezone to avoid a Snowflake bug. Defaults to False."
),
)

Expand Down

0 comments on commit 63d1cb4

Please sign in to comment.