Partition了解
Partition位置
Partition主要作用就是将map的结果发送到相应的reduce。这就对partition有两个要求:
1)负载均衡:尽量将工作均匀地分配给不同的reduce。
2)效率,分配速度一定要快。
Partition类结构
1. Partitioner是partitioner的基类,如果需要定制partitioner也需要继承该类。
2. HashPartitioner是MapReduce的默认partitioner。计算方法是
reducer = (key.hashCode() & Integer.MAX_VALUE) % numReduceTasks,由此得到当前记录的目标reducer(与 Integer.MAX_VALUE 按位与是为了去掉符号位,保证取模结果非负)。
需求:
需求1:根据高考数据计算 一本 二本 专科 未录取的数据,并分别输出(按分数划分:一本 ≥550,二本 450–549,专科 250–449,未录取 <250)
数据:
东城区 东城区第1中学 盖谦 2001-01-14 11480630 540
东城区 东城区第1中学 笪艳 2001-03-06 11720510 480
东城区 东城区第1中学 暨寒 2002-06-26 11391225 443
东城区 东城区第1中学 车世琛 2002-04-22 11883106 458
东城区 东城区第1中学 韶梦英 1998-08-04 11554719 130
东城区 东城区第1中学 加思 2000-12-28 11788401 454
东城区 东城区第1中学 墨琳娅 2001-07-21 11340297 424
东城区 东城区第1中学 雍山伟 1999-02-06 11928178 397
代码:
package com.hnxy.mr.Sort;
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Partitioner;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
import com.hnxy.mr.Sort.SortMr3.SortMrMapper;
/**
 * MapReduce job that splits exam records into four output files by score band.
 *
 * <p>Each input line is expected to hold six tab-separated fields, e.g.
 * {@code 东城区 东城区第1中学 盖谦 2001-01-14 11480630 540}. Field 2 is the student name and
 * field 5 is the score. The mapper emits {@code (name, score)}. {@link MRPartition} routes each
 * record to one of four reduce tasks by score band, so each band ends up in its own output file.
 * No reducer class is set, so the default (identity) reducer writes the pairs through unchanged.
 *
 * @author 耀君
 */
public class PartitionDataMR extends Configured implements Tool {

  /** Parses a line into (student name, score); malformed lines only increment a counter. */
  private static class PartitionDataMRMapper extends Mapper<LongWritable, Text, Text, LongWritable> {
    // Reused across map() calls; context.write serializes immediately, so reuse is safe.
    private final Text outkey = new Text();
    private final LongWritable outval = new LongWritable();

    @Override
    protected void map(LongWritable key, Text value, Mapper<LongWritable, Text, Text, LongWritable>.Context context)
        throws IOException, InterruptedException {
      String[] fields = value.toString().split("\t");
      if (fields.length != 6) {
        // Record invalid data instead of failing the task.
        context.getCounter("line_info", "bad_line").increment(1);
        return;
      }
      long score;
      try {
        // trim() tolerates trailing whitespace such as a '\r' left by CRLF line endings.
        score = Long.parseLong(fields[5].trim());
      } catch (NumberFormatException e) {
        // A non-numeric score (e.g. a header row) is bad data, not a reason to kill the task.
        context.getCounter("line_info", "bad_line").increment(1);
        return;
      }
      outkey.set(fields[2]);
      outval.set(score);
      context.write(outkey, outval);
    }
  }

  /**
   * Routes a record to a reduce task by score.
   *
   * <p>The bands are: {@code >= 550} to partition 0, {@code 450..549} to partition 1,
   * {@code 250..449} to partition 2, and anything lower to partition 3.
   */
  private static class MRPartition extends Partitioner<Text, LongWritable> {
    @Override
    public int getPartition(Text key, LongWritable value, int numPartitions) {
      long score = value.get();
      int pid;
      if (score >= 550) {
        pid = 0;
      } else if (score >= 450) {
        pid = 1;
      } else if (score >= 250) {
        pid = 2;
      } else {
        pid = 3;
      }
      // Clamp so a job configured with fewer than four reducers never gets an
      // out-of-range partition number (Hadoop rejects it as an "Illegal partition").
      return Math.min(pid, numPartitions - 1);
    }
  }

  /**
   * Configures and runs the job.
   *
   * @param args {@code args[0]} is the input path and {@code args[1]} is the output path. The
   *     output path is deleted first if it already exists.
   * @return 0 if the job succeeded, 1 if it failed, 2 if the arguments are missing
   */
  @Override
  public int run(String[] args) throws Exception {
    if (args.length < 2) {
      System.err.println("Usage: PartitionDataMR <input path> <output path>");
      return 2;
    }
    Configuration conf = this.getConf();
    Job job = Job.getInstance(conf, "job1");
    job.setJarByClass(PartitionDataMR.class);
    job.setMapperClass(PartitionDataMRMapper.class);
    job.setOutputKeyClass(Text.class);
    job.setOutputValueClass(LongWritable.class);
    job.setPartitionerClass(MRPartition.class);
    // One reduce task (and therefore one output file) per score band.
    job.setNumReduceTasks(4);

    Path in = new Path(args[0]);
    Path out = new Path(args[1]);
    FileInputFormat.addInputPath(job, in);
    FileOutputFormat.setOutputPath(job, out);

    // Resolve the file system from the output path itself, so the path works even when its
    // scheme differs from the default file system.
    FileSystem fs = out.getFileSystem(conf);
    if (fs.exists(out)) {
      fs.delete(out, true);
    }

    boolean con = job.waitForCompletion(true);
    if (con) {
      System.out.println("execution succeed");
    } else {
      System.out.println("Execution failed");
    }
    // Propagate failure to the process exit code (ToolRunner result goes to System.exit).
    return con ? 0 : 1;
  }

  public static void main(String[] args) throws Exception {
    System.exit(ToolRunner.run(new PartitionDataMR(), args));
  }
}
需求2:根据高考数据计算 一本 二本 专科 未录取的数据,并分别按分数降序排序输出
1.自定义序列化比较类
package com.hnxy.mr.Sort;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import org.apache.hadoop.io.WritableComparable;
/**
 * Hadoop key that holds a student's name and exam score.
 *
 * <p>Keys order by score, highest first; students with the same score compare as equal.
 * {@link #equals} and {@link #hashCode} are consistent with that ordering (score only), so
 * equal-scored keys group together and hash to the same partition.
 */
public class StudentWritable implements WritableComparable<StudentWritable> {
  // Student name and exam score.
  private String sname;
  private Integer score = 0;

  public String getSname() {
    return sname;
  }

  public void setSname(String sname) {
    this.sname = sname;
  }

  public Integer getScore() {
    return score;
  }

  public void setScore(Integer score) {
    this.score = score;
  }

  @Override
  public String toString() {
    return "StudentWritable [sname=" + sname + ", score=" + score + "]";
  }

  /** Public no-arg constructor so the framework can create an instance before readFields(). */
  public StudentWritable() {
  }

  /** Serializes the name (modified UTF-8) followed by the score. sname must not be null. */
  @Override
  public void write(DataOutput out) throws IOException {
    out.writeUTF(sname);
    out.writeInt(score);
  }

  /** Deserializes fields in the same order that {@link #write} wrote them. */
  @Override
  public void readFields(DataInput in) throws IOException {
    this.sname = in.readUTF();
    this.score = in.readInt();
  }

  /**
   * Orders by score in descending order.
   *
   * @return a negative value if this score is higher, 0 if equal, positive if lower
   */
  @Override
  public int compareTo(StudentWritable o) {
    // Arguments are swapped for descending order. Integer.compare works on unboxed values;
    // `==` on two Integer objects compares references and is false for equal scores > 127.
    return Integer.compare(o.getScore(), this.getScore());
  }

  /** Two keys are equal when their scores are equal (consistent with compareTo). */
  @Override
  public boolean equals(Object obj) {
    if (this == obj) {
      return true;
    }
    if (!(obj instanceof StudentWritable)) {
      return false;
    }
    Integer other = ((StudentWritable) obj).score;
    return score == null ? other == null : score.equals(other);
  }

  @Override
  public int hashCode() {
    return score == null ? 0 : score.hashCode();
  }
}
实现类:
package com.hnxy.mr.Sort;
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Partitioner;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
import com.hnxy.mr.Sort.SortMr3.SortMrMapper;
/**
 * MapReduce job that splits exam records into four score bands and sorts each band by score.
 *
 * <p>Each input line is expected to hold six tab-separated fields, e.g.
 * {@code 东城区 东城区第1中学 盖谦 2001-01-14 11480630 540}. Field 2 is the student name and
 * field 5 is the score. The mapper emits {@link StudentWritable} keys. No sort comparator is set,
 * so the shuffle orders keys with {@code StudentWritable.compareTo}. {@link MRPartition} sends
 * each band to its own reduce task, and the default (identity) reducer writes the sorted keys out.
 *
 * @author 耀君
 */
public class DataMR extends Configured implements Tool {

  /** Parses a line into a StudentWritable key; malformed lines only increment a counter. */
  private static class PartitionDataMRMapper extends Mapper<LongWritable, Text, StudentWritable, NullWritable> {
    // Reused across map() calls; context.write serializes immediately, so reuse is safe.
    private final StudentWritable outkey = new StudentWritable();

    @Override
    protected void map(LongWritable key, Text value,
        Mapper<LongWritable, Text, StudentWritable, NullWritable>.Context context)
        throws IOException, InterruptedException {
      String[] fields = value.toString().split("\t");
      if (fields.length != 6) {
        // Record invalid data instead of failing the task.
        context.getCounter("line_info", "bad_line").increment(1);
        return;
      }
      int score;
      try {
        // trim() tolerates trailing whitespace such as a '\r' left by CRLF line endings.
        score = Integer.parseInt(fields[5].trim());
      } catch (NumberFormatException e) {
        // A non-numeric score (e.g. a header row) is bad data, not a reason to kill the task.
        context.getCounter("line_info", "bad_line").increment(1);
        return;
      }
      outkey.setSname(fields[2]);
      outkey.setScore(score);
      context.write(outkey, NullWritable.get());
    }
  }

  /**
   * Routes a record to a reduce task by score.
   *
   * <p>The bands are: {@code >= 550} to partition 0, {@code 450..549} to partition 1,
   * {@code 250..449} to partition 2, and anything lower to partition 3.
   */
  private static class MRPartition extends Partitioner<StudentWritable, NullWritable> {
    @Override
    public int getPartition(StudentWritable key, NullWritable value, int numPartitions) {
      int score = key.getScore();
      int pid;
      if (score >= 550) {
        pid = 0;
      } else if (score >= 450) {
        pid = 1;
      } else if (score >= 250) {
        pid = 2;
      } else {
        pid = 3;
      }
      // Clamp so a job configured with fewer than four reducers never gets an
      // out-of-range partition number (Hadoop rejects it as an "Illegal partition").
      return Math.min(pid, numPartitions - 1);
    }
  }

  /**
   * Configures and runs the job.
   *
   * @param args {@code args[0]} is the input path and {@code args[1]} is the output path. The
   *     output path is deleted first if it already exists.
   * @return 0 if the job succeeded, 1 if it failed, 2 if the arguments are missing
   */
  @Override
  public int run(String[] args) throws Exception {
    if (args.length < 2) {
      System.err.println("Usage: DataMR <input path> <output path>");
      return 2;
    }
    Configuration conf = this.getConf();
    Job job = Job.getInstance(conf, "job1");
    job.setJarByClass(DataMR.class);
    job.setMapperClass(PartitionDataMRMapper.class);
    job.setOutputKeyClass(StudentWritable.class);
    job.setOutputValueClass(NullWritable.class);
    job.setPartitionerClass(MRPartition.class);
    // One reduce task (and therefore one output file) per score band.
    job.setNumReduceTasks(4);

    Path in = new Path(args[0]);
    Path out = new Path(args[1]);
    FileInputFormat.addInputPath(job, in);
    FileOutputFormat.setOutputPath(job, out);

    // Resolve the file system from the output path itself, so the path works even when its
    // scheme differs from the default file system.
    FileSystem fs = out.getFileSystem(conf);
    if (fs.exists(out)) {
      fs.delete(out, true);
    }

    boolean con = job.waitForCompletion(true);
    if (con) {
      System.out.println("execution succeed");
    } else {
      System.out.println("Execution failed");
    }
    // Propagate failure to the process exit code (ToolRunner result goes to System.exit).
    return con ? 0 : 1;
  }

  public static void main(String[] args) throws Exception {
    System.exit(ToolRunner.run(new DataMR(), args));
  }
}
结果:
一本:
二本:
专科:
落榜: