Skip to content

Commit

Permalink
debug
Browse files Browse the repository at this point in the history
  • Loading branch information
awaelchli committed Jul 19, 2024
1 parent 06bf414 commit bc64b77
Show file tree
Hide file tree
Showing 4 changed files with 1 addition and 37 deletions.
4 changes: 0 additions & 4 deletions .github/workflows/ci-testing.yml
Original file line number Diff line number Diff line change
Expand Up @@ -78,10 +78,6 @@ jobs:
pip install -e . -r requirements/test.txt -U -q --find-links $TORCH_URL
pip list
# FIXME: REMOVE
# - name: Setup tmate session
# uses: mxschmitt/action-tmate@v3

- name: Tests
run: coverage run --source litdata -m pytest tests -v

Expand Down
2 changes: 0 additions & 2 deletions src/litdata/processing/data_processor.py
Original file line number Diff line number Diff line change
Expand Up @@ -255,14 +255,12 @@ def _upload_fn(upload_queue: Queue, remove_queue: Queue, cache_dir: str, output_
output_filepath = remove_uuid_from_filename(output_filepath) # remove unique id from checkpoints

os.makedirs(os.path.dirname(output_filepath), exist_ok=True)
print(f"copying {local_filepath} to {output_filepath}")
shutil.copy(local_filepath, output_filepath)
else:
raise ValueError(f"The provided {output_dir.path} isn't supported.")

# Inform the remover to delete the file
if remove_queue and os.path.exists(local_filepath):
print(f"putting {local_filepath} on the remove queue")
remove_queue.put([local_filepath])


Expand Down
30 changes: 0 additions & 30 deletions tests/conftest.py
Original file line number Diff line number Diff line change
@@ -1,11 +1,9 @@
import sys
import threading
from types import ModuleType
from unittest.mock import Mock

import pytest
import torch.distributed
from litdata.streaming.reader import PrepareChunksThread


@pytest.fixture(autouse=True)
Expand Down Expand Up @@ -67,31 +65,3 @@ def lightning_sdk_mock(monkeypatch):
lightning_sdk = ModuleType("lightning_sdk")
monkeypatch.setitem(sys.modules, "lightning_sdk", lightning_sdk)
return lightning_sdk


@pytest.fixture(autouse=True)
def _thread_police():
"""Attempts to stop left-over threads to avoid test interactions.
Adapted from PyTorch Lightning.
"""
active_threads_before = set(threading.enumerate())
yield
active_threads_after = set(threading.enumerate())

for thread in active_threads_after - active_threads_before:
if isinstance(thread, PrepareChunksThread):
thread.force_stop()
continue

stop = getattr(thread, "stop", None) or getattr(thread, "exit", None)
if thread.daemon and callable(stop):
# A daemon thread would anyway be stopped at the end of a program
# We do it preemptively here to reduce the risk of interactions with other tests that run after
stop()
assert not thread.is_alive()
elif thread.name == "QueueFeederThread":
thread.join(timeout=20)
else:
raise AssertionError(f"Test left zombie thread: {thread}")
2 changes: 1 addition & 1 deletion tests/streaming/test_dataset.py
Original file line number Diff line number Diff line change
Expand Up @@ -838,7 +838,7 @@ def test_dataset_resume_on_future_chunks(shuffle, tmpdir, monkeypatch):
inputs=list(range(8)),
output_dir=data_dir,
chunk_size=190,
num_workers=1,
num_workers=1, # TODO: Want 4 here, but optimize() has deletion race condition
num_uploaders=1,
)
sleep(5) # wait for copier/remover threads to complete
Expand Down

0 comments on commit bc64b77

Please sign in to comment.