自定义比较器 GroupingComparator
其实就是自己定义 MapReduce 的分组比较规则:Map 任务完成后进入 shuffle 阶段,框架会按照默认的比较规则,把 Map 任务输出的 key-value
对按它们的 key 值分组,key 相同的归为一组。这也是为什么 Reduce 任务中 reduce 方法的参数是一个 key
和一个 value 的迭代器——
就是根据默认的比较器,把 key 值相等的 key-value 分到了同一组。
而有时候我们不想用默认的"key 值是否相等"规则来分组,这时就可以自定义一个分组比较器。
案例如下:
有如下的订单数据:
1 Pdt_01 222.8
1 Pdt_02 52.8
2 Pdt_03 522.8
2 Pdt_04 122.4
2 Pdt_05 722.4
3 Pdt_06 232.8
3 Pdt_02 33.8
现在要求输出每一个订单中的最贵的商品
代码如下:
package com.jee.groupingcomparator;
import org.apache.hadoop.io.WritableComparable;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
//订单类
/**
 * Order record used as the MapReduce key.
 *
 * <p>Implements {@link WritableComparable} so Hadoop can serialize it between the
 * map and reduce phases and sort it during shuffle. The sort order is:
 * ascending by {@code orderId}, then descending by {@code price} (secondary sort),
 * so within each order group the most expensive item arrives at the reducer first.
 */
public class Order implements WritableComparable<Order> {

    private String orderId;
    private String orderName;
    private double price;

    public Order(String orderId, String orderName, double price) {
        this.orderId = orderId;
        this.orderName = orderName;
        this.price = price;
    }

    /** No-arg constructor required by Hadoop's Writable deserialization. */
    public Order() {
    }

    public String getOrderId() {
        return orderId;
    }

    public void setOrderId(String orderId) {
        this.orderId = orderId;
    }

    public String getOrderName() {
        return orderName;
    }

    public void setOrderName(String orderName) {
        this.orderName = orderName;
    }

    public double getPrice() {
        return price;
    }

    public void setPrice(double price) {
        this.price = price;
    }

    @Override
    public String toString() {
        return "Order{" +
        "orderId='" + orderId + '\'' +
        ", orderName='" + orderName + '\'' +
        ", price=" + price +
        '}';
    }

    /**
     * Secondary sort: ascending by order id, then descending by price.
     *
     * <p>Bug fix: the original returned {@code (int) (o.getPrice() - this.price)},
     * which truncates the double difference toward zero — any price gap smaller
     * than 1.0 compared as equal, breaking the descending-price order within a
     * group. {@link Double#compare} preserves the sign of every non-zero gap.
     *
     * @param o the order to compare against
     * @return negative if this sorts before {@code o}, positive if after, 0 if equal
     */
    @Override
    public int compareTo(Order o) {
        int r = this.orderId.compareTo(o.getOrderId());
        if (r == 0) {
            // Descending price: larger price sorts first within the same order id.
            return Double.compare(o.getPrice(), this.price);
        }
        return r;
    }

    /** Serializes fields in a fixed order; must mirror {@link #readFields}. */
    @Override
    public void write(DataOutput dataOutput) throws IOException {
        dataOutput.writeUTF(orderId);
        dataOutput.writeUTF(orderName);
        dataOutput.writeDouble(price);
    }

    /** Deserializes fields in the exact order written by {@link #write}. */
    @Override
    public void readFields(DataInput dataInput) throws IOException {
        orderId = dataInput.readUTF();
        orderName = dataInput.readUTF();
        price = dataInput.readDouble();
    }
}
package com.jee.groupingcomparator;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import java.io.IOException;
//Mapper类 输出的key是Order类 value是空值,用NullWritable代替
/**
 * Mapper: parses one tab-separated line ({@code orderId \t productName \t price})
 * into an {@link Order} key. The value is unused, so {@link NullWritable} stands in.
 */
public class OrderMapper extends Mapper<LongWritable, Text, Order, NullWritable> {

    // Reused across map() calls — Hadoop serializes the key on write, so the
    // standard MR object-reuse pattern is safe here.
    private final Order outKey = new Order();

    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        String[] fields = value.toString().split("\t");
        outKey.setOrderId(fields[0]);
        outKey.setOrderName(fields[1]);
        outKey.setPrice(Double.parseDouble(fields[2]));
        context.write(outKey, NullWritable.get());
    }
}
package com.jee.groupingcomparator;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.WritableComparator;
//默认的MapReduce比较器是通过比较key的值是否相等决定的 我们也可以自己定义一个比较器 自己定义规则
/**
 * Grouping comparator: two keys belong to the same reduce group when their
 * order ids match, regardless of product name or price. Combined with the
 * secondary sort in {@code Order.compareTo}, the first key of each group is
 * the most expensive item of that order.
 */
public class OrderComparator extends WritableComparator {

    protected OrderComparator(){
        // Pass the key class with createInstances=true so the parent can
        // instantiate Order objects for deserialized comparison; omitting
        // this causes a NullPointerException at runtime.
        super(Order.class,true);
    }

    @Override
    public int compare(WritableComparable a, WritableComparable b) {
        Order left = (Order) a;
        Order right = (Order) b;
        // Group purely by order id — equal ids mean "same group".
        return left.getOrderId().compareTo(right.getOrderId());
    }
}
package com.jee.groupingcomparator;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.Reducer;
import java.io.IOException;
//Reducer类
/**
 * Reducer: emits only the group's key. Because the grouping comparator groups
 * by order id and the secondary sort puts the highest price first, the key seen
 * here is the most expensive item of each order.
 */
public class OrderReducer extends Reducer<Order, NullWritable, Order, NullWritable> {

    @Override
    protected void reduce(Order key, Iterable<NullWritable> values, Context context) throws IOException, InterruptedException {
        NullWritable nothing = NullWritable.get();
        context.write(key, nothing);
    }
}
package com.jee.groupingcomparator;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import java.io.IOException;
/**
 * Driver: wires the mapper, reducer, and custom grouping comparator into a job
 * and runs it. Input/output paths may be passed as {@code args[0]}/{@code args[1]};
 * when absent, the original hard-coded defaults are used, so existing invocations
 * keep working.
 */
public class OrderDriver {

    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
        // Generalized: command-line paths take precedence over the old hard-coded ones.
        String inputPath = args.length > 0 ? args[0] : "d:/Hadoop/input";
        String outputPath = args.length > 1 ? args[1] : "d:/Hadoop/output";

        Job job = Job.getInstance(new Configuration());
        job.setJarByClass(OrderDriver.class);

        job.setMapperClass(OrderMapper.class);
        job.setReducerClass(OrderReducer.class);

        // Map output and final output share the same types: Order key, no value.
        job.setMapOutputKeyClass(Order.class);
        job.setMapOutputValueClass(NullWritable.class);
        job.setOutputKeyClass(Order.class);
        job.setOutputValueClass(NullWritable.class);

        FileInputFormat.setInputPaths(job, new Path(inputPath));
        FileOutputFormat.setOutputPath(job, new Path(outputPath));

        // Install the custom grouping comparator so reduce groups form by order id
        // instead of full-key equality.
        job.setGroupingComparatorClass(OrderComparator.class);

        boolean b = job.waitForCompletion(true);
        System.exit(b ? 0 : 1);
    }
}