From 92baca684686d779155ebe41bdfeafeb2fd481da Mon Sep 17 00:00:00 2001 From: deependujha Date: Fri, 12 Jul 2024 09:43:57 +0530 Subject: [PATCH 1/6] readme updated and a few nitpicks --- README.md | 42 +++++++++++++++++++++++++++++++ src/litdata/streaming/combined.py | 2 +- tests/streaming/test_dataset.py | 2 +- 3 files changed, 44 insertions(+), 2 deletions(-) diff --git a/README.md b/README.md index 003eac48..78e91a5e 100644 --- a/README.md +++ b/README.md @@ -520,6 +520,48 @@ from litdata import StreamingDataset dataset = StreamingDataset(input_dir="local:/data/shared-drive/some-data") ``` + + +
+ ✅ Optimize dataset in distributed environment +  + +Lightning can distribute large workloads across hundreds of machines in parallel. This can reduce the time to complete a data processing task from weeks to minutes by scaling to enough machines. + +To apply the optimize operator across multiple machines, simply provide the num_nodes and machine arguments to it as follows: + +```python +import os +from litdata import optimize, Machine + +def compress(index): + return (index, index ** 2) + +optimize( + fn=compress, + inputs=list(range(100)), + num_workers=2, + output_dir="my_output", + chunk_bytes="64MB", + num_nodes=2, + machine=Machine.DATA_PREP, # You can select between dozens of optimized machines +) +``` + +If the `output_dir` is a local path, the optimized dataset will be present in: `/teamspace/jobs/{job_name}/nodes-0/my_output`. Otherwise, it will be stored in the specified `output_dir`. + +Read the optimized dataset: + +```python +from litdata import StreamingDataset + +output_dir = "/teamspace/jobs/litdata-optimize-2024-07-08/nodes.0/my_output" + +dataset = StreamingDataset(output_dir) + +print(dataset[:]) +``` +
  diff --git a/src/litdata/streaming/combined.py b/src/litdata/streaming/combined.py index 69b1f9b1..4741b75d 100644 --- a/src/litdata/streaming/combined.py +++ b/src/litdata/streaming/combined.py @@ -139,7 +139,7 @@ def __iter__(self) -> Iterator[Any]: num_samples_yielded = None if self._num_samples_yielded is not None and worker_env.rank in self._num_samples_yielded: - num_samples_yielded = self._num_samples_yielded[worker_env.rank] + num_samples_yielded = self._num_samples_yielded.get(worker_env.rank, 0) self._iterator = _CombinedDatasetIterator( self._datasets, diff --git a/tests/streaming/test_dataset.py b/tests/streaming/test_dataset.py index 88176ec2..da78b02c 100644 --- a/tests/streaming/test_dataset.py +++ b/tests/streaming/test_dataset.py @@ -807,7 +807,7 @@ def _get_simulated_s3_dataloader(cache_dir, data_dir): return StreamingDataLoader(dataset, batch_size=2, num_workers=1) -@pytest.mark.skipif(sys.platform == "win32", reason="Not tested on windows and MacOs") +@pytest.mark.skipif(sys.platform == "win32" or sys.platform == "darwin", reason="Not tested on windows and MacOs") @mock.patch.dict(os.environ, {}, clear=True) def test_dataset_resume_on_future_chunks(tmpdir, monkeypatch): """This test is constructed to test resuming from a chunk past the first chunk, when subsequent chunks don't have From 0d1cd00d1ced4e748818faf1d86898f0dda09833 Mon Sep 17 00:00:00 2001 From: deependujha Date: Fri, 12 Jul 2024 10:11:10 +0530 Subject: [PATCH 2/6] updated timeout time --- tests/streaming/test_dataset.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/tests/streaming/test_dataset.py b/tests/streaming/test_dataset.py index da78b02c..06364da9 100644 --- a/tests/streaming/test_dataset.py +++ b/tests/streaming/test_dataset.py @@ -56,7 +56,7 @@ def seed_everything(random_seed): pytest.param("zstd", marks=pytest.mark.skipif(condition=not _ZSTD_AVAILABLE, reason="Requires: ['zstd']")), ], ) -@pytest.mark.timeout(15) +@pytest.mark.timeout(30) def test_streaming_dataset(tmpdir, monkeypatch, compression): seed_everything(42) @@ -198,7 +198,7 @@ def test_streaming_dataset_distributed_no_shuffle(drop_last, tmpdir, compression pytest.param("zstd", marks=pytest.mark.skipif(condition=not _ZSTD_AVAILABLE, reason="Requires: ['zstd']")), ], ) -@pytest.mark.timeout(30) +@pytest.mark.timeout(60) def test_streaming_dataset_distributed_full_shuffle_odd(drop_last, tmpdir, compression): seed_everything(42) From 34bcc6fa282b9efef8279beb95fb34335debf860 Mon Sep 17 00:00:00 2001 From: deependujha Date: Fri, 12 Jul 2024 11:03:44 +0530 Subject: [PATCH 3/6] reset timeout time --- tests/streaming/test_dataset.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/tests/streaming/test_dataset.py b/tests/streaming/test_dataset.py index 06364da9..da78b02c 100644 --- a/tests/streaming/test_dataset.py +++ b/tests/streaming/test_dataset.py @@ -56,7 +56,7 @@ def seed_everything(random_seed): pytest.param("zstd", marks=pytest.mark.skipif(condition=not _ZSTD_AVAILABLE, reason="Requires: ['zstd']")), ], ) -@pytest.mark.timeout(30) +@pytest.mark.timeout(15) def test_streaming_dataset(tmpdir, monkeypatch, compression): seed_everything(42) @@ -198,7 +198,7 @@ def test_streaming_dataset_distributed_no_shuffle(drop_last, tmpdir, compression pytest.param("zstd", marks=pytest.mark.skipif(condition=not _ZSTD_AVAILABLE, reason="Requires: ['zstd']")), ], ) -@pytest.mark.timeout(60) +@pytest.mark.timeout(30) def test_streaming_dataset_distributed_full_shuffle_odd(drop_last, tmpdir, compression): seed_everything(42) From bbcaaf451a8c526ef72b7734e7bed889109d151a Mon Sep 17 00:00:00 2001 From: deependujha Date: Fri, 12 Jul 2024 13:12:15 +0530 Subject: [PATCH 4/6] fix failing tests --- tests/streaming/test_dataset.py | 3 +++ 1 file changed, 3 insertions(+) diff --git a/tests/streaming/test_dataset.py b/tests/streaming/test_dataset.py index da78b02c..b1615198 100644 --- a/tests/streaming/test_dataset.py +++ b/tests/streaming/test_dataset.py @@ -830,6 +830,7 @@ def test_dataset_resume_on_future_chunks(tmpdir, monkeypatch): train_dataloader = _get_simulated_s3_dataloader(s3_cache_dir, data_dir) batches_to_fetch = 16 batch_to_resume_from = None + dataloader_state = None for i, batch in enumerate(train_dataloader): if i == batches_to_fetch: dataloader_state = train_dataloader.state_dict() @@ -840,6 +841,8 @@ def test_dataset_resume_on_future_chunks(tmpdir, monkeypatch): shutil.rmtree(s3_cache_dir) os.mkdir(s3_cache_dir) train_dataloader = _get_simulated_s3_dataloader(s3_cache_dir, data_dir) + assert dataloader_state is not None + assert batch_to_resume_from is not None train_dataloader.load_state_dict(dataloader_state) # The next batch after resuming must match what we should have gotten next in the initial loop assert torch.equal(next(iter(train_dataloader)), batch_to_resume_from) From b1734680c5352c15b3fcbcb8cf15339be347df48 Mon Sep 17 00:00:00 2001 From: deependujha Date: Fri, 12 Jul 2024 14:43:17 +0530 Subject: [PATCH 5/6] try tweaking future_chunk tests for passing in ci --- tests/streaming/test_dataset.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/tests/streaming/test_dataset.py b/tests/streaming/test_dataset.py index b1615198..03fa688a 100644 --- a/tests/streaming/test_dataset.py +++ b/tests/streaming/test_dataset.py @@ -809,6 +809,7 @@ def _get_simulated_s3_dataloader(cache_dir, data_dir): @pytest.mark.skipif(sys.platform == "win32" or sys.platform == "darwin", reason="Not tested on windows and MacOs") @mock.patch.dict(os.environ, {}, clear=True) +@pytest.mark.timeout(60) def test_dataset_resume_on_future_chunks(tmpdir, monkeypatch): """This test is constructed to test resuming from a chunk past the first chunk, when subsequent chunks don't have the same size.""" @@ -819,7 +820,7 @@ def test_dataset_resume_on_future_chunks(tmpdir, monkeypatch): optimize( fn=_simple_preprocess, - inputs=list(range(8)), + inputs=list(range(5)), output_dir=str(tmpdir / "optimized"), chunk_size=190, num_workers=4, From 8087f55f2825455c16d7bd9f93c41206915e48b9 Mon Sep 17 00:00:00 2001 From: deependujha Date: Fri, 12 Jul 2024 15:04:05 +0530 Subject: [PATCH 6/6] update --- tests/streaming/test_dataset.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/streaming/test_dataset.py b/tests/streaming/test_dataset.py index 03fa688a..92d1068b 100644 --- a/tests/streaming/test_dataset.py +++ b/tests/streaming/test_dataset.py @@ -198,7 +198,7 @@ def test_streaming_dataset_distributed_no_shuffle(drop_last, tmpdir, compression pytest.param("zstd", marks=pytest.mark.skipif(condition=not _ZSTD_AVAILABLE, reason="Requires: ['zstd']")), ], ) -@pytest.mark.timeout(30) +@pytest.mark.timeout(60) def test_streaming_dataset_distributed_full_shuffle_odd(drop_last, tmpdir, compression): seed_everything(42)