Skip to content

Commit

Permalink
handles non existing attr when gettin row_value, simplifies getter code
Browse files Browse the repository at this point in the history
  • Loading branch information
rudolfix committed Aug 30, 2024
1 parent 4a22f34 commit 4ce384a
Show file tree
Hide file tree
Showing 2 changed files with 27 additions and 46 deletions.
72 changes: 27 additions & 45 deletions dlt/extract/incremental/transform.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
from datetime import datetime # noqa: I251
from typing import Any, Optional, Set, Tuple, List
from typing import Any, Optional, Set, Tuple, List, Type

from dlt.common.exceptions import MissingDependencyException
from dlt.common.utils import digest128
Expand Down Expand Up @@ -122,54 +122,36 @@ def find_cursor_value(self, row: TDataItem) -> Any:
Will use compiled JSONPath if present.
Otherwise, reverts to field access if row is dict, Pydantic model, or of other class.
"""
key_exc: Type[Exception] = IncrementalCursorPathHasValueNone
if self._compiled_cursor_path:
cursor_values = find_values(self._compiled_cursor_path, row)
if cursor_values == []:
if self.on_cursor_value_missing == "raise":
raise IncrementalCursorPathMissing(self.resource_name, self.cursor_path, row)
elif self.on_cursor_value_missing == "exclude":
return None
elif None in cursor_values:
if self.on_cursor_value_missing == "raise":
raise IncrementalCursorPathHasValueNone(
self.resource_name, self.cursor_path, row
)
elif self.on_cursor_value_missing == "exclude":
return None

# ignores the other found values, e.g. when the path is $data.items[*].created_at
row_value = cursor_values[0]
try:
row_value = find_values(self._compiled_cursor_path, row)[0]
except IndexError:
# empty list so raise a proper exception
row_value = None
key_exc = IncrementalCursorPathMissing
else:
row_value = self._value_at_cursor_path(row)
if row_value is None:
if self.on_cursor_value_missing == "raise":
has_field = self._contains_cursor_path(row)
if not has_field:
raise IncrementalCursorPathMissing(
self.resource_name, self.cursor_path, row
)
else:
raise IncrementalCursorPathHasValueNone(
self.resource_name, self.cursor_path, row
)
elif self.on_cursor_value_missing == "exclude":
return None

return row_value

def _value_at_cursor_path(self, row: TDataItem) -> Optional[TDataItem]:
try:
return row.get(self.cursor_path)
except AttributeError:
# supports Pydantic models and other classes
return getattr(row, self.cursor_path)
try:
try:
row_value = row[self.cursor_path]
except TypeError:
# supports Pydantic models and other classes
row_value = getattr(row, self.cursor_path)
except (KeyError, AttributeError):
# attr not found so raise a proper exception
row_value = None
key_exc = IncrementalCursorPathMissing

# if we have a value - return it
if row_value is not None:
return row_value

def _contains_cursor_path(self, row: TDataItem) -> bool:
try:
return self.cursor_path in row.keys()
except AttributeError:
# supports Pydantic models and other classes
return hasattr(row, self.cursor_path)
if self.on_cursor_value_missing == "raise":
# raise missing path or None value exception
raise key_exc(self.resource_name, self.cursor_path, row)
elif self.on_cursor_value_missing == "exclude":
return None

def __call__(
self,
Expand Down
1 change: 0 additions & 1 deletion tests/pipeline/test_pipeline_extra.py
Original file line number Diff line number Diff line change
Expand Up @@ -246,7 +246,6 @@ class TestRow(BaseModel):
example_string: str

# yield model in resource so incremental fails when looking for "id"
# TODO: support pydantic models in incremental

@dlt.resource(name="table_name", primary_key="id", write_disposition="replace")
def generate_rows_incremental(
Expand Down

0 comments on commit 4ce384a

Please sign in to comment.