spark知识点总结

**

1、Spark简介

**
 1)、Spark的历史:2012年发布初始版本0.6版本,已经有6年的历史了。
 2)、Spark的创始人:美国加州大学的伯克利分校的AMP实验室。
 3)、Spark比MR快的原因:
   ①Spark是粗粒度的资源调度,资源复用。
   ②Spark支持基于内存迭代,MR不支持。
   ③Spark支持DAG有向无环图 task pipleline。
   ④Spark可以根据不同场景选择不同shuffle,spark shuffle 比MR性能高(sortShuffle)
5)、spark的运行模式:local、standalone、yarn、mesos。
6)、开发Spark的语言:scala、java、python、R。(Scala和Java兼容性和效率都是一样的)

2、RDD(弹性分布式数据集)(重点)

1)、RDD五大特性:(重点)

     1. RDD是由一系列的Paratition组成的。(partition个数=split切片数 约等于 block数;Spark没有读文件的方法,依赖MR读文件的方法)
     2. RDD提供的每一个算子实际上是作用在每一个Paratition上的。
     3. RDD实际上是有一系列的依赖关系的,依赖于其他的RDD。(计算的容错性;体现了RDD的弹性;父RDD不一定知道子RDD是谁,子RDD一定知道父RDD是谁)
     4. 可选:分区器作用在内部计算逻辑的返回值是kv格式的RDD上。
     5. 可选:RDD会提供一系列的最佳计算位置。(计算找数据)

2)、算子

     1. taransformation类算子
        	map(一对一)、flatMap(一对多)、filter(一对N(0、1))、join、leftouterJoin、rightouterJoin、fullouterJoin、sortBy、sortByKey、gorupBy、groupByKey、reduceBy、reduceByKey、sample、union、mappatition、mappatitionwithindex、zip、zipWithIndex。
     2. action类算子
        count、collect(将task的计算结果拉回到Driver端)、foreach(不会回收所有task计算结果,原理:将用户传入的参数推送到各个节点上去执行,只能去计算节点找结果)、saveAsTextFile(path)、reduce、foreachPatition、take、first。

(查看计算结果的方式:WEBUI、去各个节点的Worker工作目录查看)
3. 控制类算子
cache(相当于MEMOORY_ONLY)、
persist(MEMORY_ONLY、DISK_ONLY、MEMORY_AND_DISK)
控制类算子注意点:
1)、控制类算子后不能紧跟action类算子
2)、缓存单元是partition
3)、懒执行、需要action类算子触发执行。(如果application中只有一个job,没必要使用控制类算子)

3、Spark在集群中大概运行流程

 1. Driver分发task到节点运行(计算找数据)。
 2. task执行结果拉回到Driver(有可能发生OOM)。
 Driver的作用:
     1)、分发任务到计算节点运行。
     2)、监控task(thread)的运行情况。
     3)、如果task失败,会重新发送(有限制)。
     4)、可以拉回结果到Driver进程。
 结论:Driver进程会和集群频繁通信。

4、提交Application的方式

1、Client
    提交方式:spark-submit --deploy-mode client --class jarPath args
    特点:Driver进程在客户端节点启动
    适用场景:测试环境
    大概运行流程:
        1)、在Client本地启动Driver进程。
        2)、Driver会向Master为当前Application申请资源。
        3)、Master接收到请求后,会在资源充足的节点上启动Executor进程。
        4)、Driver分发task到Executor执行。
2、Cluster
    提交方式:spark-submit --deploy-mode cluster --class jarPath args
    特点:每次启动application,Driver进程在随机一台节点启动
    适用场景:生产环境
    大概运行流程:
        1)、客户端执行spark-submit --deploy-mode cluster --class jarPath args命令,启动一个sparksubmit进程。
        2)、为Driver向Master申请资源。Driver进程默认需要1G内存,1core。
        3)、master会随机找一台Worker节点启动Driver进程。
        4)、Driver进程启动成功后,spark-submit进程关闭,然后Driver会向Master为当前Application申请资源。
        5)、Master接收到请求后,会在资源充足的节点上启动Executor进程。
        6)、Driver分发task到Executor执行。

1、学习任务调度前需要了解的知识点

1.1、Spark中的一些专业术语
  1.1.1、任务相关
    Application:用户写的应用程序(DriverProgram + ExecutorProgram)。
    Job:一个action类算子触发的操作。
    stage:一组任务,例如:map task。
    task:(thread)在集群运行时,最小的执行单元。

1.1.2、资源相关
    Mstaer:资源管理主节点。
    Worker:资源管理从节点。
    Executor:执行任务的进程。
    ThreadPool:线程池(存在于Executor进程中)

 1.2、RDD中的依赖关系
      1.2.1、宽依赖
              父RDD与子RDD,partition之间的关系是一对多,一般来说,宽依赖都会导致shuffle。(默认情况下,groupByKey返回的RDD的分区数与父RDD是一致的。如果你在使用groupByKey的时候,传入一个Int类型的值,那么分区数就是这个值。)
              

      1.2.2、窄依赖
              父RDD与子RDD,partition之间的依赖关系是一对一,这种依赖关系不会有shuffle。
              

      1.2.3、宽窄依赖的作用
              宽窄依赖的作用就是:把job切割成一个个的stage。
              切割stage的过程:(stage与 stage之间是宽依赖,stage内部是窄依赖)
              
              那么接下来问题来了,为什么我们需要把job切割成stage呢?
              答:把job切割成stage之后,stage内部就可以很容易的划分出一个个的task任务(用一条线把task内部有关联的子RDD与父RDD串联起来),然后就可把task放到管道中运行了。

              下一个问题:RDD存储的到底是什么样的计算逻辑呢?下面用一个例子来解释:

              在这个Application中有一个job,一个stage,2个task。
              task0:这条线贯穿所有的partition中的计算逻辑,并且以递归函数展开式的形式整合到一起,fun2(fun1(textFile(b1))),最好将这个计算逻辑发送到b1或者其副本所在节点。task1也是相同的逻辑。同时注意:task的计算模式是pipeline的计算模式。

 1.3、学习任务调度前需要了解的问题

       1.3.1、stage中的每一个task(管道计算模式)会在什么时候落地磁盘?
              1)、如果stage后面是跟的是action类算子
                  saveAsText:将每一个管道计算结果写入到指定目录。
                  collect:将每一个管道计算结果拉回到Driver端内存中。
                  count:将每一个管道计算结果,统计记录数,返回给Driver。
              2)、如果stage后面是跟的是stage
                  在shuffle write阶段会写磁盘。(为什么在shuffle write阶段写入磁盘?防止reduce task拉取文件失败,拉取失败后可以直接在磁盘再次拉取shuffle后的数据)

       1.3.2、Spark在计算的过程中,是不是特别消耗内存?
              不是。Spark是在管道中计算的,而管道中不是特别耗内存。即使有很多管道同时进行,也不是特别耗内存。

       1.3.3、什么样的场景最耗内存?
              使用控制类算子的时候耗内存,特别是使用cache时最耗内存。

       1.3.4、如果管道中有cache逻辑,他是如何缓存数据的?
              有cache时,会在一个task运行成功时(遇到action类算子时),将这个task的运行结果缓存到内存中。

       1.3.5、RDD(弹性分布式数据集),为什么他不存储数据还叫数据集?
              虽然RDD不具备存储数据的能力,但是他具备操作数据的能力。

2、任务调度
2.1、任务调度的流程

          1)、DAGScheduler:根据RDD的宽窄依赖关系将DAG有向无环图切割成一个个的stage,将stage封装给另一个对象taskSet,taskSet=stage,然后将一个个的taskSet给taskScheduler。

          2)、taskScheduler:taskSeheduler拿倒taskSet之后,会遍历这个taskSet,拿到每一个task,然后去调用HDFS上的方法,获取数据的位置,根据获得的数据位置分发task到响应的Worker节点的Executor进程中的线程池中执行。

          3)、taskSchedule:taskSchedule节点会跟踪每一个task的执行情况,若执行失败,TaskSche会尝试重新提交,默认会重试提交三次,如果重试三次依然失败,那么这个task所在的stage失败,此时TaskSchedule向DAGSchedule做汇报。

          4)、DAGScheduler:接收到stage失败的请求后,,此时DAGSheduler会重新提交这个失败的stage,已经成功的stage不会重复提交,只会重试这个失败的stage。
          (注:如果DAGScheduler重试了四次依然失败,那么这个job就失败了,job不会重试)

 2.2、配置信息使用的三种方式
          1)、在代码中使用SparkConf来配置。

          2)、在提交的时候使用 --conf来配置。
               spark-submit --master --conf k=v 如果要设置多个配置信息的值,需要使用多个–conf

          3)、在spark的配置文件spark-default.conf中配置。

 2.3、什么是挣扎(掉队)的任务?

          当所有的task中,75%以上的task都运行成功了,就会每隔一百秒计算一次,计算出目前所有未成功任务执行时间的中位数*1.5,凡是比这个时间长的task都是挣扎的task。


 2.4、关于任务调度的几个小问题

       2.4.1、如果有1T数据,单机运行需要30分钟,但是使用Saprk计算需要两个小时(4node),为什么?
          1)、发生了计算倾斜。大量数据给少量的task计算。少量数据却分配了大量的task。
          2)、开启了推测执行机制。

       2.4.2、对于ETL(数据清洗流程)类型的数据,开启推测执行、重试机制,对于最终的执行结果会不会有影响?
         有影响,最终数据库中会有重复数据。
         解决方案:
             1)、关闭各种推测、重试机制。
             2)、设置一张事务表。

1、绪论
  
  我们运行一个Spark应用程序时,首先第一步肯定是写一个Spark Application应用程序,然后调用资源调度器为Driver申请资源。申请成功后,向master为Application申请资源,申请完毕后,调用资源调度器把任务分发到节点执行。在各个节点进行分布式的并行计算。

2、前置知识
  对于Application来说,资源是Executor。对于Executor来说资源是内存、core。

Master里面有几个对象:workers、waitingDrivers、waitingApps。下面对这几个对象做一个简单介绍,这几个对象是在源码中声明的,如需更详细的认知,可自行查看看源码(源码是使用的是Scala,如需快速了解,请参考《Scala快速学习》)。

val works = new HashSetWorkInfo
val waitingDrivers = new ArrayBufferDriverInfo
val waitingApps = new ArrayBufferApplicationInfo

在上面的代码中,WorkInfo代表的是work节点的节点信息。DriverInfo是Driver发送过来的请求信息。ApplicationInfo是发送过来的Application的信息。

val works = new HashSetWorkInfo
  works 集合采用HashSet数组存储work的节点信息,可以避免存放重复的work节点。为什么要避免重复?首先我们要知道work节点有可能因为某些原因挂掉,挂掉之后下一次与master通信时会报告给master,这个节点挂掉了,然后master会在works对象里把这个节点去掉,等下次再用到这个节点是时候,再加进来。这样来说,理论上是不会有重复的work节点的。可是有一种特殊情况:work挂掉了,在下一次通信前又自己启动了,这时works里面就会有重复的work信息。

val waitingDrivers = new ArrayBufferDriverInfo
  当客户端向master为Driver申请资源时,会将要申请的Driver的相关信息封装到master节点的DriverInfo这个泛型里,然后添加到waitingDrivers 里。master会监控这个waitingDrivers 对象,当waitingDrivers集合中的元素不为空时,说明有客户端向master申请资源了。此时应该先查看一下works集合,找到符合要求的worker节点,启动Driver。当Driver启动成功后,会把这个申请信息从waitingDrivers 对象中移除。

val waitingApps = new ArrayBufferApplicationInfo
  Driver启动成功后,会为application向master申请资源,这个申请信息封存到master节点的waitingApps 对象中。同样的,当waitingApps 集合不为空,说明有Driver向Master为当前的Application申请资源。此时查看workers集合,查找到合适的Worker节点启动Executor进程,默认的情况下每一个Worker只是为每一个Application启动一个Executor,这个Executor会使用1G内存和所有的core。启动Executor后把申请信息从waitingApps 对象中移除。

注意点:上面说到master会监控这三个集合,那么到底是怎么监控的呢???
  master并不是分出来线程专门的对这三个集合进行监控,相对而言这样是比较浪费资源的。master实际上是‘监控’这三个集合的改变,当这三个集合中的某一个集合发生变化时(新增或者删除),那么就会调用schedule()方法。schedule方法中封装了上面提到的处理逻辑。

4、详细步骤
  1、执行提交命令,会在client客户端启动一个spark-submit进程(用来为Driver申请资源)。
  2、为Driver向Master申请资源,在Master的waitingDrivers 集合中添加这个Driver要申请的信息。Master查看works集合,挑选出合适的Work节点。
  3、在选中的Work节点启动Driver进程(Driver进程已经启动了,spark-submit的使命已经完成了,关闭该进程)。
  4、Driver进程为要运行的Application申请资源(这个资源指的是Executor进程)。此时Master的waitingApps 中要添加这个Application申请的资源信息。这时要根据申请资源的要求去计算查看需要用到哪些Worker节点(每一个节点要用多少资源)。在这些节点启动Executor进程。
  (注:轮询启动Executor。Executor占用这个节点1G内存和这个Worker所能管理的所有的core)
  5、此时Driver就可以分发任务到各个Worker节点的Executor进程中运行了。

5、资源调度结论
  1、默认情况下,每一个Worker只会为每一个Application启动一个Executor。每个Executor默认使用1G内存和这个Worker所能管理的所有的core。
  2、如果想要在一个Worker上启动多个Executor,在提交Application的时候要指定Executor使用的core数量。提交命令:spark-submit --executor-cores
  3、默认情况下,Executor的启动方式是轮询启动,一定程度上有利于数据的本地化。

什么是轮询启动???为什么要轮训启动呢???

轮询启动:轮询启动就是一个个的启动。例如这里有5个人,每个人要发一个苹果+一个香蕉。轮询启动的分发思路就是:五个人先一人分一个苹果,分发完苹果再分发香蕉。

为什么要使用轮询启动的方式呢???我们做大数据计算首先肯定想的是计算找数据。在数据存放的地方直接计算,而不是把数据搬过来再计算。我们有n台Worker节点,如果只是在数据存放的节点计算。只用了几台Worker去计算,大部分的worker都是闲置的。这种方案肯定不可行。所以我们就使用轮询方式启动Executor,先在每一台节点都允许一个任务。

存放数据的节点由于不需要网络传输数据,所以肯定速度快,执行的task数量就会比较多。这样不会浪费集群资源,也可以在存放数据的节点进行计算,在一定程度上也有利于数据的本地化。

6、Spark的粗细粒度
  6.1、粗粒度(富二代)
    在任务执行之前,会先将资源申请完毕,当所有的task执行完毕,才会释放这部分资源。
    优点:每一个task执行前。不需要自己去申请资源了,节省启动时间。
    缺点:等到所有的task执行完才会释放资源,集群的资源就无法充分利用。

6.2、细粒度(穷二代)
    Application提交的时候,每一个task自己去申请资源,task申请到资源才会执行,执行完这个task会立刻释放资源。
    优点:每一个task执行完毕之后会立刻释放资源,有利于充分利用资源。
    缺点:由于需要每一个task自己去申请资源,导致task启动时间过长,进而导致stage、job、application启动时间延长。

7、加深理解
  这里有几个小问题,益脑游戏。。。

前提条件:假设我们有5个worker,每个worker节点提供10G内存,10个core。

1、spark-submit --master … --executor-cores 2 --executor-memory 2G … 在集群中会启动多少个Executor进程???
  答:25
  分析:每个Executor进程使用2个core+2G内存。所以1台worker节点可以启动5个Executor。因为有5台worker节点。所以共可以启动5*5=25个Executor进程。

2、spark-submit --master … --executor-cores 3 --executor-memory 4G … 在集群中会启动多少个Executor进程???
  答:10
  分析:根据提交命令,1个Executor进程需要占用3个core+4G内存。1个Worker节点有10个core和10G内存。若按照core来计算,10/3=3,可以启动3个Executor。若按照内存来算,,10/4=2,可以启动2个Executor。这样来看,内存不足,肯定启动不了3个Executor。所以,1个Worker启动2个Executor,5*2=10。

3、spark-submit --master … --executor-cores 2 --executor-memory 2G --total-executor-cores 10 … 在集群中会启动多少个Executor进程???(–total-executor-cores:整个Application最多使用的core数)
  答:5
  分析:这个题比前两个多了一个限制条件,最多启动10个core。如果不考虑这个条件,这个题和第1题就一样了,应该是可以启动25个Executor,共用50个core。可是整个Application最多使用10个core,一个Executor使用2个core,所以10/2=5。只能启动5个Executor。

4、spark-submit --master … --executor-cores 2 --executor-memory 2G --total-executor-cores 4 … 集群中Executor的分布情况???(–total-executor-cores:整个Application最多使用的core数)
  答:随机找两台Worker节点。
  分析:这个题和上一题的思路完全一样。因为core数限制,不能启动25个Executor,只能启动4/2=2个Executor进程。因为spark的任务启动方式是轮询启动,所以会随机找两台Worker节点启动Executor。

5、启动Executor个数的公式:min(min(wm/em,wc/ec)*wn,tec/ec)
  注:
    --executor-cores : ec
    --executor-memory : em
    --total-executor-cores : tec
    worker_num : wn
    worker_memory : wm
    worker_core : wc
  分析:
    min(wm/em,wc/ec):根据要求的core数和内存容量,得到一台节点可以启动Executor的两个数值。这两个数取小值。
    x1 = min(wm/em,wc/ec)wn:一台worker节点的Executor数worker结点数=可以启动的总的Executor数。
    x2 = tec/ec:根据要求的总core数求出来可以启动的Executor总数。
    min(min(wm/em,wc/ec)*wn,tec/ec) == min(x1,x2):上面求出来的两个Executor数量取小值。

发布了6 篇原创文章 · 获赞 5 · 访问量 80

猜你喜欢

转载自blog.csdn.net/AnnerLi/article/details/104303562