
Append data to pre-optimized dataset #23

Closed
tchaton opened this issue Feb 26, 2024 · 5 comments · Fixed by #184
Labels
enhancement New feature or request

Comments

@tchaton
Collaborator

tchaton commented Feb 26, 2024

Description & Motivation

It's not uncommon to have to update the data one is training on, computing embeddings on, etc. We could support appending data to an optimized (chunked) dataset.
Appending alone is sufficient; removal is more specific and can be performed by rerunning through the samples with a map and creating a new dataset from it.

Pitch

When we map or optimize specifying a target location, add the ability to append to existing chunks (through a mode="append" argument or something along these lines).

Alternatives

The alternative is creating a new dataset and composing it with the previous one during iteration. However, composition is not trivial because we need to make sure we draw each sample exactly once from each dataset and avoid bumping into StopIteration. We would need to add a specific mode to the composed dataset.

If the data added is misaligned with the chunk size and appending happens often, it would create a suboptimal dataset after a while, which would need to be compacted into a single one by iterating sequentially. This could be a further alternative: a utility where you pass a list of datasets and create a single dataset by iterating through all of them.
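
A rough sketch of what such a compaction utility could look like, assuming litdata's existing optimize and StreamingDataset APIs (the compact_datasets helper itself is hypothetical):

from litdata import StreamingDataset, optimize

_datasets = {}  # per-worker cache of opened source datasets


def read_sample(pair):
    # Re-open (and cache) the source dataset inside each worker, then read one sample.
    input_dir, sample_idx = pair
    if input_dir not in _datasets:
        _datasets[input_dir] = StreamingDataset(input_dir=input_dir)
    return _datasets[input_dir][sample_idx]


def compact_datasets(input_dirs, output_dir, chunk_bytes="64MB", num_workers=4):
    # Build a flat list of (input_dir, sample_index) pairs so that every
    # sample of every source dataset is visited exactly once.
    inputs = []
    for input_dir in input_dirs:
        inputs.extend((input_dir, j) for j in range(len(StreamingDataset(input_dir=input_dir))))

    # Re-chunk everything into a single, freshly optimized dataset.
    optimize(
        fn=read_sample,
        inputs=inputs,
        output_dir=output_dir,
        chunk_bytes=chunk_bytes,
        num_workers=num_workers,
    )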

Additional context

No response

cc @Borda @tchaton

Moved from Lightning-AI/pytorch-lightning#19519, submitted by @lantiga


Hi! Thanks for your contribution, great first issue!

@Borda removed the "help wanted" label on Apr 18, 2024
@isidentical

This is essentially one of the main blockers keeping us from switching to litdata: for training cascaded models we need to constantly amend an existing set of data (consider [img, caption] pairs, to which we later add 256x256 latents, then 512x512 latents, etc.).

@tchaton
Collaborator Author

tchaton commented May 31, 2024

Hey @isidentical. Would you be interested in contributing the feature? It shouldn't be too hard to do, and I can help along the way.

A StreamingDataset is described by its index.json. The chunks are loaded from that file into a list here: main/src/litdata/streaming/config.py#L56

We could either have a utility to merge StreamingDatasets together or implement the append mechanism, which takes an existing index.json and keeps appending to it.
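
For illustration, a rough sketch of the append mechanism (the exact index.json schema and the chunk file naming are assumptions here, not the actual litdata layout):

import json
import os


def append_index(existing_dir, new_dir):
    # Load the existing and freshly generated index.json files.
    with open(os.path.join(existing_dir, "index.json")) as f:
        existing = json.load(f)
    with open(os.path.join(new_dir, "index.json")) as f:
        new = json.load(f)

    # Continue the chunk numbering after the last existing chunk.
    offset = len(existing["chunks"])
    for i, chunk in enumerate(new["chunks"]):
        new_name = f"chunk-0-{offset + i}.bin"  # assumed naming scheme
        os.rename(os.path.join(new_dir, chunk["filename"]), os.path.join(existing_dir, new_name))
        chunk["filename"] = new_name
        existing["chunks"].append(chunk)

    # Persist the merged index so the StreamingDataset sees all chunks.
    with open(os.path.join(existing_dir, "index.json"), "w") as f:
        json.dump(existing, f)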

In the meantime, you can explore the CombinedStreamingDataset to combine multiple datasets right now:

from litdata import StreamingDataset, CombinedStreamingDataset
from litdata.streaming.item_loader import TokensLoader
from tqdm import tqdm
import os
from torch.utils.data import DataLoader

train_datasets = [
    StreamingDataset(
        input_dir="s3://tinyllama-template/slimpajama/train/",
        item_loader=TokensLoader(block_size=2048 + 1), # Optimized loader for tokens used by LLMs 
        shuffle=True,
        drop_last=True,
    ),
    StreamingDataset(
        input_dir="s3://tinyllama-template/starcoder/",
        item_loader=TokensLoader(block_size=2048 + 1), # Optimized loader for tokens used by LLMs 
        shuffle=True,
        drop_last=True,
    ),
]

# Mix SlimPajama data and Starcoder data with these proportions:
weights = (0.693584, 0.306416)
combined_dataset = CombinedStreamingDataset(datasets=train_datasets, seed=42, weights=weights)

train_dataloader = DataLoader(combined_dataset, batch_size=8, pin_memory=True, num_workers=os.cpu_count())

# Iterate over the combined datasets
for batch in tqdm(train_dataloader):
    pass

However, when training cascaded models with increasing size, I would recommend having several datasets, one per image dimension, and switching between them based on the epoch. This would keep training as fast as possible without losing control.
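
For example, a minimal sketch of that epoch-based switch (the dataset paths and the epoch cut-off are placeholders):

from litdata import StreamingDataset
from torch.utils.data import DataLoader

# Hypothetical paths: one pre-optimized dataset per image resolution.
datasets_by_resolution = {
    256: "s3://my-bucket/latents-256/",
    512: "s3://my-bucket/latents-512/",
}

def dataloader_for_epoch(epoch: int) -> DataLoader:
    # Switch to the higher-resolution dataset after epoch 10 (arbitrary cut-off).
    resolution = 256 if epoch < 10 else 512
    dataset = StreamingDataset(input_dir=datasets_by_resolution[resolution], shuffle=True)
    return DataLoader(dataset, batch_size=8, num_workers=4)

for epoch in range(20):
    for batch in dataloader_for_epoch(epoch):
        ...  # training step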

Finally, I recommend having a look at this published Studio: https://lightning.ai/lightning-ai/studios/download-stream-400m-images-text?section=data+processing

@deependujha
Collaborator

deependujha commented Jun 21, 2024

Hi, I'm interested in working on this feature. But before that, I want to make sure I've understood it correctly.

The current behavior for optimize is:

    optimize(
        fn=random_images,
        inputs=list(range(1000)),
        output_dir="my_dataset", 
        num_workers=4,
        chunk_bytes="64MB" 
    )

    ds = StreamingDataset("my_dataset")
    assert len(ds) == 1000

    # calling `optimize` again will **overwrite** the existing optimized dataset
    optimize(
        fn=random_images,
        inputs=list(range(10)),
        output_dir="my_dataset",
        num_workers=4,
        chunk_bytes="64MB",
    )

    ds = StreamingDataset("my_dataset") 
    assert len(ds) == 10

So, we have to add a new argument to optimize called mode='append' or mode='overwrite' that will do the expected thing.
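
For illustration, the proposed call would look something like this (the mode argument does not exist yet; it is exactly what this issue proposes to add):

    # Proposed API: re-running optimize on the same output_dir with
    # mode="append" would add the new samples instead of overwriting.
    optimize(
        fn=random_images,
        inputs=list(range(10)),
        output_dir="my_dataset",
        num_workers=4,
        chunk_bytes="64MB",
        mode="append",  # or mode="overwrite" to rebuild from scratch
    )

    ds = StreamingDataset("my_dataset")
    assert len(ds) == 1000 + 10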


But if this is all we want, we can already create multiple optimized datasets and then use CombinedStreamingDataset:

    optimize(
        fn=random_images,
        inputs=list(range(1000)),
        output_dir="my_dataset-1", 
        num_workers=4,
        chunk_bytes="64MB" 
    )

    optimize(
        fn=random_images,
        inputs=list(range(10)),
        output_dir="my_dataset-2", 
        num_workers=4,
        chunk_bytes="64MB" 
    )

    train_datasets = [
        StreamingDataset(
            input_dir="my_dataset-1",
        ),
        StreamingDataset(
            input_dir="my_dataset-2",
        ),
    ]
    combined_dataset = CombinedStreamingDataset(datasets=train_datasets, seed=42)

    assert len(combined_dataset) == 1000 + 10 

So I think this is already possible. But anyway, what exactly do we want?

  1. A mode='append' | 'overwrite' argument in the optimize function.
  2. Or, just like we have CombinedStreamingDataset, a CombinedOptimizeDataset.

Or something else?

And a silly question: is there any real-world example where we want to update/modify the data in each epoch while training a model? ChatGPT gave the example of online learning.

@tchaton
Collaborator Author

tchaton commented Jun 21, 2024

Hey @deependujha, we want option 1. The CombinedStreamingDataset has many drawbacks: it creates multiple threads, doesn't have good synchronisation points, etc.
Things are always faster and more native when using a single dataset source.

So the modes are None | append | overwrite, where None is the default. If you run this code and there is already an index.json, it should break and tell you there is already an optimized dataset at this location and that datasets are considered immutable once created. However, the user can opt in to overwrite (delete everything and restart) or append (download the index.json and start creating chunks with names continuing from the last one included in the index.json).
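
A very rough sketch of how that mode handling could look inside optimize (the helper name and the index.json details are assumptions, not the actual implementation):

import json
import os
import shutil
from typing import Optional


def resolve_output_dir(output_dir: str, mode: Optional[str]) -> int:
    """Return the chunk index to start writing at, enforcing the mode semantics."""
    index_path = os.path.join(output_dir, "index.json")
    if not os.path.exists(index_path):
        return 0  # fresh dataset: start chunk numbering at 0

    if mode is None:
        raise RuntimeError(
            f"An optimized dataset already exists at {output_dir}. Datasets are "
            "immutable once created; pass mode='overwrite' or mode='append'."
        )
    if mode == "overwrite":
        shutil.rmtree(output_dir)  # delete everything and restart
        os.makedirs(output_dir)
        return 0
    if mode == "append":
        with open(index_path) as f:
            index = json.load(f)
        # Continue chunk naming from the last chunk recorded in index.json.
        return len(index["chunks"])
    raise ValueError(f"Unsupported mode: {mode}")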
