Suppose I were the CEO of Douban. Many Douban users rate movies on Douban Movie. Given this rating data, can we figure out which other movies these users would like or dislike, beyond the ones they have already rated? This is a classic recommendation problem, and the algorithms that solve this kind of problem are called recommendation algorithms.
The full name of CF is Collaborative Filtering. Note that this CF is not the online game! Reading it literally, "collaborative" means finding what users have in common, and "filtering" means screening out the content worth recommending.
Generally speaking, collaborative filtering comes in three flavors (a toy sketch of the similarity computation follows this list):
1. User-based collaborative filtering: compute user-user similarities to find users B, C, D... who are similar to user A, then recommend the items those users like to A;
2. Item-based collaborative filtering: compute item-item similarities to find items 2, 3, 4... that are similar to item 1, then recommend those items to the users who have interacted with item 1;
3. Model-based collaborative filtering, whose mainstream approaches include matrix factorization, association rules, clustering, classification, regression, and neural networks.
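As a toy illustration of the similarity computation behind the first two approaches, here is a minimal sketch with made-up ratings (the helper below is not part of the implementations later in this article); item-item similarity works the same way with the roles of users and items swapped.
- from math import sqrt
-
- def cosine_similarity(ratings_a, ratings_b):
-     """Cosine similarity between two sparse rating dicts of the form {item_id: rating}."""
-     common = set(ratings_a) & set(ratings_b)
-     if not common:
-         return 0.0
-     dot = sum(ratings_a[i] * ratings_b[i] for i in common)
-     norm_a = sqrt(sum(r * r for r in ratings_a.values()))
-     norm_b = sqrt(sum(r * r for r in ratings_b.values()))
-     return dot / (norm_a * norm_b)
-
- # hypothetical users: both rated item 1, so their similarity is non-zero
- user_a = {1: 5, 2: 3}
- user_b = {1: 4, 3: 2}
- print(cosine_similarity(user_a, user_b))  # ~0.77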
Matrix factorization (decomposition, factorization) expresses a matrix as the product of several matrices. Say Douban Movie has m users and n movies; the users' ratings of the movies then form an m-by-n matrix R. We look for an m-by-k matrix U and a k-by-n matrix I such that the product U * I approximates R.
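A minimal sketch of the shapes involved, using NumPy purely for illustration (the ALS code later in this article deliberately avoids third-party libraries); the sizes m=4, n=5, k=2 are made up:
- import numpy as np
-
- m, n, k = 4, 5, 2             # users, movies, latent factors (made-up sizes)
- U = np.random.rand(m, k)      # user matrix, m x k
- I = np.random.rand(k, n)      # item matrix, k x n
- R_hat = U @ I                 # U * I approximates the m x n rating matrix R
- print(R_hat.shape)            # (4, 5)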
If you want to implement model-based collaborative filtering through matrix factorization, ALS is a good choice; its full name is Alternating Least Squares. Let users be denoted by a and items by b, and let the rating matrix R(m, n) be decomposed into a user matrix U(k, m) and an item matrix I(k, n), so that R ≈ U^T * I, where m, n and k are the matrix dimensions. Warning: a short burst of math formulas ahead.
The matrix R is known, and we start by generating the user matrix U at random; then:
1. Use Equation 5 from Section 1.5, together with R and U, to solve for I;
2. Use Equation 6 from Section 1.5, together with R and I, to solve for U.
Steps 1 and 2 are executed alternately until the algorithm converges or the number of iterations exceeds the maximum limit; finally, RMSE is used to judge how good the model is (the updates and the RMSE are written out below).
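Equations 5 and 6 from Section 1.5 are not reproduced in this excerpt. For reference, the closed-form, unregularized least-squares updates that the fit method below effectively computes, together with the RMSE used for evaluation, are (a sketch reconstructed from the code, not copied from Section 1.5; Ω denotes the set of observed ratings):

$$
I \leftarrow \left(U U^{\top}\right)^{-1} U R,
\qquad
U \leftarrow \left(I I^{\top}\right)^{-1} I R^{\top}
$$

$$
\mathrm{RMSE} = \sqrt{\frac{1}{\lvert \Omega \rvert} \sum_{(a,b) \in \Omega} \left( R_{ab} - (U^{\top} I)_{ab} \right)^{2}}
$$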
I implemented the ALS algorithm in Python, the simplest programming language in the universe, without relying on any third-party libraries, so it is easy to study and to use.
Initialization: store the user IDs, the item IDs, the mapping from user ID to column index of the user matrix, the mapping from item ID to column index of the item matrix, the items each user has already seen, the shape of the rating matrix, and the RMSE.
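The snippets below are shown without their file header. Presumably they rely on the standard library plus a few helpers from the author's accompanying code base; since the exact module paths are not shown in this excerpt, the following is only an assumption:
- # Assumed imports / helpers for the ALS snippets below (module paths not shown in this excerpt):
- # from collections import defaultdict      # sparse storage of the rating matrix
- # from random import random                # random initialisation of the user matrix
- # Matrix                                   # the author's dense-matrix class, providing
- #                                          # .data, .col, .transpose, .inverse, .mat_mul
- # run_time, load_movie_ratings, format_prediction   # utility helpers used in main()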
- class ALS(object):
- def __init__(self):
- self.user_ids = None
- self.item_ids = None
- self.user_ids_dict = None
- self.item_ids_dict = None
- self.user_matrix = None
- self.item_matrix = None
- self.user_items = None
- self.shape = None
- self.rmse = None
Process the training data to obtain the user IDs, the item IDs, the user-ID-to-column mapping, the item-ID-to-column mapping, the shape of the rating matrix, and the rating matrix together with its transpose.
- def _process_data(self, X):
- self.user_ids = tuple((set(map(lambda x: x[0], X))))
- self.user_ids_dict = dict(map(lambda x: x[::-1],
- enumerate(self.user_ids)))
-
- self.item_ids = tuple((set(map(lambda x: x[1], X))))
- self.item_ids_dict = dict(map(lambda x: x[::-1],
- enumerate(self.item_ids)))
-
- self.shape = (len(self.user_ids), len(self.item_ids))
-
- ratings = defaultdict(lambda: defaultdict(int))
- ratings_T = defaultdict(lambda: defaultdict(int))
- for row in X:
- user_id, item_id, rating = row
- ratings[user_id][item_id] = rating
- ratings_T[item_id][user_id] = rating
-
- err_msg = "Length of user_ids %d and ratings %d not match!" % (
- len(self.user_ids), len(ratings))
- assert len(self.user_ids) == len(ratings), err_msg
-
- err_msg = "Length of item_ids %d and ratings_T %d not match!" % (
- len(self.item_ids), len(ratings_T))
- assert len(self.item_ids) == len(ratings_T), err_msg
- return ratings, ratings_T
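As the unpacking in _process_data implies, X is expected to be an iterable of (user_id, item_id, rating) triples. A hypothetical toy example of what the method returns:
- # Hypothetical toy input: an iterable of (user_id, item_id, rating) triples.
- X = [
-     (1, 10, 4),   # user 1 rated item 10 with a 4
-     (1, 11, 5),
-     (2, 10, 3),
- ]
- model = ALS()
- ratings, ratings_T = model._process_data(X)
- assert ratings[1][10] == 4 and ratings_T[10][2] == 3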
Multiply a dense matrix by the sparse rating matrix; this yields the product of the user matrix and the rating matrix.
- def _users_mul_ratings(self, users, ratings_T):
-
- def f(users_row, item_id):
- user_ids = iter(ratings_T[item_id].keys())
- scores = iter(ratings_T[item_id].values())
- col_nos = map(lambda x: self.user_ids_dict[x], user_ids)
- _users_row = map(lambda x: users_row[x], col_nos)
- return sum(a * b for a, b in zip(_users_row, scores))
-
- ret = [[f(users_row, item_id) for item_id in self.item_ids]
- for users_row in users.data]
- return Matrix(ret)
Multiply a dense matrix by the sparse rating matrix; this yields the product of the item matrix and the rating matrix.
- def _items_mul_ratings(self, items, ratings):
-
- def f(items_row, user_id):
- item_ids = iter(ratings[user_id].keys())
- scores = iter(ratings[user_id].values())
- col_nos = map(lambda x: self.item_ids_dict[x], item_ids)
- _items_row = map(lambda x: items_row[x], col_nos)
- return sum(a * b for a, b in zip(_items_row, scores))
-
- ret = [[f(items_row, user_id) for user_id in self.user_ids]
- for items_row in items.data]
- return Matrix(ret)
- def _gen_random_matrix(self, n_rows, n_colums):
- data = [[random() for _ in range(n_colums)] for _ in range(n_rows)]
- return Matrix(data)
- def _get_rmse(self, ratings):
- m, n = self.shape
- mse = 0.0
- n_elements = sum(map(len, ratings.values()))
- for i in range(m):
- for j in range(n):
- user_id = self.user_ids[i]
- item_id = self.item_ids[j]
- rating = ratings[user_id][item_id]
- if rating > 0:
- user_row = self.user_matrix.col(i).transpose
- item_col = self.item_matrix.col(j)
- rating_hat = user_row.mat_mul(item_col).data[0][0]
- square_error = (rating - rating_hat) ** 2
- mse += square_error / n_elements
- return mse ** 0.5
- def fit(self, X, k, max_iter=10):
- ratings, ratings_T = self._process_data(X)
- self.user_items = {k: set(v.keys()) for k, v in ratings.items()}
- m, n = self.shape
-
- error_msg = "Parameter k must be less than the rank of original matrix"
- assert k < min(m, n), error_msg
-
- self.user_matrix = self._gen_random_matrix(k, m)
-
- for i in range(max_iter):
- if i % 2:
- items = self.item_matrix
- self.user_matrix = self._items_mul_ratings(
- items.mat_mul(items.transpose).inverse.mat_mul(items),
- ratings
- )
- else:
- users = self.user_matrix
- self.item_matrix = self._users_mul_ratings(
- users.mat_mul(users.transpose).inverse.mat_mul(users),
- ratings_T
- )
- rmse = self._get_rmse(ratings)
- print("Iterations: %d, RMSE: %.6f" % (i + 1, rmse))
-
- self.rmse = rmse
Predict what a single user is interested in: drop the items the user has already seen, sort the remaining items by predicted score, and keep the top n_items.
- def _predict(self, user_id, n_items):
- users_col = self.user_matrix.col(self.user_ids_dict[user_id])
- users_col = users_col.transpose
-
- items_col = enumerate(users_col.mat_mul(self.item_matrix).data[0])
- items_scores = map(lambda x: (self.item_ids[x[0]], x[1]), items_col)
- viewed_items = self.user_items[user_id]
- items_scores = filter(lambda x: x[0] not in viewed_items, items_scores)
-
- return sorted(items_scores, key=lambda x: x[1], reverse=True)[:n_items]
Call the method from 2.8 in a loop to predict the items that several users are interested in.
- def predict(self, user_ids, n_items=10):
- return [self._predict(user_id, n_items) for user_id in user_ids]
Use the movie-rating dataset to train the model and report the RMSE.
- @run_time
- def main():
- print("Tesing the accuracy of ALS...")
-
- X = load_movie_ratings()
-
- model = ALS()
- model.fit(X, k=3, max_iter=5)
- print()
-
- print("Showing the predictions of users...")
-
- user_ids = range(1, 5)
- predictions = model.predict(user_ids, n_items=2)
- for user_id, prediction in zip(user_ids, predictions):
- _prediction = [format_prediction(item_id, score)
- for item_id, score in prediction]
- print("User id:%d recommedation: %s" % (user_id, _prediction))
With k = 3 and 5 iterations, the recommendations for the first 4 users are shown; the final RMSE is 0.370 and the run takes 46.5 seconds. Not bad at all~
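ALS is only one way to do it. The remainder of this article lists two scripts for the classic neighborhood methods, both evaluated on the MovieLens ml-1m dataset with recall, precision, popularity, and coverage. The first is item-based collaborative filtering (ItemCF):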
- import sys
- import random
- import math
- from operator import itemgetter
-
- def ReadData(file,data):
- ''' Read the rating data.
- @param file file object with the rating records
- @param data list that collects the [user, item] pairs
- '''
- for line in file :
- line = line.strip('\n')
- linelist = line.split("::")
- data.append([linelist[0],linelist[1]])
-
- def SplitData(data, M, key, seed):
- ''' Split the data into a training set and a test set.
- @param data list holding the (user, item) records
- @param M number of folds to split the data into
- @param key index of the fold used as the test set
- @param seed random seed
- @return train training set Dict
- @return test test set Dict
- '''
- test = dict()
- train = dict()
- random.seed(seed)
- for user, item in data:
- # randint(0, M - 1) yields exactly M folds; randint(0, M) would produce M + 1
- if random.randint(0, M - 1) == key:
- if user in test:
- test[user].append(item)
- else:
- # start the list with the current item instead of dropping it
- test[user] = [item]
- else:
- if user in train:
- train[user].append(item)
- else:
- train[user] = [item]
- return train, test
-
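- # Note: UserSimilarityOld below is never called in this script; it is a brute-force
- # user-user cosine similarity that computes every user pair directly.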
- def UserSimilarityOld(train):
- W = dict()
- for u in train.keys():
- W[u] = dict()
- for v in train.keys():
- if u == v:
- continue
- W[u][v] = len(list(set(train[u]) & set(train[v])))
- W[u][v] /= math.sqrt(len(train[u]) * len(train[v]) * 1.0)
- return W
-
- def ItemSimilarity(train):
- ''' Compute item-item similarity.
- @param train training set Dict
- @return W two-dimensional dict of item similarities
- '''
- C = dict()
- N = dict()
- # count how many users each pair of items has in common
- for u, items in train.items():
- for i in items:
- if i not in N:
- N[i] = 0
- N[i] += 1
- for j in items:
- if i == j:
- continue
- if i not in C :
- C[i] = dict()
- if j not in C[i]:
- C[i][j] = 0
- C[i][j] += 1
-
- W = dict()
- for i, related_items in C.items():
- for j, cij in related_items.items():
- if i not in W :
- W[i] = dict()
- W[i][j] = cij / math.sqrt(N[i] * N[j])
-
- return W
-
-
-
- def Coverage(train, test, W, N, K):
- ''' Compute the coverage of the recommendations:
- the fraction of all training items that ever get recommended.
- @param train training set Dict
- @param test test set Dict
- @param W two-dimensional dict of item similarities
- @param N number of recommended items per user
- @param K number of neighbours to consider
- '''
- recommned_items = set()
- all_items = set()
-
- for user in train.keys():
- for item in train[user]:
- all_items.add(item)
-
- rank = GetRecommendation(user, train, W, N, K)
- for item, pui in rank:
- recommned_items.add(item)
-
- print('len: ', len(recommned_items), '\n')
- return len(recommned_items) / (len(all_items) * 1.0)
-
-
-
- def GetRecommendation(user, train, W, N, K):
- ''' Produce recommendations for one user (ItemCF).
- @param user target user
- @param train training set Dict
- @param W two-dimensional dict of item similarities
- @param N number of recommended items
- @param K number of neighbours to consider
- '''
- rank = dict()
- ru = train[user]
- for i in ru:
- for j, wj in sorted(W[i].items(), key=itemgetter(1),\
- reverse=True)[0:K]:
- if j in ru:
- continue
- if j in rank:
- rank[j] += wj
- else:
- # initialise with the similarity score instead of 0,
- # otherwise the first contribution of each item is lost
- rank[j] = wj
-
- rank = sorted(rank.items(), key=itemgetter(1), reverse=True)[0:N]
- return rank
-
-
-
-
- def Recall(train, test, W, N, K):
- ''' Compute the recall of the recommendations.
- @param train training set Dict
- @param test test set Dict
- @param W two-dimensional dict of item similarities
- @param N number of recommended items per user
- @param K number of neighbours to consider
- '''
- hit = 0
- all = 0
- for user in train.keys():
- if user in test:
- tu = test[user]
- rank = GetRecommendation(user, train, W, N, K)
- for item, pui in rank:
- if item in tu:
- hit+= 1
- all += len(tu)
- print(hit)
- print(all)
- return hit/(all * 1.0)
-
- def Precision(train, test, W, N, K):
- ''' Compute the precision of the recommendations.
- @param train training set Dict
- @param test test set Dict
- @param W two-dimensional dict of item similarities
- @param N number of recommended items per user
- @param K number of neighbours to consider
- '''
- hit = 0
- all = 0
- for user in train.keys():
- if user in test:
- tu = test[user]
- rank = GetRecommendation(user, train, W, N, K)
- for item, pui in rank:
- if item in tu:
- hit+= 1
- all += N
- print(hit)
- print(all)
- return hit/(all * 1.0)
-
- def Popularity(train, test, W, N, K):
- ''' Compute the average (log-scaled) popularity of the recommended items.
- @param train training set Dict
- @param test test set Dict
- @param W two-dimensional dict of item similarities
- @param N number of recommended items per user
- @param K number of neighbours to consider
- '''
- item_popularity = dict()
- for user, items in train.items():
- for item in items:
- if item not in item_popularity:
- item_popularity[item] = 0
- item_popularity[item] += 1
-
- ret = 0
- n = 0
- for user in train.keys():
- rank = GetRecommendation(user, train, W, N, K)
- for item, pui in rank:
- ret += math.log(1+ item_popularity[item])
- n += 1
- ret /= n * 1.0
- return ret
-
-
-
- if __name__ == '__main__':
- data = []
- M = 7
- key = 1
- seed = 1
- N = 10
- K = 10
- W = dict()
- rank = dict()
-
- print("this is the main function")
- file = open('./ml-1m/ratings.dat')  # MovieLens 1M ratings file
- ReadData(file,data)
- train,test = SplitData(data, M, key, seed)
- W = ItemSimilarity(train)
- recall = Recall(train, test, W, N, K)
- precision = Precision(train, test, W, N, K)
- popularity = Popularity(train, test, W, N, K)
- coverage = Coverage(train, test, W, N, K)
- print('recall: ', recall, '\n')
- print('precision: ', precision, '\n')
- print('Popularity: ', popularity, '\n')
- print('coverage: ', coverage, '\n')
- else :
- print("this is not the main function")
- import sys
- import random
- import math
- from operator import itemgetter
-
-
- def ReadData(file,data):
- ''' Read the rating data.
- @param file file object with the rating records
- @param data list that collects the [user, item] pairs
- '''
- for line in file:
- line = line.strip('\n')
- linelist = line.split("::")
- data.append([linelist[0],linelist[1]])
-
- def SplitData(data, M, key, seed):
- ''' Split the data into a training set and a test set.
- @param data list holding the (user, item) records
- @param M number of folds to split the data into
- @param key index of the fold used as the test set
- @param seed random seed
- @return train training set Dict
- @return test test set Dict
- '''
- test = dict()
- train = dict()
- random.seed(seed)
- for user, item in data:
- # randint(0, M - 1) yields exactly M folds; randint(0, M) would produce M + 1
- if random.randint(0, M - 1) == key:
- if user in test:
- test[user].append(item)
- else:
- # start the list with the current item instead of dropping it
- test[user] = [item]
- else:
- if user in train:
- train[user].append(item)
- else:
- train[user] = [item]
- return train, test
-
- def UserSimilarity(train):
- ''' Compute user-user similarity.
- @param train training set Dict
- @return W two-dimensional dict of user similarities
- '''
- # build an item-to-users inverted index to reduce the cost of computing user similarity
- item_users = dict()
- for u, items in train.items():
- for i in items:
- if(i not in item_users):
- item_users[i] = set()
- item_users[i].add(u)
- C = dict()
- N = dict()
- # count the number of items each pair of users has in common
- for i, users in item_users.items():
- for u in users:
- if(u not in N):
- # initialise to 0, not 1, so that N[u] ends up as the item count of user u
- N[u] = 0
- N[u] += 1
- for v in users:
- if u == v:
- continue
- if(u not in C):
- C[u] = dict()
- if(v not in C[u]):
- C[u][v] = 0
- # penalise popular items; this variant is known as UserCF-IIF
- C[u][v] += (1 / math.log(1 + len(users)))
- W = dict()
- for u, related_users in C.items():
- for v, cuv in related_users.items():
- if(u not in W):
- W[u] = dict()
- # cosine similarity between users
- W[u][v] = cuv / math.sqrt(N[u] * N[v])
-
- return W
-
-
- def Coverage(train, test, W, N, K):
- ''' Compute the coverage of the recommendations:
- the fraction of all training items that ever get recommended.
- @param train training set Dict
- @param test test set Dict
- @param W two-dimensional dict of user similarities
- @param N number of recommended items per user
- @param K number of neighbours to consider
- '''
- recommned_items = set()
- all_items = set()
-
- for user in train.keys():
- for item in train[user]:
- all_items.add(item)
-
- rank = GetRecommendation(user, train, W, N, K)
- for item, pui in rank:
- recommned_items.add(item)
-
- # print('len: ', len(recommned_items), '\n')
- return len(recommned_items) / (len(all_items) * 1.0)
-
- def GetRecommendation(user, train, W, N, K):
- ''' Produce recommendations for one user (UserCF).
- @param user target user
- @param train training set Dict
- @param W two-dimensional dict of user similarities
- @param N number of recommended items
- @param K number of neighbours to consider
- '''
- rank = dict()
- interacted_items = train[user]
- # accumulate scores from the K most similar users
- for v, wuv in sorted(W[user].items(), key=itemgetter(1),\
- reverse=True)[0:K]:
- for i in train[v]:
- if i in interacted_items:
- continue
- if i in rank:
- rank[i] += wuv
- else:
- # initialise with the similarity score instead of 0,
- # otherwise the first contribution of each item is lost
- rank[i] = wuv
-
- # keep the N highest-scoring items as the recommendations
- rank = sorted(rank.items(), key=itemgetter(1), reverse=True)[0:N]
-
- return rank
-
- def Recall(train, test, W, N, K):
- ''' Compute the recall of the recommendations.
- @param train training set Dict
- @param test test set Dict
- @param W two-dimensional dict of user similarities
- @param N number of recommended items per user
- @param K number of neighbours to consider
- '''
- hit = 0
- all = 0
- for user in train.keys():
- if user in test:
- tu = test[user]
- rank = GetRecommendation(user, train, W, N, K)
- for item, pui in rank:
- if item in tu:
- hit+= 1
- all += len(tu)
- #print(hit)
- #print(all)
- return hit/(all * 1.0)
-
- def Precision(train, test, W, N, K):
- ''' Compute the precision of the recommendations.
- @param train training set Dict
- @param test test set Dict
- @param W two-dimensional dict of user similarities
- @param N number of recommended items per user
- @param K number of neighbours to consider
- '''
- hit = 0
- all = 0
- for user in train.keys():
- if user in test:
- tu = test[user]
- rank = GetRecommendation(user, train, W, N, K)
- for item, pui in rank:
- if item in tu:
- hit+= 1
- all += N
- #print(hit)
- #print(all)
- return hit/(all * 1.0)
-
- def Popularity(train, test, W, N, K):
- ''' Compute the average (log-scaled) popularity of the recommended items.
- @param train training set Dict
- @param test test set Dict
- @param W two-dimensional dict of user similarities
- @param N number of recommended items per user
- @param K number of neighbours to consider
- '''
- item_popularity = dict()
- for user, items in train.items():
- for item in items:
- if item not in item_popularity:
- item_popularity[item] = 0
- item_popularity[item] += 1
-
- ret = 0
- n = 0
- for user in train.keys():
- rank = GetRecommendation(user, train, W, N, K)
- for item, pui in rank:
- ret += math.log(1+ item_popularity[item])
- n += 1
- ret /= n * 1.0
- return ret
-
-
-
- if __name__ == '__main__':
- data = []
- M = 8
- key = 1
- seed = 1
- N = 10
- K = 80
- W = dict()
- rank = dict()
-
- print("this is the main function")
- file = open('./ml-1m/ratings.dat')  # MovieLens 1M ratings file
- ReadData(file,data)
- train,test = SplitData(data, M, key, seed)
- W = UserSimilarity(train)
- recall = Recall(train, test, W, N, K)
- precision = Precision(train, test, W, N, K)
- popularity = Popularity(train, test, W, N, K)
- coverage = Coverage(train, test, W, N, K)
- print('recall: ', recall, '\n')
- print('precision: ', precision, '\n')
- print('Popularity: ', popularity, '\n')
- print('coverage: ', coverage, '\n')
- else :
- print("this is not the main function")