一、概念
1、分区:
Hadoop默认分区是根据key的hashCode对ReduceTask个数取模得到的,用户无法控制哪个key存储到哪个分区。
想要控制哪个key存储到哪个分区,需要自定义类继承Partitioner<KEY, VALUE>,
泛型KEY, VALUE分别对应Mapper里的输出key,value,因为分区是在map()之后,环形缓冲区溢写时完成的。
提示:如果ReduceTask的数量大于自定义类中重写的getPartition()设置的分区数时,会产生空的输出
文件part-r-00000
如果ReduceTask的数量小于自定义类中重写的getPartition()设置的分区数时,有一部分分区
数据无处安放,就会报错
如果ReduceTask的数量等于1,则不会走自定义的分区方法,系统默认分区就是1,最终只会输出
一个分区文件
分区号必须从0开始,逐一增加
2、全排序:
最终输出结果只有一个文件,且文件内部有序。实现方式是只设置一个ReduceTask。但该方法在处理大型文件时
效率极低,因为一台机器处理所有文件,完全丧失了MapReduce所提供的并行架构。
二、项目举例
1、待处理文本 :data.txt
武汉市 20 男 王五
黄石市 90 男 卢洪河
武汉市 60 女 王喜
黄石市 80 男 赵存福
黄石市 50 男 陈新华
孝感市 30 女 新华源
武汉市 80 男 齐五
孝感市 80 男 齐五
孝感市 23 男 罗修运
武汉市 10 女 赵易
武汉市 25 男 武六七
孝感市 25 男 袁灿裕
武汉市 60 男 武庚
黄石市 60 男 张永军
黄石市 60 男 吴理阳
孝感市 33 男 邓喜潮
孝感市 30 男 廖相俭
武汉市 45 女 赵琪
武汉市 55 女 李真
孝感市 30 男 蒋结万
黄石市 50 男 刘志斌
武汉市 45 男 李志
孝感市 5 男 袁帅
武汉市 70 女 赵雅
孝感市 40 男 张军
黄石市 60 男 吴扬
武汉市 55 女 真贾
孝感市 30 男 蒋一万
2、需求:
分别统计出各市感染人员信息,输出到对应文件中(说明:武汉的人员信息统一输出到一个文件,十堰的人员信息
统一输出到一个文件),
输出结果按照感染人员的年龄做倒叙排列。输出结果举例:
地区 姓名 年龄 性别
武汉 张三 70 女
武汉 李四 50 男
武汉 王五 60 女
武汉 赵六 55 男
3、PersonBean.java
import org.apache.hadoop.io.WritableComparable;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
public class PersonBean implements WritableComparable<PersonBean> {
private String area; // 感染地区
private String name; // 感染姓名
private Integer age; // 感染年龄
private String sex; // 感染性别
public PersonBean() {
}
@Override
public String toString() {
return area + "\t" + name + "\t" + age + "\t" + sex;
}
public String getArea() {
return area;
}
public void setArea(String area) {
this.area = area;
}
public String getName() {
return name;
}
public void setName(String name) {
this.name = name;
}
public Integer getAge() {
return age;
}
public void setAge(Integer age) {
this.age = age;
}
public String getSex() {
return sex;
}
public void setSex(String sex) {
this.sex = sex;
}
// 排序
@Override
public int compareTo(Person2Bean o) {
// 按感染年龄倒序排序(这里主要是区内排序)
return o.getAge().compareTo(this.age);
}
// 序列化
@Override
public void write(DataOutput out) throws IOException {
out.writeUTF(area);
out.writeUTF(name);
out.writeUTF(sex);
out.writeInt(age);
}
// 反序列化
@Override
public void readFields(DataInput in) throws IOException {
area = in.readUTF();
name = in.readUTF();
sex = in.readUTF();
age = in.readInt();
}
}
4、PersonMapper.java
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import java.io.IOException;
public class PersonMapper extends Mapper<LongWritable,Text,PersonBean,NullWritable> {
private PersonBean bean = new PersonBean();
@Override
protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
// 取文本每行内容,并分割
String[] split = value.toString().split("\t");
// 赋值
bean.setArea(split[0]);
bean.setAge(Integer.parseInt(split[1]));
bean.setSex(split[2]);
bean.setName(split[3]);
context.write(bean,NullWritable.get());
}
}
5、PersonPartition.java
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.Partitioner;
public class PersonPartition extends Partitioner<Person2Bean,NullWritable> {
/*
自定义分区,继承Partitioner
泛型对应Mapper端的输出
*/
@Override
public int getPartition(PersonBean personBean, NullWritable nullWritable, int numPartitions) {
// 根据感染地区做三个分区
switch (personBean.getArea()){
case "武汉市":
return 0;
case "黄石市":
return 1;
default:
return 2;
}
}
}
6、PersonReduce.java
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.Reducer;
import java.io.IOException;
public class PersonReduce extends Reducer<PersonBean,NullWritable,PersonBean,NullWritable> {
@Override
protected void reduce(PersonBean key, Iterable<NullWritable> values, Context context) throws IOException, InterruptedException {
// 输出所有
for (NullWritable value : values) {
context.write(key,NullWritable.get());
}
}
}
7、PersonDriver.java
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import java.io.IOException;
public class PersonDriver {
public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
// 获取配置文件和job对象
Configuration configuration = new Configuration();
Job job = Job.getInstance(configuration);
// 设置jar的路径
job.setJarByClass(PersonDriver.class);
// 设置mapper类和reducer类
job.setMapperClass(PersonMapper.class);
job.setReducerClass(PersonReduce.class);
// 设置mapper输出的key和value的数据类型
job.setMapOutputKeyClass(PersonBean.class);
job.setMapOutputValueClass(NullWritable.class);
// 设置reduce输出的key和value的数据类型
job.setOutputKeyClass(PersonBean.class);
job.setOutputValueClass(NullWritable.class);
// 设置自定义分区的类
job.setPartitionerClass(PersonPartition.class);
// 设置ReduceTask的数量
job.setNumReduceTasks(3);
// 设置要处理文件的输入路径
FileInputFormat.setInputPaths(job,new Path(args[0]));
// 设置计算完毕后的数据文件的输出路径
FileOutputFormat.setOutputPath(job,new Path(args[1]));
// 提交计算任务(job)
boolean result = job.waitForCompletion(true);
System.exit(result ? 0:1);
}
}
8、最终输出为三个文件,效果展示