diff --git a/.github/workflows/ci-testing.yml b/.github/workflows/ci-testing.yml index 1acab9fb..080e70b5 100644 --- a/.github/workflows/ci-testing.yml +++ b/.github/workflows/ci-testing.yml @@ -78,10 +78,6 @@ jobs: pip install -e . -r requirements/test.txt -U -q --find-links $TORCH_URL pip list - # FIXME: REMOVE - # - name: Setup tmate session - # uses: mxschmitt/action-tmate@v3 - - name: Tests run: coverage run --source litdata -m pytest tests -v diff --git a/src/litdata/processing/data_processor.py b/src/litdata/processing/data_processor.py index 02bbca42..ba100794 100644 --- a/src/litdata/processing/data_processor.py +++ b/src/litdata/processing/data_processor.py @@ -255,14 +255,12 @@ def _upload_fn(upload_queue: Queue, remove_queue: Queue, cache_dir: str, output_ output_filepath = remove_uuid_from_filename(output_filepath) # remove unique id from checkpoints os.makedirs(os.path.dirname(output_filepath), exist_ok=True) - print(f"copying {local_filepath} to {output_filepath}") shutil.copy(local_filepath, output_filepath) else: raise ValueError(f"The provided {output_dir.path} isn't supported.") # Inform the remover to delete the file if remove_queue and os.path.exists(local_filepath): - print(f"putting {local_filepath} on the remove queue") remove_queue.put([local_filepath]) diff --git a/tests/conftest.py b/tests/conftest.py index d12ff2f6..17e47ed8 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -1,11 +1,9 @@ import sys -import threading from types import ModuleType from unittest.mock import Mock import pytest import torch.distributed -from litdata.streaming.reader import PrepareChunksThread @pytest.fixture(autouse=True) @@ -67,31 +65,3 @@ def lightning_sdk_mock(monkeypatch): lightning_sdk = ModuleType("lightning_sdk") monkeypatch.setitem(sys.modules, "lightning_sdk", lightning_sdk) return lightning_sdk - - -@pytest.fixture(autouse=True) -def _thread_police(): - """Attempts to stop left-over threads to avoid test interactions. - - Adapted from PyTorch Lightning. - - """ - active_threads_before = set(threading.enumerate()) - yield - active_threads_after = set(threading.enumerate()) - - for thread in active_threads_after - active_threads_before: - if isinstance(thread, PrepareChunksThread): - thread.force_stop() - continue - - stop = getattr(thread, "stop", None) or getattr(thread, "exit", None) - if thread.daemon and callable(stop): - # A daemon thread would anyway be stopped at the end of a program - # We do it preemptively here to reduce the risk of interactions with other tests that run after - stop() - assert not thread.is_alive() - elif thread.name == "QueueFeederThread": - thread.join(timeout=20) - else: - raise AssertionError(f"Test left zombie thread: {thread}") diff --git a/tests/streaming/test_dataset.py b/tests/streaming/test_dataset.py index fba51389..2299b3eb 100644 --- a/tests/streaming/test_dataset.py +++ b/tests/streaming/test_dataset.py @@ -838,7 +838,7 @@ def test_dataset_resume_on_future_chunks(shuffle, tmpdir, monkeypatch): inputs=list(range(8)), output_dir=data_dir, chunk_size=190, - num_workers=1, + num_workers=1, # TODO: Want 4 here, but optimize() has deletion race condition num_uploaders=1, ) sleep(5) # wait for copier/remover threads to complete