#!/usr/bin/env python # coding=utf-8 ''' 运行命令/yourpath/spark/bin/spark-submit --driver-memory 1g MovieLensALS.py movieLensDataDir personalRatingsFile movieLensDataDir 电影评分数据集目录 比如 ml-1m/ personalRatingsFile 需要推荐的某用户的评价数据 格式参考ratings.dat ''' import sys import itertools from math import sqrt from operator import add from os.path import join, isfile, dirname from pyspark import SparkConf, SparkContext from pyspark.mllib.recommendation import ALS def parseRating(line): """ Parses a rating record in MovieLens format userId::movieId::rating::timestamp . """ fields = line.strip().split("::") return long(fields[3]) % 10, (int(fields[0]), int(fields[1]), float(fields[2])) def parseMovie(line): """ Parses a movie record in MovieLens format movieId::movieTitle . """ fields = line.strip().split("::") return int(fields[0]), fields[1] def loadRatings(ratingsFile): """ Load ratings from file. """ if not isfile(ratingsFile): print "File %s does not exist." % ratingsFile sys.exit(1) f = open(ratingsFile, 'r') ratings = filter(lambda r: r[2] > 0, [parseRating(line)[1] for line in f]) f.close() if not ratings: print "No ratings provided." sys.exit(1) else: return ratings def computeRmse(model, data, n): """ Compute RMSE (Root Mean Squared Error). """ predictions = model.predictAll(data.map(lambda x: (x[0], x[1]))) predictionsAndRatings = predictions.map(lambda x: ((x[0], x[1]), x[2])).join(data.map(lambda x: ((x[0], x[1]), x[2]))).values() return sqrt(predictionsAndRatings.map(lambda x: (x[0] - x[1]) ** 2).reduce(add) / float(n)) if __name__ == "__main__": if (len(sys.argv) != 3): print "Usage: /path/to/spark/bin/spark-submit --driver-memory 1g MovieLensALS.py movieLensDataDir personalRatingsFile" sys.exit(1) # set up environment conf = SparkConf().setAppName("MovieLensALS").set("spark.executor.memory", "1g") sc = SparkContext(conf=conf) # load personal ratings myRatings = loadRatings(sys.argv[2]) myRatingsRDD = sc.parallelize(myRatings, 1) movieLensHomeDir = sys.argv[1] # ratings is an RDD of (last digit of timestamp, (userId, movieId, rating)) ratings = sc.textFile(join(movieLensHomeDir, "ratings.dat")).map(parseRating) # movies is an RDD of (movieId, movieTitle) movies = dict(sc.textFile(join(movieLensHomeDir, "movies.dat")).map(parseMovie).collect()) numRatings = ratings.count() numUsers = ratings.values().map(lambda r: r[0]).distinct().count() numMovies = ratings.values().map(lambda r: r[1]).distinct().count() myRatedMovieIds = set([x[1] for x in myRatings]) print "Got %d ratings from %d users on %d movies." % (numRatings, numUsers, numMovies) # split ratings into train , validation # last digit of the timestamp, add myRatings to train, and cache them # training, validation, test are all RDDs of (userId, movieId, rating) numPartitions = 4 #training = ratings.filter(lambda x: x[0] < 8).values().union(myRatingsRDD).repartition(numPartitions).cache() validation = ratings.filter(lambda x: x[0] >= 8 and x[0] < 10).values().repartition(numPartitions).cache() numTraining = training.count() numValidation = validation.count() print "Training: %d, validation: %d" % (numTraining, numValidation) # train models and evaluate them on the validation set ranks = [10,12] lambdas = [0.01,0.4,1.0] numIters = [10] bestModel = None bestValidationRmse = float("inf") bestRank = 0 bestLambda = -1.0 bestNumIter = -1 for rank, lmbda, numIter in itertools.product(ranks, lambdas, numIters): model = ALS.train(training, rank, numIter, lmbda) validationRmse = computeRmse(model, validation, numValidation) print "RMSE (validation) = %f for the model trained with " % validationRmse + "rank = %d, lambda = %.4f, and numIter = %d." % (rank, lmbda, numIter) if (validationRmse < bestValidationRmse): bestModel = model bestValidationRmse = validationRmse bestRank = rank bestLambda = lmbda bestNumIter = numIter # evaluate the best model on the test set print "The best model was trained with rank = %d and lambda = %.4f, and numIter = %d ,and Rmse %.4f" % (bestRank, bestLambda,bestNumIter,bestValidationRmse) #exit() #通过计算得到rank = 10 lambda = 0.45 numIter = 20 结果最好 bestModel = ALS.train(training, 10, 20, 0.45); # training, validation, test are all RDDs of (userId, movieId, rating) #make personalized recommendations #排除该用户已评价过的电影 testdata = training.filter(lambda x: x[0] not in myRatedMovieIds).map(lambda p: (int(p[0]), int(p[1]))) predictions = bestModel.predictAll(testdata).map(lambda r: ((r[0], r[1]), r[2])) #对预测结果按分值排序 取前5 recommendations = predictions.sortBy(lambda x:x[1],False).take(5) print "Movies recommended for you:" for i in xrange(len(recommendations)): print ("%2d: %s %s" % (i + 1, recommendations[i][0],recommendations[i][1])) # clean up sc.stop()
数据集采用MovieLens
代码参考https://github.com/databricks/spark-training/blob/master/machine-learning/python/solution/MovieLensALS.py