二次排序就是首先按照第一字段排序,然后再对第一字段相同的行按照第二字段排序,但是Hadoop默认的输出结果只能对Key进行排序,其中Value中的值次序是不定的。
mapreduce原理
在MapReduce计算框架中,一个应用程序被划分成Map和Reduce两个阶段。
map阶段
首先通过用户提供的inputFormat将对应的InputSpilt解析成一系列的key/value,并依次交给用户编写的map函数处理,接着按照指定的partitioner对数据进行分片,以确定每个key/value将交给哪个Reduce Task处理;之后将数据交给用户定义的Combiner进行合并(可选),最后将处理结果保存在本地磁盘上。
reduce阶段
首先从各个Map Task上面拷贝对应的数据分片,待所有数据拷贝完之后, 再以key为关键字对所有的数据进行排序 ,通过排序 key相同的记录聚集到一起形成若干分组,然后将每组数据交给用户编写的reduce函数处理,并按照outputFormat定义保存结果。
排序过程
考虑到完全由Reduce Task进行全局排序会产生性能瓶颈,hadoop先对各个Map Task的输出结果做一次局部排序,然后交给Reduce Task做一次全局排序。也就是说Map Task和Reduce Task均会按照key进行排序,该操作属于hadoop的默认行为。 任何应用程序的数据均会被排序,而不管逻辑上是否需要。
map阶段会将处理结果暂时放在一个缓冲区(环形)中,当缓冲区使用率达到一定阀值后,会对缓冲区数据做一次排序(采用改进的快速排序),并写入本地磁盘,当map阶段结束后,会进行合并,将这些小文件合并成一个大的有序文件(采用小根堆合并排序)。
reduce阶段,从每个map task远程拷贝相应的数据文件,如果文件超过一定阀值,则放到磁盘上,否则放到内存里。如果磁盘上面的文件数目达到一封阀值,则进行一次合并生一个一个更大文件;如果内存中文件大小或者数据超过一定阀值,则进行一次合并,并将数据写到磁盘上, 当所有数据拷贝完成后,统一对内存和磁盘的所有数据进行一次排序合并。
二次排序原理
解决方案有两种:
- 在reduce函数里面排序,缺点如果value值比较多,会导致内存溢出。
- 利用mapreduce的特性
具体过程:
1. 原始的键值对是(k,v)。这里的k就是就的key,也可以称为natural key
2. 我们可以将k和v组合成新的key(可以称为composite key),也就是((k,v), v),这样一来排序全部基于composite key定义的比较器。
3. 自定义分区函数(Partitioner),将natural key相同的键值对发送到同一个Reduce中;
4. 自定义分组函数(WritableComparator),将natural key相同的键值对当作一个分组,reduce里面输入的key仍然为composite key,我们需处理下只取natural key。
关键代码
MyPairComparable
定义组合key(first key + second key),排序规则:先按照first key排序,之后按照seccond key 排序。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 |
import org.apache.hadoop.io.Text; import org.apache.hadoop.io.WritableComparable; import java.io.DataInput; import java.io.DataOutput; import java.io.IOException; /** * @author jhaoniu * @description:排序组合对儿,这里可以扩充到多个,实现多次排序 * @date 15-6-25 下午1:46 */ public class MyPairComparable implements WritableComparable<MyPairComparable> { private Text firstKey; private Text secondKey; public MyPairComparable() { this.firstKey = new Text(); this.secondKey = new Text(); } public void setFirstKey(Text firstKey) { this.firstKey = firstKey; } public void setSecondKey(Text secondKey) { this.secondKey = secondKey; } public Text getFirstKey() { return firstKey; } public Text getSecondKey() { return secondKey; } public int compareTo(MyPairComparable o) { int code = this.firstKey.compareTo(o.getFirstKey()); return code == 0 ? this.getSecondKey().compareTo(o.getSecondKey()) : code; } public void write(DataOutput out) throws IOException { this.firstKey.write(out); this.secondKey.write(out); } public void readFields(DataInput in) throws IOException { this.firstKey.readFields(in); this.secondKey.readFields(in); } } |
Partitioner
Partitioner决定每个key/value交给哪个reduce task处理,这里决定只要first key相同就会被分发到同一个reduce task 上。
MyPartitioner代码如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 |
import org.apache.hadoop.mapreduce.Partitioner; /** * @author jhaoniu * @description: * @date 15-6-25 下午1:50 */ public class MyPartitioner<V> extends Partitioner<MyPairComparable, V> { /** * 按照第一个key分发 * * @param myPair * @param v * @param numPartitions * @return */ public int getPartition(MyPairComparable myPair, V v, int numPartitions) { return (myPair.getFirstKey().hashCode() & Integer.MAX_VALUE) % numPartitions; } } |
MyGroupingComparator
决定如何分组,FirstKey相同就认为是同一个分组,这样first key相同的记录会归到同一个reduce处理,并且value 已经是按照 first + second 排好序的。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 |
import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.io.WritableComparable; import org.apache.hadoop.io.WritableComparator; /** * @author jhaoniu * @description: 分组 * @date 15-6-25 下午2:00 */ public class MyGroupingComparator extends WritableComparator { private static final Log LOG = LogFactory.getLog(MyGroupingComparator.class); protected MyGroupingComparator() { super(MyPairComparable.class, true); } public int compare(WritableComparable a, WritableComparable b) { return ((MyPairComparable) a).getFirstKey().compareTo(((MyPairComparable) b).getFirstKey()); } } |
入口函数
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 |
public static class LocationSecondarySortMapper extends Mapper<WritableComparable, HCatRecord, MyPairComparable, Text> { private HCatSchema schema; private MyPairComparable myPairComparable; protected void setup(final Context context) throws IOException, InterruptedException { schema = HCatInputFormat.getTableSchema(context.getConfiguration()); myPairComparable = new MyPairComparable(); } public void map(WritableComparable key, final HCatRecord value, final Context context) throws IOException, InterruptedException { //获取hive表每行数据 String lng = value.getString(SOURCE_TABLE_SCHEMA.LNG, schema); String lat = value.getString(SOURCE_TABLE_SCHEMA.LAT, schema); String uuid = value.getString(SOURCE_TABLE_SCHEMA.UUID, schema); String logTime = value.getString(SOURCE_TABLE_SCHEMA.LOG_TIME, schema); String location = lat + "," + lng + "," + logTime; //减少数据,方便测试 if (null != uuid && uuid.length() == 64 && uuid.charAt(5) == '2') { myPairComparable.setFirstKey(new Text(uuid)); myPairComparable.setSecondKey(new Text(logTime)); context.write(myPairComparable, new Text(location)); } } } public static class LocationSecondarySortReducer extends Reducer<MyPairComparable, Text, Text, Text> { public void reduce(MyPairComparable key, Iterable<Text> values, Context context) throws IOException, InterruptedException { Text newKey = new Text(key.getFirstKey()); Iterator<Text> it = values.iterator(); while (it.hasNext()) { Text value = it.next(); context.write(newKey, new Text(value.toString())); } } } secondarySortJob.setMapperClass(LocationSecondarySortMapper.class); //设置分发到reduce的规则 secondarySortJob.setPartitionerClass(MyPartitioner.class); //设置分组规则 secondarySortJob.setGroupingComparatorClass(MyGroupingComparator.class); |