Dug through some old code and found this secondary sort I wrote back in 2015. Haha, good memories.
First, implement Hadoop's built-in WritableComparable interface so the composite key can be serialized and sorted:
package com.lei.sort;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

import org.apache.hadoop.io.WritableComparable;

public class StrWritable implements WritableComparable<StrWritable> {

    private String strName;
    private String strValue;

    public void setstrName(String strName) {
        this.strName = strName;
    }

    public void setstrValue(String strValue) {
        this.strValue = strValue;
    }

    public String getstrName() {
        return strName;
    }

    public String getstrValue() {
        return strValue;
    }

    @Override
    public void write(DataOutput out) throws IOException {
        out.writeUTF(strName);
        out.writeUTF(strValue);
    }

    @Override
    public void readFields(DataInput in) throws IOException {
        strName = in.readUTF();
        strValue = in.readUTF();
    }

    @Override
    public int compareTo(StrWritable o) {
        // Only strName is compared here, in descending order. If the Job does not
        // set a sort comparator class, this method is the default sort; it can
        // also be overridden via job.setSortComparatorClass().
        return o.getstrName().compareTo(this.strName);
    }
}
The driver (main) class:
package com.lei.sort;

import java.io.IOException;
import java.net.URI;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.KeyValueTextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class JobStr {

    /**
     * @param args input path and output path
     */
    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
        if (args.length != 2) {
            System.err.println("Usage: JobStr <input path> <output path>");
            System.exit(2);
        }
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf, "JobStr Dino");
        job.setJarByClass(JobStr.class);
        job.setMapperClass(MapStr.class);
        job.setInputFormatClass(KeyValueTextInputFormat.class);
        job.setReducerClass(ReduceStr.class);
        job.setMapOutputKeyClass(StrWritable.class);
        job.setMapOutputValueClass(IntWritable.class);
        job.setSortComparatorClass(SortComparator.class);
        job.setGroupingComparatorClass(StrNameComparator.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);

        // Delete the output path if it already exists, so the job can be rerun.
        FileSystem fs = FileSystem.get(URI.create(args[1]), conf, "root");
        Path path = new Path(args[1]);
        if (fs.exists(path)) {
            try {
                fs.delete(path, true);
                System.out.println(path.getName() + " deleted successfully");
            } catch (Exception e) {
                System.err.println(e.getMessage());
            }
        }

        FileInputFormat.addInputPath(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}
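One thing this driver leaves out: it never sets a partitioner. With a single reducer that doesn't matter, but with several reducers the default HashPartitioner hashes the whole StrWritable key (which, as written, falls back to Object.hashCode()), so records sharing the same strName could land on different reducers and the grouping comparator would never see them together. A minimal sketch of the usual fix, partitioning on the natural key only — StrNamePartitioner is a name I made up, it is not part of the original code:

package com.lei.sort;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.mapreduce.Partitioner;

// Hypothetical partitioner: routes records by strName only, so all values for
// one natural key reach the same reducer regardless of strValue.
public class StrNamePartitioner extends Partitioner<StrWritable, IntWritable> {
    @Override
    public int getPartition(StrWritable key, IntWritable value, int numPartitions) {
        // Masking with Integer.MAX_VALUE keeps the result non-negative.
        return (key.getstrName().hashCode() & Integer.MAX_VALUE) % numPartitions;
    }
}

It would then be registered in the driver with job.setPartitionerClass(StrNamePartitioner.class).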
Mapper:
package com.lei.sort;

import java.io.IOException;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class MapStr extends Mapper<Text, Text, StrWritable, IntWritable> {

    private StrWritable sw = new StrWritable();
    private IntWritable intWritable = new IntWritable(0);

    @Override
    protected void map(Text key, Text value, Context context)
            throws IOException, InterruptedException {
        // The input format is KeyValueTextInputFormat, so the first column of
        // each line is automatically set as the key and the rest as the value.
        sw.setstrName(key.toString());
        sw.setstrValue(value.toString());
        intWritable.set(Integer.parseInt(value.toString()));
        context.write(sw, intWritable);
    }
}
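For reference, KeyValueTextInputFormat splits each line at the first separator, a tab by default, and the separator is configurable in the driver. A small sketch of what the mapper sees, assuming tab-separated input (the sample line is made up for illustration):

// Optional: set the separator explicitly (the default is "\t").
conf.set("mapreduce.input.keyvaluelinerecordreader.key.value.separator", "\t");

// Input line:  "apple\t3"
//   -> map key   = "apple"  (becomes sw.strName)
//   -> map value = "3"      (becomes sw.strValue and the IntWritable)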
Reducer:
package com.lei.sort;

import java.io.IOException;
import java.util.Iterator;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

public class ReduceStr extends Reducer<StrWritable, IntWritable, Text, Text> {

    @Override
    protected void reduce(StrWritable sw, Iterable<IntWritable> iterable, Context context)
            throws IOException, InterruptedException {
        // Because of the grouping comparator, sw is the first key of the group,
        // and the iterable holds the values of every key that groups with it,
        // already in sort order.
        Iterator<IntWritable> it = iterable.iterator();
        StringBuilder sb = new StringBuilder();
        while (it.hasNext()) {
            sb.append(it.next().get()).append(',');
        }
        if (sb.length() > 0) {
            sb.deleteCharAt(sb.length() - 1);
        }
        System.out.println(sw.getstrName() + " ======= ========= " + sb);
        context.write(new Text(sw.getstrValue()), new Text(sb.toString()));
    }
}
Within each partition, keys are then sorted by the comparator class set via job.setSortComparatorClass() (if no comparator class is set there, the key's own compareTo method is used instead). As you can see, this is where the secondary sort happens:
package com.lei.sort;

import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.WritableComparator;

public class SortComparator extends WritableComparator {

    public SortComparator() {
        // true: have WritableComparator create StrWritable instances, so
        // compare() receives deserialized objects rather than raw bytes.
        super(StrWritable.class, true);
    }

    @Override
    @SuppressWarnings("rawtypes")
    public int compare(WritableComparable a, WritableComparable b) {
        StrWritable s1 = (StrWritable) a;
        StrWritable s2 = (StrWritable) b;
        // Primary order: strName; on a tie, fall back to strValue.
        int i = s1.getstrName().compareTo(s2.getstrName());
        if (i != 0) {
            return i;
        }
        return s1.getstrValue().compareTo(s2.getstrValue());
    }
}
The grouping (secondary sort) stage:
Next, the framework builds the value iterator for each key. This is where grouping comes in, via the grouping comparator class set with job.setGroupingComparatorClass(). Any two keys this comparator considers equal belong to the same group: their values go into one value iterator, and the key exposed alongside that iterator is the first key of the group.
package com.lei.sort;

import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.WritableComparator;

/*
 * Whether the result is ascending or descending depends on the sign of the
 * return value: returning compareTo's natural result gives ascending order,
 * negating it gives descending order.
 */
public class StrNameComparator extends WritableComparator {

    public StrNameComparator() {
        super(StrWritable.class, true);
    }

    @Override
    @SuppressWarnings("rawtypes")
    public int compare(WritableComparable a, WritableComparable b) {
        StrWritable s1 = (StrWritable) a;
        StrWritable s2 = (StrWritable) b;
        // Group on strName only: keys with the same strName share one reduce call.
        System.out.println("Map key " + s1.getstrName() + " ######### " + s2.getstrName());
        return s1.getstrName().compareTo(s2.getstrName());
    }
}
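To make the effect concrete, here is a walk-through with made-up sample data, assuming tab-separated input and a single reducer:

Input file:
    b	2
    a	3
    a	1
    b	5

Map output keys are (strName, strValue) pairs. SortComparator orders them by
strName, then strValue, and StrNameComparator groups them by strName alone,
so each reduce call should see the values of one natural key in sorted order:

    group "a": values 1,3   -> reducer writes "1	1,3"
    group "b": values 2,5   -> reducer writes "2	2,5"

(The output key is the first key's strValue, since the grouping comparator
exposes the first key of each group to the reducer.)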