
Filesystem Source incremental loading with S3 not working correctly #2124

Closed
FlorianASchroeder opened this issue Dec 5, 2024 · 1 comment · Fixed by #2131

@FlorianASchroeder

dlt version

1.4.0

Describe the problem

I want to load files incrementally from S3 to a destination.
Because this process should be robust against failures and memory-efficient, I followed a suggestion from the Slack channel to use add_limit() to perform chunked uploads to the destination, similar to this:

from typing import Iterator

import dlt
from dlt.sources.filesystem import FileItemDict, filesystem

# BUCKET_URL, prefix, and pipeline are defined elsewhere

@dlt.transformer
def print_timestamp(items: Iterator[FileItemDict]):
    # use for debugging
    for file in items:
        print(f"modified at: {file['modification_date']}")
        yield file

all_files = filesystem(
    bucket_url=BUCKET_URL, file_glob=f"{prefix}/*/*", files_per_page=2
)
all_files.apply_hints(incremental=dlt.sources.incremental("modification_date"))

current_chunk = all_files.add_limit(1) | print_timestamp

pipeline.run(current_chunk)
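
For context, the intent is to run this in a loop, one chunk per pipeline.run, until nothing new is extracted. A minimal sketch of such a driver loop (my reading of the setup above, not code from the issue; it assumes load_info.loads_ids is empty when no new files were extracted):

# hypothetical driver loop: rebuild the resource on each iteration,
# since a dlt resource is exhausted once it has been run
while True:
    files = filesystem(
        bucket_url=BUCKET_URL, file_glob=f"{prefix}/*/*", files_per_page=2
    )
    files.apply_hints(incremental=dlt.sources.incremental("modification_date"))
    load_info = pipeline.run(files.add_limit(1) | print_timestamp)
    if not load_info.loads_ids:  # assumption: empty when nothing new was loaded
        break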

The files are always returned in a fixed order by the filesystem source, like this:

list(all_files | print_timestamp)
>modified at: 2024-12-04 15:26:23+00:00
>modified at: 2024-12-04 15:25:33+00:00
>modified at: 2024-12-04 15:26:20+00:00
>modified at: 2024-12-04 15:26:10+00:00
>modified at: 2024-12-04 15:26:41+00:00
>modified at: 2024-12-04 15:26:45+00:00
>modified at: 2024-12-04 15:25:56+00:00
>modified at: 2024-12-04 15:26:39+00:00
>modified at: 2024-12-04 15:26:33+00:00
>modified at: 2024-12-04 15:26:04+00:00
>modified at: 2024-12-04 15:26:36+00:00
>modified at: 2024-12-04 15:26:08+00:00
>modified at: 2024-12-04 15:25:41+00:00

Executing the pipeline twice yields the following:

/workspaces/src (main) $ python -m test_pipe
modified at: 2024-12-04 15:26:23+00:00
modified at: 2024-12-04 15:25:33+00:00
Pipeline test_pipe load step completed in 50.67 seconds
1 load package(s) were loaded to destination athena and into dataset test_dataset
The filesystem staging destination used s3://xxx location to stage data
The athena destination used s3://xxx on awsdatacatalog location to store data
Load package 1733406935.8446503 is LOADED and contains no failed jobs
/workspaces/src (main) $ python -m test_pipe
Pipeline test_pipe load step completed in ---
0 load package(s) were loaded to destination athena and into dataset None
The filesystem staging destination used s3://xxx location to stage data
The athena destination used s3://xxx on awsdatacatalog location to store data

So repeated execution of the filesystem source with incremental loading hints does not correctly filter out already-loaded files and pick up newer files instead.
When add_limit(1) is removed after the first pipeline run, all files modified after the largest stored incremental timestamp are loaded:

/workspaces/src (main) $ python -m test_pipe
modified at: 2024-12-04 15:26:41+00:00
modified at: 2024-12-04 15:26:45+00:00
modified at: 2024-12-04 15:26:39+00:00
modified at: 2024-12-04 15:26:33+00:00
modified at: 2024-12-04 15:26:36+00:00
modified at: 2024-12-04 15:26:32+00:00
modified at: 2024-12-04 15:26:28+00:00
modified at: 2024-12-04 15:26:37+00:00
modified at: 2024-12-04 15:26:26+00:00
modified at: 2024-12-04 15:26:49+00:00
modified at: 2024-12-04 15:26:45+00:00
modified at: 2024-12-04 15:26:28+00:00
modified at: 2024-12-04 15:26:54+00:00
modified at: 2024-12-04 15:26:35+00:00
Pipeline test_pipe load step completed in 26.54 seconds
1 load package(s) were loaded to destination athena and into dataset test_dataset
The filesystem staging destination used s3://xxx location to stage data
The athena destination used s3://xxx on awsdatacatalog location to store data
Load package 1733407191.6829283 is LOADED and contains no failed jobs

Yet this reveals another problem: the files with modification times before 2024-12-04 15:26:23 never got loaded, because the filesystem source did not return the files sorted ascending by modification time, yet that value was stored as the latest incremental timestamp after the first batched pipeline run. Every file older than that boundary is now skipped.

Expected behavior

  • The filesystem source should return file items in a well-defined order when an incremental loading strategy is defined.
  • The order should be well-defined across all pages of the filesystem glob.
  • The incremental filter should be applied before add_limit is evaluated, so that a chunked pipeline.run can be realised (a workaround sketch follows this list). Otherwise, pipeline.run should support chunking internally to control memory usage during pipeline execution.
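
Until that happens, a workaround along these lines should behave correctly: list the files eagerly, filter against the incremental state by hand, and emit them oldest-first so that add_limit only ever sees unprocessed files. This is a minimal sketch, not an official dlt API; sorted_files is a hypothetical name, and BUCKET_URL, prefix, and pipeline are assumed to be defined as above:

import dlt
from dlt.sources.filesystem import filesystem

@dlt.resource(name="sorted_files")
def sorted_files(
    incremental=dlt.sources.incremental("modification_date"),
):
    # materialize the whole listing so it can be ordered deterministically
    all_items = []
    for page in filesystem(bucket_url=BUCKET_URL, file_glob=f"{prefix}/*/*"):
        # flatten defensively in case pages arrive as lists of file items
        all_items.extend(page if isinstance(page, list) else [page])
    # manual filter: strict ">" skips files sharing the stored boundary
    # timestamp; dedupe on a primary key if that matters for your data
    new_items = [
        item
        for item in all_items
        if incremental.last_value is None
        or item["modification_date"] > incremental.last_value
    ]
    new_items.sort(key=lambda item: item["modification_date"])
    # re-page so add_limit(1) takes one page of two files, as above
    for start in range(0, len(new_items), 2):
        yield new_items[start : start + 2]

pipeline.run(sorted_files().add_limit(1) | print_timestamp)

Because the items are pre-filtered and emitted oldest-first, each run picks up the oldest unprocessed page, and the stored last_value always reflects real progress.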

Steps to reproduce

See the report above

Operating system

Linux

Runtime environment

Virtual Machine

Python version

3.11

dlt data source

filesystem

dlt destination

AWS Athena / Glue Catalog

Other deployment details

No response

Additional information

No response

@rudolfix rudolfix moved this from Todo to In Progress in dlt core library Dec 9, 2024
@rudolfix (Collaborator)

@FlorianASchroeder thanks for this report. The core of the problem is the add_limit implementation, which was meant only to allow sampling of large sources during development. We are working on making it useful for backfilling, as in #1485

There are two things that we must do to fix your issues:

  1. Convert add_limit into a regular extract pipe step that is executed at the end. That will IMO fix all your issues: incremental works with unordered data as well, you'll just replay all the file descriptors to filter out the new files.
  2. We can follow sql_database, which translates the settings in incremental into an SQL query. Here we could use row_order to order results before we send them downstream.

AFAIK fsspec reads all the files before emitting pages; I need to take a look. For sure there's no way to order records when globbing, so (2) may not be implementable efficiently (we'd need to read the full list, order it by the attribute, and then emit it page by page).
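
For illustration only, (2) would amount to something like the following sketch (the idea as described above, not dlt internals): glob the full listing first, sort it by the cursor attribute, then re-chunk it into pages before sending it downstream.

from typing import Any, Dict, Iterator, List

def ordered_pages(
    file_items: List[Dict[str, Any]],
    order_by: str = "modification_date",
    page_size: int = 100,
) -> Iterator[List[Dict[str, Any]]]:
    # the full listing must be in memory before the first page can be
    # emitted, which is why this cannot be done lazily while globbing
    ordered = sorted(file_items, key=lambda item: item[order_by])
    for start in range(0, len(ordered), page_size):
        yield ordered[start : start + page_size]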

@rudolfix rudolfix self-assigned this Dec 10, 2024
@sh-rp sh-rp linked a pull request Dec 10, 2024 that will close this issue
@github-project-automation github-project-automation bot moved this from In Progress to Done in dlt core library Dec 16, 2024