大数据秋招面经之spark系列


文章目录


前言

spark是大数据面试重点,基本属于必须要熟练掌握的内容,考点要远多于hadoop。


提示:以下是本篇文章正文内容,下面案例可供参考

spark高频面试题汇总

1.spark介绍

  1. 基于内存,spark把中间数据放到内存,迭代效率高

  2. spark容错性高,spark引入弹性分布式数据集RDD,数据集一部分丢失,可根据血统进行重建。

  3. spark更加通用,算子更多

  4. 可用于处理复杂的批处理或者基于实时的流式处理(微批)

  5. spark组件:spark core,spark sql,spark streaming,MLib,GraphX

2.spark分组取TopN方案总结:方案2是最佳方案。

方案1:
相同的key为一组,然后groupBykey()后得到数据名叫grouped,然后对grouped进行mapValues就把一个个value取出来,然后将values转化为list调用sorted算子进行排序(默认是升序的,所以要自定义排序比较器,规则为降序)

grouped.mapValues(arr=>arr.toList.sorted( 
     new Ordering[(int,int)]{
    
    
        override def compare(x:(int,int),  y:(int,int) =y._2.compareTo(x._2))
                         }).take(n))         
        或者用sortBy(xxx,false)来降序

成本:发生了一次shuffle
问题:groupBykey,mapValues在使用时将大数据量放入内存时易发生oom。

方案2:取出所有key,对key进行迭代,每次取出一个key利用spark的排序算子进行排序。用combineByKey代替groupByKey可解决oom问题,它会尽量压缩内存中的数据。
combineByKey中的三个函数:
第一个函数:处理第一条记录,如果key原来未遇到过,就创建key对应的累加器的初始值。
第二个函数:处理第二条以及后续的记录的相关逻辑(其中有排序过程) ,如果key原来遇到过,使用该方法将key的累加器对应的当前值与新值合并。
第三个函数:将各个分区的结果合并

方案3:自定义分区器,按照key进行分区,不同的key进入到不同分区,对每个分区用spark的排序算子进行排序

3. repartition与coalesce

repartition的底层调用了coalesce,且repartition的其中一个入参是true:表示一定发生shuffle,会进行重新分区。
coalesce会不会发生shuffle与入参传入的参数是false还是true有关,true就会发生shuffle。
所以当有增加分区数需求时必会发生shuffle,可用repartiton,当减少分区数时刻用coalesce,避免发生shuffle,用IO使数据移动。

4. spark的oom问题怎么产生的以及解决方案

第一数据倾斜的话可以先局部聚合再全局聚合
第二某些算子可能造成oom,比如groupByKey

5.storm与flink,spark streaming之间的区别

  1. 容错性:
    spark依赖checkpoint storm使用ack机制,每次处理完一条data会发送一条ack消息给bolt,该条data被所有节点都处理后,它会收到所有节点的ack,容错开销很大。 flink使用异步分布式快照,类似checkpoint

  2. 吞吐量:spark各种优化做的很好,吞吐量最大,但是有状态的计算(updateStateByKey算子)需要通过额外的rdd来维护,开销较大,对吞吐量影响较大。storm的容错机制开销较大,对吞吐量影响很大。flink拥有图和调度的优化机制,使得吞吐量很大。

  3. 延迟:spark是基于微批实现的,但是付出了延迟的代价,延迟是秒级的,storm是native streaming实现的,可以达到几十毫秒的延迟,延迟最低。flink也是native streaming实现的,可以达到百毫秒级别,flink的强大在于优秀的内存管理和流控,性能出色,但是不够成熟,如sql的支持比较初级,无法像storm那样在不停止任务的情况下动态调整资源。

6.spark的几种部署方式:

     (1)local:运行在一台机器上,通常是练手或测试环境:如在IDEA中运行scala程序  
          local:所有计算在一个线程完成;local[n]:所有计算在n个线程完成; local[*]:按照cpu最多核数来设置线程数
     (2)Standalone:构建一个master+slaver的资源调度集群。
     (3)spark yarn-client:spark客户端直连yarn,不需要搭建spark集群
     (4)spark yarn-cluster:spark客户端直连yarn,不需要搭建spark集群
     (5)spark on mesos
     (6)spark on k8s
    本质区别:资源管理者不同

7.复习spark的yarn-cluster模式执行流程:

 1.客户端向yarn提交应用程序(提交任务)
 2.rescourceManager收到请求后,在集群中选择一个nodeManager,让它启动applicationMaster
 3.application向resourceManager申请executor
 4.rescourceManager通知其它nodeManager启动executor
 5.nodeManager启动了executor后,executor向applicationMaster反向注册
 6.applicationMaster知道了有哪些executor可用,然后就去执行job,拆分stage,提交stage的task,进行task调度,分配到各个executor执行。

8.spark的job提交流程:

1.sparkContext向DAGScheduler提交一个Job后,会创建一个JobWaiter对象,用于阻塞当前线程,等待当前Job的执行结果,因此在一个线程中,Job是顺序执行的
2.DAGScheduler会根据RDD的依赖关系将一个Job划分为若干个Stage。
3.DAGScheduler在提交Stage时,会根据分区的信息生成对应的Task,打包成TaskSet,提交给TaskScheduler。而TaskScheduler收到后,会将TaskSet封装成TaskSetManager,丢到任务队列中等待执行
4.SchedulerBackend负责Executor的状态与资源管理,当发现有空闲资源时,就通过TaskScheduler从任务队列中取出相应的TaskSetManager去调度执行
5.TaskSetManager中的Task最终会分发到不同的Executor的线程中去执行。

9.spark中假设一个application中有多个job并行执行,怎么调度

spark的逻辑是很复杂的,每个程序运行的job,stage数量不一,需要很多细致的分析才能算出,灵活度很高,默认是抢占资源,任务是线程级别;task以线程的形式运行在Executor,不同的executor间数据交互会有shuffle,executor与datanode交互叫IO,无shuffle。
每个application(程序)都会先给自己申请很多executor,无论负责资源调度的在yarn,在k8s还是其他地方。
重点如下:
spark是以TaskSetManager为单元进行调度任务,通常情况下,任务队列中只有一个TaskSetManager,而提交多个job时,则会有多个TaskSetManager被丢到任务队列中,当有资源空闲的时候,谁会从队列中被取出来执行就看相应的调度策略了。
一般spark支持FIFO和FAIR两种调度策略, 默认是FIFO,假设cpu有12个核,假设TaskSetManager A,B中分别有18,10个Task需要执行,只有A中先执行完前12个Task,在执行后6个Task时,空闲的6个核才会分给B。若某一时刻A,B各有6个Task,则可以并行进行。
通过参数spark.scheduler.model可以设定策略是FAIR,该方式让多个TaskSetManager都有机会进行,但是需要配合FAIR Pool来使用,默认情况TaskSetManager会被全部丢到一个默认的Pool中,此时调度效果同FIFO。spark支持在线程中设定自己的FAIR Pool,从而将该线程提交的TaskSetManager丢到指定的Pool中。多个FAIR Pool会被轮询执行,执行权可以预先设定。 通过这种方式,可以让所有的TaskSetManager都有机会被调度。而不会被先进到队列中的需要长时间运行的其他任务阻塞住。
总结:
spark支持通过多个线程在一个sparkContext上提交多个Job,每个线程里边的Job是顺序执行的,但是不同的线程的Job是可以并行执行的,取决于当时的Executor中是否有充足的cpu核数,任务队列中的TaskSetManager是有序执行还是轮询执行取决于调度策略。

10.sparkstreaming的底层实现原理:

重点:InputDStream管理流的数据源的通用抽象类;JobGenerator 作业生成器;JobSchedule 作业调度器;DStreamGraph管理创建的所有InputDStream的初始化和启动。
JobGenerator 作业生成器:基于Spark Streaming会产生一系列DStream,基于DStreams生成Jobs,
JobGenerator每隔我们设定的时间间隔会生成一个JobGeneratorEvent事件用于触发一个作业。
DStreamGraph的generateJobs()方法用来生成多个job,通过JobScheduler对这些Job进行提交。
JobScheduler将这些job对象提交到一个线程池JobExecutor之后,其会在线程池内执行这个Job对象内的闭包逻辑,将其转换成分布式计算的Task分发到不同的节点去执行。

11.join操作会不会触发spark shuffle

不一定;对于所有参与cogroup的rdd,如果它的partitoner和结果CoGroupedRDD的partition相同,则该rdd会成为CoGroupedRDD的一个oneToOne的窄依赖,不会发生shuffle。
所以,不会发生shuffle的前提是参与join的所有rdd的partitoner与结果rdd的partitioner相同
partitioner的策略:
1.如果父rdd中有可用的合格的partitioner,则直接使用其中分区数最大的那个partitioner
2.如果没有,则根据默认分区数生成HashPartitioner.
CoGroupedRDD的数据是根据不同的依赖从父RDD中获取:
1.对应窄依赖,直接调用父RDD的iterator方法获取对应partiton的数据
2.对于宽依赖,从shuffleManager获取shuffleReader进行获取,即发生了shuffle reader

12.spark join的几种形式:

(1)broadcast join:将小表数据加载到内存,分发到每个节点上,与大表进行比对,以牺牲空间的方式避免大量的shuffle
(2)shuffle hash join:通过分区的形式将大批量的数据划分为n份较小的数据进行并行计算,利用key相同必然分区相同的原理。spark sql将较大表的join分而治之,先将表划分为n个分区,再对两个表中相对应分区的数据分别进行hash join。
shuffle hash join分为两步:

  • 对两张表分别按照join keys进行分区,即shuffle,目的是让相同的join keys值的记录分到对应的分区

  • 对对应分区中的数据进行join,此处先将小表分区构造成一张hash表,然后根据大表分区中记录的join keys值拿出来进行匹配。

(3)sort merge join:大表和大表之间的sort merge join,这种实现方式不用将一侧数据全部加载后再进行join,但是需要在join前将数据排序。
首先将两张表按照join key进行了重新shuffle,保证了 join keys值相同的记录会被分到相同的分区。分区后对每个分区内的数据进行排序,排序后对相应的分区内的记录进行连接
因为两个序列都是有序的,从头遍历,碰到相同的key就输出,如果不同,左边小就继续取左边,反之取右边。
可以看出来,无论分区有多大,sort merge join都不用把某一侧的数据全部加载到内存,而是即用即丢,大大提高了sql join的稳定性。

13.mapreduce的counter:

 计数器用来记录Job的执行进度和状态,它的作用可理解为日志。我们可以在程序的某个位置插入计数器,记录数据的变化情况。
 counter为我们提供了一个窗口,用来观察mapreduce job运行时期的各种细节数据,对mapreduce的性能调优很有帮助。
 mapreduce自带了很多默认的counter。如输入的字节数,输出的字节数,map端输入/输出的字节数和条数等
 计算器能够被全局的聚集
 map_input_records:map输入的记录数
 map_input_bytes:map输入的字节数

14.spark数据保存到hdfs的方式:

 rdd.saveAsTextFile(path,classOf[com.hadoop.compression.lzo.LzopCodec])

15.spark streaming性能调优:

 1.数据接收的并行度调优:
    每个InputDStream都会在某个worker上的Executor上启动一个Receiver,该Receiver接收一个数据流,可通过创建多个InputStream达到并行读取。
    可让让他们分别接收数据源的不同分区。
  2.调整block的时间间隔 
     可通过调整spark.streaming.blockInterval参数来设置,默认200ms.这个参数也会影响分区数
     若机器的cpu core的数量大于每个batch的task数量,则可适当减小blockInterval
  3.适当调大task数量
     通过调整spark.default.parallelism参数来决定。
  4.task序列化
     使用kryo序列化类库来序列化task,可减少task大小从而减少driver发送到这些task到各个executor的发送时间,即节省网络资源。
  5.内存调优
     不想因JVM垃圾回收导致长时间延迟,用Kryo序列化机制可进一步减少内存和GC开销
     使用并行的标记清除垃圾回收机制,用来保持GC低开销。虽然并行的GC会降低吞吐量,但还建议使用,因为可减少batch的处理时间
    如果使用,要在driver和executor端都开启。
    -XX:+UseConcMarkSweepGC

16.spark哪些算子会发生shuffle

 1.repartition操作:repartition,coalesce
 2.ByKey操作:reduceByKey,groupByKey,sortByKey,sortBy等
 3.join操作:join,cogroup
 4.去重操作:distinct
 5.集合操作:交集intersection;差集subtract

17.spark的partitioner有几种

hashPartitioner(默认)
rangePartitioner:根据key值范围和分区数确定范围,将范围内的键分配给相应的分区。将一定范围内的数据映射到某一个分区,主要用于RDD的数据排序相关的API,比如sortByKey()底层用的数据分区器就是RangePartitoner,先从整个RDD中进行数据抽样,将样本排序,计算出每个分区的最大key形成一个数据范围数组;再判断key在数组中的范围,给出key在下一个RDD中分区id下标

18.spark几个重要的调优参数:

  1. hive开启数据倾斜时负载均衡:set hive.groupby.skewindata=true 底层原理:先随机分发并处理,再按照key group by来分发处理,生成的查询计划会有连个MR job

  2. 控制spark reduce缓存,调优shuffle:spark.reducer.maxSizeInFight 此参数为reduce task能够拉取多少数据量,默认48M,当集群资源充足的时候可调大这个值如96M。
    spark.shuffle.file.buffer:每个shuffle文件输出流的内存缓冲区大小,调大此参数可减少在创建shuffle文件时进行磁盘搜索和系统调用的次数,默认是32k,一般可调到64k。

  3. sparkstreaming流的速度及背压机制:
    通过spark.streaming.kafka.maxRatePerPartition参数来设置Spark Streaming从kafka分区每秒拉取的条数
    spark.streaming.backpressure.enabled=true;开启背压机制后Spark Streaming会根据延迟动态去kafka消费数据。

  4. Receiver生成Block的间隔:spark.streaming.blockInterval,这个参数也会影响分区数,可减小它适当提高task的并行度

  5. spark.executor.memory:executor内存大小

  6. spark.serializer:默认是JavaSerializer可用KryoSerialize

19.spark内存模型:

按位置分为堆内内存和堆外内存;
按功能划分方式:存储内存;执行内存;剩余内存
作为一个JVM进程,Executor的内存管理建立在JVM内存管理之上,Spark对JVM的堆内空间进行了更为详细的分配,以充分利用内存。同时Spark引入了堆外内存,使之可直接在工作节点的系统内存中开辟空间,进一步优化了内存的使用。
Executor内运行的并发任务共享JVM堆内内存,这些任务在缓存RDD数据和广播数据时占用的内存是存储内存,这些任务在执行shuffle时占用的内存是执行内存Spark内部的对象实例占用剩余空间。
JVM的对象可以以序列化的方式存储,序列化的过程中将对象转换成为二进制字节流,本质上可理解为将非连续空间的链式存储转化为连续空间或者块存储。
在访问时则需要进行反序列化,将字节流转化为对象,序列化节省了空间,增加了存储和读取时候的计算开销。
Spark中序列化后的字节流占用的内存大小可直接计算,而对于非序列化的对象,其占用的内存是周期性估算得到的,并不是每次新增数据项都会计算一次内存,这种方法降低了时间开销但有可能误差较大,导致某时刻的内存可能远超预期,此外被标记的应被释放的对象并不一定会被JVM回收,所以spark的痛点之一就是无法完全规避oom。
虽然无法精准的控制堆内存的申请和释放,但是spark通过对存储内存和执行内存各自独立的规划管理,可以决定是否要在存储内存里缓存RDD,在一定程度提高了内存的利用率,减少异常出现。
堆外内存:可进一步优化内存使用,提高shuffle时排序的效率。直接在工作节点的系统内存中开辟空间,存储序列化后的二进制数据。减少了不必要的内存开销,减少了GC,提升了性能。
堆外内存可被精确地申请和释放,默认堆外内存并不开启,堆外内存的划分方式同堆内内存。
存储内存和执行内存的管理者是MemoryManager,同一个Executor中的任务都调用这个接口的方法申请和释放内存。

20.Spark sql执行流程:

sql文本–>解析器–>分析器–>优化器–>转换器
sql语句解析,生成抽象语法树;
抽象语法树通过Analyzer模块借助CataLog中的表信息解析为逻辑计划Logical Plan;
优化器再通过各种规则的优化策略进行优化,得到优化后的逻辑计划Optimized Logical Plan;
优化后的逻辑计划经转换器转成物理计划Physical Plan。然后形成spark RDD,经DAGScheduler进行任务调度,stage划分,形成一个个task,分配到executor去执行。

21.DataFrame,DataSet,RDD的区别:

RDD是分布式Java对象的集合,DataFrame是分布式Row类型集合。
RDD优点:面向对象,编译时类型安全;
缺点: RDD强调不变性,倾向于创建新对象而不是修改老对象。对GC造成压力,需频繁进行序列化与反序列化。
DataFrame提供了更丰富的算子,更提高了执行效率,减少了数据读取,减少了IO以及优化了执行计划,如filter下推,裁剪等。
DataFrame引入了表结构和堆外内存,只需要序列化与反序列化数据,表结构不用。
DataSet结合了RDD和DataFrame的优点,序列化数据时,Encoder产生的字节码与堆外内存进行交互,而不用反序列化整个对象;DataSet编译时类型安全,面向对象
RDD+表结构–>DataFrame;DataFrame+类+属性–>DataSet

22.hive on spark与spark on hive的区别:

 hive on spark--->hive sql提交到spark上运行,hive用来解析sql,spark是计算引擎。
 spark on hive --->spark通过hive提供的meta服务能够访问hive中的数据,这时hive主要做存储。

23.spark框架:

sparkCore—>sparkContext—>RDD
sparkstreaming—>streamingContext—>Dstream
sparkSQL–>sparkSession—>df/ds

24.sparkstreaming的流join原理:

包括静态数据与流的join;半静态数据与流的join;流与流的join(麻烦点:流的错峰问题)
如果静态数据比较大,流join解决方案:可以使用RDD的缓存以及持久化机制。

25.如何解决数据积压问题:

先看哪个环节出现数据积压,如果是数据入库mysql积压,则要高效入库;如果是某一个 算子操作业务太多造成积压,可以将一个算子拆成多个;如果是资源不够,可使用动态资源划分。

26.sparkstreaming消费完kafka数据之后如何高效入库?

在这种情况下,数据库很有可能是瓶颈,入库时分批插入(addBatch())而不是一条一条;使用foreachPartition每个分区创建一个连接,效率更高,且连接在executor里面创建不存在序列化的问题。

27.structStreaming和sparkstreaming的offset

structStreaming是做了强制的checkpoint,将offset保存到了checkpoint里面去。
sparkstreaming没有将offset保存到checkpoint,checkpoint更多的保存中间状态。

28.怎么在一个sparkstreaming里同时消费不同的topic然后根据不同的topic里面的内容处理不同的业务逻辑?

使用正则表达式来匹配不同的topic, ConsumerStrategies.SubscribePattern()或者ConsumerStrategies.Subscribe()里传入多个topic然后正则匹配

29.flatmap能不能代替filter实现过滤功能?

可以,flatmap是先map再flat,flatmap里传入的参数的返回值是iterator迭代器类型,如果迭代器里没有任何元素,那么flatmap就flat不出来,就相当于filter了。如果迭代器里是n条数据,就实现了1到n的过程,所以flatmap可实现1对n,也可1对1。

30.SparkContext.textFile()得到的RDD类型是:

HadoopRDD,HadoopRDD是一个抽象类,通过HadoopRDD读到数据。且hadoopRDD本身是不存数据的,它通过
compute()方法里拿到一个对数据源文件的迭代器,运用到了迭代器模式。

31.hadoopRDD面向文件时分区的个数怎么来的:

通过输入格式化类InputFormat;通过InputFormat.getSplit()方法可得到该文件有多少个切片,多少个切片一般对应多少个分区。

32.HadoopRDD如何读取文件的

HadoopRDD的compute()方法实现也依赖于输入格式化类Inputformat,Inputformat.recordreader是一个记录读取器,
记录读取器通过next(),hasNext()方法包装了一个迭代器,完成对文件的读取。

33.笛卡儿积cartesian不会发生shuffle原因是:

如果数据不需要区分每一条记录归属哪个分区,间接的说明这样的数据不需要partitioner,即不需要shuffle。
因为shuffle的语义是洗牌,即面向每一条记录计算它的分区号,这一过程必然涉及partitioner过程。如果有行为不需要区分记录,只需本地IO拉取数据,那么这种直接IO一定比先partition计算分区号再shuffle落文件,最后再通过IO拉取数据快。

34.mapVaules,flatMapValues比map和flatMap的优势

mapVaules,flatMapValues用于k-v类型的数据,且key未发生变化,分区器和分区数都没发生变化,在多次shuffle的过程中mapVaules,flatMapValues能减少一次shuffle过程。
因为若前后两个stage的分区数和分区器都没有发生变化,在它的底层做了优化,直接用上一个shuffle的分区结果,避免了一次重复且不必要的shuffle过程。
而map和flatmap则会抛弃上次的分区结果,重新计算分区,这样相比mapVaules,flatMapValues多了一次shuffle过程。


猜你喜欢

转载自blog.csdn.net/wq17629260466/article/details/108903141