diff --git a/demos/process_on_ray/configs/dedup.yaml b/demos/process_on_ray/configs/dedup.yaml
new file mode 100644
index 000000000..642203249
--- /dev/null
+++ b/demos/process_on_ray/configs/dedup.yaml
@@ -0,0 +1,15 @@
+# Process config example for dataset
+
+# global parameters
+project_name: 'demo-dedup'
+dataset_path: './demos/process_on_ray/data/'
+export_path: './outputs/demo-dedup/demo-ray-bts-dedup-processed'
+
+executor_type: 'ray'
+ray_address: 'auto'
+
+# process schedule
+# a list of several process operators with their arguments
+process:
+  - ray_bts_minhash_deduplicator:
+      tokenization: 'character'
\ No newline at end of file
diff --git a/demos/process_on_ray/configs/demo.yaml b/demos/process_on_ray/configs/demo.yaml
index 1e3e4a55a..5154da014 100644
--- a/demos/process_on_ray/configs/demo.yaml
+++ b/demos/process_on_ray/configs/demo.yaml
@@ -2,11 +2,12 @@
 
 # global parameters
 project_name: 'ray-demo'
-executor_type: 'ray'
 dataset_path: './demos/process_on_ray/data/demo-dataset.jsonl' # path to your dataset directory or file
-ray_address: 'auto' # change to your ray cluster address, e.g., ray://<ip>:<port>
 export_path: './outputs/demo/demo-processed'
 
+executor_type: 'ray'
+ray_address: 'auto' # change to your ray cluster address, e.g., ray://<ip>:<port>
+
 # process schedule
 # a list of several process operators with their arguments
 process:
diff --git a/docs/Distributed.md b/docs/Distributed.md
new file mode 100644
index 000000000..2c440f99b
--- /dev/null
+++ b/docs/Distributed.md
@@ -0,0 +1,144 @@
+# Distributed Data Processing in Data-Juicer
+
+## Overview
+
+Data-Juicer supports large-scale distributed data processing based on [Ray](https://github.com/ray-project/ray) and Alibaba's [PAI](https://www.aliyun.com/product/bigdata/learn).
+
+With a dedicated design, almost all operators of Data-Juicer implemented in standalone mode can be seamlessly executed in Ray distributed mode. We continuously conduct engine-specific optimizations for large-scale scenarios, such as data subset splitting strategies that balance the number of files and workers, and streaming I/O patches for JSON files to Ray and Apache Arrow.
+
+For reference, in our experiments with 25 to 100 Alibaba Cloud nodes, Data-Juicer in Ray mode processes a dataset containing 70 billion samples on 6,400 CPU cores in 2 hours, and a dataset containing 7 billion samples on 3,200 CPU cores in 0.45 hours. Additionally, a MinHash-LSH-based deduplication operator in Ray mode can deduplicate terabyte-sized datasets on 8 nodes with 1,280 CPU cores in 3 hours.
+
+More details can be found in our paper, [Data-Juicer 2.0: Cloud-Scale Adaptive Data Processing for Foundation Models](arXiv_link_coming_soon).
+
+![Arch-Overview](https://img.alicdn.com/imgextra/i4/O1CN01uawwRu1JMSdafy5lF_!!6000000001014-2-tps-4034-4146.png)
+
+## Implementation and Optimizations
+
+### Ray Mode in Data-Juicer
+
+- For most implementations of Data-Juicer [operators](Operators.md), the core processing functions are engine-agnostic. Interoperability is primarily managed in [RayDataset](../data_juicer/core/ray_data.py) and [RayExecutor](../data_juicer/core/ray_executor.py), which are subclasses of the base `DJDataset` and `BaseExecutor`, respectively, and support both Ray [Tasks](https://docs.ray.io/en/latest/ray-core/tasks.html) and [Actors](https://docs.ray.io/en/latest/ray-core/actors.html), as the sketch below illustrates.
+- The exception is the deduplication operators, which are challenging to scale in standalone mode. We provide these operators with the prefix [`ray_xx_deduplication`](../data_juicer/ops/deduplicator/).
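+
+As a minimal sketch of how these pieces fit together (the exact entry points are assumptions based on the class and module names above; in practice, `tools/process_data.py` wires all of this up from the YAML config):
+
+```python
+# A minimal sketch: drive the Ray executor from Python.
+# `init_configs` and `RayExecutor` are assumed to behave as their names
+# suggest; tools/process_data.py performs the equivalent steps for you.
+from data_juicer.config import init_configs
+from data_juicer.core.ray_executor import RayExecutor
+
+cfg = init_configs(['--config', 'demos/process_on_ray/configs/demo.yaml'])
+executor = RayExecutor(cfg)  # BaseExecutor subclass that dispatches work to Ray
+executor.run()               # wraps the data in a RayDataset and applies each OP
+```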
+
+### Subset Splitting
+
+When dealing with tens of thousands of nodes but only a few dataset files, Ray splits the dataset files according to the available resources and distributes the blocks across all nodes, incurring huge network communication costs and reducing CPU utilization. For more details, see [Ray's autodetect_parallelism](https://github.com/ray-project/ray/blob/2dbd08a46f7f08ea614d8dd20fd0bca5682a3078/python/ray/data/_internal/util.py#L201-L205) and [tuning output blocks for Ray](https://docs.ray.io/en/latest/data/performance-tips.html#tuning-output-blocks-for-read).
+
+To optimize performance, we automatically split the original dataset into smaller files in advance, taking the characteristics of Arrow and Ray into account. By default, the size of each split file is set to 128MB when this results in more sub-files than five times the total number of CPU cores in the cluster.
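+
+As a rough illustration of that rule (the helper names below are hypothetical; only the 128MB target size and the 5x-CPU-cores threshold come from this section):
+
+```python
+# Illustrative sketch of the subset-splitting rule described above.
+TARGET_FILE_SIZE = 128 * 1024 * 1024  # 128MB per sub-file
+
+def subfile_count_at_128mb(total_size_bytes: int) -> int:
+    """Number of sub-files produced when splitting into 128MB chunks."""
+    return -(-total_size_bytes // TARGET_FILE_SIZE)  # ceiling division
+
+def use_128mb_split(total_size_bytes: int, cluster_cpu_cores: int) -> bool:
+    """128MB sub-files are used when they outnumber 5x the CPU cores."""
+    return subfile_count_at_128mb(total_size_bytes) > 5 * cluster_cpu_cores
+
+# e.g., a 1TB dataset on a 640-core cluster: 8192 sub-files > 3200 -> True
+print(use_128mb_split(1 << 40, 640))
+```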
+
+### Streaming Reading of JSON Files
+
+To address the lack of native support for streaming JSON data in the Arrow framework underlying Ray Datasets, we have developed a streaming loading interface and contributed an in-house [patch](https://github.com/modelscope/data-juicer/pull/515) for Apache Arrow ([PR to the upstream repo](https://github.com/apache/arrow/pull/45084)). This patch helps alleviate out-of-memory issues.
+
+### Deduplication
+
+An optimized MinHash-LSH-based deduplicator is provided in Ray mode. We implement a multiprocess union-find set in Ray Actors and a load-balanced distributed algorithm, [BTS](https://ieeexplore.ieee.org/document/10598116), to complete equivalence class merging. This operator can deduplicate terabyte-sized datasets on 1,280 CPU cores in 3 hours. Our ablation study shows 2x to 3x speedups with our dedicated optimizations for Ray mode compared to the vanilla version of this deduplication operator.
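+
+For intuition, the equivalence-class merging can be pictured with a plain union-find structure. The following is a single-process sketch for illustration only; the production operator shards this state across Ray Actors and merges classes with BTS:
+
+```python
+# Single-process union-find sketch of equivalence-class merging.
+class UnionFind:
+    def __init__(self, num_samples: int):
+        self.parent = list(range(num_samples))
+
+    def find(self, x: int) -> int:
+        while self.parent[x] != x:
+            self.parent[x] = self.parent[self.parent[x]]  # path compression
+            x = self.parent[x]
+        return x
+
+    def union(self, a: int, b: int) -> None:
+        ra, rb = self.find(a), self.find(b)
+        if ra != rb:
+            self.parent[max(ra, rb)] = min(ra, rb)  # smallest id becomes root
+
+# Samples whose MinHash signatures collide in an LSH band form one class;
+# afterwards, one representative per class is kept.
+uf = UnionFind(5)
+uf.union(0, 3)
+uf.union(3, 4)
+print([uf.find(i) for i in range(5)])  # -> [0, 1, 2, 0, 0]
+```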
+
+## Performance Results
+
+### Data Processing with Varied Scales
+
+We conducted experiments on datasets with billions of samples. We prepared a 560k-sample multimodal dataset and expanded it by different factors (1x to 125,000x) to create datasets of varying sizes. The experimental results, shown in the figure below, demonstrate good scalability.
+
+![Overview](https://img.alicdn.com/imgextra/i3/O1CN01JV8wcC1oxn0G2xnBT_!!6000000005292-0-tps-1328-1742.jpg)
+
+### Distributed Deduplication on Large-Scale Datasets
+
+We tested the MinHash-based RayDeduplicator on datasets sized at 200GB, 1TB, and 5TB, using 640 to 1,280 CPU cores. As the table below shows, when the data size increases by 5x, the processing time increases by 4.02x to 5.62x. When the number of CPU cores doubles, the processing time decreases to 58.9% to 67.1% of the original time.
+
+| # CPU cores | 200GB Time | 1TB Time  | 5TB Time   |
+|-------------|------------|-----------|------------|
+| 4 * 160     | 11.13 min  | 50.83 min | 285.43 min |
+| 8 * 160     | 7.47 min   | 30.08 min | 168.10 min |
+
+## Quick Start
+
+Before starting, you should install Data-Juicer and its `dist` requirements:
+
+```shell
+pip install -v -e .          # Install the minimal requirements of Data-Juicer
+pip install -v -e ".[dist]"  # Include dependencies on Ray and other distributed libraries
+```
+
+Then start a Ray cluster (refer to the [Ray docs](https://docs.ray.io/en/latest/ray-core/starting-ray.html) for more details):
+
+```shell
+# Start a cluster as the head node
+ray start --head
+
+# (Optional) Connect to the cluster from other nodes/machines, replacing
+# {head_ip} with the address printed by `ray start --head`
+ray start --address='{head_ip}:6379'
+```
+
+We provide simple demos in the directory `demos/process_on_ray/`, which includes two config files and two test datasets.
+
+```text
+demos/process_on_ray
+├── configs
+│   ├── demo.yaml
+│   └── dedup.yaml
+└── data
+    ├── demo-dataset.json
+    └── demo-dataset.jsonl
+```
+
+> [!Important]
+> If you run these demos on multiple nodes, you need to put the demo dataset on a shared disk (e.g., NAS) and export the result dataset to it as well, by modifying the `dataset_path` and `export_path` in the config files.
+
+### Running Example of Ray Mode
+
+In the `demo.yaml` config file, we set the executor type to "ray" and specify an automatic Ray address.
+
+```yaml
+...
+dataset_path: './demos/process_on_ray/data/demo-dataset.jsonl'
+export_path: './outputs/demo/demo-processed'
+
+executor_type: 'ray'  # Set the executor type to "ray"
+ray_address: 'auto'   # Set an automatic Ray address
+...
+```
+
+Run the demo to process the dataset with 12 regular OPs:
+
+```shell
+# Run the tool from source
+python tools/process_data.py --config demos/process_on_ray/configs/demo.yaml
+
+# Use the command-line tool
+dj-process --config demos/process_on_ray/configs/demo.yaml
+```
+
+Data-Juicer will process the demo dataset with the demo config file and export the result datasets to the directory specified by the `export_path` argument in the config file.
+
+### Running Example of Distributed Deduplication
+
+In the `dedup.yaml` config file, we set the executor type to "ray", specify an automatic Ray address, and use a dedicated distributed version of the MinHash deduplicator to deduplicate the dataset.
+
+```yaml
+project_name: 'demo-dedup'
+dataset_path: './demos/process_on_ray/data/'
+export_path: './outputs/demo-dedup/demo-ray-bts-dedup-processed'
+
+executor_type: 'ray'  # Set the executor type to "ray"
+ray_address: 'auto'   # Set an automatic Ray address
+
+# process schedule
+# a list of several process operators with their arguments
+process:
+  - ray_bts_minhash_deduplicator:  # a distributed version of the MinHash deduplicator
+      tokenization: 'character'
+```
+
+Run the demo to deduplicate the dataset:
+
+```shell
+# Run the tool from source
+python tools/process_data.py --config demos/process_on_ray/configs/dedup.yaml
+
+# Use the command-line tool
+dj-process --config demos/process_on_ray/configs/dedup.yaml
+```
+
+Data-Juicer will deduplicate the demo dataset with the demo config file and export the result datasets to the directory specified by the `export_path` argument in the config file.
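+
+For a quick sanity check of the export, something like the following can be used (a sketch only: it assumes the results are written as JSON Lines files under the export directory, which may vary across Data-Juicer versions and export settings):
+
+```python
+# Count exported samples, assuming JSONL output files; adjust export_dir
+# and the glob pattern to your actual export layout.
+import glob
+import os
+
+export_dir = './outputs/demo-dedup/demo-ray-bts-dedup-processed'
+total = 0
+for path in glob.glob(os.path.join(export_dir, '**', '*.json*'), recursive=True):
+    with open(path, encoding='utf-8') as f:
+        total += sum(1 for line in f if line.strip())  # one JSON object per line
+print(f'{total} samples remain after deduplication')
+```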
diff --git a/docs/Distributed_ZH.md b/docs/Distributed_ZH.md
new file mode 100644
index 000000000..7d9c8197d
--- /dev/null
+++ b/docs/Distributed_ZH.md
@@ -0,0 +1,143 @@
+# Distributed Data Processing in Data-Juicer
+
+## Overview
+
+Data-Juicer supports large-scale distributed data processing based on [Ray](https://github.com/ray-project/ray) and Alibaba's [PAI](https://www.aliyun.com/product/bigdata/learn).
+
+With a dedicated design, almost all Data-Juicer operators implemented in standalone mode can run seamlessly in Ray's distributed mode. For large-scale scenarios, we keep making engine-specific optimizations, such as a data subset splitting strategy that balances the number of files and workers, and streaming I/O patches for JSON files to Ray and Apache Arrow.
+
+For reference, in experiments on 25 to 100 Alibaba Cloud nodes, Data-Juicer in Ray mode takes only 2 hours to process a dataset of 70 billion samples on 6,400 CPU cores, and only 0.45 hours to process a dataset of 7 billion samples on 3,200 CPU cores. In addition, in Ray mode, a MinHash-LSH-based deduplication operator can deduplicate a terabyte-sized dataset on an 8-node cluster with 1,280 CPU cores in 3 hours.
+
+For more details, please refer to our paper: [Data-Juicer 2.0: Cloud-Scale Adaptive Data Processing for Foundation Models](arXiv_link_coming_soon).
+
+![Arch-Overview](https://img.alicdn.com/imgextra/i4/O1CN01uawwRu1JMSdafy5lF_!!6000000001014-2-tps-4034-4146.png)
+
+## Implementation and Optimizations
+
+### Ray Mode in Data-Juicer
+
+- For most implementations of Data-Juicer [operators](Operators.md), the core processing functions are engine-agnostic. Interoperability is ensured by [RayDataset](../data_juicer/core/ray_data.py) and [RayExecutor](../data_juicer/core/ray_executor.py), which are subclasses of the base `DJDataset` and `BaseExecutor`, respectively, and both support Ray [Tasks](https://docs.ray.io/en/latest/ray-core/tasks.html) and [Actors](https://docs.ray.io/en/latest/ray-core/actors.html).
+- The deduplication operators are the exception, since they are hard to scale in standalone mode. We therefore provide Ray versions of these operators, all starting with a distinctive prefix: [`ray_xx_deduplication`](../data_juicer/ops/deduplicator/).
+
+### Subset Splitting
+
+When a dataset of only a few files is processed across tens of thousands of nodes, Ray splits the dataset files according to the available resources and distributes the blocks to all nodes, which brings huge network communication costs and reduces CPU utilization. For more details, see [Ray's autodetect_parallelism](https://github.com/ray-project/ray/blob/2dbd08a46f7f08ea614d8dd20fd0bca5682a3078/python/ray/data/_internal/util.py#L201-L205) and [tuning output blocks for Ray](https://docs.ray.io/en/latest/data/performance-tips.html#tuning-output-blocks-for-read).
+
+To optimize performance, we automatically split the original dataset into smaller files in advance, taking the characteristics of Arrow and Ray into account. By default, the size of each sub-file is set to 128MB when this results in more sub-files than five times the total number of CPU cores in the cluster.
+
+### Streaming Reading of JSON Files
+
+Arrow, the framework underlying Ray Datasets, lacks native support for streaming JSON data, so we developed a streaming loading interface and contributed an in-house [patch](https://github.com/modelscope/data-juicer/pull/515) for Apache Arrow ([related PR](https://github.com/apache/arrow/pull/45084)). This patch helps alleviate out-of-memory issues.
+
+### Deduplication
+
+An optimized MinHash-LSH-based deduplication operator is provided in Ray mode. We implement a multiprocess union-find set with Ray Actors and a load-balanced distributed algorithm, [BTS](https://ieeexplore.ieee.org/document/10598116), to complete equivalence class merging. This operator needs only 3 hours to deduplicate a terabyte-sized dataset on 1,280 CPU cores. Our ablation study also shows that these dedicated optimizations bring a 2x-3x speedup compared to the initial version of this deduplication operator.
+
+## Performance Results
+
+### Data Processing with Varied Scales
+
+We conducted experiments on datasets at the billion-sample scale. We first prepared a multimodal dataset of 560k samples and expanded it by different factors (1x to 125,000x) to create datasets of different sizes. The experimental results in the figure below show the high scalability of Data-Juicer.
+
+![Overview](https://img.alicdn.com/imgextra/i3/O1CN01JV8wcC1oxn0G2xnBT_!!6000000005292-0-tps-1328-1742.jpg)
+
+### Distributed Deduplication on Large-Scale Datasets
+
+We tested our MinHash-based Ray deduplication operator on 200GB, 1TB, and 5TB datasets, with CPU core counts ranging from 640 to 1,280. As the table below shows, when the dataset size grows by 5x, the processing time grows by 4.02x to 5.62x. When the number of CPU cores doubles, the processing time drops to 58.9%-67.1% of the original.
+
+| # CPU cores | 200GB Time | 1TB Time  | 5TB Time   |
+|-------------|------------|-----------|------------|
+| 4 * 160     | 11.13 min  | 50.83 min | 285.43 min |
+| 8 * 160     | 7.47 min   | 30.08 min | 168.10 min |
+
+## Quick Start
+
+Before getting started, you should install Data-Juicer and its `dist` requirements:
+
+```shell
+pip install -v -e .          # Install the minimal requirements of Data-Juicer
+pip install -v -e ".[dist]"  # Include Ray and other distributed-related dependencies
+```
+
+Then start a Ray cluster (refer to the [Ray documentation](https://docs.ray.io/en/latest/ray-core/starting-ray.html)):
+
+```shell
+# Start a cluster as the head node
+ray start --head
+
+# (Optional) Connect to the cluster on other nodes/machines
+ray start --address='{head_ip}:6379'
+```
+
+We provide simple demos in the directory `demos/process_on_ray/`, including 2 config files and 2 test datasets.
+
+```text
+demos/process_on_ray
+├── configs
+│   ├── demo.yaml
+│   └── dedup.yaml
+└── data
+    ├── demo-dataset.json
+    └── demo-dataset.jsonl
+```
+
+> [!Important]
+> If you run these demos on multiple nodes, you need to place the demo dataset on a shared disk (e.g., NAS) and export the result dataset there as well, by modifying the `dataset_path` and `export_path` parameters in the config files.
+
+### Running Example of Ray Mode
+
+In the config file `demo.yaml`, we set the executor type to "ray" and specify an automatic Ray address.
+
+```yaml
+...
+dataset_path: './demos/process_on_ray/data/demo-dataset.jsonl'
+export_path: './outputs/demo/demo-processed'
+
+executor_type: 'ray'  # Set the executor type to "ray"
+ray_address: 'auto'   # Set an automatic Ray address
+...
+```
+
+Run this example to process the test dataset with 12 regular OPs:
+
+```shell
+# Run the processing tool from source
+python tools/process_data.py --config demos/process_on_ray/configs/demo.yaml
+
+# Use the command-line tool
+dj-process --config demos/process_on_ray/configs/demo.yaml
+```
+
+Data-Juicer will process the demo dataset with the demo config file and export the result dataset to the directory specified by the `export_path` argument in the config file.
+
+### Running Example of Distributed Deduplication
+
+In the config file `dedup.yaml`, we set the executor type to "ray" and specify an automatic Ray address. We use a dedicated distributed version of the MinHash deduplicator to deduplicate the dataset.
+
+```yaml
+project_name: 'demo-dedup'
+dataset_path: './demos/process_on_ray/data/'
+export_path: './outputs/demo-dedup/demo-ray-bts-dedup-processed'
+
+executor_type: 'ray'  # Set the executor type to "ray"
+ray_address: 'auto'   # Set an automatic Ray address
+
+# process schedule
+# a list of several process operators with their arguments
+process:
+  - ray_bts_minhash_deduplicator:  # a distributed version of the MinHash deduplicator
+      tokenization: 'character'
+```
+
+Run this example to deduplicate the dataset:
+
+```shell
+# Run the processing tool from source
+python tools/process_data.py --config demos/process_on_ray/configs/dedup.yaml
+
+# Use the command-line tool
+dj-process --config demos/process_on_ray/configs/dedup.yaml
+```
+
+Data-Juicer will deduplicate the demo dataset with the demo config file and export the result dataset to the directory specified by the `export_path` argument in the config file.