定义 FlowCountSortMapper
package cn.learn.mapreduce_sort;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import java.io.IOException;
/**
 * Mapper that turns each tab-separated input line into a (FlowBean, phone number) pair.
 *
 * <p>The FlowBean is emitted as the map output key (K2) and the phone number as the map
 * output value (V2). Each input line is read as, in order: phone number, up flow,
 * down flow, up count flow, down count flow. Any fields after the fifth are ignored.
 */
public class FlowCountSortMapper extends Mapper<LongWritable,Text,FlowBean,Text> {

    /** Separator between the fields of one input line. */
    private static final String FIELD_SEPARATOR = "\t";

    /** Number of leading fields that {@link #map} reads from every line. */
    private static final int REQUIRED_FIELD_COUNT = 5;

    /**
     * Output value holder, reused across map() calls instead of allocating a new Text
     * per record (the usual Hadoop idiom for Writable outputs).
     */
    private final Text phoneText = new Text();

    /**
     * Parses one input line and writes (FlowBean, phone number) to the context.
     *
     * @param key     position of the line in the input, used only in error messages
     * @param value   one tab-separated input line
     * @param context job context that receives the output pair
     * @throws IOException          if the line has fewer than five fields, if a flow field
     *                              is not a valid integer, or if writing the pair fails
     * @throws InterruptedException if the write is interrupted
     */
    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException,
            InterruptedException {
        String[] fields = value.toString().split(FIELD_SEPARATOR);

        // Fail with a message that locates the bad record instead of a bare
        // ArrayIndexOutOfBoundsException from the field accesses below.
        if (fields.length < REQUIRED_FIELD_COUNT) {
            throw new IOException("Malformed record at offset " + key + ": expected at least "
                    + REQUIRED_FIELD_COUNT + " tab-separated fields but found " + fields.length);
        }

        // A fresh FlowBean per record: its full state is not visible from this class,
        // so reusing one instance could carry stale values between records.
        FlowBean flowBean = new FlowBean();
        try {
            // The four flow fields populate the FlowBean, which becomes K2.
            flowBean.setUpFlow(Integer.parseInt(fields[1]));
            flowBean.setDownFlow(Integer.parseInt(fields[2]));
            flowBean.setUpCountFlow(Integer.parseInt(fields[3]));
            flowBean.setDownCountFlow(Integer.parseInt(fields[4]));
        } catch (NumberFormatException e) {
            throw new IOException("Malformed record at offset " + key
                    + ": flow field is not a valid integer", e);
        }

        // The phone number (first field) becomes V2.
        phoneText.set(fields[0]);

        // Write K2 and V2 to the context.
        context.write(flowBean, phoneText);
    }
}
定义 FlowCountSortReducer
package cn.learn.mapreduce_sort;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
import java.io.IOException;
/**
 * Reducer that swaps the incoming key/value roles: it receives (FlowBean, Text values)
 * and writes one (Text, FlowBean) pair per value.
 *
 * <p>NOTE(review): the order of the output presumably follows FlowBean's comparison
 * logic, applied by the framework before reduce() is called — confirm against FlowBean.
 */
public class FlowCountSortReducer extends Reducer<FlowBean,Text,Text,FlowBean> {

    /**
     * Writes every value of the group as an output key, paired with the group's FlowBean.
     *
     * @param key     the FlowBean shared by this group of values
     * @param values  the Text values grouped under {@code key}
     * @param context job context that receives the output pairs
     * @throws IOException          if writing an output pair fails
     * @throws InterruptedException if the write is interrupted
     */
    @Override
    protected void reduce(FlowBean key, Iterable<Text> values, Context context)
            throws IOException, InterruptedException {
        // One output record per grouped value, so values sharing a key are all kept.
        for (final Text groupedValue : values) {
            context.write(groupedValue, key);
        }
    }
}