From 3ca9e67ecca117f7d482dae5a68e0cb09410f11b Mon Sep 17 00:00:00 2001
From: deependu
Date: Tue, 17 Sep 2024 20:04:57 +0530
Subject: [PATCH 1/6] not sure if it works. hehe

---
 src/litdata/processing/data_processor.py | 24 +++++++++++++++++++-----
 1 file changed, 19 insertions(+), 5 deletions(-)

diff --git a/src/litdata/processing/data_processor.py b/src/litdata/processing/data_processor.py
index 1a14fa0b..4f77ebad 100644
--- a/src/litdata/processing/data_processor.py
+++ b/src/litdata/processing/data_processor.py
@@ -456,6 +456,7 @@ def run(self) -> None:
         try:
             self._setup()
             self._loop()
+            self._terminate()
         except Exception:
             traceback_format = traceback.format_exc()
             self.error_queue.put(traceback_format)
@@ -469,6 +470,19 @@ def _setup(self) -> None:
         self._start_uploaders()
         self._start_remover()
 
+    def _terminate(self) -> None:
+        """make sure all the uploaders, downloaders and removers are terminated"""
+        for uploader in self.uploaders:
+            if uploader.is_alive():
+                uploader.join()
+
+        for downloader in self.downloaders:
+            if downloader.is_alive():
+                downloader.join()
+
+        if self.remover and self.remover.is_alive():
+            self.remover.join()
+
     def _loop(self) -> None:
         num_downloader_finished = 0
 
@@ -1111,6 +1125,10 @@ def run(self, data_recipe: DataRecipe) -> None:
 
             current_total = new_total
             if current_total == num_items:
+                # make sure all processes are terminated
+                for w in self.workers:
+                    if w.is_alive():
+                        w.join()
                 break
 
         if _IS_IN_STUDIO and node_rank == 0 and _ENABLE_STATUS:
@@ -1119,17 +1137,13 @@ def run(self, data_recipe: DataRecipe) -> None:
 
         # Exit early if all the workers are done.
        # This means there were some kinda of errors.
+        # TODO: Check whether this is still required.
         if all(not w.is_alive() for w in self.workers):
             raise RuntimeError("One of the worker has failed")
 
         if _TQDM_AVAILABLE:
             pbar.close()
 
-        # TODO: Check whether this is still required.
-        if num_nodes == 1:
-            for w in self.workers:
-                w.join()
-
         print("Workers are finished.")
         result = data_recipe._done(len(user_items), self.delete_cached_files, self.output_dir)
 

From aaeb3ae3cd24b877aaa363e8a285699df96d3a5a Mon Sep 17 00:00:00 2001
From: "pre-commit-ci[bot]" <66853113+pre-commit-ci[bot]@users.noreply.github.com>
Date: Tue, 17 Sep 2024 14:37:26 +0000
Subject: [PATCH 2/6] [pre-commit.ci] auto fixes from pre-commit.com hooks

for more information, see https://pre-commit.ci
---
 src/litdata/processing/data_processor.py | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/src/litdata/processing/data_processor.py b/src/litdata/processing/data_processor.py
index 4f77ebad..346a2864 100644
--- a/src/litdata/processing/data_processor.py
+++ b/src/litdata/processing/data_processor.py
@@ -471,7 +471,7 @@ def _setup(self) -> None:
         self._start_remover()
 
     def _terminate(self) -> None:
-        """make sure all the uploaders, downloaders and removers are terminated"""
+        """Make sure all the uploaders, downloaders and removers are terminated."""
         for uploader in self.uploaders:
             if uploader.is_alive():
                 uploader.join()

From 52a3098637b5aaadcf384bdf76deb094bd73eb94 Mon Sep 17 00:00:00 2001
From: deependu
Date: Wed, 18 Sep 2024 01:37:54 +0530
Subject: [PATCH 3/6] added test to check behaviour in CI

---
 src/litdata/processing/data_processor.py |  2 +-
 tests/processing/test_functions.py       | 60 +++++++++++++++++++++++-
 2 files changed, 60 insertions(+), 2 deletions(-)

diff --git a/src/litdata/processing/data_processor.py b/src/litdata/processing/data_processor.py
index 198c1174..a246c68b 100644
--- a/src/litdata/processing/data_processor.py
+++ b/src/litdata/processing/data_processor.py
@@ -809,7 +809,7 @@ def _done(self, size: int, delete_cached_files: bool, output_dir: Dir) -> _Resul
 
         chunks = [file for file in os.listdir(cache_dir) if file.endswith(".bin")]
         if chunks and delete_cached_files and output_dir.path is not None:
-            raise RuntimeError(f"All the chunks should have been deleted. Found {chunks}")
+            raise RuntimeError(f"All the chunks should have been deleted. Found {chunks} in cache: {cache_dir}")
 
         merge_cache = Cache(cache_dir, chunk_bytes=1)
         node_rank = _get_node_rank()
diff --git a/tests/processing/test_functions.py b/tests/processing/test_functions.py
index 80eec0ba..74f791f8 100644
--- a/tests/processing/test_functions.py
+++ b/tests/processing/test_functions.py
@@ -1,15 +1,22 @@
+
+import glob
 import os
+import random
+import shutil
 import sys
+from pathlib import Path
 from unittest import mock
 
 import cryptography
 import numpy as np
 import pytest
+import requests
+from PIL import Image
+
 from litdata import StreamingDataset, merge_datasets, optimize, walk
 from litdata.processing.functions import _get_input_dir, _resolve_dir
 from litdata.streaming.cache import Cache
 from litdata.utilities.encryption import FernetEncryption, RSAEncryption
-from PIL import Image
 
 
 @pytest.mark.skipif(sys.platform == "win32", reason="currently not supported for windows.")
@@ -475,3 +482,54 @@ def test_optimize_with_rsa_encryption(tmpdir):
     #     encryption=rsa,
     #     mode="overwrite",
     # )
+
+
+
+
+def tokenize(filename: str):
+    with open(filename, "r", encoding="utf-8") as file:
+        text = file.read()
+    text = text.strip().split(" ")
+    word_to_int = {word: random.randint(1, 1000) for word in set(text)}
+    tokenized = [word_to_int[word] for word in text]
+
+    yield tokenized
+@pytest.mark.skipif(sys.platform == "win32", reason="Not tested on windows")
+def test_optimize_race_condition(tmpdir):
+    # issue: https://github.com/Lightning-AI/litdata/issues/367
+    # run_commands = [
+    #     "mkdir -p tempdir/custom_texts",
+    #     "curl https://www.gutenberg.org/cache/epub/24440/pg24440.txt --output tempdir/custom_texts/book1.txt",
+    #     "curl https://www.gutenberg.org/cache/epub/26393/pg26393.txt --output tempdir/custom_texts/book2.txt",
+    # ]
+    shutil.rmtree(f"{tmpdir}/custom_texts", ignore_errors=True)
+    os.makedirs(f"{tmpdir}/custom_texts", exist_ok=True)
+
+    urls = [
+        "https://www.gutenberg.org/cache/epub/24440/pg24440.txt",
+        "https://www.gutenberg.org/cache/epub/26393/pg26393.txt",
+    ]
+
+    for i, url in enumerate(urls):
+        print(f"downloading {i+1} file")
+        with requests.get(url, stream=True, timeout=10) as r:
+            r.raise_for_status()  # Raise an exception for bad status codes
+
+            with open(f"{tmpdir}/custom_texts/book{i+1}.txt", "wb") as f:
+                for chunk in r.iter_content(chunk_size=8192):
+                    f.write(chunk)
+
+    print("="*100)
+
+
+    train_files = sorted(glob.glob(str(Path(f"{tmpdir}/custom_texts") / "*.txt")))
+    print("="*100)
+    print(train_files)
+    print("="*100)
+    optimize(
+        fn=tokenize,
+        inputs=train_files,
+        output_dir=f"{tmpdir}/temp",
+        num_workers=1,
+        chunk_bytes="50MB",
+    )

From c090636d43e606bcb6f79a90cb27690c8a844780 Mon Sep 17 00:00:00 2001
From: "pre-commit-ci[bot]" <66853113+pre-commit-ci[bot]@users.noreply.github.com>
Date: Tue, 17 Sep 2024 20:08:28 +0000
Subject: [PATCH 4/6] [pre-commit.ci] auto fixes from pre-commit.com hooks

for more information, see https://pre-commit.ci
---
 tests/processing/test_functions.py | 17 +++++++----------
 1 file changed, 7 insertions(+), 10 deletions(-)

diff --git a/tests/processing/test_functions.py b/tests/processing/test_functions.py
index 74f791f8..0be0a013 100644
--- a/tests/processing/test_functions.py
+++ b/tests/processing/test_functions.py
@@ -1,4 +1,3 @@
-
 import glob
 import os
 import random
@@ -11,12 +10,11 @@
 import numpy as np
 import pytest
 import requests
-from PIL import Image
-
 from litdata import StreamingDataset, merge_datasets, optimize, walk
 from litdata.processing.functions import _get_input_dir, _resolve_dir
 from litdata.streaming.cache import Cache
 from litdata.utilities.encryption import FernetEncryption, RSAEncryption
+from PIL import Image
 
 
 @pytest.mark.skipif(sys.platform == "win32", reason="currently not supported for windows.")
@@ -484,16 +482,16 @@ def test_optimize_with_rsa_encryption(tmpdir):
     # )
 
 
-
-
 def tokenize(filename: str):
-    with open(filename, "r", encoding="utf-8") as file:
+    with open(filename, encoding="utf-8") as file:
         text = file.read()
     text = text.strip().split(" ")
     word_to_int = {word: random.randint(1, 1000) for word in set(text)}
     tokenized = [word_to_int[word] for word in text]
 
     yield tokenized
+
+
 @pytest.mark.skipif(sys.platform == "win32", reason="Not tested on windows")
 def test_optimize_race_condition(tmpdir):
     # issue: https://github.com/Lightning-AI/litdata/issues/367
@@ -519,13 +517,12 @@ def test_optimize_race_condition(tmpdir):
                 for chunk in r.iter_content(chunk_size=8192):
                     f.write(chunk)
 
-    print("="*100)
-
+    print("=" * 100)
 
     train_files = sorted(glob.glob(str(Path(f"{tmpdir}/custom_texts") / "*.txt")))
-    print("="*100)
+    print("=" * 100)
     print(train_files)
-    print("="*100)
+    print("=" * 100)
     optimize(
         fn=tokenize,
         inputs=train_files,

From c645b009fd14d0a285a3af4d97a4fff1de5a3f7b Mon Sep 17 00:00:00 2001
From: deependu
Date: Wed, 18 Sep 2024 01:40:47 +0530
Subject: [PATCH 5/6] update

---
 tests/processing/test_functions.py | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/tests/processing/test_functions.py b/tests/processing/test_functions.py
index 0be0a013..a897da53 100644
--- a/tests/processing/test_functions.py
+++ b/tests/processing/test_functions.py
@@ -486,7 +486,7 @@ def tokenize(filename: str):
     with open(filename, encoding="utf-8") as file:
         text = file.read()
     text = text.strip().split(" ")
-    word_to_int = {word: random.randint(1, 1000) for word in set(text)}
+    word_to_int = {word: random.randint(1, 1000) for word in set(text)} # noqa: S311
     tokenized = [word_to_int[word] for word in text]
 
     yield tokenized

From c752bc5f9e1882410af928402ae0b2174bc92b23 Mon Sep 17 00:00:00 2001
From: "pre-commit-ci[bot]" <66853113+pre-commit-ci[bot]@users.noreply.github.com>
Date: Tue, 17 Sep 2024 20:11:36 +0000
Subject: [PATCH 6/6] [pre-commit.ci] auto fixes from pre-commit.com hooks

for more information, see https://pre-commit.ci
---
 tests/processing/test_functions.py | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/tests/processing/test_functions.py b/tests/processing/test_functions.py
index a897da53..42482b7e 100644
--- a/tests/processing/test_functions.py
+++ b/tests/processing/test_functions.py
@@ -486,7 +486,7 @@ def tokenize(filename: str):
     with open(filename, encoding="utf-8") as file:
         text = file.read()
     text = text.strip().split(" ")
-    word_to_int = {word: random.randint(1, 1000) for word in set(text)} # noqa: S311
+    word_to_int = {word: random.randint(1, 1000) for word in set(text)}  # noqa: S311
     tokenized = [word_to_int[word] for word in text]
 
     yield tokenized