简介: DataFrame提供了两种风格的语法操作,分别是DSL风格语法和SQL语法.
- DataFrame提供了一个领域特定语言(DSL)来操作结构化数据。
- SQL风格语法: 可以将它看作是一个关系型数据表,然后可以通过在程序中使用spark.sql() 来执行SQL语句查询,结果返回一个DataFrame。
Scala代码:
import org.apache.spark.SparkConf
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{
DataFrame, SparkSession}
/**
* @author liu a fu
* @date 2021/1/16 0016 19:23
* @version 1.0
*/
/***
* SparkSQL两种风格的花式查询 DSL风格 和 SQL风格
*/
case class Person(id:Int,name:String,age:Int)
object _02SparkSQLDSLAndSQL {
//1-首先初始化环境
val conf: SparkConf = new SparkConf().setAppName(this.getClass.getSimpleName.stripSuffix("$")).setMaster("local[*]")
def main(args: Array[String]): Unit = {
val spark: SparkSession = SparkSession
.builder()
.config(conf)
.getOrCreate()
spark.sparkContext.setLogLevel("WARN") //设置日志级别
//如果从rdd转化到df需要引入隐式转换 DataFrame
import spark.implicits._
import org.apache.spark.sql.functions._
//2-使用SparkSession读取数据
val peopleRDD: RDD[String] = spark.sparkContext.textFile("data/input/sql/people1.txt")
val valueRDD: RDD[Person] = peopleRDD.map(x => x.split("\\s+")).map(line => Person(line(0).toInt, line(1), line(2).toInt))
val peopleDF: DataFrame = valueRDD.toDF()
//3-视同SQL的风格查询
println("====================DSL风格===============================")
peopleDF.printSchema() //查询Schema信息
peopleDF.show() //展示信息
// ---+--------+---+
// id| name|age|
// ---+--------+---+
// 1|zhangsan| 20|
// | 2| lisi| 29|
// | 3| wangwu| 25|
// | 4| zhaoliu| 30|
// | 5| tianqi| 35|
// | 6| kobe| 40|
// | 7| kobe| 41|
// +---+--------+---+
peopleDF.show(1) //展示第一条信息
peopleDF.show(2,false) //展示信息 truncate是否阶段当前输出 20个字符
//使用SparkSQL选择具体的一个列
peopleDF.select("age").show()
peopleDF.select(col("age")).show()
peopleDF.select($"age").show() //底层实现是col(age)
peopleDF.select('age).show() //底层实现是col(age)
//使用SparkSQL选择具体的两个列
peopleDF.select("name","age").show()
peopleDF.select(col("name"),col("age")).show()
//使用SparkSQL对列进行操作
peopleDF.select(col("age") + 1).show()
peopleDF.select('age +1).as("ageAdd").show()
peopleDF.select('age + 1).alias("ageAdd1").show()
peopleDF.select($"name",'age + 2 as("ageadd")).show()
//过滤
peopleDF.filter($"age" > 20).show()
peopleDF.filter("age > 30" ).show()
peopleDF.filter('age > 30).show()
//group by
peopleDF.groupBy("name").count().show()
peopleDF.groupBy('name).agg(
Map("age" -> "max")
).show()
println("====================SQL风格===============================")
//这里使用 SQL的查询方式哦
print("="*100)
peopleDF.createOrReplaceTempView("people_view") //给表取名字
spark.sql("desc people_view").show() //字段描述信息
spark.sql("select * from people_view").show() //查询全部
spark.sql("select * from people_view where age > 30").show()
val sql:String =
"""
|select name,count(name) as count_name
|from people_view
|group by name
|order by count_name desc
|""".stripMargin
spark.sql(sql).show()
//结果:
// +--------+----------+
// | name|count_name|
// +--------+----------+
// | kobe| 2|
// | wangwu| 1|
// | lisi| 1|
// |zhangsan| 1|
// | zhaoliu| 1|
// | tianqi| 1|
// +--------+----------+
peopleDF.printSchema()
//结果
// root
// |-- id: integer (nullable = false)
// |-- name: string (nullable = true)
// |-- age: integer (nullable = false)
//DSL
peopleDF.select('id,'name,'age.cast("string")).show()
//结果:
// +---+--------+---+
// | id| name|age|
// +---+--------+---+
// | 1|zhangsan| 20|
// | 2| lisi| 29|
// | 3| wangwu| 25|
// | 4| zhaoliu| 30|
// | 5| tianqi| 35|
// | 6| kobe| 40|
// | 7| kobe| 41|
// +---+--------+---+
//5-关闭SparkSession
spark.stop()
}
}