Skip to content

Commit

Permalink
bugfix: misuse of defaultdict
Browse files Browse the repository at this point in the history
  • Loading branch information
Nicolas ESTRADA committed Dec 22, 2024
1 parent 0b7c151 commit ec72e36
Show file tree
Hide file tree
Showing 2 changed files with 8 additions and 7 deletions.
4 changes: 2 additions & 2 deletions sources/pg_legacy_replication/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -116,7 +116,7 @@ def replication_resource(slot_name: str) -> Iterable[TDataItem]:

for table in table_names:
yield dlt.transformer(
_create_table_dispatch(table, repl_options=repl_options.get(table)),
_create_table_dispatch(table, repl_options=repl_options[table]),
data_from=wal_reader,
name=table,
)
Expand All @@ -137,4 +137,4 @@ def _create_table_dispatch(
"cleanup_snapshot_resources",
"init_replication",
"replication_source",
]
]
11 changes: 6 additions & 5 deletions sources/pg_legacy_replication/helpers.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
from typing import (
Any,
Callable,
DefaultDict,
Dict,
Iterable,
Iterator,
Expand Down Expand Up @@ -302,7 +303,7 @@ def __init__(
self,
upto_lsn: int,
table_qnames: Set[str],
repl_options: Mapping[str, ReplicationOptions],
repl_options: DefaultDict[str, ReplicationOptions],
target_batch_size: int = 1000,
) -> None:
self.upto_lsn = upto_lsn
Expand Down Expand Up @@ -376,7 +377,7 @@ def process_change(self, msg: RowMessage, lsn: int) -> None:
table_name = msg.table.split(".")[1]
table_schema = self.get_table_schema(msg, table_name)
data_item = gen_data_item(
msg, table_schema["columns"], lsn, **self.repl_options.get(table_name)
msg, table_schema["columns"], lsn, **self.repl_options[table_name]
)
self.data_items[table_name].append(data_item)

Expand All @@ -393,7 +394,7 @@ def get_table_schema(self, msg: RowMessage, table_name: str) -> TTableSchema:
if current_hash == self.last_table_hashes.get(table_name):
return self.last_table_schema[table_name]

new_schema = infer_table_schema(msg, **self.repl_options.get(table_name))
new_schema = infer_table_schema(msg, **self.repl_options[table_name])
if last_schema is None:
# Cache the inferred schema and hash if it is not already cached
self.last_table_schema[table_name] = new_schema
Expand Down Expand Up @@ -430,7 +431,7 @@ class ItemGenerator:
table_qnames: Set[str]
upto_lsn: int
start_lsn: int
repl_options: Mapping[str, ReplicationOptions]
repl_options: DefaultDict[str, ReplicationOptions]
target_batch_size: int = 1000
last_commit_lsn: Optional[int] = field(default=None, init=False)
generated_all: bool = False
Expand Down Expand Up @@ -694,4 +695,4 @@ def compare_schemas(last: TTableSchema, new: TTableSchema) -> TTableSchema:
# Update with the more detailed schema per column
table_schema["columns"][name] = col_schema

return table_schema
return table_schema

0 comments on commit ec72e36

Please sign in to comment.