Skip to content

Commit

Permalink
Allow empty primary keys: fix Google Ads case
Browse files Browse the repository at this point in the history
  • Loading branch information
paveltiunov committed Aug 13, 2018
1 parent 82886ee commit 89c0940
Show file tree
Hide file tree
Showing 2 changed files with 18 additions and 4 deletions.
5 changes: 3 additions & 2 deletions target_postgres/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -72,13 +72,14 @@ def persist_lines(config, lines):
primary_key_string = sync.record_primary_key_string(o['record'])
if stream not in primary_key_exists:
primary_key_exists[stream] = {}
if primary_key_string in primary_key_exists[stream]:
if primary_key_string and primary_key_string in primary_key_exists[stream]:
flush_records(o, csv_files_to_load, row_count, primary_key_exists, sync)

csv_line = sync.record_to_csv_line(o['record'])
csv_files_to_load[o['stream']].write(bytes(csv_line + '\n', 'UTF-8'))
row_count[o['stream']] += 1
primary_key_exists[stream][primary_key_string] = True
if primary_key_string:
primary_key_exists[stream][primary_key_string] = True

if row_count[o['stream']] >= batch_size:
flush_records(o, csv_files_to_load, row_count, primary_key_exists, sync)
Expand Down
17 changes: 15 additions & 2 deletions target_postgres/db_sync.py
Original file line number Diff line number Diff line change
Expand Up @@ -136,6 +136,8 @@ def table_name(self, table_name, is_temporary):
return '{}.{}'.format(self.schema_name, table_name)

def record_primary_key_string(self, record):
    """Return the comma-joined primary-key string for ``record``.

    The key property names are read from
    ``self.stream_schema_message['key_properties']``. The record is
    flattened with ``flatten_record`` and each key is looked up under its
    ``inflect_column_name``-transformed name; the values are converted with
    ``str`` and joined with ``','`` in the order the key properties are
    listed.

    Returns:
        ``None`` when the stream declares no key properties (the entry is
        an empty list, null, or absent), otherwise the joined key string.

    A key column that is absent from the flattened record propagates the
    lookup error raised by the flattened record.
    """
    # Truthiness covers [], None and a missing entry alike; len() would
    # raise on the latter two instead of treating the stream as keyless.
    key_properties = self.stream_schema_message.get('key_properties')
    if not key_properties:
        return None

    flatten = flatten_record(record)
    return ','.join(
        str(flatten[inflect_column_name(p)]) for p in key_properties
    )
Expand Down Expand Up @@ -167,8 +169,9 @@ def load_csv(self, file, count):
copy_sql,
file
)
cur.execute(self.update_from_temp_table())
logger.info(cur.statusmessage)
if len(self.stream_schema_message['key_properties']) > 0:
cur.execute(self.update_from_temp_table())
logger.info(cur.statusmessage)
cur.execute(self.insert_from_temp_table())
logger.info(cur.statusmessage)
cur.execute(self.drop_temp_table())
Expand All @@ -178,6 +181,16 @@ def insert_from_temp_table(self):
columns = self.column_names()
table = self.table_name(stream_schema_message['stream'], False)
temp_table = self.table_name(stream_schema_message['stream'], True)

if len(stream_schema_message['key_properties']) == 0:
return """INSERT INTO {} ({})
(SELECT s.* FROM {} s)
""".format(
table,
', '.join(columns),
temp_table
)

return """INSERT INTO {} ({})
(SELECT s.* FROM {} s LEFT OUTER JOIN {} t ON {} WHERE {})
""".format(
Expand Down

0 comments on commit 89c0940

Please sign in to comment.