metric.py
import numpy as np

from utils.normalize_predicted_scores import normalize_predicted_scores


class Metric:
    def __init__(self):
        pass

    def score(self, truth, recommendations, topn_score, index_mask) -> float:
        raise NotImplementedError("This method should be implemented by a subclass.")


class NDCG(Metric):
    def __init__(self):
        super().__init__()
        self.metric_name = "NDCG"

    def score(self, truth, recommendations, topn_score, index_mask) -> float:
        # calculate and store the ndcg for each user
        ndcg_per_user = []
        # pre-compute the discounted gain for each ranked position
        discounted_gain_per_k = np.array([1 / np.log2(i + 1) for i in range(1, topn_score + 1)])
        # pre-compute the idcg (the ideal case in which every position is a hit)
        idcg = discounted_gain_per_k.sum()
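        # e.g. with topn_score = 3 the discounts are [1.0, 0.6309, 0.5], so
        # idcg = 2.1309; hits at ranks 1 and 3 would yield (1.0 + 0.5) / 2.1309 ≈ 0.70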
        # loop through the recommendation lists of each user
        for user in recommendations['user'].unique():
            predictions = recommendations[recommendations['user'] == user]
            # if there are no or too few recommendations for this user, skip
            if predictions.shape[0] < len(index_mask):
                ndcg_per_user.append(0)
                continue
            # get sampling indices
            sample_indices = np.argwhere(index_mask).flatten()
            # look only at the sampled recommendations
            if 'ensemble_probabilities' in predictions.columns:
                top_k_predictions = predictions['item']
            else:
                top_k_predictions = predictions.values[:, 0][sample_indices]
            # filter interactions for the current user from the test set
            positive_test_interactions = truth["item"][truth["user"] == user].values
            # check which of the top-k recommendations appear in the test set
            hits = np.isin(top_k_predictions, positive_test_interactions)
            # calculate the dcg for this user
            user_dcg = discounted_gain_per_k[hits].sum()
            # calculate the ndcg for this user
            user_ndcg = user_dcg / idcg
            # append the current ndcg
            ndcg_per_user.append(user_ndcg)
        # the final result is the average ndcg over all users
        return sum(ndcg_per_user) / len(ndcg_per_user)


class Precision(Metric):
    def __init__(self):
        super().__init__()
        self.metric_name = "Precision"

    def score(self, truth, recommendations, topn_score, index_mask) -> float:
        # calculate and store the precision for each user
        precision_per_user = []
        # loop through the recommendation lists of each user
        for user in recommendations['user'].unique():
            predictions = recommendations[recommendations['user'] == user]
            # if there are no or too few recommendations for this user, skip
            if predictions.shape[0] < len(index_mask):
                precision_per_user.append(0)
                continue
            # get sampling indices
            sample_indices = np.argwhere(index_mask).flatten()
            # look only at the sampled recommendations
            top_k_predictions = predictions.values[:, 0][sample_indices]
            # filter interactions for the current user from the test set
            positive_test_interactions = truth["item"][truth["user"] == user].values
            # count how many of the top-k recommendations appear in the test set
            hits = np.isin(top_k_predictions, positive_test_interactions).sum()
            # calculate the precision for this user
            user_precision = hits / topn_score
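            # e.g. 2 hits in a sampled top-5 list give a precision of 2 / 5 = 0.4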
            # append the current precision
            precision_per_user.append(user_precision)
        # the final result is the average precision over all users
        return sum(precision_per_user) / len(precision_per_user)


class MRR(Metric):
    def __init__(self):
        super().__init__()
        self.metric_name = "MRR"

    def score(self, truth, recommendations, topn_score, index_mask) -> float:
        # calculate and store the mrr for each user
        mrr_per_user = []
        # pre-compute the reciprocal rank for each ranked position
        mrr_per_k = np.array([1 / i for i in range(1, topn_score + 1)])
        # pre-compute the normalization constant
        N = 1 / topn_score
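        # e.g. with topn_score = 3 the weights are [1.0, 0.5, 0.3333]; hits at
        # ranks 1 and 3 give (1.0 + 0.3333) / 3 ≈ 0.4444 for that user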
        for user in recommendations['user'].unique():
            predictions = recommendations[recommendations['user'] == user]
            # if there are no or too few recommendations for this user, skip
            if predictions.shape[0] < len(index_mask):
                mrr_per_user.append(0)
                continue
            # get sampling indices
            sample_indices = np.argwhere(index_mask).flatten()
            # look only at the sampled recommendations
            if 'ensemble_probabilities' in predictions.columns:
                top_k_predictions = predictions['item']
            else:
                top_k_predictions = predictions.values[:, 0][sample_indices]
            # filter interactions for the current user from the test set
            positive_test_interactions = truth["item"][truth["user"] == user].values
            # check which of the top-k recommendations appear in the test set
            hits = np.isin(top_k_predictions, positive_test_interactions)
            # sum the reciprocal ranks of the hits for this user
            user_rr = mrr_per_k[hits].sum()
            # normalize to get the mrr for this user
            user_mrr = user_rr * N
            # append the current mrr
            mrr_per_user.append(user_mrr)
        # the final result is the average mrr over all users
        return sum(mrr_per_user) / len(mrr_per_user)


class NewMetric(Metric):
    def __init__(self, random_state: int = 42):
        super().__init__()
        self.metric_name = "NewMetric"
        self.random_state = random_state

    def score(self, truth, recommendations, topn_score, index_mask) -> float:
        # calculate and store the metric for each user
        user_new_metric_list = []
        # seed the sampling with the configured random state
        np.random.seed(self.random_state)
        # position weights, mirroring the reciprocal ranks used by MRR
        position_probabilities = np.array([1 / i for i in range(1, topn_score + 1)])
        for user in recommendations['user'].unique():
            predictions = recommendations[recommendations['user'] == user]
            # if there are no or too few recommendations for this user, skip
            if predictions.shape[0] < len(index_mask):
                user_new_metric_list.append(0)
                continue
            # turn the predicted scores into a sampling distribution
            prediction_probabilities = normalize_predicted_scores(predictions['score'].values)
            # draw a score-weighted permutation of the recommended items
            predicted_items = np.random.choice(a=predictions['item'].values,
                                               size=len(predictions),
                                               replace=False,
                                               p=prediction_probabilities)
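            # items with higher predicted scores tend to land in earlier,
            # higher-weight ranks of this sampled ordering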
            # filter interactions for the current user from the test set
            positive_test_interactions = truth["item"][truth["user"] == user].values
            # check which of the sampled top-k items appear in the test set
            hits = np.isin(predicted_items[:topn_score], positive_test_interactions)
            # weight the hits by their position and sum them up for this user
            user_new_metric = position_probabilities[hits].sum()
            user_new_metric_list.append(user_new_metric)
        # the final result is the average over all users
        return sum(user_new_metric_list) / len(user_new_metric_list)
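

# A minimal usage sketch (not part of the original module). The column layout
# below is an assumption inferred from how score() indexes the data: 'item'
# must be the first column, and truth/recommendations are pandas DataFrames
# keyed by 'user'. NewMetric is omitted because it depends on the external
# normalize_predicted_scores helper.
if __name__ == "__main__":
    import pandas as pd

    # one user with a ranked list of 5 recommended items
    recommendations = pd.DataFrame({
        "item": [10, 20, 30, 40, 50],
        "user": [1, 1, 1, 1, 1],
        "score": [0.9, 0.7, 0.5, 0.2, 0.1],
    })
    # in the test set, the user interacted with items 10 and 30
    truth = pd.DataFrame({"user": [1, 1], "item": [10, 30]})
    # evaluate the first 3 of the 5 ranked positions
    index_mask = [True, True, True, False, False]

    for metric in (NDCG(), Precision(), MRR()):
        print(metric.metric_name, metric.score(truth, recommendations, 3, index_mask))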