Skip to content

Commit

Permalink
[dagster-snowflake] config to determine how to handle timestamp data (d…
Browse files Browse the repository at this point in the history
  • Loading branch information
jamiedemaria authored and takeru911 committed Apr 29, 2023
1 parent f976e2b commit 3b3508c
Show file tree
Hide file tree
Showing 5 changed files with 179 additions and 9 deletions.
30 changes: 30 additions & 0 deletions MIGRATION.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,36 @@

When new releases include breaking changes or deprecations, this document describes how to migrate.

## Migrating to 1.3.0


### Breaking Changes

#### Extension Libraries
[dagster-snowflake-pandas] Prior to `dagster-snowflake` version `0.19.0` the Snowflake I/O manager converted all timestamp data to strings before loading the data in Snowflake, and did the opposite conversion when fetching a DataFrame from Snowflake. The I/O manager now ensures timestamp data has a timezone attached and stores the data as TIMESTAMP_NTZ(9) type. If you used the Snowflake I/O manager prior to version `0.19.0` you can set the `store_timestamps_as_strings=True` configuration value for the Snowflake I/O manager to continue storing time data as strings while you do table migrations.

To migrate a table created prior to `0.19.0` to one with a TIMESTAMP_NTZ(9) type, you can run the follow SQL queries in Snowflake. In the example, our table is located at `database.schema.table` and the column we want to migrate is called `time`:

```sql

// Add a column of type TIMESTAMP_NTZ(9)
ALTER TABLE database.schema.table
ADD COLUMN time_copy TIMESTAMP_NTZ(9)

// copy the data from time and convert to timestamp data
UPDATE database.schema.table
SET time_copy = to_timestamp_ntz(time)

// drop the time column
ALTER TABLE database.schema.table
DROP COLUMN time

// rename the time_copy column to time
ALTER TABLER database.schema.table
RENAME COLUMN time_copy TO time

```

## Migrating to 1.2.0

### Database migration
Expand Down
16 changes: 16 additions & 0 deletions docs/content/integrations/snowflake/reference.mdx
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ This reference page provides information for working with [`dagster-snowflake`](
- [Selecting specific columns in a downstream asset](#selecting-specific-columns-in-a-downstream-asset)
- [Storing partitioned assets](#storing-partitioned-assets)
- [Storing tables in multiple schemas](#storing-tables-in-multiple-schemas)
- [Storing timestamp data in Pandas DataFrames](#storing-timestamp-data-in-pandas-dataframes)
- [Using the Snowflake I/O manager with other I/O managers](#using-the-snowflake-io-manager-with-other-io-managers)
- [Storing and loading PySpark DataFrames in Snowflake](#storing-and-loading-pyspark-dataframes-in-snowflake)
- [Using Pandas and PySpark DataFrames with Snowflake](#using-pandas-and-pyspark-dataframes-with-snowflake)
Expand Down Expand Up @@ -330,6 +331,21 @@ In this example, the `iris_dataset` asset will be stored in the `IRIS` schema, a

---

## Storing timestamp data in Pandas DataFrames

Due to a longstanding [issue](https://github.com/snowflakedb/snowflake-connector-python/issues/319) with the Snowflake Pandas connector, loading timestamp data from a Pandas DataFrame to Snowflake sometimes causes the data to be corrupted. In order to store timestamp data properly, it must have a timezone attached. When storing a Pandas DataFrame with the Snowflake I/O manager, the I/O manager will check if timestamp data has a timezone attached, and if not, **it will assign the UTC timezone**. In Snowflake, you will see the timestamp data stored as the TIMESTAMP_NTZ(9) type, as this is the type assigned by the Snowflake Pandas connector.

<Note>
Prior to `dagster-snowflake` version `0.19.0` the Snowflake I/O manager
converted all timestamp data to strings before loading the data in Snowflake,
and did the opposite conversion when fetching a DataFrame from Snowflake. If
you have used a version of `dagster-snowflake` prior to version `0.19.0`
please see the <a href="/migration#extension-libraries">Migration Guide</a>{" "}
for information about migrating you database tables.
</Note>

---

## Using the Snowflake I/O manager with other I/O managers

You may have assets that you don't want to store in Snowflake. You can provide an I/O manager to each asset using the `io_manager_key` parameter in the `asset` decorator:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,17 +4,45 @@
import pandas.core.dtypes.common as pd_core_dtypes_common
from dagster import InputContext, MetadataValue, OutputContext, TableColumn, TableSchema
from dagster._core.definitions.metadata import RawMetadataValue
from dagster._core.errors import DagsterInvariantViolationError
from dagster._core.storage.db_io_manager import DbTypeHandler, TableSlice
from dagster_snowflake import build_snowflake_io_manager
from dagster_snowflake.snowflake_io_manager import SnowflakeDbClient, SnowflakeIOManager
from snowflake.connector.pandas_tools import pd_writer


def _convert_timestamp_to_string(s: pd.Series) -> pd.Series:
def _table_exists(table_slice: TableSlice, connection):
tables = connection.execute(
f"SHOW TABLES LIKE '{table_slice.table}' IN SCHEMA"
f" {table_slice.database}.{table_slice.schema}"
).fetchall()
return len(tables) > 0


def _get_table_column_types(table_slice: TableSlice, connection) -> Optional[Mapping[str, str]]:
if _table_exists(table_slice, connection):
schema_list = connection.execute(f"DESCRIBE TABLE {table_slice.table}").fetchall()
return {item[0]: item[1] for item in schema_list}


def _convert_timestamp_to_string(
s: pd.Series, column_types: Optional[Mapping[str, str]], table_name: str
) -> pd.Series:
"""Converts columns of data of type pd.Timestamp to string so that it can be stored in
snowflake.
"""
column_name = str(s.name)
if pd_core_dtypes_common.is_datetime_or_timedelta_dtype(s): # type: ignore # (bad stubs)
if column_types:
if "VARCHAR" not in column_types[column_name]:
raise DagsterInvariantViolationError(
"Snowflake I/O manager: Snowflake I/O manager configured to convert time data"
f" in DataFrame column {column_name} to strings, but the corresponding"
f" {column_name.upper()} column in table {table_name} is not of type VARCHAR,"
f" it is of type {column_types[column_name]}. Please set"
" store_timestamps_as_strings=False in the Snowflake I/O manager configuration"
" to store time data as TIMESTAMP types."
)
return s.dt.strftime("%Y-%m-%d %H:%M:%S.%f %z")
else:
return s
Expand All @@ -36,6 +64,23 @@ def _convert_string_to_timestamp(s: pd.Series) -> pd.Series:
return s


def _add_missing_timezone(
s: pd.Series, column_types: Optional[Mapping[str, str]], table_name: str
) -> pd.Series:
column_name = str(s.name)
if pd_core_dtypes_common.is_datetime_or_timedelta_dtype(s): # type: ignore # (bad stubs)
if column_types:
if "VARCHAR" in column_types[column_name]:
raise DagsterInvariantViolationError(
f"Snowflake I/O manager: The Snowflake column {column_name.upper()} in table"
f" {table_name} is of type {column_types[column_name]} and should be of type"
f" TIMESTAMP to store the time data in dataframe column {column_name}. Please"
" migrate this column to be of time TIMESTAMP_NTZ(9) to store time data."
)
return s.dt.tz_localize("UTC")
return s


class SnowflakePandasTypeHandler(DbTypeHandler[pd.DataFrame]):
"""Plugin for the Snowflake I/O Manager that can store and load Pandas DataFrames as Snowflake tables.
Expand Down Expand Up @@ -73,7 +118,18 @@ def handle_output(

connector.paramstyle = "pyformat"
with_uppercase_cols = obj.rename(str.upper, copy=False, axis="columns")
with_uppercase_cols = with_uppercase_cols.apply(_convert_timestamp_to_string, axis="index")
column_types = _get_table_column_types(table_slice, connection)
if context.resource_config and context.resource_config.get(
"store_timestamps_as_strings", False
):
with_uppercase_cols = with_uppercase_cols.apply(
lambda x: _convert_timestamp_to_string(x, column_types, table_slice.table),
axis="index",
)
else:
with_uppercase_cols = with_uppercase_cols.apply(
lambda x: _add_missing_timezone(x, column_types, table_slice.table), axis="index"
)
with_uppercase_cols.to_sql(
table_slice.table,
con=connection.engine,
Expand Down Expand Up @@ -102,7 +158,10 @@ def load_input(
result = pd.read_sql(
sql=SnowflakeDbClient.get_select_statement(table_slice), con=connection
)
result = result.apply(_convert_string_to_timestamp, axis="index")
if context.resource_config and context.resource_config.get(
"store_timestamps_as_strings", False
):
result = result.apply(_convert_string_to_timestamp, axis="index")
result.columns = map(str.lower, result.columns) # type: ignore # (bad stubs)
return result

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
materialize,
op,
)
from dagster._core.errors import DagsterInvariantViolationError
from dagster._core.storage.db_io_manager import TableSlice
from dagster_snowflake import build_snowflake_io_manager
from dagster_snowflake.resources import SnowflakeConnection
Expand All @@ -40,6 +41,7 @@
snowflake_pandas_io_manager,
)
from dagster_snowflake_pandas.snowflake_pandas_type_handler import (
_add_missing_timezone,
_convert_string_to_timestamp,
_convert_timestamp_to_string,
)
Expand Down Expand Up @@ -93,7 +95,9 @@ def test_handle_output():
handler = SnowflakePandasTypeHandler()
connection = MagicMock()
df = DataFrame([{"col1": "a", "col2": 1}])
output_context = build_output_context(resource_config=resource_config)
output_context = build_output_context(
resource_config={**resource_config, "time_data_to_string": False}
)

metadata = handler.handle_output(
output_context,
Expand Down Expand Up @@ -124,7 +128,9 @@ def test_load_input():
mock_read_sql.return_value = DataFrame([{"COL1": "a", "COL2": 1}])

handler = SnowflakePandasTypeHandler()
input_context = build_input_context()
input_context = build_input_context(
resource_config={**resource_config, "time_data_to_string": False}
)
df = handler.load_input(
input_context,
TableSlice(
Expand All @@ -143,8 +149,7 @@ def test_load_input():
def test_type_conversions():
# no timestamp data
no_time = pandas.Series([1, 2, 3, 4, 5])
converted = _convert_string_to_timestamp(_convert_timestamp_to_string(no_time))

converted = _convert_string_to_timestamp(_convert_timestamp_to_string(no_time, None, "foo"))
assert (converted == no_time).all()

# timestamp data
Expand All @@ -155,7 +160,9 @@ def test_type_conversions():
pandas.Timestamp("2017-03-01T12:30:45.35"),
]
)
time_converted = _convert_string_to_timestamp(_convert_timestamp_to_string(with_time))
time_converted = _convert_string_to_timestamp(
_convert_timestamp_to_string(with_time, None, "foo")
)

assert (with_time == time_converted).all()

Expand All @@ -165,6 +172,25 @@ def test_type_conversions():
assert (_convert_string_to_timestamp(string_data) == string_data).all()


def test_timezone_conversions():
# no timestamp data
no_time = pandas.Series([1, 2, 3, 4, 5])
converted = _add_missing_timezone(no_time, None, "foo")
assert (converted == no_time).all()

# timestamp data
with_time = pandas.Series(
[
pandas.Timestamp("2017-01-01T12:30:45.35"),
pandas.Timestamp("2017-02-01T12:30:45.35"),
pandas.Timestamp("2017-03-01T12:30:45.35"),
]
)
time_converted = _add_missing_timezone(with_time, None, "foo")

assert (with_time.dt.tz_localize("UTC") == time_converted).all()


def test_build_snowflake_pandas_io_manager():
assert isinstance(
build_snowflake_io_manager([SnowflakePandasTypeHandler()]), IOManagerDefinition
Expand Down Expand Up @@ -206,7 +232,7 @@ def io_manager_test_pipeline():

@pytest.mark.skipif(not IS_BUILDKITE, reason="Requires access to the BUILDKITE snowflake DB")
@pytest.mark.parametrize(
"io_manager", [(old_snowflake_io_manager), (pythonic_snowflake_io_manager)]
"io_manager", [(snowflake_pandas_io_manager), (SnowflakePandasIOManager.configure_at_launch())]
)
def test_io_manager_with_snowflake_pandas_timestamp_data(io_manager):
with temporary_snowflake_table(
Expand Down Expand Up @@ -234,13 +260,43 @@ def read_time_df(df: pandas.DataFrame):

@job(
resource_defs={"snowflake": io_manager},
config={
"resources": {
"snowflake": {
"config": {**SHARED_BUILDKITE_SNOWFLAKE_CONF, "database": DATABASE}
}
}
},
)
def io_manager_timestamp_test_job():
read_time_df(emit_time_df())

res = io_manager_timestamp_test_job.execute_in_process()
assert res.success

@job(
resource_defs={"snowflake": io_manager},
config={
"resources": {
"snowflake": {
"config": {
**SHARED_BUILDKITE_SNOWFLAKE_CONF,
"database": DATABASE,
"store_timestamps_as_strings": True,
}
}
}
},
)
def io_manager_timestamp_as_string_test_job():
read_time_df(emit_time_df())

with pytest.raises(
DagsterInvariantViolationError,
match=r"is not of type VARCHAR, it is of type TIMESTAMP_NTZ\(9\)",
):
io_manager_timestamp_as_string_test_job.execute_in_process()


@pytest.mark.skipif(not IS_BUILDKITE, reason="Requires access to the BUILDKITE snowflake DB")
@pytest.mark.parametrize(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -194,6 +194,15 @@ def my_table_a(my_table: pd.DataFrame) -> pd.DataFrame:
" https://docs.snowflake.com/en/user-guide/key-pair-auth.html for details."
),
)
store_timestamps_as_strings: bool = Field(
default=False,
description=(
"If using Pandas DataFrames, whether to convert time data to strings. If True, time"
" data will be converted to strings when storing the DataFrame and converted back to"
" time data when loading the DataFrame. If False, time data without a timezone will be"
" set to UTC timezone to avoid a Snowflake bug. Defaults to False."
),
)

@staticmethod
@abstractmethod
Expand Down

0 comments on commit 3b3508c

Please sign in to comment.