This article uses an order dataset as a worked example to show how to group records in MapReduce and extract the top two records from each group.
1: Case Requirements
We have the following order data:
Order ID      | Product ID | Amount
Order_0000001 | Pdt_01     | 222.8
Order_0000001 | Pdt_05     | 25.8
Order_0000002 | Pdt_03     | 522.8
Order_0000002 | Pdt_04     | 122.4
Order_0000002 | Pdt_05     | 722.4
Order_0000003 | Pdt_01     | 222.8
The task: for each order, find the two transactions with the largest amounts.
2: Implementation Analysis
Each order record is wrapped in an OrderBean that implements WritableComparable, which lets the framework sort records sharing an order id by price. The mapper emits the OrderBean as k2 and the price as v2. A custom Partitioner (GroupPartition) places all records with the same order id in the same partition, so a single reduceTask handles each order. After the reduceTask copies the map output to local disk, the shuffle sorts the records using OrderBean's compareTo: ascending by order id, then descending by price within an order. A custom grouping class extending WritableComparator then compares only the order id in its compare method, so consecutive records with the same order id form one group. The framework hands each group to a single reduce() call, with one OrderBean as the key and an iterable of that order's prices as the values; thanks to the compareTo sort, the iterable yields the prices in descending order. The reducer therefore only needs to take the first two values of each group and emit them as k3/v3.
The driver must register the grouping class:
job.setGroupingComparatorClass(MyGroupComparator.class);
set the partitioner:
job.setPartitionerClass(GroupPartition.class);
and set the number of reduce tasks:
job.setNumReduceTasks(2);
A small local sketch below illustrates the resulting sort-and-group order.
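To make the sort-and-group behaviour concrete, here is a minimal sketch that can be run outside Hadoop. It assumes the OrderBean class from section 7 (including the accessors that section elides); the class name SortGroupSketch and the inlined sample rows are illustrative only, not part of the job:
import java.util.ArrayList;
import java.util.List;
public class SortGroupSketch {
    public static void main(String[] args) {
        // Sample rows from section 1: order id and amount
        String[][] rows = {
                {"Order_0000001", "222.8"}, {"Order_0000001", "25.8"},
                {"Order_0000002", "522.8"}, {"Order_0000002", "122.4"},
                {"Order_0000002", "722.4"}, {"Order_0000003", "222.8"}};
        List<OrderBean> beans = new ArrayList<>();
        for (String[] row : rows) {
            OrderBean bean = new OrderBean();
            bean.setOrderId(row[0]);
            bean.setPrice(Double.valueOf(row[1]));
            beans.add(bean);
        }
        // The shuffle sorts map output with OrderBean.compareTo:
        // ascending by order id, descending by price within an order
        beans.sort(OrderBean::compareTo);
        for (OrderBean bean : beans) {
            System.out.println(bean.getOrderId() + "\t" + bean.getPrice());
        }
        // Prints: Order_0000001 -> 222.8, 25.8; Order_0000002 -> 722.4,
        // 522.8, 122.4; Order_0000003 -> 222.8. The grouping comparator
        // then merges consecutive records sharing an order id into one
        // reduce() call, so taking the first two values per group is
        // enough to answer the question.
    }
}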
3: Custom Mapper
import org.apache.hadoop.io.DoubleWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import java.io.IOException;
public class GroupMapper extends Mapper<LongWritable, Text, OrderBean, DoubleWritable> {
    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        // Input line: orderId \t productId \t amount
        String[] split = value.toString().split("\t");
        OrderBean orderBean = new OrderBean();
        orderBean.setOrderId(split[0]);
        orderBean.setPrice(Double.valueOf(split[2]));
        // Emit the bean as k2 and the amount again as v2
        DoubleWritable doubleWritable = new DoubleWritable(Double.valueOf(split[2]));
        context.write(orderBean, doubleWritable);
    }
}
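The mapper assumes tab-separated input laid out as in section 1, e.g. Order_0000001\tPdt_01\t222.8. The product id in split[1] is never used, and the amount is parsed twice: once into the bean, where it drives the sort, and once into the DoubleWritable value that the reducer will emit.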
4: Custom Reducer: Emitting the Top Two Records per Order
import org.apache.hadoop.io.DoubleWritable;
import org.apache.hadoop.mapreduce.Reducer;
import java.io.IOException;
public class GroupReducer extends Reducer<OrderBean, DoubleWritable, OrderBean, DoubleWritable> {
    @Override
    protected void reduce(OrderBean key, Iterable<DoubleWritable> values, Context context) throws IOException, InterruptedException {
        // values arrive sorted by price in descending order,
        // so the first two are the two largest amounts
        int i = 0;
        for (DoubleWritable value : values) {
            i++;
            if (i <= 2) {
                context.write(key, value);
            } else {
                break;
            }
        }
    }
}
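One Hadoop subtlety makes this loop correct: within a grouped reduce() call the framework reuses the key object and refreshes its fields as the values iterator advances, so key always reflects the record behind the current value. Writing key next to each value therefore emits the matching price rather than the group's first price, and breaking out of the loop after two values simply skips the rest of the group.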
5: Custom Partitioner
import org.apache.hadoop.io.DoubleWritable;
import org.apache.hadoop.mapreduce.Partitioner;
public class GroupPartition extends Partitioner<OrderBean, DoubleWritable> {
    /**
     * Put records with the same order id in the same partition,
     * so a single reduceTask processes each order
     */
    @Override
    public int getPartition(OrderBean orderBean, DoubleWritable doubleWritable, int numPartitions) {
        return (orderBean.getOrderId().hashCode() & Integer.MAX_VALUE) % numPartitions;
    }
}
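Two details are worth noting. The & Integer.MAX_VALUE mask clears the sign bit so a negative hashCode cannot produce a negative partition index; Hadoop's built-in HashPartitioner uses the same idiom. And hashing the order id alone is the whole point of this class: the default partitioner would hash the entire OrderBean key, and unless OrderBean defined hashCode over the order id only, one order's records could be scattered across reduce tasks.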
6: Custom Grouping Comparator
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.WritableComparator;
public class MyGroupComparator extends WritableComparator {
    public MyGroupComparator() {
        // true: let WritableComparator create OrderBean instances so the
        // raw bytes can be deserialized before comparing
        super(OrderBean.class, true);
    }
    /**
     * Compare only the order id, so that all records with the same order
     * id are treated as one group and handed to a single reduce() call;
     * the per-group price ordering was already established by
     * OrderBean.compareTo during the sort phase
     */
    @Override
    public int compare(WritableComparable a, WritableComparable b) {
        OrderBean first = (OrderBean) a;
        OrderBean second = (OrderBean) b;
        return first.getOrderId().compareTo(second.getOrderId());
    }
}
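It helps to separate the two comparators' roles: OrderBean.compareTo decides the order in which records arrive, while this grouping comparator only decides where one reduce() call ends and the next begins. Returning 0 for two beans that share an order id but differ in price is precisely what merges an order's records into a single group.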
7: The OrderBean Wrapper Class
import org.apache.hadoop.io.WritableComparable;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
public class OrderBean implements WritableComparable<OrderBean> {
    private String orderId;
    private Double price;
    /**
     * Sort by order id first; within the same order, sort by price
     */
    @Override
    public int compareTo(OrderBean o) {
        // Compare the order id first; records with different order ids
        // are simply ordered by id and are otherwise not comparable
        int result = this.orderId.compareTo(o.orderId);
        if (result == 0) {
            // Same order id: compare prices, negating the result so that
            // higher prices sort first (descending order)
            result = this.price.compareTo(o.price);
            return -result;
        }
        return result;
    }
    @Override
    public void write(DataOutput out) throws IOException {
        out.writeUTF(orderId);
        out.writeDouble(price);
    }
    @Override
    public void readFields(DataInput in) throws IOException {
        this.orderId = in.readUTF();
        this.price = in.readDouble();
    }
    // set/get/toString/...
}
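The accessors are elided above; for completeness, one possible version is sketched below. The getters and setters follow from how the other classes use the bean, but the toString format is an assumption (it determines how the key is rendered in the output files):
public String getOrderId() { return orderId; }
public void setOrderId(String orderId) { this.orderId = orderId; }
public Double getPrice() { return price; }
public void setPrice(Double price) { this.price = price; }
// Assumed output format: order id and price separated by a tab
@Override
public String toString() { return orderId + "\t" + price; }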
8: Driver Class
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.DoubleWritable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
public class GroupMain extends Configured implements Tool {
    @Override
    public int run(String[] args) throws Exception {
        Job job = Job.getInstance(super.getConf(), "group");
        job.setInputFormatClass(TextInputFormat.class);
        TextInputFormat.addInputPath(job, new Path("file:///F:\\input"));
        job.setMapperClass(GroupMapper.class);
        job.setMapOutputKeyClass(OrderBean.class);
        job.setMapOutputValueClass(DoubleWritable.class);
        // Custom partitioner: same order id -> same partition
        job.setPartitionerClass(GroupPartition.class);
        // Custom grouping comparator: same order id -> same reduce() call
        job.setGroupingComparatorClass(MyGroupComparator.class);
        // Two reduce tasks, as described in section 2
        job.setNumReduceTasks(2);
        job.setReducerClass(GroupReducer.class);
        job.setOutputKeyClass(OrderBean.class);
        job.setOutputValueClass(DoubleWritable.class);
        job.setOutputFormatClass(TextOutputFormat.class);
        TextOutputFormat.setOutputPath(job, new Path("file:///F:\\output_topN"));
        boolean b = job.waitForCompletion(true);
        return b ? 0 : 1;
    }
    public static void main(String[] args) throws Exception {
        int run = ToolRunner.run(new Configuration(), new GroupMain(), args);
        System.exit(run);
    }
}
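With two reduce tasks, the results are written to part-r-00000 and part-r-00001 under F:\output_topN; which orders land in which file depends on how GroupPartition hashes each order id. The hard-coded file:/// paths make this a local run; on a real cluster they would normally be replaced with HDFS paths taken from args.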
Test results: with the sample data, each group's top amounts are Order_0000001 → 222.8 and 25.8; Order_0000002 → 722.4 and 522.8; Order_0000003 → 222.8 (its only record). The exact line format follows whatever OrderBean's toString produces.