Requirements
Write the statistics to different files according to the province each phone number belongs to (partitioning).
1. Input data
1,13736230513,192.196.100.1,www.atguigu.com,2481,24681,200
2,13846544121,192.196.100.2,,264,0,200
3,13956435636,192.196.100.3,,132,1512,200
4,13966251146,192.168.100.1,,240,0,404
5,18271575951,192.168.100.2,www.atguigu.com,1527,2106,200
6,84188413,192.168.100.3,www.atguigu.com,4116,1432,200
7,13590439668,192.168.100.4,,1116,954,200
8,15910133277,192.168.100.5,www.hao123.com,3156,2936,200
9,13729199489,192.168.100.6,,240,0,200
10,13630577991,192.168.100.7,www.shouhu.com,6960,690,200
11,15043685818,192.168.100.8,www.baidu.com,3659,3538,200
12,15959002129,192.168.100.9,www.atguigu.com,1938,180,500
13,13560439638,192.168.100.10,,918,4938,200
14,13470253144,192.168.100.11,,180,180,200
15,13682846555,192.168.100.12,www.qq.com,1938,2910,200
16,13992314666,192.168.100.13,www.gaga.com,3008,3720,200
17,13509468723,192.168.100.14,www.qinghua.com,7335,110349,404
18,18390173782,192.168.100.15,www.sogou.com,9531,2412,200
19,13975057813,192.168.100.16,www.baidu.com,11058,48243,200
20,13768778790,192.168.100.17,,120,120,200
21,13568436656,192.168.100.18,www.alibaba.com,2481,24681,200
22,13568436656,192.168.100.19,,1116,954,200
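2. Expected output data
Phone numbers starting with 136, 137, 138 and 139 each go into their own file (four files in total); numbers with any other prefix go into a fifth file.
Approach: use the phone number as the key and the rest of the row as the value.
Before implementing this example, first implement the example at https://blog.csdn.net/qq_17623363/article/details/104123776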
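The approach above can be sketched in plain Java, without Hadoop, to make the key/value idea concrete. This is only an illustration: the class `ApproachSketch` and its methods are hypothetical names, and it assumes the phone number is the second comma-separated field, as in the sample input.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;

// Hypothetical helper for illustration; not part of the original project.
class ApproachSketch {

    // The four prefixes that each get their own output file.
    private static final Set<String> OWN_FILE =
            new HashSet<>(Arrays.asList("136", "137", "138", "139"));

    // Key: the phone number, assumed to be the second comma-separated field.
    static String phoneOf(String line) {
        return line.split(",")[1];
    }

    // Bucket name: the three-digit prefix, or "other" for everything else.
    static String bucketOf(String phone) {
        if (phone.length() >= 3 && OWN_FILE.contains(phone.substring(0, 3))) {
            return phone.substring(0, 3);
        }
        return "other";
    }

    // Groups whole input lines (the values) by the bucket of their phone number.
    static Map<String, List<String>> group(List<String> lines) {
        Map<String, List<String>> buckets = new TreeMap<>();
        for (String line : lines) {
            buckets.computeIfAbsent(bucketOf(phoneOf(line)), k -> new ArrayList<>()).add(line);
        }
        return buckets;
    }

    public static void main(String[] args) {
        List<String> sample = Arrays.asList(
                "1,13736230513,192.196.100.1,www.atguigu.com,2481,24681,200",
                "2,13846544121,192.196.100.2,,264,0,200",
                "6,84188413,192.168.100.3,www.atguigu.com,4116,1432,200",
                "10,13630577991,192.168.100.7,www.shouhu.com,6960,690,200");
        group(sample).forEach((bucket, rows) ->
                System.out.println(bucket + " -> " + rows.size() + " row(s)"));
    }
}
```

In the real job the grouping is done by the MapReduce framework; the sketch only shows which rows are expected to end up together.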
Writing the program
1. Make a copy of FlowDriver
Its contents are as follows:
package com.zhenghui.partition;
import com.zhenghui.flow.FlowBean;
import com.zhenghui.flow.FlowMapper;
import com.zhenghui.flow.FlowReducer;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import java.io.IOException;
public class MyPartitionDriver {
    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
        //1. Get a Job instance
        Job job = Job.getInstance(new Configuration());
        //2. Tell Hadoop which jar to use by naming a class inside it
        job.setJarByClass(MyPartitionDriver.class);
        //3. Set the Mapper and Reducer
        job.setMapperClass(FlowMapper.class);
        job.setReducerClass(FlowReducer.class);
        //Use 5 reduce tasks, one per partition
        job.setNumReduceTasks(5);
        job.setPartitionerClass(MyPartition.class);
        //4. Set the output key/value types of the Mapper and the Reducer
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(FlowBean.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(FlowBean.class);
        //5. Set the input and output paths
        FileInputFormat.setInputPaths(job, new Path("E:\\file\\phone.txt"));
        FileOutputFormat.setOutputPath(job, new Path("E:\\output"));
        //6. Submit the Job and wait for it to finish
        boolean b = job.waitForCompletion(true);
        System.exit(b ? 0 : 1);
    }
}
Compared with FlowDriver, the only additions are:
//Use 5 reduce tasks, one per partition
job.setNumReduceTasks(5);
job.setPartitionerClass(MyPartition.class);
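The number passed to setNumReduceTasks has to fit the partition numbers the partitioner can return. The following plain-Java sketch summarizes what is generally expected for different combinations; `ReduceTaskCountSketch` and its `Outcome` names are hypothetical and only describe the behavior, they are not Hadoop APIs. It assumes at least one reduce task and the default file output format.

```java
// Hypothetical stand-alone summary; it does not call Hadoop.
class ReduceTaskCountSketch {

    enum Outcome {
        SINGLE_FILE,            // one reduce task: the custom partitioner is bypassed
        ILLEGAL_PARTITION,      // a record mapped to an out-of-range partition fails the job
        ONE_FILE_PER_PARTITION, // the setup used in this article (5 and 5)
        EXTRA_EMPTY_FILES       // surplus reduce tasks receive no records
    }

    // partitionsUsed is the count of distinct partition numbers, assumed to be 0..partitionsUsed-1.
    static Outcome outcome(int numReduceTasks, int partitionsUsed) {
        if (numReduceTasks < 1 || partitionsUsed < 1) {
            throw new IllegalArgumentException("both arguments must be at least 1");
        }
        if (numReduceTasks == 1) {
            return Outcome.SINGLE_FILE;
        }
        if (numReduceTasks < partitionsUsed) {
            return Outcome.ILLEGAL_PARTITION;
        }
        if (numReduceTasks == partitionsUsed) {
            return Outcome.ONE_FILE_PER_PARTITION;
        }
        return Outcome.EXTRA_EMPTY_FILES;
    }

    public static void main(String[] args) {
        for (int n : new int[] {1, 3, 5, 8}) {
            System.out.println(n + " reduce task(s), 5 partitions: " + outcome(n, 5));
        }
    }
}
```

For this example the partitioner returns 0 to 4, so 5 reduce tasks is the natural choice.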
The MyPartition.java file:
package com.zhenghui.partition;
import com.zhenghui.flow.FlowBean;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Partitioner;
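//Custom partitioner
public class MyPartition extends Partitioner<Text, FlowBean> {
    /**
     * Returns the partition number for a record.
     * @param text the map output key (the phone number)
     * @param flowBean the map output value
     * @param numPartitions the total number of partitions, i.e. the number of reduce tasks
     * @return the partition number, from 0 to 4
     */
    @Override
    public int getPartition(Text text, FlowBean flowBean, int numPartitions) {
        String phone = text.toString();//get the phone number
        switch (phone.substring(0, 3)) {
            case "136":
                return 0;
            case "137":
                return 1;
            case "138":
                return 2;
            case "139":
                return 3;
            default:
                return 4;
        }
    }
}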
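One thing to watch in the partitioner: `substring(0, 3)` throws a StringIndexOutOfBoundsException if a key is shorter than three characters. The sample data has no such key, but a defensive variant of the same mapping could look like the sketch below. It is written as a plain static method so it can be tried without Hadoop; `SafePrefixSketch` and `partitionFor` are hypothetical names, and sending short keys to partition 4 is an assumption, not something the original article specifies.

```java
// Hypothetical stand-alone version of the prefix rule, for illustration only.
class SafePrefixSketch {

    // Returns 0..3 for the prefixes 136..139 and 4 for anything else,
    // including keys that are too short to have a three-character prefix.
    static int partitionFor(String phone) {
        String prefix = phone.length() >= 3 ? phone.substring(0, 3) : "";
        switch (prefix) {
            case "136":
                return 0;
            case "137":
                return 1;
            case "138":
                return 2;
            case "139":
                return 3;
            default:
                return 4;
        }
    }

    public static void main(String[] args) {
        for (String phone : new String[] {"13630577991", "13975057813", "84188413", "13"}) {
            System.out.println(phone + " -> partition " + partitionFor(phone));
        }
    }
}
```

Inside MyPartition the same guard would only change the expression passed to the switch.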
Test results:
The job wrote five output files, so the partitioning itself works. Next, check whether the data inside each file is what we asked for.
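The check can also be expressed as a small plain-Java sketch instead of reading the files by eye. It assumes the reducer output files follow the usual `part-r-00000` to `part-r-00004` naming and that each output line starts with the phone number followed by a tab, which is what the default text output format produces; `OutputFileSketch` and its methods are hypothetical names.

```java
// Hypothetical checker for illustration; it works on strings, not on real files.
class OutputFileSketch {

    private static final String[] PREFIXES = {"136", "137", "138", "139"};

    // Same rule as the partitioner: index of the matching prefix, or 4 for the rest.
    static int partitionOf(String phone) {
        for (int i = 0; i < PREFIXES.length; i++) {
            if (phone.startsWith(PREFIXES[i])) {
                return i;
            }
        }
        return PREFIXES.length;
    }

    // Assumed reducer output file name for a partition number.
    static String fileNameFor(int partition) {
        return String.format("part-r-%05d", partition);
    }

    // True if an output line (key, tab, value) is in the file its phone number maps to.
    static boolean belongsIn(String outputLine, String fileName) {
        String phone = outputLine.split("\t", 2)[0];
        return fileNameFor(partitionOf(phone)).equals(fileName);
    }

    public static void main(String[] args) {
        // The text after the tab is a placeholder for whatever FlowBean prints.
        System.out.println(belongsIn("13736230513\tvalue", "part-r-00001"));
        System.out.println(belongsIn("84188413\tvalue", "part-r-00004"));
    }
}
```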
The data matches the requirements exactly, so this experiment is complete.