When running a MapReduce job, the final output sometimes needs to be split across multiple files. For example, partitioning by province means all records for one province go into one file; partitioning by gender means all records for one gender go into one file. Since the final output is produced by the Reducer tasks, getting N output files means running N Reducer tasks. The Reducers in turn receive their data from the Mapper tasks, so it is the Mapper side that must divide the data, assigning different records to different Reducer tasks. This dividing step is called Partition, and the class that implements it is called a Partitioner. The example below sums per-phone-number upstream and downstream traffic and partitions the output by the province encoded in the phone number's prefix.
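For contrast: if no partitioner is configured, Hadoop uses its built-in HashPartitioner, which hashes the map output key and takes it modulo the number of reduce tasks. Equal keys always land on the same Reducer, but there is no control over which keys end up together. The sketch below mirrors that default logic (the class name is illustrative, not from the original repository):

package com.thp.bigdata.provinceflow;

import org.apache.hadoop.mapreduce.Partitioner;

// Mirrors the logic of Hadoop's default HashPartitioner: equal keys always
// reach the same Reducer, but the key-to-partition mapping is arbitrary.
public class DefaultStylePartitioner<KEY, VALUE> extends Partitioner<KEY, VALUE> {
    @Override
    public int getPartition(KEY key, VALUE value, int numPartitions) {
        // Mask the sign bit so the result is non-negative, then take the modulo
        return (key.hashCode() & Integer.MAX_VALUE) % numPartitions;
    }
}

To place all records of one province in one file, we instead need a partitioner that routes by a meaningful property of the key: here, the phone-number prefix. The first building block is a custom Writable that carries the traffic fields: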
package com.thp.bigdata.provinceflow;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

import org.apache.hadoop.io.Writable;

public class FlowBean implements Writable {

    private long upFlow;   // upstream traffic
    private long downFlow; // downstream traffic
    private long sumFlow;  // total traffic

    // Deserialization instantiates the bean reflectively through the
    // no-arg constructor, so one must be defined explicitly
    public FlowBean() {}

    public FlowBean(long upFlow, long downFlow) {
        this.upFlow = upFlow;
        this.downFlow = downFlow;
        this.sumFlow = upFlow + downFlow;
    }

    public long getUpFlow() {
        return upFlow;
    }

    public void setUpFlow(long upFlow) {
        this.upFlow = upFlow;
    }

    public long getDownFlow() {
        return downFlow;
    }

    public void setDownFlow(long downFlow) {
        this.downFlow = downFlow;
    }

    public long getSumFlow() {
        return sumFlow;
    }

    public void setSumFlow(long sumFlow) {
        this.sumFlow = sumFlow;
    }

    /**
     * Serialization method
     */
    @Override
    public void write(DataOutput out) throws IOException {
        out.writeLong(upFlow);
        out.writeLong(downFlow);
        out.writeLong(sumFlow);
    }

    /**
     * Deserialization method.
     * Note: fields must be read back in exactly the order they were written.
     */
    @Override
    public void readFields(DataInput in) throws IOException {
        upFlow = in.readLong();
        downFlow = in.readLong();
        sumFlow = in.readLong();
    }

    // TextOutputFormat calls toString() when writing the value to the output file
    @Override
    public String toString() {
        return upFlow + "\t" + downFlow + "\t" + sumFlow;
    }
}
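As a quick sanity check outside of any job, the Writable contract can be exercised directly: serialize a FlowBean into a byte buffer and read it back with readFields. This is a minimal sketch; the class name and sample values are made up for illustration:

package com.thp.bigdata.provinceflow;

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;

public class FlowBeanRoundTrip {
    public static void main(String[] args) throws IOException {
        // write() emits the three longs in declaration order
        FlowBean original = new FlowBean(1024, 4096);
        ByteArrayOutputStream buffer = new ByteArrayOutputStream();
        original.write(new DataOutputStream(buffer));

        // readFields() must consume them in exactly the same order
        FlowBean copy = new FlowBean();
        copy.readFields(new DataInputStream(new ByteArrayInputStream(buffer.toByteArray())));

        System.out.println(copy); // expected: 1024, 4096, 5120 (tab-separated)
    }
}

Next, the partitioner itself: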
package com.thp.bigdata.provinceflow;

import java.util.HashMap;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Partitioner;

public class ProvincePartitioner extends Partitioner<Text, FlowBean> {

    // Maps a phone-number prefix to a partition number; any prefix not
    // listed here falls into a fifth catch-all partition (number 4)
    public static HashMap<String, Integer> provinceDict = new HashMap<String, Integer>();
    static {
        provinceDict.put("136", 0);
        provinceDict.put("137", 1);
        provinceDict.put("138", 2);
        provinceDict.put("139", 3);
    }

    @Override
    public int getPartition(Text key, FlowBean value, int numPartitions) {
        // Route by the first three digits of the phone number
        String prefix = key.toString().substring(0, 3);
        Integer provinceId = provinceDict.get(prefix);
        return provinceId == null ? 4 : provinceId;
    }
}
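The routing can be verified without launching a job by calling getPartition directly; the phone numbers below are made-up samples, and the FlowBean value plays no role in the decision:

package com.thp.bigdata.provinceflow;

import org.apache.hadoop.io.Text;

public class ProvincePartitionerCheck {
    public static void main(String[] args) {
        ProvincePartitioner partitioner = new ProvincePartitioner();
        FlowBean dummy = new FlowBean(0, 0);
        // "137" is in the dictionary -> partition 1
        System.out.println(partitioner.getPartition(new Text("13726230503"), dummy, 5)); // 1
        // "135" is not in the dictionary -> catch-all partition 4
        System.out.println(partitioner.getPartition(new Text("13560439658"), dummy, 5)); // 4
    }
}

Finally, the driver wires the Mapper, Reducer, and Partitioner together: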
package com.thp.bigdata.provinceflow;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class FlowCount {

    static class FlowCountMapper extends Mapper<LongWritable, Text, Text, FlowBean> {
        @Override
        protected void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            // Convert the line to a String
            String line = value.toString();
            // Split into fields
            String[] fields = line.split("\t");
            // Extract the phone number
            String phoneNumber = fields[1];
            // Extract the upstream and downstream traffic
            long upFlow = Long.parseLong(fields[fields.length - 3]);
            long downFlow = Long.parseLong(fields[fields.length - 2]);
            context.write(new Text(phoneNumber), new FlowBean(upFlow, downFlow));
        }
    }

    static class FlowCountReducer extends Reducer<Text, FlowBean, Text, FlowBean> {
        @Override
        protected void reduce(Text key, Iterable<FlowBean> values, Context context)
                throws IOException, InterruptedException {
            long sum_upFlow = 0;
            long sum_downFlow = 0;
            // Iterate over all beans, accumulating upstream and downstream traffic separately
            for (FlowBean bean : values) {
                sum_upFlow += bean.getUpFlow();
                sum_downFlow += bean.getDownFlow();
            }
            FlowBean resultBean = new FlowBean(sum_upFlow, sum_downFlow);
            context.write(key, resultBean);
        }
    }

    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf);
        // Specify the local path of the jar containing this program
        job.setJarByClass(FlowCount.class);
        // Specify the Mapper/Reducer business classes this job uses
        job.setMapperClass(FlowCountMapper.class);
        job.setReducerClass(FlowCountReducer.class);
        // Specify our custom partitioner
        job.setPartitionerClass(ProvincePartitioner.class);
        // And set the number of reduce tasks to match the number of partitions
        job.setNumReduceTasks(5);
        // Specify the kv types of the mapper output
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(FlowBean.class);
        // Specify the kv types of the final output
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(FlowBean.class);
        // Specify the directory of the job's raw input files
        FileInputFormat.setInputPaths(job, new Path(args[0]));
        // Specify the directory for the job's output
        FileOutputFormat.setOutputPath(job, new Path(args[1]));
        // Submit the job configuration and the jar with the job's classes to YARN
        boolean res = job.waitForCompletion(true);
        System.exit(res ? 0 : 1);
    }
}
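A note on how setNumReduceTasks relates to the partitioner: with five reduce tasks, the output directory will contain five files, part-r-00000 through part-r-00004, one per partition (a file is simply empty if its partition received no records). If the reduce-task count is greater than 1 but smaller than the largest partition number returned by getPartition, the job fails at runtime with an "Illegal partition" error; if it is exactly 1, Hadoop bypasses the custom partitioner and writes a single output file. A typical invocation, with a placeholder jar name and HDFS paths:

hadoop jar flowcount.jar com.thp.bigdata.provinceflow.FlowCount /flow/input /flow/output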
Code repository:
https://gitee.com/tanghongping/hadoopMapReduce/tree/master/src/com/thp/bigdata/provinceflow