Problem Description
We have a SequenceFile recording years and temperatures: the key is the year and the value is a temperature. The task is to find the maximum temperature for each year and output the results sorted by ascending year. Since the reducer sorts keys by default, there are two ways to get a total order. The first is to use a single reducer; this creates a performance bottleneck because every key is sent to one machine. The second is to use a partitioner that splits the years into ranges: each reducer's output is sorted within its own range, and because the ranges themselves are ordered, concatenating the reducers' output files yields a globally sorted result.
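Before looking at the MapReduce implementation, the problem itself can be stated as a minimal plain-Java sketch (no Hadoop involved, and the sample records are made up): group temperatures by year, keep the maximum, and emit in ascending year order.

```java
import java.util.SortedMap;
import java.util.TreeMap;

/** Plain-Java sketch of the problem: from (year, temperature) pairs,
 *  compute each year's maximum, listed by ascending year.
 *  The sample data below is made up for illustration. */
public class MaxTempSketch {
    public static SortedMap<Integer, Integer> maxByYear(int[][] records) {
        SortedMap<Integer, Integer> max = new TreeMap<>();  // TreeMap keeps keys sorted
        for (int[] r : records) {
            max.merge(r[0], r[1], Math::max);               // keep the larger temperature
        }
        return max;
    }

    public static void main(String[] args) {
        int[][] records = {{1970, 11}, {1970, 30}, {2004, 25}, {1999, 18}, {2004, 19}};
        System.out.println(maxByYear(records));  // {1970=30, 1999=18, 2004=25}
    }
}
```

The MapReduce job below does the same thing at scale: the shuffle plays the role of the TreeMap's grouping and per-partition sorting.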
The custom Mapper simply forwards each key-value pair to the Reducer:
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.mapreduce.Mapper;

import java.io.IOException;

/**
 * Mapper: passes each (year, temperature) pair straight through.
 */
public class MaxTempMapper extends Mapper<IntWritable, IntWritable, IntWritable, IntWritable> {

    @Override
    protected void map(IntWritable key, IntWritable value, Context context)
            throws IOException, InterruptedException {
        context.write(key, value);
    }
}
The custom Reducer iterates over the values and emits the maximum temperature:
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.mapreduce.Reducer;

import java.io.IOException;

/**
 * Reducer: for each year, emits the maximum temperature among its values.
 */
public class MaxTempReducer extends Reducer<IntWritable, IntWritable, IntWritable, IntWritable> {

    @Override
    protected void reduce(IntWritable key, Iterable<IntWritable> values, Context context)
            throws IOException, InterruptedException {
        int max = Integer.MIN_VALUE;
        for (IntWritable iw : values) {
            max = Math.max(max, iw.get());
        }
        context.write(key, new IntWritable(max));
    }
}
The driver class MaxTempApp sets the usual job parameters and, in addition, configures the partitioner class and the sampler:
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.partition.InputSampler;
import org.apache.hadoop.mapreduce.lib.partition.TotalOrderPartitioner;

/**
 * Finds the maximum temperature for each year, totally sorted by ascending year.
 * Uses 3 reducers.
 */
public class MaxTempApp {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();   // configuration object
        conf.set("fs.defaultFS", "file:///");       // use the local file system
        Job job = Job.getInstance(conf);
        job.setJobName("maxTemperatureByTotalPartition");
        job.setInputFormatClass(SequenceFileInputFormat.class);
        job.setJarByClass(MaxTempApp.class);

        // input and output paths
        FileInputFormat.setInputPaths(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));

        job.setMapperClass(MaxTempMapper.class);
        job.setReducerClass(MaxTempReducer.class);
        job.setMapOutputKeyClass(IntWritable.class);
        job.setMapOutputValueClass(IntWritable.class);
        job.setOutputKeyClass(IntWritable.class);
        job.setOutputValueClass(IntWritable.class);

        // number of reducers
        job.setNumReduceTasks(3);

        // total-order partitioner, and the file where its split points are stored
        job.setPartitionerClass(TotalOrderPartitioner.class);
        TotalOrderPartitioner.setPartitionFile(job.getConfiguration(),
                new Path("/home/hadoop/seq/par.lst"));

        // random sampler: each key is sampled with probability 0.2,
        // up to 4000 samples in total
        InputSampler.Sampler<IntWritable, IntWritable> sampler =
                new InputSampler.RandomSampler<IntWritable, IntWritable>(0.2, 4000);
        InputSampler.writePartitionFile(job, sampler);

        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}
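How does InputSampler.writePartitionFile turn the samples into split points? Roughly: it sorts the sampled keys and picks numPartitions - 1 of them at evenly spaced positions. A simplified plain-Java sketch of that idea, with made-up sample keys (the real implementation also handles duplicate keys and uses the job's key comparator):

```java
import java.util.Arrays;

/** Simplified sketch of how split points are derived from samples:
 *  sort the sampled keys and take numPartitions-1 evenly spaced ones.
 *  This mirrors the idea behind InputSampler.writePartitionFile,
 *  not its exact code. */
public class SplitPointSketch {
    public static int[] splitPoints(int[] samples, int numPartitions) {
        int[] sorted = samples.clone();
        Arrays.sort(sorted);
        int[] splits = new int[numPartitions - 1];
        float step = (float) sorted.length / numPartitions;  // spacing between picks
        for (int i = 1; i < numPartitions; i++) {
            splits[i - 1] = sorted[Math.round(step * i)];
        }
        return splits;
    }

    public static void main(String[] args) {
        // made-up sample of year keys
        int[] samples = {1971, 1980, 1999, 2003, 2004, 2010, 2020, 2034, 2040, 2060};
        System.out.println(Arrays.toString(splitPoints(samples, 3)));  // [2003, 2034]
    }
}
```

With 3 partitions, two split points come out, which is exactly what the par.lst file stores for the job above.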
Inspecting the sampler's partition file: our keys run from 1970 to 2069 and we set the number of reducers to 3, so the sampler generates two split points that divide the keys into three ranges:
keys before 2004, keys from 2004 up to (but not including) 2035, and keys >= 2035
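The routing a TotalOrderPartitioner does with these two split points can be sketched in plain Java: binary-search the sorted split points and map the result to a partition index. This is a sketch of the idea, not the Hadoop class itself:

```java
import java.util.Arrays;

/** Sketch of total-order routing: binary-search the split points
 *  {2004, 2035} from the sampler to pick a reducer for each year.
 *  Not the Hadoop TotalOrderPartitioner class itself. */
public class YearPartitionSketch {
    static final int[] SPLITS = {2004, 2035};  // split points from the partition file

    public static int getPartition(int year) {
        int pos = Arrays.binarySearch(SPLITS, year);
        // A negative result encodes the insertion point; an exact hit on a
        // split point belongs to the partition that starts at that point.
        return pos < 0 ? -pos - 1 : pos + 1;
    }

    public static void main(String[] args) {
        System.out.println(getPartition(1999));  // 0: before 2004
        System.out.println(getPartition(2010));  // 1: in [2004, 2035)
        System.out.println(getPartition(2050));  // 2: >= 2035
    }
}
```

Partition 0 gets keys below 2004, partition 1 the middle range, and partition 2 everything from 2035 up, matching the three ranges above.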
Looking at the output, three files are generated. Each file is sorted within its own key range, and since the key ranges themselves are ordered, concatenating the files gives a total order: the full sort is complete.