From 2a6229878cca893a822caa20d67bd80d53a3182a Mon Sep 17 00:00:00 2001 From: JamieDeMaria Date: Wed, 1 Mar 2023 14:57:21 -0500 Subject: [PATCH 01/19] wip --- .../snowflake_pandas_type_handler.py | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/python_modules/libraries/dagster-snowflake-pandas/dagster_snowflake_pandas/snowflake_pandas_type_handler.py b/python_modules/libraries/dagster-snowflake-pandas/dagster_snowflake_pandas/snowflake_pandas_type_handler.py index 0baf6a8a72a43..379458e09d42b 100644 --- a/python_modules/libraries/dagster-snowflake-pandas/dagster_snowflake_pandas/snowflake_pandas_type_handler.py +++ b/python_modules/libraries/dagster-snowflake-pandas/dagster_snowflake_pandas/snowflake_pandas_type_handler.py @@ -36,6 +36,11 @@ def _convert_string_to_timestamp(s: pd.Series) -> pd.Series: return s +def _add_missing_timezone(s: pd.Series) -> pd.Series: + if pd_core_dtypes_common.is_datetime_or_timedelta_dtype(s): + pass + + class SnowflakePandasTypeHandler(DbTypeHandler[pd.DataFrame]): """Plugin for the Snowflake I/O Manager that can store and load Pandas DataFrames as Snowflake tables. From 1374e79ad8787ffd2b3656f775f0c672c2f191f4 Mon Sep 17 00:00:00 2001 From: JamieDeMaria Date: Wed, 1 Mar 2023 15:19:31 -0500 Subject: [PATCH 02/19] conversion fn --- .../snowflake_pandas_type_handler.py | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/python_modules/libraries/dagster-snowflake-pandas/dagster_snowflake_pandas/snowflake_pandas_type_handler.py b/python_modules/libraries/dagster-snowflake-pandas/dagster_snowflake_pandas/snowflake_pandas_type_handler.py index 379458e09d42b..0eb7ccf996d33 100644 --- a/python_modules/libraries/dagster-snowflake-pandas/dagster_snowflake_pandas/snowflake_pandas_type_handler.py +++ b/python_modules/libraries/dagster-snowflake-pandas/dagster_snowflake_pandas/snowflake_pandas_type_handler.py @@ -38,7 +38,8 @@ def _convert_string_to_timestamp(s: pd.Series) -> pd.Series: def _add_missing_timezone(s: pd.Series) -> pd.Series: if pd_core_dtypes_common.is_datetime_or_timedelta_dtype(s): - pass + return s.dt.tz_localize("UTC") + return s class SnowflakePandasTypeHandler(DbTypeHandler[pd.DataFrame]): @@ -78,7 +79,7 @@ def handle_output( connector.paramstyle = "pyformat" with_uppercase_cols = obj.rename(str.upper, copy=False, axis="columns") - with_uppercase_cols = with_uppercase_cols.apply(_convert_timestamp_to_string, axis="index") + with_uppercase_cols = with_uppercase_cols.apply(_add_missing_timezone, axis="index") with_uppercase_cols.to_sql( table_slice.table, con=connection.engine, @@ -107,7 +108,7 @@ def load_input( result = pd.read_sql( sql=SnowflakeDbClient.get_select_statement(table_slice), con=connection ) - result = result.apply(_convert_string_to_timestamp, axis="index") + # result = result.apply(_convert_string_to_timestamp, axis="index") result.columns = map(str.lower, result.columns) # type: ignore # (bad stubs) return result From b695ee42bf0d1ddb6f5120b4e91f32c579ac6d44 Mon Sep 17 00:00:00 2001 From: JamieDeMaria Date: Wed, 22 Mar 2023 14:01:28 -0400 Subject: [PATCH 03/19] wip --- .../snowflake_pandas_type_handler.py | 37 +++++++++++++++---- .../dagster_snowflake/snowflake_io_manager.py | 14 +++++++ 2 files changed, 44 insertions(+), 7 deletions(-) diff --git a/python_modules/libraries/dagster-snowflake-pandas/dagster_snowflake_pandas/snowflake_pandas_type_handler.py b/python_modules/libraries/dagster-snowflake-pandas/dagster_snowflake_pandas/snowflake_pandas_type_handler.py index 0eb7ccf996d33..602cb85e0f0a8 100644 --- a/python_modules/libraries/dagster-snowflake-pandas/dagster_snowflake_pandas/snowflake_pandas_type_handler.py +++ b/python_modules/libraries/dagster-snowflake-pandas/dagster_snowflake_pandas/snowflake_pandas_type_handler.py @@ -2,25 +2,39 @@ import pandas as pd import pandas.core.dtypes.common as pd_core_dtypes_common -from dagster import InputContext, MetadataValue, OutputContext, TableColumn, TableSchema -from dagster._core.definitions.metadata import RawMetadataValue -from dagster._core.storage.db_io_manager import DbTypeHandler, TableSlice from dagster_snowflake import build_snowflake_io_manager from dagster_snowflake.snowflake_io_manager import SnowflakeDbClient, SnowflakeIOManager from snowflake.connector.pandas_tools import pd_writer +from dagster import InputContext, MetadataValue, OutputContext, TableColumn, TableSchema +from dagster._core.definitions.metadata import RawMetadataValue +from dagster._core.storage.db_io_manager import DbTypeHandler, TableSlice -def _convert_timestamp_to_string(s: pd.Series) -> pd.Series: + +def _table_exists(table_slice: TableSlice, connection): + tables = connection.execute( + f"SHOW TABLES LIKE '{table_slice.table}' IN DATABASE '{table_slice.database}' SCHEMA" + f" '{table_slice.schema}'" + ).fetchall() + return len(tables) > 0 + +def _get_table_schema(table_slice: TableSlice, connection): + if _table_exists(table_slice, connection): + return connection.execute(f"DESCRIBE TABLE {table_slice.table}").fetchall() + +def _convert_timestamp_to_string(s: pd.Series, table_schema) -> pd.Series: """Converts columns of data of type pd.Timestamp to string so that it can be stored in snowflake. """ if pd_core_dtypes_common.is_datetime_or_timedelta_dtype(s): # type: ignore # (bad stubs) + if table_schema: + pass return s.dt.strftime("%Y-%m-%d %H:%M:%S.%f %z") else: return s -def _convert_string_to_timestamp(s: pd.Series) -> pd.Series: +def _convert_string_to_timestamp(s: pd.Series, table_schema) -> pd.Series: """Converts columns of strings in Timestamp format to pd.Timestamp to undo the conversion in _convert_timestamp_to_string. @@ -79,7 +93,15 @@ def handle_output( connector.paramstyle = "pyformat" with_uppercase_cols = obj.rename(str.upper, copy=False, axis="columns") - with_uppercase_cols = with_uppercase_cols.apply(_add_missing_timezone, axis="index") + table_schema = _get_table_schema(table_slice, connection) + if context.config["time_data_to_string"]: + with_uppercase_cols = with_uppercase_cols.apply( + _convert_timestamp_to_string, axis="index", args=(table_schema) + ) + else: + with_uppercase_cols = with_uppercase_cols.apply( + _add_missing_timezone, axis="index", args=(table_schema) + ) with_uppercase_cols.to_sql( table_slice.table, con=connection.engine, @@ -108,7 +130,8 @@ def load_input( result = pd.read_sql( sql=SnowflakeDbClient.get_select_statement(table_slice), con=connection ) - # result = result.apply(_convert_string_to_timestamp, axis="index") + if context.config["time_data_to_string"]: + result = result.apply(_convert_string_to_timestamp, axis="index") result.columns = map(str.lower, result.columns) # type: ignore # (bad stubs) return result diff --git a/python_modules/libraries/dagster-snowflake/dagster_snowflake/snowflake_io_manager.py b/python_modules/libraries/dagster-snowflake/dagster_snowflake/snowflake_io_manager.py index 689e39d17de4f..ee075e5ff1449 100644 --- a/python_modules/libraries/dagster-snowflake/dagster_snowflake/snowflake_io_manager.py +++ b/python_modules/libraries/dagster-snowflake/dagster_snowflake/snowflake_io_manager.py @@ -16,6 +16,7 @@ ) from pydantic import Field from sqlalchemy.exc import ProgrammingError +from dagster._utils.backcompat import deprecation_warning from .resources import SnowflakeConnection @@ -93,6 +94,10 @@ def my_table_a(my_table: pd.DataFrame) -> pd.DataFrame: @io_manager(config_schema=SnowflakeIOManager.to_config_schema()) def snowflake_io_manager(init_context): + if init_context.config["time_data_to_string"]: + deprecation_warning( + "Snowflake I/O manager config time_data_to_string", "2.0.0", "Convert existing tables to use timestamps and remove time_data_to_string configuration instead." + ) return DbIOManager( type_handlers=type_handlers, db_client=SnowflakeDbClient(), @@ -194,6 +199,15 @@ def my_table_a(my_table: pd.DataFrame) -> pd.DataFrame: " https://docs.snowflake.com/en/user-guide/key-pair-auth.html for details." ), ) + time_data_to_string: bool = Field( + default=False, + description=( + "If using Pandas DataFrames, whether to convert time data to strings. If True," + " time data will be converted to strings when storing the DataFrame and" + " converted back to time data when loading the DataFrame. If False, time data" + " without a timezone will be set to UTC timezone to avoid a Snowflake bug. Defaults to False." + ), + ) @staticmethod @abstractmethod From ea3d5b0754aa40f8fffe484e7dd3b752450f71b2 Mon Sep 17 00:00:00 2001 From: JamieDeMaria Date: Wed, 22 Mar 2023 15:55:16 -0400 Subject: [PATCH 04/19] add config and docs --- .../integrations/snowflake/reference.mdx | 34 ++++++++++++ .../snowflake_pandas_type_handler.py | 54 ++++++++++++------- .../test_snowflake_pandas_type_handler.py | 19 +++++++ .../dagster_snowflake/snowflake_io_manager.py | 10 +++- 4 files changed, 97 insertions(+), 20 deletions(-) diff --git a/docs/content/integrations/snowflake/reference.mdx b/docs/content/integrations/snowflake/reference.mdx index 9711d83c05e63..76140840012cb 100644 --- a/docs/content/integrations/snowflake/reference.mdx +++ b/docs/content/integrations/snowflake/reference.mdx @@ -11,6 +11,7 @@ This reference page provides information for working with [`dagster-snowflake`]( - [Selecting specific columns in a downstream asset](#selecting-specific-columns-in-a-downstream-asset) - [Storing partitioned assets](#storing-partitioned-assets) - [Storing tables in multiple schemas](#storing-tables-in-multiple-schemas) +- [Storing timestamp data in Pandas DataFrames](#storing-timestamp-data-in-pandas-dataframes) - [Using the Snowflake I/O manager with other I/O managers](#using-the-snowflake-io-manager-with-other-io-managers) - [Storing and loading PySpark DataFrames in Snowflake](#storing-and-loading-pyspark-dataframes-in-snowflake) - [Using Pandas and PySpark DataFrames with Snowflake](#using-pandas-and-pyspark-dataframes-with-snowflake) @@ -330,6 +331,39 @@ In this example, the `iris_dataset` asset will be stored in the `IRIS` schema, a --- +## Storing timestamp data in Pandas DataFrames +Due to a longstanding bug in the Snowflake Pandas connector, loading timestamp data from a Pandas DataFrame to Snowflake sometimes causes the data to be corrupted. Prior to `dagster-snowflake` version `0.19.0` we solved this issue by converting all timestamp data to strings before loading the data in Snowflake, and doing the opposite conversion when fetching a DataFrame from Snowflake. However, we can also avoid this issue by ensuring that all timestamp data has a timezone. This allows us to store the data as TIMESTAMP_NTZ(9) type in Snowflake. + +To specify how you would like timestamp data to be handled, use the `time_data_to_string` configuration value for the Snowflake I/O manager. If `True`, the I/O manager will convert timestamp data to a string before loading it into Snowflake. If `False` the I/O manager will ensure the data has a timezone (attaching the UTC timezone if necessary) before loading it into Snowflake. + +If you would like to migrate a table created prior to `0.19.0` to one with a TIMESTAMP_NTZ(9) type, you can run the follow SQL queries in Snowflake. In the example, our table is located at `database.schema.table` and the column we want to migrate is called `time`: + +```sql + +// Add a column of type TIMESTAMP_NTZ(9) +ALTER TABLE database.schema.table +ADD COLUMN time_copy TIMESTAMP_NTZ(9) + +// copy the data from time and convert to timestamp data +UPDATE database.schema.table +SET time_copy = to_timestamp_ntz(time) + +// drop the time column +ALTER TABLE database.schema.table +DROP COLUMN time + +// rename the time_copy column to time +ALTER TABLER database.schema.table +RENAME COLUMN time_copy TO time + +``` + + +The time_data_to_string configuration value will be deprecated in version X.Y.Z of the dagster-snowflake library. + + +--- + ## Using the Snowflake I/O manager with other I/O managers You may have assets that you don't want to store in Snowflake. You can provide an I/O manager to each asset using the `io_manager_key` parameter in the `asset` decorator: diff --git a/python_modules/libraries/dagster-snowflake-pandas/dagster_snowflake_pandas/snowflake_pandas_type_handler.py b/python_modules/libraries/dagster-snowflake-pandas/dagster_snowflake_pandas/snowflake_pandas_type_handler.py index 602cb85e0f0a8..e59d3e10b5f9f 100644 --- a/python_modules/libraries/dagster-snowflake-pandas/dagster_snowflake_pandas/snowflake_pandas_type_handler.py +++ b/python_modules/libraries/dagster-snowflake-pandas/dagster_snowflake_pandas/snowflake_pandas_type_handler.py @@ -2,39 +2,48 @@ import pandas as pd import pandas.core.dtypes.common as pd_core_dtypes_common -from dagster_snowflake import build_snowflake_io_manager -from dagster_snowflake.snowflake_io_manager import SnowflakeDbClient, SnowflakeIOManager -from snowflake.connector.pandas_tools import pd_writer - from dagster import InputContext, MetadataValue, OutputContext, TableColumn, TableSchema from dagster._core.definitions.metadata import RawMetadataValue +from dagster._core.errors import DagsterInvariantViolationError from dagster._core.storage.db_io_manager import DbTypeHandler, TableSlice +from dagster_snowflake import build_snowflake_io_manager +from dagster_snowflake.snowflake_io_manager import SnowflakeDbClient, SnowflakeIOManager +from snowflake.connector.pandas_tools import pd_writer def _table_exists(table_slice: TableSlice, connection): tables = connection.execute( - f"SHOW TABLES LIKE '{table_slice.table}' IN DATABASE '{table_slice.database}' SCHEMA" - f" '{table_slice.schema}'" + f"SHOW TABLES LIKE '{table_slice.table}' IN SCHEMA" + f" {table_slice.database}.{table_slice.schema}" ).fetchall() return len(tables) > 0 -def _get_table_schema(table_slice: TableSlice, connection): + +def _get_table_column_types(table_slice: TableSlice, connection): if _table_exists(table_slice, connection): - return connection.execute(f"DESCRIBE TABLE {table_slice.table}").fetchall() + schema_list = connection.execute(f"DESCRIBE TABLE {table_slice.table}").fetchall() + return {item[0]: item[1] for item in schema_list} + -def _convert_timestamp_to_string(s: pd.Series, table_schema) -> pd.Series: +def _convert_timestamp_to_string(s: pd.Series, column_types) -> pd.Series: """Converts columns of data of type pd.Timestamp to string so that it can be stored in snowflake. """ if pd_core_dtypes_common.is_datetime_or_timedelta_dtype(s): # type: ignore # (bad stubs) - if table_schema: - pass + if column_types: + if "VARCHAR" not in column_types[s.name]: + raise DagsterInvariantViolationError( + "Snowflake I/O manager configured to convert time data to strings, but the" + " corresponding column is not of type VARCHAR, it is of type" + f" {column_types[s.name]}. Please set time_data_to_string=False in the" + " Snowflake I/O manager configuration to store time data as TIMESTAMP types." + ) return s.dt.strftime("%Y-%m-%d %H:%M:%S.%f %z") else: return s -def _convert_string_to_timestamp(s: pd.Series, table_schema) -> pd.Series: +def _convert_string_to_timestamp(s: pd.Series) -> pd.Series: """Converts columns of strings in Timestamp format to pd.Timestamp to undo the conversion in _convert_timestamp_to_string. @@ -50,8 +59,16 @@ def _convert_string_to_timestamp(s: pd.Series, table_schema) -> pd.Series: return s -def _add_missing_timezone(s: pd.Series) -> pd.Series: +def _add_missing_timezone(s: pd.Series, column_types) -> pd.Series: if pd_core_dtypes_common.is_datetime_or_timedelta_dtype(s): + if column_types: + if "VARCHAR" in column_types[s.name]: + raise DagsterInvariantViolationError( + f"Snowflake I/O manager: The Snowflake column for {s.name} is of type" + f" {column_types[s.name]} and should be of type TIMESTAMP to store time data." + " Please migrate this column to be of time TIMESTAMP_NTZ(9) to store time" + " data." + ) return s.dt.tz_localize("UTC") return s @@ -93,14 +110,15 @@ def handle_output( connector.paramstyle = "pyformat" with_uppercase_cols = obj.rename(str.upper, copy=False, axis="columns") - table_schema = _get_table_schema(table_slice, connection) - if context.config["time_data_to_string"]: + column_types = _get_table_column_types(table_slice, connection) + if context.resource_config["time_data_to_string"]: with_uppercase_cols = with_uppercase_cols.apply( - _convert_timestamp_to_string, axis="index", args=(table_schema) + lambda x: _convert_timestamp_to_string(x, column_types), + axis="index", ) else: with_uppercase_cols = with_uppercase_cols.apply( - _add_missing_timezone, axis="index", args=(table_schema) + lambda x: _add_missing_timezone(x, column_types), axis="index" ) with_uppercase_cols.to_sql( table_slice.table, @@ -130,7 +148,7 @@ def load_input( result = pd.read_sql( sql=SnowflakeDbClient.get_select_statement(table_slice), con=connection ) - if context.config["time_data_to_string"]: + if context.resource_config["time_data_to_string"]: result = result.apply(_convert_string_to_timestamp, axis="index") result.columns = map(str.lower, result.columns) # type: ignore # (bad stubs) return result diff --git a/python_modules/libraries/dagster-snowflake-pandas/dagster_snowflake_pandas_tests/test_snowflake_pandas_type_handler.py b/python_modules/libraries/dagster-snowflake-pandas/dagster_snowflake_pandas_tests/test_snowflake_pandas_type_handler.py index 70484505185d1..0774a17d821a6 100644 --- a/python_modules/libraries/dagster-snowflake-pandas/dagster_snowflake_pandas_tests/test_snowflake_pandas_type_handler.py +++ b/python_modules/libraries/dagster-snowflake-pandas/dagster_snowflake_pandas_tests/test_snowflake_pandas_type_handler.py @@ -31,6 +31,7 @@ materialize, op, ) +from dagster._core.errors import DagsterInvariantViolationError from dagster._core.storage.db_io_manager import TableSlice from dagster_snowflake import build_snowflake_io_manager from dagster_snowflake.resources import SnowflakeConnection @@ -241,6 +242,24 @@ def io_manager_timestamp_test_job(): res = io_manager_timestamp_test_job.execute_in_process() assert res.success + @job( + resource_defs={"snowflake": io_manager}, + config={ + "resources": { + "snowflake": { + "config": { + "time_data_as_string": True, + } + } + } + }, + ) + def io_manager_timestamp_as_string_test_job(): + read_time_df(emit_time_df()) + + with pytest.raises(DagsterInvariantViolationError): + io_manager_timestamp_as_string_test_job.execute_in_process() + @pytest.mark.skipif(not IS_BUILDKITE, reason="Requires access to the BUILDKITE snowflake DB") @pytest.mark.parametrize( diff --git a/python_modules/libraries/dagster-snowflake/dagster_snowflake/snowflake_io_manager.py b/python_modules/libraries/dagster-snowflake/dagster_snowflake/snowflake_io_manager.py index ee075e5ff1449..e55921117d209 100644 --- a/python_modules/libraries/dagster-snowflake/dagster_snowflake/snowflake_io_manager.py +++ b/python_modules/libraries/dagster-snowflake/dagster_snowflake/snowflake_io_manager.py @@ -17,6 +17,7 @@ from pydantic import Field from sqlalchemy.exc import ProgrammingError from dagster._utils.backcompat import deprecation_warning +from sqlalchemy.exc import ProgrammingError from .resources import SnowflakeConnection @@ -94,9 +95,14 @@ def my_table_a(my_table: pd.DataFrame) -> pd.DataFrame: @io_manager(config_schema=SnowflakeIOManager.to_config_schema()) def snowflake_io_manager(init_context): - if init_context.config["time_data_to_string"]: + if init_context.resource_config["time_data_to_string"]: deprecation_warning( - "Snowflake I/O manager config time_data_to_string", "2.0.0", "Convert existing tables to use timestamps and remove time_data_to_string configuration instead." + "Snowflake I/O manager config time_data_to_string", + "2.0.0", + ( + "Convert existing tables to use timestamps and remove time_data_to_string" + " configuration instead." + ), ) return DbIOManager( type_handlers=type_handlers, From 765acf1d891a2224abcc803832acb3e01d65285e Mon Sep 17 00:00:00 2001 From: JamieDeMaria Date: Wed, 22 Mar 2023 16:06:37 -0400 Subject: [PATCH 05/19] tests --- .../test_snowflake_pandas_type_handler.py | 33 ++++++++++++++++--- 1 file changed, 28 insertions(+), 5 deletions(-) diff --git a/python_modules/libraries/dagster-snowflake-pandas/dagster_snowflake_pandas_tests/test_snowflake_pandas_type_handler.py b/python_modules/libraries/dagster-snowflake-pandas/dagster_snowflake_pandas_tests/test_snowflake_pandas_type_handler.py index 0774a17d821a6..3bb6fc88c0fc8 100644 --- a/python_modules/libraries/dagster-snowflake-pandas/dagster_snowflake_pandas_tests/test_snowflake_pandas_type_handler.py +++ b/python_modules/libraries/dagster-snowflake-pandas/dagster_snowflake_pandas_tests/test_snowflake_pandas_type_handler.py @@ -41,6 +41,7 @@ snowflake_pandas_io_manager, ) from dagster_snowflake_pandas.snowflake_pandas_type_handler import ( + _add_missing_timezone, _convert_string_to_timestamp, _convert_timestamp_to_string, ) @@ -94,7 +95,9 @@ def test_handle_output(): handler = SnowflakePandasTypeHandler() connection = MagicMock() df = DataFrame([{"col1": "a", "col2": 1}]) - output_context = build_output_context(resource_config=resource_config) + output_context = build_output_context( + resource_config={**resource_config, "time_data_to_string": False} + ) metadata = handler.handle_output( output_context, @@ -125,7 +128,9 @@ def test_load_input(): mock_read_sql.return_value = DataFrame([{"COL1": "a", "COL2": 1}]) handler = SnowflakePandasTypeHandler() - input_context = build_input_context() + input_context = build_input_context( + resource_config={**resource_config, "time_data_to_string": False} + ) df = handler.load_input( input_context, TableSlice( @@ -144,8 +149,7 @@ def test_load_input(): def test_type_conversions(): # no timestamp data no_time = pandas.Series([1, 2, 3, 4, 5]) - converted = _convert_string_to_timestamp(_convert_timestamp_to_string(no_time)) - + converted = _convert_string_to_timestamp(_convert_timestamp_to_string(no_time, None)) assert (converted == no_time).all() # timestamp data @@ -156,7 +160,7 @@ def test_type_conversions(): pandas.Timestamp("2017-03-01T12:30:45.35"), ] ) - time_converted = _convert_string_to_timestamp(_convert_timestamp_to_string(with_time)) + time_converted = _convert_string_to_timestamp(_convert_timestamp_to_string(with_time, None)) assert (with_time == time_converted).all() @@ -166,6 +170,25 @@ def test_type_conversions(): assert (_convert_string_to_timestamp(string_data) == string_data).all() +def test_timezone_conversions(): + # no timestamp data + no_time = pandas.Series([1, 2, 3, 4, 5]) + converted = _add_missing_timezone(no_time, None) + assert (converted == no_time).all() + + # timestamp data + with_time = pandas.Series( + [ + pandas.Timestamp("2017-01-01T12:30:45.35"), + pandas.Timestamp("2017-02-01T12:30:45.35"), + pandas.Timestamp("2017-03-01T12:30:45.35"), + ] + ) + time_converted = _add_missing_timezone(with_time, None) + + assert (with_time.dt.tz_localize("UTC") == time_converted).all() + + def test_build_snowflake_pandas_io_manager(): assert isinstance( build_snowflake_io_manager([SnowflakePandasTypeHandler()]), IOManagerDefinition From 0e7e5c7c5879db0d78db7e8c7fdf6f03c887d640 Mon Sep 17 00:00:00 2001 From: JamieDeMaria Date: Mon, 27 Mar 2023 11:12:35 -0400 Subject: [PATCH 06/19] docs update --- MIGRATION.md | 35 +++++++++++++++++++ .../integrations/snowflake/reference.mdx | 32 ++++------------- 2 files changed, 41 insertions(+), 26 deletions(-) diff --git a/MIGRATION.md b/MIGRATION.md index 7f98c5f12ff31..8d559e168c3ac 100644 --- a/MIGRATION.md +++ b/MIGRATION.md @@ -2,6 +2,41 @@ When new releases include breaking changes or deprecations, this document describes how to migrate. +## Migrating to 1.3.0 + + +### Breaking Changes + +#### Extension Libraries +[dagster-snowflake-pandas] Prior to `dagster-snowflake` version `0.19.0` the Snowflake I/O manager converted all timestamp data to strings before loading the data in Snowflake, and did the opposite conversion when fetching a DataFrame from Snowflake. The I/O manager now ensures timestamp data has a timezone attached and stores the data as TIMESTAMP_NTZ(9) type. If you used the Snowflake I/O manager prior to version `0.19.0` you can set the `time_data_to_string=True` configuration value for the Snowflake I/O manager to continue storing time data as strings while you do table migrations. + +To migrate a table created prior to `0.19.0` to one with a TIMESTAMP_NTZ(9) type, you can run the follow SQL queries in Snowflake. In the example, our table is located at `database.schema.table` and the column we want to migrate is called `time`: + +```sql + +// Add a column of type TIMESTAMP_NTZ(9) +ALTER TABLE database.schema.table +ADD COLUMN time_copy TIMESTAMP_NTZ(9) + +// copy the data from time and convert to timestamp data +UPDATE database.schema.table +SET time_copy = to_timestamp_ntz(time) + +// drop the time column +ALTER TABLE database.schema.table +DROP COLUMN time + +// rename the time_copy column to time +ALTER TABLER database.schema.table +RENAME COLUMN time_copy TO time + +``` + + +The time_data_to_string configuration value will be deprecated in version X.Y.Z of the dagster-snowflake library. At that point, all timestamp data will be stored as TIMESTAMP_NTZ(9) type. + + + ## Migrating to 1.2.0 ### Database migration diff --git a/docs/content/integrations/snowflake/reference.mdx b/docs/content/integrations/snowflake/reference.mdx index 76140840012cb..449e71412a9bd 100644 --- a/docs/content/integrations/snowflake/reference.mdx +++ b/docs/content/integrations/snowflake/reference.mdx @@ -332,34 +332,14 @@ In this example, the `iris_dataset` asset will be stored in the `IRIS` schema, a --- ## Storing timestamp data in Pandas DataFrames -Due to a longstanding bug in the Snowflake Pandas connector, loading timestamp data from a Pandas DataFrame to Snowflake sometimes causes the data to be corrupted. Prior to `dagster-snowflake` version `0.19.0` we solved this issue by converting all timestamp data to strings before loading the data in Snowflake, and doing the opposite conversion when fetching a DataFrame from Snowflake. However, we can also avoid this issue by ensuring that all timestamp data has a timezone. This allows us to store the data as TIMESTAMP_NTZ(9) type in Snowflake. - -To specify how you would like timestamp data to be handled, use the `time_data_to_string` configuration value for the Snowflake I/O manager. If `True`, the I/O manager will convert timestamp data to a string before loading it into Snowflake. If `False` the I/O manager will ensure the data has a timezone (attaching the UTC timezone if necessary) before loading it into Snowflake. - -If you would like to migrate a table created prior to `0.19.0` to one with a TIMESTAMP_NTZ(9) type, you can run the follow SQL queries in Snowflake. In the example, our table is located at `database.schema.table` and the column we want to migrate is called `time`: - -```sql - -// Add a column of type TIMESTAMP_NTZ(9) -ALTER TABLE database.schema.table -ADD COLUMN time_copy TIMESTAMP_NTZ(9) - -// copy the data from time and convert to timestamp data -UPDATE database.schema.table -SET time_copy = to_timestamp_ntz(time) - -// drop the time column -ALTER TABLE database.schema.table -DROP COLUMN time - -// rename the time_copy column to time -ALTER TABLER database.schema.table -RENAME COLUMN time_copy TO time - -``` +Due to a longstanding [issue](https://github.com/snowflakedb/snowflake-connector-python/issues/319) with the Snowflake Pandas connector, loading timestamp data from a Pandas DataFrame to Snowflake sometimes causes the data to be corrupted. In order to store timestamp data properly, it must have a timezone attached. When storing a Pandas DataFrame with the Snowflake I/O manager, the I/O manager will check if timestamp data has a timezone attached, and if not, **it will assign the UTC timezone**. In Snowflake, you will see the timestamp data stored as the TIMESTAMP_NTZ(9) type, as this is the type assigned by the Snowflake Pandas connector. -The time_data_to_string configuration value will be deprecated in version X.Y.Z of the dagster-snowflake library. +Prior to `dagster-snowflake` version `0.19.0` the Snowflake I/O manager converted all timestamp data to strings before loading the data in Snowflake, and did the opposite conversion when fetching a DataFrame from Snowflake. If you have used a version of `dagster-snowflake` prior to version `0.19.0` please see the{" "} + + Migration Guide + {" "} +for information about migrating you database tables. --- From f6dc2670f7fae715d4168dd51f78e1afe8fa82d5 Mon Sep 17 00:00:00 2001 From: JamieDeMaria Date: Mon, 27 Mar 2023 11:14:55 -0400 Subject: [PATCH 07/19] rename config value --- MIGRATION.md | 2 +- .../snowflake_pandas_type_handler.py | 6 +++--- .../dagster_snowflake/snowflake_io_manager.py | 10 +++++----- 3 files changed, 9 insertions(+), 9 deletions(-) diff --git a/MIGRATION.md b/MIGRATION.md index 8d559e168c3ac..ce97e86af3bfa 100644 --- a/MIGRATION.md +++ b/MIGRATION.md @@ -8,7 +8,7 @@ When new releases include breaking changes or deprecations, this document descri ### Breaking Changes #### Extension Libraries -[dagster-snowflake-pandas] Prior to `dagster-snowflake` version `0.19.0` the Snowflake I/O manager converted all timestamp data to strings before loading the data in Snowflake, and did the opposite conversion when fetching a DataFrame from Snowflake. The I/O manager now ensures timestamp data has a timezone attached and stores the data as TIMESTAMP_NTZ(9) type. If you used the Snowflake I/O manager prior to version `0.19.0` you can set the `time_data_to_string=True` configuration value for the Snowflake I/O manager to continue storing time data as strings while you do table migrations. +[dagster-snowflake-pandas] Prior to `dagster-snowflake` version `0.19.0` the Snowflake I/O manager converted all timestamp data to strings before loading the data in Snowflake, and did the opposite conversion when fetching a DataFrame from Snowflake. The I/O manager now ensures timestamp data has a timezone attached and stores the data as TIMESTAMP_NTZ(9) type. If you used the Snowflake I/O manager prior to version `0.19.0` you can set the `store_timestamps_as_strings=True` configuration value for the Snowflake I/O manager to continue storing time data as strings while you do table migrations. To migrate a table created prior to `0.19.0` to one with a TIMESTAMP_NTZ(9) type, you can run the follow SQL queries in Snowflake. In the example, our table is located at `database.schema.table` and the column we want to migrate is called `time`: diff --git a/python_modules/libraries/dagster-snowflake-pandas/dagster_snowflake_pandas/snowflake_pandas_type_handler.py b/python_modules/libraries/dagster-snowflake-pandas/dagster_snowflake_pandas/snowflake_pandas_type_handler.py index e59d3e10b5f9f..c6b21c76c2fa3 100644 --- a/python_modules/libraries/dagster-snowflake-pandas/dagster_snowflake_pandas/snowflake_pandas_type_handler.py +++ b/python_modules/libraries/dagster-snowflake-pandas/dagster_snowflake_pandas/snowflake_pandas_type_handler.py @@ -35,7 +35,7 @@ def _convert_timestamp_to_string(s: pd.Series, column_types) -> pd.Series: raise DagsterInvariantViolationError( "Snowflake I/O manager configured to convert time data to strings, but the" " corresponding column is not of type VARCHAR, it is of type" - f" {column_types[s.name]}. Please set time_data_to_string=False in the" + f" {column_types[s.name]}. Please set store_timestamps_as_strings=False in the" " Snowflake I/O manager configuration to store time data as TIMESTAMP types." ) return s.dt.strftime("%Y-%m-%d %H:%M:%S.%f %z") @@ -111,7 +111,7 @@ def handle_output( connector.paramstyle = "pyformat" with_uppercase_cols = obj.rename(str.upper, copy=False, axis="columns") column_types = _get_table_column_types(table_slice, connection) - if context.resource_config["time_data_to_string"]: + if context.resource_config and context.resource_config["store_timestamps_as_strings"]: with_uppercase_cols = with_uppercase_cols.apply( lambda x: _convert_timestamp_to_string(x, column_types), axis="index", @@ -148,7 +148,7 @@ def load_input( result = pd.read_sql( sql=SnowflakeDbClient.get_select_statement(table_slice), con=connection ) - if context.resource_config["time_data_to_string"]: + if context.resource_config and context.resource_config["store_timestamps_as_strings"]: result = result.apply(_convert_string_to_timestamp, axis="index") result.columns = map(str.lower, result.columns) # type: ignore # (bad stubs) return result diff --git a/python_modules/libraries/dagster-snowflake/dagster_snowflake/snowflake_io_manager.py b/python_modules/libraries/dagster-snowflake/dagster_snowflake/snowflake_io_manager.py index e55921117d209..f37f7c2af3abb 100644 --- a/python_modules/libraries/dagster-snowflake/dagster_snowflake/snowflake_io_manager.py +++ b/python_modules/libraries/dagster-snowflake/dagster_snowflake/snowflake_io_manager.py @@ -95,13 +95,13 @@ def my_table_a(my_table: pd.DataFrame) -> pd.DataFrame: @io_manager(config_schema=SnowflakeIOManager.to_config_schema()) def snowflake_io_manager(init_context): - if init_context.resource_config["time_data_to_string"]: + if init_context.resource_config["store_timestamps_as_strings"]: deprecation_warning( - "Snowflake I/O manager config time_data_to_string", + "Snowflake I/O manager config store_timestamps_as_strings", "2.0.0", ( - "Convert existing tables to use timestamps and remove time_data_to_string" - " configuration instead." + "Convert existing tables to use timestamps and remove" + " store_timestamps_as_strings configuration instead." ), ) return DbIOManager( @@ -205,7 +205,7 @@ def my_table_a(my_table: pd.DataFrame) -> pd.DataFrame: " https://docs.snowflake.com/en/user-guide/key-pair-auth.html for details." ), ) - time_data_to_string: bool = Field( + store_timestamps_as_strings: bool = Field( default=False, description=( "If using Pandas DataFrames, whether to convert time data to strings. If True," From 1be141c0d24d4227f6187d019e92b974945bb1f4 Mon Sep 17 00:00:00 2001 From: JamieDeMaria Date: Mon, 27 Mar 2023 11:25:28 -0400 Subject: [PATCH 08/19] format --- .../snowflake_pandas_type_handler.py | 21 +++++++++++-------- 1 file changed, 12 insertions(+), 9 deletions(-) diff --git a/python_modules/libraries/dagster-snowflake-pandas/dagster_snowflake_pandas/snowflake_pandas_type_handler.py b/python_modules/libraries/dagster-snowflake-pandas/dagster_snowflake_pandas/snowflake_pandas_type_handler.py index c6b21c76c2fa3..5391186d7cc96 100644 --- a/python_modules/libraries/dagster-snowflake-pandas/dagster_snowflake_pandas/snowflake_pandas_type_handler.py +++ b/python_modules/libraries/dagster-snowflake-pandas/dagster_snowflake_pandas/snowflake_pandas_type_handler.py @@ -25,18 +25,20 @@ def _get_table_column_types(table_slice: TableSlice, connection): return {item[0]: item[1] for item in schema_list} -def _convert_timestamp_to_string(s: pd.Series, column_types) -> pd.Series: +def _convert_timestamp_to_string(s: pd.Series, column_types: Mapping[str, str]) -> pd.Series: """Converts columns of data of type pd.Timestamp to string so that it can be stored in snowflake. """ + column_name = str(s.name) if pd_core_dtypes_common.is_datetime_or_timedelta_dtype(s): # type: ignore # (bad stubs) if column_types: - if "VARCHAR" not in column_types[s.name]: + if "VARCHAR" not in column_types[column_name]: raise DagsterInvariantViolationError( "Snowflake I/O manager configured to convert time data to strings, but the" " corresponding column is not of type VARCHAR, it is of type" - f" {column_types[s.name]}. Please set store_timestamps_as_strings=False in the" - " Snowflake I/O manager configuration to store time data as TIMESTAMP types." + f" {column_types[column_name]}. Please set store_timestamps_as_strings=False in" + " the Snowflake I/O manager configuration to store time data as TIMESTAMP" + " types." ) return s.dt.strftime("%Y-%m-%d %H:%M:%S.%f %z") else: @@ -59,15 +61,16 @@ def _convert_string_to_timestamp(s: pd.Series) -> pd.Series: return s -def _add_missing_timezone(s: pd.Series, column_types) -> pd.Series: +def _add_missing_timezone(s: pd.Series, column_types: Mapping[str, str]) -> pd.Series: + column_name = str(s.name) if pd_core_dtypes_common.is_datetime_or_timedelta_dtype(s): if column_types: - if "VARCHAR" in column_types[s.name]: + if "VARCHAR" in column_types[column_name]: raise DagsterInvariantViolationError( f"Snowflake I/O manager: The Snowflake column for {s.name} is of type" - f" {column_types[s.name]} and should be of type TIMESTAMP to store time data." - " Please migrate this column to be of time TIMESTAMP_NTZ(9) to store time" - " data." + f" {column_types[column_name]} and should be of type TIMESTAMP to store time" + " data. Please migrate this column to be of time TIMESTAMP_NTZ(9) to store" + " time data." ) return s.dt.tz_localize("UTC") return s From bdad6f583cc6cdb21bf33eb57bd6650035c9effc Mon Sep 17 00:00:00 2001 From: JamieDeMaria Date: Mon, 10 Apr 2023 11:37:54 -0400 Subject: [PATCH 09/19] rebase --- .../test_snowflake_pandas_type_handler.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/python_modules/libraries/dagster-snowflake-pandas/dagster_snowflake_pandas_tests/test_snowflake_pandas_type_handler.py b/python_modules/libraries/dagster-snowflake-pandas/dagster_snowflake_pandas_tests/test_snowflake_pandas_type_handler.py index 3bb6fc88c0fc8..f3da3b06819b3 100644 --- a/python_modules/libraries/dagster-snowflake-pandas/dagster_snowflake_pandas_tests/test_snowflake_pandas_type_handler.py +++ b/python_modules/libraries/dagster-snowflake-pandas/dagster_snowflake_pandas_tests/test_snowflake_pandas_type_handler.py @@ -271,7 +271,7 @@ def io_manager_timestamp_test_job(): "resources": { "snowflake": { "config": { - "time_data_as_string": True, + "store_timestamps_as_strings": True, } } } From d016fff8880ef45810eeda7ce5ad073700c794c8 Mon Sep 17 00:00:00 2001 From: JamieDeMaria Date: Mon, 10 Apr 2023 14:40:02 -0400 Subject: [PATCH 10/19] typing --- .../content/integrations/snowflake/reference.mdx | 12 +++++++----- .../snowflake_pandas_type_handler.py | 16 +++++++++++----- .../dagster_snowflake/snowflake_io_manager.py | 13 ++++++------- 3 files changed, 24 insertions(+), 17 deletions(-) diff --git a/docs/content/integrations/snowflake/reference.mdx b/docs/content/integrations/snowflake/reference.mdx index 449e71412a9bd..cd50104fe23da 100644 --- a/docs/content/integrations/snowflake/reference.mdx +++ b/docs/content/integrations/snowflake/reference.mdx @@ -332,14 +332,16 @@ In this example, the `iris_dataset` asset will be stored in the `IRIS` schema, a --- ## Storing timestamp data in Pandas DataFrames + Due to a longstanding [issue](https://github.com/snowflakedb/snowflake-connector-python/issues/319) with the Snowflake Pandas connector, loading timestamp data from a Pandas DataFrame to Snowflake sometimes causes the data to be corrupted. In order to store timestamp data properly, it must have a timezone attached. When storing a Pandas DataFrame with the Snowflake I/O manager, the I/O manager will check if timestamp data has a timezone attached, and if not, **it will assign the UTC timezone**. In Snowflake, you will see the timestamp data stored as the TIMESTAMP_NTZ(9) type, as this is the type assigned by the Snowflake Pandas connector. -Prior to `dagster-snowflake` version `0.19.0` the Snowflake I/O manager converted all timestamp data to strings before loading the data in Snowflake, and did the opposite conversion when fetching a DataFrame from Snowflake. If you have used a version of `dagster-snowflake` prior to version `0.19.0` please see the{" "} - - Migration Guide - {" "} -for information about migrating you database tables. + Prior to `dagster-snowflake` version `0.19.0` the Snowflake I/O manager + converted all timestamp data to strings before loading the data in Snowflake, + and did the opposite conversion when fetching a DataFrame from Snowflake. If + you have used a version of `dagster-snowflake` prior to version `0.19.0` + please see the Migration Guide{" "} + for information about migrating you database tables. --- diff --git a/python_modules/libraries/dagster-snowflake-pandas/dagster_snowflake_pandas/snowflake_pandas_type_handler.py b/python_modules/libraries/dagster-snowflake-pandas/dagster_snowflake_pandas/snowflake_pandas_type_handler.py index 5391186d7cc96..d0d3f7a5b9a8a 100644 --- a/python_modules/libraries/dagster-snowflake-pandas/dagster_snowflake_pandas/snowflake_pandas_type_handler.py +++ b/python_modules/libraries/dagster-snowflake-pandas/dagster_snowflake_pandas/snowflake_pandas_type_handler.py @@ -19,13 +19,15 @@ def _table_exists(table_slice: TableSlice, connection): return len(tables) > 0 -def _get_table_column_types(table_slice: TableSlice, connection): +def _get_table_column_types(table_slice: TableSlice, connection) -> Optional[Mapping[str, str]]: if _table_exists(table_slice, connection): schema_list = connection.execute(f"DESCRIBE TABLE {table_slice.table}").fetchall() return {item[0]: item[1] for item in schema_list} -def _convert_timestamp_to_string(s: pd.Series, column_types: Mapping[str, str]) -> pd.Series: +def _convert_timestamp_to_string( + s: pd.Series, column_types: Optional[Mapping[str, str]] +) -> pd.Series: """Converts columns of data of type pd.Timestamp to string so that it can be stored in snowflake. """ @@ -61,7 +63,7 @@ def _convert_string_to_timestamp(s: pd.Series) -> pd.Series: return s -def _add_missing_timezone(s: pd.Series, column_types: Mapping[str, str]) -> pd.Series: +def _add_missing_timezone(s: pd.Series, column_types: Optional[Mapping[str, str]]) -> pd.Series: column_name = str(s.name) if pd_core_dtypes_common.is_datetime_or_timedelta_dtype(s): if column_types: @@ -114,7 +116,9 @@ def handle_output( connector.paramstyle = "pyformat" with_uppercase_cols = obj.rename(str.upper, copy=False, axis="columns") column_types = _get_table_column_types(table_slice, connection) - if context.resource_config and context.resource_config["store_timestamps_as_strings"]: + if context.resource_config and context.resource_config.get( + "store_timestamps_as_strings", False + ): with_uppercase_cols = with_uppercase_cols.apply( lambda x: _convert_timestamp_to_string(x, column_types), axis="index", @@ -151,7 +155,9 @@ def load_input( result = pd.read_sql( sql=SnowflakeDbClient.get_select_statement(table_slice), con=connection ) - if context.resource_config and context.resource_config["store_timestamps_as_strings"]: + if context.resource_config and context.resource_config.get( + "store_timestamps_as_strings", False + ): result = result.apply(_convert_string_to_timestamp, axis="index") result.columns = map(str.lower, result.columns) # type: ignore # (bad stubs) return result diff --git a/python_modules/libraries/dagster-snowflake/dagster_snowflake/snowflake_io_manager.py b/python_modules/libraries/dagster-snowflake/dagster_snowflake/snowflake_io_manager.py index f37f7c2af3abb..6988df48cf676 100644 --- a/python_modules/libraries/dagster-snowflake/dagster_snowflake/snowflake_io_manager.py +++ b/python_modules/libraries/dagster-snowflake/dagster_snowflake/snowflake_io_manager.py @@ -14,9 +14,8 @@ TablePartitionDimension, TableSlice, ) -from pydantic import Field -from sqlalchemy.exc import ProgrammingError from dagster._utils.backcompat import deprecation_warning +from pydantic import Field from sqlalchemy.exc import ProgrammingError from .resources import SnowflakeConnection @@ -205,13 +204,13 @@ def my_table_a(my_table: pd.DataFrame) -> pd.DataFrame: " https://docs.snowflake.com/en/user-guide/key-pair-auth.html for details." ), ) - store_timestamps_as_strings: bool = Field( + store_timestamps_as_strings: bool = Field( default=False, description=( - "If using Pandas DataFrames, whether to convert time data to strings. If True," - " time data will be converted to strings when storing the DataFrame and" - " converted back to time data when loading the DataFrame. If False, time data" - " without a timezone will be set to UTC timezone to avoid a Snowflake bug. Defaults to False." + "If using Pandas DataFrames, whether to convert time data to strings. If True, time" + " data will be converted to strings when storing the DataFrame and converted back to" + " time data when loading the DataFrame. If False, time data without a timezone will be" + " set to UTC timezone to avoid a Snowflake bug. Defaults to False." ), ) From 61d5f464d5b53f847513cc35230b73535af0ecf6 Mon Sep 17 00:00:00 2001 From: JamieDeMaria Date: Mon, 10 Apr 2023 15:44:21 -0400 Subject: [PATCH 11/19] fix attr access --- .../dagster_snowflake/snowflake_io_manager.py | 11 ++++++++++- 1 file changed, 10 insertions(+), 1 deletion(-) diff --git a/python_modules/libraries/dagster-snowflake/dagster_snowflake/snowflake_io_manager.py b/python_modules/libraries/dagster-snowflake/dagster_snowflake/snowflake_io_manager.py index 6988df48cf676..0de3c666ddf36 100644 --- a/python_modules/libraries/dagster-snowflake/dagster_snowflake/snowflake_io_manager.py +++ b/python_modules/libraries/dagster-snowflake/dagster_snowflake/snowflake_io_manager.py @@ -94,7 +94,7 @@ def my_table_a(my_table: pd.DataFrame) -> pd.DataFrame: @io_manager(config_schema=SnowflakeIOManager.to_config_schema()) def snowflake_io_manager(init_context): - if init_context.resource_config["store_timestamps_as_strings"]: + if init_context.resource_config.get("store_timestamps_as_strings", False): deprecation_warning( "Snowflake I/O manager config store_timestamps_as_strings", "2.0.0", @@ -262,6 +262,15 @@ def default_load_type() -> Optional[Type]: return None def create_io_manager(self, context) -> DbIOManager: + if self.store_timestamps_as_strings: + deprecation_warning( + "Snowflake I/O manager config store_timestamps_as_strings", + "2.0.0", + ( + "Convert existing tables to use timestamps and remove" + " store_timestamps_as_strings configuration instead." + ), + ) return DbIOManager( db_client=SnowflakeDbClient(), io_manager_name="SnowflakeIOManager", From 1251aea73e78bbb1e12106fc69ab24f41f1bd2d4 Mon Sep 17 00:00:00 2001 From: JamieDeMaria Date: Tue, 11 Apr 2023 14:08:16 -0400 Subject: [PATCH 12/19] remove deprecation warnings --- MIGRATION.md | 5 ----- .../test_snowflake_pandas_type_handler.py | 11 ++++++++++- .../dagster_snowflake/snowflake_io_manager.py | 19 ------------------- 3 files changed, 10 insertions(+), 25 deletions(-) diff --git a/MIGRATION.md b/MIGRATION.md index ce97e86af3bfa..098400a3c5e26 100644 --- a/MIGRATION.md +++ b/MIGRATION.md @@ -32,11 +32,6 @@ RENAME COLUMN time_copy TO time ``` - -The time_data_to_string configuration value will be deprecated in version X.Y.Z of the dagster-snowflake library. At that point, all timestamp data will be stored as TIMESTAMP_NTZ(9) type. - - - ## Migrating to 1.2.0 ### Database migration diff --git a/python_modules/libraries/dagster-snowflake-pandas/dagster_snowflake_pandas_tests/test_snowflake_pandas_type_handler.py b/python_modules/libraries/dagster-snowflake-pandas/dagster_snowflake_pandas_tests/test_snowflake_pandas_type_handler.py index f3da3b06819b3..f3b36f23b408d 100644 --- a/python_modules/libraries/dagster-snowflake-pandas/dagster_snowflake_pandas_tests/test_snowflake_pandas_type_handler.py +++ b/python_modules/libraries/dagster-snowflake-pandas/dagster_snowflake_pandas_tests/test_snowflake_pandas_type_handler.py @@ -230,7 +230,7 @@ def io_manager_test_pipeline(): @pytest.mark.skipif(not IS_BUILDKITE, reason="Requires access to the BUILDKITE snowflake DB") @pytest.mark.parametrize( - "io_manager", [(old_snowflake_io_manager), (pythonic_snowflake_io_manager)] + "io_manager", [(snowflake_pandas_io_manager), (SnowflakePandasIOManager.configure_at_launch())] ) def test_io_manager_with_snowflake_pandas_timestamp_data(io_manager): with temporary_snowflake_table( @@ -258,6 +258,13 @@ def read_time_df(df: pandas.DataFrame): @job( resource_defs={"snowflake": io_manager}, + config={ + "resources": { + "snowflake": { + "config": {**SHARED_BUILDKITE_SNOWFLAKE_CONF, "database": DATABASE} + } + } + }, ) def io_manager_timestamp_test_job(): read_time_df(emit_time_df()) @@ -271,6 +278,8 @@ def io_manager_timestamp_test_job(): "resources": { "snowflake": { "config": { + **SHARED_BUILDKITE_SNOWFLAKE_CONF, + "database": DATABASE, "store_timestamps_as_strings": True, } } diff --git a/python_modules/libraries/dagster-snowflake/dagster_snowflake/snowflake_io_manager.py b/python_modules/libraries/dagster-snowflake/dagster_snowflake/snowflake_io_manager.py index 0de3c666ddf36..21d1688ce586e 100644 --- a/python_modules/libraries/dagster-snowflake/dagster_snowflake/snowflake_io_manager.py +++ b/python_modules/libraries/dagster-snowflake/dagster_snowflake/snowflake_io_manager.py @@ -14,7 +14,6 @@ TablePartitionDimension, TableSlice, ) -from dagster._utils.backcompat import deprecation_warning from pydantic import Field from sqlalchemy.exc import ProgrammingError @@ -94,15 +93,6 @@ def my_table_a(my_table: pd.DataFrame) -> pd.DataFrame: @io_manager(config_schema=SnowflakeIOManager.to_config_schema()) def snowflake_io_manager(init_context): - if init_context.resource_config.get("store_timestamps_as_strings", False): - deprecation_warning( - "Snowflake I/O manager config store_timestamps_as_strings", - "2.0.0", - ( - "Convert existing tables to use timestamps and remove" - " store_timestamps_as_strings configuration instead." - ), - ) return DbIOManager( type_handlers=type_handlers, db_client=SnowflakeDbClient(), @@ -262,15 +252,6 @@ def default_load_type() -> Optional[Type]: return None def create_io_manager(self, context) -> DbIOManager: - if self.store_timestamps_as_strings: - deprecation_warning( - "Snowflake I/O manager config store_timestamps_as_strings", - "2.0.0", - ( - "Convert existing tables to use timestamps and remove" - " store_timestamps_as_strings configuration instead." - ), - ) return DbIOManager( db_client=SnowflakeDbClient(), io_manager_name="SnowflakeIOManager", From f0627cc84cb3c4ab119002861cef742e95bca5fc Mon Sep 17 00:00:00 2001 From: JamieDeMaria Date: Tue, 11 Apr 2023 14:16:00 -0400 Subject: [PATCH 13/19] update error messaging --- .../snowflake_pandas_type_handler.py | 29 ++++++++++--------- .../test_snowflake_pandas_type_handler.py | 11 ++++++- 2 files changed, 26 insertions(+), 14 deletions(-) diff --git a/python_modules/libraries/dagster-snowflake-pandas/dagster_snowflake_pandas/snowflake_pandas_type_handler.py b/python_modules/libraries/dagster-snowflake-pandas/dagster_snowflake_pandas/snowflake_pandas_type_handler.py index d0d3f7a5b9a8a..6915edbfe5ffa 100644 --- a/python_modules/libraries/dagster-snowflake-pandas/dagster_snowflake_pandas/snowflake_pandas_type_handler.py +++ b/python_modules/libraries/dagster-snowflake-pandas/dagster_snowflake_pandas/snowflake_pandas_type_handler.py @@ -26,7 +26,7 @@ def _get_table_column_types(table_slice: TableSlice, connection) -> Optional[Map def _convert_timestamp_to_string( - s: pd.Series, column_types: Optional[Mapping[str, str]] + s: pd.Series, column_types: Optional[Mapping[str, str]], table_name: str ) -> pd.Series: """Converts columns of data of type pd.Timestamp to string so that it can be stored in snowflake. @@ -36,11 +36,12 @@ def _convert_timestamp_to_string( if column_types: if "VARCHAR" not in column_types[column_name]: raise DagsterInvariantViolationError( - "Snowflake I/O manager configured to convert time data to strings, but the" - " corresponding column is not of type VARCHAR, it is of type" - f" {column_types[column_name]}. Please set store_timestamps_as_strings=False in" - " the Snowflake I/O manager configuration to store time data as TIMESTAMP" - " types." + "Snowflake I/O manager: Snowflake I/O manager configured to convert time data" + f" in DataFrame column {column_name} to strings, but the corresponding" + f" {column_name.upper()} column in table {table_name} is not of type VARCHAR," + f" it is of type {column_types[column_name]}. Please set" + " store_timestamps_as_strings=False in the Snowflake I/O manager configuration" + " to store time data as TIMESTAMP types." ) return s.dt.strftime("%Y-%m-%d %H:%M:%S.%f %z") else: @@ -63,16 +64,18 @@ def _convert_string_to_timestamp(s: pd.Series) -> pd.Series: return s -def _add_missing_timezone(s: pd.Series, column_types: Optional[Mapping[str, str]]) -> pd.Series: +def _add_missing_timezone( + s: pd.Series, column_types: Optional[Mapping[str, str]], table_name: str +) -> pd.Series: column_name = str(s.name) if pd_core_dtypes_common.is_datetime_or_timedelta_dtype(s): if column_types: if "VARCHAR" in column_types[column_name]: raise DagsterInvariantViolationError( - f"Snowflake I/O manager: The Snowflake column for {s.name} is of type" - f" {column_types[column_name]} and should be of type TIMESTAMP to store time" - " data. Please migrate this column to be of time TIMESTAMP_NTZ(9) to store" - " time data." + f"Snowflake I/O manager: The Snowflake column {column_name.upper()} in table" + f" {table_name} is of type {column_types[column_name]} and should be of type" + f" TIMESTAMP to store the time data in dataframe column {column_name}. Please" + " migrate this column to be of time TIMESTAMP_NTZ(9) to store time data." ) return s.dt.tz_localize("UTC") return s @@ -120,12 +123,12 @@ def handle_output( "store_timestamps_as_strings", False ): with_uppercase_cols = with_uppercase_cols.apply( - lambda x: _convert_timestamp_to_string(x, column_types), + lambda x: _convert_timestamp_to_string(x, column_types, table_slice.table), axis="index", ) else: with_uppercase_cols = with_uppercase_cols.apply( - lambda x: _add_missing_timezone(x, column_types), axis="index" + lambda x: _add_missing_timezone(x, column_types, table_slice.table), axis="index" ) with_uppercase_cols.to_sql( table_slice.table, diff --git a/python_modules/libraries/dagster-snowflake-pandas/dagster_snowflake_pandas_tests/test_snowflake_pandas_type_handler.py b/python_modules/libraries/dagster-snowflake-pandas/dagster_snowflake_pandas_tests/test_snowflake_pandas_type_handler.py index f3b36f23b408d..4a1c383a5dda2 100644 --- a/python_modules/libraries/dagster-snowflake-pandas/dagster_snowflake_pandas_tests/test_snowflake_pandas_type_handler.py +++ b/python_modules/libraries/dagster-snowflake-pandas/dagster_snowflake_pandas_tests/test_snowflake_pandas_type_handler.py @@ -289,7 +289,16 @@ def io_manager_timestamp_test_job(): def io_manager_timestamp_as_string_test_job(): read_time_df(emit_time_df()) - with pytest.raises(DagsterInvariantViolationError): + with pytest.raises( + DagsterInvariantViolationError, + match=( + "Snowflake I/O manager: Snowflake I/O manager configured to convert time data in" + " DataFrame column date to strings, but the corresponding DATE column in table" + f" {table_name} is not of type VARCHAR, it is of type TIMESTAMP_NTZ(9). Please set" + " store_timestamps_as_strings=False in the Snowflake I/O manager configuration to" + " store time data as TIMESTAMP types." + ), + ): io_manager_timestamp_as_string_test_job.execute_in_process() From d7cccd51b8ada72e5b4e49655446ed86829bc08f Mon Sep 17 00:00:00 2001 From: JamieDeMaria Date: Tue, 11 Apr 2023 15:32:56 -0400 Subject: [PATCH 14/19] fix unit tests --- .../test_snowflake_pandas_type_handler.py | 12 +++++++----- 1 file changed, 7 insertions(+), 5 deletions(-) diff --git a/python_modules/libraries/dagster-snowflake-pandas/dagster_snowflake_pandas_tests/test_snowflake_pandas_type_handler.py b/python_modules/libraries/dagster-snowflake-pandas/dagster_snowflake_pandas_tests/test_snowflake_pandas_type_handler.py index 4a1c383a5dda2..0c0a5c42de960 100644 --- a/python_modules/libraries/dagster-snowflake-pandas/dagster_snowflake_pandas_tests/test_snowflake_pandas_type_handler.py +++ b/python_modules/libraries/dagster-snowflake-pandas/dagster_snowflake_pandas_tests/test_snowflake_pandas_type_handler.py @@ -149,7 +149,7 @@ def test_load_input(): def test_type_conversions(): # no timestamp data no_time = pandas.Series([1, 2, 3, 4, 5]) - converted = _convert_string_to_timestamp(_convert_timestamp_to_string(no_time, None)) + converted = _convert_string_to_timestamp(_convert_timestamp_to_string(no_time, None, "foo")) assert (converted == no_time).all() # timestamp data @@ -160,7 +160,9 @@ def test_type_conversions(): pandas.Timestamp("2017-03-01T12:30:45.35"), ] ) - time_converted = _convert_string_to_timestamp(_convert_timestamp_to_string(with_time, None)) + time_converted = _convert_string_to_timestamp( + _convert_timestamp_to_string(with_time, None, "foo") + ) assert (with_time == time_converted).all() @@ -173,7 +175,7 @@ def test_type_conversions(): def test_timezone_conversions(): # no timestamp data no_time = pandas.Series([1, 2, 3, 4, 5]) - converted = _add_missing_timezone(no_time, None) + converted = _add_missing_timezone(no_time, None, "foo") assert (converted == no_time).all() # timestamp data @@ -184,7 +186,7 @@ def test_timezone_conversions(): pandas.Timestamp("2017-03-01T12:30:45.35"), ] ) - time_converted = _add_missing_timezone(with_time, None) + time_converted = _add_missing_timezone(with_time, None, "foo") assert (with_time.dt.tz_localize("UTC") == time_converted).all() @@ -293,7 +295,7 @@ def io_manager_timestamp_as_string_test_job(): DagsterInvariantViolationError, match=( "Snowflake I/O manager: Snowflake I/O manager configured to convert time data in" - " DataFrame column date to strings, but the corresponding DATE column in table" + " DataFrame column DATE to strings, but the corresponding DATE column in table" f" {table_name} is not of type VARCHAR, it is of type TIMESTAMP_NTZ(9). Please set" " store_timestamps_as_strings=False in the Snowflake I/O manager configuration to" " store time data as TIMESTAMP types." From 06f7a804f7207f363ba25f30e4f1a1c056637577 Mon Sep 17 00:00:00 2001 From: JamieDeMaria Date: Tue, 11 Apr 2023 15:56:00 -0400 Subject: [PATCH 15/19] try as triple quote string --- .../test_snowflake_pandas_type_handler.py | 13 ++++++------- 1 file changed, 6 insertions(+), 7 deletions(-) diff --git a/python_modules/libraries/dagster-snowflake-pandas/dagster_snowflake_pandas_tests/test_snowflake_pandas_type_handler.py b/python_modules/libraries/dagster-snowflake-pandas/dagster_snowflake_pandas_tests/test_snowflake_pandas_type_handler.py index 0c0a5c42de960..7331f31da0b1b 100644 --- a/python_modules/libraries/dagster-snowflake-pandas/dagster_snowflake_pandas_tests/test_snowflake_pandas_type_handler.py +++ b/python_modules/libraries/dagster-snowflake-pandas/dagster_snowflake_pandas_tests/test_snowflake_pandas_type_handler.py @@ -293,13 +293,12 @@ def io_manager_timestamp_as_string_test_job(): with pytest.raises( DagsterInvariantViolationError, - match=( - "Snowflake I/O manager: Snowflake I/O manager configured to convert time data in" - " DataFrame column DATE to strings, but the corresponding DATE column in table" - f" {table_name} is not of type VARCHAR, it is of type TIMESTAMP_NTZ(9). Please set" - " store_timestamps_as_strings=False in the Snowflake I/O manager configuration to" - " store time data as TIMESTAMP types." - ), + match=f"""Snowflake I/O manager: Snowflake I/O manager configured to convert time data in + DataFrame column DATE to strings, but the corresponding DATE column in table + {table_name} is not of type VARCHAR, it is of type TIMESTAMP_NTZ(9). Please set + store_timestamps_as_strings=False in the Snowflake I/O manager configuration to + store time data as TIMESTAMP types. + """, ): io_manager_timestamp_as_string_test_job.execute_in_process() From 05c6e412bd42a2b9c8544a36dfb7bde2c8bb9a2e Mon Sep 17 00:00:00 2001 From: JamieDeMaria Date: Wed, 12 Apr 2023 10:55:36 -0400 Subject: [PATCH 16/19] format test string --- .../test_snowflake_pandas_type_handler.py | 13 +++++++------ 1 file changed, 7 insertions(+), 6 deletions(-) diff --git a/python_modules/libraries/dagster-snowflake-pandas/dagster_snowflake_pandas_tests/test_snowflake_pandas_type_handler.py b/python_modules/libraries/dagster-snowflake-pandas/dagster_snowflake_pandas_tests/test_snowflake_pandas_type_handler.py index 7331f31da0b1b..0c0a5c42de960 100644 --- a/python_modules/libraries/dagster-snowflake-pandas/dagster_snowflake_pandas_tests/test_snowflake_pandas_type_handler.py +++ b/python_modules/libraries/dagster-snowflake-pandas/dagster_snowflake_pandas_tests/test_snowflake_pandas_type_handler.py @@ -293,12 +293,13 @@ def io_manager_timestamp_as_string_test_job(): with pytest.raises( DagsterInvariantViolationError, - match=f"""Snowflake I/O manager: Snowflake I/O manager configured to convert time data in - DataFrame column DATE to strings, but the corresponding DATE column in table - {table_name} is not of type VARCHAR, it is of type TIMESTAMP_NTZ(9). Please set - store_timestamps_as_strings=False in the Snowflake I/O manager configuration to - store time data as TIMESTAMP types. - """, + match=( + "Snowflake I/O manager: Snowflake I/O manager configured to convert time data in" + " DataFrame column DATE to strings, but the corresponding DATE column in table" + f" {table_name} is not of type VARCHAR, it is of type TIMESTAMP_NTZ(9). Please set" + " store_timestamps_as_strings=False in the Snowflake I/O manager configuration to" + " store time data as TIMESTAMP types." + ), ): io_manager_timestamp_as_string_test_job.execute_in_process() From 06ac2483de1658ba758278cb66c7957bb718a05e Mon Sep 17 00:00:00 2001 From: JamieDeMaria Date: Wed, 12 Apr 2023 11:24:02 -0400 Subject: [PATCH 17/19] match smaller --- .../test_snowflake_pandas_type_handler.py | 8 +------- 1 file changed, 1 insertion(+), 7 deletions(-) diff --git a/python_modules/libraries/dagster-snowflake-pandas/dagster_snowflake_pandas_tests/test_snowflake_pandas_type_handler.py b/python_modules/libraries/dagster-snowflake-pandas/dagster_snowflake_pandas_tests/test_snowflake_pandas_type_handler.py index 0c0a5c42de960..91c775d7c5797 100644 --- a/python_modules/libraries/dagster-snowflake-pandas/dagster_snowflake_pandas_tests/test_snowflake_pandas_type_handler.py +++ b/python_modules/libraries/dagster-snowflake-pandas/dagster_snowflake_pandas_tests/test_snowflake_pandas_type_handler.py @@ -293,13 +293,7 @@ def io_manager_timestamp_as_string_test_job(): with pytest.raises( DagsterInvariantViolationError, - match=( - "Snowflake I/O manager: Snowflake I/O manager configured to convert time data in" - " DataFrame column DATE to strings, but the corresponding DATE column in table" - f" {table_name} is not of type VARCHAR, it is of type TIMESTAMP_NTZ(9). Please set" - " store_timestamps_as_strings=False in the Snowflake I/O manager configuration to" - " store time data as TIMESTAMP types." - ), + match="is not of type VARCHAR, it is of type TIMESTAMP_NTZ(9)", ): io_manager_timestamp_as_string_test_job.execute_in_process() From 4fbd758598dbad9c9094636d5b29ce7d30d5880e Mon Sep 17 00:00:00 2001 From: JamieDeMaria Date: Wed, 12 Apr 2023 12:02:12 -0400 Subject: [PATCH 18/19] regex? --- .../test_snowflake_pandas_type_handler.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/python_modules/libraries/dagster-snowflake-pandas/dagster_snowflake_pandas_tests/test_snowflake_pandas_type_handler.py b/python_modules/libraries/dagster-snowflake-pandas/dagster_snowflake_pandas_tests/test_snowflake_pandas_type_handler.py index 91c775d7c5797..3e0437ef13cae 100644 --- a/python_modules/libraries/dagster-snowflake-pandas/dagster_snowflake_pandas_tests/test_snowflake_pandas_type_handler.py +++ b/python_modules/libraries/dagster-snowflake-pandas/dagster_snowflake_pandas_tests/test_snowflake_pandas_type_handler.py @@ -293,7 +293,7 @@ def io_manager_timestamp_as_string_test_job(): with pytest.raises( DagsterInvariantViolationError, - match="is not of type VARCHAR, it is of type TIMESTAMP_NTZ(9)", + match=r"is not of type VARCHAR, it is of type TIMESTAMP_NTZ\(9\)", ): io_manager_timestamp_as_string_test_job.execute_in_process() From 02a000b1f4dce7c7b92da35283af28cc203e7d96 Mon Sep 17 00:00:00 2001 From: JamieDeMaria Date: Wed, 12 Apr 2023 12:25:08 -0400 Subject: [PATCH 19/19] fix pyright --- .../dagster_snowflake_pandas/snowflake_pandas_type_handler.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/python_modules/libraries/dagster-snowflake-pandas/dagster_snowflake_pandas/snowflake_pandas_type_handler.py b/python_modules/libraries/dagster-snowflake-pandas/dagster_snowflake_pandas/snowflake_pandas_type_handler.py index 6915edbfe5ffa..62561a0a8af3d 100644 --- a/python_modules/libraries/dagster-snowflake-pandas/dagster_snowflake_pandas/snowflake_pandas_type_handler.py +++ b/python_modules/libraries/dagster-snowflake-pandas/dagster_snowflake_pandas/snowflake_pandas_type_handler.py @@ -68,7 +68,7 @@ def _add_missing_timezone( s: pd.Series, column_types: Optional[Mapping[str, str]], table_name: str ) -> pd.Series: column_name = str(s.name) - if pd_core_dtypes_common.is_datetime_or_timedelta_dtype(s): + if pd_core_dtypes_common.is_datetime_or_timedelta_dtype(s): # type: ignore # (bad stubs) if column_types: if "VARCHAR" in column_types[column_name]: raise DagsterInvariantViolationError(