
Make batched loading more convenient #2136

Closed
sh-rp opened this issue Dec 11, 2024 · 4 comments · Fixed by #2131
Comments

@sh-rp
Collaborator

sh-rp commented Dec 11, 2024

A common use case is loading large sources (such as big sql_databases) in multiple smaller runs rather than one big run. We should make this more convenient by:

  • Improving the limit functionality to also support time limits.
  • Adding a good example or a docs page that explains batched loading with a combination of limit, incremental, and multiple runs.
  • Possibly adding a mechanism that stops repeated runs once no further rows are extracted.
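A minimal, dlt-free sketch of the third bullet (the names `source_factory`, `run_batched`, and the in-memory `cursor` are illustrative stand-ins for a limited resource plus persisted incremental state, not dlt API):

```python
from itertools import islice

def run_batched(source_factory, batch_limit, max_runs=100):
    """Repeatedly extract up to `batch_limit` rows, resuming from a cursor,
    and stop as soon as a run extracts no further rows."""
    cursor = 0  # a real pipeline would persist this between runs (e.g. in dlt state)
    rows = []
    for _ in range(max_runs):
        # re-open the source for each "run" and skip rows already loaded
        batch = list(islice(source_factory(), cursor, cursor + batch_limit))
        if not batch:  # no further rows extracted -> stop repeating runs
            break
        rows.extend(batch)
        cursor += len(batch)
    return rows
```

For example, `run_batched(lambda: iter(range(25)), 10)` collects rows 0-24 across three runs and stops on the fourth, empty run.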
@loveeklund-osttra

I think this would be a great addition! I've experimented a bit with something like the script below, but I've found that the generator seems to be "restarted" (passed by value and not by reference somewhere, maybe?), so it outputs 0-99 twice instead of 0-199. I don't know how easy that would be to fix, but maybe a pattern like that could be used?
It would then be even nicer if one could call pipeline.stage(), so you could stage files in a staging dataset and then do a load. Then you could in theory batch-load full-refresh tables as well, which would be nice for runtimes that have constrained storage :)

import dlt
from itertools import islice
from dlt.common import pendulum

@dlt.resource(name="table")
def read_table(limit):
    rows = iter(range(limit))
    print("starting the generator")
    while item_slice := list(islice(rows, 10)):
        now = pendulum.now().isoformat()
        yield [
            {"row": _id, "description": f"this is row with id {_id}", "timestamp": now}
            for _id in item_slice
        ]

# wrapping the work in a function prevents the process pool from running the initialization code again
def load():
    file_destination = dlt.destinations.filesystem(bucket_url="file:///usr/src/ingestion/pipeline_storage")

    pipeline = dlt.pipeline("test_limit", destination=file_destination)

    test_source = read_table(1000)
    test_source.add_limit(10)

    for _ in range(2):
        pipeline.extract(test_source)

        load_id = pipeline.list_extracted_load_packages()[0]
        extracted_package = pipeline.get_load_package_info(load_id)
        extracted_jobs = extracted_package.jobs["new_jobs"]
        print([str(job.job_file_info) for job in extracted_jobs])
        print(pipeline.normalize(loader_file_format="jsonl"))
        load_id = pipeline.list_normalized_load_packages()[0]
        print(pipeline.get_load_package_info(load_id))
        print(pipeline.load())


if __name__ == "__main__":
    load()

@sh-rp sh-rp linked a pull request Dec 16, 2024 that will close this issue
@sh-rp
Collaborator Author

sh-rp commented Dec 16, 2024

@loveeklund-osttra On the next pipeline run your generator will be re-opened. For what you want to achieve, you'll have to use an incremental and start the generator at the right point on the next run.
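To illustrate the point, here is a dlt-free sketch of that pattern: the generator takes a starting offset, and a cursor persisted between runs (which dlt would keep in pipeline state, e.g. via `dlt.sources.incremental`) keeps the re-opened generator from re-emitting rows 0-99. The `state` dict and `read_table` signature are illustrative assumptions:

```python
from itertools import islice

def read_table(start, total):
    """Generator that resumes from `start` instead of 0, so a re-opened
    generator does not re-emit rows a previous run already extracted."""
    for i in range(start, total):
        yield {"row": i}

# simulate two pipeline runs; `state` stands in for persisted pipeline state
state = {"last_row": 0}
seen = []
for _ in range(2):
    # each "run" re-opens the generator, but at the stored cursor position
    batch = list(islice(read_table(state["last_row"], 1000), 100))
    seen.extend(r["row"] for r in batch)
    if batch:
        state["last_row"] = batch[-1]["row"] + 1  # advance the cursor
```

With the cursor, two runs yield rows 0-199 instead of 0-99 twice.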

@sh-rp
Collaborator Author

sh-rp commented Dec 16, 2024

I have linked a PR here which improves the add_limit and also adds a nice example for the sql_database.

@loveeklund-osttra

Yeah, that works for most use cases. I guess there are some edge cases where it won't work, like if you have more records with the same incremental key than you have memory in your runtime (could be a problem for small lambdas etc.).
You might also end up splitting up transactions from your source system to the target.

I don't think these are huge issues, and I appreciate your answers a lot. I just wanted to make you aware of some issues that might arise from this solution, maybe to be included in some documentation so people are aware.

@github-project-automation github-project-automation bot moved this from Todo to Done in dlt core library Dec 16, 2024