描述
MapReduce框架对处理结果的输出会根据key值进行默认的排序,这个默认排序可以满足一部分需求,但是也是十分有限的。在我们实际的需求当中,往往有要对reduce输出结果进行二次排序的需求。
输入数据
1 -12 2 -20 3 10 4 -5 5 15 1 20 2 30 3 40 4 25 5 30 1 40 2 41 3 42 4 43 5 44 1 -30 2 -31 3 -32 4 -33 5 -34 1 61 2 62 3 63 4 64 5 65我们期望的结果:
1 61,40,20,-12,-30 2 62,41,30,-20,-31 3 63,42,40,10,-32 4 64,43,25,-5,-33 5 65,44,30,15,-34
流程:
提高集群的处理能力,我们要设置多个reduce任务。如果文件很大默认是一个分片启动一个map或者一个文件启动一个map,如果我们设置了reduce的任务个数是3,map节点在处理文件或者分片的时候会利用分区函数对key进行分区处理,默认是利用key的hash值进行分区。reducetask根据自己的分区号,去各个maptask机器上取相应的结果分区数据,reducetask会取到同一个分区的来自不同maptask的结果文件,reducetask会将这些文件再进行合并(归并排序)。这个时候利用分组函数进行分组处理,同一组的key-value会由同一个reduce函数处理,也就是说在reduce节点上有好几个reduce函数同时在处理任务。我们自定义了一个类是按照key递增value降序,遍历同一组的key输出就能得到结果。
使用System.out.println()打印一下流程,方便理解:
mapper 实例化 组合key实例化//(我们自定义key这个类) 按年分区partition实例化... start map function ...//(调用一次map方法) '组合key':[年份:1,温度:-12];分区:1 start map function ...//第二次调用,以此类推,一行一行读取文件,放入分区) '组合key':[年份:2,温度:-20];分区:2 start map function ... '组合key':[年份:3,温度:10];分区:0 start map function ... '组合key':[年份:4,温度:-5];分区:1 start map function ... '组合key':[年份:5,温度:15];分区:2 start map function ... '组合key':[年份:1,温度:20];分区:1 start map function ... '组合key':[年份:2,温度:30];分区:2 start map function ... '组合key':[年份:3,温度:40];分区:0 start map function ... '组合key':[年份:4,温度:25];分区:1 start map function ... '组合key':[年份:5,温度:30];分区:2 start map function ... '组合key':[年份:1,温度:40];分区:1 start map function ... '组合key':[年份:2,温度:41];分区:2 start map function ... '组合key':[年份:3,温度:42];分区:0 start map function ... '组合key':[年份:4,温度:43];分区:1 start map function ... '组合key':[年份:5,温度:44];分区:2 start map function ... '组合key':[年份:1,温度:-30];分区:1 start map function ... '组合key':[年份:2,温度:-31];分区:2 start map function ... '组合key':[年份:3,温度:-32];分区:0 start map function ... '组合key':[年份:4,温度:-33];分区:1 start map function ... '组合key':[年份:5,温度:-34];分区:2 start map function ... '组合key':[年份:1,温度:61];分区:1 start map function ... '组合key':[年份:2,温度:62];分区:2 start map function ... '组合key':[年份:3,温度:63];分区:0 start map function ... '组合key':[年份:4,温度:64];分区:1 start map function ... '组合key':[年份:5,温度:65];分区:2
组合key实例化 按年分组排序器comparator实例化 reducer 实例化0,//(我们设置了3个reducer,这是第一个,分区0中的数据发往这个reducer) 18/06/09 22:43:30 INFO Configuration.deprecation: mapred.skip.on is deprecated. Instead, use mapreduce.job.skiprecords ---比较‘组合key’进行分组 k1:[年份:3温度:63],k2:[年份:3温度:42] 开始执行reduce function... ---比较‘组合key’进行分组 k1:[年份:3温度:42],k2:[年份:3温度:40] ---比较‘组合key’进行分组 k1:[年份:3温度:40],k2:[年份:3温度:10] ---比较‘组合key’进行分组 k1:[年份:3温度:10],k2:[年份:3温度:-32] reduce result:63,42,40,10,-32
组合key实例化 按年分组排序器comparator实例化 reducer 实例化1,//(我们设置了3个reducer,这是第二个,分区1中的数据发往这个reducer) ---比较‘组合key’进行分组 k1:[年份:1温度:61],k2:[年份:1温度:40] 开始执行reduce function...//(这个分区有年份1和4的数据,根据我们自定义分组函数,相同组的数据在一个reduce functiion中执行) ---比较‘组合key’进行分组 k1:[年份:1温度:40],k2:[年份:1温度:20] ---比较‘组合key’进行分组 k1:[年份:1温度:20],k2:[年份:1温度:-12] ---比较‘组合key’进行分组 k1:[年份:1温度:-12],k2:[年份:1温度:-30] ---比较‘组合key’进行分组 k1:[年份:1温度:-30],k2:[年份:4温度:64] reduce result:61,40,20,-12,-30 ---比较‘组合key’进行分组 k1:[年份:4温度:64],k2:[年份:4温度:43] 开始执行reduce function... ---比较‘组合key’进行分组 k1:[年份:4温度:43],k2:[年份:4温度:25] ---比较‘组合key’进行分组 k1:[年份:4温度:25],k2:[年份:4温度:-5] ---比较‘组合key’进行分组 k1:[年份:4温度:-5],k2:[年份:4温度:-33] reduce result:64,43,25,-5,-33
组合key实例化 按年分组排序器comparator实例化 reducer 实例化2 ---比较‘组合key’进行分组 k1:[年份:2温度:62],k2:[年份:2温度:41] 开始执行reduce function... ---比较‘组合key’进行分组 k1:[年份:2温度:41],k2:[年份:2温度:30] ---比较‘组合key’进行分组 k1:[年份:2温度:30],k2:[年份:2温度:-20] ---比较‘组合key’进行分组 k1:[年份:2温度:-20],k2:[年份:2温度:-31] ---比较‘组合key’进行分组 k1:[年份:2温度:-31],k2:[年份:5温度:65] reduce result:62,41,30,-20,-31 ---比较‘组合key’进行分组 k1:[年份:5温度:65],k2:[年份:5温度:44] 开始执行reduce function... ---比较‘组合key’进行分组 k1:[年份:5温度:44],k2:[年份:5温度:30] ---比较‘组合key’进行分组 k1:[年份:5温度:30],k2:[年份:5温度:15] ---比较‘组合key’进行分组 k1:[年份:5温度:15],k2:[年份:5温度:-34] reduce result:65,44,30,15,-34
自定义年、温度组合key类
import org.apache.hadoop.io.WritableComparable; import java.io.DataInput; import java.io.DataOutput; import java.io.IOException; /** * 年份温度组合类 * 用作map的输出key * 实现序列化、 compare比较接口 */ public class ComboKeyByYearTemp implements WritableComparable<ComboKeyByYearTemp> { private int year ; private int temp ; public int getYear() { return year; } public void setYear(int year) { this.year = year; } public int getTemp() { return temp; } public void setTemp(int temp) { this.temp = temp; } /** * 对key进行比较实现 */ public int compareTo(ComboKeyByYearTemp o) { int y0 = o.getYear(); int t0 = o.getTemp() ; //年份相同(升序) if(year == y0){ //气温降序 return -(temp - t0) ; } else{ return year - y0 ; } } /** * 串行化过程 */ public void write(DataOutput out) throws IOException { //年份 out.writeInt(year); //气温 out.writeInt(temp); } public void readFields(DataInput in) throws IOException { year = in.readInt(); temp = in.readInt(); } }
按年份key进行分区
import org.apache.hadoop.io.NullWritable; import org.apache.hadoop.mapreduce.Partitioner; /** * 按照年份进行分区 * 相同的年份到同一分区 */ public class PartionByYear extends Partitioner<ComboKeyByYearTemp,NullWritable> { @Override public int getPartition(ComboKeyByYearTemp comboKeyByYearTemp, NullWritable nullWritable, int i) { return comboKeyByYearTemp.getYear() % i; } }
自定义组合类comboKeyByYearTemp的排序比较器
import org.apache.hadoop.io.WritableComparable; import org.apache.hadoop.io.WritableComparator; /** * 自定义key的排序比较器,调用组合key类的比较方法 */ public class SortComparatorByComboKey extends WritableComparator { public SortComparatorByComboKey() { super(ComboKeyByYearTemp.class,true); } @Override public int compare(WritableComparable a, WritableComparable b) { ComboKeyByYearTemp k1 = (ComboKeyByYearTemp) a; ComboKeyByYearTemp k2 = (ComboKeyByYearTemp) b; return k1.compareTo(k2); } }
年份递增分组排序
import org.apache.hadoop.io.WritableComparable; import org.apache.hadoop.io.WritableComparator; /** * 根据年份进行分组 * 年份递增 */ public class GroupByYear extends WritableComparator { protected GroupByYear() { super(ComboKeyByYearTemp.class, true); } public int compare(WritableComparable a, WritableComparable b) { ComboKeyByYearTemp k1 = (ComboKeyByYearTemp)a ; ComboKeyByYearTemp k2 = (ComboKeyByYearTemp)b ; return k1.getYear() - k2.getYear() ; } }
Mapper处理类
import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.NullWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Mapper; import java.io.IOException; /** * 利用二次排序求最高气温的mapper */ public class TempSecondSortMapper extends Mapper<LongWritable,Text,ComboKeyByYearTemp,NullWritable> { /** * * @param key 每行文本的偏移量,这里我们没有用到 * @param value 每行文本的值 1970 32,前面是年份后面是温度 * @param context * @throws IOException * @throws InterruptedException */ @Override protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException { String line = value.toString(); String[] arr = line.split(" "); ComboKeyByYearTemp keyOut = new ComboKeyByYearTemp(); keyOut.setYear(Integer.parseInt(arr[0])); keyOut.setTemp(Integer.parseInt(arr[1])); context.write(keyOut,NullWritable.get()); } }
Reduce处理类
import org.apache.commons.lang.StringUtils; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.NullWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Reducer; import java.io.IOException; import java.util.ArrayList; import java.util.Iterator; import java.util.List; /** * reducer */ public class TempSecondSortReducer extends Reducer<ComboKeyByYearTemp,NullWritable,IntWritable,Text> { @Override protected void reduce(ComboKeyByYearTemp key, Iterable<NullWritable> values, Context context) throws IOException, InterruptedException { Iterator<NullWritable> ite = values.iterator(); List<String> resList = new ArrayList<String>(); while(ite.hasNext()){ ite.next(); resList.add(key.getTemp()+""); } String res = StringUtils.join(resList.toArray(),","); context.write(new IntWritable(key.getYear()),new Text(res)); } }
运行类mainApp
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.viewfs.ConfigUtil; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.NullWritable; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.mapreduce.lib.input.TextInputFormat; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; import org.junit.Test; import java.io.BufferedWriter; import java.io.File; import java.io.FileWriter; import java.util.Random; /** * 运行类 */ public class TempSecondSortApp { public static void main(String[] args) throws Exception { //配置类 Configuration conf = new Configuration(); conf.set("fs.defaultFS","file:///");//本地文件系统 //获取job Job job = Job.getInstance(conf); //设置job的基本属性 job.setJarByClass(TempSecondSortApp.class); job.setMapperClass(TempSecondSortMapper.class); job.setReducerClass(TempSecondSortReducer.class); job.setInputFormatClass(TextInputFormat.class); job.setMapOutputKeyClass(ComboKeyByYearTemp.class); job.setMapOutputValueClass(NullWritable.class); job.setOutputKeyClass(IntWritable.class); job.setOutputValueClass(IntWritable.class); //job设置输入输出路径 FileInputFormat.addInputPath(job,new Path("/home/hadoop/second/temp2.txt")); FileOutputFormat.setOutputPath(job,new Path("/home/hadoop/second/out4")); //job设置分区类 job.setPartitionerClass(PartionByYear.class); //job设置分组类 job.setGroupingComparatorClass(GroupByYear.class); //job设置排序类 job.setSortComparatorClass(SortComparatorByComboKey.class); job.setNumReduceTasks(3); //reduce个数 System.exit(job.waitForCompletion(true)? 0 : 1); }