需求说明:
对热门商品进行统计
根据商品的点击数据,统计出各个区域的销量排行TOPK 产品
输入:开始时间与结束时间
输出:每个城市的销量排行TOP K 产品
地区级别 |
地区名称 |
产品名称 |
点击量 |
产品类型 |
A |
西南片区 |
雾霾口罩 |
1000000 |
第三方 |
A |
西南片区 |
雾霾口罩 |
1000000 |
第三方 |
A |
西南片区 |
雾霾口罩 |
1000000 |
第三方 |
B |
华中地区 |
苹果 |
1000 |
自营 |
B |
华中地区 |
苹果 |
1000 |
自营 |
B |
华中地区 |
苹果 |
1000 |
自营 |
涉及表:
用户行为表(文件日志) city_id , product_id,点击量
地区表(mysql)格式如下:
产品表(mysql)格式如下:
使用 spark core 与spark sql实现
主要技术点: Spark sql操作Hdfs数据与Mysql数据,sql窗口函数的使用,dataFrame的使用
关于spark集成hive,参考:https://blog.csdn.net/hexinghua0126/article/details/80300048
关于spark读取mysql(地区表,产品表)代码如下:
/**
*获取mysql表数据,并注册为spark临时表
*/
def loadMysqlData(): Unit = {
//创建连接mysql连接
val jdbcOptions = Map("url" -> "jdbc:mysql://192.168.100.212:3306/hxh?user=root&password=123456", "dbtable" -> "areas")
val reader = sqlContext.read.format("jdbc")
val jdbcOptions2 = Map("url" -> "jdbc:mysql://192.168.100.212:3306/hxh?user=root&password=123456", "dbtable" -> "product")
val reader2 = sqlContext.read.format("jdbc")
//把查询出来的表注册为临时表
reader.options(jdbcOptions).load().registerTempTable("spark_areas")
reader2.options(jdbcOptions2).load().registerTempTable("spark_product")
}
关于spark统计地区、点击量代码如下:
package com.hxh
import org.apache.spark.sql.hive.HiveContext
import org.apache.spark.{SparkConf, SparkContext}
object UserAnalysis {
val conf = new SparkConf().setAppName("test").setMaster("local[*]")
val sparkContext = new SparkContext(conf)
val sqlContext = new HiveContext(sparkContext)
def main(args: Array[String]): Unit = {
sqlContext.sql("use bigdata")
sqlContext.sql("select * from t_pages_click ").registerTempTable("tPagesClick")
loadMysqlData()
areaNameCount()
areaRowCount()
sqlContext.sql("select areaLevel,areaName,productName,sumClick,extendName " +
"from click_row_count " +
"where numSum<=3 " +
"order by areaLevel asc,sumClick desc" ).show(50)
}
def areaRowCount(): Unit ={
sqlContext.sql("select " +
"CASE WHEN areaName IN ( '华北地区', '东北地区') THEN 'A' "+
" WHEN areaName IN ( '华东地区', '华中地区') THEN 'B' "+
" WHEN areaName IN ( '华南地区', '西南地区') THEN 'C' "+
"WHEN areaName IN ('西北地区') THEN 'D' "+
"ELSE'数据错误' END as areaLevel,areaName,productName," +
"sumClick," +
"Row_Number() OVER (PARTITION BY areaName order by sumClick DESC) AS numSum," +
"if(extendInfo='1','自营','第三方') extendName "+
"from areaNameCount ").registerTempTable("click_row_count")
}
/**
* 按地区统计点击量
*/
def areaNameCount(): Unit ={
sqlContext.sql("select areas.area_name areaName," +
"product.product_name productName,count(1) sumClick," +
"product.extend_info extendInfo from tPagesClick " +
"join spark_areas areas " +
"on tPagesClick.city_id=areas.city_id " +
"join spark_product product " +
"on product.product_id=tPagesClick.click_product_id " +
"group by areas.area_name,product.product_name,product.extend_info").registerTempTable("areaNameCount")
}
结果运行结果如下: