待汇总数据部分如下:
3631279840312 13660577991 5C-0E-8B-92-5C-20:CMCC-EASY 120.197.40.4 s19.cnzz.com 站点统计 24 9 660 690 200
3631279730382 15013685858 5C-0E-8B-C7-F7-90:CMCC 120.197.40.4 rank.ie.sogou.com 搜索引擎 28 27 369 338 200
3631279860392 15889002119 E8-99-C4-4E-93-E0:CMCC-EASY 120.196.100.99 www.umeng.com 站点统计 3 3 938 380 200
3631279920332 13560439658 C4-17-FE-BA-DE-D9:CMCC 120.196.100.99 15 9 918 4938 200
3631279860312 13480253104 5C-0E-8B-C7-FC-80:CMCC-EASY 120.197.40.4 3 3 120 1320 200
3631279840302 13602846565 5C-0E-8B-8B-B6-00:CMCC 120.197.40.4 2052.flash2-http.qq.com 综合门户 15 12 198 910 200
要求:取每行第二列的手机号,并按手机号汇总倒数第三列的上行流量和倒数第二列的下行流量(最后一列为状态码,不参与汇总)
代码如下
FlowBean
/**
 * Value object holding the upstream traffic, the downstream traffic and their total.
 *
 * <p>The class implements {@code Writable} only: it can be serialized and
 * deserialized, but it defines no ordering of its own.
 *
 * <p>{@code flowSum} is computed once by the two-argument constructor. The
 * individual setters do not recompute it, so callers that mutate the bean
 * through setters must also call {@link #setFlowSum(long)} themselves.
 *
 * @author chengguo
 * @version 1.0
 */
public class FlowBean implements Writable {

    /** Separator placed between the three values in {@link #toString()}. */
    private static final String SEPARATOR = "\t";

    /** Upstream traffic. */
    private long upFlow;

    /** Downstream traffic. */
    private long dfFlow;

    /** Total traffic; equals {@code upFlow + dfFlow} only right after the two-argument constructor. */
    private long flowSum;

    /**
     * Creates a bean whose three values are all zero.
     * Presumably needed so the framework can instantiate the bean before
     * calling {@link #readFields(DataInput)} - confirm against the Hadoop docs.
     */
    public FlowBean() {
    }

    /**
     * Creates a bean from the two traffic values and derives the total.
     *
     * @param upFlow upstream traffic
     * @param dfFlow downstream traffic
     */
    public FlowBean(long upFlow, long dfFlow) {
        this.upFlow = upFlow;
        this.dfFlow = dfFlow;
        this.flowSum = upFlow + dfFlow;
    }

    /**
     * Writes the three values as longs, in the order upFlow, dfFlow, flowSum.
     *
     * @param out sink to serialize into
     * @throws IOException if the underlying stream fails
     */
    @Override
    public void write(DataOutput out) throws IOException {
        out.writeLong(upFlow);
        out.writeLong(dfFlow);
        out.writeLong(flowSum);
    }

    /**
     * Restores the three values; the read order mirrors {@link #write(DataOutput)}.
     *
     * @param in source to deserialize from
     * @throws IOException if the underlying stream fails
     */
    @Override
    public void readFields(DataInput in) throws IOException {
        this.upFlow = in.readLong();
        this.dfFlow = in.readLong();
        this.flowSum = in.readLong();
    }

    /** @return the upstream traffic */
    public long getUpFlow() {
        return upFlow;
    }

    /** @param upFlow new upstream traffic; {@code flowSum} is left untouched */
    public void setUpFlow(long upFlow) {
        this.upFlow = upFlow;
    }

    /** @return the downstream traffic */
    public long getDfFlow() {
        return dfFlow;
    }

    /** @param dfFlow new downstream traffic; {@code flowSum} is left untouched */
    public void setDfFlow(long dfFlow) {
        this.dfFlow = dfFlow;
    }

    /** @return the stored total traffic */
    public long getFlowSum() {
        return flowSum;
    }

    /** @param flowSum new total traffic, stored as given */
    public void setFlowSum(long flowSum) {
        this.flowSum = flowSum;
    }

    /**
     * Renders the bean as {@code upFlow<TAB>dfFlow<TAB>flowSum}.
     *
     * @return the tab-separated textual form
     */
    @Override
    public String toString() {
        return String.join(
                SEPARATOR,
                Long.toString(upFlow),
                Long.toString(dfFlow),
                Long.toString(flowSum));
    }
}
FlowCountMapper
/**
 * Mapper that extracts the phone number, upstream flow and downstream flow
 * from each tab-separated log line.
 *
 * <p>Sample input line (columns separated by tabs):
 * {@code 3631279950322 13822544101 5C-0E-8B-C7-F1-E0:CMCC 120.197.40.4 www.taobao.com 淘宝网 4 0 264 0 200}
 *
 * <p>Record emitted for that line: key {@code 13822544101}, value {@code (264, 0)}.
 *
 * <p>Lines that have too few columns, or whose flow columns are not valid
 * longs, are skipped and counted instead of failing the whole task.
 *
 * @author chengguo
 * @version 1.0
 */
public class FlowCountMapper extends Mapper<LongWritable, Text, Text, FlowBean> {

    /**
     * Fewest columns a usable line can have: with fewer than five, the phone
     * column (index 1) would coincide with or precede the upstream column
     * (index {@code length - 3}).
     */
    private static final int MIN_FIELDS = 5;

    /** Counter group under which skipped records are reported. */
    private static final String COUNTER_GROUP = "FlowCount";

    /**
     * Parses one input line and emits {@code (phone, FlowBean(up, down))}.
     *
     * @param key     input key supplied by the framework; not used
     * @param value   one raw log line
     * @param context sink for the output record and the skip counters
     * @throws IOException          if writing the output record fails
     * @throws InterruptedException if the task is interrupted while writing
     */
    @Override
    protected void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        // 1. Fetch the raw line.
        String line = value.toString();

        // 2. Split into columns.
        String[] fields = line.split("\t");

        // 3. Data cleaning: drop blank or truncated lines instead of indexing out of bounds.
        if (fields.length < MIN_FIELDS) {
            context.getCounter(COUNTER_GROUP, "TOO_FEW_FIELDS").increment(1);
            return;
        }

        String phoneN = fields[1];

        // The flow columns are addressed from the end of the line: third-from-last is
        // the upstream flow, second-from-last the downstream flow.
        long upFlow;
        long dfFlow;
        try {
            upFlow = Long.parseLong(fields[fields.length - 3]);
            dfFlow = Long.parseLong(fields[fields.length - 2]);
        } catch (NumberFormatException ignored) {
            // A non-numeric flow column marks a corrupt record; count it and move on.
            context.getCounter(COUNTER_GROUP, "NON_NUMERIC_FLOW").increment(1);
            return;
        }

        // 4. Emit to the reducer, e.g. 13822544101 -> (264, 0).
        context.write(new Text(phoneN), new FlowBean(upFlow, dfFlow));
    }
}
FlowCountReducer
/**
 * Reducer that adds up the upstream and downstream flow of all records
 * sharing one key and emits a single {@code FlowBean} with the totals.
 */
public class FlowCountReducer extends Reducer<Text, FlowBean, Text, FlowBean> {

    /**
     * Sums the flows of every value grouped under {@code key}.
     *
     * @param key     the grouping key, passed through unchanged to the output
     * @param values  all beans emitted for this key
     * @param context sink for the aggregated record
     * @throws IOException          if writing the output record fails
     * @throws InterruptedException if the task is interrupted while writing
     */
    @Override
    protected void reduce(Text key, Iterable<FlowBean> values, Context context)
            throws IOException, InterruptedException {
        // Running totals for this key.
        long totalUp = 0L;
        long totalDown = 0L;

        for (FlowBean bean : values) {
            totalUp = totalUp + bean.getUpFlow();
            totalDown = totalDown + bean.getDfFlow();
        }

        // The two-argument constructor derives the combined total itself.
        context.write(key, new FlowBean(totalUp, totalDown));
    }
}
FlowCountDriver
/**
 * Entry point that configures and submits the flow-count job
 * ({@code FlowCountMapper} + {@code FlowCountReducer}).
 */
public class FlowCountDriver {

    /** Input directory used when no paths are given on the command line. */
    private static final String DEFAULT_INPUT_DIR = "c:/flow1020/in";

    /** Output directory used when no paths are given on the command line. */
    private static final String DEFAULT_OUTPUT_DIR = "c:/flow1020/out";

    /**
     * Builds the job, runs it to completion and exits with status 0 on
     * success or 1 on failure.
     *
     * @param args optional {@code <inputDir> <outputDir>}; when fewer than two
     *             arguments are supplied the built-in default paths are used
     * @throws IOException            if the job cannot be created or submitted
     * @throws ClassNotFoundException if a configured job class cannot be loaded
     * @throws InterruptedException   if waiting for the job is interrupted
     */
    public static void main(String[] args)
            throws IOException, ClassNotFoundException, InterruptedException {
        // Paths are overridable so the same jar runs outside the original machine.
        boolean pathsGiven = args != null && args.length >= 2;
        String inputDir = pathsGiven ? args[0] : DEFAULT_INPUT_DIR;
        String outputDir = pathsGiven ? args[1] : DEFAULT_OUTPUT_DIR;

        // 1. Obtain the job.
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf);

        // 2. Locate the jar through the driver class.
        job.setJarByClass(FlowCountDriver.class);

        // 3. Register the custom mapper and reducer.
        job.setMapperClass(FlowCountMapper.class);
        job.setReducerClass(FlowCountReducer.class);

        // 4. Mapper output types.
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(FlowBean.class);

        // 5. Reducer (final) output types.
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(FlowBean.class);

        // 6. Input and output locations.
        FileInputFormat.setInputPaths(job, new Path(inputDir));
        FileOutputFormat.setOutputPath(job, new Path(outputDir));

        // 7. Submit and report the outcome through the process exit status, so
        //    shells and schedulers can detect a failed job.
        boolean succeeded = job.waitForCompletion(true);
        System.exit(succeeded ? 0 : 1);
    }
}