-
Notifications
You must be signed in to change notification settings - Fork 197
/
Copy pathdataset_split_by_language.py
86 lines (68 loc) · 3.02 KB
/
dataset_split_by_language.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
# This tool is used to split datasets to sub-datasets
# by fast-text language model.
import os
import fire
import pandas as pd
from loguru import logger
from data_juicer.core.data import add_same_content_to_new_column
from data_juicer.format import load_formatter
from data_juicer.ops.filter.language_id_score_filter import \
LanguageIDScoreFilter
from data_juicer.utils.constant import Fields, StatsKeys
def keep_by_lang(sample, lang):
"""
Keep samples with the specified language.
:param sample: a sample in dataset
:param lang: the specified language
:return: True to keep, False to discard
"""
if sample[Fields.stats][StatsKeys.lang] == lang:
return True
return False
def main(src_dir, target_dir, text_key=None, suffixes=[], num_proc=1):
"""
Load dataset from the source directory, then apply language identification
using the operation filter called `LanguageIDScoreFilter`,
finally, split the dataset by language and save it.
:param src_dir: path to store the dataset.
:param target_dir: path to store subset files(`jsonl` format)
:param text_key: key name of field that stores sample text, default `text`:
:param suffixes: files with suffixes to be loaded, default None
:param num_proc: number of processes to process dataset, default 1.
"""
if text_key is None:
text_key = 'text'
# check if the source directory exists.
if not os.path.exists(src_dir):
raise ValueError('The raw source data directory does not exist,'
' Please check and retry.')
if not os.path.exists(target_dir):
os.makedirs(target_dir, exist_ok=True)
formatter = load_formatter(src_dir, text_keys=text_key, suffixes=suffixes)
dataset = formatter.load_dataset(num_proc)
op = LanguageIDScoreFilter(text_key=text_key)
if Fields.stats not in dataset.features:
# only add stats when calling filter op
dataset = dataset.map(add_same_content_to_new_column,
fn_kwargs={
'new_column_name': Fields.stats,
'initial_value': {}
},
num_proc=num_proc,
desc='Adding new column for stats')
# identify language
dataset = dataset.map(op.compute_stats, num_proc=num_proc)
langs = pd.DataFrame(dataset[Fields.stats])[StatsKeys.lang]
unique_langs = list(set(langs))
logger.info(f'There are {len(dataset)} in dataset')
logger.info(f'Languages in dataset are {unique_langs}')
# split and save subset of dataset by language
for lang in unique_langs:
ds = dataset.filter(keep_by_lang,
num_proc=num_proc,
fn_kwargs=dict(lang=lang))
logger.info(f'There are {len(ds)} with language [{lang}]')
jsonl_fp = os.path.join(target_dir, lang + '.jsonl')
ds.to_json(jsonl_fp, force_ascii=False)
if __name__ == '__main__':
fire.Fire(main)