public class FlowBean implements WritableComparable<FlowBean>{

    private String phoneNB;
    private long up_flow;
    private long d_flow;
    private long s_flow;

    // No-arg constructor is required: Hadoop's reflection-based
    // deserialization instantiates the bean before calling readFields().
    public FlowBean() {
    }

    /**
     * Convenience constructor for initializing all flow data at once.
     * The total flow is derived from the upstream and downstream values.
     *
     * @param phoneNB phone number this record belongs to
     * @param up_flow upstream traffic
     * @param d_flow  downstream traffic
     */
    public FlowBean(String phoneNB, long up_flow, long d_flow) {
        this.phoneNB = phoneNB;
        this.up_flow = up_flow;
        this.d_flow = d_flow;
        this.s_flow = up_flow + d_flow;
    }

    // Getters: the mapper/reducer classes call getUp_flow()/getD_flow()/
    // getS_flow(); the original file only had a placeholder comment here.
    public String getPhoneNB() {
        return phoneNB;
    }

    public long getUp_flow() {
        return up_flow;
    }

    public long getD_flow() {
        return d_flow;
    }

    public long getS_flow() {
        return s_flow;
    }

    @Override
    public String toString() {
        return up_flow + "\t" + d_flow + "\t" + s_flow;
    }

    /**
     * Deserializes the bean from the stream. The field order MUST match
     * the order used in {@link #write(DataOutput)} exactly.
     */
    @Override
    public void readFields(DataInput in) throws IOException {
        this.phoneNB = in.readUTF();
        this.up_flow = in.readLong();
        this.d_flow = in.readLong();
        this.s_flow = in.readLong();
    }

    /** Serializes the bean's fields to the stream. */
    @Override
    public void write(DataOutput out) throws IOException {
        out.writeUTF(phoneNB);
        out.writeLong(up_flow);
        out.writeLong(d_flow);
        out.writeLong(s_flow);
    }

    /**
     * Orders beans by total flow, descending.
     *
     * <p>Fix: the original {@code s_flow > o.getS_flow() ? -1 : 1} never
     * returned 0, violating the compareTo contract for equal totals
     * (sorting could become unstable/inconsistent). {@link Long#compare}
     * returns 0 for equal values and cannot overflow.
     */
    @Override
    public int compareTo(FlowBean o) {
        return Long.compare(o.s_flow, this.s_flow);
    }
}
/**********************************************************************************************************************/
/**
 * FlowBean is a custom data type. To be transferred between Hadoop nodes
 * it must follow Hadoop's serialization mechanism, i.e. implement the
 * corresponding Hadoop serialization interface (WritableComparable).
 */
public class FlowSumMapper extends Mapper<LongWritable, Text, Text, FlowBean>{
protected void map(LongWritable key, Text value, Context context) throws IOException ,InterruptedException {
//一行数据
String line = value.toString();
//切分成各个字段
//String[] fields = StringUtils.split(line,"\t");
String[] fields = line.split("\t");
//需要的字段
String phoneNB = fields[1];
long u_flow = Long.parseLong(fields[7]);
long d_flow = Long.parseLong(fields[8]);
//封装数据为kv并输出
context.write(new Text(phoneNB), new FlowBean(phoneNB,u_flow,d_flow));
};
}
/*********************************************************************************************************/
//框架每传递一组数据<1387788654,{flowbean,flowbean,flowbean,flowbean.....}>调用一次我们的reduce方法
//reduce中的业务逻辑就是遍历values,然后进行累加求和再输出
public class FlowSumReduce extends Reducer<Text, FlowBean, Text, FlowBean>{
protected void reduce(Text key, Iterable<FlowBean> values,Context context) throws IOException ,InterruptedException {
long up_flow_counter = 0;
long d_flow_counter = 0;
for (FlowBean bean : values){
up_flow_counter+=bean.getUp_flow();
d_flow_counter+=bean.getD_flow();
}
context.write(key, new FlowBean(key.toString(), up_flow_counter, d_flow_counter));
};
}
/**************************************************************************************************/
/**
 * Job driver. Launched through {@link ToolRunner} so that Hadoop generic
 * options ({@code -D}, {@code -files}, ...) are parsed into the
 * configuration before {@link #run(String[])} is invoked.
 */
public class FlowSumRunner extends Configured implements Tool{

    @Override
    public int run(String[] args) throws Exception {
        // Validate arguments before touching the cluster.
        if (args.length < 2) {
            System.err.println("Usage: FlowSumRunner <input path> <output path>");
            return 2;
        }
        // Fix: use the configuration injected by ToolRunner. The original
        // created a fresh `new Configuration()`, which silently discarded
        // any -D options passed on the command line — defeating the whole
        // point of extending Configured/implementing Tool.
        Job job = Job.getInstance(getConf());
        job.setJarByClass(FlowSumRunner.class);
        job.setMapperClass(FlowSumMapper.class);
        job.setReducerClass(FlowSumReduce.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(FlowBean.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(FlowBean.class);
        FileInputFormat.setInputPaths(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));
        // Blocks until the job finishes; exit code 0 on success.
        return job.waitForCompletion(true) ? 0 : 1;
    }

    public static void main(String[] args) throws Exception {
        int res = ToolRunner.run(new Configuration(), new FlowSumRunner(), args);
        System.exit(res);
    }
}