From a6566f63cc1d1907904666c9d115a22cc4187ff3 Mon Sep 17 00:00:00 2001 From: edward Date: Thu, 22 Apr 2021 17:15:23 -0400 Subject: [PATCH 1/3] Fixes datamill-co/target-snowflake#21 --- target_snowflake/snowflake.py | 7 +++++-- 1 file changed, 5 insertions(+), 2 deletions(-) diff --git a/target_snowflake/snowflake.py b/target_snowflake/snowflake.py index 93250e6..31a0fa6 100644 --- a/target_snowflake/snowflake.py +++ b/target_snowflake/snowflake.py @@ -288,7 +288,7 @@ def add_table_mapping(self, cur, from_path, metadata): def serialize_table_record_null_value(self, remote_schema, streamed_schema, field, value): if value is None: - return '\\\\N' + return '\\N' return value def serialize_table_record_datetime_value(self, remote_schema, streamed_schema, field, value): @@ -521,12 +521,15 @@ def write_table_batch(self, cur, table_batch, metadata): csv_headers = list(remote_schema['schema']['properties'].keys()) rows_iter = iter(table_batch['records']) + csv_dialect = csv.unix_dialect() + csv_dialect.escapechar = '\\' + def transform(): try: row = next(rows_iter) with io.StringIO() as out: - writer = csv.DictWriter(out, csv_headers) + writer = csv.DictWriter(out, csv_headers, dialect=csv_dialect) writer.writerow(row) return out.getvalue() except StopIteration: From e3590d7b2dd0c0b18cb0a24dc97672f0942009dd Mon Sep 17 00:00:00 2001 From: Fran Lozano Date: Fri, 24 Sep 2021 16:25:36 +0200 Subject: [PATCH 2/3] Fix function name --- target_snowflake/snowflake.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/target_snowflake/snowflake.py b/target_snowflake/snowflake.py index 31a0fa6..918050a 100644 --- a/target_snowflake/snowflake.py +++ b/target_snowflake/snowflake.py @@ -586,8 +586,8 @@ def make_column_nullable(self, cur, table_name, column_name): '''.format( database=sql.identifier(self.connection.configured_database), table_schema=sql.identifier(self.connection.configured_schema), - table_name=sql.Identifier(table_name), - column_name=sql.Identifier(column_name))) + table_name=sql.identifier(table_name), + column_name=sql.identifier(column_name))) def _set_table_metadata(self, cur, table_name, metadata): """ From ad8133450d04828e90a6fb41fc0953b224ac16cd Mon Sep 17 00:00:00 2001 From: Nicholas Smith Date: Thu, 9 Jun 2022 13:18:35 -0400 Subject: [PATCH 3/3] Get performance optimizations from underlying target-postgres package by bumping its package dependency. Avoid writing table batch when record count is zero. --- setup.py | 2 +- target_snowflake/snowflake.py | 22 ++++++++++++++++++---- 2 files changed, 19 insertions(+), 5 deletions(-) diff --git a/setup.py b/setup.py index f3eaa00..3247731 100644 --- a/setup.py +++ b/setup.py @@ -20,7 +20,7 @@ py_modules=['target_snowflake'], install_requires=[ 'singer-python==5.9.0', - 'singer-target-postgres==0.2.4', + 'singer-target-postgres@git+https://github.com/datamill-co/target-postgres.git#279cb62d2a80b1bd8e8ab1191e7a1d17c19383a8', 'target-redshift==0.2.4', 'botocore<1.13.0,>=1.12.253', 'snowflake-connector-python==2.2.5' diff --git a/target_snowflake/snowflake.py b/target_snowflake/snowflake.py index 918050a..0d532e5 100644 --- a/target_snowflake/snowflake.py +++ b/target_snowflake/snowflake.py @@ -6,6 +6,7 @@ import os import re import uuid +from functools import lru_cache import arrow from psycopg2 import sql @@ -21,6 +22,16 @@ from target_snowflake.connection import connect from target_snowflake.exceptions import SnowflakeError +# copied in from optimization in PostgresTarget: https://github.com/datamill-co/target-postgres/commit/6a3da026d2bb4681fdf46bd7ca69fbb164489d8a +@lru_cache(maxsize=128) +def _format_datetime(value): + """ + Format a datetime value. This is only called from the + SnowflakeTarget.serialize_table_record_datetime_value + but this non-method version allows caching + """ + return arrow.get(value).format('YYYY-MM-DD HH:mm:ss.SSSSZZ') + class SnowflakeTarget(SQLInterface): """ Specific Snowflake implementation of a Singer Target. @@ -288,11 +299,11 @@ def add_table_mapping(self, cur, from_path, metadata): def serialize_table_record_null_value(self, remote_schema, streamed_schema, field, value): if value is None: - return '\\N' + return '\\\\N' return value def serialize_table_record_datetime_value(self, remote_schema, streamed_schema, field, value): - return arrow.get(value).format('YYYY-MM-DD HH:mm:ss.SSSSZZ') + return _format_datetime(value) def perform_update(self, cur, target_table_name, temp_table_name, key_properties, columns, subkeys): full_table_name = '{}.{}.{}'.format( @@ -503,8 +514,11 @@ def persist_csv_rows(self, subkeys) def write_table_batch(self, cur, table_batch, metadata): - remote_schema = table_batch['remote_schema'] + record_count = len(table_batch['records']) + if record_count == 0: + return 0 + remote_schema = table_batch['remote_schema'] ## Create temp table to upload new data to target_table_name = self.canonicalize_identifier('tmp_' + str(uuid.uuid4())) @@ -544,7 +558,7 @@ def transform(): csv_headers, csv_rows) - return len(table_batch['records']) + return record_count def add_column(self, cur, table_name, column_name, column_schema):