Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix: Chunks deletion issue #375

Merged
merged 11 commits into from
Sep 26, 2024
24 changes: 19 additions & 5 deletions src/litdata/processing/data_processor.py
Original file line number Diff line number Diff line change
Expand Up @@ -456,6 +456,7 @@ def run(self) -> None:
try:
self._setup()
self._loop()
self._terminate()
except Exception:
traceback_format = traceback.format_exc()
self.error_queue.put(traceback_format)
Expand All @@ -469,6 +470,19 @@ def _setup(self) -> None:
self._start_uploaders()
self._start_remover()

def _terminate(self) -> None:
"""Make sure all the uploaders, downloaders and removers are terminated."""
for uploader in self.uploaders:
if uploader.is_alive():
uploader.join()

for downloader in self.downloaders:
if downloader.is_alive():
downloader.join()

if self.remover and self.remover.is_alive():
self.remover.join()

def _loop(self) -> None:
num_downloader_finished = 0

Expand Down Expand Up @@ -1111,6 +1125,10 @@ def run(self, data_recipe: DataRecipe) -> None:

current_total = new_total
if current_total == num_items:
# make sure all processes are terminated
for w in self.workers:
if w.is_alive():
w.join()
break

if _IS_IN_STUDIO and node_rank == 0 and _ENABLE_STATUS:
Expand All @@ -1119,17 +1137,13 @@ def run(self, data_recipe: DataRecipe) -> None:

# Exit early if all the workers are done.
# This means there were some kinda of errors.
# TODO: Check whether this is still required.
deependujha marked this conversation as resolved.
Show resolved Hide resolved
if all(not w.is_alive() for w in self.workers):
raise RuntimeError("One of the worker has failed")

if _TQDM_AVAILABLE:
pbar.close()

# TODO: Check whether this is still required.
if num_nodes == 1:
for w in self.workers:
w.join()

print("Workers are finished.")
result = data_recipe._done(len(user_items), self.delete_cached_files, self.output_dir)

Expand Down
Loading