Example code:
import org.apache.spark.sql.{DataFrame, Dataset, SparkSession}

object movies2 {
  def main(args: Array[String]): Unit = {
    val spark: SparkSession = SparkSession.builder()
      .appName("movies")
      .master("local[*]")
      .getOrCreate()
    import spark.implicits._

    val lines: Dataset[String] = spark.read.textFile("file:///C:\\Users\\lenovo\\Desktop\\app-2019-12-12.log")
    // Sample line: 21022734,妈妈的朋友,2019-05-15 08:58:05,5.2,搜秀影院
    val movies: Dataset[(Int, String, Int, Int, Double, String)] = lines.map(x => {
      val s: Array[String] = x.split(",")
      val userId: Int = s(0).toInt
      val movieName: String = s(1)
      // DateUtils.parseToMin reformats the timestamp as "yyyyMMddHH", e.g. "2019052011"
      val formatted: String = DateUtils.parseToMin(s(2))
      // despite the name, "day" holds year + month (yyyyMM), e.g. 201905
      val day: Int = formatted.substring(0, 6).toInt
      // hour of day (HH), e.g. 11
      val hours: Int = formatted.substring(8, 10).toInt
      val score: Double = s(3).toDouble
      val cinema: String = s(4)
      (userId, movieName, day, hours, score, cinema)
    })
    val df: DataFrame = movies.toDF("userId", "movieName", "day", "hours", "score", "cinema")
    df.createTempView("movies")
    // val r: DataFrame = spark.sql("select * from movies")
    // r.show(30)

    // hourly distribution of viewings within the chosen month (December 2019)
    val rs1: DataFrame = spark.sql("select hours, count(*) as count from movies where day = 201912 group by hours order by hours")
    println("Hourly distribution of movie viewings for the month (December):")
    rs1.show(24)

    // per-cinema viewing counts for the same month, sorted descending
    val rs2: DataFrame = spark.sql("select cinema, count(*) as Box_office from movies where day = 201912 group by cinema order by Box_office desc")
    println("Per-cinema box-office counts for the month (December):")
    rs2.show()
    // Write the results out as a single file: coalesce takes the target partition count
    rs2.coalesce(1).write.format("csv").save("/fengrui")
    // Or, equivalently:
    rs2.coalesce(1).write.csv("/fengrui2")
    // JSON goes to its own directory: saving into an already-existing path would throw an error
    rs2.coalesce(1).write.json("/fengrui3")

    spark.stop()
  }
}
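The code above depends on a DateUtils helper that is not shown. Below is a minimal sketch of what parseToMin might look like, assuming (from the "2019052011" comment) that it reformats the raw "yyyy-MM-dd HH:mm:ss" timestamp into "yyyyMMddHH"; the implementation is my guess, not the author's actual utility:

import java.time.LocalDateTime
import java.time.format.DateTimeFormatter

// Hypothetical stand-in for the author's DateUtils; inferred from the
// "2019052011" comment above, not taken from the original project.
object DateUtils {
  // DateTimeFormatter is immutable and thread-safe, so it can be shared
  // across Spark tasks, unlike SimpleDateFormat.
  private val input  = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss")
  private val output = DateTimeFormatter.ofPattern("yyyyMMddHH")

  // "2019-05-15 08:58:05" -> "2019051508"
  def parseToMin(raw: String): String =
    LocalDateTime.parse(raw, input).format(output)
}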
I put my results on HDFS and only a single file was generated. Without coalesce, Spark splits the output according to the parallelism and lands many part files, which is inconvenient to browse. Neither the JSON nor the CSV output shows any garbled characters.
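For comparison, here is a small sketch of the two write paths: the default write lands one part file per partition, while coalesce(1) funnels everything through a single task. The directory names are made up for illustration, and SaveMode.Overwrite is added only so the snippet can be rerun; repartition(1) would give the same single file here, but at the cost of a full shuffle.

import org.apache.spark.sql.SaveMode

// default: one part-xxxxx file per partition of rs2 (the parallelism decides the count)
rs2.write.mode(SaveMode.Overwrite).csv("/fengrui_parts")

// coalesce(1): a single task writes a single part file
rs2.coalesce(1).write.mode(SaveMode.Overwrite).csv("/fengrui_single")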