FlowBean:
package com.wange.flowcountsort;

import org.apache.hadoop.io.WritableComparable;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

public class FlowBean implements WritableComparable<FlowBean> {
    private long upflow;
    private long dflow;
    private long sumflow;

    // A no-arg constructor is required so Hadoop can instantiate the bean via reflection
    public FlowBean() {}

    public FlowBean(long upflow, long dflow) {
        this.upflow = upflow;
        this.dflow = dflow;
        this.sumflow = upflow + dflow;
    }

    public long getUpflow() { return upflow; }
    public void setUpflow(long upflow) { this.upflow = upflow; }
    public long getDflow() { return dflow; }
    public void setDflow(long dflow) { this.dflow = dflow; }

    public void set(long upflow, long dflow) {
        this.upflow = upflow;
        this.dflow = dflow;
        this.sumflow = upflow + dflow;
    }

    public long getSumflow() { return sumflow; }
    public void setSumflow(long sumflow) { this.sumflow = sumflow; }

    // Serialization: write the fields we want to ship as a byte stream
    @Override
    public void write(DataOutput out) throws IOException {
        out.writeLong(upflow);
        out.writeLong(dflow);
        out.writeLong(sumflow);
    }

    // Deserialization: restore the fields from the byte stream,
    // in the same order they were written
    @Override
    public void readFields(DataInput in) throws IOException {
        upflow = in.readLong();
        dflow = in.readLong();
        sumflow = in.readLong();
    }

    @Override
    public String toString() {
        return upflow + "\t" + dflow + "\t" + sumflow;
    }

    @Override
    public int compareTo(FlowBean o) {
        // Descending order by total flow. Deliberately never return 0:
        // if two beans compared as equal, MapReduce would merge them into a
        // single reduce group, and the reducer below would emit only one
        // of the phone numbers.
        return this.sumflow > o.getSumflow() ? -1 : 1;
    }
}
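The descending order comes entirely from compareTo. As a quick sanity check outside Hadoop, sorting a few beans with a plain Collections.sort should already put the largest total first, since the shuffle applies the same comparison to the map output keys. A minimal sketch with made-up numbers (the class name FlowBeanSortCheck is hypothetical, assumed to sit in the same package as FlowBean):

package com.wange.flowcountsort;

import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

// Hypothetical local check: Collections.sort uses the same compareTo
// that the MapReduce shuffle will apply to the map output keys
public class FlowBeanSortCheck {
    public static void main(String[] args) {
        List<FlowBean> beans = new ArrayList<>();
        beans.add(new FlowBean(2034, 5892));   // sum 7926
        beans.add(new FlowBean(7335, 110349)); // sum 117684
        beans.add(new FlowBean(3008, 3720));   // sum 6728

        Collections.sort(beans);               // delegates to FlowBean.compareTo
        for (FlowBean b : beans) {
            System.out.println(b);             // largest sumflow printed first
        }
    }
}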
Since the logic is simple, the mapper and reducer are both written in a single class:
package com.wange.flowcountsort;

import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import java.io.IOException;

/**
 * Step two of the flow count: sort the aggregated traffic totals.
 */
public class FlowCountSortSetpTwo {

    public static class FlowCountSortSetpTwoMapper extends Mapper<LongWritable, Text, FlowBean, Text> {
        @Override
        protected void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            // Each input line is the output of step one: phone \t upflow \t dflow \t sumflow
            String line = value.toString();
            String[] fields = StringUtils.split(line, "\t");
            String phone = fields[0];
            long upFlow = Long.parseLong(fields[1]);
            long dFlow = Long.parseLong(fields[2]);
            // The flow bean is the key, so the shuffle sorts records by total flow
            context.write(new FlowBean(upFlow, dFlow), new Text(phone));
        }
    }

    public static class FlowCountSortSetpTwoReducer extends Reducer<FlowBean, Text, Text, FlowBean> {
        @Override
        protected void reduce(FlowBean key, Iterable<Text> values, Context context)
                throws IOException, InterruptedException {
            // Because FlowBean.compareTo never returns 0, every group contains
            // exactly one phone number, so writing the first value is safe
            context.write(values.iterator().next(), key);
        }
    }

    public static void main(String[] args) throws Exception {
        Job job = Job.getInstance(new Configuration());
        job.setJarByClass(FlowCountSortSetpTwo.class);

        job.setMapperClass(FlowCountSortSetpTwoMapper.class);
        job.setReducerClass(FlowCountSortSetpTwoReducer.class);

        job.setMapOutputKeyClass(FlowBean.class);
        job.setMapOutputValueClass(Text.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(FlowBean.class);

        FileInputFormat.setInputPaths(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));

        // Exit with a nonzero status if the job fails
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}
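If you later make compareTo contract-compliant and return 0 for equal totals (e.g. via Long.compare), records with the same sumflow will share one reduce group, and the reducer must loop over all values instead of taking only the first. A hedged sketch of that variant (TieAwareReducer is hypothetical, not the code used in this post):

    // Variant reducer for a compareTo that can return 0: equal-sum records
    // then arrive in one reduce group, so every value must be written out
    public static class TieAwareReducer extends Reducer<FlowBean, Text, Text, FlowBean> {
        @Override
        protected void reduce(FlowBean key, Iterable<Text> values, Context context)
                throws IOException, InterruptedException {
            for (Text phone : values) {
                context.write(phone, key);  // one output line per phone in the group
            }
        }
    }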
Upload the jar and run it:
# run the job
hadoop jar hadoop-mapreduce-partition-1.0.jar com.wange.flowcountsort.FlowCountSortSetpTwo /flow/partition /flow/sortoutput2

# inspect the result
hadoop fs -ls /flow/sortoutput2
hadoop fs -cat /flow/sortoutput2/part-r-00000
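One practical note: FileOutputFormat refuses to start if the output directory already exists, so a rerun needs the old directory removed first (standard HDFS command, not from the original post):

# remove the previous output before rerunning the job
hadoop fs -rm -r /flow/sortoutput2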
If the output is sorted in descending order of total flow, the job succeeded. Note that the two numbers with identical totals (27162) each keep their own line, precisely because compareTo never returns 0 and therefore never merges them into one reduce group:
13502468823	7335	110349	117684
13925057413	11058	48243	59301
13726230503	2481	24681	27162
13726238888	2481	24681	27162
18320173382	9531	2412	11943
13560439658	2034	5892	7926
13660577991	6960	690	7650
15013685858	3659	3538	7197
13922314466	3008	3720	6728