
enable multi-threaded conversion
kpertsch committed Aug 18, 2023
1 parent cff9f53 commit cd8d634
Showing 6 changed files with 308 additions and 83 deletions.
35 changes: 19 additions & 16 deletions README.md
@@ -17,14 +17,15 @@ conda activate rlds_env
```

If you want to manually create an environment, the key packages to install are `tensorflow`,
`tensorflow_datasets`, `tensorflow_hub`, `apache_beam`, `matplotlib`, `plotly` and `wandb`.
`tensorflow_datasets`, `tensorflow_hub`, `matplotlib`, `plotly` and `wandb`.


## Run Example RLDS Dataset Creation

Before modifying the code to convert your own dataset, run the provided example dataset creation script to ensure
everything is installed correctly. Run the following lines to create some dummy data and convert it to RLDS.
```
pip3 install -e .
cd example_dataset
python3 create_example_data.py
tfds build
@@ -52,13 +53,14 @@ Please add detailed documentation what each feature consists of (e.g. what are t
Note that we store `language_instruction` in every step, even though it is episode-wide information, to make downstream usage easier (if your dataset
does not define language instructions, you can fill in a dummy string like `pick up something`).

3. **Modify Dataset Splits**: The function `_split_generator()` determines the splits of the generated dataset (e.g. training, validation etc.).
If your dataset defines a train vs validation split, please provide the corresponding information to `_generate_examples()`, e.g.
by pointing to the corresponding folders (like in the example) or file IDs etc. If your dataset does not define splits,
remove the `val` split and only include the `train` split. You can then remove all arguments to `_generate_examples()`.
3. **Modify Dataset Splits**: The function `_split_paths()` determines the splits of the generated dataset (e.g. training, validation etc.).
If your dataset defines a train vs validation split, provide the corresponding file paths, e.g.
by pointing to the respective folders (like in the example). If your dataset does not define splits,
remove the `val` split and only include the `train` split (see the sketch after this list).

4. **Modify Dataset Conversion Code**: Next, modify the function `_generate_examples()`. Here, your own raw data should be
loaded, filled into the episode steps and then yielded as a packaged example. Note that the value of the first return argument,
loaded, filled into the episode steps and then yielded as a packaged example. Your iterator can yield multiple examples
for each input file path. Note that the value of the first return argument,
`episode_path` in the example, is only used as a sample ID in the dataset and can be set to any value that identifies the
particular stored episode, or any other unique value; just make sure the same ID is never used twice (see the sketch after this list).

@@ -70,7 +72,9 @@ few example trajectory images from the dataset for visualization.
Most common is the [CC BY 4.0](https://creativecommons.org/licenses/by/4.0/) license --
you can copy it from [here](https://github.com/teamdigitale/licenses/blob/master/CC-BY-4.0).
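
For concreteness, here is a minimal sketch of how `_split_paths()` and the example-generating parse function could fit together. The file layout, the field names inside each step, and the loading logic are illustrative assumptions, not the repo's exact code:
```
import glob
import numpy as np

def _split_paths(self):
    """Maps each split name to the list of raw files it is built from."""
    return {
        'train': glob.glob('data/train/episode_*.npy'),
        'val': glob.glob('data/val/episode_*.npy'),
    }

def _generate_examples(paths):
    """Yields one packaged (id, episode) example per raw file; yielding several per file is also fine."""
    for episode_path in paths:
        data = np.load(episode_path, allow_pickle=True)  # list of per-step dicts
        steps = []
        for i, step in enumerate(data):
            steps.append({
                'observation': {'image': step['image'], 'state': step['state']},
                'action': step['action'],
                'is_first': i == 0,
                'is_last': i == (len(data) - 1),
                # episode-wide info duplicated into every step; use a dummy string
                # if your dataset has no language annotations
                'language_instruction': 'pick up something',
            })
        # the first element is only used as a unique sample ID
        yield episode_path, {'steps': steps, 'episode_metadata': {'file_path': episode_path}}
```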

That's it! You're all set to run dataset conversion. Inside the dataset directory, run:
That's it! You're all set to run dataset conversion. Before starting the processing, you need to install your
dataset package by replacing `example_dataset` with the name of your dataset in `setup.py` and running `pip install -e .`.
Then, make sure that no GPUs are used during data processing (`export CUDA_VISIBLE_DEVICES=`) and, inside the dataset directory, run:
```
tfds build --overwrite
```
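
Put together, the full conversion workflow might look like the following (the package name `my_dataset` is a placeholder; adapt it to your setup):
```
# one-time: install your dataset package (after editing setup.py)
pip install -e .
# make sure no GPUs are grabbed during conversion
export CUDA_VISIBLE_DEVICES=
cd my_dataset
tfds build --overwrite
```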
@@ -79,16 +83,15 @@ Please verify that this output looks as expected and that you can find the gener


### Parallelizing Data Processing
By default, dataset conversion is single-threaded. If you are parsing a large dataset, you can use parallel processing.
For this, replace the last two lines of `_generate_examples()` with the commented-out `beam` commands. This will use
Apache Beam to parallelize data processing. Before starting the processing, you need to install your dataset package
by filling in the name of your dataset into `setup.py` and running `pip install -e .`
By default, dataset conversion uses 10 parallel workers. If you are parsing a large dataset, you can increase the
number of workers by raising `N_WORKERS` in the dataset class. Try to use slightly fewer workers than the
number of cores in your machine (run `htop` if you don't know how many cores your machine has).

The class attribute `MAX_PATHS_IN_MEMORY` controls how many file paths are processed in parallel before their results get
written to disk sequentially. As a rule of thumb, setting this value as high as possible makes dataset conversion
faster, but setting it too high can overflow your machine's memory. Setting it to at least 10-20x the number of workers
is usually a good default (see the sketch at the end of this section). Monitor `htop` during conversion and reduce the value in case your memory overflows.

Then, make sure that no GPUs are used during data processing (`export CUDA_VISIBLE_DEVICES=`) and run:
```
tfds build --overwrite --beam_pipeline_options="direct_running_mode=multi_processing,direct_num_workers=10"
```
You can specify the desired number of workers with the `direct_num_workers` argument.
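
As an illustration (the builder class name and parse function name below are placeholders, not the repo's exact example code), tuning the parallelism amounts to adjusting two class attributes of your builder:
```
class MyDataset(MultiThreadedDatasetBuilder):
    N_WORKERS = 20                   # e.g. on a 24-core machine, leave a few cores free
    MAX_PATHS_IN_MEMORY = 400        # ~20x N_WORKERS; lower this if `htop` shows memory filling up
    PARSE_FCN = _generate_examples   # module-level parse function, e.g. as sketched above
```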

## Visualize Converted Dataset
To verify that the data is converted correctly, please run the data visualization script from the base directory:
1 change: 0 additions & 1 deletion environment_macos.yml
@@ -69,7 +69,6 @@ dependencies:
- pip:
- absl-py==1.4.0
- aiohttp==3.8.3
- apache-beam==2.48.0
- array-record==0.4.0
- async-timeout==4.0.2
- attrs==22.1.0
1 change: 0 additions & 1 deletion environment_ubuntu.yml
@@ -27,7 +27,6 @@ dependencies:
- pip:
- absl-py==1.4.0
- anyio==3.7.1
- apache-beam==2.49.0
- appdirs==1.4.4
- array-record==0.4.0
- astunparse==1.6.3
226 changes: 226 additions & 0 deletions example_dataset/conversion_utils.py
@@ -0,0 +1,226 @@
from typing import Tuple, Any, Dict, Union, Callable, Iterable
import numpy as np
import tensorflow as tf
import tensorflow_datasets as tfds

import itertools
from multiprocessing import Pool
from functools import partial
from tensorflow_datasets.core import download
from tensorflow_datasets.core import split_builder as split_builder_lib
from tensorflow_datasets.core import naming
from tensorflow_datasets.core import splits as splits_lib
from tensorflow_datasets.core import utils
from tensorflow_datasets.core import writer as writer_lib
from tensorflow_datasets.core import example_serializer
from tensorflow_datasets.core import dataset_builder
from tensorflow_datasets.core import file_adapters

Key = Union[str, int]
# The nested example dict passed to `features.encode_example`
Example = Dict[str, Any]
KeyExample = Tuple[Key, Example]


class MultiThreadedDatasetBuilder(tfds.core.GeneratorBasedBuilder):
"""DatasetBuilder for example dataset."""
N_WORKERS = 10 # number of parallel workers for data conversion
MAX_PATHS_IN_MEMORY = 100 # number of paths converted & stored in memory before writing to disk
# -> the higher the faster / more parallel conversion, adjust based on avilable RAM
# note that one path may yield multiple episodes and adjust accordingly
PARSE_FCN = None # needs to be filled with path-to-record-episode parse function
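    # PARSE_FCN must be a module-level (hence picklable) function with roughly the signature
    #   parse_fcn(paths) -> iterator of (key, example_dict)
    # so that it can be shipped to the multiprocessing workers used below.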

def _split_generators(self, dl_manager: tfds.download.DownloadManager):
"""Define data splits."""
split_paths = self._split_paths()
return {split: type(self).PARSE_FCN(paths=split_paths[split]) for split in split_paths}

def _generate_examples(self):
        pass  # implemented as a module-level function (PARSE_FCN) to enable multiprocessing

def _download_and_prepare( # pytype: disable=signature-mismatch # overriding-parameter-type-checks
self,
dl_manager: download.DownloadManager,
download_config: download.DownloadConfig,
) -> None:
"""Generate all splits and returns the computed split infos."""
assert self.PARSE_FCN is not None # need to overwrite parse function
split_builder = ParallelSplitBuilder(
split_dict=self.info.splits,
features=self.info.features,
dataset_size=self.info.dataset_size,
max_examples_per_split=download_config.max_examples_per_split,
beam_options=download_config.beam_options,
beam_runner=download_config.beam_runner,
file_format=self.info.file_format,
shard_config=download_config.get_shard_config(),
split_paths=self._split_paths(),
parse_function=type(self).PARSE_FCN,
n_workers=self.N_WORKERS,
max_paths_in_memory=self.MAX_PATHS_IN_MEMORY,
)
split_generators = self._split_generators(dl_manager)
split_generators = split_builder.normalize_legacy_split_generators(
split_generators=split_generators,
generator_fn=self._generate_examples,
is_beam=False,
)
dataset_builder._check_split_names(split_generators.keys())

# Start generating data for all splits
path_suffix = file_adapters.ADAPTER_FOR_FORMAT[
self.info.file_format
].FILE_SUFFIX

split_info_futures = []
for split_name, generator in utils.tqdm(
split_generators.items(),
desc="Generating splits...",
unit=" splits",
leave=False,
):
filename_template = naming.ShardedFileTemplate(
split=split_name,
dataset_name=self.name,
data_dir=self.data_path,
filetype_suffix=path_suffix,
)
future = split_builder.submit_split_generation(
split_name=split_name,
generator=generator,
filename_template=filename_template,
disable_shuffling=self.info.disable_shuffling,
)
split_info_futures.append(future)

# Finalize the splits (after apache beam completed, if it was used)
split_infos = [future.result() for future in split_info_futures]

# Update the info object with the splits.
split_dict = splits_lib.SplitDict(split_infos)
self.info.set_splits(split_dict)


class _SplitInfoFuture:
"""Future containing the `tfds.core.SplitInfo` result."""

def __init__(self, callback: Callable[[], splits_lib.SplitInfo]):
self._callback = callback

def result(self) -> splits_lib.SplitInfo:
return self._callback()


def parse_examples_from_generator(paths, fcn, split_name, total_num_examples, features, serializer):
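    """Runs the parse function `fcn` on one chunk of paths and returns serialized (key, example) tuples."""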
generator = fcn(paths)
outputs = []
for sample in utils.tqdm(
generator,
desc=f'Generating {split_name} examples...',
unit=' examples',
total=total_num_examples,
leave=False,
mininterval=1.0,
):
if sample is None: continue
key, example = sample
try:
example = features.encode_example(example)
except Exception as e: # pylint: disable=broad-except
utils.reraise(e, prefix=f'Failed to encode example:\n{example}\n')
outputs.append((key, serializer.serialize_example(example)))
return outputs


class ParallelSplitBuilder(split_builder_lib.SplitBuilder):
def __init__(self, *args, split_paths, parse_function, n_workers, max_paths_in_memory, **kwargs):
super().__init__(*args, **kwargs)
self._split_paths = split_paths
self._parse_function = parse_function
self._n_workers = n_workers
self._max_paths_in_memory = max_paths_in_memory

def _build_from_generator(
self,
split_name: str,
generator: Iterable[KeyExample],
filename_template: naming.ShardedFileTemplate,
disable_shuffling: bool,
) -> _SplitInfoFuture:
"""Split generator for example generators.
Args:
split_name: str,
generator: Iterable[KeyExample],
filename_template: Template to format the filename for a shard.
disable_shuffling: Specifies whether to shuffle the examples,
Returns:
future: The future containing the `tfds.core.SplitInfo`.
"""
total_num_examples = None
serialized_info = self._features.get_serialized_info()
writer = writer_lib.Writer(
serializer=example_serializer.ExampleSerializer(serialized_info),
filename_template=filename_template,
hash_salt=split_name,
disable_shuffling=disable_shuffling,
file_format=self._file_format,
shard_config=self._shard_config,
)

del generator # use parallel generators instead
paths = self._split_paths[split_name]
        # chunk paths into groups of at most `max_paths_in_memory`, each split across `n_workers` sub-lists
        path_lists = chunk_max(paths, self._n_workers, self._max_paths_in_memory)
print(f"Generating with {self._n_workers} workers!")
pool = Pool(processes=self._n_workers)
for i, paths in enumerate(path_lists):
print(f"Processing chunk {i + 1} of {len(path_lists)}.")
results = pool.map(
partial(
parse_examples_from_generator,
fcn=self._parse_function,
split_name=split_name,
total_num_examples=total_num_examples,
serializer=writer._serializer,
features=self._features
),
paths
)
# write results to shuffler --> this will automatically offload to disk if necessary
print("Writing conversion results...")
for result in itertools.chain(*results):
key, serialized_example = result
writer._shuffler.add(key, serialized_example)
writer._num_examples += 1
pool.close()

print("Finishing split conversion...")
shard_lengths, total_size = writer.finalize()

split_info = splits_lib.SplitInfo(
name=split_name,
shard_lengths=shard_lengths,
num_bytes=total_size,
filename_template=filename_template,
)
return _SplitInfoFuture(lambda: split_info)


def dictlist2listdict(DL):
    """Converts a dict of lists to a list of dicts."""
    return [dict(zip(DL, t)) for t in zip(*DL.values())]

def chunks(l, n):
"""Yield n number of sequential chunks from l."""
d, r = divmod(len(l), n)
for i in range(n):
si = (d + 1) * (i if i < r else r) + d * (0 if i < r else i - r)
yield l[si:si + (d + 1 if i < r else d)]

def chunk_max(l, n, max_chunk_sum):
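    """Splits l into groups of at most max_chunk_sum elements; each group is further divided into n chunks."""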
out = []
for _ in range(int(np.ceil(len(l) / max_chunk_sum))):
out.append(list(chunks(l[:max_chunk_sum], n)))
l = l[max_chunk_sum:]
return out