文章目录
核心术语
Application:应用程序,由一个driver和一些executors组成在集群里面
Application jar:应用程序jar包,可以提交运行
Driver program:跑main方法并且创建sparkcontext的进程
Cluster manager:申请集群资源的外部服务(比如yarn,standalone)
Deploy mode(spark-submit有这个参数):区分driver进程运行在哪里,如果是“cluster”模式,driver运行在集群里,如果是“client”,driver被submit启动在集群之外
Worker node:集群里能跑程序的任何节点,对于yarn相当于Nodemanager
Executor:启动应用程序在Worker node的进程,相当于Container。它可以跑task(map/filter)和保存数据在内存或者磁盘。没个application有它自己的executors
Task:线程,相当于一个工作单元,会被发给Executor。我们的程序写在main里,这些代码在Driver端,需要把这些代码发到Executor执行。
Job:由一些task组成的并行计算,当遇到action时被触发,可以在日志里看到。
Stage:一个Job被切分成更小的许多task被称为stages,后面会详述
spark运行机理
spark程序是跑在集群上的一些独立的Cluster进程。他们之间通过SparkContext协调
SparkContext可以连接到几种类型的Cluster(Spark Standalone,MEOS或SEAR),它们在Application之间分配资源。一旦连接,Spark将在集群中的节点上获取Executors,这些节点是运行计算并为应用程序存储数据的进程。接下来,Driver将application(由传递给SparkContext的JAR或Python文件定义)发送给Executors。最后,SparkContext将task发送到Execotors以运行。
1.每个application有自己一堆独立Executor进程,以多线程方式运行task。多个application互不干扰,这也就意味着数据不能在多个application中被共享,除非把Executor的数据写在外部存储器,比如Alluxio。
2.Spark对底层集群管理器不感知。只要能获得Executor进程,并且可以互相通信,那么跑在集群管理器上就是一件简单的事了。
Transformations
map和mapPartitions
举个例子说明两者区别:假设有个rdd = 100 partition
1 partiton = 10000个元素,那么总的元素个数: 100w,如果保存到数据库,map要建立100W次连接,而mapPartition一个partition处理一次,只要100次连接。虽然mapPartition带来便利,但是map每处理一条数据,便释放一次资源,mapPartition一次处理很多数据,内存可能会爆。
foreach和foreachpartition
都是action,机制和map与mapPartitions差不多。
coalesce和repartition
前者是返回一个新RDD缩减到几个分区
后者是底层带shuffle掉用coalesce的函数,能大能小
创建一个rdd,测试之
rdd.partitions.size
data.coalesce(2).partitions.size
data.repartition(4).partitions.size
coalesce使用场景有,假设100个partition作用了许多filter函数后,每个partition数据都少了,没必要那么多partition
Shuffle
相同的key被发到一个partition里去。
这样的分发就要网络传输,数据序列化,效率降低
窄依赖和宽依赖
窄依赖:一个父RDD的partition会被子RDD的partition使用一次
宽依赖:一个父RDD的partition会被子RDD的partition使用多次
从容错角度来看,窄依赖子partition挂了,容易找到父partition重新生成。
但是宽依赖子partition挂了,要找多个父partition重新生成并且要shuffle。
这里join是不是宽依赖要分情况讨论咯。
Stage
遇到一个shuffle,宽依赖就产生新的stage
shuffle数据之所以变大了,是因为map数据了呀,比如x变成了(x,1)
groupbykey和reducebykey
reducebykey性能更好,因为它会做预聚合。
hadoop的combiner也是类似的,但是在某些场景下不使用,比如除法,相加再除和除了再加结果不一样。
代码
随机的object
object DBUtils {
def getConnection() = {
new Random().nextInt(10) + ""
}
def returnConnection(connection:String) = {
}
}
map和mapPartition
import org.apache.spark.{SparkConf, SparkContext}
import scala.collection.mutable.ListBuffer
object MapPartitionApp {
def main(args: Array[String]): Unit = {
val sparkconf = new SparkConf().setMaster("local[2]").setAppName("asd")
//一个context运行在一个jvm。
val sc = new SparkContext(sparkconf)
val student = new ListBuffer[String]()
for (i<- 1 to 100){
student +="ruoze" +i
}
val stus=sc.parallelize(student)
// stus.map(x => {
// val connection = DBUtils.getConnection()
// println(connection + "~~~~~~~~~~~~")
// // TODO... 写出数据到数据库
//
// DBUtils.returnConnection(connection)
// }).foreach(println)
stus.mapPartitions(partition => {
val connection = DBUtils.getConnection()
println(connection + "~~~~~~~~~~~~")
// TODO... 写出数据到数据库
DBUtils.returnConnection(connection)
partition
}).foreach(println)
sc.stop()
}
}
foreach和foreachpartition
import org.apache.spark.{SparkConf, SparkContext}
import scala.collection.mutable.ListBuffer
object foreachpartition {
def main(args: Array[String]): Unit = {
val sparkConf = new SparkConf()
.setMaster("local[2]").setAppName("ForeachPartitionApp")
val sc = new SparkContext(sparkConf)
val students = new ListBuffer[String]()
for(i <- 1 to 100) {
students += "若泽数据实战班五期:" + i
}
val stus = sc.parallelize(students, 4)
// stus.foreach(x => {
// val connection = DBUtils.getConnection()
// println(connection + "~~~~~~~~~~~~")
// // TODO... 写出数据到数据库
//
// DBUtils.returnConnection(connection)
// })
stus.foreachPartition(partition => {
val connection = DBUtils.getConnection()
println(connection + "~~~~~~~~~~~~")
// TODO... 写出数据到数据库
DBUtils.returnConnection(connection)
})
sc.stop()
}
}
coalesce_repartition
import org.apache.spark.{SparkConf, SparkContext}
import scala.collection.mutable.ListBuffer
object coalesce_repartition {
def main(args: Array[String]) {
val sparkConf = new SparkConf()
.setMaster("local[2]").setAppName("ForeachPartitionApp")
val sc = new SparkContext(sparkConf)
val students = new ListBuffer[String]()
for(i <- 1 to 100) {
students += "若泽数据实战班五期:" + i
}
val stus = sc.parallelize(students, 3)
//mapPartitionsWithIndex返回索引和partition
stus.mapPartitionsWithIndex((index, partition) => {
val emps = new ListBuffer[String]
//分区是迭代的,判断还有没有元素
while(partition.hasNext) {
emps += ("~~~" + partition.next() + " , 原部门:[" + (index+1) + "]")
}
emps.iterator
}).foreach(println)
// println("============华丽的分割线================")
// stus.coalesce(2).mapPartitionsWithIndex((index, partition) => {
// val emps = new ListBuffer[String]
// while(partition.hasNext) {
// emps += ("~~~" + partition.next() + " , 新部门:[" + (index+1) + "]")
// }
// emps.iterator
// }).foreach(println)
// println("============华丽的分割线================")
//
// stus.repartition(5).mapPartitionsWithIndex((index, partition) => {
// val emps = new ListBuffer[String]
// while(partition.hasNext) {
// emps += ("~~~" + partition.next() + " , 新部门:[" + (index+1) + "]")
// }
// emps.iterator
// }).foreach(println)
sc.stop()
}
}
groupbykey,reducebykey
import org.apache.spark.{SparkConf, SparkContext}
object wc {
def main(args: Array[String]) {
val sparkConf = new SparkConf()
.setMaster("local[2]").setAppName("WcApp")
val sc = new SparkContext(sparkConf)
val lines = sc.textFile("")
val words = lines.flatMap(x=>x.split(","))
val pairs = words.map((_,1))
//[string,int]的形式
//pairs.reduceByKey(_+_)
//[string,Iterable(int)]的形式
pairs.groupByKey().map(x => (x._1, x._2.sum))
sc.stop()
}
}