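"""Generate ANN hard-negative training data for an ANCE-style semantic index.

The script polls save_dir for new model checkpoints, embeds the corpus and the
query side of similar_text_pair_file with the latest checkpoint, builds an
hnswlib index over the corpus embeddings, and writes the recalled hard
negatives to ann_data_dir as a new data version.
"""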
import sys
import os
import numpy as np
from os.path import isfile, join
import argparse
import logging
import time
from functools import partial
import hnswlib
import paddle
import paddlenlp as ppnlp
from paddlenlp.datasets import load_dataset, MapDataset
from paddlenlp.data import Stack, Tuple, Pad
from paddlenlp.utils.log import logger
from ance.model import SemanticIndexANCE
from data import get_latest_checkpoint, get_latest_ann_data
from data import convert_example, create_dataloader
from data import gen_id2corpus, gen_text_file
from ann_util import build_index
# yapf: disable
parser = argparse.ArgumentParser()
# Required parameters
parser.add_argument("--similar_text_pair_file", default=None, type=str,required=True, help="The train_set tsv file that each line is simialr text pair")
parser.add_argument("--corpus_file", default=None, type=str, required=True, help="The corpus file that each line is a text for buinding indexing")
parser.add_argument("--save_dir", default=None, type=str, required=True, help="Saved model dir, will look for latest checkpoint dir in here")
parser.add_argument("--ann_data_dir", default=None, type=str, required=True, help="The output directory where the training data will be written")
parser.add_argument("--init_from_ckpt", default=None, type=str, help="Initial model dir, will use this if no checkpoint is found in model_dir")
parser.add_argument("--end_ann_step", default=1000000, type=int, help="Stop after this number of data versions has been generated, default run forever")
parser.add_argument("--batch_size", default=128, type=int, help="Batch size for predicting embedding of texts")
parser.add_argument("--output_emb_size", default=None, type=int, help="output_embedding_size")
parser.add_argument("--max_seq_length", default=128, type=int, help="Batch size for predicting embedding of texts")
parser.add_argument("--topk_training", default=500, type=int, help="top k from which negative samples are collected")
parser.add_argument("--num_negative_sample", default=5, type=int, help="at each resample, how many negative samples per query do I use")
# hnswlib index arguments
parser.add_argument("--hnsw_m", default=10, type=int, help="Number of bi-directional links created for every element during HNSW index construction (hnswlib parameter M).")
parser.add_argument("--hnsw_ef", default=10, type=int, help="Size of the dynamic candidate list used when building or querying the HNSW index (hnswlib ef / ef_construction).")
parser.add_argument("--hnsw_max_elements", default=1000000, type=int, help="Maximum number of elements the HNSW index can hold.")
args = parser.parse_args()
# yapf: enable
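# A minimal example invocation; the file and directory names below are
# placeholders rather than files shipped with this example:
#
#   python -u run_ann_data_gen.py \
#       --similar_text_pair_file "data/train_pairs.tsv" \
#       --corpus_file "data/corpus.tsv" \
#       --save_dir "./checkpoints" \
#       --init_from_ckpt "./pretrained/model_state.pdparams" \
#       --ann_data_dir "./ann_data" \
#       --batch_size 64 \
#       --output_emb_size 256 \
#       --max_seq_length 64 \
#       --topk_training 50 \
#       --num_negative_sample 5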


def generate_new_ann(args, data_loader_dict, checkpoint_path, latest_step_num):
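    """Embed the corpus and queries with the given checkpoint, recall the
    top-k nearest corpus entries for every query from the HNSW index, and
    write the hard negatives to ann_data_dir/<latest_step_num>/new_ann_data.
    """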
pretrained_model = ppnlp.transformers.ErnieModel.from_pretrained(
'ernie-1.0')
model = SemanticIndexANCE(
pretrained_model, output_emb_size=args.output_emb_size)
logger.info("checkpoint_path:{}".format(checkpoint_path))
state_dict = paddle.load(checkpoint_path)
model.set_dict(state_dict)
logger.info("load params from:{}".format(checkpoint_path))
logger.info("***** inference of corpus *****")
final_index = build_index(args, data_loader_dict["corpus_data_loader"],
model)
logger.info("***** inference of query *****")
query_embedding = model.get_semantic_embedding(data_loader_dict[
"text_data_loader"])
text_list = data_loader_dict["text_list"]
id2corpus = data_loader_dict["id2corpus"]
text2similar_text = data_loader_dict["text2similar_text"]
new_ann_data_path = os.path.join(args.ann_data_dir, str(latest_step_num))
if not os.path.exists(new_ann_data_path):
os.mkdir(new_ann_data_path)
with open(os.path.join(new_ann_data_path, "new_ann_data"), 'w') as f:
for batch_index, batch_query_embedding in enumerate(query_embedding):
recalled_idx, cosine_sims = final_index.knn_query(
batch_query_embedding, args.topk_training)
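            # knn_query returns neighbours ordered from nearest to farthest, so
            # the tail of each top-k list below is kept as the hard negatives.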
batch_size = len(cosine_sims)
for row_index in range(batch_size):
text_index = args.batch_size * batch_index + row_index
hard_neg_samples = recalled_idx[row_index][
-1 * args.num_negative_sample:]
hard_neg_sims = cosine_sims[row_index][
-1 * args.num_negative_sample:]
for idx, hard_neg_doc_idx in enumerate(hard_neg_samples):
text = text_list[text_index]["text"]
similar_text = text2similar_text[text]
hard_neg_sample = id2corpus[hard_neg_doc_idx]
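                    # 1 - distance gives a cosine similarity when the index is
                    # built with a cosine/inner-product space (see
                    # ann_util.build_index); the value is not written out.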
cosine_sim = 1.0 - hard_neg_sims[idx]
f.write("{}\t{}\t{}\n".format(text, similar_text,
hard_neg_sample))
succeed_flag_file = os.path.join(new_ann_data_path, "succeed_flag_file")
open(succeed_flag_file, 'a').close()
logger.info("finish generate ann data step:{}".format(latest_step_num))


def build_data_loader(args, tokenizer):
""" build corpus_data_loader and text_data_loader
"""
id2corpus = gen_id2corpus(args.corpus_file)
    # convert_example expects each input example to be a dict
corpus_list = [{idx: text} for idx, text in id2corpus.items()]
corpus_ds = MapDataset(corpus_list)
trans_func = partial(
convert_example,
tokenizer=tokenizer,
max_seq_length=args.max_seq_length)
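    # Pad input ids and token type ids to the longest sequence in each batch.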
batchify_fn = lambda samples, fn=Tuple(
Pad(axis=0, pad_val=tokenizer.pad_token_id), # text_input
Pad(axis=0, pad_val=tokenizer.pad_token_type_id), # text_segment
): [data for data in fn(samples)]
corpus_data_loader = create_dataloader(
corpus_ds,
mode='predict',
batch_size=args.batch_size,
batchify_fn=batchify_fn,
trans_fn=trans_func)
# build text data_loader
text_list, text2similar_text = gen_text_file(args.similar_text_pair_file)
text_ds = MapDataset(text_list)
text_data_loader = create_dataloader(
text_ds,
mode='predict',
batch_size=args.batch_size,
batchify_fn=batchify_fn,
trans_fn=trans_func)
d = {
"text_data_loader": text_data_loader,
"corpus_data_loader": corpus_data_loader,
"id2corpus": id2corpus,
"text2similar_text": text2similar_text,
"text_list": text_list
}
return d


def ann_data_gen(args):
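    """Poll save_dir for new checkpoints and generate one ANN data version per
    checkpoint, sleeping 10 seconds between polls when no new checkpoint is
    found, until end_ann_step data versions have been generated.
    """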
# use init_from_ckpt as last_checkpoint
last_checkpoint = args.init_from_ckpt
    # get latest_ann_data_step to decide when to stop generating ann data
_, latest_ann_data_step = get_latest_ann_data(args.ann_data_dir)
rank = paddle.distributed.get_rank()
if rank == 0:
if not os.path.exists(args.ann_data_dir):
os.makedirs(args.ann_data_dir)
tokenizer = ppnlp.transformers.ErnieTokenizer.from_pretrained('ernie-1.0')
data_load_dict = build_data_loader(args, tokenizer)
while latest_ann_data_step <= args.end_ann_step:
next_checkpoint, latest_step_num = get_latest_checkpoint(args)
logger.info("next_checkpoint:{}".format(next_checkpoint))
if next_checkpoint == last_checkpoint:
logger.info("next_checkpoint == lase_checkpoint:{}".format(
next_checkpoint))
logger.info("sleep 10s")
time.sleep(10)
else:
logger.info("start generate ann data using checkpoint:{}".format(
next_checkpoint))
generate_new_ann(args, data_load_dict, next_checkpoint,
latest_step_num)
logger.info("finished generating ann data step {}".format(
latest_step_num))
            last_checkpoint = next_checkpoint
            # refresh the latest ann data step so the loop can stop once
            # args.end_ann_step data versions have been generated
            _, latest_ann_data_step = get_latest_ann_data(args.ann_data_dir)


def main():
ann_data_gen(args)


if __name__ == "__main__":
main()