-
Notifications
You must be signed in to change notification settings - Fork 1
/
Copy pathbert-wic-stats.py
318 lines (254 loc) · 12.3 KB
/
bert-wic-stats.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
import os
import json
import torch
import numpy as np
import pandas as pd
from collections import defaultdict
from scipy.spatial.distance import cosine
from sklearn.metrics import accuracy_score, roc_auc_score, f1_score, roc_curve
class WiC:
    """Score pre-computed target-word embeddings on a sentence-pair dataset.

    For every pair and every model layer the cosine similarity between the two
    target embeddings is computed, a decision threshold is tuned on the ``dev``
    split, and accuracy / F1 / ROC-AUC are reported for each split.

    Expected layout on disk (as read by the methods below)::

        <dataset>/<split>.txt                                  JSON-lines records
        <dataset>/target_embeddings/<model>/<split>/<k>.pt     one tensor per layer k
    """

    # ``fit`` tunes the threshold on ``dev`` and sorts the report by the
    # ``test`` F1, so both splits have to be configured.
    _REQUIRED_SPLITS = ('test', 'dev')

    def __init__(self, dataset: str, data_sets: list = None):
        """
        Args:
            dataset (str): dataset directory
            data_sets (list, default=['test', 'dev', 'train']): names of the
                splits available in ``dataset``
        """
        self.dataset = dataset
        # A fresh list per instance: a list literal as default value would be
        # shared by every instance created without an explicit argument.
        self.data_sets = ['test', 'dev', 'train'] if data_sets is None else list(data_sets)

    def load_dataset(self) -> dict:
        """Load every configured split into a dataframe.

        Each ``<dataset>/<split>.txt`` file is read as JSON lines; blank lines
        and ``null`` records are skipped. Consecutive records are treated as
        one pair and the pair-level fields are taken from the first record of
        each pair.

        Returns:
            dict mapping split name -> DataFrame with columns
            ``words`` (from ``lemma``), ``pos`` and ``gold`` (cast to int).
        """
        df_data_sets = dict()

        for split in self.data_sets:
            filename = f'{self.dataset}/{split}.txt'

            rows = list()
            with open(filename, mode='r', encoding='utf-8') as f:
                for line in f:
                    if not line.strip():
                        continue
                    row = json.loads(line)
                    if row is None:
                        continue
                    rows.append(row)

            # Naming the columns up front keeps an empty file from failing on
            # a missing attribute: it simply yields an empty frame.
            tmp = pd.DataFrame(rows, columns=['lemma', 'pos', 'gold'])

            # keep the first record of every pair
            df_data_sets[split] = pd.DataFrame({
                'words': tmp['lemma'].values[::2],
                'pos': tmp['pos'].values[::2],
                'gold': [int(g) for g in tmp['gold'].values[::2]],
            })

        return df_data_sets

    def load_embedding(self, model: str = 'bert-base-uncased'):
        """Load the per-layer target embeddings of every configured split.

        Args:
            model(str, default='bert-base-uncased'): name of the model
                sub-directory under ``<dataset>/target_embeddings``

        Returns:
            nested mapping ``embeddings[split]['sent1' | 'sent2'][layer]``
            where even rows of each stored tensor go to ``sent1`` and odd rows
            to ``sent2``; layers are numbered from 1.

        Raises:
            ValueError: if a stored tensor has an odd number of rows and thus
                cannot be split into pairs.
        """
        embeddings = defaultdict(lambda: defaultdict(dict))
        if not self.data_sets:
            return embeddings

        root = f'{self.dataset}/target_embeddings/{model}'

        # The number of layers is the number of ``<k>.pt`` files of one split.
        # ``test`` is preferred (historical behaviour) but any configured split
        # works, so a dataset without a test split can still be loaded.
        reference = 'test' if 'test' in self.data_sets else self.data_sets[0]
        n_model_layers = sum(1 for name in os.listdir(f'{root}/{reference}/')
                             if name.endswith('.pt'))

        for split in self.data_sets:
            for layer in range(1, n_model_layers + 1):
                filename = f'{root}/{split}/{layer}.pt'
                # NOTE(review): torch.load unpickles the file; only load
                # embedding files coming from a trusted source.
                E = torch.load(filename)

                if E.shape[0] % 2 != 0:
                    raise ValueError(f'{filename}: expected an even number of rows '
                                     f'(two per pair), got {E.shape[0]}')

                # rows alternate sentence1 / sentence2
                embeddings[split]['sent1'][layer] = E[0::2]
                embeddings[split]['sent2'][layer] = E[1::2]

        return embeddings

    def compute_similarities(self, embeddings: dict) -> dict:
        """Compute cosine similarities between paired embeddings.

        Args:
            embeddings(dict): ``{'sent1': {layer: tensor}, 'sent2': {layer: tensor}}``
                with layers keyed ``1..n`` and one row per pair.

        Returns:
            mapping measure name -> np.array with one similarity per pair:
            ``CS<k>`` for every single layer ``k`` and ``CS<n-4>-<n>`` for the
            mean of the last (up to) four layers. Empty if there are no layers
            or no pairs.
        """
        scores = defaultdict(list)

        n_model_layers = len(embeddings['sent1'])
        if n_model_layers == 0:
            return scores

        layers = range(1, n_model_layers + 1)
        # move every layer to the CPU once instead of once per pair
        sent1 = [embeddings['sent1'][j].cpu() for j in layers]
        sent2 = [embeddings['sent2'][j].cpu() for j in layers]
        n_pairs = sent1[0].shape[0]

        # NOTE(review): the label reads "<n-4>-<n>" although only the last
        # four layers are averaged; kept unchanged because the measure name
        # ends up in the emitted report.
        averaged_key = f'CS{n_model_layers - 4}-{n_model_layers}'

        for i in range(n_pairs):
            # cosine similarity: single layer
            for j, layer1, layer2 in zip(layers, sent1, sent2):
                cs = 1 - cosine(layer1[i].numpy(), layer2[i].numpy())
                scores[f'CS{j}'].append(cs)

            # cosine similarity: average of the last 4 layers
            mean1 = torch.stack([layer[i] for layer in sent1[-4:]]).mean(axis=0)
            mean2 = torch.stack([layer[i] for layer in sent2[-4:]]).mean(axis=0)
            scores[averaged_key].append(1 - cosine(mean1.numpy(), mean2.numpy()))

        # convert into numpy arrays
        for key in scores:
            scores[key] = np.array(scores[key])

        return scores

    def set_threshold(self, y_true: np.ndarray, y: np.ndarray) -> float:
        """Find the decision threshold that maximises the weighted F1-score.

        Candidate thresholds are the ones returned by ``roc_curve``; a pair is
        predicted positive when its score is ``>=`` the threshold. On ties the
        first candidate with the best F1 is returned.

        Args:
            y_true(np.ndarray): ground truth binary labels
            y(np.ndarray): predicted graded scores

        Returns:
            the selected threshold
        """
        y_true = np.asarray(y_true)
        y = np.asarray(y)

        # only the candidate thresholds are needed, not the curve itself
        _, _, thresholds = roc_curve(y_true, y)

        f1_scores = np.array([
            f1_score(y_true, (y >= thresh).astype(int), average='weighted')
            for thresh in thresholds
        ])

        return thresholds[f1_scores.argmax()]

    def fit(self, model: str = 'bert-base-uncased') -> pd.DataFrame:
        """Evaluate every similarity measure and return the report.

        For each measure produced by ``compute_similarities`` the threshold is
        tuned on ``dev`` with ``set_threshold`` and then applied to ``dev``,
        ``test`` and, when configured, ``train``.

        Args:
            model(str, default='bert-base-uncased'): name of the model
                sub-directory holding the embeddings

        Returns:
            DataFrame with one row per measure, sorted by ``f1_test``
            (descending), with columns ``pos``, ``measure``, ``acc_*``,
            ``roc_*``, ``f1_*``, ``*_TP``, ``*_TN``, ``thr`` and ``layer``.

        Raises:
            ValueError: if ``test`` or ``dev`` is not among the configured
                splits.
        """
        missing = [s for s in self._REQUIRED_SPLITS if s not in self.data_sets]
        if missing:
            raise ValueError(f'fit() needs the {missing} split(s); '
                             f'configured splits: {self.data_sets}')

        pair_data_sets = self.load_dataset()
        embeddings = self.load_embedding(model)

        # number of layers of the model
        n_model_layers = len(embeddings['test']['sent1'])

        # splits in the order in which they appear in the report columns
        report_splits = ['dev'] + (['train'] if 'train' in self.data_sets else []) + ['test']

        # Only the aggregate over all parts of speech is reported, so the
        # similarities are computed once per split over every pair instead of
        # once more for each individual POS tag.
        pos = 'ALL'
        gold, similarities = dict(), dict()
        for split in report_splits:
            frame = pair_data_sets[split]
            rows = list(range(frame.shape[0]))
            gold[split] = frame['gold'].values

            # address both sides by key rather than by dict insertion order
            selected = {
                side: {layer: embeddings[split][side][layer][rows]
                       for layer in range(1, n_model_layers + 1)}
                for side in ('sent1', 'sent2')
            }
            similarities[split] = self.compute_similarities(selected)

        records = list()
        for measure in similarities['test']:
            # the threshold is tuned on dev and reused for every split
            thr = self.set_threshold(gold['dev'], similarities['dev'][measure])

            record = dict(pos=pos, measure=measure, thr=thr)
            for split in report_splits:
                y_true = gold[split]
                y_pred = (similarities[split][measure] >= thr).astype(int)

                record[f'acc_{split}'] = accuracy_score(y_true, y_pred)
                record[f'f1_{split}'] = f1_score(y_true, y_pred, average='weighted')
                # NOTE(review): ROC-AUC is computed on the thresholded
                # predictions, not on the graded similarities; kept as is so
                # reported numbers do not change - confirm this is intended.
                record[f'roc_{split}'] = roc_auc_score(y_true, y_pred)
                # NOTE(review): despite the names these are the numbers of
                # gold-positive / gold-negative pairs, not true positives /
                # true negatives; names kept for the report format.
                record[f'{split}_TP'] = int((y_true == 1).sum())
                record[f'{split}_TN'] = int((y_true == 0).sum())
            records.append(record)

        column_order = (['pos', 'measure']
                        + [f'acc_{s}' for s in report_splits]
                        + [f'roc_{s}' for s in report_splits]
                        + [f'f1_{s}' for s in report_splits]
                        + [f'{s}_{c}' for s in report_splits for c in ('TP', 'TN')]
                        + ['thr'])

        # Build the frame directly in its final column order so that ``layer``
        # is added to a fresh frame rather than to a column-selected copy.
        stats = pd.DataFrame(records, columns=column_order)
        stats['layer'] = stats['measure'].apply(lambda x: x.replace('CS', ''))

        return stats.sort_values('f1_test', ascending=False)
if __name__ == '__main__':
    # Command-line entry point: evaluate one model on one dataset directory
    # and write the report to <dataset>/wic_stats.tsv.
    import argparse

    parser = argparse.ArgumentParser(prog='WiC evaluation', add_help=True)
    # Without a dataset directory every path below would start with "None/".
    parser.add_argument('-d', '--dataset',
                        type=str,
                        required=True,
                        help='Directory of a tokenized dataset for WiC evaluation')
    # Same default as WiC.fit, so omitting -m no longer passes None as model.
    parser.add_argument('-m', '--model',
                        type=str,
                        default='bert-base-uncased',
                        help='Hugging Face pre-trained model')
    parser.add_argument('-t', '--test_set',
                        action='store_true',
                        help='If test set is available')
    parser.add_argument('-T', '--train_set',
                        action='store_true',
                        help='If train set is available')
    parser.add_argument('-D', '--dev_set',
                        action='store_true',
                        help='If dev set is available')
    args = parser.parse_args()

    # splits to evaluate, in the order test / train / dev
    data_sets = [name
                 for name, enabled in (('test', args.test_set),
                                       ('train', args.train_set),
                                       ('dev', args.dev_set))
                 if enabled]

    # The evaluation tunes its threshold on dev and ranks measures on test;
    # fail early with a usage message instead of a KeyError deep inside fit.
    if 'test' not in data_sets or 'dev' not in data_sets:
        parser.error('both -t/--test_set and -D/--dev_set are required')

    w = WiC(args.dataset, data_sets)
    w.fit(args.model).to_csv(f'{args.dataset}/wic_stats.tsv', sep='\t', index=False)