Controlling the Shuffle
The shuffle's main job is to distribute intermediate results to the Reducers. What actually gets distributed are partitions of those intermediate results: every record in the same partition is handled by a single Reduce task. The partitioning itself is performed by a subclass of org.apache.hadoop.mapreduce.Partitioner.
Partitioner
@InterfaceAudience.Public
@InterfaceStability.Stable
public abstract class Partitioner<KEY, VALUE> {
  /**
   * Get the partition number for a given key (hence record) given the total
   * number of partitions i.e. number of reduce-tasks for the job.
   *
   * <p>Typically a hash function on all or a subset of the key.</p>
   *
   * @param key the key to be partitioned.
   * @param value the entry value.
   * @param numPartitions the total number of partitions.
   * @return the partition number for the <code>key</code>.
   */
  public abstract int getPartition(KEY key, VALUE value, int numPartitions);
}
key, value: one output record from the map function
numPartitions: the number of Reducers
Return value: the partition assignment; records whose calls return the same value go to the same partition
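That contract can be illustrated without Hadoop at all. The sketch below uses a hypothetical `evenOddPartition` helper (not part of any Hadoop API): keys that produce the same return value are routed to the same reduce task.

```java
public class PartitionContractDemo {
    // Hypothetical partition function: even keys to partition 0, odd keys to
    // partition 1, clamped by numPartitions just as getPartition must be.
    static int evenOddPartition(int key, int numPartitions) {
        return (key % 2) % numPartitions;
    }

    public static void main(String[] args) {
        int numPartitions = 2;
        // Keys 2 and 4 share a return value, so they reach the same reducer.
        for (int key : new int[] {2, 4, 7, 9}) {
            System.out.println("key " + key + " -> reducer "
                    + evenOddPartition(key, numPartitions));
        }
    }
}
```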
- Hadoop ships two Partitioner subclasses, HashPartitioner and TotalOrderPartitioner, and uses HashPartitioner by default:
- HashPartitioner assigns partitions based on the key's hash value
- TotalOrderPartitioner assigns partitions based on key ranges
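The range-based idea behind TotalOrderPartitioner can be sketched in plain Java. The split points below are hypothetical; the real class reads them from a partition file, typically produced by an InputSampler, so that keys come out totally ordered across reducers.

```java
import java.util.Arrays;

public class RangePartitionDemo {
    // Hypothetical sorted split points standing in for the partition file.
    static final int[] SPLITS = {100, 1000};

    // Keys below 100 -> partition 0, 100..999 -> partition 1, 1000+ -> partition 2.
    static int rangePartition(int key) {
        int idx = Arrays.binarySearch(SPLITS, key);
        // binarySearch returns the match index, or (-(insertionPoint) - 1).
        return idx >= 0 ? idx + 1 : -(idx + 1);
    }

    public static void main(String[] args) {
        for (int key : new int[] {5, 100, 500, 5000}) {
            System.out.println("key " + key + " -> partition " + rangePartition(key));
        }
    }
}
```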
/** Partition keys by their {@link Object#hashCode()}. */
@InterfaceAudience.Public
@InterfaceStability.Stable
public class HashPartitioner<K, V> extends Partitioner<K, V> {
  /** Use {@link Object#hashCode()} to partition. */
  public int getPartition(K key, V value,
                          int numReduceTasks) {
    return (key.hashCode() & Integer.MAX_VALUE) % numReduceTasks;
  }
}
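HashPartitioner's one line repays a closer look: masking with Integer.MAX_VALUE clears the sign bit, guaranteeing a non-negative result even when hashCode() is negative (Math.abs would fail for Integer.MIN_VALUE). The demo below reproduces the same arithmetic with plain Strings, no Hadoop required; the class name is just for illustration.

```java
public class HashPartitionDemo {
    // Same arithmetic as HashPartitioner.getPartition: mask off the sign bit,
    // then take the remainder modulo the number of reduce tasks.
    static int partition(String key, int numReduceTasks) {
        return (key.hashCode() & Integer.MAX_VALUE) % numReduceTasks;
    }

    public static void main(String[] args) {
        // "polygenelubricants".hashCode() is Integer.MIN_VALUE; the mask turns
        // it into 0 instead of a negative partition number.
        for (String key : new String[] {"apple", "banana", "polygenelubricants"}) {
            System.out.println(key + " -> partition " + partition(key, 3));
        }
    }
}
```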
A custom partitioner that partitions by value: values greater than 10000 go to the first partition, and values of 10000 or less go to the second.
package cn.chen.hd.mr;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Partitioner;

public class MyPartitioner extends Partitioner<Text, IntWritable> {
  @Override
  public int getPartition(Text key, IntWritable value, int numPartitions) {
    // Values above 10000 -> partition 0, the rest -> partition 1.
    // Note: Boolean.hashCode() returns 1231 or 1237, both odd, so
    // "new Boolean(...).hashCode() % numPartitions" sends every record to the
    // same partition when numPartitions == 2; a ternary avoids that pitfall.
    // The trailing "% numPartitions" keeps the single-reducer case valid.
    return (value.get() > 10000 ? 0 : 1) % numPartitions;
  }
}
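To take effect, the partitioner must be registered on the job, and the number of reduce tasks must match the number of partitions it produces. A driver sketch under standard MapReduce assumptions (the driver class name is hypothetical; mapper, reducer, and I/O paths are omitted for brevity):

```java
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapreduce.Job;

public class MyPartitionerDriver {
    public static void main(String[] args) throws Exception {
        Job job = Job.getInstance(new Configuration(), "partition by value");
        job.setJarByClass(MyPartitionerDriver.class);
        // Mapper/Reducer classes and input/output paths would be set here.
        job.setPartitionerClass(MyPartitioner.class);
        // MyPartitioner produces two partitions, so run two reduce tasks;
        // with only one reducer the partitioner is never consulted.
        job.setNumReduceTasks(2);
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}
```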