diff --git a/docs/content/integrations/snowflake/reference.mdx b/docs/content/integrations/snowflake/reference.mdx index 449e71412a9bd..cd50104fe23da 100644 --- a/docs/content/integrations/snowflake/reference.mdx +++ b/docs/content/integrations/snowflake/reference.mdx @@ -332,14 +332,16 @@ In this example, the `iris_dataset` asset will be stored in the `IRIS` schema, a --- ## Storing timestamp data in Pandas DataFrames + Due to a longstanding [issue](https://github.com/snowflakedb/snowflake-connector-python/issues/319) with the Snowflake Pandas connector, loading timestamp data from a Pandas DataFrame to Snowflake sometimes causes the data to be corrupted. In order to store timestamp data properly, it must have a timezone attached. When storing a Pandas DataFrame with the Snowflake I/O manager, the I/O manager will check if timestamp data has a timezone attached, and if not, **it will assign the UTC timezone**. In Snowflake, you will see the timestamp data stored as the TIMESTAMP_NTZ(9) type, as this is the type assigned by the Snowflake Pandas connector. -Prior to `dagster-snowflake` version `0.19.0` the Snowflake I/O manager converted all timestamp data to strings before loading the data in Snowflake, and did the opposite conversion when fetching a DataFrame from Snowflake. If you have used a version of `dagster-snowflake` prior to version `0.19.0` please see the{" "} - - Migration Guide - {" "} -for information about migrating you database tables. + Prior to `dagster-snowflake` version `0.19.0` the Snowflake I/O manager + converted all timestamp data to strings before loading the data in Snowflake, + and did the opposite conversion when fetching a DataFrame from Snowflake. If + you have used a version of `dagster-snowflake` prior to version `0.19.0` + please see the Migration Guide{" "} + for information about migrating your database tables. 
--- diff --git a/python_modules/libraries/dagster-snowflake-pandas/dagster_snowflake_pandas/snowflake_pandas_type_handler.py b/python_modules/libraries/dagster-snowflake-pandas/dagster_snowflake_pandas/snowflake_pandas_type_handler.py index 5391186d7cc96..d0d3f7a5b9a8a 100644 --- a/python_modules/libraries/dagster-snowflake-pandas/dagster_snowflake_pandas/snowflake_pandas_type_handler.py +++ b/python_modules/libraries/dagster-snowflake-pandas/dagster_snowflake_pandas/snowflake_pandas_type_handler.py @@ -19,13 +19,15 @@ def _table_exists(table_slice: TableSlice, connection): return len(tables) > 0 -def _get_table_column_types(table_slice: TableSlice, connection): +def _get_table_column_types(table_slice: TableSlice, connection) -> Optional[Mapping[str, str]]: if _table_exists(table_slice, connection): schema_list = connection.execute(f"DESCRIBE TABLE {table_slice.table}").fetchall() return {item[0]: item[1] for item in schema_list} -def _convert_timestamp_to_string(s: pd.Series, column_types: Mapping[str, str]) -> pd.Series: +def _convert_timestamp_to_string( + s: pd.Series, column_types: Optional[Mapping[str, str]] +) -> pd.Series: """Converts columns of data of type pd.Timestamp to string so that it can be stored in snowflake. 
""" @@ -61,7 +63,7 @@ def _convert_string_to_timestamp(s: pd.Series) -> pd.Series: return s -def _add_missing_timezone(s: pd.Series, column_types: Mapping[str, str]) -> pd.Series: +def _add_missing_timezone(s: pd.Series, column_types: Optional[Mapping[str, str]]) -> pd.Series: column_name = str(s.name) if pd_core_dtypes_common.is_datetime_or_timedelta_dtype(s): if column_types: @@ -114,7 +116,9 @@ def handle_output( connector.paramstyle = "pyformat" with_uppercase_cols = obj.rename(str.upper, copy=False, axis="columns") column_types = _get_table_column_types(table_slice, connection) - if context.resource_config and context.resource_config["store_timestamps_as_strings"]: + if context.resource_config and context.resource_config.get( + "store_timestamps_as_strings", False + ): with_uppercase_cols = with_uppercase_cols.apply( lambda x: _convert_timestamp_to_string(x, column_types), axis="index", @@ -151,7 +155,9 @@ def load_input( result = pd.read_sql( sql=SnowflakeDbClient.get_select_statement(table_slice), con=connection ) - if context.resource_config and context.resource_config["store_timestamps_as_strings"]: + if context.resource_config and context.resource_config.get( + "store_timestamps_as_strings", False + ): result = result.apply(_convert_string_to_timestamp, axis="index") result.columns = map(str.lower, result.columns) # type: ignore # (bad stubs) return result diff --git a/python_modules/libraries/dagster-snowflake/dagster_snowflake/snowflake_io_manager.py b/python_modules/libraries/dagster-snowflake/dagster_snowflake/snowflake_io_manager.py index 236be8ae82adb..eb48edce94adb 100644 --- a/python_modules/libraries/dagster-snowflake/dagster_snowflake/snowflake_io_manager.py +++ b/python_modules/libraries/dagster-snowflake/dagster_snowflake/snowflake_io_manager.py @@ -15,9 +15,8 @@ TablePartitionDimension, TableSlice, ) -from pydantic import Field -from sqlalchemy.exc import ProgrammingError from dagster._utils.backcompat import deprecation_warning +from 
pydantic import Field from sqlalchemy.exc import ProgrammingError from .resources import SnowflakeConnection @@ -206,13 +205,13 @@ def my_table_a(my_table: pd.DataFrame) -> pd.DataFrame: " https://docs.snowflake.com/en/user-guide/key-pair-auth.html for details." ), ) - store_timestamps_as_strings: bool = Field( + store_timestamps_as_strings: bool = Field( default=False, description=( - "If using Pandas DataFrames, whether to convert time data to strings. If True," - " time data will be converted to strings when storing the DataFrame and" - " converted back to time data when loading the DataFrame. If False, time data" - " without a timezone will be set to UTC timezone to avoid a Snowflake bug. Defaults to False." + "If using Pandas DataFrames, whether to convert time data to strings. If True, time" + " data will be converted to strings when storing the DataFrame and converted back to" + " time data when loading the DataFrame. If False, time data without a timezone will be" + " set to UTC timezone to avoid a Snowflake bug. Defaults to False." ), )