pom:
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
<artifactId>recommender</artifactId>
<groupId>com.kejin</groupId>
<version>1.0-SNAPSHOT</version>
</parent>
<modelVersion>4.0.0</modelVersion>
<artifactId>StatisticsRecommender</artifactId>
<dependencies>
<!-- Spark dependencies (versions are managed by the parent POM) -->
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-core_2.11</artifactId>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-sql_2.11</artifactId>
</dependency>
<!-- Scala standard library -->
<dependency>
<groupId>org.scala-lang</groupId>
<artifactId>scala-library</artifactId>
</dependency>
<!-- MongoDB drivers: Casbah (Scala driver) and the MongoDB Spark connector -->
<dependency>
<groupId>org.mongodb</groupId>
<artifactId>casbah-core_2.11</artifactId>
<version>3.1.1</version>
</dependency>
<dependency>
<groupId>org.mongodb.spark</groupId>
<artifactId>mongo-spark-connector_2.11</artifactId>
<version>2.0.0</version>
</dependency>
</dependencies>
</project>
StatisticsRecommender.scala:
package com.kejin.statistics
import java.text.SimpleDateFormat
import java.util.Date
import org.apache.spark.SparkConf
import org.apache.spark.sql.{DataFrame, SparkSession}
/** One product-rating event as stored in the MongoDB "Rating" collection.
  *
  * @param userId    id of the rating user
  * @param productId id of the rated product
  * @param score     the rating value
  * @param timestamp rating time as epoch seconds
  */
case class Rating(
    userId: Int,
    productId: Int,
    score: Double,
    timestamp: Int
)
/** Connection settings for MongoDB.
  *
  * @param uri MongoDB connection URI
  * @param db  target database name
  */
case class MongoConfig(
    uri: String,
    db: String
)
/** Batch job computing statistics-based recommendations from the ratings
  * stored in MongoDB and writing the results back as new collections.
  */
object StatisticsRecommender {
  // Collection names used in MongoDB.
  val MONGODB_RATING_COLLECTION = "Rating"          // input: raw ratings
  val RATE_MORE_PRODUCTS = "RateMoreProducts"       // output: all-time rating counts
  val RATE_MORE_RECENTLY_PRODUCTS = "RateMoreRecentlyProducts" // output: monthly rating counts
  val AVERAGE_PRODUCTS = "AverageProducts"          // output: average score per product

  def main(args: Array[String]): Unit = {
    val config = Map(
      "spark.cores" -> "local[1]",
      "mongo.uri" -> "mongodb://192.168.31.52:27017/recommender",
      "mongo.db" -> "recommender"
    )
    // Build the Spark configuration.
    val sparkConf = new SparkConf().setMaster(config("spark.cores")).setAppName("StatisticsRecommender")
    // Create the Spark session.
    val spark = SparkSession.builder().config(sparkConf).getOrCreate()
    import spark.implicits._
    implicit val mongoConfig = MongoConfig( config("mongo.uri"), config("mongo.db") )

    // Load the ratings collection from MongoDB as a DataFrame.
    val ratingDF = spark.read
      .option("uri", mongoConfig.uri)
      .option("collection", MONGODB_RATING_COLLECTION)
      .format("com.mongodb.spark.sql")
      .load()
      .as[Rating]
      .toDF()

    // Register a temporary view named "ratings" for the SQL queries below.
    ratingDF.createOrReplaceTempView("ratings")

    // 1. Historically popular products: rating count per productId.
    val rateMoreProductsDF = spark.sql("select productId, count(productId) as count from ratings group by productId order by count desc")
    storeDFInMongoDB( rateMoreProductsDF, RATE_MORE_PRODUCTS )

    // 2. Recently popular products: convert the epoch-second timestamp to a
    //    yyyyMM bucket and count ratings per (month, product).
    // Register a UDF converting epoch seconds to an Int of the form yyyyMM.
    // NOTE: the SimpleDateFormat is created inside the lambda on purpose —
    // SimpleDateFormat is not thread-safe, and a single shared instance
    // captured in the closure could be invoked concurrently by executor
    // task threads, producing corrupted dates.
    spark.udf.register("changeDate", (x: Int) =>
      new SimpleDateFormat("yyyyMM").format(new Date(x * 1000L)).toInt
    )
    // Reshape the raw ratings into (productId, score, yearmonth).
    val ratingOfYearMonthDF = spark.sql("select productId, score, changeDate(timestamp) as yearmonth from ratings")
    ratingOfYearMonthDF.createOrReplaceTempView("ratingOfMonth")
    val rateMoreRecentlyProductsDF = spark.sql("select productId, count(productId) as count, yearmonth from ratingOfMonth group by yearmonth, productId order by yearmonth desc, count desc")
    // Persist the monthly counts to MongoDB.
    storeDFInMongoDB( rateMoreRecentlyProductsDF, RATE_MORE_RECENTLY_PRODUCTS )

    // 3. Top-quality products: average score per productId.
    val averageProductsDF = spark.sql("select productId, avg(score) as avg from ratings group by productId order by avg desc")
    storeDFInMongoDB( averageProductsDF, AVERAGE_PRODUCTS )

    spark.stop()
  }

  /** Overwrite `collection_name` in MongoDB with the contents of `df`.
    *
    * @param df              the DataFrame to persist
    * @param collection_name target MongoDB collection (replaced entirely)
    * @param mongoConfig     implicit connection settings (uri + db)
    */
  def storeDFInMongoDB(df: DataFrame, collection_name: String)(implicit mongoConfig: MongoConfig): Unit ={
    df.write
      .option("uri", mongoConfig.uri)
      .option("collection", collection_name)
      .mode("overwrite")
      .format("com.mongodb.spark.sql")
      .save()
  }
}