// Map class: implements the map function.
/**
 * Mapper that turns each input line of the form {@code <studentNo> <score>} into a
 * composite {@link Student} key (number + score), with the score as the value.
 *
 * <p>Blank lines are skipped. Lines without a second column, or whose score is not an
 * integer, are skipped and counted under the {@code SecondSort/MALFORMED_LINES} counter
 * instead of failing the whole task. Columns after the second are ignored.
 *
 * <p>NOTE(review): if this class is nested inside the driver class, it presumably needs to
 * be declared {@code static} so Hadoop can instantiate it reflectively — confirm.
 */
public class MyMapper extends Mapper<Object, Text, Student, IntWritable>{
    // Output objects are reused across map() calls to avoid per-record allocation.
    private final IntWritable outputValue = new IntWritable();
    private final Student stu = new Student();

    @Override
    protected void map(Object key, Text value, Context context)
            throws IOException, InterruptedException {
        String line = value.toString().trim();
        if (line.isEmpty()) {
            return; // nothing to emit for blank lines
        }
        // Split on any run of whitespace so tabs / repeated spaces are tolerated.
        String[] arr = line.split("\\s+");
        if (arr.length < 2) {
            context.getCounter("SecondSort", "MALFORMED_LINES").increment(1);
            return;
        }
        int score;
        try {
            score = Integer.parseInt(arr[1]);
        } catch (NumberFormatException e) {
            context.getCounter("SecondSort", "MALFORMED_LINES").increment(1);
            return;
        }
        stu.setNo(arr[0]);
        stu.setScore(score);
        outputValue.set(score);
        context.write(stu, outputValue);
    }
}
// Custom partitioner: guarantees that records with the same key (student number) are sent to the same reducer.
/**
 * Partitions composite {@link Student} keys by student number only. All records of one
 * student therefore reach the same reducer, whatever their score.
 *
 * <p>NOTE(review): if this class is nested inside the driver class, it presumably needs to
 * be declared {@code static} so Hadoop can instantiate it reflectively — confirm.
 */
public class MyPartioner extends Partitioner<Student, IntWritable>{
    @Override
    public int getPartition(Student key, IntWritable value, int numPartitions) {
        // Clear the sign bit so the modulo below can never produce a negative partition.
        int nonNegativeHash = key.getNo().hashCode() & Integer.MAX_VALUE;
        int partition = nonNegativeHash % numPartitions;
        return partition;
    }
}
// Grouping comparator: once the shuffle-phase sort has fully ordered the records, decides how they are grouped into reduce() calls.
/**
 * Grouping comparator for the secondary sort. Two composite keys fall into the same reduce
 * group exactly when their student numbers are equal; the score is ignored.
 *
 * <p>Meant to be registered as the job's grouping comparator.
 * NOTE(review): if this class is nested inside the driver class, it presumably needs to be
 * declared {@code static} so Hadoop can instantiate it reflectively — confirm.
 */
public class shuffleSort extends WritableComparator{
    public shuffleSort() {
        // true = let the base class create Student instances so raw key bytes can be deserialized.
        super(Student.class, true);
    }

    @Override
    @SuppressWarnings("rawtypes")
    public int compare(WritableComparable a, WritableComparable b) {
        // Compare only the natural key (student number) of the composite key.
        String leftNo = ((Student) a).getNo();
        String rightNo = ((Student) b).getNo();
        return leftNo.compareTo(rightNo);
    }
}
// Reduce class: implements the reduce function.
/**
 * Emits one {@code (studentNo, score)} pair per incoming value, in the order in which the
 * framework supplies the values for the group.
 *
 * <p>NOTE(review): if this class is nested inside the driver class, it presumably needs to
 * be declared {@code static} so Hadoop can instantiate it reflectively — confirm.
 */
public class MyReducer extends Reducer<Student, IntWritable, Text, IntWritable>{
    // Reused output key to avoid allocating a Text per record.
    private Text studentNo = new Text();

    @Override
    protected void reduce(Student key, Iterable<IntWritable> values, Context context)
            throws IOException, InterruptedException {
        for (IntWritable score : values) {
            // Read the key on every iteration: the framework may refill the key object
            // as the value iterator advances.
            String currentNo = key.getNo();
            studentNo.set(currentNo);
            context.write(studentNo, score);
        }
    }
}
// Driver method that launches the MapReduce job.
/**
 * Configures and submits the secondary-sort job.
 *
 * <p>Generic Hadoop options (for example {@code -D key=value}) are consumed by
 * {@link GenericOptionsParser}. Exactly two arguments must remain: the input path and the
 * output path.
 *
 * <p>Exit codes: 0 when the job succeeds, 1 when it fails, 2 on wrong usage.
 */
public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    GenericOptionsParser optionsParser = new GenericOptionsParser(conf,args);
    String[] remainArgs = optionsParser.getRemainingArgs();
    if(remainArgs.length != 2){
        System.err.println("Usage: yarn jar jar_path main_class_path -D 参数列表 <in> <out>");
        System.exit(2);
    }
    Job job = Job.getInstance(conf,"二次排序标准版");
    job.setJarByClass(SecondSort.class);
    job.setMapperClass(MyMapper.class);
    // Send all records of one student number to the same reducer.
    job.setPartitionerClass(MyPartioner.class);
    // Group reducer input by student number only. Within a group, values arrive in the
    // order defined by Student.compareTo (the secondary sort).
    job.setGroupingComparatorClass(shuffleSort.class);
    job.setReducerClass(MyReducer.class);
    // Deliberately no combiner. A combiner must emit the map-output types (Student, IntWritable),
    // but MyReducer emits (Text, IntWritable), so using it as one fails at runtime.
    // Set the number of reduce tasks.
    job.setNumReduceTasks(2);
    job.setOutputKeyClass(Text.class);
    job.setOutputValueClass(IntWritable.class);
    job.setMapOutputKeyClass(Student.class);
    job.setMapOutputValueClass(IntWritable.class);
    FileInputFormat.addInputPath(job, new Path(remainArgs[0]));
    FileOutputFormat.setOutputPath(job, new Path(remainArgs[1]));
    System.exit(job.waitForCompletion(true)?0:1);
}
// Entity class: the Student composite key
/**
 * Composite map-output key for the secondary sort. It combines the student number (the
 * natural key) with the score (the secondary key).
 *
 * <p>Ordering: ascending by student number, then descending by score. {@link #equals} and
 * {@link #hashCode} agree with this ordering: they compare both fields.
 */
public class Student implements WritableComparable<Student>{
    private String no;
    private int score;

    /** Empty constructor, used when instances are created reflectively before {@link #readFields}. */
    public Student(){
    }

    public Student(String no, int score) {
        this.no = no;
        this.score = score;
    }

    public String getNo() {
        return no;
    }

    public void setNo(String no) {
        this.no = no;
    }

    public int getScore() {
        return score;
    }

    public void setScore(int score) {
        this.score = score;
    }

    /**
     * Serializes the fields in a fixed order: number (modified UTF-8), then score (int).
     * {@link #readFields} must read them back in the same order.
     */
    @Override
    public void write(DataOutput out) throws IOException {
        // writeUTF (unlike write) accepts a String; writeInt writes the primitive int.
        out.writeUTF(no);
        out.writeInt(score);
    }

    /** Deserializes the fields in the same order {@link #write} wrote them. */
    @Override
    public void readFields(DataInput in) throws IOException {
        no = in.readUTF();
        score = in.readInt();
    }

    /**
     * Orders by student number ascending; for equal numbers, by score descending.
     */
    @Override
    public int compareTo(Student o) {
        int byNo = this.no.compareTo(o.no);
        if (byNo != 0) {
            // Different students: the number decides. (Previously this returned 0, which
            // made every pair of different students compare as equal.)
            return byNo;
        }
        // Operands reversed on purpose: a higher score sorts first.
        return Integer.compare(o.score, this.score);
    }

    /** Two keys are equal when both the number and the score match (consistent with compareTo). */
    @Override
    public boolean equals(Object obj) {
        if (this == obj) {
            return true;
        }
        if (!(obj instanceof Student)) {
            return false;
        }
        Student other = (Student) obj;
        return score == other.score && java.util.Objects.equals(no, other.no);
    }

    @Override
    public int hashCode() {
        return java.util.Objects.hash(no, score);
    }
}