版权声明:1911907658 https://blog.csdn.net/qq_33598343/article/details/83542133
将之前的流量统计案例改为按手机号前三位进行分区:135、137、138、139 开头的号码各占一个分区,其余号码归入第五个分区。
1.FlowCountMapper
/**
 * Mapper for the flow-count job.
 *
 * <p>Each input line is split on tab characters. The second column is used as
 * the phone number (output key); the third-from-last and second-from-last
 * columns are parsed as the upstream and downstream flow and wrapped in a
 * {@link FlowBean} (output value).
 *
 * <p>Lines that cannot be parsed (too few columns, or non-numeric flow
 * columns) are skipped and counted instead of failing the whole task.
 */
public class FlowCountMapper extends Mapper<LongWritable, Text, Text, FlowBean> {

    /** Counter group used to report skipped input lines. */
    private static final String COUNTER_GROUP = "FlowCount";

    /** Counter name used to report skipped input lines. */
    private static final String MALFORMED_COUNTER = "MALFORMED_RECORDS";

    /**
     * Smallest column count for which both {@code fields[1]} and
     * {@code fields[fields.length - 3]} are valid indices.
     */
    private static final int MIN_FIELDS = 3;

    /**
     * Emits one (phone number, flow) pair per well-formed input line.
     *
     * @param key     input key supplied by the input format (not used here)
     * @param value   one line of tab-separated input
     * @param context task context used to emit the output pair and to bump the
     *                malformed-record counter
     * @throws IOException          if writing the output pair fails
     * @throws InterruptedException if the task is interrupted while writing
     */
    @Override
    protected void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        // 1. Read the raw line.
        String line = value.toString();

        // 2. Split into tab-separated columns.
        String[] fields = line.split("\t");

        // 3. Data cleansing: a blank or truncated line would otherwise raise
        //    ArrayIndexOutOfBoundsException on the index accesses below.
        if (fields.length < MIN_FIELDS) {
            context.getCounter(COUNTER_GROUP, MALFORMED_COUNTER).increment(1);
            return;
        }

        String phoneN = fields[1];
        long upFlow;
        long dfFlow;
        try {
            upFlow = Long.parseLong(fields[fields.length - 3]);
            dfFlow = Long.parseLong(fields[fields.length - 2]);
        } catch (NumberFormatException e) {
            // Non-numeric flow column: skip this record rather than abort the task.
            context.getCounter(COUNTER_GROUP, MALFORMED_COUNTER).increment(1);
            return;
        }

        // 4. Emit to the reducer side: phone number -> (upFlow, dfFlow, sum).
        context.write(new Text(phoneN), new FlowBean(upFlow, dfFlow));
    }
}
2.FlowCountReducer
/**
 * Reducer for the flow-count job: adds up the upstream and downstream flow of
 * every {@link FlowBean} received for one key and emits a single bean holding
 * the totals.
 */
public class FlowCountReducer extends Reducer<Text, FlowBean, Text, FlowBean> {

    /**
     * Aggregates all flow records that share the same key.
     *
     * @param key     the key shared by all values in this call
     * @param values  the flow records emitted for {@code key}
     * @param context task context used to emit the aggregated record
     * @throws IOException          if writing the output pair fails
     * @throws InterruptedException if the task is interrupted while writing
     */
    @Override
    protected void reduce(Text key, Iterable<FlowBean> values, Context context)
            throws IOException, InterruptedException {
        // Running totals for this key.
        long totalUp = 0;
        long totalDown = 0;

        for (FlowBean bean : values) {
            totalUp = totalUp + bean.getUpFlow();
            totalDown = totalDown + bean.getDfFlow();
        }

        // The two-argument constructor also fills in the combined sum.
        context.write(key, new FlowBean(totalUp, totalDown));
    }
}
3. FlowBean
/**
 * Value type carrying upstream flow, downstream flow and their sum.
 *
 * <p>The three fields are serialized as three consecutive {@code long} values
 * in the order upFlow, dfFlow, flowSum; {@link #readFields(DataInput)} reads
 * them back in the same order.
 *
 * <p>Note: only the two-argument constructor computes the sum. The individual
 * setters assign their own field and leave the others untouched.
 */
public class FlowBean implements Writable {

    /** Upstream flow. */
    private long upFlow;

    /** Downstream flow. */
    private long dfFlow;

    /** Combined flow; set by the two-argument constructor or {@link #setFlowsum(long)}. */
    private long flowSum;

    /** No-argument constructor; leaves all fields at their default of zero. */
    public FlowBean() {
    }

    /**
     * Creates a bean from the two flow values and stores their sum.
     *
     * @param upFlow upstream flow
     * @param dfFlow downstream flow
     */
    public FlowBean(long upFlow, long dfFlow) {
        this.upFlow = upFlow;
        this.dfFlow = dfFlow;
        this.flowSum = this.upFlow + this.dfFlow;
    }

    /** @return the upstream flow */
    public long getUpFlow() {
        return this.upFlow;
    }

    /** @return the downstream flow */
    public long getDfFlow() {
        return this.dfFlow;
    }

    /** @return the stored combined flow */
    public long getFlowsum() {
        return this.flowSum;
    }

    /** @param upFlow new upstream flow (the stored sum is not recomputed) */
    public void setUpFlow(long upFlow) {
        this.upFlow = upFlow;
    }

    /** @param dfFlow new downstream flow (the stored sum is not recomputed) */
    public void setDfFlow(long dfFlow) {
        this.dfFlow = dfFlow;
    }

    /** @param flowsum new combined flow value */
    public void setFlowsum(long flowsum) {
        this.flowSum = flowsum;
    }

    /**
     * Serializes the bean as upFlow, dfFlow, flowSum.
     *
     * @param out sink to write the three {@code long} values to
     * @throws IOException if the underlying stream fails
     */
    @Override
    public void write(DataOutput out) throws IOException {
        out.writeLong(this.upFlow);
        out.writeLong(this.dfFlow);
        out.writeLong(this.flowSum);
    }

    /**
     * Deserializes the bean; the read order mirrors {@link #write(DataOutput)}.
     *
     * @param in source to read the three {@code long} values from
     * @throws IOException if the underlying stream fails
     */
    @Override
    public void readFields(DataInput in) throws IOException {
        this.upFlow = in.readLong();
        this.dfFlow = in.readLong();
        this.flowSum = in.readLong();
    }

    /** @return the three fields separated by tab characters */
    @Override
    public String toString() {
        StringBuilder text = new StringBuilder();
        text.append(this.upFlow);
        text.append("\t");
        text.append(this.dfFlow);
        text.append("\t");
        text.append(this.flowSum);
        return text.toString();
    }
}
4.PhonenumPartitioner
/**
 * Partitions records by the first three characters of the phone-number key.
 *
 * <p>Prefixes 135, 137, 138 and 139 map to partitions 0 to 3; every other key
 * (including keys shorter than three characters) maps to partition 4.
 */
public class PhonenumPartitioner extends Partitioner<Text, FlowBean> {

    /** Partition used for keys that match none of the known prefixes. */
    private static final int DEFAULT_PARTITION = 4;

    /**
     * Chooses the partition for one record.
     *
     * @param key           the phone number
     * @param value         the flow record (not used for partitioning)
     * @param numPartitions total number of partitions
     * @return the partition index, folded into {@code [0, numPartitions)}
     */
    @Override
    public int getPartition(Text key, FlowBean value, int numPartitions) {
        // startsWith avoids the StringIndexOutOfBoundsException that
        // substring(0, 3) raises for keys shorter than three characters.
        String phoneNum = key.toString();

        int partition;
        if (phoneNum.startsWith("135")) {
            partition = 0;
        } else if (phoneNum.startsWith("137")) {
            partition = 1;
        } else if (phoneNum.startsWith("138")) {
            partition = 2;
        } else if (phoneNum.startsWith("139")) {
            partition = 3;
        } else {
            partition = DEFAULT_PARTITION;
        }

        // Keep the result inside the valid range when fewer than five
        // partitions are configured; with five or more this is a no-op.
        return partition % numPartitions;
    }
}
5.FlowCountDriver
/**
 * Driver that configures and submits the partitioned flow-count job.
 *
 * <p>Usage: {@code FlowCountDriver [inputPath [outputPath]]}. When an argument
 * is omitted the corresponding built-in default path is used.
 */
public class FlowCountDriver {

    /** Input path used when no first argument is given. */
    private static final String DEFAULT_INPUT_PATH = "c:/flow1020/in";

    /** Output path used when no second argument is given. */
    private static final String DEFAULT_OUTPUT_PATH = "c:/flow1020/out";

    /** One reduce task per partition index produced by {@link PhonenumPartitioner} (0 to 4). */
    private static final int NUM_REDUCE_TASKS = 5;

    /**
     * Configures the job, waits for it to finish and exits with status 0 on
     * success or 1 on failure.
     *
     * @param args optional input path and output path, in that order
     * @throws IOException            if job setup or submission fails
     * @throws ClassNotFoundException if a configured job class cannot be found
     * @throws InterruptedException   if the wait for completion is interrupted
     */
    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
        String inputPath = (args != null && args.length > 0) ? args[0] : DEFAULT_INPUT_PATH;
        String outputPath = (args != null && args.length > 1) ? args[1] : DEFAULT_OUTPUT_PATH;

        // 1. Create the job.
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf);

        // 2. Locate the jar through the driver class.
        job.setJarByClass(FlowCountDriver.class);

        // 3. Register the custom mapper and reducer.
        job.setMapperClass(FlowCountMapper.class);
        job.setReducerClass(FlowCountReducer.class);

        // 4. Map output types.
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(FlowBean.class);

        // 5. Final (reduce) output types.
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(FlowBean.class);

        // Register the custom partitioner and one reducer per partition.
        job.setPartitionerClass(PhonenumPartitioner.class);
        job.setNumReduceTasks(NUM_REDUCE_TASKS);

        // 6. Input and output locations.
        FileInputFormat.setInputPaths(job, new Path(inputPath));
        FileOutputFormat.setOutputPath(job, new Path(outputPath));

        // 7. Submit, then report the outcome through the process exit status
        //    (previously the 0/1 value was only printed to stdout).
        boolean succeeded = job.waitForCompletion(true);
        System.exit(succeeded ? 0 : 1);
    }
}