If a job's output is also very large, the result data needs to be split into multiple files according to some rule. The default partitioner implementation is HashPartitioner; to write a custom partitioner, note the following:
- Extend the Partitioner class and override the getPartition method
- Set the number of reduce tasks equal to the number of custom partitions

Example: route records to different partitions based on the first 3 digits of the phone number.
FlowBean:
```java
import org.apache.hadoop.io.Writable;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

public class FlowBean implements Writable {
    private long upflow;
    private long dflow;
    private long sumflow;

    // A no-arg constructor is required so the framework can instantiate the bean via reflection
    public FlowBean() {}

    public FlowBean(long upflow, long dflow) {
        this.upflow = upflow;
        this.dflow = dflow;
        this.sumflow = upflow + dflow;
    }

    public long getUpflow() { return upflow; }

    public void setUpflow(long upflow) { this.upflow = upflow; }

    public long getDflow() { return dflow; }

    public void setDflow(long dflow) { this.dflow = dflow; }

    public void set(long upflow, long dflow) {
        this.upflow = upflow;
        this.dflow = dflow;
        this.sumflow = upflow + dflow;
    }

    public long getSumflow() { return sumflow; }

    public void setSumflow(long sumflow) { this.sumflow = sumflow; }

    // Serialization: write the fields to be shipped as a byte stream
    @Override
    public void write(DataOutput out) throws IOException {
        out.writeLong(upflow);
        out.writeLong(dflow);
    }

    // Deserialization: restore the fields from the byte stream, in the same order they were written
    @Override
    public void readFields(DataInput in) throws IOException {
        upflow = in.readLong();
        dflow = in.readLong();
    }

    @Override
    public String toString() {
        return upflow + "\t" + dflow + "\t" + sumflow;
    }
}
```
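As a quick sanity check (a hypothetical snippet, not part of the original job), the Writable contract can be exercised locally: write() and readFields() must handle the fields in the same order, and since sumflow is not serialized it has to be recomputed after deserialization.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;

public class FlowBeanRoundTrip {
    public static void main(String[] args) throws IOException {
        FlowBean in = new FlowBean(100L, 200L);

        // Serialize: writes upflow, then dflow
        ByteArrayOutputStream buf = new ByteArrayOutputStream();
        in.write(new DataOutputStream(buf));

        // Deserialize: reads the longs back in the same order
        FlowBean out = new FlowBean();
        out.readFields(new DataInputStream(new ByteArrayInputStream(buf.toByteArray())));

        // sumflow is not part of the byte stream, so recompute it
        out.set(out.getUpflow(), out.getDflow());

        System.out.println(out);  // 100	200	300
    }
}
```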
ProvinceFlowCountMapper:
```java
import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

import java.io.IOException;

public class ProvinceFlowCountMapper extends Mapper<LongWritable, Text, Text, FlowBean> {
    private Text k = new Text();
    private FlowBean bean = new FlowBean();

    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        String line = value.toString();
        String[] fields = StringUtils.split(line, "\t");
        String phone = fields[1];
        long upflow = Long.parseLong(fields[fields.length - 3]);
        long dflow = Long.parseLong(fields[fields.length - 2]);
        k.set(phone);
        bean.set(upflow, dflow);
        // Reuse the Text and FlowBean objects instead of allocating new ones per record
        context.write(k, bean);
    }
}
```
ProvinceFlowCountReducer:
```java
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

import java.io.IOException;

public class ProvinceFlowCountReducer extends Reducer<Text, FlowBean, Text, FlowBean> {
    private FlowBean bean = new FlowBean();

    @Override
    protected void reduce(Text key, Iterable<FlowBean> values, Context context) throws IOException, InterruptedException {
        long upflowsum = 0;
        long dflowsum = 0;
        for (FlowBean b : values) {
            upflowsum += b.getUpflow();
            dflowsum += b.getDflow();
        }
        bean.set(upflowsum, dflowsum);
        context.write(key, bean);
    }
}
```
Custom partitioner class, ProvincePartitioner:
```java
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Partitioner;

import java.util.HashMap;
import java.util.Map;

public class ProvincePartitioner extends Partitioner<Text, FlowBean> {
    private static Map<String, Integer> provinceMap = new HashMap<>();

    // Load the external lookup dictionary once, when the class is initialized
    static {
        provinceMap.put("136", 0);
        provinceMap.put("137", 1);
        provinceMap.put("138", 2);
        provinceMap.put("139", 3);
    }

    @Override
    public int getPartition(Text text, FlowBean flowBean, int numPartitions) {
        String prefix = text.toString().substring(0, 3);
        Integer provinceNum = provinceMap.get(prefix);
        if (provinceNum == null) {
            provinceNum = 4;  // everything else falls into the last partition
        }
        // The return value must be in [0, numReduceTasks)
        return provinceNum;
    }
}
```
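Before submitting the job, the routing logic can be checked in isolation. This is a hypothetical snippet, not part of the original post; the phone numbers are taken from the result data shown further below.

```java
import org.apache.hadoop.io.Text;

public class ProvincePartitionerCheck {
    public static void main(String[] args) {
        ProvincePartitioner partitioner = new ProvincePartitioner();
        String[] phones = {"13602846565", "13719199419", "13926251106", "15013685858"};
        for (String phone : phones) {
            // 5 matches the number of reduce tasks configured in the driver below
            int partition = partitioner.getPartition(new Text(phone), new FlowBean(0, 0), 5);
            System.out.println(phone + " -> partition " + partition);  // 0, 1, 3, 4
        }
    }
}
```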
JobSubmitterClient:
```java
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class JobSubmitterClient {
    public static void main(String[] args) throws Exception {
        Job job = Job.getInstance(new Configuration());
        job.setJarByClass(JobSubmitterClient.class);

        job.setMapperClass(ProvinceFlowCountMapper.class);
        job.setReducerClass(ProvinceFlowCountReducer.class);

        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(FlowBean.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(FlowBean.class);

        // Plug in the custom partitioner, replacing the default HashPartitioner
        job.setPartitionerClass(ProvincePartitioner.class);
        // The number of reduce tasks must match the number of custom partitions
        job.setNumReduceTasks(5);

        FileInputFormat.setInputPaths(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));

        boolean success = job.waitForCompletion(true);
        System.exit(success ? 0 : 1);
    }
}
```
Upload the jar and run it: `hadoop jar hadoop-mapreduce-partition-1.0.jar com.wange.JobSubmitterClient /flow/srcdata /flow/partition`
Result:
```
hadoop fs -ls /flow/partition
[root@hadoop-server-00 ~]# hadoop fs -cat /flow/partition/part-r-00000
13602846565	1938	2910	4848
13660577991	6960	690	7650
[root@hadoop-server-00 ~]# hadoop fs -cat /flow/partition/part-r-00001
13719199419	240	0	240
13726230503	2481	24681	27162
13726238888	2481	24681	27162
13760778710	120	120	240
[root@hadoop-server-00 ~]# hadoop fs -cat /flow/partition/part-r-00002
13826544101	264	0	264
[root@hadoop-server-00 ~]# hadoop fs -cat /flow/partition/part-r-00003
13922314466	3008	3720	6728
13925057413	11058	48243	59301
13926251106	240	0	240
13926435656	132	1512	1644
[root@hadoop-server-00 ~]# hadoop fs -cat /flow/partition/part-r-00004
13480253104	180	180	360
13502468823	7335	110349	117684
13560436666	1116	954	2070
13560439658	2034	5892	7926
15013685858	3659	3538	7197
15920133257	3156	2936	6092
15989002119	1938	180	2118
18211575961	1527	2106	3633
18320173382	9531	2412	11943
84138413	4116	1432	5548
[root@hadoop-server-00 ~]#
```
If the number of reduce tasks is set to 1, the job still runs correctly and all results are aggregated into a single output file. If it is set to a number greater than the number of partitions, the job also runs normally, with the extra reducers producing empty output files. If it is smaller than the number of partitions, the job throws an exception at runtime.
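A minimal sketch of those three cases, assuming the same driver as above (the exact error message may vary by Hadoop version):

```java
// 1 reducer: the job succeeds and everything ends up in a single part-r-00000;
// with only one reduce task the configured partitioner is effectively ignored.
job.setNumReduceTasks(1);

// More reducers than partitions (e.g. 6 for 5 partitions): the job succeeds,
// but the extra reducers receive no data and write empty output files.
// job.setNumReduceTasks(6);

// Fewer reducers than partitions (e.g. 3): getPartition can return 3 or 4,
// which is outside [0, 3), so map tasks fail with an "Illegal partition" error.
// job.setNumReduceTasks(3);
```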