Merge pull request #15 from cody-scott/logging-updates
Logging updates and regression fix
cody-scott authored Nov 5, 2024
2 parents 245ffaa + d34a2b5 commit 9db6357
Showing 4 changed files with 17 additions and 10 deletions.
7 changes: 5 additions & 2 deletions dagster_mssql_bcp_tests/bcp_polars/test_bcp.py
@@ -377,13 +377,16 @@ def test_process_datetime(self, polars_io: polars_mssql_bcp.PolarsBCP):
         schema = polars_mssql_bcp.AssetSchema(
             [
                 {'name': 'a', 'type': 'DATETIME2'},
+                {'name': 'b', 'type': 'DATETIME2'}
             ]
         )
 
-        input = pl.datetime_range(datetime.datetime(2021,1,1), datetime.datetime(2021,1,3), eager=True).alias('a').to_frame().lazy()
+        input = pl.datetime_range(datetime.datetime(2021,1,1), datetime.datetime(2021,1,3), eager=True).alias('a').to_frame()
+        input = input.with_columns(pl.lit(None).alias('b'))
+        input = input.lazy()
         df = polars_io._process_datetime(input, schema).collect()
         expected = pl.DataFrame(
-            {'a': ["2021-01-01 00:00:00+00:00", "2021-01-02 00:00:00+00:00", "2021-01-03 00:00:00+00:00"]}
+            {'a': ["2021-01-01 00:00:00+00:00", "2021-01-02 00:00:00+00:00", "2021-01-03 00:00:00+00:00"], 'b': [None, None, None]}
         )
         pl_testing.assert_frame_equal(df, expected)

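For context on the regression this test now covers: in Polars, a column created with pl.lit(None) has dtype Null rather than String, so string-parsing expressions such as .str.to_datetime() cannot run against it. A minimal standalone sketch of that behaviour (the frame and column names here are illustrative, not taken from the repository):

import polars as pl

# A frame with one string datetime column and one all-null column, as in
# the test above. The all-null column's dtype is Null, not String.
df = pl.DataFrame({"a": ["2021-01-01 00:00:00"]}).with_columns(
    pl.lit(None).alias("b")
)
print(df.schema)  # {'a': String, 'b': Null}

# Parsing the string column works; attempting the same on the Null-dtype
# column raises, which is what the added 'b' column exercises end to end.
df = df.with_columns(pl.col("a").str.to_datetime())
try:
    df.with_columns(pl.col("b").str.to_datetime())
except Exception as exc:
    print(type(exc).__name__)  # expected to raise on the Null-dtype column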
2 changes: 1 addition & 1 deletion pyproject.toml
@@ -4,7 +4,7 @@ build-backend = "hatchling.build"
 
 [project]
 name = "dagster-mssql-bcp"
-version = "0.0.9"
+version = "0.0.10"
 dependencies = [
     "dagster",

8 changes: 5 additions & 3 deletions src/dagster_mssql_bcp/bcp_core/bcp_core.py
@@ -219,7 +219,6 @@ def load_bcp(
         )
 
         with connect_mssql(self.connection_config) as connection:
-            get_dagster_logger().debug('pre-bcp stage')
             data, schema_deltas = self._pre_bcp_stage(
                 connection=connection,
                 data=data,
@@ -271,7 +270,7 @@ def _pre_bcp_stage(
         process_replacements,
         staging_table,
     ):
-
+        get_dagster_logger().info('preprocessing stage')
         data = self._pre_prcessing_start_hook(data)
         self._create_target_tables(
             schema, table, asset_schema, staging_table, connection
@@ -348,6 +347,7 @@ def _bcp_stage(self, data, schema, staging_table):
             temp_dir = Path(temp_dir)
             format_file = temp_dir / f"{staging_table}_format_file.fmt"
             error_file = temp_dir / f"{staging_table}_error_file.err"
+            get_dagster_logger().info('saving data to csv')
             csv_file = self._save_csv(data, temp_dir, f"{staging_table}.csv")
 
             self._generate_format_file(schema, staging_table, format_file)
@@ -998,7 +998,7 @@ def _insert_and_drop_bcp_table(
         schema_with_cast = asset_schema.get_sql_columns_with_cast()
         schema_with_cast_str = ", ".join(schema_with_cast)
 
-        get_dagster_logger().debug("Inserting data and dropping BCP table")
+        get_dagster_logger().debug("Inserting data")
         insert_sql = f"""
         INSERT INTO {schema}.{table} ({schema_columns_str})
         SELECT {schema_with_cast_str} FROM {schema}.{bcp_table}
@@ -1013,6 +1013,7 @@
                 )
             )
         )
+        get_dagster_logger().debug("dropping BCP table")
         connection.execute(text(f"DROP TABLE {schema}.{bcp_table}"))
 
     def _calculate_row_hash(
@@ -1030,6 +1031,7 @@ def _calculate_row_hash(
         Returns:
             None
         """
+        get_dagster_logger().info('Calculating row hash')
         col_sql = [
             f"COALESCE(CAST({column} AS NVARCHAR(MAX)), '')" for column in columns
         ]
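Two of the touched helpers are easier to follow with the new logging in context: get_dagster_logger() is Dagster's standard logging entry point, and the row-hash helper builds one COALESCE/CAST fragment per column so NULLs hash consistently. A sketch under stated assumptions — only the list comprehension appears in this diff; the HASHBYTES/CONCAT wrapper and the column names are illustrative guesses at how such fragments are typically combined, not code from the repository:

from dagster import get_dagster_logger

columns = ["id", "name", "updated_at"]  # hypothetical column names

# Fragment visible in the diff: cast every column to NVARCHAR(MAX) and map
# NULL to '' so a row's hash does not change based on NULL handling.
col_sql = [f"COALESCE(CAST({column} AS NVARCHAR(MAX)), '')" for column in columns]

# Assumed composition into a T-SQL hash expression (not shown in the diff).
hash_sql = f"HASHBYTES('SHA2_256', CONCAT({', '.join(col_sql)}))"

get_dagster_logger().info('Calculating row hash')  # the log line added above
print(hash_sql)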
10 changes: 6 additions & 4 deletions src/dagster_mssql_bcp/bcp_polars/polars_mssql_bcp.py
@@ -116,12 +116,13 @@ def _process_datetime(
         dt_columns = (
             data.select(cs.datetime(), cs.date(), cs.time()).collect_schema().names()
         )
+        null_columns = data.select(cs.by_dtype(pl.Null)).collect_schema().names()
 
         data = data.with_columns(
             [
-                pl.col(_).str.to_datetime()
-                for _ in asset_schema.get_datetime_columns()
-                if _ not in dt_columns
+                pl.col(col).str.to_datetime()
+                for col in asset_schema.get_datetime_columns()
+                if col not in dt_columns and col not in null_columns
             ]
         )

@@ -135,7 +136,7 @@
             [
                 pl.col(_).dt.convert_time_zone("UTC")
                 for _ in asset_schema.get_datetime_columns()
-                if _ not in dt_columns_in_tz
+                if _ not in dt_columns_in_tz and _ not in null_columns
             ]
         )

@@ -146,6 +147,7 @@
                 .str.replace("Z", "+00:00")
                 .str.replace("T", " ")
                 for _ in asset_schema.get_datetime_columns()
+                if _ not in null_columns
             ]
         )
         return data
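A minimal standalone sketch of the guard this file now applies: Null-dtype columns are detected once on the lazy frame and excluded from the string-to-datetime conversion, mirroring the diff above (the column names and the stand-in datetime_columns list are illustrative):

import polars as pl
import polars.selectors as cs

lf = pl.LazyFrame({"a": ["2021-01-01 00:00:00"]}).with_columns(
    pl.lit(None).alias("b")
)

# Same pattern as the fix: find the Null-dtype columns up front...
null_columns = lf.select(cs.by_dtype(pl.Null)).collect_schema().names()

# ...and skip them when parsing the schema's datetime columns.
datetime_columns = ["a", "b"]  # stands in for asset_schema.get_datetime_columns()
lf = lf.with_columns(
    [
        pl.col(col).str.to_datetime()
        for col in datetime_columns
        if col not in null_columns
    ]
)
print(lf.collect())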
