An aside:
Hadoop in Action is a very good introductory book on Hadoop, and I recommend reading it in English. The author's English is simple and clear, so anyone with a reasonable level of English reading ability should find the original edition easy to get into.
Now, on to the main topic. This is an exercise from Hadoop in Action: find the Top K values.
I threw together a small input file (one name and count per line, tab-separated):
g	445
a	1117
b	222
c	333
d	444
e	123
f	345
h	456
Here is my approach:
For a Top K problem, the first step is to find the Top K within each block/split. Since each mapper may only emit its result once, after it has seen all of its input, the output has to happen in the cleanup method. For simplicity I use Java's TreeMap, since that data structure keeps its keys sorted automatically. The Reducer's logic is exactly the same as the Mapper's; the Map phase alone is not enough, because it only produces a per-split Top K, so a Reduce step is added to merge those partial results into the global Top K.
The final output looks like this (with K = 2):
456	h
1117	a
That is why a map is needed: the name has to travel together with its count. If you only need the count values themselves, using a TreeSet directly would be slightly more efficient.
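The TreeMap trick can be seen in isolation with a minimal standalone sketch, no Hadoop required (the class name TreeMapTopK and the topK helper are just for illustration, using the sample data above):

```java
import java.util.TreeMap;

public class TreeMapTopK {

    // Keep the k entries with the largest counts, using the TreeMap's natural ordering.
    public static TreeMap<Integer, String> topK(int[] counts, String[] names, int k) {
        TreeMap<Integer, String> top = new TreeMap<Integer, String>();
        for (int i = 0; i < counts.length; i++) {
            top.put(counts[i], names[i]);
            // TreeMap keys are sorted ascending, so firstKey() is the smallest:
            // evict it whenever we hold more than k entries.
            if (top.size() > k) {
                top.remove(top.firstKey());
            }
        }
        return top;
    }

    public static void main(String[] args) {
        int[] counts = {445, 1117, 222, 333, 444, 123, 345, 456};
        String[] names = {"g", "a", "b", "c", "d", "e", "f", "h"};
        TreeMap<Integer, String> top = topK(counts, names, 2);
        for (Integer num : top.keySet()) {
            System.out.println(num + "\t" + top.get(num));
        }
        // Prints the top 2 in ascending key order:
        // 456	h
        // 1117	a
    }
}
```

One caveat, which the MapReduce version below shares: two items with the same count collide on the same TreeMap key, so the later one silently overwrites the earlier.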
Here is the implementation:
package hadoop_in_action_exersice;

import java.io.IOException;
import java.util.TreeMap;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class TopK {

    public static final int K = 2;

    public static class KMap extends Mapper<LongWritable, Text, IntWritable, Text> {

        // Sorted by count; note that entries with equal counts overwrite each other.
        TreeMap<Integer, String> map = new TreeMap<Integer, String>();

        public void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            String line = value.toString();
            if (line.trim().length() > 0 && line.indexOf("\t") != -1) {
                String[] arr = line.split("\t", 2);
                String name = arr[0];
                Integer num = Integer.parseInt(arr[1]);
                map.put(num, name);
                // Keep only the K largest counts seen so far.
                if (map.size() > K) {
                    map.remove(map.firstKey());
                }
            }
        }

        @Override
        protected void cleanup(
                Mapper<LongWritable, Text, IntWritable, Text>.Context context)
                throws IOException, InterruptedException {
            // Emit this split's Top K exactly once, after all input has been seen.
            for (Integer num : map.keySet()) {
                context.write(new IntWritable(num), new Text(map.get(num)));
            }
        }
    }

    public static class KReduce extends Reducer<IntWritable, Text, IntWritable, Text> {

        TreeMap<Integer, String> map = new TreeMap<Integer, String>();

        public void reduce(IntWritable key, Iterable<Text> values, Context context)
                throws IOException, InterruptedException {
            // Same trimming logic as the mapper, merging the per-split Top K lists.
            map.put(key.get(), values.iterator().next().toString());
            if (map.size() > K) {
                map.remove(map.firstKey());
            }
        }

        @Override
        protected void cleanup(
                Reducer<IntWritable, Text, IntWritable, Text>.Context context)
                throws IOException, InterruptedException {
            for (Integer num : map.keySet()) {
                context.write(new IntWritable(num), new Text(map.get(num)));
            }
        }
    }

    public static void main(String[] args) {
        Configuration conf = new Configuration();
        try {
            Job job = new Job(conf, "top k");
            job.setJarByClass(TopK.class);
            job.setMapperClass(KMap.class);
            job.setCombinerClass(KReduce.class);
            job.setReducerClass(KReduce.class);
            job.setOutputKeyClass(IntWritable.class);
            job.setOutputValueClass(Text.class);
            FileInputFormat.setInputPaths(job, new Path("/home/hadoop/DataSet/Hadoop/WordCount-Result"));
            FileOutputFormat.setOutputPath(job, new Path("/home/hadoop/DataSet/Hadoop/TopK-output1"));
            System.out.println(job.waitForCompletion(true));
        } catch (IOException e) {
            e.printStackTrace();
        } catch (ClassNotFoundException e) {
            e.printStackTrace();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}