01
前言
推荐系统主要用于将合适的内容或产品以个性化方式推荐给合适的用户,以便增强用户体验。在使用海量数据以及学习理解特定用户的喜好方面,推荐系统展现出了强大的实力。推荐有助于用户轻易地检索数百万款产品或海量内容,并且向用户展示他们可能会喜欢或购买的合适的产品。本文主要介绍如何利用PySpark构建一个微型的推荐系统。
02
数据信息
实例使用的数据来自一个非常著名、开源的电影镜头数据集,数据包含10w条记录,其中具有三列(uesr\_id、title、rating)。我们将使用75%的数据用来训练模型,并且在剩余的25%的数据上对模型进行测试评估。
03
创建SparkSession对象
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
spark=SparkSession.builder.appName('rc').getOrCreate()
04
读取数据集
`df=spark.read.csv('movie\_ratings\_df.csv',inferSchema=True,header=True)
print((df.count(),len(df.columns)))
Output:(100000, 3)
df.printSchema()
Output:root
|-- userId: integer (nullable = true)
|-- title: string (nullable = true)
|-- rating: integer (nullable = true)`
共有三列,其中两列是数值类型,title是类别类型。使用PySpark构建RS的关键在于,要让user\_id和item\_id处于数值形式。
05
数据分析
查看打分数最多的前10个用户
df.groupBy('userId').count().orderBy('count',ascending=False).show(10,False)
查看打分数最少的前10个用户
df.groupBy('userId').count().orderBy('count',ascending=True).show(10,False)
查看被打分次数最多的前十部电影
df.groupBy('title').count().orderBy('count',ascending=False).show(10,False)
查看被打分次数最少的前十部电影
df.groupBy('title').count().orderBy('count',ascending=True).show(10,False)
06
特征工程
使用StringIndexer将电影名称从类别类型转换成数值类型。我们需要从PySpark库中引入StringIndexer和InderToString。
from pyspark.ml.feature import StringIndexer,IndexToStringstringIndexer = StringIndexer(inputCol="title", outputCol="title_new")model = stringIndexer.fit(df)indexed = model.transform(df)
`####查看新的DataFrame的10行数据`
`indexed.show(10)`
userId| title|rating|title\_new|
+------+--------------------+------+---------+
| 932| Cape Fear (1991)| 3| 161.0|
| 721| Piano, The (1993)| 3| 173.0|
| 642|Low Down Dirty Sh...| 2| 1115.0|
| 798|That Darn Cat! (1...| 4| 686.0|
| 535|African Queen, Th...| 4| 199.0|
| 765|Stealing Beauty (...| 5| 521.0|
| 927|Poison Ivy II (1995)| 3| 1041.0|
| 544| G.I. Jane (1997)| 3| 152.0|
| 788|Godfather: Part I...| 4| 108.0|
| 706|Birdcage, The (1996)| 4| 43.0|
+------+--------------------+------+---------+
only showing top 10 rows
07
数据划分
使用dataFrame的randomSplit方法,可以对数据集进行随机划分。
train,test=indexed.randomSplit([0.75,0.25])
train.count(),test.count()
75104,24876
08
训练和预测
我们需要从PySpark的ml库中引入ALS函数并且基于训练集构建模型。有多个超参数可以调整以便提升模型性能。其中有两个比较重要的超参数:nonnegative='True'不会在推荐系统中创建负数评分,而coldStartStrategy='drop'可以防止生成任何NaN评分预测。
from pyspark.ml.recommendation import ALS
rec=ALS(maxIter=10,regParam=0.01,userCol='userId',itemCol='title_new',ratingCol='rating',nonnegative=True,coldStartStrategy="drop")
rec_model=rec.fit(train)
我们需要使用transformer函数基于测试数据进行预测,并且使用RegressionEvaluation基于测试数据检查模型的RMSE值
`predicted_ratings=rec_model.transform(test)predicted_ratings.printSchema()`
`root`
|-- userId: integer (nullable = true) |-- title: string (nullable = true) |-- rating: integer (nullable = true) |-- title_new: double (nullable = false) |-- prediction: float (nullable = false)
`predicted_ratings.orderBy(rand()).show(10)`
userId| title|rating|title_new|prediction|+------+--------------------+------+---------+----------+| 92|Tie Me Up! Tie Me...| 4| 766.0| 3.1512196|| 222| Batman (1989)| 3| 116.0| 3.503284|| 178|Beauty and the Be...| 4| 114.0| 4.1487904|| 303|Jerry Maguire (1996)| 5| 15.0| 4.348913|| 134| Flubber (1997)| 2| 579.0| 2.5635276|| 295| Henry V (1989)| 4| 268.0| 4.2598643|| 889|Adventures of Pri...| 2| 305.0| 2.9040515|| 374| Men in Black (1997)| 3| 31.0| 3.602631|| 559|Killing Fields, T...| 4| 276.0| 4.55797|| 290|Star Trek: The Mo...| 1| 286.0| 3.2992659|+------+--------------------+------+---------+----------+only showing top 10 rows
09
测试评估
from pyspark.ml.evaluation import RegressionEvaluator
evaluator=RegressionEvaluator(metricName='rmse',predictionCol='prediction',labelCol='rating')
rmse=evaluator.evaluate(predicted_ratings)
print(rmse)
1.0293574739493354
010
电影推荐
def top\_movies(user\_id,n):
"""
This function returns the top 'n' movies that user has not seen yet but might like
"""
unique_movies=indexed.select('title\_new').distinct()
#assigning alias name 'a' to unique movies df
a = unique_movies.alias('a')
#creating another dataframe which contains already watched movie by active user
watched_movies=indexed.filter(indexed['userId'] == user_id).select('title\_new')
#assigning alias name 'b' to watched movies df
b=watched_movies.alias('b')
#joining both tables on left join
total_movies = a.join(b, a.title_new == b.title_new,how='left')
#selecting movies which active user is yet to rate or watch
remaining_movies=total_movies.where(col("b.title\_new").isNull()).select(a.title_new).distinct()
#adding new column of user\_Id of active useer to remaining movies df
remaining_movies=remaining_movies.withColumn("userId",lit(int(user_id)))
#making recommendations using ALS recommender model and selecting only top 'n' movies
recommendations=rec_model.transform(remaining_movies).orderBy('prediction',ascending=False).limit(n)
#adding columns of movie titles in recommendations
movie_title = IndexToString(inputCol="title\_new", outputCol="title",labels=model.labels)
final_recommendations=movie_title.transform(recommendations)
#return the recommendations to active user
return final_recommendations.show(n,False)
我们使用user\_id=85,给该用户推荐10部电影。
top_movies(85,10)
|title\_new|userId|prediction|title |
+---------+------+----------+--------------------------------------+
|1195.0 |85 |4.6757703 |Pather Panchali (1955) |
|1265.0 |85 |4.5952506 |World of Apu, The (Apur Sansar) (1959)|
|1470.0 |85 |4.586827 |Some Mother's Son (1996) |
|1295.0 |85 |4.473976 |Of Human Bondage (1934) |
|649.0 |85 |4.384286 |When We Were Kings (1996) |
|263.0 |85 |4.3773894 |12 Angry Men (1957) |
|996.0 |85 |4.342606 |In the Bleak Midwinter (1995) |
|914.0 |85 |4.328393 |Top Hat (1935) |
|107.0 |85 |4.3254786 |Rear Window (1954) |
|1501.0 |85 |4.3179636 |Butcher Boy, The (1998) |
+---------+------+----------+--------------------------------------+
011
总结
本文介绍了在Ppark中使用ALS方法创建了基于协同过滤的推荐系统。需要数据的小伙伴可以以下链接自取。
https://pan.baidu.com/s/18nWZyecz0HdJ8Dh80lJgpA
提取码:ws9e
分享,点赞,在看, 都在这儿,点我不香吗?
扫描下方的二维码进入ChallengeHub粉丝群,也可以添加管理员微信拉您入群。
与爱好AI的朋友们共同成长
ChallengeHub粉丝群
管理员微信