数据(每行以制表符 \t 分隔):用户id、itemid、分数
代码:
import org.apache.spark.{SparkConf, SparkContext}
/**
 * Builds a per-user "watch list" from tab-separated `userId \t itemId \t score` lines.
 *
 * For every user, keeps only items whose score is strictly greater than `MinScore`,
 * sorts them by score (numerically, highest first) and keeps at most `TopN` of them.
 * Each user becomes one output line:
 * `userId \t item1:score1 item2:score2 ... ` (every pair is followed by one space,
 * and scores are written exactly as they appeared in the input).
 *
 * Usage: `userwatchlist [inputPath] [outputPath]`. When omitted, the paths default to
 * `/train_new.data` and `/userwatchlist_output` respectively.
 */
object userwatchlist {

  /** Input file used when no first argument is supplied. */
  private final val DefaultInputPath = "/train_new.data"

  /** Output directory used when no second argument is supplied. */
  private final val DefaultOutputPath = "/userwatchlist_output"

  /** Only items with a score strictly above this value are kept. */
  private final val MinScore = 2.0

  /** Maximum number of items listed per user. */
  private final val TopN = 5

  /** Number of partitions used by the per-user `groupByKey` shuffle. */
  private final val NumPartitions = 10

  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("userwatchlist test")
    // conf.setMaster("local")  // uncomment for local debugging
    val sc = new SparkContext(conf)
    try {
      val inputPath = args.headOption.getOrElse(DefaultInputPath)
      val outputPath = args.drop(1).headOption.getOrElse(DefaultOutputPath)

      sc.textFile(inputPath)
        // Blank lines would otherwise throw ArrayIndexOutOfBoundsException in parseLine.
        .filter(_.trim.nonEmpty)
        .map(parseLine)
        .filter { case (_, (_, _, score)) => score > MinScore }
        .groupByKey(NumPartitions)
        .map { case (userId, items) => formatWatchList(userId, items) }
        .saveAsTextFile(outputPath)
    } finally {
      // Release cluster resources even if the job fails.
      sc.stop()
    }
  }

  /**
   * Splits one tab-separated line into `(userId, (itemId, scoreText, score))`.
   * The line is parsed once. The original score text is kept for output, and its numeric
   * value is used for filtering and sorting. Lines with fewer than three fields or a
   * non-numeric score throw, so bad data still fails the job loudly.
   */
  private def parseLine(line: String): (String, (String, String, Double)) = {
    val fields = line.split("\t")
    (fields(0), (fields(1), fields(2), fields(2).toDouble))
  }

  /**
   * Renders one user's output line from that user's `(itemId, scoreText, score)` triples.
   * Sorts numerically by score, highest first. The sort is stable, so ties keep the
   * grouped order. Keeps at most `TopN` items.
   */
  private def formatWatchList(userId: String, items: Iterable[(String, String, Double)]): String = {
    // Compare the parsed Double: comparing score strings would rank "10" below "9".
    val top = items.toArray.sortWith((a, b) => a._3 > b._3).take(TopN)
    // Each pair ends with a space, preserving the established output format.
    val watchList = top.map { case (itemId, scoreText, _) => s"$itemId:$scoreText " }.mkString
    s"$userId\t$watchList"
  }
}
结果:
每个用户(key)输出一行:只保留分数大于 2 的 item,按分数从高到低取前 5 个,格式为 `userid\titem:score item:score ...`