import org.apache.spark.SparkConf
import org.apache.spark.sql.{
Dataset, SparkSession}
/**
 * WordCount example implemented in both SQL and DSL styles on Spark SQL.
 *
 * Steps:
 *  1. Prepare the SparkSession environment
 *  2. Read the input file
 *  3. Word count using the DSL (DataFrame API) style
 *  4. Word count using the SQL style
 *  5. Print the results
 *  6. Close the SparkSession
 *
 * @author liu a fu
 * @version 1.0
 * @note date: 2021/1/17
 */
object _08SparkSQLAndDSLWordCount {
  def main(args: Array[String]): Unit = {
    // 1 - Environment setup: app name derived from the object name
    // (stripSuffix("$") removes the Scala object's trailing '$').
    val conf: SparkConf = new SparkConf()
      .setAppName(this.getClass.getSimpleName.stripSuffix("$"))
      .setMaster("local[8]")
    val spark: SparkSession = SparkSession
      .builder()
      .config(conf)
      .getOrCreate()
    spark.sparkContext.setLogLevel("WARN")
    // Needed for the 'count / $"count" column syntax and Dataset encoders below.
    import spark.implicits._

    // 2 - Read the source file as a typed Dataset[String] (one element per line).
    val fileDS: Dataset[String] = spark.read.textFile("data/input/words.txt")
    // spark.read.text() would yield an untyped DataFrame; without a concrete
    // element type (String, Double, Person, ...) we could not split rows
    // directly with df.flatMap(_.split(...)).
    // Split each line on runs of whitespace to get one word per element.
    val wordDS: Dataset[String] = fileDS.flatMap(_.split("\\s+"))

    /**
     * 3 - Word count using the DSL style.
     */
    wordDS.printSchema()
    /**
     * root
     * |-- value: string (nullable = true)
     */
    wordDS.show()
    /**
     * +------+
     * | value|
     * +------+
     * | hello|
     * |hurong|
     * |liuafu|
     * | hello|
     * | kobe|
     * | hello|
     * | java|
     * | hello|
     * | james|
     * +------+
     */
    // Two equivalent column syntaxes: symbol-based ('count) and
    // string-interpolator-based ($"count").
    wordDS.groupBy("value").count().orderBy('count.desc).show()
    wordDS.groupBy("value").count().orderBy($"count".desc).show()
    /**
     * +------+-----+
     * | value|count|
     * +------+-----+
     * | hello| 4|
     * | kobe| 1|
     * |liuafu| 1|
     * |hurong| 1|
     * | james| 1|
     * | java| 1|
     * +------+-----+
     */

    /**
     * 4 - Word count using the SQL style: register a temp view with
     *     createOrReplaceTempView, then query it.
     */
    wordDS.createOrReplaceTempView("table_count") // register the view name
    // Write the SQL statement.
    val sql: String =
      """
        |select value,count(value) as count_value
        |from table_count
        |group by value
        |order by count_value desc
        |""".stripMargin
    spark.sql(sql).show()
    /**
     * +------+-----------+
     * | value|count_value|
     * +------+-----------+
     * | hello| 4|
     * |liuafu| 1|
     * | java| 1|
     * | james| 1|
     * | kobe| 1|
     * |hurong| 1|
     * +------+-----------+
     */

    // SQL variant 2: createOrReplaceGlobalTempView (for reference).
    // Global temp views live in the reserved `global_temp` database and are
    // visible across sessions of the same application.
    wordDS.createOrReplaceGlobalTempView("global_table_view")
    spark.sql("select * from global_temp.global_table_view").show()
    // Accessible from a brand-new session as well.
    spark.newSession().sql("select * from global_temp.global_table_view").show()
    // BUG FIX: the original built this query but never called an action, so
    // Spark's lazy evaluation meant it never ran; .show() triggers execution.
    spark.newSession().sql("select value,count(value) as count_value from global_temp.global_table_view group by value order by count_value desc").show()
    /**
     * +------+
     * | value|
     * +------+
     * | hello|
     * |hurong|
     * |liuafu|
     * | hello|
     * | kobe|
     * | hello|
     * | java|
     * | hello|
     * | james|
     * +------+
     */
    // 6 - Close the SparkSession.
    spark.stop()
  }
}
// WordCount example implemented with the two Spark SQL styles (SQL and DSL).
// Reprinted from: blog.csdn.net/m0_49834705/article/details/112801436