Spark面试整理

一、spark的优势:

  (1)每一个作业独立调度,可以把所有的作业做一个图进行调度,各个作业之间相互依赖,在调度过程中一起调度,速度快。

  (2)所有过程都基于内存,所以通常也将Spark称作是基于内存的迭代式运算框架。

  (3)spark提供了更丰富的算子,让操作更方便。

二、为什么Spark比Map Reduced运算速度快:

  (1)Spark计算比MapReduce快的根本原因在于DAG计算模型。一般而言,DAG相比Hadoop的MapReduce在大多数情况下可以减少shuffle次数。

  (2)Hadoop每次计算的结果都要保存到hdfs,然后每次计算都需要从hdfs上读书数据,磁盘上的I/O开销比较大。 spark一次读取数据缓存在内存中,内存的数据读取比磁盘数据读取快很多。还有一点就是spark的RDD数据结构,RDD在每次transformation后并不立即执行,而且action后才执行,有进一步减少了I/O操作。

  (3)MR它必须等map输出的所有数据都写入本地磁盘文件以后,才能启动reduce操作,因为mr要实现默认的根据Key的排序!所以要排序肯定得写完所有数据,才能排序,然后reduce来拉取。但是spark不需要,spark默认情况下,是不会对数据进行排序的。因此shufflemaptask每写入一点数据,resulttask就可以拉取一点数据,然后再本地执行我们定义的聚合函数和算子,进行计算.

三、spark的DAG有向无循环图:

  DAG叫做有向无环图,原始的RDD通过一系列的转换就形成了DAG,根据RDD之间依赖关系的不同将DAG划分成不同的Stage(调度阶段)。对于窄依赖,partition的转换处理在一个Stage中完成计算。对于宽依赖,由于有Shuffle的存在,只能在parent RDD处理完成后,才能开始接下来的计算,因此宽依赖是划分Stage的依据。

四、spark如何分区:

  分区是RDD内部并行计算的一个计算单元,RDD的数据集在逻辑上被划分为多个分片,每一个分片称为分区,分区的格式决定了并行计算的粒度,而每个分区的数值计算都是在一个任务中进行的,因此任务的个数,也是由RDD(准确来说是作业最后一个RDD)的分区数决定。spark默认分区方式是HashPartitioner.

  只有Key-Value类型的RDD才有分区的,非Key-Value类型的RDD分区的值是None,每个RDD的分区ID范围:0~numPartitions-1,决定这个值是属于那个分区的。

  (1)HashPartitioner分区:partition = key.hashCode () % numPartitions,如果余数小于0,则用余数+分区的个数,最后返回的值就是这个key所属的分区ID。

               缺点:可能导致每个分区中数据量的不均匀,极端情况下会导致某些分区拥有RDD的全部数据

  (2)RangePartitioner分区(范围分区):RangePartitioner会对key值进行排序,然后将key值按照分区个数进行划分分区.尽量保证每个分区中数据量的均匀,而且分区与分区之间是有序的,一个分区中的元素肯定都是比另一个分区内的元素小或者大;但是分区内的元素是不能保证顺序的。分界的算法尤为重要。算法对应的函数是rangeBounds.

  (3)CustomPartitioner自定义分区:需要继承org.apache.spark.Partitioner类,sc.parallelize(List((1,'a'),(1,'aa'),(2,'b'),(2,'bb'),(3,'c')), 3).partitionBy(new CustomPartitioner(3))

五、spark从HDFS中读取数据是如何分区的:

  Spark从HDFS读入文件的分区数默认等于HDFS文件的块数(blocks),HDFS中的block是分布式存储的最小单元。如果我们上传一个30GB的非压缩的文件到HDFS,HDFS默认的块容量大小128MB,因此该文件在HDFS上会被分为235块(30GB/128MB);Spark读取SparkContext.textFile()读取该文件,默认分区数等于块数即235。

  注意:一般合理的分区数设置为总核数的2~3倍

六、spark数据倾斜出现的原因:

  根本原因是分区不均匀,在执行shuffle操作的时候,是按照key,来进行values的数据的输出、拉取和聚合的。同一个key的values,一定是分配到一个reduce task进行处理的。某个或者某些key对应的数据,远远的高于其他的key。定位数据倾斜就是看哪些地方用了会产生shuffle的算子,groupByKey、countByKey、reduceByKey、join

  1、数据倾斜发生的现象:

  (1)大部分的task执行的特别快,剩下的几个task执行的特别慢.

  (2)运行一段时间后,其他task都已经执行完成,但是有的task可能会出现OOM异常因为task的所分配的数据量太大,而且task每处理一条数据还要创建大量的对象,内存存储不下.

  2、解决数据倾斜的方法:

  (1)聚合源数据:在数据的源头将数据聚合成一个key对应多个value值.这样在进行操作时就可能不会出现shuffle过程.

  (2)将导致数据倾斜的key提取出来,若是key对应的null或者无效数据,就将其删除,若是正常的数据,就将其单独处理,再与正常处理的数据进行union操作.

  (3)提高shuffle操作reduce的并行度:将reduce task的数量变多,比如groupByKey、countByKey、reduceByKey。在调用的时候,传入进去一个参数。那个数字,就代表了那个shuffle操作的reduce端的并行度。那么在进行shuffle操作的时候,就会对应着创建指定数量的reduce task。

  (4)对key先添加随机值,进行操作后,去掉随机值,再进行一次操作。将原始的 key 转化为 key + 随机值(例如Random.nextInt),对数据进行操作后,将 key + 随机值 转成 key.

六、reduceByKey与groupByKey的区别:

  pairRdd.reduceByKey(_+_).collect.foreach(println)等价于pairRdd.groupByKey().map(t => (t._1,t._2.sum)).collect.foreach(println)

  reduceByKey的结果:(hello,2)(world,3) groupByKey的结果:(hello,(1,1))(world,(1,1,1))

  使用reduceByKey()的时候,本地的数据先进行merge然后再传输到不同节点再进行merge,最终得到最终结果。而使用groupByKey()的时候,并不进行本地的merge,全部数据传出,得到全部数据后才会进行聚合成一个sequence.groupByKey()传输速度明显慢于reduceByKey()。虽然groupByKey().map(func)也能实现reduceByKey(func)功能,但是,优先使用reduceByKey(func).

七、Spark运行的全流程:

  (1)首先通过spark-submit提交Application应用,后台就会创建相应的driver进程,driver进程会运行Application中的代码,

  (2)初始化Sparkcontext,Sparkcontext是用户通向spark集群的入口,在初始化sparkContext时,同时的会初始化DAGScheduler、TaskScheduler,初始化TaskScheduler的同时,会创建两个非常重要的对象,分别是 DriverActor 和 ClientActor.

  (3)clientActor向master注册Application,master收到Application的注册请求后,会使用自己的资源调度算法,通知相应的worker节点为Application启动多个Executor.

  (4)多个Executor启动之后,会反向注册到DriverActor,之后driver结束sparkcontext的初始化,继续执行接下来的代码.

  (5)在接下来的代码中,将所遇到的对RDD的所有操作形成一个DAG有向无循环图,每执行到action操作就会创建一个job到DAGScheduler中,而job又根据RDD的依赖关系划分成多个stage,每个stage里根据最后一个RDD的分区数目来创建task,多个task形成一个taskset

  (6)将taskset送到taskscheduler中,然后taskscheduler对task进行序列化,然后将序列化好的task封装到launchTask中,然后将launchTask发送给指定的executor中运行.

  (7)executor接收到了DriverActor 发送过来的launchTask 时,会对launchTask 进行反序列化,封装到一个TaskRunner 中,然后从executor这个线程池中获取一个线程来执行指定的任务.

  (8)最终当所有的task任务完成之后,整个application执行完成,关闭sparkContext对象。

八、spark处理任务的过程:

  (1)构建DAG(有向无环图)(调用RDD上的方法)

  (2)DAGScheduler将DAG切分Stage(切分的依据是Shuffle),将Stage中生成的Task以TaskSet的形式给TaskScheduler

九、Spark 的运行模式中有哪几种:

  (1)本地模式:driver和Executors处于同一个jvm

  (2)standalone模式:基本Spark内置的集群搭建模式,运行时要开起master和worker的守护进程.适合于不太依赖Hadoop的运算环境.

  (3)基于yarn-cluster模式:作业调度、资源调度由Yarn分配。Yarn在这方面做得比Spark standalone集群好,适用于存储计算合一,或者需要依赖MR、Hive等作业的场景,一般用于生产模式.在 Yarn-Cluster 模式中,当用户向 Yarn 中提交一个应用程序后, Yarn 将分两个阶段运行该应用程序:第一个阶段是把 Spark 的 Driver 作为一个 ApplicationMaster 在 Yarn 集群中先启动;第二个阶段是由 ApplicationMaster 创建应用程序,然后为它向 ResourceManager 申请资源,并启动 Executor 来运行 Task,同时监控它的整个运行过程,直到运行完成。(过程类似于mapreduce)

  (4)基于yarn-client模式,一般用来测试,传输消耗大,方便调试.

十、spark的shuffle过程:

未优化:  

  (1)每一个ShufflleMapTask会为每一个ReduceTask创建一个bucket缓存,并且会为每一个bucket创建一个文件。这个bucket存放的数据就是经过Partitioner操作(默认是HashPartitioner)之后找到对应的bucket然后放进去,最后将bucket缓存的数据刷新到磁盘上,即对应的block file.

  (2)然后ShuffleMapTask将输出作为MapStatus发送到DAGScheduler的MapOutputTrackerMaster,每一个MapStatus包含了每一个ResultTask要拉取的数据的位置和大小.

  (3)ResultTask然后去利用BlockStoreShuffleFetcher向MapOutputTrackerMaster获取MapStatus,看哪一份数据是属于自己的,然后底层通过BlockManager将数据拉取过来.

  (4)拉取过来的数据会组成一个内部的ShuffleRDD,优先放入内存,内存不够用则放入磁盘,然后ResulTask开始进行聚合,最后生成我们希望获取的那个MapPartitionRDD

  缺点:如上图所示:在这里有1个worker,2个executor,每一个executor运行2个ShuffleMapTask,有三个ReduceTask,所以总共就有4 * 3=12个bucket和12个bucket和12个block file。如果数据量较大,将会生成M*R个小文件,比如ShuffleMapTask有100个,ResultTask有100个,这就会产生100*100=10000个小文件

   bucket缓存很重要,需要将ShuffleMapTask所有数据都写入bucket,才会刷到磁盘,那么如果Map端数据过多,这就很容易造成内存溢出,尽管后面有优化,bucket写入的数据达到刷新到磁盘的阀值之后,就会将数据一点一点的刷新到磁盘,但是这样磁盘I/O就多了.与MR完全不一样的是,MR它必须将所有的数据都写入本地磁盘文件以后,才能启动reduce操作,来拉取数据。因为mr要实现默认的根据Key的排序!所以要排序肯定得写完所有数据,才能排序,然后reduce来拉取。但是spark不需要,spark默认情况下,是不会对数据进行排序的。因此shufflemaptask每写入一点数据,resulttask就可以拉取一点数据,然后再本地执行我们定义的聚合函数和算子.spark这种机制的好处在于速度比mr快多了.由于这种事实拉取的机制,一次提供不了直接处理key对应的valur的算子,只能通过reducebykey,先shuffle,有一个maptartitionsRDD,然后用map算子,来处理每个key对应的values.

优化后:

  (1)每一个Executor进程根据核数,决定Task的并发数量,比如executor核数是2,就是可以并发运行两个task,如果是一个则只能运行一个task,

  (2)假设executor核数是1,ShuffleMapTask数量是M,那么executor依然会根据ResultTask的数量R,创建R个bucket缓存,然后对key进行hash,数据进入不同的bucket中,每一个bucket对应着一个block file,用于刷新bucket缓存里的数据

  (3)然后下一个task运行的时候,那么不会再创建新的bucket和block file,而是复用之前的task已经创建好的bucket和block file。即所谓同一个Executor进程里所有Task都会把相同的key放入相同的bucket缓冲区中

这样的话,生成文件的数量就是(本地worker的executor数量*executor的cores*ResultTask数量)如上图所示,即2 * 1* 3 = 6个文件,每一个Executor的shuffleMapTask数量100,ReduceTask数量为100,那么  未优化的HashShuffle的文件数是2 *1* 100*100 =20000,优化之后的数量是2*1*100 = 20

  缺点:如果 Reducer 端的并行任务或者是数据分片过多的话则 Core * Reducer Task 依旧过大,也会产生很多小文件。

十一、spark的checkpoint操作:

  checkpoint的意思就是建立检查点,类似于快照,例如在spark计算里面 计算流程DAG特别长,服务器需要将整个DAG计算完成得出结果,但是如果在这很长的计算流程中突然中间算出的数据丢失了,spark又会根据RDD的依赖关系从头到尾计算一遍,这样子就很费性能,当然我们可以将中间的计算结果通过cache或者persist放到内存或者磁盘中,但是这样也不能保证数据完全不会丢失,存储的这个内存出问题了或者磁盘坏了,也会导致spark从头再根据RDD计算一遍,所以就有了checkpoint,其中checkpoint的作用就是将DAG中比较重要的中间数据做一个检查点将结果存储到一个高可用的地方(通常这个地方就是HDFS里面)

十二、spark的cache和persist的区别:

  计算流程DAG特别长,服务器需要将整个DAG计算完成得出结果,但是如果在这很长的计算流程中突然中间算出的数据丢失了,spark又会根据RDD的依赖关系从头到尾计算一遍,这样子就很费性能,当然我们可以将中间的计算结果通过cache或者persist放到内存或者磁盘中

  cache最终调用了persist方法,默认的存储级别仅是存储内存中的,persist有好几个存储级别,persist是最根本的底层函数,executor执行时,60%用来缓存RDD,40%用来存放数据.

十二、spark中transform与action操作的区别:

  transformation是得到一个新的RDD,方式很多,比如从数据源生成一个新的RDD,从RDD生成一个新的RDD,action是得到一个值,或者一个结果(直接将RDD cache到内存中)。所有的transformation都是采用的懒策略,就是如果只是将transformation提交是不会执行计算的,计算只有在action被提交的时候才被触发。

十二、spark的RDD与DataFrame以及Dataset的区别:

  spark的基本数据结构:

  RDD是弹性分布式数据集,编译时类型安全,具有面向对象的风格RDD是一组表示数据的Java或Scala对象,但是序列化性能开销大,需要频繁非删除对象导致GC性能开销大.

  弹性:RDD的每个分区在spark节点上存储时默认是放在内存中的,若内存存储不下,则存储在磁盘中

  分布性:每个RDD中的数据可以处在不同的分区中,而分区可以处在不同的节点中.

  容错性:当一个RDD出现故障时,可以根据RDD之间的依赖关系来重新计算出发生故障的RDD.

  从以下方面是区别三者之间的关系:

  (1)数据的表示形式:RDD是数据元素的分布式集合,RDD是一组表示数据的Java或Scala对象;DataFrame是以列方式构成的分布式数据集合,类似于关系数据库中的表;Dataset是DataFrame API的扩展.

  (2)数据格式:RDD可以轻松有效地处理结构化和非结构化的数据,DataFrame仅适用于结构化和半结构化数据,Dataset可以有效地处理结构化和非结构化数据它表示行(row)的JVM对象或行对象集合形式的数据.

  (3)编译时类型安全:RDD提供了一种熟悉的面向对象编程风格,具有编译时类型安全性。DataFrame尝试访问表中不存在的列,则持编译错误仅在运行时检测属性错误,DataSet可以在编译时检查类型, 它提供编译时类型安全性。

  (4)性能开销:RDD:分发数据或者将数据写入磁盘时,会使用java序列化,序列化单个Java或者scala对象的开销较大,销毁单个对象时,会导致垃圾回收.

          DataFrame:可以将数据序列化为二进制的格式存储在堆外内存中,然后直接在内存中进行转换,无需使用java序列化来编码数据.避免在为数据集中的每一行构造单个对象时引起的垃圾回收。

          Dataset:在序列化数据时,它使用spark内部Tungsten二进制格式存储表格表示,因为序列化是通过Tungsten进行的,它使用了off heap()数据序列化,不需要垃圾回收器来摧毁对象

十三、spark广播变量及其原理:

  当在Executor端用到了Driver变量,不使用广播变量,在每个Executor中有多少个task就有多少个Driver端变量副本。如果使用广播变量在每个Executor端中只有一份Driver端的变量副本。广播变量在Driver定义,在Exector端不可改变,在Executor端不能定义

  原理:实际上就是Executor端用到了driver端的变量,如果在executor端你使用到了driver端的广播变量,如果不使用广播变量,在每个executor中有多少task就有多少变量副本。使用了广播变量,实际上就是为了减少executor端的备份,最终减少executor端的内存。

十四、spark streaming从kafka中读数据的两种方式:

  Receiver方式是通过zookeeper来连接kafka队列,Direct方式是直接连接到kafka的节点上获取数据

  (1)receiver方式:Receiver是使用Kafka的高层次Consumer API来实现的。receiver从Kafka中获取的数据都是存储在Spark Executor的内存中的(如果突然数据暴增,大量batch堆积,很容易出现内存溢出的问题),然后Spark Streaming启动的job会去处理那些数据。 然而,在默认的配置下,这种方式可能会因为底层的失败而丢失数据。如果要启用高可靠机制,让数据零丢失,就必须启用Spark Streaming的预写日志机制(Write Ahead Log,WAL)。该机制会同步地将接收到的Kafka数据写入分布式文件系统(比如HDFS)上的预写日志中。所以,即使底层节点出现了失败,也可以使用预写日志中的数据进行恢复。

  然而,在默认的配置下,这种方式可能会因为底层的失败而丢失数据。如果要启用高可靠机制,让数据零丢失,就必须启用Spark Streaming的预写日志机制(Write Ahead Log,WAL)。该机制会同步地将接收到的Kafka数据写入分布式文件系统(比如HDFS)上的预写日志中。所以,即使底层节点出现了失败,也可以使用预写日志中的数据进行恢复。

  注意:Kafka中的topic的partition,与Spark中的RDD的partition是没有关系的。所以,在1、KafkaUtils.createStream()中,提高partition的数量,只会增加一个Receiver中,读取partition的线程的数量。不会增加Spark处理数据的并行度。 可以创建多个Kafka输入DStream,使用不同的consumer group和topic,来通过多个receiver并行接收数据。

  (2)基于Direct方式:这种方式会周期性地查询Kafka,来获得每个topic+partition的最新的offset,从而定义每个batch的offset的范围。当处理数据的job启动时,就会使用Kafka的简单consumer api来获取Kafka指定offset范围的数据。

  优点:简化并行读取:如果要读取多个partition,不需要创建多个输入DStream然后对它们进行union操作。Spark会创建跟Kafka partition一样多的RDD partition,并且会并行从Kafka中读取数据。所以在Kafka partition和RDD partition之间,有一个一对一的映射关系。

  高性能:如果要保证零数据丢失,在基于receiver的方式中,需要开启WAL机制。这种方式其实效率低下,因为数据实际上被复制了两份,Kafka自己本身就有高可靠的机制,会对数据复制一份,而这里又会复制一份到WAL中。而基于direct的方式,不依赖Receiver,不需要开启WAL机制,只要Kafka中作了数据的复制,那么就可以通过Kafka的副本进行恢复。
receiver与和direct的比较:

  (1)基于receiver的方式,是使用Kafka的高阶API来在ZooKeeper中保存消费过的offset的。这是消费Kafka数据的传统方式。这种方式配合着WAL机制可以保证数据零丢失的高可靠性,但是却无法保证数据被处理一次且仅一次,可能会处理两次。因为Spark和ZooKeeper之间可能是不同步的。

  (2)基于direct的方式,使用kafka的简单api,Spark Streaming自己就负责追踪消费的offset,并保存在checkpoint中。Spark自己一定是同步的,因此可以保证数据是消费一次且仅消费一次。

十五、spark的内存管理机制:

堆内内存:  

  作为一个 JVM 进程,Executor 的内存管理建立在 JVM 的内存管理之上,Spark 对 JVM 的堆内(On-heap)空间进行了更为详细的分配,以充分利用内存。同时,Spark 引入了堆外(Off-heap)内存,使之可以直接在工作节点的系统内存中开辟空间,进一步优化了内存的使用。

  堆内内存:堆内内存的大小,由 Spark 应用程序启动时的 –executor-memory参数配置,一个Executor中的内存分为三块,一块是execution内存,一块是storage内存,一块是other内存。

  (1)execution内存是执行内存,文档中说join,aggregate都在这部分内存中执行,shuffle的数据也会先缓存在这个内存中,满了再写入磁盘,能够减少IO。其实map过程也是在这个内存中执行的。

  (2)storage内存是存储broadcast,cache,persist数据的地方。

  (3)other内存是程序执行时预留给自己的内存。

堆外内存:

  Off-heap memory不在 JVM 内申请内存,而是调用 Java 的 unsafe 相关 API (类似于malloc()函数)直接向操作系统申请内存由于这种方式不进过 JVM 内存管理,所以可以避免频繁的 GC,这种内存申请的缺点是必须自己编写内存申请和释放的逻辑。堆外内存只区分 Execution 内存和 Storage 内存.

  无论堆内和堆外内存目前 Execution 内存和 Storage 内存可以互相共享的。也就是说,如果 Execution 内存不足,而 Storage 内存有空闲,那么 Execution 可以从 Storage 中申请空间;反之亦然.

 Spark中的OOM问题不外乎以下两种情况:

  (1)map执行中内存溢出如flatMap,filter,mapPatitions

    map端过程产生大量对象导致内存溢出:这种溢出的原因是在单个map中产生了大量的对象导致的针对这种问题,在不增加内存的情况下,可以通过减少每个Task的大小,以便达到每个Task即使产生大量的对象Executor的内存也能够装得下。具体做法可以在会产生大量对象的map操作之前调用repartition方法,分区成更小的块传入map。

  (2)shuffle后内存溢出如join,reduceByKey,repartition

    shuffle内存溢出的情况可以说都是shuffle后,单个文件过大导致的。在shuffle的使用,需要传入一个partitioner,大部分Spark中的shuffle操作,默认的partitioner都是HashPatitioner,默认值是父RDD中最大的分区数.这个参数spark.default.parallelism只对HashPartitioner有效.如果是别的partitioner导致的shuffle内存溢出就需要重写partitioner代码了.

十四、task之间的内存分配:

  为了更好地使用使用内存,Executor 内运行的 Task 之间共享着 Execution 内存。具体的,Spark 内部维护了一个 HashMap 用于记录每个 Task 占用的内存。当 Task 需要在 Execution 内存区域申请 numBytes 内存,其先判断 HashMap 里面是否维护着这个 Task 的内存使用情况,如果没有,则将这个 Task 内存使用置为0,并且以 TaskId 为 key,内存使用为 value 加入到 HashMap 里面。之后为这个 Task 申请 numBytes 内存,如果 Execution 内存区域正好有大于 numBytes 的空闲内存,则在 HashMap 里面将当前 Task 使用的内存加上 numBytes,然后返回;如果当前 Execution 内存区域无法申请到每个 Task 最小可申请的内存,则当前 Task 被阻塞,直到有其他任务释放了足够的执行内存,该任务才可以被唤醒。每个 Task 可以使用 Execution 内存大小范围为 1/2N ~ 1/N,其中 N 为当前 Executor 内正在运行的 Task 个数。一个 Task 能够运行必须申请到最小内存为 (1/2N * Execution 内存);当 N = 1 的时候,Task 可以使用全部的 Execution 内存。比如如果 Execution 内存大小为 10GB,当前 Executor 内正在运行的 Task 个数为5,则该 Task 可以申请的内存范围为 10 / (2 * 5) ~ 10 / 5,也就是 1GB ~ 2GB的范围。

猜你喜欢

转载自www.cnblogs.com/hdc520/p/12588379.html