-
Notifications
You must be signed in to change notification settings - Fork 198
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Add support for continuously starting load jobs as slots free up in the loader #1494
Conversation
✅ Deploy Preview for dlt-hub-docs canceled.
|
dlt/load/load.py
Outdated
if ( | ||
len(self.load_storage.list_new_jobs(load_id)) == 0 | ||
and len(self.load_storage.normalized_packages.list_started_jobs(load_id)) == 0 | ||
): |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
is this the correct "package completion" condition? I think so, but am not 100% sure.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
this was the previous condition:
if file_count == 0:
logger.info(f"No new jobs found in {load_id}")
return 0, []
so checking just new jobs. I do not fully get why you do it here again? the loop above should exit only when all jobs are completed, no?
tests/load/test_dummy_client.py
Outdated
@@ -96,15 +96,15 @@ def test_unsupported_write_disposition() -> None: | |||
load.load_storage.normalized_packages.save_schema(load_id, schema) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I needed to change a bunch of tests, since we do not rely on multiple executions of the run method anymore. All the changes make sense, it might be good to add a few more tests cases specific to the new implementation.
dlt/load/load.py
Outdated
remaining_jobs: List[LoadJob] = [] | ||
# if an exception condition was met, return it to the main runner | ||
pending_exception: Exception = None |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I need to collect the exception to raise in the main loop here now, we could alternatively collect all problems we find and print them out, not just raise on exception.
# Conflicts: # dlt/load/load.py # tests/load/test_dummy_client.py
c265ecc
to
b4d05c8
Compare
0a9b5c3
to
da8c9e6
Compare
fix some tests
@rudolfix this can go into another review. Two open questions from my side:
|
# Conflicts: # dlt/destinations/impl/filesystem/filesystem.py
835a49d
to
bd252f0
Compare
bd252f0
to
1c73de1
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
What exactly is the desired behavior from a conceptual standpoint when creating a followup job fails? In the old version, loading just continues and those jobs are marked as failed. I don't think this makes sense, because the load will be useless if for example a mergejob can't be created and executed. So either we decide
this is a transient problem (e.g. the user can manually fix the schema and restart he load) , in that case we just raise an exception on the main thread, so that the job that triggered the scheduling of a followupjob remains in "started_jobs" and will be rerun on the pipeline execution including the scheduling of the followupjob (it is implemented and tested like this now)
or this is a terminal problem, in which case we should also stop the load but mark the loadpackage as failed
my take:
make it a transient error
and we need how we deal with failed packages. right now we continue load and do not raise exception at the end. IMO we should change that. we should continue load but raise exception at the end automatically.
I'll write a ticket for that - it ia a breaking change
) and job_client.should_load_data_to_staging_dataset(load_table) | ||
|
||
# set job vars | ||
job.set_run_vars(load_id=load_id, schema=schema, load_table=load_table) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We need to change tagging stuff:
- we tag session in:
def create_load_job(
self, table: TTableSchema, file_path: str, load_id: str, restore: bool = False
) -> LoadJob:
"""Starts SqlLoadJob for files ending with .sql or returns None to let derived classes to handle their specific jobs"""
self._set_query_tags_for_job(load_id, table)
which is called from main thread and not supposed to open any connection. IDK how it works now :) but even if it does, it will tag a session on main thread and then we immediately close the connection and the reopen it on the worker thread but in that case tagging does not happen.
- Since you passed all required params in set_vars, we do not need to take any parameters in here:
def _set_query_tags_for_job(self, load_id: str, table: TTableSchema) -> None:
- this function should be called in
run
method of all jobs that have sql_client (created by sql_job_client). so we need to move it from client to the job... which is a good move
dlt/load/load.py
Outdated
# this will raise on signal | ||
sleep(1) | ||
sleep( |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
see the above. are we still reading any job listings when looping on idle?
tests/cli/test_pipeline_command.py
Outdated
venv = Venv.restore_current() | ||
with pytest.raises(CalledProcessError) as cpe: | ||
print(venv.run_script("chess_pipeline.py")) | ||
|
||
# move job into running folder manually |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think we need to change how is_package_partially_loaded
works. partially loaded -> has packages that are not completed and has packages that are completed.
current implementation assumes that failed packages, started packages and retried packages modify the destination. this is IMO wrong (we assume that jobs are atomic - most of them are). Now I think that was wrong from the start
WDYT?
) | ||
|
||
# job will be automatically found and resumed |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
OK! I probably overlooked the lines below!
tests/load/test_dummy_client.py
Outdated
# sanity check | ||
assert duration > 5 | ||
|
||
# we want 1000 empty processed jobs to need less than 15 seconds total (locally it runs in 10) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
OK. but this idle loop where we sleep 0.1 seconds worries. do we have 100% cpu usage when this test run? maybe it is a little bit faster but we saturate CPU (while having some threads working). pls take a look. with 50k or 100k jobs maybe.
we must avoid starving threads by an idle loop that reads 50k files over and over
tests/pipeline/test_pipeline.py
Outdated
with pytest.raises(PipelineStepFailed): | ||
pipeline.run(airtable_emojis()) | ||
# move job into running folder manually |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@property
def has_pending_data(self) -> bool:
"""Tells if the pipeline contains any extracted files or pending load packages"""
return (
len(self.list_normalized_load_packages()) > 0
or len(self.list_extracted_load_packages()) > 0
)
it does not even look into package content. any not completed package is pending and will be executed before new package is created. this check is main reason this function exist
I answered is_package_partially_loaded
question above
Ok, I changed it slightly and added new exceptions to indicate what went wrong plus tests. With regards to the load package with failing jobs: I totally agree that that should raise at the end of the load if there were failed jobs. Now these errors are pretty much hidden. |
d1b2144
to
ce3e1c9
Compare
@@ -723,19 +723,12 @@ def build_job_file_name( | |||
|
|||
@staticmethod | |||
def is_package_partially_loaded(package_info: LoadPackageInfo) -> bool: |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
the behavior is unified now between the different package states, I'd say this is correct.
# Conflicts: # dlt/destinations/impl/filesystem/filesystem.py
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM!
Description
In the current implementation we more or less always start n (=max workers) load jobs, let them complete and then rerun the whole loader to schedule the next n jobs. In this PR we submit new load jobs as slots free up.
What is happening here:
Possible FollowupWork: