Pipeline crashes when single record has None value at incremental cursor path #1571

Closed
3 tasks done
willi-mueller opened this issue Jul 9, 2024 · 9 comments · Fixed by #1576
Labels
bug Something isn't working

Comments

@willi-mueller
Collaborator

willi-mueller commented Jul 9, 2024

dlt version

0.5.1

Describe the problem

Loading the following data incrementally with cursor_path="created_at" crashes the pipeline:

    data = [
        {"id": 1, "created_at": 1},
        {"id": 2, "created_at": None},
        {"id": 3, "created_at": 2},
    ]

I have not been able to isolate it yet, but in version 0.4.12 the second row, where created_at is None, was under some unknown conditions skipped and not loaded into the destination.
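
For reference, a minimal reproduction sketch (the resource, pipeline, and destination names are mine, assuming a local duckdb destination):

    import dlt

    @dlt.resource
    def some_data(created_at=dlt.sources.incremental("created_at")):
        yield [
            {"id": 1, "created_at": 1},
            {"id": 2, "created_at": None},
            {"id": 3, "created_at": 2},
        ]

    pipeline = dlt.pipeline(pipeline_name="repro", destination="duckdb")
    # crashes on the second record, whose cursor value is None
    pipeline.run(some_data())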

Expected behavior

  • Allow the user to specify whether the incremental transformer raises an error or accepts the row:
inc_0 = dlt.sources.incremental(cursor_path="updated_at", on_cursor_value_none="include")
inc_1 = dlt.sources.incremental(cursor_path="updated_at", on_cursor_value_none="raise")
  • Document how to set a default value for the cursor path in case the value is None.
  • Make the exception clarify whether the cursor path is missing or the value at the cursor path is None.

Steps to reproduce

See test suite in #1576

Operating system

macOS

Runtime environment

Local

Python version

3.11

@rudolfix
Collaborator

@willi-mueller how should dlt behave in this case? We cannot really say whether this record is in range or not if the cursor column is not provided.

IMO we need a nice exception message here explaining what happened; right now I bet the max function is simply failing on some Python internals.

@willi-mueller
Collaborator Author

willi-mueller commented Jul 10, 2024

@willi-mueller how should dlt behave in this case? We cannot really say whether this record is in range or not if the cursor column is not provided.

IMO we need a nice exception message here explaining what happened; right now I bet the max function is simply failing on some Python internals.

We already have a nice exception: IncrementalCursorPathMissing in the JsonIncremental. However, this means that people cannot incrementally load tables or data sources that contain None in the cursor field for even one row! Thus, to process dirty data I would need another tool first. That's frustrating.

See this error report: https://dlthub-community.slack.com/archives/C04DQA7JJN6/p1720531715761139

Proposal: the user can pass a callable that receives the row and returns a value for the cursor path if the value at the cursor path is None.
For example, if updated_at is None it could fall back to created_at:

dlt.sources.incremental(
    cursor_path="updated_at",
    default_cursor_value=lambda row: row["created_at"] if row.get("updated_at") is None else row["updated_at"]
)

@rudolfix
Collaborator

rudolfix commented Jul 10, 2024

@willi-mueller OK, so it seems we need another setting in the incremental:
on_none_cursor_value: raise, include, exclude

so it is the user's conscious decision what to do with such values. Btw. this is possible even now by defining your own max (last_value_func) that accepts Nones.

It should also work on object and arrow tables.
WDYT?
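
As a sketch of that workaround (my wording, not a confirmed fix; the thread below notes it does not reach the failing code path), a None-tolerant last_value_func could look like this:

    def max_accepting_none(cursor_values):
        # cursor_values holds the stored last value (if any) plus new row values;
        # drop Nones so max() never compares None with ints
        not_none = [v for v in cursor_values if v is not None]
        return max(not_none) if not_none else None

    inc = dlt.sources.incremental("created_at", last_value_func=max_accepting_none)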

@rudolfix
Collaborator

@willi-mueller the idea of the callable is also good, but I'm not sure if add_map isn't enough?
https://dlthub.com/docs/general-usage/resource#filter-transform-and-pivot-data

it does exactly what the user wants
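
For illustration, an add_map step that backfills the cursor value could look roughly like this (the resource name some_data and the fallback from updated_at to created_at are assumptions matching the earlier proposal):

    def backfill_cursor(item):
        # assumption: fall back to created_at when updated_at is None
        if item.get("updated_at") is None:
            item["updated_at"] = item["created_at"]
        return item

    some_data.add_map(backfill_cursor)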

@willi-mueller
Collaborator Author

@willi-mueller OK, so it seems we need another setting in the incremental: on_none_cursor_value: raise, include, exclude

so it is the user's conscious decision what to do with such values. Btw. this is possible even now by defining your own max (last_value_func) that accepts Nones.

It should also work on object and arrow tables. WDYT?

From what I observed, it crashes in find_cursor_value(), which is executed even before the last_value_func(), such as max().

Thus, a custom last_value_func does not seem to solve this problem.

@willi-mueller
Collaborator Author

@willi-mueller the idea of the callable is also good, but I'm not sure if add_map isn't enough? https://dlthub.com/docs/general-usage/resource#filter-transform-and-pivot-data

it does exactly what the user wants

Wonderful! I forgot about add_map! I'll test when these are executed – before the incremental call (where it currently crashes) or after. If before, then this would be the perfect solution! Thank you!

@willi-mueller
Collaborator Author

@rudolfix I am afraid that neither add_map() nor a custom last_value_func solves this issue, because both are executed only after incremental.__call__() invokes find_cursor_value(), where the exception is raised.

I am afraid we need to touch the incremental object itself.

I like the idea of the parameter on_none_cursor_value={"raise", "include", "exclude"}.

I am not sure the add_map() approach eliminates the need for a callable that sets default values, because the callable would be executed before the last_value_func, whereas the map transformer runs afterward.

@willi-mueller
Collaborator Author

willi-mueller commented Jul 11, 2024

Another problem with add_map() is that it is not yet supported for arrow tables and arrow batches:

dlt.extract.exceptions.ResourceExtractionError: In processing pipe some_data: extraction of resource some_data in transform MapItem caused an exception: 'pyarrow.lib.Table' object has no attribute 'get'

dlt.extract.exceptions.ResourceExtractionError: In processing pipe some_data: extraction of resource some_data in transform MapItem caused an exception: 'pyarrow.lib.RecordBatch' object has no attribute 'get'
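
A workaround sketch for arrow data would have to operate on the table as a whole rather than on per-row dicts (the pyarrow usage here is my assumption, not dlt API):

    import pyarrow as pa
    import pyarrow.compute as pc

    def backfill_cursor_arrow(table: pa.Table) -> pa.Table:
        # assumption: fill None cursor values from the id column, as in the toy data
        filled = pc.coalesce(table["created_at"], table["id"])
        idx = table.schema.get_field_index("created_at")
        return table.set_column(idx, "created_at", filled)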

@willi-mueller willi-mueller changed the title Pipeline crashes when single record has no incremental cursor path Pipeline crashes when single record has None value at incremental cursor path Jul 12, 2024
@willi-mueller
Collaborator Author

willi-mueller commented Jul 25, 2024

@rudolfix @sh-rp Yes, you are right: add_map(my_func, insert_at=1) is sufficient to set default values because with insert_at=1 it is executed before the incremental.__call__(). Thank you for the pointer!
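
A minimal sketch of that resolution, reusing the hypothetical backfill function from the earlier comment:

    # insert_at=1 places the map step ahead of the incremental step in the pipe,
    # so cursor values are backfilled before find_cursor_value() runs
    some_data.add_map(backfill_cursor, insert_at=1)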

However, this issue is still open for two reasons:

  1. If users want to ingest data as-is, we still need to adjust the incremental implementation. Importing data exactly as it is in the source is an important feature, especially for SQL sources, because users might want to run the same queries they have on the source system on the destination system as well. Also, dlt should be robust enough to handle wild data.
  2. The currently thrown exception IncrementalCursorPathMissing can be misleading because it is also thrown when the path is present but the value at the path is None. Thus, we propose to raise this exception only if the path is actually missing, and a new exception, IncrementalCursorPathHasValueNone, when the cursor path exists but the value is None.

Please check out my implementation: #1576
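
For completeness, usage of the proposed parameter might look like this (hypothetical: the parameter name and the new exception follow this thread's proposal, not a confirmed released API):

    # hypothetical API per this thread's proposal
    inc = dlt.sources.incremental("created_at", on_none_cursor_value="exclude")
    # "exclude" drops rows whose cursor value is None
    # "include" passes such rows through unchanged
    # "raise" raises IncrementalCursorPathHasValueNone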
