Introduction:
This post covers basic Spark SQL operations such as filtering, deduplication, and set operations, as well as commonly used date, random, and string functions. Example code is provided for each operation, and the data used by the code is listed at the end of the article.
Introduction to Spark SQL
Spark SQL is an important component of the Spark ecosystem. Its predecessor was Shark, a data warehouse built on Spark that was originally designed to be compatible with Hive; development of Shark was halted in 2014 in favor of Spark SQL. Spark SQL inherited Shark's functionality and improved on it. Spark SQL introduced SchemaRDD (an RDD that carries schema information), which lets users run SQL statements in Spark SQL. The data can come from RDDs or from external sources such as Hive, HDFS, and Cassandra, and can also be JSON-formatted. Spark SQL currently supports Scala, Java, and Python, and supports the SQL-92 standard.
Advantages of Spark SQL
Spark SQL provides good support for SQL queries: you can write Spark applications that query data with SQL statements, and you can also connect to Spark through standard database connectors (such as JDBC or ODBC) to run SQL queries.
Basic Spark SQL operations
Deduplication
distinct: removes duplicates by comparing entire rows.
dropDuplicates: removes duplicates based on the specified columns.
package spark2x
import org.apache.spark.sql.{DataFrame, Dataset, SparkSession}
/**
* Class: DistinctDemo
* Author: 彭三青
* Created: 2018-11-29 15:02
* Version: 1.0
* Description: deduplication with distinct and dropDuplicates
*/
object DistinctDemo {
def main(args: Array[String]): Unit = {
val spark = SparkSession.builder()
.master("local[2]")
.appName("Operations")
.getOrCreate()
import spark.implicits._
val employeeDF: DataFrame = spark.read.json("E://temp/person.json")
val employeeDS: Dataset[Employee] = employeeDF.as[Employee]
println("--------------------distinct---------------------")
// Deduplicate by comparing entire rows
employeeDS.distinct().show()
println("--------------------dropDuplicates---------------------")
// Deduplicate on the given column(s)
employeeDS.dropDuplicates(Seq("name")).show()
spark.stop()
}
}
case class Employee(name: String, age: Long, depId: Long, gender: String, salary: Double)
Filtering
filter(): the argument can be a filter function returning a Boolean (rows for which it returns true are kept, false are dropped), a column, or an expression.
except: returns the rows that exist in the current Dataset but not in the other Dataset.
intersect: returns the intersection of two Datasets.
Note: except and intersect require both Datasets to be of the same type; replacing one side's Employee with a Person case class that has identical fields will cause an error.
package spark2x
import org.apache.spark.sql.{DataFrame, Dataset, SparkSession}
/**
* Class: FilterDemo
* Author: 彭三青
* Created: 2018-11-29 15:09
* Version: 1.0
* Description: filtering with filter, except, and intersect
*/
object FilterDemo {
def main(args: Array[String]): Unit = {
val spark = SparkSession.builder()
.master("local[2]")
.appName("FilterDemo")
.getOrCreate()
import spark.implicits._
val employeeDF: DataFrame = spark.read.json("E://temp/employee.json")
val employeeDS: Dataset[Employee] = employeeDF.as[Employee]
val employee2DF: DataFrame = spark.read.json("E://temp/employee2.json")
val employee2DS: Dataset[Employee] = employee2DF.as[Employee]
println("--------------------employee--------------------")
employeeDS.show()
println("--------------------employee2--------------------")
employee2DS.show()
println("-------------------------------------------------")
// Keep a row if the predicate returns true; otherwise drop it
employeeDS.filter(employee => employee.age == 35).show()
employeeDS.filter(employee => employee.age > 30).show()
// Rows present in this Dataset but absent from the other
employeeDS.except(employee2DS).show()
// Intersection of the two Datasets
employeeDS.intersect(employee2DS).show()
spark.stop()
}
}
case class Employee(name: String, age: Long, depId: Long, gender: String, salary: Double)
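The demo above only uses the function form of filter. The column and SQL-expression forms mentioned earlier work the same way; here is a minimal sketch using a small invented in-memory DataFrame (so it runs without the JSON files):

```scala
import org.apache.spark.sql.SparkSession

object FilterExprSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[2]")
      .appName("FilterExprSketch")
      .getOrCreate()
    import spark.implicits._
    val ds = Seq(("Leo", 25L), ("Jack", 35L), ("Tom", 42L)).toDF("name", "age")
    // Column form: $"age" resolves to a Column through the implicits import
    ds.filter($"age" > 30).show()
    // SQL-expression string form: parsed into the same predicate
    ds.filter("age > 30").show()
    spark.stop()
  }
}
```

Both forms keep the same two rows whose age is greater than 30.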
Collection functions
collect_set: collects the values of the specified column within each group, removing duplicates.
collect_list: collects the values of the specified column within each group, keeping duplicates.
package spark2x
import org.apache.spark.sql.{DataFrame, Dataset, SparkSession}
/**
* Class: CollectSetAndList
* Author: 彭三青
* Created: 2018-11-29 15:24
* Version: 1.0
* Description: collect_list and collect_set
*/
object CollectSetAndList {
def main(args: Array[String]): Unit = {
val spark = SparkSession.builder()
.master("local[2]")
.appName("CollectSetAndList")
.getOrCreate()
import spark.implicits._
import org.apache.spark.sql.functions._
val employeeDF: DataFrame = spark.read.json("E://temp/employee.json")
val employeeDS: Dataset[Employee] = employeeDF.as[Employee]
// collect_list: collects the values of the specified column within each group, keeping duplicates
// collect_set: same, except duplicates are removed
employeeDS
.groupBy(employeeDS("depId"))
.agg(collect_set(employeeDS("name")), collect_list(employeeDS("name")))
.show()
spark.stop()
}
}
case class Employee(name: String, age: Long, depId: Long, gender: String, salary: Double)
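To make the collect_set / collect_list difference concrete, here is a minimal sketch with a small invented in-memory DataFrame in which one group contains a duplicate name (note that the element order inside collect_set is not guaranteed):

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{collect_list, collect_set}

object CollectSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[2]")
      .appName("CollectSketch")
      .getOrCreate()
    import spark.implicits._
    // depId 3 contains "Tom" twice
    val df = Seq((3, "Tom"), (3, "Tom"), (3, "Kattie")).toDF("depId", "name")
    df.groupBy($"depId")
      .agg(
        collect_set($"name").as("names_set"),   // duplicates removed: 2 elements
        collect_list($"name").as("names_list")) // duplicates kept: 3 elements
      .show(truncate = false)
    spark.stop()
  }
}
```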
join and sort
package spark2x
import org.apache.spark.sql.{DataFrame, Dataset, SparkSession}
/**
* Class: JoinAndSort
* Author: 彭三青
* Created: 2018-11-29 15:19
* Version: 1.0
* Description: joinWith and sort
*/
object JoinAndSort {
def main(args: Array[String]): Unit = {
val spark = SparkSession.builder()
.master("local[2]")
.appName("JoinAndSort")
.getOrCreate()
import spark.implicits._
val employeeDF: DataFrame = spark.read.json("E://temp/employee.json")
val employeeDS: Dataset[Employee] = employeeDF.as[Employee]
val departmentDF: DataFrame = spark.read.json("E://temp/department.json")
val departmentDS: Dataset[Department] = departmentDF.as[Department]
println("----------------------employeeDS----------------------")
employeeDS.show()
println("----------------------departmentDS----------------------")
departmentDS.show()
println("------------------------------------------------------------")
// Equi-join on depId == id
employeeDS.joinWith(departmentDS, $"depId" === $"id").show()
// Sort by age in descending order
employeeDS.sort($"age".desc).show()
spark.stop()
}
}
case class Department(id: Long, name: String)
case class Employee(name: String, age: Long, depId: Long, gender: String, salary: Double)
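joinWith above returns a Dataset of pairs. The untyped DataFrame join is an alternative that also lets you name the join type explicitly; a minimal sketch with invented in-memory data:

```scala
import org.apache.spark.sql.SparkSession

object JoinSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[2]")
      .appName("JoinSketch")
      .getOrCreate()
    import spark.implicits._
    val emp  = Seq(("Leo", 1), ("Tom", 3), ("Sam", 9)).toDF("name", "depId")
    val dept = Seq((1, "Technical Department"), (3, "HR Department")).toDF("id", "deptName")
    // inner join drops Sam (no matching department); left_outer keeps him with nulls
    emp.join(dept, emp("depId") === dept("id"), "inner").show()
    emp.join(dept, emp("depId") === dept("id"), "left_outer").show()
    spark.stop()
  }
}
```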
Using built-in functions
Date functions
current_date(): returns the current date.
current_timestamp(): returns the current timestamp.
Math functions
rand(): generates a random number between 0 and 1.
round(e: Column, scale: Int): rounds column e to scale decimal places.
round(e: Column): the one-argument form rounds to 0 decimal places, i.e. to the nearest integer.
String functions
concat_ws(sep: String, exprs: Column*): string concatenation. sep is the separator inserted between values; exprs are the columns to concatenate (multiple columns are separated by commas).
package spark2x
import org.apache.spark.sql.{DataFrame, Dataset, SparkSession}
/**
* Class: FunctionsDemo
* Author: 彭三青
* Created: 2018-11-29 15:56
* Version: 1.0
* Description: date, math, and string functions
*/
object FunctionsDemo {
def main(args: Array[String]): Unit = {
val spark = SparkSession.builder()
.master("local[2]")
.appName("Operations")
.getOrCreate()
import spark.implicits._
import org.apache.spark.sql.functions._
val employeeDF: DataFrame = spark.read.json("E://temp/employee.json")
val employeeDS: Dataset[Employee] = employeeDF.as[Employee]
employeeDS
.select(employeeDS("name"), current_date(), current_timestamp(),
rand(), round(employeeDS("salary"), 2), // random number; salary rounded to 2 decimal places
concat(employeeDS("gender"), employeeDS("age")),
concat_ws("|", employeeDS("gender"), employeeDS("age"))).show()
spark.stop()
}
}
case class Employee(name: String, age: Long, depId: Long, gender: String, salary: Double)
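As a small check on the two round overloads described above, here is a sketch with an invented one-row DataFrame:

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.round

object RoundSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[2]")
      .appName("RoundSketch")
      .getOrCreate()
    import spark.implicits._
    val df = Seq(20000.123).toDF("salary")
    // round(col) rounds to 0 decimal places; round(col, 2) keeps two
    df.select(round($"salary"), round($"salary", 2)).show()
    spark.stop()
  }
}
```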
Data
employee.json
{"name": "Leo", "age": 25, "depId": 1, "gender": "male", "salary": 20000.123}
{"name": "Marry", "age": 30, "depId": 2, "gender": "female", "salary": 25000}
{"name": "Jack", "age": 35, "depId": 1, "gender": "male", "salary": 15000}
{"name": "Tom", "age": 42, "depId": 3, "gender": "male", "salary": 18000}
{"name": "Kattie", "age": 21, "depId": 3, "gender": "female", "salary": 21000}
{"name": "Jen", "age": 30, "depId": 2, "gender": "female", "salary": 28000}
{"name": "Jen", "age": 19, "depId": 2, "gender": "male", "salary": 8000}
{"name": "Tom", "age": 42, "depId": 3, "gender": "male", "salary": 18000}
{"name": "XiaoFang", "age": 18, "depId": 3, "gender": "female", "salary": 58000}
employee2.json
{"name": "Leo", "age": 25, "depId": 1, "gender": "male", "salary": 20000.123}
{"name": "Marry", "age": 30, "depId": 2, "gender": "female", "salary": 25000}
{"name": "Jack", "age": 35, "depId": 1, "gender": "male", "salary": 15000}
{"name": "Tom", "age": 42, "depId": 3, "gender": "male", "salary": 18000}
{"name": "Kattie", "age": 21, "depId": 3, "gender": "female", "salary": 21000}
{"name": "Jen", "age": 30, "depId": 2, "gender": "female", "salary": 28000}
department.json
{"id": 1, "name": "Technical Department"}
{"id": 2, "name": "Financial Department"}
{"id": 3, "name": "HR Department"}