背景
本次将使用某网站的消费和访问记录来进行相关用户行为的挖掘练习,相关测试数据文件已经上传csdn,下载地址:https://download.csdn.net/download/u013560925/10342224
a.数据格式
本次使用数据分为json和parquet两种,parquet作为列式存储,在存储空间和运行效率上都非常有优势,很适合用在工业生产中,详情见另外一片博文:https://blog.csdn.net/u013560925/article/details/79516741(csv、parquet、orc读写性能和方式),数据分为两部分:user.json和userLog.json。分别为用户信息和用户行为信息,详情如下:
user.json
userID:String,name:String,registeredTime:String 用户ID/用户姓名/注册实践
log.json
logID: Long, userID: Long, time: String, typed: Long, consumed: Double 日志ID/用户ID/时间戳/行为类型/消费金额 用户行为:1=购买消费 0=浏览行为
此外logparquet.parquet和userparquet.parquet数据格式和上述一样
b.本次所涉及的挖掘问题如下:
1. 统计特定时间段访问次数最多的Top5
2. 统计特定时间段购买次数最多的Top5
3. 统计特定某个周访问增加最多用户
4. 统计特定某个周消费增加最多用户
5. 统计注册之后前两周内访问最多的前10个人
6. 统计注册之后前两周内购买总额最多的前10个人
c.版本说明
在此说明spark和sql相关版本信息如下,如果版本不一致,api的使用方式会有所出入:
<dependency> <groupId>org.apache.spark</groupId> <artifactId>spark-core_2.11</artifactId> <version>2.2.0</version> </dependency> <dependency> <groupId>org.apache.spark</groupId> <artifactId>spark-sql_2.11</artifactId> <version>2.2.0</version> <scope>compile</scope> </dependency>
正文
0.数据读取和sparksession初始化
初始化:
val spark = SparkSession .builder() .appName("Spark Hive Example") .config("spark.sql.warehouse.dir", warehouseLocation) .enableHiveSupport() .getOrCreate() //注意要单独引入以下引用包 import spark.implicits._ //隐式转换 import org.apache.spark.sql.functions._ //agg内置算法
数据读取:
val userInfo = spark.read.format("parquet").parquet("/user/wangqi/userparquet.parquet") val userLog = spark.read.format("parquet").parquet("/user/wangqi/logparquet.parquet")
1. 统计特定时间段访问次数最多的Top5
重点在于filter的过滤方法,此外使用alias来重命名一个新生成的列
val startTime = "2016-10-01" val endTime = "2016-11-01" userLog.filter("time>='"+startTime+"' and time <= '"+endTime+"' and typed = 0").join(userInfo,userLog("userID")===userInfo("userID")) .groupBy(userInfo("name"),userInfo("userID")).agg(count(userLog("userID")).alias("userLogCount")).sort(col("userLogCount").desc).limit(5).show()
2. 统计特定时间段购买次数最多的Top5
统计特定时间段购买次数最多的Top5: 例如2016-10-01 ~ 2016-11-01
userLog.filter("time >= '" + startTime + "' and time <= '" + endTime + "' and typed = 1") .join(userInfo, userInfo("userID") === userLog("userID")) .groupBy(userInfo("userID"),userInfo("name")) .agg(round(sum(userLog("consumed")), 2).alias("totalConsumed")) .sort(col("totalConsumed").desc) //使用col 来选取新生成的某一列,降序排列 .limit(5) .show
3. 统计特定某个周访问增加最多用户
val userLogDS = userLog.as[UserLog].filter("time >= '2016-10-08' and time <= '2016-10-14' and typed = '0'") .map(log => LogOnce(log.logID, log.userID, 1) ) .union(userLog.as[UserLog].filter("time >= '2016-10-01' and time <= '2016-10-07' and typed = '0'") .map(log => LogOnce(log.logID, log.userID, -1) )) userLogDS.join(userInfo.as[UserInfo], userLogDS("userID") === userInfo("userID")) .groupBy(userInfo("userID"),userInfo("name")) .agg(sum(userLogDS("count")).alias("viewCountIncreased")) .sort(col("viewCountIncreased").desc) //使用col 来选取新生成的某一列,降序排列 .limit(5) .show()
4. 统计特定某个周消费增加最多用户
val userLogDs = userLog.as[UserLog].filter("time >= '2016-10-08' and time <= '2016-10-14' and typed = '1'") .map(log=>ConsumedOnce(log.logID,log.userID,log.consumed)) .union(userLog.as[UserLog].filter("time >= '2016-10-01' and time <= '2016-10-07' and typed = '1'") .map(log=>ConsumedOnce(log.logID,log.userID,-log.consumed))) userLogDs.join(userInfo.as[UserInfo],userLogDs("userID")===userInfo("userID")) .groupBy(col("userID"),col("name")) .agg(round(sum(col("consumed"))).alias("viewConsumedIncreased")) .sort(col("viewConsuusermedIncreased")) .limit(5).show()
5. 统计注册之后前两周内访问最多的前10个人
filter多条件之间,连接查询,使用&&进行连接,注意列要使用userlog(“列名”)来选取
userLog.join(userInfo,userInfo("userID")===userLog("userID")) .filter(userInfo("registeredTime")>="2016-10-01" && userInfo("registeredTime")<="2016-10-14" && userLog("time")>=userInfo("registeredTime") && userLog("time") <= date_add(userInfo("registeredTime"), 14) && userLog("type")===0) .groupBy(userInfo("userID"),userInfo("name")) .agg(count(userLog("time")).alias("logTimes")) .sort(col("logTimes").desc) .limit(5) .show()
6. 统计注册之后前两周内购买总额最多的前10个人
userLog.join(userInfo, userInfo("userID") === userLog("userID")) .filter(userInfo("registeredTime") >= "2016-10-01" && userInfo("registeredTime") <= "2016-10-14" && userLog("time") >= userInfo("registeredTime") && userLog("time") <= date_add(userInfo("registeredTime"), 14) && userLog("typed") === 1) .groupBy(userInfo("userID"),userInfo("name")) .agg(round(sum(userLog("consumed")),2).alias("totalConsumed")) .sort(col("totalConsumed").desc) .limit(10) .show()
结论
运行结果忘记截图了,如果有需要后面可以补上,使用了2.2的api,dataset操作数据,主要熟悉一下filter函数的使用方法,并结合groupby、sort和agg等函数,进行用户行为的分析。