基于PySpark的推荐系统搭建,简单的demo带大家入门

向量数据库大模型智能应用

01

前言

推荐系统主要用于将合适的内容或产品以个性化方式推荐给合适的用户,以便增强用户体验。在使用海量数据以及学习理解特定用户的喜好方面,推荐系统展现出了强大的实力。推荐有助于用户轻易地检索数百万款产品或海量内容,并且向用户展示他们可能会喜欢或购买的合适的产品。本文主要介绍如何利用PySpark构建一个微型的推荐系统。

02

数据信息

实例使用的数据来自一个非常著名、开源的电影镜头数据集,数据包含10w条记录,其中具有三列(uesr\_id、title、rating)。我们将使用75%的数据用来训练模型,并且在剩余的25%的数据上对模型进行测试评估。

03

创建SparkSession对象


                
from pyspark.sql import SparkSession   
from pyspark.sql.functions import *  
spark=SparkSession.builder.appName('rc').getOrCreate()
            

04

读取数据集


              
 
  `df=spark.read.csv('movie\_ratings\_df.csv',inferSchema=True,header=True)  
print((df.count(),len(df.columns)))  
Output:(100000, 3)  
  
df.printSchema()  
Output:root  
 |-- userId: integer (nullable = true)  
 |-- title: string (nullable = true)  
 |-- rating: integer (nullable = true)`
 
 
            
共有三列,其中两列是数值类型,title是类别类型。使用PySpark构建RS的关键在于,要让user\_id和item\_id处于数值形式。

05

数据分析

查看打分数最多的前10个用户

                  
df.groupBy('userId').count().orderBy('count',ascending=False).show(10,False)  

              
查看打分数最少的前10个用户

                  
df.groupBy('userId').count().orderBy('count',ascending=True).show(10,False)  

              
查看被打分次数最多的前十部电影

                  
df.groupBy('title').count().orderBy('count',ascending=False).show(10,False)  

              
查看被打分次数最少的前十部电影

                  
df.groupBy('title').count().orderBy('count',ascending=True).show(10,False)
              

06

特征工程

使用StringIndexer将电影名称从类别类型转换成数值类型。我们需要从PySpark库中引入StringIndexer和InderToString。

                  
from pyspark.ml.feature import StringIndexer,IndexToStringstringIndexer = StringIndexer(inputCol="title", outputCol="title_new")model = stringIndexer.fit(df)indexed = model.transform(df)
                
 
  `####查看新的DataFrame的10行数据`
  `indexed.show(10)`
 


              

                  
userId|               title|rating|title\_new|  
+------+--------------------+------+---------+  
|   932|    Cape Fear (1991)|     3|    161.0|  
|   721|   Piano, The (1993)|     3|    173.0|  
|   642|Low Down Dirty Sh...|     2|   1115.0|  
|   798|That Darn Cat! (1...|     4|    686.0|  
|   535|African Queen, Th...|     4|    199.0|  
|   765|Stealing Beauty (...|     5|    521.0|  
|   927|Poison Ivy II (1995)|     3|   1041.0|  
|   544|    G.I. Jane (1997)|     3|    152.0|  
|   788|Godfather: Part I...|     4|    108.0|  
|   706|Birdcage, The (1996)|     4|     43.0|  
+------+--------------------+------+---------+  
only showing top 10 rows  

              

07

数据划分

使用dataFrame的randomSplit方法,可以对数据集进行随机划分。

                
train,test=indexed.randomSplit([0.75,0.25])  
train.count(),test.count()  

            

                
75104,24876  

            

08

训练和预测

我们需要从PySpark的ml库中引入ALS函数并且基于训练集构建模型。有多个超参数可以调整以便提升模型性能。其中有两个比较重要的超参数:nonnegative='True'不会在推荐系统中创建负数评分,而coldStartStrategy='drop'可以防止生成任何NaN评分预测。

                
from pyspark.ml.recommendation import ALS  
rec=ALS(maxIter=10,regParam=0.01,userCol='userId',itemCol='title_new',ratingCol='rating',nonnegative=True,coldStartStrategy="drop")  
rec_model=rec.fit(train)  

            
我们需要使用transformer函数基于测试数据进行预测,并且使用RegressionEvaluation基于测试数据检查模型的RMSE值

              
  `predicted_ratings=rec_model.transform(test)predicted_ratings.printSchema()`
  `root`
 


                
 |-- userId: integer (nullable = true) |-- title: string (nullable = true) |-- rating: integer (nullable = true) |-- title_new: double (nullable = false) |-- prediction: float (nullable = false)
            

              
  `predicted_ratings.orderBy(rand()).show(10)`
 


                
userId|               title|rating|title_new|prediction|+------+--------------------+------+---------+----------+|    92|Tie Me Up! Tie Me...|     4|    766.0| 3.1512196||   222|       Batman (1989)|     3|    116.0|  3.503284||   178|Beauty and the Be...|     4|    114.0| 4.1487904||   303|Jerry Maguire (1996)|     5|     15.0|  4.348913||   134|      Flubber (1997)|     2|    579.0| 2.5635276||   295|      Henry V (1989)|     4|    268.0| 4.2598643||   889|Adventures of Pri...|     2|    305.0| 2.9040515||   374| Men in Black (1997)|     3|     31.0|  3.602631||   559|Killing Fields, T...|     4|    276.0|   4.55797||   290|Star Trek: The Mo...|     1|    286.0| 3.2992659|+------+--------------------+------+---------+----------+only showing top 10 rows
            

09

测试评估


                
from pyspark.ml.evaluation import RegressionEvaluator  
evaluator=RegressionEvaluator(metricName='rmse',predictionCol='prediction',labelCol='rating')  
rmse=evaluator.evaluate(predicted_ratings)  
print(rmse)  
  
1.0293574739493354  

            

010

电影推荐


                
def top\_movies(user\_id,n):  
    """  
    This function returns the top 'n' movies that user has not seen yet but might like   
  
    """  
  
    unique_movies=indexed.select('title\_new').distinct()  
  
    #assigning alias name 'a' to unique movies df  
    a = unique_movies.alias('a')  
  
    #creating another dataframe which contains already watched movie by active user   
    watched_movies=indexed.filter(indexed['userId'] == user_id).select('title\_new')  
  
    #assigning alias name 'b' to watched movies df  
    b=watched_movies.alias('b')  
  
    #joining both tables on left join   
    total_movies = a.join(b, a.title_new == b.title_new,how='left')  
  
    #selecting movies which active user is yet to rate or watch  
    remaining_movies=total_movies.where(col("b.title\_new").isNull()).select(a.title_new).distinct()  
  
  
    #adding new column of user\_Id of active useer to remaining movies df   
    remaining_movies=remaining_movies.withColumn("userId",lit(int(user_id)))  
  
  
    #making recommendations using ALS recommender model and selecting only top 'n' movies  
    recommendations=rec_model.transform(remaining_movies).orderBy('prediction',ascending=False).limit(n)  
  
  
    #adding columns of movie titles in recommendations  
    movie_title = IndexToString(inputCol="title\_new", outputCol="title",labels=model.labels)  
    final_recommendations=movie_title.transform(recommendations)  
  
    #return the recommendations to active user  
    return final_recommendations.show(n,False)  

            
我们使用user\_id=85,给该用户推荐10部电影。

                
top_movies(85,10)  
  
|title\_new|userId|prediction|title                                 |  
+---------+------+----------+--------------------------------------+  
|1195.0   |85    |4.6757703 |Pather Panchali (1955)                |  
|1265.0   |85    |4.5952506 |World of Apu, The (Apur Sansar) (1959)|  
|1470.0   |85    |4.586827  |Some Mother's Son (1996)              |  
|1295.0   |85    |4.473976  |Of Human Bondage (1934)               |  
|649.0    |85    |4.384286  |When We Were Kings (1996)             |  
|263.0    |85    |4.3773894 |12 Angry Men (1957)                   |  
|996.0    |85    |4.342606  |In the Bleak Midwinter (1995)         |  
|914.0    |85    |4.328393  |Top Hat (1935)                        |  
|107.0    |85    |4.3254786 |Rear Window (1954)                    |  
|1501.0   |85    |4.3179636 |Butcher Boy, The (1998)               |  
+---------+------+----------+--------------------------------------+  

            

011

总结

本文介绍了在Ppark中使用ALS方法创建了基于协同过滤的推荐系统。需要数据的小伙伴可以以下链接自取。

https://pan.baidu.com/s/18nWZyecz0HdJ8Dh80lJgpA

提取码:ws9e

picture.image

分享,点赞,在看, 都在这儿,点我不香吗?

picture.image

扫描下方的二维码进入ChallengeHub粉丝群,也可以添加管理员微信拉您入群。

picture.image

与爱好AI的朋友们共同成长

picture.image

ChallengeHub粉丝群

picture.image

管理员微信

0
0
0
0
关于作者
关于作者

文章

0

获赞

0

收藏

0

相关资源
字节跳动客户端性能优化最佳实践
在用户日益增长、需求不断迭代的背景下,如何保证 APP 发布的稳定性和用户良好的使用体验?本次分享将结合字节跳动内部应用的实践案例,介绍应用性能优化的更多方向,以及 APM 团队对应用性能监控建设的探索和思考。
相关产品
评论
未登录
看完啦,登录分享一下感受吧~
暂无评论