Skip to content

Commit

Permalink
format
Browse files Browse the repository at this point in the history
  • Loading branch information
JamieDeMaria committed Mar 27, 2023
1 parent 74458b4 commit 4391816
Showing 1 changed file with 12 additions and 9 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -25,18 +25,20 @@ def _get_table_column_types(table_slice: TableSlice, connection):
return {item[0]: item[1] for item in schema_list}


def _convert_timestamp_to_string(s: pd.Series, column_types) -> pd.Series:
def _convert_timestamp_to_string(s: pd.Series, column_types: Mapping[str, str]) -> pd.Series:
"""Converts columns of data of type pd.Timestamp to string so that it can be stored in
snowflake.
"""
column_name = str(s.name)
if pd_core_dtypes_common.is_datetime_or_timedelta_dtype(s): # type: ignore # (bad stubs)
if column_types:
if "VARCHAR" not in column_types[s.name]:
if "VARCHAR" not in column_types[column_name]:
raise DagsterInvariantViolationError(
"Snowflake I/O manager configured to convert time data to strings, but the"
" corresponding column is not of type VARCHAR, it is of type"
f" {column_types[s.name]}. Please set store_timestamps_as_strings=False in the"
" Snowflake I/O manager configuration to store time data as TIMESTAMP types."
f" {column_types[column_name]}. Please set store_timestamps_as_strings=False in"
" the Snowflake I/O manager configuration to store time data as TIMESTAMP"
" types."
)
return s.dt.strftime("%Y-%m-%d %H:%M:%S.%f %z")
else:
Expand All @@ -59,15 +61,16 @@ def _convert_string_to_timestamp(s: pd.Series) -> pd.Series:
return s


def _add_missing_timezone(s: pd.Series, column_types) -> pd.Series:
def _add_missing_timezone(s: pd.Series, column_types: Mapping[str, str]) -> pd.Series:
column_name = str(s.name)
if pd_core_dtypes_common.is_datetime_or_timedelta_dtype(s):
if column_types:
if "VARCHAR" in column_types[s.name]:
if "VARCHAR" in column_types[column_name]:
raise DagsterInvariantViolationError(
f"Snowflake I/O manager: The Snowflake column for {s.name} is of type"
f" {column_types[s.name]} and should be of type TIMESTAMP to store time data."
" Please migrate this column to be of time TIMESTAMP_NTZ(9) to store time"
" data."
f" {column_types[column_name]} and should be of type TIMESTAMP to store time"
" data. Please migrate this column to be of time TIMESTAMP_NTZ(9) to store"
" time data."
)
return s.dt.tz_localize("UTC")
return s
Expand Down

0 comments on commit 4391816

Please sign in to comment.