merge main

BeachWang committed Dec 30, 2024
2 parents 6fdc95b + 9466c73 commit 8e01f7e

Showing 49 changed files with 1,677 additions and 54 deletions.
4 changes: 2 additions & 2 deletions README.md
@@ -34,7 +34,7 @@ We provide a [playground](http://8.138.149.181/) with a managed JupyterLab. [Try
[Platform for AI of Alibaba Cloud (PAI)](https://www.aliyun.com/product/bigdata/learn) has cited our work and integrated Data-Juicer into its data processing products. PAI is an AI Native large model and AIGC engineering platform that provides dataset management, computing power management, model tool chain, model development, model training, model deployment, and AI asset management. For documentation on data processing, please refer to: [PAI-Data Processing for Large Models](https://help.aliyun.com/zh/pai/user-guide/components-related-to-data-processing-for-foundation-models/?spm=a2c4g.11186623.0.0.3e9821a69kWdvX).

Data-Juicer is being actively updated and maintained. We will periodically enhance and add more features, data recipes and datasets.
We welcome you to join us (via issues, PRs, [Slack](https://join.slack.com/t/data-juicer/shared_invite/zt-23zxltg9d-Z4d3EJuhZbCLGwtnLWWUDg?spm=a2c22.12281976.0.0.7a8253f30mgpjw) channel, [DingDing](https://qr.dingtalk.com/action/joingroup?spm=a2c22.12281976.0.0.7a8253f30mgpjw&code=v1,k1,C0DI7CwRFrg7gJP5aMC95FUmsNuwuKJboT62BqP5DAk=&_dt_no_comment=1&origin=11) group, ...), in promoting data-model co-development along with research and applications of (multimodal) LLMs!
We welcome you to join us (via issues, PRs, [Slack](https://join.slack.com/t/data-juicer/shared_invite/zt-23zxltg9d-Z4d3EJuhZbCLGwtnLWWUDg?spm=a2c22.12281976.0.0.7a8253f30mgpjw) channel, [DingDing](https://qr.dingtalk.com/action/joingroup?code=v1,k1,YFIXM2leDEk7gJP5aMC95AfYT+Oo/EP/ihnaIEhMyJM=&_dt_no_comment=1&origin=11) group, ...), in promoting data-model co-development along with research and applications of (multimodal) LLMs!

----

@@ -55,7 +55,7 @@ In this new version, we support more features for **multimodal data (including v
- [2024-02-05] Our paper has been accepted by SIGMOD'24 industrial track!
- [2024-01-10] Discover new horizons in "Data Mixture"—Our second data-centric LLM competition has kicked off! Please visit the competition's [official website](https://tianchi.aliyun.com/competition/entrance/532174) for more information.
- [2024-01-05] We release **Data-Juicer v0.1.3** now!
In this new version, we support **more Python versions** (3.8-3.10), and support **multimodal** dataset [converting](tools/multimodal/README.md)/[processing](docs/Operators.md) (including text, images, and audio; more modalities will be supported in the future).
In this new version, we support **more Python versions** (3.8-3.10), and support **multimodal** dataset [converting](tools/fmt_conversion/multimodal/README.md)/[processing](docs/Operators.md) (including text, images, and audio; more modalities will be supported in the future).
Besides, our paper is also updated to [v3](https://arxiv.org/abs/2309.02033).
- [2023-10-13] Our first data-centric LLM competition begins! Please
visit the competition's official websites, FT-Data Ranker ([1B Track](https://tianchi.aliyun.com/competition/entrance/532157), [7B Track](https://tianchi.aliyun.com/competition/entrance/532158)), for more information.
4 changes: 2 additions & 2 deletions README_ZH.md
@@ -27,7 +27,7 @@ Data-Juicer is a one-stop **multimodal** data processing system, designed to provide large lang

[Platform for AI of Alibaba Cloud (PAI)](https://www.aliyun.com/product/bigdata/learn) has cited our work and integrated Data-Juicer's capabilities into PAI's data processing products. PAI provides functional modules covering dataset management, computing power management, model toolchain, model development, model training, model deployment, and AI asset management, offering users high-performance, highly stable, enterprise-grade engineering capabilities for large models. For documentation on data processing, please refer to: [PAI-Data Processing for Large Models](https://help.aliyun.com/zh/pai/user-guide/components-related-to-data-processing-for-foundation-models/?spm=a2c4g.11186623.0.0.3e9821a69kWdvX)

Data-Juicer is being actively updated and maintained; we will periodically strengthen and add more features and data recipes. We warmly welcome you to join us (issues/PRs/[Slack channel](https://join.slack.com/t/data-juicer/shared_invite/zt-23zxltg9d-Z4d3EJuhZbCLGwtnLWWUDg?spm=a2c22.12281976.0.0.7a8275bc8g7ypp)/[DingDing group](https://qr.dingtalk.com/action/joingroup?spm=a2c22.12281976.0.0.7a8275bc8g7ypp&code=v1,k1,C0DI7CwRFrg7gJP5aMC95FUmsNuwuKJboT62BqP5DAk=&_dt_no_comment=1&origin=11)/...) to jointly advance the co-development and research of LLMs and data!
Data-Juicer is being actively updated and maintained; we will periodically strengthen and add more features and data recipes. We warmly welcome you to join us (issues/PRs/[Slack channel](https://join.slack.com/t/data-juicer/shared_invite/zt-23zxltg9d-Z4d3EJuhZbCLGwtnLWWUDg?spm=a2c22.12281976.0.0.7a8275bc8g7ypp)/[DingDing group](https://qr.dingtalk.com/action/joingroup?code=v1,k1,YFIXM2leDEk7gJP5aMC95AfYT+Oo/EP/ihnaIEhMyJM=&_dt_no_comment=1&origin=11)/...) to jointly advance the co-development and research of LLMs and data!


----
@@ -47,7 +47,7 @@ Data-Juicer is being actively updated and maintained; we will periodically strengthen and add more
- [2024-02-05] Our paper was accepted by the SIGMOD'24 industrial track!
- [2024-01-10] Discover new horizons in "Data Mixture": the second Data-Juicer data challenge for large models has officially launched! Visit the [competition's official website](https://tianchi.aliyun.com/competition/entrance/532174) for details.
- [2024-01-05] **Data-Juicer v0.1.3** has been released.
In this new version, we support **more Python versions** (3.8-3.10), and support **multimodal** dataset [conversion](tools/multimodal/README_ZH.md)/[processing](docs/Operators_ZH.md) (including text, images, and audio; more modalities will be supported in the future)!
In this new version, we support **more Python versions** (3.8-3.10), and support **multimodal** dataset [conversion](tools/fmt_conversion/multimodal/README_ZH.md)/[processing](docs/Operators_ZH.md) (including text, images, and audio; more modalities will be supported in the future)!
In addition, our paper has also been updated to [v3](https://arxiv.org/abs/2309.02033).
- [2023-10-13] Our first data-centric LLM competition has begun!
Please visit the competition's official websites, FT-Data Ranker ([1B Track](https://tianchi.aliyun.com/competition/entrance/532157), [7B Track](https://tianchi.aliyun.com/competition/entrance/532158)), for more information.
2 changes: 1 addition & 1 deletion configs/config_all.yaml
@@ -824,7 +824,7 @@ process:
- key_value_grouper: # Group samples to batched samples according values in given keys.
group_by_keys: null # Group samples according values in the keys. Support for nested keys such as "__dj__stats__.text_len". It is [self.text_key] in default.

# Aggregator ops.
# aggregator ops.
- entity_attribute_aggregator: # Return conclusion of the given entity's attribute from some docs.
api_model: 'gpt-4o' # API model name.
entity: '孙悟空' # The given entity.
2 changes: 1 addition & 1 deletion data_juicer/__init__.py
@@ -1,4 +1,4 @@
__version__ = '1.0.1'
__version__ = '1.0.2'

import os
import subprocess
5 changes: 5 additions & 0 deletions data_juicer/config/config.py
@@ -464,6 +464,11 @@ def init_setup_from_cfg(cfg: Namespace):

# check number of processes np
sys_cpu_count = os.cpu_count()
if not cfg.np:
cfg.np = sys_cpu_count
logger.warning(
f'Number of processes `np` is not set, '
f'set it to cpu count [{sys_cpu_count}] as default value.')
if cfg.np > sys_cpu_count:
logger.warning(f'Number of processes `np` is set as [{cfg.np}], which '
f'is larger than the cpu count [{sys_cpu_count}]. Due '
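A quick illustration of the new fallback: a minimal, self-contained sketch (function and variable names are ours, not Data-Juicer's) of defaulting an unset process count to the CPU count and warning when the configured value exceeds it.

```python
import os
import logging

logger = logging.getLogger(__name__)

def resolve_np(np_cfg=None):
    # Sketch of the fallback added above; names are illustrative.
    sys_cpu_count = os.cpu_count()
    if not np_cfg:
        # `np` unset: default to the CPU count, as the new code does.
        np_cfg = sys_cpu_count
        logger.warning('Number of processes `np` is not set, '
                       'defaulting to cpu count [%d].', sys_cpu_count)
    if np_cfg > sys_cpu_count:
        logger.warning('`np` [%d] is larger than the cpu count [%d].',
                       np_cfg, sys_cpu_count)
    return np_cfg

print(resolve_np())  # e.g. 8 on an 8-core machine
```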
42 changes: 34 additions & 8 deletions data_juicer/core/ray_data.py
@@ -122,15 +122,41 @@ def _run_single_op(self, op):
batch_size = getattr(op, 'batch_size',
1) if op.is_batched_op() else 1
if isinstance(op, Mapper):
self.data = self.data.map_batches(op.process,
batch_size=batch_size,
batch_format='pyarrow',
num_gpus=num_gpus)
if op.use_cuda():
op_kwargs = op._op_cfg[op._name]
self.data = self.data.map_batches(
op.__class__,
fn_args=None,
fn_kwargs=None,
fn_constructor_args=None,
fn_constructor_kwargs=op_kwargs,
batch_size=batch_size,
num_gpus=num_gpus,
concurrency=op_proc,
batch_format='pyarrow')
else:
self.data = self.data.map_batches(op.process,
batch_size=batch_size,
batch_format='pyarrow',
num_gpus=num_gpus)
elif isinstance(op, Filter):
self.data = self.data.map_batches(op.compute_stats,
batch_size=batch_size,
batch_format='pyarrow',
num_gpus=num_gpus)
if op.use_cuda():
op_kwargs = op._op_cfg[op._name]
self.data = self.data.map_batches(
op.__class__,
fn_args=None,
fn_kwargs=None,
fn_constructor_args=None,
fn_constructor_kwargs=op_kwargs,
batch_size=batch_size,
num_gpus=num_gpus,
concurrency=op_proc,
batch_format='pyarrow')
else:
self.data = self.data.map_batches(op.compute_stats,
batch_size=batch_size,
batch_format='pyarrow',
num_gpus=num_gpus)
if op.stats_export_path is not None:
self.data.write_json(op.stats_export_path,
force_ascii=False)
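The effect of this change: for CUDA ops, the operator class itself (not a bound method) is handed to `map_batches`, so Ray constructs one long-lived actor per worker and expensive setup such as model loading happens once per GPU process instead of being re-captured for every batch. Below is a minimal sketch of the class-based pattern, assuming Ray 2.9+ where `concurrency` sizes the actor pool; the toy op and its kwargs are illustrative, and `num_gpus` is omitted so the sketch runs on CPU (the real path passes the op's GPU requirement).

```python
import ray

class ToyOp:
    """Stateful callable: Ray instantiates it once per actor, so expensive
    setup (e.g. loading a CUDA model) happens once per worker."""

    def __init__(self, threshold=0.5):
        self.threshold = threshold  # stand-in for model loading

    def __call__(self, batch):
        # Invoked per batch; with batch_format='pyarrow' this is a pyarrow.Table.
        return batch

ds = ray.data.from_items([{'text': 'a'}, {'text': 'b'}])
ds = ds.map_batches(
    ToyOp,                                     # the class, not op.process
    fn_constructor_kwargs={'threshold': 0.5},  # forwarded to __init__
    batch_size=1,
    concurrency=2,                             # actor-pool size (Ray 2.9+)
    batch_format='pyarrow')
print(ds.take_all())
```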
6 changes: 6 additions & 0 deletions data_juicer/ops/base_op.py
@@ -288,6 +288,9 @@ def __init_subclass__(cls, **kwargs):
f'{cls.__name__}. Please implement {method_name}_single '
f'or {method_name}_batched.')

def __call__(self, *args, **kwargs):
return self.process(*args, **kwargs)

def process_batched(self, samples, *args, **kwargs):
keys = samples.keys()
first_key = next(iter(keys))
@@ -378,6 +381,9 @@ def __init_subclass__(cls, **kwargs):
f'{cls.__name__}. Please implement {method_name}_single '
f'or {method_name}_batched.')

def __call__(self, *args, **kwargs):
return self.compute_stats(*args, **kwargs)

def compute_stats_batched(self, samples, *args, **kwargs):
keys = samples.keys()
num_samples = len(samples[Fields.stats])
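The added `__call__` methods are what make the class-based Ray path above work: Ray's actor pool invokes the constructed op instance like a plain function, and the instance forwards to its real entry point (`process` for Mappers, `compute_stats` for Filters). A toy sketch of the idea:

```python
class EchoMapper:
    """Toy op: forwarding __call__ to process() lets an *instance* be used
    wherever a plain function is expected (e.g. Ray's map_batches)."""

    def process(self, samples):
        return samples

    def __call__(self, *args, **kwargs):
        return self.process(*args, **kwargs)

op = EchoMapper()
assert op({'text': ['a']}) == {'text': ['a']}  # the instance is callable
```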
4 changes: 2 additions & 2 deletions data_juicer/ops/mapper/generate_qa_from_examples_mapper.py
@@ -194,10 +194,10 @@ def format_qa_pairs(qa_example):
])

formatted_examples = ''.join([
self.example_template.format(qa_pairs=format_qa_pairs(qa_example))
self.example_template.format(format_qa_pairs(qa_example))
for qa_example in qa_examples
])
input_prompt = self.input_template.format(examples=formatted_examples)
input_prompt = self.input_template.format(formatted_examples)
return input_prompt

def parse_output(self, raw_output):
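This swap from keyword to positional formatting only works if the template strings use bare `{}` placeholders rather than named ones like `{examples}`; a tiny standalone illustration (the template literals here are ours, not the mapper's defaults):

```python
# Named placeholder: must be filled by keyword.
print('Examples:\n{examples}'.format(examples='Q: hi\nA: hello'))

# Bare placeholder: filled positionally, matching the new calls above.
print('Examples:\n{}'.format('Q: hi\nA: hello'))

# Mismatch fails: '{examples}'.format('Q: hi') raises KeyError: 'examples'.
```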
3 changes: 0 additions & 3 deletions data_juicer/utils/constant.py
@@ -20,9 +20,6 @@ class Fields(object):
context = DEFAULT_PREFIX + 'context__'
suffix = DEFAULT_PREFIX + 'suffix__'

# video_frames
video_frames = DEFAULT_PREFIX + 'video_frames__'

# the name of the original file from which this sample was derived.
source_file = DEFAULT_PREFIX + 'source_file__'

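`Fields` entries are plain module-level constants assembled from a shared prefix, so dropping `video_frames` simply removes one generated key. A minimal sketch of the pattern, assuming the prefix is `'__dj__'` (as suggested by keys like `__dj__stats__` appearing elsewhere in this diff):

```python
DEFAULT_PREFIX = '__dj__'

class Fields:
    context = DEFAULT_PREFIX + 'context__'          # '__dj__context__'
    suffix = DEFAULT_PREFIX + 'suffix__'            # '__dj__suffix__'
    source_file = DEFAULT_PREFIX + 'source_file__'  # '__dj__source_file__'

print(Fields.source_file)
```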
46 changes: 32 additions & 14 deletions data_juicer/utils/process_utils.py
@@ -57,32 +57,50 @@ def calculate_np(name,
"""Calculate the optimum number of processes for the given OP"""
eps = 1e-9 # about 1 byte

if num_proc is None:
num_proc = psutil.cpu_count()

if use_cuda:
auto_num_proc = None
cuda_mem_available = get_min_cuda_memory() / 1024
op_proc = min(
num_proc,
math.floor(cuda_mem_available / (mem_required + eps)) *
cuda_device_count())
if use_cuda and mem_required == 0:
if mem_required == 0:
logger.warning(f'The required cuda memory of Op[{name}] '
f'has not been specified. '
f'Please specify the mem_required field in the '
f'config file, or you might encounter CUDA '
f'out of memory error. You can reference '
f'the mem_required field in the '
f'config_all.yaml file.')
if op_proc < 1.0:
logger.warning(f'The required cuda memory:{mem_required}GB might '
f'be more than the available cuda memory:'
f'{cuda_mem_available}GB.'
f'This Op[{name}] might '
f'require more resource to run.')
else:
auto_num_proc = math.floor(
cuda_mem_available / mem_required) * cuda_device_count()
if cuda_mem_available / mem_required < 1.0:
logger.warning(
f'The required cuda memory:{mem_required}GB might '
f'be more than the available cuda memory:'
f'{cuda_mem_available}GB.'
f'This Op[{name}] might '
f'require more resource to run.')

if auto_num_proc and num_proc:
op_proc = min(auto_num_proc, num_proc)
if num_proc > auto_num_proc:
logger.warning(
f'The given num_proc: {num_proc} is greater than '
f'the value {auto_num_proc} auto calculated based '
f'on the mem_required of Op[{name}]. '
f'Set the `num_proc` to {auto_num_proc}.')
elif not auto_num_proc and not num_proc:
op_proc = cuda_device_count()
logger.warning(
f'Both mem_required and num_proc of Op[{name}] are not set.'
f'Set the `num_proc` to number of GPUs {op_proc}.')
else:
op_proc = auto_num_proc if auto_num_proc else num_proc

op_proc = max(op_proc, 1)
return op_proc
else:
if num_proc is None:
num_proc = psutil.cpu_count()

op_proc = num_proc
cpu_available = psutil.cpu_count()
mem_available = psutil.virtual_memory().available
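The reworked GPU branch computes a memory-derived upper bound (`auto_num_proc`) and reconciles it with the user-supplied `num_proc`: the minimum wins when both exist, and one process per GPU is the fallback when neither does. A hedged, standalone sketch of just that arithmetic (helper name and signature are ours):

```python
import math

def reconcile_gpu_procs(num_proc, mem_required_gb, cuda_mem_free_gb, n_gpus):
    # Sketch of the reconciliation above; names are illustrative.
    auto_num_proc = None
    if mem_required_gb > 0:
        # Processes that fit on one GPU, summed across all GPUs.
        auto_num_proc = math.floor(cuda_mem_free_gb / mem_required_gb) * n_gpus
    if auto_num_proc and num_proc:
        op_proc = min(auto_num_proc, num_proc)  # memory bound caps the user value
    elif not auto_num_proc and not num_proc:
        op_proc = n_gpus                        # neither set: one proc per GPU
    else:
        op_proc = auto_num_proc if auto_num_proc else num_proc
    return max(op_proc, 1)

# 24 GB free per GPU, 16 GB required -> 1 proc per GPU, 2 GPUs -> cap of 2.
assert reconcile_gpu_procs(num_proc=8, mem_required_gb=16,
                           cuda_mem_free_gb=24, n_gpus=2) == 2
```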
2 changes: 1 addition & 1 deletion docs/Operators.md
@@ -6,7 +6,7 @@ This page offers a basic description of the operators (OPs) in Data-Juicer. User

## Overview

The operators in Data-Juicer are categorized into 5 types.
The operators in Data-Juicer are categorized into 7 types.

| Type | Number | Description |
|-----------------------------------|:------:|-------------------------------------------------|
2 changes: 1 addition & 1 deletion docs/Operators_ZH.md
@@ -6,7 +6,7 @@

## Overview

The operators in Data-Juicer fall into the following 5 types.
The operators in Data-Juicer fall into the following 7 types.

| Type | Number | Description |
|------------------------------------|:--:|---------------|
File renamed without changes.
2 changes: 1 addition & 1 deletion tests/ops/mapper/test_generate_qa_from_examples_mapper.py
@@ -38,7 +38,7 @@ def test(self):

def test_multi_process(self):
sampling_params = {'max_new_tokens': 200}
self._run_op(sampling_params=sampling_params, num_proc=3)
self._run_op(sampling_params=sampling_params, num_proc=2)

def test_vllm(self):
sampling_params = {'max_tokens': 200}
103 changes: 101 additions & 2 deletions tests/tools/test_process_data.py
@@ -4,19 +4,49 @@
import subprocess
import tempfile
import unittest
import uuid
import yaml

from data_juicer.utils.unittest_utils import DataJuicerTestCaseBase


def run_in_subprocess(cmd):
try:
with subprocess.Popen(
cmd, shell=True, stdout=subprocess.PIPE,
stderr=subprocess.PIPE) as return_info:
while True:
next_line = return_info.stdout.readline()
return_line = next_line.decode('utf-8', 'ignore').strip()
if return_line == '' and return_info.poll() is not None:
break
if return_line != '':
print(return_line)

err_lines = ''
while True:
next_line = return_info.stderr.readline()
return_line = next_line.decode('utf-8', 'ignore').strip()
if return_line == '' and return_info.poll() is not None:
break
if return_line != '':
print(return_line)
err_lines += return_line + '\n'

return_code = return_info.wait()
if return_code:
raise RuntimeError(err_lines)
except Exception as e:
raise e
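The helper streams the child's stdout line by line while it runs and raises a `RuntimeError` carrying the collected stderr on a nonzero exit; this is how the Ray test below invokes the CLI. A typical call (the config path is illustrative):

```python
run_in_subprocess('python tools/process_data.py --config configs/demo.yaml')
```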


class ProcessDataTest(DataJuicerTestCaseBase):

def setUp(self):
super().setUp()

self.tmp_dir = tempfile.TemporaryDirectory().name
if not osp.exists(self.tmp_dir):
os.makedirs(self.tmp_dir)
os.makedirs(self.tmp_dir, exist_ok=True)

def tearDown(self):
super().tearDown()
@@ -66,5 +96,74 @@ def test_status_code_1(self):
self.assertFalse(osp.exists(tmp_out_path))


class ProcessDataRayTest(DataJuicerTestCaseBase):

def setUp(self):
super().setUp()

cur_dir = osp.dirname(osp.abspath(__file__))
self.tmp_dir = osp.join(cur_dir, f'tmp_{uuid.uuid4().hex}')
os.makedirs(self.tmp_dir, exist_ok=True)

def tearDown(self):
super().tearDown()

if osp.exists(self.tmp_dir):
shutil.rmtree(self.tmp_dir)

import ray
ray.shutdown()

def test_ray_image(self):
tmp_yaml_file = osp.join(self.tmp_dir, 'config_0.yaml')
tmp_out_path = osp.join(self.tmp_dir, 'output_0.json')
text_keys = 'text'

data_path = osp.join(osp.dirname(osp.dirname(osp.dirname(osp.realpath(__file__)))),
'demos', 'data', 'demo-dataset-images.jsonl')
yaml_config = {
'dataset_path': data_path,
'executor_type': 'ray',
'ray_address': 'auto',
'text_keys': text_keys,
'image_key': 'images',
'export_path': tmp_out_path,
'process': [
{
'image_nsfw_filter': {
'hf_nsfw_model': 'Falconsai/nsfw_image_detection',
'trust_remote_code': True,
'score_threshold': 0.5,
'any_or_all': 'any',
'mem_required': '8GB'
},
'image_aspect_ratio_filter':{
'min_ratio': 0.5,
'max_ratio': 2.0
}
}
]
}

with open(tmp_yaml_file, 'w') as file:
yaml.dump(yaml_config, file)

run_in_subprocess(f'python tools/process_data.py --config {tmp_yaml_file}')

self.assertTrue(osp.exists(tmp_out_path))

from datasets import load_dataset
jsonl_files = [os.path.join(tmp_out_path, f) \
for f in os.listdir(tmp_out_path) \
if f.endswith('.json')]
dataset = load_dataset(
'json',
data_files={'jsonl': jsonl_files})

self.assertEqual(len(dataset['jsonl']), 3)
for item in dataset['jsonl']:
self.assertIn('aspect_ratios', item['__dj__stats__'])


if __name__ == '__main__':
unittest.main()