diff --git a/setup.py b/setup.py index f3eaa00..3247731 100644 --- a/setup.py +++ b/setup.py @@ -20,7 +20,7 @@ py_modules=['target_snowflake'], install_requires=[ 'singer-python==5.9.0', - 'singer-target-postgres==0.2.4', + 'singer-target-postgres@git+https://github.com/datamill-co/target-postgres.git#279cb62d2a80b1bd8e8ab1191e7a1d17c19383a8', 'target-redshift==0.2.4', 'botocore<1.13.0,>=1.12.253', 'snowflake-connector-python==2.2.5' diff --git a/target_snowflake/snowflake.py b/target_snowflake/snowflake.py index 93250e6..0d532e5 100644 --- a/target_snowflake/snowflake.py +++ b/target_snowflake/snowflake.py @@ -6,6 +6,7 @@ import os import re import uuid +from functools import lru_cache import arrow from psycopg2 import sql @@ -21,6 +22,16 @@ from target_snowflake.connection import connect from target_snowflake.exceptions import SnowflakeError +# copied in from optimization in PostgresTarget: https://github.com/datamill-co/target-postgres/commit/6a3da026d2bb4681fdf46bd7ca69fbb164489d8a +@lru_cache(maxsize=128) +def _format_datetime(value): + """ + Format a datetime value. This is only called from the + SnowflakeTarget.serialize_table_record_datetime_value + but this non-method version allows caching + """ + return arrow.get(value).format('YYYY-MM-DD HH:mm:ss.SSSSZZ') + class SnowflakeTarget(SQLInterface): """ Specific Snowflake implementation of a Singer Target. @@ -292,7 +303,7 @@ def serialize_table_record_null_value(self, remote_schema, streamed_schema, fiel return value def serialize_table_record_datetime_value(self, remote_schema, streamed_schema, field, value): - return arrow.get(value).format('YYYY-MM-DD HH:mm:ss.SSSSZZ') + return _format_datetime(value) def perform_update(self, cur, target_table_name, temp_table_name, key_properties, columns, subkeys): full_table_name = '{}.{}.{}'.format( @@ -503,8 +514,11 @@ def persist_csv_rows(self, subkeys) def write_table_batch(self, cur, table_batch, metadata): - remote_schema = table_batch['remote_schema'] + record_count = len(table_batch['records']) + if record_count == 0: + return 0 + remote_schema = table_batch['remote_schema'] ## Create temp table to upload new data to target_table_name = self.canonicalize_identifier('tmp_' + str(uuid.uuid4())) @@ -521,12 +535,15 @@ def write_table_batch(self, cur, table_batch, metadata): csv_headers = list(remote_schema['schema']['properties'].keys()) rows_iter = iter(table_batch['records']) + csv_dialect = csv.unix_dialect() + csv_dialect.escapechar = '\\' + def transform(): try: row = next(rows_iter) with io.StringIO() as out: - writer = csv.DictWriter(out, csv_headers) + writer = csv.DictWriter(out, csv_headers, dialect=csv_dialect) writer.writerow(row) return out.getvalue() except StopIteration: @@ -541,7 +558,7 @@ def transform(): csv_headers, csv_rows) - return len(table_batch['records']) + return record_count def add_column(self, cur, table_name, column_name, column_schema): @@ -583,8 +600,8 @@ def make_column_nullable(self, cur, table_name, column_name): '''.format( database=sql.identifier(self.connection.configured_database), table_schema=sql.identifier(self.connection.configured_schema), - table_name=sql.Identifier(table_name), - column_name=sql.Identifier(column_name))) + table_name=sql.identifier(table_name), + column_name=sql.identifier(column_name))) def _set_table_metadata(self, cur, table_name, metadata): """