1.Map端Join解决数据倾斜
1.Mapreduce中会将map输出的kv对,按照相同key分组(调用getPartition),然后分发给不同的reducetask
2.Map输出结果的时候调用了Partitioner组件(返回分区号),由它决定将数据放到哪个区中,默认的分组规
则为:根据key的hashcode%reducetask数来分发,源代码如下:
public class HashPartitioner<K, V> extends Partitioner<K, V> {
/** Use {@link Object#hashCode()} to partition. */ public int getPartition(K key, V value,int numReduceTasks) { return (key.hashCode() & Integer.MAX_VALUE) % numReduceTasks; } }
3.所以:如果要按照我们自己的需求进行分组,则需要改写数据分发(分组)组件Partitioner,自定义一个
CustomPartitioner继承抽象类:Partitioner,来返回一个分区编号
4.然后在job对象中,设置自定义partitioner: job.setPartitionerClass(CustomPartitioner.class)
5.自定义partition后,要根据自定义partitioner的逻辑设置相应数量的ReduceTask
存在的问题:如若Mapper输出的一些Key特别多,另一些Key特别少就会产生数据倾斜,造成一些Reducer特别忙
,一些则比较闲,我们说Mapper端相同key的输出数据会发到同一个Redurce端,需要把key相同的放在一起才能进行
拼接,所以才需要Reducer。如果我们不需要Reducer就能做拼接,就不存在数据倾斜了。
解决方案:Map端Join解决数据倾斜,我们为每一个MapTask准备一个表的全表。这种机制叫做Map Side Join。
当然这个表的全表不能很大
2.map端join算法实现: