全排序

问题的提出
正常情况下,Mapreduce的保障之一就是送到Reducer端的数据总是根据Reducer的输入键进行排序的,如果我们使用单个Reducer,排序就会直接了当,但是只是使用一个Reducer的情况少之又少,如果使用了多个Reducer,那么就只可能会保证每一个Reducer内的内容是会根据键进行排序的,而不会保证Reducder之间也是有序的,就会出现下面这种情况: 
reducer1:


全排序的问题解决
全排序的技巧包含在Partitioner的实现中,我们需要将键的取值范围转换为一个索引(0-25),例如这里的键就是所有的英文单词,不过我们需要得出划分几个索引范围,然后这些索引分配给相应的reducer


解决
这里假如我们可以分配的reducer的数量是2,那么我们就可以直接将(0-12)分配给第一个reducer,将(13-25)分配给另一个reducer 
注意这里我们是只根据第一个字母进行索引化分的情况,但是假如我们现在有30个reducer,我们如果还是只根据首字母确定索引的取值范围就会有点问题,会造成有4个reduce被浪费掉了,此时我们就需要重新确定索引的范围以及索引的计算方式,例如我们可以使用0+26的0次方+26的0次方+0表示aa,1+26的0次方+26的0次方+0表示ab,依次类推,然后将前30分支一的索引的范围分配给reducer0,接着的30分之一分配给reducer1,等等,如果不能整除的话,我们可以让剩下多的交给最后一个reducer,但是这不是最好的方案,因为这样可能会造成最后一个reducer被分配到的数据过多,影响这个task的性能,最好的做法应该是: 
假如我们现在的索引的范围是(0,82),分配给30个reducer,那么每一个费配到的应该是83/30=2个,按照最原始的想法,那么reducer29需要处理的是(59,82)这么大的索引范围内的数据,这显然是不科学的,我们需要将后面没有分配到的22个一次再分配给reducer0到reducer21


接下来的一个问题是: 
我们可能需要动态的指定reducer的输入键的索引的范围,这里我们需要将我们的partitioner实现Configurable接口,因为在初始化的过程中,hadoop框架就会加载我们自定义的Partitioner实例,当hadoop框架通过反射机制实例化这个类的时候,它就会检查这个类型是不是Configurable实例,如果是的话,就会调用setConf,将作业的Configuration对象设置过来,我们就可以在Partitioner中获取到配置的变量了


java代码的实现
public class GlobalSort {
    public static void main(String[] args) throws Exception {
        Configuration configuration = new Configuration();


        configuration.set("key.indexRange","26");


        Job job = Job.getInstance(configuration);
        job.setNumReduceTasks(2);
        job.setJarByClass(GlobalSort.class);
        job.setMapperClass(GlobalSortMapper.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(LongWritable.class);
        job.setReducerClass(GlobalSortReducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(LongWritable.class);


        job.setPartitionerClass(GlobalSortPartitioner.class);


        FileInputFormat.setInputPaths(job,new Path("F:\\wc\\input"));
        FileOutputFormat.setOutputPath(job,new Path("F:\\wc\\output"));
        job.waitForCompletion(true);
    }
}


class GlobalSortMapper extends Mapper<LongWritable,Text,Text,LongWritable>{
    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        //value是获取的一行的数据的内容,此处可以split
        String[] splits = value.toString().split(" ");
        for(String str : splits){
            context.write(new Text(str), new LongWritable(1L));
        }
    }
}




class GlobalSortPartitioner  extends Partitioner<Text,LongWritable> implements Configurable {


    private Configuration configuration = null;
    private int indexRange = 0;


    public int getPartition(Text text, LongWritable longWritable, int numPartitions) {
        //假如取值范围等于26的话,那么就意味着只需要根据第一个字母来划分索引
        int index = 0;
        if(indexRange==26){
            index = text.toString().toCharArray()[0]-'a';
        }else if(indexRange == 26*26 ){
            //这里就是需要根据前两个字母进行划分索引了
            char[] chars = text.toString().toCharArray();
            if (chars.length==1){
                index = (chars[0]-'a')*26;
            }
            index = (chars[0]-'a')*26+(chars[1]-'a');
        }
        int perReducerCount = indexRange/numPartitions;
        if(indexRange<numPartitions){
            return numPartitions;
        }


        for(int i = 0;i<numPartitions;i++){
            int min = i*perReducerCount;
            int max = (i+1)*perReducerCount-1;
            if(index>=min && index<=max){
                return i;
            }
        }
        //这里我们采用的是第一种不太科学的方法
        return numPartitions-1;


    }


    public void setConf(Configuration conf) {
        this.configuration = conf;
        indexRange = configuration.getInt("key.indexRange",26*26);
    }


    public Configuration getConf() {
        return configuration;
    }
}


class GlobalSortReducer extends Reducer<Text,LongWritable,Text,LongWritable>{
    @Override
    protected void reduce(Text key, Iterable<LongWritable> values, Context context) throws IOException, InterruptedException {
        long count = 0;
        for(LongWritable value : values){
            count += value.get();
        }
        context.write(key, new LongWritable(count));
    }
}

输出的结果:


part-r-00000
a   1
abc 1
he  1
hello   2
helo    1
jyw 1
lq  1
m   1
mo  1


part-r-00001
n   1
no  1
za  1
zz  1

怎么确定索引的范围
列出键的所有的可能取的值,这个种类就是索引的个数 
如果键的可能的取值是无穷尽的,那么就应该像本例一样,寻找出键的某一部分的所有的可能的取值(在排序山是不同的)


总结
step1:在WritableComparable键中实现排序逻辑,或者写一个自定义的Comparator,实现compareTo方法,实现排序的比大小的任务 
step2:定义一个方法将Reducer实例转换为一个索引值 
step3:实现一个自定义的Partitioner 
应该清楚整个Reducer键的索引范围 
利用键的索引将实例分配给相应的Reducer

猜你喜欢

转载自blog.csdn.net/wshsdm/article/details/80532320