我理解的shuffle

HDFS里的文件是分块存放在Datanode上面的,mapper程序也是跑在各个节点上的。这里就涉及到一个问题,哪一个节点上的mapper读哪一些节点上的文件块呢?

hadoop会自动将这个文件分片(split),得到好多split,这每一个split放到一个节点的一个mapper里面去读。然后在每一台有mapper任务的节点上都执行了这么一个操作,将分得到的split切割成一行一行的键值对,然后传给map方法。键是这每一行在split中的偏移量,值是每一行得到的字符串。

shuffle阶段之前的数据<偏移量,  "读取的一行数据">

Shuffle过程

关于shuffle过程,可以划分为俩个阶段,一个在map阶段,另一个在reduce阶段。

  • map阶段的shuffle:
  1. 写入之前先进行分区Partition,用户可以自定义分区(继承Partitioner类,重写分区方法)。

  2. 如果指定分区,框架会使用 默认的分区(HashPartitioner)对key去hash值之后,然后在对reduceTaskNum进行取模(雨露均沾),然后决定由那个reduceTask来处理。

  3. 将分完区的结果<key,value,partition>开始序列化成字节数组,开始写入缓冲区。

随着map端的结果不端的输入缓冲区,缓冲区里的数据越来越多,缓冲区的默认大小是100M,当缓冲区大小达到阀值时
默认是0.8(80M),开始启动溢写线程,锁定这80M的内存执行溢写过程,内存—>磁盘,此时map输出的结果继续由另一个线程往剩余的20M里写,两个线程相互独立,彼此互不干扰。

  1. spill线程启动后,开始对key进行排序(Sort)默认的是自然排序,也是对序列化的字节数组进行排序(先对分区号排序,然后在对key进行排序)。

  2. 如果实现combine,本节点内,将key相同的value合并,这样的好处就是减少溢写到磁盘的数据量(如果数据多的话,节省好多i/o资源 )

     <key, list[1,1,1]>  => <key, list[3]>            #combine前 => 后结果变化
    

每次溢写都会在磁盘上生成一个一个的小文件,因为最终的结果文件只有一个,所以需要将这些溢写文件归并到一起,这个过程叫做Merge,生成文件大概是这种<key, list[1,2,1]>集合里面的值是从不同的溢写文件中读取来的。

简单来说:parttion -> sort -> combine

  • reduce阶段的shuffle:
  1. partition:当MapTask完成任务数超过总数的5%后,开始调度执行ReduceTask任务,然后ReduceTask默认启动5个copy线程到完成的MapTask任务节点上分别copy一份属于自己的数据(使用Http的方式)。hadoop决定有多少个reducer的时候会规定有多少个partition,每一个reducer拉取自己要处理的那个分组的全部成员。

  2. 数据会首先保存到内存缓冲区中,当达到一定的阀值的时候,开始启动内存到磁盘的Merge,也就是溢写过程,一直运行到map端没有数据生成,最后启动磁盘到磁盘的Merge方式生成最终的那个文件。在溢写过程中,然后锁定80M的数据,然后在延续Sort过程,然后进行group(分组)将相同的key放到一个集合中(参考combine)。

  3. 在每一个reducer上,将具有相同键的键值对生成另外一个新的键值对,键是以前的键,键值是一个以前键值的集合。

  4. sort: 在每一台reducer节点上,根据哈希码将新生成的键值对进行排序(reducer运行几个节点结果就会生成几个文件)。

简要概括:partition -> merge->sort

优化
一、增加combine阶段以及对输出进行压缩进行map调优。

  1. combine合并: 实现自定义combine要求继承reducer类。比较适合map的输出是数值型的,方便进行统计。

  2. 压缩:在提交job的时候分别设置启动压缩和指定压缩方式。

二、 reducer调优 reducer调优主要是通过参数调优和设置reducer的个数来完成。

使用场景:map的执行process数是通过inputformat返回recordread来定义的;
而reducer是有三部分构成的,分别为读取mapper输出数据、合并所有输出数据以及reduce处理,其中第一步要依赖map的执行,所以在数据量比较大的情况下,一个reducer无法满足性能要求的情况下,我们可以通过调高reducer的个数来解决该问题。

优点:充分利用集群的优势。

缺点:有些mr程序没法利用多reducer的优点,比如获取top n。

reduce阶段内存溢出解决

原因 : 单个reduce任务处理的数据量过多。
方法 :通过增大reducetasks数目、优化partition 规则使数据分布均匀进行解决。

猜你喜欢

转载自blog.csdn.net/qq_42898087/article/details/85268892