
LocalDataset bug #300

Closed
tungdq212 opened this issue Jun 14, 2023 · 6 comments
Labels
bug Something isn't working

Comments

@tungdq212

LocalDataset Bug

streaming.base.LocalDataset needs to override the size property:

import json
import os
from typing import Any, Dict, Optional

import numpy as np
from torch.utils.data import Dataset

from streaming.base.array import Array
from streaming.base.format import get_index_basename, reader_from_json
from streaming.base.spanner import Spanner


class LocalDataset(Array, Dataset):
    """A streaming dataset whose shards reside locally as a PyTorch Dataset.

    Args:
        local (str): Local dataset directory where shards are cached by split.
        split (str, optional): Which dataset split to use, if any. Defaults to ``None``.
    """

    def __init__(self, local: str, split: Optional[str] = None):
        split = split or ''

        self.local = local
        self.split = split

        filename = os.path.join(local, split, get_index_basename())  # pyright: ignore
        obj = json.load(open(filename))
        if obj['version'] != 2:
            raise ValueError('Unsupported version')

        self.shards = []
        for info in obj['shards']:
            shard = reader_from_json(local, split, info)
            self.shards.append(shard)
        self.num_samples = sum([shard.samples for shard in self.shards])

        shard_sizes = np.array([x.samples for x in self.shards])
        self.spanner = Spanner(shard_sizes)

    def __len__(self) -> int:
        """Get the length as an IterableDataset.

        Returns:
            int: Dataset length.
        """
        return self.num_samples

    @property
    def size(self) -> int:
        """Get the size of the dataset in samples.

        Returns:
            int: Number of samples.
        """
        return self.num_samples

    def get_item(self, sample_id: int) -> Dict[str, Any]:
        """Get sample by global sample ID.

        Args:
            sample_id (int): Sample ID.

        Returns:
            Dict[str, Any]: Column name with sample data.
        """
        shard_id, index_in_shard = self.spanner[sample_id]
        shard = self.shards[shard_id]
        return shard[index_in_shard]
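
As a quick sanity check of the override (the dataset path below is only a placeholder):

ds = LocalDataset(local='/path/to/dataset')   # placeholder path
assert ds.size == len(ds) == ds.num_samples   # size now reports the sample count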
@tungdq212 tungdq212 added the bug Something isn't working label Jun 14, 2023
@karan6181
Collaborator

Hi @tungdq212, thanks for identifying the issue in LocalDataset. Since you have already provided the fix above, do you mind creating a PR for it? If you don't have the bandwidth, that's totally understandable too!

@tungdq212
Author

@karan6181 I created a PR. But why do we need Array besides the PyTorch Dataset in LocalDataset?

@knighton
Contributor

Thanks for the PR! It looks like you have a linting error. You can fix it with something like this:

apt install pre-commit
pre-commit install
pre-commit run --all-files

The Array base class just provides fancy numpy-style indexing (https://github.com/mosaicml/streaming/blob/main/streaming/base/array.py#L78)
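
For illustration, a rough sketch of the kind of indexing that Array adds on top of get_item (the exact supported forms are defined in array.py; the dataset path is a placeholder):

ds = LocalDataset(local='/path/to/dataset')
sample = ds[0]          # a single sample by index
last = ds[-1]           # negative indexing from the end
window = ds[2:5]        # a slice of samples
picks = ds[[7, 3, 11]]  # fancy indexing with a list of sample IDs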

@tungdq212
Author

@karan6181 Hi, I have more questions:

  • Regarding this comment in the diffusion repo, how do I use StreamingDataset locally?
  • In StreamingDataset, you use dist from PyTorch, which is not used in LocalDataset. Why?
  • I started training with LocalDataset, but it raises CUDA out of memory after a few epochs. From my observation, there is a memory leak in the dataloader after each eval pass. Is that a problem?

@knighton
Contributor

Regarding mosaicml/diffusion#33 (comment) in the diffusion repo, how do I use StreamingDataset locally?

  • LocalDataset takes arguments local and split.
  • StreamingDataset takes arguments remote, local, split, and a few other arguments for various purposes.

To use a local dataset with StreamingDataset, simply don't set the remote path. local works the same way in both.
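
For example, a minimal local-only construction along the lines described (the directory path is a placeholder):

from streaming import StreamingDataset

dataset = StreamingDataset(local='/path/to/dataset', split=None)  # no remote set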

In StreamingDataset, you use dist from PyTorch, which is not used in LocalDataset. Why?

LocalDataset is a map-style PyTorch Dataset, which is used in combination with DistributedSampler, etc.
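
A rough sketch of that map-style pattern (assuming torch.distributed is already initialized; the dataset path is a placeholder):

from torch.utils.data import DataLoader
from torch.utils.data.distributed import DistributedSampler

dataset = LocalDataset(local='/path/to/dataset')
sampler = DistributedSampler(dataset)  # the sampler, not the dataset, splits samples across ranks
loader = DataLoader(dataset, batch_size=32, sampler=sampler, num_workers=8)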

StreamingDataset is a PyTorch IterableDataset, so it has to provide/handle many complicated things itself (that is why we wrote an entire repo for it).

In the particular case of why we import torch.distributed in dataset.py, that is because we have to do some specific setup work in the local rank zeros of each node while the other ranks wait for them via a distributed barrier.
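
Roughly, that pattern looks like this (an illustrative sketch, not the library's actual code; prepare_node_local_state is a hypothetical helper):

import torch.distributed as dist

def node_local_setup(local_rank: int) -> None:
    if local_rank == 0:
        prepare_node_local_state()  # hypothetical per-node setup done by local rank zero
    if dist.is_available() and dist.is_initialized():
        dist.barrier()  # the other ranks wait here until that setup is finished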

I started training with LocalDataset, but it raises CUDA out of memory after a few epochs. From my observation, there is a memory leak in the dataloader after each eval pass. Is that a problem?

That could definitely be a problem, but we are unable to tell without the code in question. Please file a bug with a repro if the problem is on our side.

@tungdq212
Author

tungdq212 commented Jun 19, 2023

@knighton thanks for your quick response.
I implemented a custom dataset:


from typing import Any

import numpy as np
import torch
from streaming import StreamingDataset


class Custom(StreamingDataset):
    def __init__(self, 
                 data_path, 
                 feature_dim=64):
        self.feature_dim = feature_dim
        super().__init__(remote='', local=data_path)

    def get_item(self, sample_id: int, retry: int = 7) -> Any:
        sample = super().get_item(sample_id, retry)
        out = {}
        if 'caption_latents' in sample:
            out['caption_latents'] = torch.from_numpy(
                np.frombuffer(sample['caption_latents'], dtype=np.float16).copy()).reshape(77, 768)

        if 'image_latents' in sample:
            out['image_latents'] = torch.from_numpy(np.frombuffer(sample['image_latents'],
                                                                  dtype=np.float16).copy()).reshape(4, self.feature_dim, self.feature_dim)
        return out

And I build the train and val dataloaders with this config:

  train_dataset:
    _target_: build_dataloader
    data_path: train/train_512
    feature_dim: 64
    num_workers: 8
    pin_memory: true
  eval_dataset:
    _target_: build_dataloader
    data_path: val/val_512
    feature_dim: 64
    num_workers: 8
    pin_memory: true

It raises this error:

InstantiationException: Error in call to target 'diffusion.datasets.custom.build_dataset':
ValueError("Reused local directory: ['/train/train_512'] vs ['val/val_512']. Provide a different one.")

If I add streaming.base.util.clean_stale_shared_memory(), it still raises the error with 2 GPUs, but it is fine with 1 GPU.
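
For reference, that workaround is a single call made before building the dataloaders:

from streaming.base.util import clean_stale_shared_memory

clean_stale_shared_memory()  # clear stale shared memory left over from earlier runs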
By the way, the local dataset implementation below still raises CUDA out of memory after a few eval passes.

import json
import os
from typing import Any, Dict

import numpy as np
import torch
from torch.utils.data import Dataset

from streaming.base.array import Array
from streaming.base.format import reader_from_json
from streaming.base.spanner import Spanner


class Custom(Array, Dataset):
    def __init__(self, 
                 data_path, 
                 feature_dim=64):
        self.feature_dim = feature_dim

        index_file = os.path.join(data_path, 'index.json')
        data = json.load(open(index_file))
        if data['version'] != 2:
            raise ValueError(f'Unsupported streaming data version: {data["version"]}. ' +
                             f'Expected version 2.')
        shards = []
        for info in data['shards']:
            shard = reader_from_json(data_path, None, info)
            shards.append(shard)
        
        self.shards = shards
        samples_per_shard = np.array([shard.samples for shard in shards], np.int64)
        self.length = samples_per_shard.sum()
        self.spanner = Spanner(samples_per_shard)

    def __len__(self):
        return self.length
    
    @property
    def size(self) -> int:
        """Get the size of the dataset in samples.

        Returns:
            int: Number of samples.
        """
        return self.length

    def get_item(self, index: int) -> Dict[str, Any]:
    # def __getitem__(self, index):
        shard_id, shard_sample_id = self.spanner[index]
        shard = self.shards[shard_id]
        sample = shard[shard_sample_id]
        out = {}
        if 'caption_latents' in sample:
            out['caption_latents'] = torch.from_numpy(
                np.frombuffer(sample['caption_latents'], dtype=np.float16).copy()).reshape(77, 768)

        if 'image_latents' in sample:
            out['image_latents'] = torch.from_numpy(np.frombuffer(sample['image_latents'],
                                                                  dtype=np.float16).copy()).reshape(4, self.feature_dim, self.feature_dim)
        return out

I use the code in the diffusion repo.
