一:条件准备
准备sort.txt文本
a 1
a 9
b 3
a 7
b 8
b 10
a 5
a 9
排序后输出的文本:
a 1
a 5
a 7
a 9
a 9
b 3
b 8
b 10
二:排序接口WritableComparable
思路:将文本内容转为一个sortBean,将此bean作为k2,使用NullWritable作为v2
sortBean实现WritableComparable接口,重写compareTo方法,指定排序
import org.apache.hadoop.io.WritableComparable;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
/**
* Writable:序列化接口
* WritableComparable:即序列化也排序
*/
public class SortBean implements WritableComparable<SortBean> {
    // Composite key: "first" is the first column (a letter such as "a"/"b"),
    // "second" is the second column (an integer).
    private String first;
    private int second;

    /**
     * Orders beans primarily by {@code first} (natural String order); when the
     * first fields are equal, falls back to {@code second} in ascending
     * numeric order.
     *
     * @param o the bean to compare against
     * @return a negative number, zero, or a positive number per the
     *         {@link Comparable} contract (ascending sort)
     */
    @Override
    public int compareTo(SortBean o) {
        int cmp = this.first.compareTo(o.first);
        if (cmp != 0) {
            return cmp;
        }
        // Integer.compare avoids the needless boxing of
        // Integer.valueOf(x).compareTo(Integer.valueOf(y)).
        return Integer.compare(this.second, o.second);
    }

    /**
     * Hadoop serialization: field order here must match {@link #readFields}.
     */
    @Override
    public void write(DataOutput out) throws IOException {
        out.writeUTF(first);
        out.writeInt(second);
    }

    /**
     * Hadoop deserialization: reads fields in the same order as {@link #write}.
     */
    @Override
    public void readFields(DataInput in) throws IOException {
        this.first = in.readUTF();
        this.second = in.readInt();
    }

    public String getFirst() {
        return first;
    }

    public void setFirst(String first) {
        this.first = first;
    }

    public int getSecond() {
        return second;
    }

    public void setSecond(int second) {
        this.second = second;
    }

    /**
     * Tab-separated output form, e.g. {@code a\t1} — this is what the job
     * writes for each key when the bean is used as the output key.
     */
    @Override
    public String toString() {
        return first + "\t" + second;
    }
}
三:map阶段,同时实现计数器
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Counter;
import org.apache.hadoop.mapreduce.Mapper;
import java.io.IOException;
/**
* 排序
* 自定义Map
*/
/**
 * Sorting mapper: parses each tab-separated input line into a {@link SortBean}
 * used as the map output key (NullWritable as the value), so the shuffle
 * phase sorts records by the bean's compareTo order.
 */
public class SortMapper extends Mapper<LongWritable, Text, SortBean, NullWritable> {

    /**
     * Converts one input line into a (SortBean, NullWritable) pair and counts
     * every record seen via a dynamic counter.
     *
     * @param key     byte offset of the line in the input split
     * @param value   one tab-separated line, e.g. {@code a\t1}
     * @param context Hadoop task context used for counters and output
     * @throws IOException          on write failure
     * @throws InterruptedException if the task is interrupted
     */
    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        // Dynamic (string-named) counter: group MR_INPUT_COUNT, name
        // MAP_TOTAL_RESULT. The total appears in the job's console output
        // as MAP_TOTAL_RESULT=xxx.
        Counter mr_input_count = context.getCounter("MR_INPUT_COUNT", "MAP_TOTAL_RESULT");
        mr_input_count.increment(1L);

        String[] split = value.toString().split("\t");
        SortBean sortBean = new SortBean();
        sortBean.setFirst(split[0]);
        // parseInt returns the primitive directly; Integer.valueOf would box
        // and then immediately unbox into setSecond(int).
        sortBean.setSecond(Integer.parseInt(split[1]));
        context.write(sortBean, NullWritable.get());
    }
}
四:reduce阶段,枚举方式实现计数器
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.Counter;
import org.apache.hadoop.mapreduce.Reducer;
import java.io.IOException;
/**
 * 自定义的Reduce,同时也可以作为一个规约类,
 * 因为它们都是实现了Reducer类,规约类可以将
 * map端输出的相同key的数据提前合并,减少网络传输量。
 */
/**
 * Sorting reducer: emits each key once per grouped value, preserving
 * duplicates (e.g. two "a 9" records), and tracks input/output record
 * counts via enum-based counters.
 */
public class SortReduce extends Reducer<SortBean, NullWritable, SortBean, NullWritable> {

    // NOTE(review): by convention this enum should be UpperCamelCase, but its
    // name is part of the counter group shown in job output, so it is kept
    // as-is for compatibility with existing drivers/logs.
    public static enum counter {
        REDUCE_INPUT_RECORD,
        REDUCE_OUTPUT_RECORD
    }

    /**
     * Writes the key once for every value in the group, so duplicate input
     * records survive the sort. The console output reports
     * REDUCE_INPUT_RECORD=xxx and REDUCE_OUTPUT_RECORD=xxx.
     *
     * @param key     the sorted composite key
     * @param values  one NullWritable per original input record with this key
     * @param context Hadoop task context used for counters and output
     * @throws IOException          on write failure
     * @throws InterruptedException if the task is interrupted
     */
    @Override
    protected void reduce(SortBean key, Iterable<NullWritable> values, Context context) throws IOException, InterruptedException {
        // Enum-based counter: one increment per distinct reduce key group.
        Counter inputCounter = context.getCounter(SortReduce.counter.REDUCE_INPUT_RECORD);
        inputCounter.increment(1);

        // Hoisted out of the loop: getCounter always returns the same Counter
        // object, so looking it up once per group avoids a redundant lookup
        // on every value.
        Counter outputCounter = context.getCounter(SortReduce.counter.REDUCE_OUTPUT_RECORD);
        for (NullWritable value : values) {
            outputCounter.increment(1);
            context.write(key, NullWritable.get());
        }
    }
}
最终结果: