diff --git a/dlt/extract/incremental/transform.py b/dlt/extract/incremental/transform.py index 70e6b80ba2..eb448d4266 100644 --- a/dlt/extract/incremental/transform.py +++ b/dlt/extract/incremental/transform.py @@ -1,5 +1,5 @@ from datetime import datetime # noqa: I251 -from typing import Any, Optional, Set, Tuple, List +from typing import Any, Optional, Set, Tuple, List, Type from dlt.common.exceptions import MissingDependencyException from dlt.common.utils import digest128 @@ -122,54 +122,36 @@ def find_cursor_value(self, row: TDataItem) -> Any: Will use compiled JSONPath if present. Otherwise, reverts to field access if row is dict, Pydantic model, or of other class. """ + key_exc: Type[Exception] = IncrementalCursorPathHasValueNone if self._compiled_cursor_path: - cursor_values = find_values(self._compiled_cursor_path, row) - if cursor_values == []: - if self.on_cursor_value_missing == "raise": - raise IncrementalCursorPathMissing(self.resource_name, self.cursor_path, row) - elif self.on_cursor_value_missing == "exclude": - return None - elif None in cursor_values: - if self.on_cursor_value_missing == "raise": - raise IncrementalCursorPathHasValueNone( - self.resource_name, self.cursor_path, row - ) - elif self.on_cursor_value_missing == "exclude": - return None - # ignores the other found values, e.g. when the path is $data.items[*].created_at - row_value = cursor_values[0] + try: + row_value = find_values(self._compiled_cursor_path, row)[0] + except IndexError: + # empty list so raise a proper exception + row_value = None + key_exc = IncrementalCursorPathMissing else: - row_value = self._value_at_cursor_path(row) - if row_value is None: - if self.on_cursor_value_missing == "raise": - has_field = self._contains_cursor_path(row) - if not has_field: - raise IncrementalCursorPathMissing( - self.resource_name, self.cursor_path, row - ) - else: - raise IncrementalCursorPathHasValueNone( - self.resource_name, self.cursor_path, row - ) - elif self.on_cursor_value_missing == "exclude": - return None - - return row_value - - def _value_at_cursor_path(self, row: TDataItem) -> Optional[TDataItem]: - try: - return row.get(self.cursor_path) - except AttributeError: - # supports Pydantic models and other classes - return getattr(row, self.cursor_path) + try: + try: + row_value = row[self.cursor_path] + except TypeError: + # supports Pydantic models and other classes + row_value = getattr(row, self.cursor_path) + except (KeyError, AttributeError): + # attr not found so raise a proper exception + row_value = None + key_exc = IncrementalCursorPathMissing + + # if we have a value - return it + if row_value is not None: + return row_value - def _contains_cursor_path(self, row: TDataItem) -> bool: - try: - return self.cursor_path in row.keys() - except AttributeError: - # supports Pydantic models and other classes - return hasattr(row, self.cursor_path) + if self.on_cursor_value_missing == "raise": + # raise missing path or None value exception + raise key_exc(self.resource_name, self.cursor_path, row) + elif self.on_cursor_value_missing == "exclude": + return None def __call__( self, diff --git a/tests/pipeline/test_pipeline_extra.py b/tests/pipeline/test_pipeline_extra.py index c757959bec..af3a6c239e 100644 --- a/tests/pipeline/test_pipeline_extra.py +++ b/tests/pipeline/test_pipeline_extra.py @@ -246,7 +246,6 @@ class TestRow(BaseModel): example_string: str # yield model in resource so incremental fails when looking for "id" - # TODO: support pydantic models in incremental @dlt.resource(name="table_name", primary_key="id", write_disposition="replace") def generate_rows_incremental(