1、Action操作
employee数据表
{"name": "Leo", "age": 25, "depId": 1, "gender": "male", "salary": 20000}
{"name": "Marry", "age": 30, "depId": 2, "gender": "female", "salary": 25000}
{"name": "Jack", "age": 35, "depId": 1, "gender": "male", "salary": 15000}
{"name": "Tom", "age": 42, "depId": 3, "gender": "male", "salary": 18000}
{"name": "Kattie", "age": 21, "depId": 3, "gender": "female", "salary": 21000}
{"name": "Jen", "age": 30, "depId": 2, "gender": "female", "salary": 28000}
{"name": "Jen", "age": 19, "depId": 2, "gender": "female", "salary": 8000}
执行代码
import org.apache.spark.sql.SparkSession
/**
* action操作详解
*
* collect、count、first、foreach、reduce、show、take
*
*/
object ActionOperation {
def main(args: Array[String]) {
val spark = SparkSession
.builder()
.appName("ActionOperation")
.master("local")
.config("spark.sql.warehouse.dir", "C:\\Users\\Administrator\\Desktop\\spark-warehouse")
.getOrCreate()
import spark.implicits._
val employee = spark.read.json("C:\\Users\\Administrator\\Desktop\\employee.json")
// collect:将分布式存储在集群上的分布式数据集(比如dataset),中的所有数据都获取到driver端来
employee.collect().foreach { println(_) }
// count:对dataset中的记录数进行统计个数的操作
println(employee.count())
// first:获取数据集中的第一条数据
println(employee.first())
// foreach:遍历数据集中的每一条数据,对数据进行操作,这个跟collect不同,collect是将数据获取到driver端进行操作
// foreach是将计算操作推到集群上去分布式执行
// foreach(println(_))这种,真正在集群中执行的时候,是没用的,因为输出的结果是在分布式的集群中的,我们是看不到的
employee.foreach { println(_) }
// reduce:对数据集中的所有数据进行归约的操作,多条变成一条
// 用reduce来实现数据集的个数的统计
println(employee.map(employee => 1).reduce(_ + _))
// show,默认将dataset数据打印前20条
employee.show()
// take,从数据集中获取指定条数
employee.take(3).foreach { println(_) }
}
}
2、基础操作
持久化
创建临时视图 主要是为了可以直接对数据执行sql语句
获取执行计划 获取spark sql的执行计划
查看schema
写数据到外部存储
dataset与dataframe相互转换 as toDF
package com.scala.spark
import org.apache.spark.sql.SparkSession
object BasicOperation {
case class Employee(name:String,age:Long,depId:Long,gender:String,salary:Long)
def main(args: Array[String]): Unit = {
val spark = SparkSession.builder().appName("action").master("local").getOrCreate()
import spark.implicits._
val employee=spark.read.json("employee")
//第一步 cache()避免重复计算
/* employee.cache()
println(employee.count())
println(employee.count())*/
//创建临时视图,主要是为了,可以直接对数据执行sql语句
employee.createOrReplaceTempView("temp")
spark.sql("select * from temp where age>25").show()
//获取sql执行计划
//dataframe/dataset,比如执行了一个sql语句获取的dataframe,实际上内部包含一个logical plan,逻辑执行计划
//设计执行的时候,首先会通过底层的catalyst optimizer,生成物理执行计划,比如说会做一些优化,比如push filter
//还会通过whole-stage code generation技术去自动化生成代码,提升执行性能
spark.sql("select * from temp where age>25").explain()
employee.printSchema()
val employDataSet = employee.as[Employee]
employDataSet.show()
employDataSet.printSchema()
val frame = employDataSet.toDF()
frame.show()
frame.printSchema()
}
}
3、typed操作 类似rdd 有稍微区别
repartition 操作 coalesce操作
coalesce和repartition操作 都是重定义分区 区别coalesce只能减少分区数量 而且可以选择不发生shuffle
repartiton,可以增加分区,也可以减少分区,必须会发生shuffle,相当于是进行了一次重分区操作
package com.scala.spark.typedOperation
import org.apache.spark.sql.SparkSession
object TypedOperation {
case class Employee(name:String,age:Long,depId:Long,gender:String,salary:Long)
def main(args: Array[String]): Unit = {
val spark = SparkSession.builder().appName("action").master("local").getOrCreate()
import spark.implicits._
val employee=spark.read.json("employee")
val DataSet = employee.as[Employee]
println(DataSet.rdd.partitions.size)
//coalesce和repartition操作 都是重定义分区 区别coalesce只能减少分区数量 而且可以选择不发生shuffle
// repartiton,可以增加分区,也可以减少分区,必须会发生shuffle,相当于是进行了一次重分区操作
val repartitionDataSet = DataSet.repartition(10)
//看下分区
println(repartitionDataSet.rdd.partitions.size)
val coalescePartion = repartitionDataSet.coalesce(5)
println(coalescePartion.rdd.partitions.size)
DataSet.show()
}
}
distinct 和dropDuplicates操作
distinct去重,是根据每条数据,进行完整内容比对之后有重复的去掉
dropDuplicates 根据每一条数据,可以按照指定的字段进行去重 多个条件也可以