标题起的挺有意思,但是最近的心情挺没意思,有人的地方就有江湖,有时候人与人的相处真是复杂,实在觉得没必要那么复杂,开心了笑,不开心了哭,像小孩子一样不好嘛!唠叨的话不说了,干正事!!!
学习spark是希望数据量大起来的时候,能够用spark跑起来,当初的一个瓶颈就是在协同过滤(cf),所以学习spark的时候先从简单数据的协同过滤开始吧!pyspark能跑通得感谢那个素未谋面只是咚咚上联系的姚广东同学,好羡慕他精通spark啊!一定是个很牛的人,还很有耐心!给同事点赞!!
读取hdfs中的文件,文件内容很简单:
clickdata.txt(用空格分开)
userinfo.txt(用空格分开)
数据一目了然:
通过用户甲、乙、丙对商品a,b,c的评分,来为丁推荐商品
算法:
1.2.2 余弦相似度
原理:多维空间两点与所设定的点形成夹角的余弦值。
范围:[-1,1],值越大,说明夹角越大,两点相距就越远,相似度就越小。
余弦相似度模型:根据用户评分数据表,生成物品的相似矩阵;
输入参数:user_rdd:用户评分表;
输出参数:余弦相似矩阵:物品1,物品2,相似度值;
余弦相似度矩阵模型:
defCosineSimilarity (
user_rdd:RDD[(String,String,Double)]
) : (RDD[(String,String,Double)]) = {
// 0 数据做准备
valuser_rdd2=user_rdd.map(f => (f._1,(f._2,f._3))).sortByKey()
user_rdd2.cache
// 1 (用户,物品,评分)笛卡尔积 (用户,物品,评分) =>(物品1,物品2,评分1,评分2)组合
valuser_rdd3=user_rdd2joinuser_rdd2
valuser_rdd4=user_rdd3.map(f=> ((f._2._1._1, f._2._2._1),(f._2._1._2, f._2._2._2)))
// 2 (物品1,物品2,评分1,评分2)组合 => (物品1,物品2,评分1*评分2)组合并累加
valuser_rdd5=user_rdd4.map(f=> (f._1,f._2._1*f._2._2 )).reduceByKey(_+_)
// 3 对角矩阵
valuser_rdd6=user_rdd5.filter(f=> f._1._1 == f._1._2)
// 4 非对角矩阵
valuser_rdd7=user_rdd5.filter(f=> f._1._1 != f._1._2)
// 5 计算相似度
valuser_rdd8=user_rdd7.map(f=> (f._1._1, (f._1._1, f._1._2, f._2))).
join(user_rdd6.map(f=> (f._1._1, f._2)))
valuser_rdd9=user_rdd8.map(f=> (f._2._1._2, (f._2._1._1,
f._2._1._2, f._2._1._3, f._2._2)))
valuser_rdd10=user_rdd9.join(user_rdd6.map(f => (f._1._1, f._2)))
val user_rdd11 = user_rdd10.map(f => (f._2._1._1,f._2._1._2,f._2._1._3,f._2._1._4,f._2._2))
valuser_rdd12=user_rdd11.map(f=> (f._1, f._2, (f._3 / sqrt(f._4 * f._5)) ))
// 7 结果返回
user_rdd12
}
基本上就是基于这个余弦相似度计算基于item的协同过滤:
#!/usr/bin/env python # -*- coding: utf-8 -*- #引入一些必须的包 import sys from pyspark import SparkContext,SparkConf from operator import add import re import math #SparkContext初始化是Driver应用程序提交执行的前提,这里以local[*]模式初始化 #大框架我还不懂,但是我知道必须有的 sc = SparkContext(appName= "cf",master= "local[*]") #根据hdfs目录读取用户的打分数据 lines = sc.textFile("hdfs://ns1/user/dmpuser/liuhua/clickdata.txt") #根据段落构建RDD,RDD这东西吧感觉就像一个个模块似的,可以每个块单独处理 clickRDD = lines.flatMap(lambda x: x.split("\n")) #.collect()和.frist()等还有其他,只有运行这个的时候前面的构建模块才运行, # 有几篇文章不错https://cloud.tencent.com/developer/article/1092146 https://blog.csdn.net/cymy001/article/details/78483723 print clickRDD.collect() #定义了一个对RDD操作的函数,即对每个块上数据操作的函数 def keyvalue(x): items = x.split(" ") key = items[0] value = (items[1],items[2]) return (key,value) #用map这种方式将这个函数作用到每个块 kvRDD = clickRDD.map(keyvalue) #运行结果如下.collect(): [(u'jia', (u'a', u'0.3')), (u'jia', (u'b', u'0.4')), (u'jia', (u'c', u'0.1')), (u'yi', (u'a', u'0.6')), (u'yi', (u'b', u'0.5')), (u'yi', (u'c', u'0.9')), (u'bing', (u'a', u'0.7')), (u'bing', (u'b', u'0.2')), (u'bing', (u'c', u'0.8'))] print kvRDD.collect() print kvRDD.first() #用key连接做join,把这些块想象成表就自然理解啦 joinRDD = kvRDD.join(kvRDD) #运行结果如下.collect(): [(u'yi', ((u'a', u'0.6'), (u'a', u'0.6'))), (u'yi', ((u'a', u'0.6'), (u'b', u'0.5'))), (u'yi', ((u'a', u'0.6'), (u'c', u'0.9'))), (u'yi', ((u'b', u'0.5'), (u'a', u'0.6'))), (u'yi', ((u'b', u'0.5'), (u'b', u'0.5'))), (u'yi', ((u'b', u'0.5'), (u'c', u'0.9'))), (u'yi', ((u'c', u'0.9'), (u'a', u'0.6'))), (u'yi', ((u'c', u'0.9'), (u'b', u'0.5'))), (u'yi', ((u'c', u'0.9'), (u'c', u'0.9'))), (u'bing', ((u'a', u'0.7'), (u'a', u'0.7'))), (u'bing', ((u'a', u'0.7'), (u'b', u'0.2'))), (u'bing', ((u'a', u'0.7'), (u'c', u'0.8'))), (u'bing', ((u'b', u'0.2'), (u'a', u'0.7'))), (u'bing', ((u'b', u'0.2'), (u'b', u'0.2'))), (u'bing', ((u'b', u'0.2'), (u'c', u'0.8'))), (u'bing', ((u'c', u'0.8'), (u'a', u'0.7'))), (u'bing', ((u'c', u'0.8'), (u'b', u'0.2'))), (u'bing', ((u'c', u'0.8'), (u'c', u'0.8'))), (u'jia', ((u'a', u'0.3'), (u'a', u'0.3'))), (u'jia', ((u'a', u'0.3'), (u'b', u'0.4'))), (u'jia', ((u'a', u'0.3'), (u'c', u'0.1'))), (u'jia', ((u'b', u'0.4'), (u'a', u'0.3'))), (u'jia', ((u'b', u'0.4'), (u'b', u'0.4'))), (u'jia', ((u'b', u'0.4'), (u'c', u'0.1'))), (u'jia', ((u'c', u'0.1'), (u'a', u'0.3'))), (u'jia', ((u'c', u'0.1'), (u'b', u'0.4'))), (u'jia', ((u'c', u'0.1'), (u'c', u'0.1')))] #这里面的转化主要是为了计算出协同过滤的余弦相似度算法需要的数值https://blog.csdn.net/sunbow0/article/details/42737541这篇文章值得一看 def iiscore(x): key = (x[0][0],x[1][0]) value = (x[0][1],x[1][1]) return (key,value) #去掉用户,做了join之后,同一用户的打分对象以及数值都结合到一起了,所以丢掉用户这维 valueRDD = joinRDD.values() #运行结果如下.collect(): [((u'a', u'0.6'), (u'a', u'0.6')), ((u'a', u'0.6'), (u'b', u'0.5')), ((u'a', u'0.6'), (u'c', u'0.9')), ((u'b', u'0.5'), (u'a', u'0.6')), ((u'b', u'0.5'), (u'b', u'0.5')), ((u'b', u'0.5'), (u'c', u'0.9')), ((u'c', u'0.9'), (u'a', u'0.6')), ((u'c', u'0.9'), (u'b', u'0.5')), ((u'c', u'0.9'), (u'c', u'0.9')), ((u'a', u'0.7'), (u'a', u'0.7')), ((u'a', u'0.7'), (u'b', u'0.2')), ((u'a', u'0.7'), (u'c', u'0.8')), ((u'b', u'0.2'), (u'a', u'0.7')), ((u'b', u'0.2'), (u'b', u'0.2')), ((u'b', u'0.2'), (u'c', u'0.8')), ((u'c', u'0.8'), (u'a', u'0.7')), ((u'c', u'0.8'), (u'b', u'0.2')), ((u'c', u'0.8'), (u'c', u'0.8')), ((u'a', u'0.3'), (u'a', u'0.3')), ((u'a', u'0.3'), (u'b', u'0.4')), ((u'a', u'0.3'), (u'c', u'0.1')), ((u'b', u'0.4'), (u'a', u'0.3')), ((u'b', u'0.4'), (u'b', u'0.4')), ((u'b', u'0.4'), (u'c', u'0.1')), ((u'c', u'0.1'), (u'a', u'0.3')), ((u'c', u'0.1'), (u'b', u'0.4')), ((u'c', u'0.1'), (u'c', u'0.1'))] #重新修正key的值,将分数组合作为value kvvRDD = valueRDD.map(iiscore) #运行结果如下.collect(): [((u'a', u'a'), (u'0.6', u'0.6')), ((u'a', u'b'), (u'0.6', u'0.5')), ((u'a', u'c'), (u'0.6', u'0.9')), ((u'b', u'a'), (u'0.5', u'0.6')), ((u'b', u'b'), (u'0.5', u'0.5')), ((u'b', u'c'), (u'0.5', u'0.9')), ((u'c', u'a'), (u'0.9', u'0.6')), ((u'c', u'b'), (u'0.9', u'0.5')), ((u'c', u'c'), (u'0.9', u'0.9')), ((u'a', u'a'), (u'0.7', u'0.7')), ((u'a', u'b'), (u'0.7', u'0.2')), ((u'a', u'c'), (u'0.7', u'0.8')), ((u'b', u'a'), (u'0.2', u'0.7')), ((u'b', u'b'), (u'0.2', u'0.2')), ((u'b', u'c'), (u'0.2', u'0.8')), ((u'c', u'a'), (u'0.8', u'0.7')), ((u'c', u'b'), (u'0.8', u'0.2')), ((u'c', u'c'), (u'0.8', u'0.8')), ((u'a', u'a'), (u'0.3', u'0.3')), ((u'a', u'b'), (u'0.3', u'0.4')), ((u'a', u'c'), (u'0.3', u'0.1')), ((u'b', u'a'), (u'0.4', u'0.3')), ((u'b', u'b'), (u'0.4', u'0.4')), ((u'b', u'c'), (u'0.4', u'0.1')), ((u'c', u'a'), (u'0.1', u'0.3')), ((u'c', u'b'), (u'0.1', u'0.4')), ((u'c', u'c'), (u'0.1', u'0.1'))] print kvvRDD.collect() #针对同一用户对不同的商品的打分分值相乘(不懂,看余弦公式) def calcscore(x): value = float(x[0])*float(x[1]) return value kvsRDD = kvvRDD.mapValues(calcscore) #运行结果如下.collect(): [((u'a', u'a'), 0.36), ((u'a', u'b'), 0.3), ((u'a', u'c'), 0.54), ((u'b', u'a'), 0.3), ((u'b', u'b'), 0.25), ((u'b', u'c'), 0.45), ((u'c', u'a'), 0.54), ((u'c', u'b'), 0.45), ((u'c', u'c'), 0.81), ((u'a', u'a'), 0.48999999999999994), ((u'a', u'b'), 0.13999999999999999), ((u'a', u'c'), 0.5599999999999999), ((u'b', u'a'), 0.13999999999999999), ((u'b', u'b'), 0.04000000000000001), ((u'b', u'c'), 0.16000000000000003), ((u'c', u'a'), 0.5599999999999999), ((u'c', u'b'), 0.16000000000000003), ((u'c', u'c'), 0.6400000000000001), ((u'a', u'a'), 0.09), ((u'a', u'b'), 0.12), ((u'a', u'c'), 0.03), ((u'b', u'a'), 0.12), ((u'b', u'b'), 0.16000000000000003), ((u'b', u'c'), 0.04000000000000001), ((u'c', u'a'), 0.03), ((u'c', u'b'), 0.04000000000000001), ((u'c', u'c'), 0.010000000000000002)] #运行结果如下.collect(): #对两个商品打分的不同用户的分值乘积进行加和(即余弦计算的分子,以及分母) kvpRDD = kvsRDD.reduceByKey(add) [((u'c', u'a'), 1.1300000000000001), ((u'b', u'b'), 0.45000000000000007), ((u'a', u'c'), 1.1300000000000001), ((u'b', u'a'), 0.5599999999999999), ((u'c', u'b'), 0.6500000000000001), ((u'a', u'a'), 0.9399999999999998), ((u'c', u'c'), 1.4600000000000002), ((u'b', u'c'), 0.6500000000000001), ((u'a', u'b'), 0.5599999999999999)] #运行结果如下.collect(): #过滤取得相同商品的分值,得到分母 sameRDD = kvpRDD.filter(lambda x:x[0][0] == x[0][1]) print sameRDD.collect() #运行结果如下.collect(): [((u'b', u'b'), 0.45000000000000007), ((u'a', u'a'), 0.9399999999999998), ((u'c', u'c'), 1.4600000000000002)] #过滤取得不同商品的分值,即分子 diffRDD = kvpRDD.filter(lambda x:x[0][0] != x[0][1]) print diffRDD.collect() #运行结果如下.collect(): [((u'c', u'a'), 1.1300000000000001), ((u'a', u'c'), 1.1300000000000001), ((u'b', u'a'), 0.5599999999999999), ((u'c', u'b'), 0.6500000000000001), ((u'b', u'c'), 0.6500000000000001), ((u'a', u'b'), 0.5599999999999999)] #对相同商品的key值进行调整,改为一个商品 def refreshsame(x): key = x[0][0] value = x[1] return (key,value) srkRDD = sameRDD.map(refreshsame) #运行结果如下.collect(): [(u'b', 0.45000000000000007), (u'a', 0.9399999999999998), (u'c', 1.4600000000000002)] #对不同商品的key值也进行调整,改为一个商品 def refreshdiff(x): key = x[0][0] value = (x[0][0],x[0][1],x[1]) return (key,value) drkRDD = diffRDD.map(refreshdiff) #运行结果如下.collect(): [(u'c', (u'c', u'a', 1.1300000000000001)), (u'a', (u'a', u'c', 1.1300000000000001)), (u'b', (u'b', u'a', 0.5599999999999999)), (u'c', (u'c', u'b', 0.6500000000000001)), (u'b', (u'b', u'c', 0.6500000000000001)), (u'a', (u'a', u'b', 0.5599999999999999))] #将上面得到key值为一个商品的两个RDD进行join,将交叉乘积、单一方商品平方聚集在一起 dsRDD = drkRDD.join(srkRDD) print dsRDD.collect() #运行结果如下.collect(): [(u'a', ((u'a', u'c', 1.1300000000000001), 0.9399999999999998)), (u'a', ((u'a', u'b', 0.5599999999999999), 0.9399999999999998)), (u'c', ((u'c', u'a', 1.1300000000000001), 1.4600000000000002)), (u'c', ((u'c', u'b', 0.6500000000000001), 1.4600000000000002)), (u'b', ((u'b', u'a', 0.5599999999999999), 0.45000000000000007)), (u'b', ((u'b', u'c', 0.6500000000000001), 0.45000000000000007))] #改变key,值将另外一方商品id作为key值 def tran(x): key = x[1][0][1] value = (x[1][0][0],x[1][0][1],x[1][0][2],x[1][1]) return (key,value) tranRDD = dsRDD.map(tran) #运行结果如下.collect(): [(u'c', (u'a', u'c', 1.1300000000000001, 0.9399999999999998)), (u'b', (u'a', u'b', 0.5599999999999999, 0.9399999999999998)), (u'a', (u'c', u'a', 1.1300000000000001, 1.4600000000000002)), (u'b', (u'c', u'b', 0.6500000000000001, 1.4600000000000002)), (u'a', (u'b', u'a', 0.5599999999999999, 0.45000000000000007)), (u'c', (u'b', u'c', 0.6500000000000001, 0.45000000000000007))] #再与同一商品的平方进行join,这样将交叉值以及两个商品的平方均放入到一个RDD中 tenRDD = tranRDD.join(srkRDD) #运行结果如下.collect(): [(u'c', ((u'a', u'c', 1.1300000000000001, 0.9399999999999998), 1.4600000000000002)), (u'c', ((u'b', u'c', 0.6500000000000001, 0.45000000000000007), 1.4600000000000002)), (u'b', ((u'a', u'b', 0.5599999999999999, 0.9399999999999998), 0.45000000000000007)), (u'b', ((u'c', u'b', 0.6500000000000001, 1.4600000000000002), 0.45000000000000007)), (u'a', ((u'c', u'a', 1.1300000000000001, 1.4600000000000002), 0.9399999999999998)), (u'a', ((u'b', u'a', 0.5599999999999999, 0.45000000000000007), 0.9399999999999998))] #将多层key-value的结构拉平,得到计算需要的所有值 def tran11(x): value = (x[1][0][0],x[1][0][1],x[1][0][2],x[1][0][3],x[1][1]) return value elevenRDD = tenRDD.map(tran11) #运行结果如下.collect(): [(u'a', u'c', 1.1300000000000001, 0.9399999999999998, 1.4600000000000002), (u'b', u'c', 0.6500000000000001, 0.45000000000000007, 1.4600000000000002), (u'a', u'b', 0.5599999999999999, 0.9399999999999998, 0.45000000000000007), (u'c', u'b', 0.6500000000000001, 1.4600000000000002, 0.45000000000000007), (u'c', u'a', 1.1300000000000001, 1.4600000000000002, 0.9399999999999998), (u'b', u'a', 0.5599999999999999, 0.45000000000000007, 0.9399999999999998)] #计算两个商品的余弦相似度 def tran12(x): key1 = x[0] key2 = x[1] value = x[2]/math.sqrt(x[3]*x[4]) return (key1,key2,value) tweRDD = elevenRDD.map(tran12) #运行结果如下.collect(): [(u'a', u'c', 0.9645796356477089), (u'b', u'c', 0.8019193110474058), (u'a', u'b', 0.8610291274936259), (u'c', u'b', 0.8019193110474058), (u'c', u'a', 0.9645796356477089), (u'b', u'a', 0.8610291274936259)] #读取用户的打分数据 userinfo = sc.textFile("hdfs://ns1/user/dmpuser/liuhua/userinfo.txt") #分行切块 user_perf = userinfo.flatMap(lambda x: x.split("\n")) #将商品作为key,为后续join做准备 def userdeal(x): infos = x.strip().split(" ") if len(infos) >= 3: key = infos[1] value = (infos[0],infos[2]) return (key,value) userRDD = user_perf.map(userdeal).sortByKey() #运行结果如下.collect(): [(u'a', (u'ding', u'0.5')), (u'c', (u'ding', u'0.2'))] #将相似度RDD的商品提出来作为key def simtran(x): key = x[1] value = (x[0],str(x[2])) return (key,value) isRDD_c = tweRDD.map(simtran) #运行结果如下.collect(): [(u'c', (u'a', 0.9645796356477089)), (u'c', (u'b', 0.8019193110474058)), (u'b', (u'a', 0.8610291274936259)), (u'b', (u'c', 0.8019193110474058)), (u'a', (u'c', 0.9645796356477089)), (u'a', (u'b', 0.8610291274936259))] #关联用户的打分RDD与物品相似度的RDD isjoinRDD_c = isRDD_c.join(userRDD) #运行结果如下.collect(): [(u'a', ((u'c', '0.964579635648'), (u'ding', u'0.5'))), (u'a', ((u'b', '0.861029127494'), (u'ding', u'0.5'))), (u'c', ((u'a', '0.964579635648'), (u'ding', u'0.2'))), (u'c', ((u'b', '0.801919311047'), (u'ding', u'0.2')))] #将用户与新的商品提取出来做为key,去掉关联的key def plustran(x): key = (x[1][1][0],x[1][0][0]) value = float(x[1][1][1]) * float(x[1][0][1]) return (key,value) pRDD_c = isjoinRDD_c.map(plustran).reduceByKey(add) #运行结果如下.collect(): [((u'ding', u'b'), 0.5908984259564), ((u'ding', u'a'), 0.1929159271296), ((u'ding', u'c'), 0.482289817824)] def keytran(x): key = x[0][0] value = (x[0][1],x[1]) return (key,value) pkRDD_c = pRDD_c.map(keytran) #运行结果如下.collect(): [(u'ding', (u'b', 0.5908984259564)), (u'ding', (u'a', 0.1929159271296)), (u'ding', (u'c', 0.482289817824))] #筛选出用户丁 pksRDD_c = pkRDD_c.filter(lambda x:x[0] == 'ding').values() #对分值进行倒序排序,得到推荐结果列表 pksRDD_c = pksRDD_c.sortBy(lambda x:x[1],False) #运行结果如下.collect(): [(u'b', 0.5908984259564), (u'c', 0.482289817824), (u'a', 0.1929159271296)]