-
Notifications
You must be signed in to change notification settings - Fork 0
/
als.py
74 lines (55 loc) · 2.58 KB
/
als.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
import scipy.sparse as sp
import numpy as np
from tqdm import tqdm
from recsys.mf.core import CoreMF
class ALS(CoreMF):
    """Implicit-feedback matrix factorization trained by alternating least squares.

    Each interaction value ``R[u, i]`` is split into a binary preference
    ``P[u, i] = 1 if R[u, i] > 0 else 0`` and a confidence weight
    ``C[u, i] = 1 + confidence * R[u, i]``.  ``fit`` alternates between
    re-solving every user vector with the item vectors held fixed and
    re-solving every item vector with the user vectors held fixed.
    """

    def __init__(self, iterations, factors, alpha, confidence, seed, calculate_loss=True):
        """
        Forward the shared hyper-parameters to ``CoreMF`` and store ``confidence``.

        ``learning_rate`` and ``beta`` are passed to the base class as 0;
        nothing in this class reads them.

        Parameters
        ----------
        iterations:
            number of epochs; ``fit`` performs one user sweep and one item
            sweep per epoch (read back as ``self.iterations``, presumably
            stored by ``CoreMF`` - confirm)
        factors:
            length of every latent vector (read back as ``self.factors``)
        alpha:
            weight of the identity matrix added to each linear system solved
            in ``fit`` (read back as ``self.alpha``)
        confidence: float
            user-item confidence regularization parameter C{u,i} = 1 + confidence * R{u,i}
        seed:
            forwarded unchanged to ``CoreMF``
        calculate_loss:
            when truthy, ``fit`` calls ``self.rmse`` (read back as
            ``self.calculate_loss``)
        """
        super().__init__(iterations=iterations,
                         factors=factors,
                         learning_rate=0,
                         alpha=alpha,
                         beta=0,
                         seed=seed,
                         calculate_loss=calculate_loss)
        self.confidence = confidence

    def fit(self, user_to_item: sp.csr_matrix):
        """Train ``self.user_factors`` and ``self.item_factors`` in place.

        Parameters
        ----------
        user_to_item: sp.csr_matrix
            interaction matrix with one row per user and one column per item

        Notes
        -----
        The matrix is converted to a dense array, and the preference and
        confidence matrices derived from it are dense as well, so memory use
        grows with ``n_users * n_items`` rather than with the number of
        stored interactions.
        """
        # Presumably allocates/initialises the factor matrices used below;
        # defined in CoreMF - confirm.
        self.__fit_preparation__(user_to_item)

        implicit_values = user_to_item.toarray()

        # Preference matrix user-to-item
        P = np.where(implicit_values > 0, 1, 0)
        # Confidence matrix user-to-item
        C = 1 + self.confidence * implicit_values
        # Identity regularization matrix
        alpha_identity = self.alpha * np.eye(self.factors)  # factors x factors

        tqdm_range = tqdm(np.arange(self.iterations), desc='Epochs', colour='green')
        for _ in tqdm_range:
            # Users learning: items are fixed, rows of P / C are users
            self._als_half_step(self.item_factors, self.user_factors, P, C, alpha_identity)
            # Items learning: users are fixed, rows of P.T / C.T are items
            # (.T is a view, so no data is copied here)
            self._als_half_step(self.user_factors, self.item_factors, P.T, C.T, alpha_identity)

            # NOTE(review): assumed to run once per epoch because the progress
            # bar is handed to rmse - confirm against the original layout.
            if self.calculate_loss:
                self.rmse(user_to_item, tqdm_range, n_elements=10_000)

    @staticmethod
    def _als_half_step(fixed, latent, preference_matrix, confidence_matrix, alpha_identity):
        """Re-solve every row of ``latent`` in place while ``fixed`` stays constant.

        For row ``j`` the system solved is::

            X[j] * (YT * C[j] * Y + alpha * E) = YT * C[j] * P[j]

        where ``Y`` is ``fixed``.  ``YT * C[j] * Y`` is evaluated as
        ``YT * Y + YT * (C[j] - E) * Y``: the first term is the same for all
        rows and is computed once, and the second term only involves the
        entries of row ``j`` whose preference is positive.

        Parameters
        ----------
        fixed:
            m x factors matrix of the side that is held constant
        latent:
            n x factors matrix that is overwritten row by row
        preference_matrix:
            n x m binary preferences, one row per row of ``latent``
        confidence_matrix:
            n x m confidence weights, same layout as ``preference_matrix``
        alpha_identity:
            factors x factors regularization term added to every system
        """
        YTY = fixed.T @ fixed  # factors x factors, shared by every row

        for j, (preference, confidence) in enumerate(zip(preference_matrix, confidence_matrix)):
            observed = preference > 0
            # Gather the observed entries once instead of masking per term.
            Y_obs = fixed[observed]        # nonzero x factors
            c_obs = confidence[observed]   # nonzero

            # YT * C[j] * P[j]: P[j] is 1 exactly on the observed entries, so
            # this is the confidence-weighted sum of the observed fixed vectors.
            YT_Cj_Pj = np.sum(Y_obs.T * c_obs, axis=1)  # factors

            # YT * (C[j] - E) * Y restricted to the observed entries.
            YT_Cj_Y = (Y_obs.T * (c_obs - 1)) @ Y_obs   # factors x factors

            latent[j] = np.linalg.solve(YTY + YT_Cj_Y + alpha_identity, YT_Cj_Pj)