Spark MLlib 提供了一种协同过滤算法,可用于训练矩阵分解模型,该模型预测用户对物品的显式或隐式评分,从而进行推荐。本文将深入探讨 Spark 的协同过滤算法。
1 矩阵分解算法
1.1 协同过滤问题的矩阵分解
矩阵分解是推荐任务中常用的一种技术。基本上,矩阵分解算法试图找到表示用户和物品内在属性的潜在因子,即:
$$\hat r_{u,i} = q_{i}^{T}p_{u}$$
其中,$\hat r_{u,i}$ 是用户 $u$ 对物品 $i$ 的预测评分,$q_{i}^{T}$ 和 $p_{u}$ 分别是物品和用户的潜在因子。矩阵分解问题的挑战在于找到 $q_{i}^{T}$ 和 $p_{u}$,这可以通过矩阵分解方法实现。为了尽量接近观察到的评分,开发了一种学习方法。此外,为了避免过拟合问题,学习过程被正则化。例如,基本形式的矩阵分解算法表示如下:
$$\min \sum (r_{u,i} - q_{i}^{T}p_{u})^2 + \lambda (||q_{i}||^2 + ||p_{u}||^2)$$
其中,$\lambda$ 是正则化参数。
在没有显式评分的情况下,可以使用用户与物品的历史交互(如点击、浏览、购买等)来推断隐式评分。为了考虑这些隐式评分,原始矩阵分解算法可以表示为:
$$\min \sum c_{u,i}(p_{u,i} - q_{i}^{T}p_{u})^2 + \lambda (||q_{i}||^2 + ||p_{u}||^2)$$
其中,$c_{u,i}=1+\alpha r_{u,i}$,如果 $r_{u,i}>0$ 则 $p_{u,i}=1$,否则 $p_{u,i}=0$。$r_{u,i}$ 是用户偏好的数值表示(例如点击次数等)。
1.2 交替最小二乘法(ALS)
由于 $q_{i}^{T}p_{u}$ 项的存在,损失函数是非凸的。可以应用梯度下降法,但这将导致昂贵的计算成本。为此,开发了一种交替最小二乘法(ALS)算法来解决这个问题。
ALS 的基本思想是每次学习 $q$ 和 $p$ 中的一个进行优化,而将另一个保持不变。这使得每次迭代的目标函数都是凸的且可解的。当交替迭代收敛到最优解时,过程停止。值得注意的是,这种迭代计算可以并行化和/或分布式处理,这使得该算法在数据集很大且用户-物品评分矩阵非常稀疏的推荐场景中非常理想。关于 ALS 及其分布式计算的全面讨论,请参见这里。
2 Spark Mllib 实现
矩阵分解算法在 Spark ml
DataFrame 和 Spark mllib
RDD 中均可用作为 ALS
模块提供。
- ALS 实现的独特之处在于它使用“交替最小二乘法”来分布式地训练矩阵分解模型。
- 在训练方法中,可以选择一些参数来控制模型性能。
- Spark ALS 模型支持显式和隐式评分。
3 基于 Spark ALS 的 MovieLens 推荐系统
以下代码使用 MovieLens-100K 数据集来演示 Spark 中的 ALS 算法。
注意:此笔记本需要一个 PySpark 环境才能正常运行。请按照 SETUP.md 中的步骤安装 PySpark 环境。
import warnings
warnings.simplefilter(action='ignore', category=FutureWarning)
import sys
from matplotlib import pyplot as plt
import numpy as np
import pandas as pd
import seaborn as sns
import pyspark
from pyspark.sql import SparkSession
from pyspark.ml.recommendation import ALS
import pyspark.sql.functions as F
from pyspark.sql.functions import col
from pyspark.ml.tuning import CrossValidator
from pyspark.sql.types import StructType, StructField
from pyspark.sql.types import FloatType, IntegerType, LongType
from recommenders.datasets import movielens
from recommenders.utils.spark_utils import start_or_get_spark
from recommenders.evaluation.spark_evaluation import SparkRankingEvaluation, SparkRatingEvaluation
from recommenders.tuning.parameter_sweep import generate_param_grid
from recommenders.datasets.spark_splitters import spark_random_split
print(f"System version: {sys.version}")
print(f"Pandas version: {pd.__version__}")
print(f"PySpark version: {pyspark.__version__}")
3.1 加载和准备数据
数据从 CSV 文件读取到 Spark DataFrame 中。
MOVIELENS_DATA_SIZE = "100k"
COL_USER = "UserId"
COL_ITEM = "MovieId"
COL_RATING = "Rating"
COL_PREDICTION = "prediction"
COL_TIMESTAMP = "Timestamp"
schema = StructType(
(
StructField(COL_USER, IntegerType()),
StructField(COL_ITEM, IntegerType()),
StructField(COL_RATING, FloatType()),
StructField(COL_TIMESTAMP, LongType()),
)
)
dfs = movielens.load_spark_df(spark=spark, size=MOVIELENS_DATA_SIZE, schema=schema)
dfs.show(5)
数据按 80-20 比例随机分为训练集和测试集。
dfs_train, dfs_test = spark_random_split(dfs, ratio=0.75, seed=42)
3.2 训练 MovieLens 模型
值得注意的是,Spark ALS 模型允许删除冷用户,以便对测试数据进行稳健评估。如果存在冷用户,Spark ALS 实现允许用户删除冷用户,以确保对预测结果的评估是合理的。
RANK = 10
MAX_ITER = 15
REG_PARAM = 0.05
als = ALS(
maxIter=MAX_ITER,
rank=RANK,
regParam=REG_PARAM,
userCol=COL_USER,
itemCol=COL_ITEM,
ratingCol=COL_RATING,
coldStartStrategy="drop"
)
model = als.fit(dfs_train)
3.3 使用模型进行预测
训练好的模型可以用来预测给定测试数据的评分。
dfs_pred = model.transform(dfs_test).drop(COL_RATING)
使用预测结果,可以评估模型性能。
evaluations = SparkRatingEvaluation(
dfs_test,
dfs_pred,
col_user=COL_USER,
col_item=COL_ITEM,
col_rating=COL_RATING,
col_prediction=COL_PREDICTION
)
print(
"RMSE score = {}".format(evaluations.rmse()),
"MAE score = {}".format(evaluations.mae()),
"R2 score = {}".format(evaluations.rsquared()),
"Explained variance score = {}".format(evaluations.exp_var()),
sep="\n"
)
通常,数据科学家对排名指标也感兴趣。通常,排名指标适用于推荐一组物品的场景。在我们的例子中,推荐的物品应该与用户已经评分的物品不同。
users = dfs_train.select(COL_USER).distinct()
items = dfs_train.select(COL_ITEM).distinct()
user_item = users.crossJoin(items)
dfs_pred = model.transform(user_item)
dfs_pred_exclude_train = dfs_pred.alias("pred").join(
dfs_train.alias("train"),
(dfs_pred[COL_USER] == dfs_train[COL_USER]) & (dfs_pred[COL_ITEM] == dfs_train[COL_ITEM]),
how='outer'
)
dfs_pred_final = dfs_pred_exclude_train.filter(dfs_pred_exclude_train["train.Rating"].isNull()) \
.select('pred.' + COL_USER, 'pred.' + COL_ITEM, 'pred.' + "prediction")
evaluations = SparkRankingEvaluation(
dfs_test,
dfs_pred_final,
col_user=COL_USER,
col_item=COL_ITEM,
col_rating=COL_RATING,
col_prediction=COL_PREDICTION,
k=K
)
print(
"Precision@k = {}".format(evaluations.precision_at_k()),
"Recall@k = {}".format(evaluations.recall_at_k()),
"NDCG@k = {}".format(evaluations.ndcg_at_k()),
"Mean average precision = {}".format(evaluations.map_at_k()),
sep="\n"
)
3.4 微调模型
Spark ALS 模型的预测性能通常受参数影响。
参数 | 描述 | 默认值 | 备注 |
---|---|---|---|
rank | 潜在因子的数量 | 10 | 潜在因子越多,分解模型中考虑的内在因素越多。 |
regParam | 正则化参数 | 1.0 | 值需要根据经验选择,以避免过拟合。 |
maxIter | 最大迭代次数 | 10 | 迭代次数越多,模型收敛到最优点的效果越好。 |
通常的做法是从默认参数值开始构建模型,然后在一定范围内调整参数,以找到最佳参数组合。以下参数集用于训练 ALS 模型进行比较研究。
param_dict = {
"rank": [10, 15, 20],
"regParam": [0.001, 0.1, 1.0]
}
param_grid = generate_param_grid(param_dict)
rmse_score = []
for g in param_grid:
als = ALS(
userCol=COL_USER,
itemCol=COL_ITEM,
ratingCol=COL_RATING,
coldStartStrategy="drop",
**g
)
model = als.fit(dfs_train)
dfs_pred = model.transform(dfs_test).drop(COL_RATING)
evaluations = SparkRatingEvaluation(
dfs_test,
dfs_pred,
col_user=COL_USER,
col_item=COL_ITEM,
col_rating=COL_RATING,
col_prediction=COL_PREDICTION
)
rmse_score.append(evaluations.rmse())
rmse_score = [float(
rmse_score_array = np.reshape(rmse_score, (len(param_dict["rank"]), len(param_dict["regParam"])))
rmse_df = pd.DataFrame(data=rmse_score_array, index=pd.Index(param_dict["rank"], name="rank"),
columns=pd.Index(param_dict["regParam"], name="reg. parameter"))
fig, ax = plt.subplots()
sns.heatmap(rmse_df, cbar=False, annot=True, fmt=".4g")
从这个可视化中可以看出,随着潜在因子数量的增加,RMSE 先减少后增加,这是由于过拟合。当潜在因子数量为 20,正则化参数为 0.1 时,模型达到了最低的 RMSE 分数。
3.5 Top K 推荐
3.5.1 为所有用户(物品)生成前 K 个推荐
dfs_rec = model.recommendForAllUsers(10)
dfs_rec.show(10)
3.5.2 为选定的用户(物品)生成前 K 个推荐
users = dfs_train.select(als.getUserCol()).distinct().limit(3)
dfs_rec_subset = model.recommendForUserSubset(users, 10)
dfs_rec_subset.show(10)
3.5.3 Top-K 推荐的运行时考虑
值得注意的是,通常对所有用户进行 Top-K 推荐计算是整个管道(模型训练和评分)的瓶颈。这是因为:
- 从所有用户-物品对中获取 Top-K 需要交叉连接,这通常计算量很大。
- 用户-物品对的内积是逐个计算的,而不是利用某些现代计算加速库(如 BLAS)中可用的矩阵块乘法特性。
关于 Spark 中 Top-K 推荐的可能优化的更多详细信息,请参见这里。
# cleanup spark instance
spark.stop()
参考文献
- Yehuda Koren, Robert Bell, and Chris Volinsky, "Matrix Factorization Techniques for Recommender Systems", ACM Computer, Vol. 42, Issue 8, pp 30-37, Aug., 2009.
- Yifan Hu, Yehuda Koren, and Chris Volinsky, "Collaborative Filtering for Implicit Feedback Datasets", Proc. IEEE ICDM, 2008, Dec, Pisa, Italy.
- Apache Spark. url: https://spark.apache.org/docs/latest/ml-collaborative-filtering.html
- Seaborn. url: https://seaborn.pydata.org/
- Scaling collaborative filtering with PySpark. url: https://engineeringblog.yelp.com/2018/05/scaling-collaborative-filtering-with-pyspark.html
- Matrix Completion via Alternating Least Square (ALS). url: http://stanford.edu/~rezab/classes/cme323/S15/notes/lec14.pdf