From d0f48e1766abf59db0251a5caf03337b2b13907e Mon Sep 17 00:00:00 2001 From: jamiedemaria Date: Wed, 12 Apr 2023 13:31:31 -0400 Subject: [PATCH] [dagster-snowflake] config to determine how to handle timestamp data (#13097) --- MIGRATION.md | 30 ++++++++ .../integrations/snowflake/reference.mdx | 16 +++++ .../snowflake_pandas_type_handler.py | 65 +++++++++++++++++- .../test_snowflake_pandas_type_handler.py | 68 +++++++++++++++++-- .../dagster_snowflake/snowflake_io_manager.py | 9 +++ 5 files changed, 179 insertions(+), 9 deletions(-) diff --git a/MIGRATION.md b/MIGRATION.md index 7f98c5f12ff31..098400a3c5e26 100644 --- a/MIGRATION.md +++ b/MIGRATION.md @@ -2,6 +2,36 @@ When new releases include breaking changes or deprecations, this document describes how to migrate. +## Migrating to 1.3.0 + + +### Breaking Changes + +#### Extension Libraries +[dagster-snowflake-pandas] Prior to `dagster-snowflake` version `0.19.0` the Snowflake I/O manager converted all timestamp data to strings before loading the data in Snowflake, and did the opposite conversion when fetching a DataFrame from Snowflake. The I/O manager now ensures timestamp data has a timezone attached and stores the data as TIMESTAMP_NTZ(9) type. If you used the Snowflake I/O manager prior to version `0.19.0` you can set the `store_timestamps_as_strings=True` configuration value for the Snowflake I/O manager to continue storing time data as strings while you do table migrations. + +To migrate a table created prior to `0.19.0` to one with a TIMESTAMP_NTZ(9) type, you can run the following SQL queries in Snowflake. 
In the example, our table is located at `database.schema.table` and the column we want to migrate is called `time`: + +```sql + +// Add a column of type TIMESTAMP_NTZ(9) +ALTER TABLE database.schema.table +ADD COLUMN time_copy TIMESTAMP_NTZ(9); + +// copy the data from time and convert to timestamp data +UPDATE database.schema.table +SET time_copy = to_timestamp_ntz(time); + +// drop the time column +ALTER TABLE database.schema.table +DROP COLUMN time; + +// rename the time_copy column to time +ALTER TABLE database.schema.table +RENAME COLUMN time_copy TO time; + +``` + ## Migrating to 1.2.0 ### Database migration diff --git a/docs/content/integrations/snowflake/reference.mdx b/docs/content/integrations/snowflake/reference.mdx index 9711d83c05e63..cd50104fe23da 100644 --- a/docs/content/integrations/snowflake/reference.mdx +++ b/docs/content/integrations/snowflake/reference.mdx @@ -11,6 +11,7 @@ This reference page provides information for working with [`dagster-snowflake`]( - [Selecting specific columns in a downstream asset](#selecting-specific-columns-in-a-downstream-asset) - [Storing partitioned assets](#storing-partitioned-assets) - [Storing tables in multiple schemas](#storing-tables-in-multiple-schemas) +- [Storing timestamp data in Pandas DataFrames](#storing-timestamp-data-in-pandas-dataframes) - [Using the Snowflake I/O manager with other I/O managers](#using-the-snowflake-io-manager-with-other-io-managers) - [Storing and loading PySpark DataFrames in Snowflake](#storing-and-loading-pyspark-dataframes-in-snowflake) - [Using Pandas and PySpark DataFrames with Snowflake](#using-pandas-and-pyspark-dataframes-with-snowflake) @@ -330,6 +331,21 @@ In this example, the `iris_dataset` asset will be stored in the `IRIS` schema, a --- +## Storing timestamp data in Pandas DataFrames + +Due to a longstanding [issue](https://github.com/snowflakedb/snowflake-connector-python/issues/319) with the Snowflake Pandas connector, loading timestamp data from a Pandas DataFrame to 
Snowflake sometimes causes the data to be corrupted. In order to store timestamp data properly, it must have a timezone attached. When storing a Pandas DataFrame with the Snowflake I/O manager, the I/O manager will check if timestamp data has a timezone attached, and if not, **it will assign the UTC timezone**. In Snowflake, you will see the timestamp data stored as the TIMESTAMP_NTZ(9) type, as this is the type assigned by the Snowflake Pandas connector. + + + Prior to `dagster-snowflake` version `0.19.0` the Snowflake I/O manager + converted all timestamp data to strings before loading the data in Snowflake, + and did the opposite conversion when fetching a DataFrame from Snowflake. If + you have used a version of `dagster-snowflake` prior to version `0.19.0` + please see the Migration Guide{" "} + for information about migrating your database tables. + + +--- + ## Using the Snowflake I/O manager with other I/O managers You may have assets that you don't want to store in Snowflake. You can provide an I/O manager to each asset using the `io_manager_key` parameter in the `asset` decorator: diff --git a/python_modules/libraries/dagster-snowflake-pandas/dagster_snowflake_pandas/snowflake_pandas_type_handler.py b/python_modules/libraries/dagster-snowflake-pandas/dagster_snowflake_pandas/snowflake_pandas_type_handler.py index 0baf6a8a72a43..62561a0a8af3d 100644 --- a/python_modules/libraries/dagster-snowflake-pandas/dagster_snowflake_pandas/snowflake_pandas_type_handler.py +++ b/python_modules/libraries/dagster-snowflake-pandas/dagster_snowflake_pandas/snowflake_pandas_type_handler.py @@ -4,17 +4,45 @@ import pandas.core.dtypes.common as pd_core_dtypes_common from dagster import InputContext, MetadataValue, OutputContext, TableColumn, TableSchema from dagster._core.definitions.metadata import RawMetadataValue +from dagster._core.errors import DagsterInvariantViolationError from dagster._core.storage.db_io_manager import DbTypeHandler, TableSlice from dagster_snowflake 
import build_snowflake_io_manager from dagster_snowflake.snowflake_io_manager import SnowflakeDbClient, SnowflakeIOManager from snowflake.connector.pandas_tools import pd_writer -def _convert_timestamp_to_string(s: pd.Series) -> pd.Series: +def _table_exists(table_slice: TableSlice, connection): + tables = connection.execute( + f"SHOW TABLES LIKE '{table_slice.table}' IN SCHEMA" + f" {table_slice.database}.{table_slice.schema}" + ).fetchall() + return len(tables) > 0 + + +def _get_table_column_types(table_slice: TableSlice, connection) -> Optional[Mapping[str, str]]: + if _table_exists(table_slice, connection): + schema_list = connection.execute(f"DESCRIBE TABLE {table_slice.table}").fetchall() + return {item[0]: item[1] for item in schema_list} + + +def _convert_timestamp_to_string( + s: pd.Series, column_types: Optional[Mapping[str, str]], table_name: str +) -> pd.Series: """Converts columns of data of type pd.Timestamp to string so that it can be stored in snowflake. """ + column_name = str(s.name) if pd_core_dtypes_common.is_datetime_or_timedelta_dtype(s): # type: ignore # (bad stubs) + if column_types: + if "VARCHAR" not in column_types[column_name]: + raise DagsterInvariantViolationError( + "Snowflake I/O manager: Snowflake I/O manager configured to convert time data" + f" in DataFrame column {column_name} to strings, but the corresponding" + f" {column_name.upper()} column in table {table_name} is not of type VARCHAR," + f" it is of type {column_types[column_name]}. Please set" + " store_timestamps_as_strings=False in the Snowflake I/O manager configuration" + " to store time data as TIMESTAMP types." 
+ ) return s.dt.strftime("%Y-%m-%d %H:%M:%S.%f %z") else: return s @@ -36,6 +64,23 @@ def _convert_string_to_timestamp(s: pd.Series) -> pd.Series: return s +def _add_missing_timezone( + s: pd.Series, column_types: Optional[Mapping[str, str]], table_name: str +) -> pd.Series: + column_name = str(s.name) + if pd_core_dtypes_common.is_datetime_or_timedelta_dtype(s): # type: ignore # (bad stubs) + if column_types: + if "VARCHAR" in column_types[column_name]: + raise DagsterInvariantViolationError( + f"Snowflake I/O manager: The Snowflake column {column_name.upper()} in table" + f" {table_name} is of type {column_types[column_name]} and should be of type" + f" TIMESTAMP to store the time data in dataframe column {column_name}. Please" + " migrate this column to be of time TIMESTAMP_NTZ(9) to store time data." + ) + return s.dt.tz_localize("UTC") + return s + + class SnowflakePandasTypeHandler(DbTypeHandler[pd.DataFrame]): """Plugin for the Snowflake I/O Manager that can store and load Pandas DataFrames as Snowflake tables. 
@@ -73,7 +118,18 @@ def handle_output( connector.paramstyle = "pyformat" with_uppercase_cols = obj.rename(str.upper, copy=False, axis="columns") - with_uppercase_cols = with_uppercase_cols.apply(_convert_timestamp_to_string, axis="index") + column_types = _get_table_column_types(table_slice, connection) + if context.resource_config and context.resource_config.get( + "store_timestamps_as_strings", False + ): + with_uppercase_cols = with_uppercase_cols.apply( + lambda x: _convert_timestamp_to_string(x, column_types, table_slice.table), + axis="index", + ) + else: + with_uppercase_cols = with_uppercase_cols.apply( + lambda x: _add_missing_timezone(x, column_types, table_slice.table), axis="index" + ) with_uppercase_cols.to_sql( table_slice.table, con=connection.engine, @@ -102,7 +158,10 @@ def load_input( result = pd.read_sql( sql=SnowflakeDbClient.get_select_statement(table_slice), con=connection ) - result = result.apply(_convert_string_to_timestamp, axis="index") + if context.resource_config and context.resource_config.get( + "store_timestamps_as_strings", False + ): + result = result.apply(_convert_string_to_timestamp, axis="index") result.columns = map(str.lower, result.columns) # type: ignore # (bad stubs) return result diff --git a/python_modules/libraries/dagster-snowflake-pandas/dagster_snowflake_pandas_tests/test_snowflake_pandas_type_handler.py b/python_modules/libraries/dagster-snowflake-pandas/dagster_snowflake_pandas_tests/test_snowflake_pandas_type_handler.py index 70484505185d1..3e0437ef13cae 100644 --- a/python_modules/libraries/dagster-snowflake-pandas/dagster_snowflake_pandas_tests/test_snowflake_pandas_type_handler.py +++ b/python_modules/libraries/dagster-snowflake-pandas/dagster_snowflake_pandas_tests/test_snowflake_pandas_type_handler.py @@ -31,6 +31,7 @@ materialize, op, ) +from dagster._core.errors import DagsterInvariantViolationError from dagster._core.storage.db_io_manager import TableSlice from dagster_snowflake import 
build_snowflake_io_manager from dagster_snowflake.resources import SnowflakeConnection @@ -40,6 +41,7 @@ snowflake_pandas_io_manager, ) from dagster_snowflake_pandas.snowflake_pandas_type_handler import ( + _add_missing_timezone, _convert_string_to_timestamp, _convert_timestamp_to_string, ) @@ -93,7 +95,9 @@ def test_handle_output(): handler = SnowflakePandasTypeHandler() connection = MagicMock() df = DataFrame([{"col1": "a", "col2": 1}]) - output_context = build_output_context(resource_config=resource_config) + output_context = build_output_context( + resource_config={**resource_config, "time_data_to_string": False} + ) metadata = handler.handle_output( output_context, @@ -124,7 +128,9 @@ def test_load_input(): mock_read_sql.return_value = DataFrame([{"COL1": "a", "COL2": 1}]) handler = SnowflakePandasTypeHandler() - input_context = build_input_context() + input_context = build_input_context( + resource_config={**resource_config, "time_data_to_string": False} + ) df = handler.load_input( input_context, TableSlice( @@ -143,8 +149,7 @@ def test_load_input(): def test_type_conversions(): # no timestamp data no_time = pandas.Series([1, 2, 3, 4, 5]) - converted = _convert_string_to_timestamp(_convert_timestamp_to_string(no_time)) - + converted = _convert_string_to_timestamp(_convert_timestamp_to_string(no_time, None, "foo")) assert (converted == no_time).all() # timestamp data @@ -155,7 +160,9 @@ def test_type_conversions(): pandas.Timestamp("2017-03-01T12:30:45.35"), ] ) - time_converted = _convert_string_to_timestamp(_convert_timestamp_to_string(with_time)) + time_converted = _convert_string_to_timestamp( + _convert_timestamp_to_string(with_time, None, "foo") + ) assert (with_time == time_converted).all() @@ -165,6 +172,25 @@ def test_type_conversions(): assert (_convert_string_to_timestamp(string_data) == string_data).all() +def test_timezone_conversions(): + # no timestamp data + no_time = pandas.Series([1, 2, 3, 4, 5]) + converted = _add_missing_timezone(no_time, 
None, "foo") + assert (converted == no_time).all() + + # timestamp data + with_time = pandas.Series( + [ + pandas.Timestamp("2017-01-01T12:30:45.35"), + pandas.Timestamp("2017-02-01T12:30:45.35"), + pandas.Timestamp("2017-03-01T12:30:45.35"), + ] + ) + time_converted = _add_missing_timezone(with_time, None, "foo") + + assert (with_time.dt.tz_localize("UTC") == time_converted).all() + + def test_build_snowflake_pandas_io_manager(): assert isinstance( build_snowflake_io_manager([SnowflakePandasTypeHandler()]), IOManagerDefinition @@ -206,7 +232,7 @@ def io_manager_test_pipeline(): @pytest.mark.skipif(not IS_BUILDKITE, reason="Requires access to the BUILDKITE snowflake DB") @pytest.mark.parametrize( - "io_manager", [(old_snowflake_io_manager), (pythonic_snowflake_io_manager)] + "io_manager", [(snowflake_pandas_io_manager), (SnowflakePandasIOManager.configure_at_launch())] ) def test_io_manager_with_snowflake_pandas_timestamp_data(io_manager): with temporary_snowflake_table( @@ -234,6 +260,13 @@ def read_time_df(df: pandas.DataFrame): @job( resource_defs={"snowflake": io_manager}, + config={ + "resources": { + "snowflake": { + "config": {**SHARED_BUILDKITE_SNOWFLAKE_CONF, "database": DATABASE} + } + } + }, ) def io_manager_timestamp_test_job(): read_time_df(emit_time_df()) @@ -241,6 +274,29 @@ def io_manager_timestamp_test_job(): res = io_manager_timestamp_test_job.execute_in_process() assert res.success + @job( + resource_defs={"snowflake": io_manager}, + config={ + "resources": { + "snowflake": { + "config": { + **SHARED_BUILDKITE_SNOWFLAKE_CONF, + "database": DATABASE, + "store_timestamps_as_strings": True, + } + } + } + }, + ) + def io_manager_timestamp_as_string_test_job(): + read_time_df(emit_time_df()) + + with pytest.raises( + DagsterInvariantViolationError, + match=r"is not of type VARCHAR, it is of type TIMESTAMP_NTZ\(9\)", + ): + io_manager_timestamp_as_string_test_job.execute_in_process() + @pytest.mark.skipif(not IS_BUILDKITE, reason="Requires access to 
the BUILDKITE snowflake DB") @pytest.mark.parametrize( diff --git a/python_modules/libraries/dagster-snowflake/dagster_snowflake/snowflake_io_manager.py b/python_modules/libraries/dagster-snowflake/dagster_snowflake/snowflake_io_manager.py index 9e77dc18978d3..0551fb686222a 100644 --- a/python_modules/libraries/dagster-snowflake/dagster_snowflake/snowflake_io_manager.py +++ b/python_modules/libraries/dagster-snowflake/dagster_snowflake/snowflake_io_manager.py @@ -194,6 +194,15 @@ def my_table_a(my_table: pd.DataFrame) -> pd.DataFrame: " https://docs.snowflake.com/en/user-guide/key-pair-auth.html for details." ), ) + store_timestamps_as_strings: bool = Field( + default=False, + description=( + "If using Pandas DataFrames, whether to convert time data to strings. If True, time" + " data will be converted to strings when storing the DataFrame and converted back to" + " time data when loading the DataFrame. If False, time data without a timezone will be" + " set to UTC timezone to avoid a Snowflake bug. Defaults to False." + ), + ) @staticmethod @abstractmethod