流量汇总全局倒排序
部分待排序数据如下(每行三个字段,以制表符分隔:手机号、上行流量、下行流量):
13480253104 120 1320
13502468823 735 11349
13510439658 1116 954
13560436326 1136 94
13560436666 1136 94
13560439658 918 4938
13602846565 198 910
13660577991 660 690
上代码
FlowBean
/**
 * Bean that carries an upstream figure, a downstream figure and their total
 * through the job; it knows how to serialize itself and how to order itself
 * by total, largest first.
 *
 * @author chengguo
 * @version 1.0
 */
public class FlowBean implements WritableComparable<FlowBean> {

    // Upstream figure, downstream figure, and the total of the two.
    private long upFlow;
    private long dfFlow;
    private long flowSum;

    /** No-argument constructor; every field keeps its default of zero. */
    public FlowBean() {
    }

    /**
     * Stores both figures and derives the total as their sum.
     *
     * @param upFlow upstream figure
     * @param dfFlow downstream figure
     */
    public FlowBean(long upFlow, long dfFlow) {
        this.upFlow = upFlow;
        this.dfFlow = dfFlow;
        this.flowSum = this.upFlow + this.dfFlow;
    }

    /** @return the upstream figure */
    public long getUpFlow() {
        return this.upFlow;
    }

    /** Replaces the upstream figure; the stored total is not recomputed. */
    public void setUpFlow(long upFlow) {
        this.upFlow = upFlow;
    }

    /** @return the downstream figure */
    public long getDfFlow() {
        return this.dfFlow;
    }

    /** Replaces the downstream figure; the stored total is not recomputed. */
    public void setDfFlow(long dfFlow) {
        this.dfFlow = dfFlow;
    }

    /** @return the stored total */
    public long getFlowSum() {
        return this.flowSum;
    }

    /** Overwrites the stored total directly. */
    public void setFlowSum(long flowSum) {
        this.flowSum = flowSum;
    }

    /**
     * Serializes the three fields as longs, in the order up, down, total.
     */
    @Override
    public void write(DataOutput out) throws IOException {
        out.writeLong(this.upFlow);
        out.writeLong(this.dfFlow);
        out.writeLong(this.flowSum);
    }

    /**
     * Deserializes the three fields in the same order that
     * {@link #write(DataOutput)} emits them.
     */
    @Override
    public void readFields(DataInput in) throws IOException {
        this.upFlow = in.readLong();
        this.dfFlow = in.readLong();
        this.flowSum = in.readLong();
    }

    /** Renders the bean as {@code up<TAB>down<TAB>total}. */
    @Override
    public String toString() {
        StringBuilder text = new StringBuilder();
        text.append(upFlow).append("\t");
        text.append(dfFlow).append("\t");
        text.append(flowSum);
        return text.toString();
    }

    /**
     * Orders beans by descending total: returns -1 when this bean's total is
     * strictly greater than the other's, and 1 in every other case.
     *
     * NOTE(review): equal totals also yield 1, so this method never returns 0
     * and does not satisfy the usual {@code Comparable} symmetry contract.
     * Presumably this keeps beans with equal totals from being treated as the
     * same key downstream -- confirm before changing it.
     */
    @Override
    public int compareTo(FlowBean o) {
        if (this.flowSum > o.getFlowSum()) {
            return -1;
        }
        return 1;
    }
}
FlowSortMapper
/**
 * Mapper that parses one tab-separated record per input line and emits a
 * {@link FlowBean} built from fields 1 and 2 as the key, with field 0 as the
 * value, so that records are ordered by the bean's comparison.
 */
public class FlowSortMapper extends Mapper<LongWritable, Text, FlowBean, Text> {

    /** Number of tab-separated fields a record needs (field 0, up, down). */
    private static final int REQUIRED_FIELDS = 3;

    /**
     * Converts one input line into a (FlowBean, field 0) pair.
     *
     * @param key     input key supplied by the framework for this line
     * @param value   the raw line
     * @param context sink for the emitted pair
     * @throws IOException           if the line has fewer than three fields,
     *                               or the write fails
     * @throws NumberFormatException if field 1 or 2 is not a valid long
     */
    @Override
    protected void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        // 1. Fetch the line.
        String line = value.toString();

        // Blank lines (e.g. a trailing newline in the input file) carry no
        // record; skip them instead of failing on a missing field.
        if (line.trim().isEmpty()) {
            return;
        }

        // 2. Split into fields.
        String[] fields = line.split("\t");
        if (fields.length < REQUIRED_FIELDS) {
            // Fail with a message that identifies the bad record rather than
            // a bare ArrayIndexOutOfBoundsException.
            throw new IOException("Expected at least " + REQUIRED_FIELDS
                    + " tab-separated fields but found " + fields.length
                    + " (input key " + key + "): " + line);
        }

        // 3. Pull out the two figures as primitives (no boxing needed).
        long upFlow = Long.parseLong(fields[1]);
        long dfFlow = Long.parseLong(fields[2]);

        // 4. Emit to the reduce phase, keyed by the bean.
        context.write(new FlowBean(upFlow, dfFlow), new Text(fields[0]));
    }
}
FlowSortReducer
/**
 * Reducer that swaps key and value back: for each incoming FlowBean key it
 * writes every associated Text value as the output key, with the bean as the
 * output value.
 */
public class FlowSortReducer extends Reducer<FlowBean, Text, Text, FlowBean> {

    /**
     * Writes one output record per value in the group.
     *
     * @param key     the bean shared by this group
     * @param value   all Text values grouped under {@code key}
     * @param context sink for the output records
     */
    @Override
    protected void reduce(FlowBean key, Iterable<Text> value, Context context)
            throws IOException, InterruptedException {
        // Write every value, not just the first one, so that no record is
        // dropped if several values arrive under the same key.
        for (Text id : value) {
            context.write(id, key);
        }
    }
}
FlowSortDriver
/**
 * Entry point that configures and submits the flow-sort job: FlowSortMapper
 * feeds FlowBean keys to FlowSortReducer, which writes Text / FlowBean pairs.
 */
public class FlowSortDriver {

    /** Input directory used when no first argument is given. */
    private static final String DEFAULT_INPUT = "c:/BIGDATA/Test/flow1020/in";

    /** Output directory used when no second argument is given. */
    private static final String DEFAULT_OUTPUT = "c:/BIGDATA/Test/flow1020/out";

    /**
     * Builds the job, runs it to completion, prints 0 on success or 1 on
     * failure, and exits the JVM with that same status.
     *
     * @param args optional: {@code args[0]} is the input path and
     *             {@code args[1]} the output path; each falls back to its
     *             built-in default when absent
     */
    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
        // Allow the paths to be overridden from the command line while
        // keeping the original hard-coded locations as defaults.
        String input = (args != null && args.length > 0) ? args[0] : DEFAULT_INPUT;
        String output = (args != null && args.length > 1) ? args[1] : DEFAULT_OUTPUT;

        // 1. Obtain the job.
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf);

        // 2. Locate the jar via this class.
        job.setJarByClass(FlowSortDriver.class);

        // 3. Register the custom mapper and reducer.
        job.setMapperClass(FlowSortMapper.class);
        job.setReducerClass(FlowSortReducer.class);

        // 4. Declare the mapper's output types.
        job.setMapOutputKeyClass(FlowBean.class);
        job.setMapOutputValueClass(Text.class);

        // 5. Declare the reducer's (final) output types.
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(FlowBean.class);

        // 6. Set the input location and the result location.
        FileInputFormat.setInputPaths(job, new Path(input));
        FileOutputFormat.setOutputPath(job, new Path(output));

        // 7. Submit and wait. The status is still printed as before, and is
        // now also used as the process exit code so a failed job is visible
        // to whatever launched it.
        boolean rs = job.waitForCompletion(true);
        int status = rs ? 0 : 1;
        System.out.println(status);
        System.exit(status);
    }
}