forked from fuxuemingzhu/MovieLens-Recommender
-
Notifications
You must be signed in to change notification settings - Fork 0
/
random_pred.py
144 lines (129 loc) · 5.11 KB
/
random_pred.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
#!/usr/bin/env python
# -*- coding: utf-8 -*-
"""
@author: fuxuemingzhu
@site: www.fuxuemingzhu.cn
@file: random_pred.py
@time: 18-4-17 下午5:48
Description : Recommend via Random Choice.
"""
import random
import math
from collections import defaultdict
import similarity
import utils
class RandomPredict:
"""
Recommend via Random Choice.
Top-N recommendation.
"""
def __init__(self, n_rec_movie=10, save_model=True):
"""
Init RandomPredict with n_rec_movie.
:return: None
"""
print("RandomPredict start...\n")
self.n_rec_movie = n_rec_movie
self.trainset = None
self.save_model = save_model
def fit(self, trainset):
"""
Fit the trainset via count movies.
:param trainset: train dataset
:return: None
"""
model_manager = utils.ModelManager()
try:
self.movie_popular = model_manager.load_model('movie_popular')
self.movie_count = model_manager.load_model('movie_count')
self.trainset = model_manager.load_model('trainset')
self.total_movies = model_manager.load_model('total_movies')
print('RandomPredict model has saved before.\nLoad model success...\n')
except OSError:
print('No model saved before.\nTrain a new model...')
self.trainset = trainset
self.movie_popular, self.movie_count = similarity.calculate_movie_popular(trainset)
self.total_movies = list(self.movie_popular.keys())
print('Train a new model success.')
if self.save_model:
model_manager.save_model(self.movie_popular, 'movie_popular')
model_manager.save_model(self.movie_count, 'movie_count')
model_manager.save_model(self.total_movies, 'total_movies')
print('The new model has saved success.\n')
def recommend(self, user):
"""
Random recommend N movies for the user.
:param user: The user we recommend movies to.
:return: the N best score movies
"""
if not self.n_rec_movie or not self.trainset or not self.movie_popular or not self.movie_count:
raise NotImplementedError('RandomPredict has not init or fit method has not called yet.')
N = self.n_rec_movie
predict_movies = list()
watched_movies = self.trainset[user]
# Random recommend N movies for the user.
while len(predict_movies) < N:
movie = random.choice(self.total_movies)
if movie not in watched_movies:
predict_movies.append(movie)
return predict_movies[:N]
def test(self, testset):
"""
Test the recommendation system by recommending scores to all users in testset.
:param testset: test dataset
:return:
"""
if not self.n_rec_movie or not self.trainset or not self.movie_popular or not self.movie_count:
raise ValueError('UserCF has not init or fit method has not called yet.')
self.testset = testset
print('Test recommendation system start...')
N = self.n_rec_movie
# varables for precision and recall
hit = 0
rec_count = 0
test_count = 0
# varables for coverage
all_rec_movies = set()
# varables for popularity
popular_sum = 0
# record the calculate time has spent.
test_time = utils.LogTime(print_step=1000)
for i, user in enumerate(self.trainset):
test_movies = self.testset.get(user, {})
rec_movies = self.recommend(user) # type:list
for movie in rec_movies:
if movie in test_movies:
hit += 1
all_rec_movies.add(movie)
popular_sum += math.log(1 + self.movie_popular[movie])
# log steps and times.
rec_count += N
test_count += len(test_movies)
# print time per 500 times.
test_time.count_time()
precision = hit / (1.0 * rec_count)
recall = hit / (1.0 * test_count)
coverage = len(all_rec_movies) / (1.0 * self.movie_count)
popularity = popular_sum / (1.0 * rec_count)
print('Test recommendation system success.')
test_time.finish()
print('precision=%.4f\trecall=%.4f\tcoverage=%.4f\tpopularity=%.4f\n' %
(precision, recall, coverage, popularity))
def predict(self, testset):
"""
Recommend movies to all users in testset.
:param testset: test dataset
:return: `dict` : recommend list for each user.
"""
movies_recommend = defaultdict(list)
print('Predict scores start...')
# record the calculate time has spent.
predict_time = utils.LogTime(print_step=500)
for i, user in enumerate(testset):
rec_movies = self.recommend(user) # type:list
movies_recommend[user].append(rec_movies)
# log steps and times.
predict_time.count_time()
print('Predict scores success.')
predict_time.finish()
return movies_recommend