[dagster-snowflake] config to determine how to handle timestamp data #13097

Merged
merged 19 commits into from
Apr 12, 2023
30 changes: 30 additions & 0 deletions MIGRATION.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,36 @@

When new releases include breaking changes or deprecations, this document describes how to migrate.

## Migrating to 1.3.0


### Breaking Changes

#### Extension Libraries
[dagster-snowflake-pandas] Prior to `dagster-snowflake` version `0.19.0`, the Snowflake I/O manager converted all timestamp data to strings before loading the data into Snowflake, and did the opposite conversion when fetching a DataFrame from Snowflake. The I/O manager now ensures timestamp data has a timezone attached and stores the data as the TIMESTAMP_NTZ(9) type. If you used the Snowflake I/O manager prior to version `0.19.0`, you can set the `store_timestamps_as_strings=True` configuration value for the Snowflake I/O manager to continue storing time data as strings while you migrate your tables.

To migrate a table created prior to `0.19.0` to one with a TIMESTAMP_NTZ(9) column, you can run the following SQL queries in Snowflake. In the example, our table is located at `database.schema.table` and the column we want to migrate is called `time`:

```sql

-- Add a column of type TIMESTAMP_NTZ(9)
ALTER TABLE database.schema.table
ADD COLUMN time_copy TIMESTAMP_NTZ(9);

-- Copy the data from time and convert it to timestamp data
UPDATE database.schema.table
SET time_copy = to_timestamp_ntz(time);

-- Drop the time column
ALTER TABLE database.schema.table
DROP COLUMN time;

-- Rename the time_copy column to time
ALTER TABLE database.schema.table
RENAME COLUMN time_copy TO time;

```
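
While tables are being migrated, the legacy behavior can be kept by setting the flag in the I/O manager's resource config. The following is a minimal sketch of such a run config as a plain Python dict, modeled on the config shape used in this PR's tests. The resource key `snowflake` and every connection value below are placeholders chosen for illustration, not settings taken from this PR:

```python
# Hypothetical connection settings; replace them with your own values.
snowflake_conf = {
    "account": "my_account",
    "user": "my_user",
    "password": "my_password",
    "database": "MY_DATABASE",
}

# Run config for a job whose Snowflake I/O manager is bound to the
# (assumed) resource key "snowflake".
run_config = {
    "resources": {
        "snowflake": {
            "config": {
                **snowflake_conf,
                # Keep storing time data as strings until the tables are migrated.
                "store_timestamps_as_strings": True,
            }
        }
    }
}
```

Once every affected column has been migrated to TIMESTAMP_NTZ(9), the flag can be removed, since it defaults to `False`.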

## Migrating to 1.2.0

### Database migration
Expand Down
16 changes: 16 additions & 0 deletions docs/content/integrations/snowflake/reference.mdx
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ This reference page provides information for working with [`dagster-snowflake`](
- [Selecting specific columns in a downstream asset](#selecting-specific-columns-in-a-downstream-asset)
- [Storing partitioned assets](#storing-partitioned-assets)
- [Storing tables in multiple schemas](#storing-tables-in-multiple-schemas)
- [Storing timestamp data in Pandas DataFrames](#storing-timestamp-data-in-pandas-dataframes)
- [Using the Snowflake I/O manager with other I/O managers](#using-the-snowflake-io-manager-with-other-io-managers)
- [Storing and loading PySpark DataFrames in Snowflake](#storing-and-loading-pyspark-dataframes-in-snowflake)
- [Using Pandas and PySpark DataFrames with Snowflake](#using-pandas-and-pyspark-dataframes-with-snowflake)
Expand Down Expand Up @@ -330,6 +331,21 @@ In this example, the `iris_dataset` asset will be stored in the `IRIS` schema, a

---

## Storing timestamp data in Pandas DataFrames

Due to a longstanding [issue](https://github.com/snowflakedb/snowflake-connector-python/issues/319) with the Snowflake Pandas connector, loading timestamp data from a Pandas DataFrame to Snowflake sometimes causes the data to be corrupted. In order to store timestamp data properly, it must have a timezone attached. When storing a Pandas DataFrame with the Snowflake I/O manager, the I/O manager will check if timestamp data has a timezone attached, and if not, **it will assign the UTC timezone**. In Snowflake, you will see the timestamp data stored as the TIMESTAMP_NTZ(9) type, as this is the type assigned by the Snowflake Pandas connector.

<Note>
Prior to `dagster-snowflake` version `0.19.0` the Snowflake I/O manager
converted all timestamp data to strings before loading the data in Snowflake,
and did the opposite conversion when fetching a DataFrame from Snowflake. If
you have used a version of `dagster-snowflake` prior to version `0.19.0`
please see the <a href="/migration#extension-libraries">Migration Guide</a>{" "}
for information about migrating your database tables.
</Note>
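
The timezone handling described above can be illustrated with plain pandas. This is a sketch under stated assumptions rather than the I/O manager's actual code: the helper name `add_utc_if_naive` is hypothetical, and it uses the public `pd.api.types.is_datetime64_dtype` check, which is true only for timezone-naive datetime columns.

```python
import pandas as pd


def add_utc_if_naive(s: pd.Series) -> pd.Series:
    """Return `s` with UTC attached when it holds timezone-naive datetimes.

    Hypothetical helper for illustration; not part of dagster-snowflake.
    """
    # True only for timezone-naive datetime64 data; timezone-aware columns
    # and non-datetime columns are returned unchanged.
    if pd.api.types.is_datetime64_dtype(s):
        return s.dt.tz_localize("UTC")
    return s


naive = pd.Series([pd.Timestamp("2017-01-01T12:30:45.35")])
localized = add_utc_if_naive(naive)

aware = pd.Series([pd.Timestamp("2017-01-01T12:30:45.35", tz="UTC")])
unchanged = add_utc_if_naive(aware)

numbers = add_utc_if_naive(pd.Series([1, 2, 3]))
```

Note that `tz_localize` attaches a timezone without shifting the wall-clock values, so a naive `12:30:45.35` becomes `12:30:45.35` UTC.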

---

## Using the Snowflake I/O manager with other I/O managers

You may have assets that you don't want to store in Snowflake. You can provide an I/O manager to each asset using the `io_manager_key` parameter in the `asset` decorator:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,17 +4,45 @@
import pandas.core.dtypes.common as pd_core_dtypes_common
from dagster import InputContext, MetadataValue, OutputContext, TableColumn, TableSchema
from dagster._core.definitions.metadata import RawMetadataValue
from dagster._core.errors import DagsterInvariantViolationError
from dagster._core.storage.db_io_manager import DbTypeHandler, TableSlice
from dagster_snowflake import build_snowflake_io_manager
from dagster_snowflake.snowflake_io_manager import SnowflakeDbClient, SnowflakeIOManager
from snowflake.connector.pandas_tools import pd_writer


def _convert_timestamp_to_string(s: pd.Series) -> pd.Series:
def _table_exists(table_slice: TableSlice, connection):
    tables = connection.execute(
        f"SHOW TABLES LIKE '{table_slice.table}' IN SCHEMA"
        f" {table_slice.database}.{table_slice.schema}"
    ).fetchall()
    return len(tables) > 0


def _get_table_column_types(table_slice: TableSlice, connection) -> Optional[Mapping[str, str]]:
    if _table_exists(table_slice, connection):
        schema_list = connection.execute(f"DESCRIBE TABLE {table_slice.table}").fetchall()
        return {item[0]: item[1] for item in schema_list}


def _convert_timestamp_to_string(
    s: pd.Series, column_types: Optional[Mapping[str, str]], table_name: str
) -> pd.Series:
    """Converts columns of data of type pd.Timestamp to string so that it can be stored in
    snowflake.
    """
    column_name = str(s.name)
    if pd_core_dtypes_common.is_datetime_or_timedelta_dtype(s):  # type: ignore  # (bad stubs)
        if column_types:
            if "VARCHAR" not in column_types[column_name]:
                raise DagsterInvariantViolationError(
                    "Snowflake I/O manager: Snowflake I/O manager configured to convert time data"
                    f" in DataFrame column {column_name} to strings, but the corresponding"
                    f" {column_name.upper()} column in table {table_name} is not of type VARCHAR,"
                    f" it is of type {column_types[column_name]}. Please set"
                    " store_timestamps_as_strings=False in the Snowflake I/O manager configuration"
                    " to store time data as TIMESTAMP types."
                )
        return s.dt.strftime("%Y-%m-%d %H:%M:%S.%f %z")
    else:
        return s
Expand All @@ -36,6 +64,23 @@ def _convert_string_to_timestamp(s: pd.Series) -> pd.Series:
return s


def _add_missing_timezone(
    s: pd.Series, column_types: Optional[Mapping[str, str]], table_name: str
) -> pd.Series:
    """Assigns the UTC timezone to columns of timestamp data so that they can be stored in
    snowflake as TIMESTAMP types.
    """
    column_name = str(s.name)
    if pd_core_dtypes_common.is_datetime_or_timedelta_dtype(s):  # type: ignore  # (bad stubs)
        if column_types:
            if "VARCHAR" in column_types[column_name]:
                raise DagsterInvariantViolationError(
                    f"Snowflake I/O manager: The Snowflake column {column_name.upper()} in table"
                    f" {table_name} is of type {column_types[column_name]} and should be of type"
                    f" TIMESTAMP to store the time data in dataframe column {column_name}. Please"
                    " migrate this column to be of type TIMESTAMP_NTZ(9) to store time data."
                )
        return s.dt.tz_localize("UTC")
    return s


class SnowflakePandasTypeHandler(DbTypeHandler[pd.DataFrame]):
"""Plugin for the Snowflake I/O Manager that can store and load Pandas DataFrames as Snowflake tables.

Expand Down Expand Up @@ -73,7 +118,18 @@ def handle_output(

connector.paramstyle = "pyformat"
with_uppercase_cols = obj.rename(str.upper, copy=False, axis="columns")
with_uppercase_cols = with_uppercase_cols.apply(_convert_timestamp_to_string, axis="index")
column_types = _get_table_column_types(table_slice, connection)
if context.resource_config and context.resource_config.get(
"store_timestamps_as_strings", False
):
with_uppercase_cols = with_uppercase_cols.apply(
lambda x: _convert_timestamp_to_string(x, column_types, table_slice.table),
axis="index",
)
else:
with_uppercase_cols = with_uppercase_cols.apply(
lambda x: _add_missing_timezone(x, column_types, table_slice.table), axis="index"
)
with_uppercase_cols.to_sql(
table_slice.table,
con=connection.engine,
Expand Down Expand Up @@ -102,7 +158,10 @@ def load_input(
result = pd.read_sql(
sql=SnowflakeDbClient.get_select_statement(table_slice), con=connection
)
result = result.apply(_convert_string_to_timestamp, axis="index")
if context.resource_config and context.resource_config.get(
"store_timestamps_as_strings", False
):
result = result.apply(_convert_string_to_timestamp, axis="index")
result.columns = map(str.lower, result.columns) # type: ignore # (bad stubs)
return result

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
materialize,
op,
)
from dagster._core.errors import DagsterInvariantViolationError
from dagster._core.storage.db_io_manager import TableSlice
from dagster_snowflake import build_snowflake_io_manager
from dagster_snowflake.resources import SnowflakeConnection
Expand All @@ -40,6 +41,7 @@
snowflake_pandas_io_manager,
)
from dagster_snowflake_pandas.snowflake_pandas_type_handler import (
_add_missing_timezone,
_convert_string_to_timestamp,
_convert_timestamp_to_string,
)
Expand Down Expand Up @@ -93,7 +95,9 @@ def test_handle_output():
handler = SnowflakePandasTypeHandler()
connection = MagicMock()
df = DataFrame([{"col1": "a", "col2": 1}])
output_context = build_output_context(resource_config=resource_config)
output_context = build_output_context(
resource_config={**resource_config, "store_timestamps_as_strings": False}
)

metadata = handler.handle_output(
output_context,
Expand Down Expand Up @@ -124,7 +128,9 @@ def test_load_input():
mock_read_sql.return_value = DataFrame([{"COL1": "a", "COL2": 1}])

handler = SnowflakePandasTypeHandler()
input_context = build_input_context()
input_context = build_input_context(
resource_config={**resource_config, "store_timestamps_as_strings": False}
)
df = handler.load_input(
input_context,
TableSlice(
Expand All @@ -143,8 +149,7 @@ def test_load_input():
def test_type_conversions():
# no timestamp data
no_time = pandas.Series([1, 2, 3, 4, 5])
converted = _convert_string_to_timestamp(_convert_timestamp_to_string(no_time))

converted = _convert_string_to_timestamp(_convert_timestamp_to_string(no_time, None, "foo"))
assert (converted == no_time).all()

# timestamp data
Expand All @@ -155,7 +160,9 @@ def test_type_conversions():
pandas.Timestamp("2017-03-01T12:30:45.35"),
]
)
time_converted = _convert_string_to_timestamp(_convert_timestamp_to_string(with_time))
time_converted = _convert_string_to_timestamp(
_convert_timestamp_to_string(with_time, None, "foo")
)

assert (with_time == time_converted).all()

Expand All @@ -165,6 +172,25 @@ def test_type_conversions():
assert (_convert_string_to_timestamp(string_data) == string_data).all()


def test_timezone_conversions():
# no timestamp data
no_time = pandas.Series([1, 2, 3, 4, 5])
converted = _add_missing_timezone(no_time, None, "foo")
assert (converted == no_time).all()

# timestamp data
with_time = pandas.Series(
[
pandas.Timestamp("2017-01-01T12:30:45.35"),
pandas.Timestamp("2017-02-01T12:30:45.35"),
pandas.Timestamp("2017-03-01T12:30:45.35"),
]
)
time_converted = _add_missing_timezone(with_time, None, "foo")

assert (with_time.dt.tz_localize("UTC") == time_converted).all()


def test_build_snowflake_pandas_io_manager():
assert isinstance(
build_snowflake_io_manager([SnowflakePandasTypeHandler()]), IOManagerDefinition
Expand Down Expand Up @@ -206,7 +232,7 @@ def io_manager_test_pipeline():

@pytest.mark.skipif(not IS_BUILDKITE, reason="Requires access to the BUILDKITE snowflake DB")
@pytest.mark.parametrize(
"io_manager", [(old_snowflake_io_manager), (pythonic_snowflake_io_manager)]
"io_manager", [(snowflake_pandas_io_manager), (SnowflakePandasIOManager.configure_at_launch())]
)
def test_io_manager_with_snowflake_pandas_timestamp_data(io_manager):
with temporary_snowflake_table(
Expand Down Expand Up @@ -234,13 +260,43 @@ def read_time_df(df: pandas.DataFrame):

@job(
resource_defs={"snowflake": io_manager},
config={
"resources": {
"snowflake": {
"config": {**SHARED_BUILDKITE_SNOWFLAKE_CONF, "database": DATABASE}
}
}
},
)
def io_manager_timestamp_test_job():
read_time_df(emit_time_df())

res = io_manager_timestamp_test_job.execute_in_process()
assert res.success

@job(
resource_defs={"snowflake": io_manager},
config={
"resources": {
"snowflake": {
"config": {
**SHARED_BUILDKITE_SNOWFLAKE_CONF,
"database": DATABASE,
"store_timestamps_as_strings": True,
}
}
}
},
)
def io_manager_timestamp_as_string_test_job():
read_time_df(emit_time_df())

with pytest.raises(
DagsterInvariantViolationError,
match=r"is not of type VARCHAR, it is of type TIMESTAMP_NTZ\(9\)",
):
io_manager_timestamp_as_string_test_job.execute_in_process()


@pytest.mark.skipif(not IS_BUILDKITE, reason="Requires access to the BUILDKITE snowflake DB")
@pytest.mark.parametrize(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -194,6 +194,15 @@ def my_table_a(my_table: pd.DataFrame) -> pd.DataFrame:
" https://docs.snowflake.com/en/user-guide/key-pair-auth.html for details."
),
)
store_timestamps_as_strings: bool = Field(
default=False,
description=(
"If using Pandas DataFrames, whether to convert time data to strings. If True, time"
" data will be converted to strings when storing the DataFrame and converted back to"
" time data when loading the DataFrame. If False, time data without a timezone will be"
" set to UTC timezone to avoid a Snowflake bug. Defaults to False."
),
)

@staticmethod
@abstractmethod
Expand Down