二次排序
eg: 对左侧序列进行排序, 需要先对字母排序, 然后再对数字进行排序. 得到右侧的序列.
a 2 a 2
c 4 a 4
b 3 b 1
c 1 => b 3
a 4 c 1
b 1 c 4
第一种方式: reduce阶段. map <key,value> -> reduce <key, value存入到list中进行比较排序>
public class SortMapReduce extends Configured implements Tool{
// mapper,输入数据变成键值对,一行转化为一条
public static class SortMapper extends Mapper<LongWritable, Text, Text, IntWritable>{
// map 的输出的 key 与 value
private Text mapOutputKey = new Text();
private IntWritable mapOutputValue = new IntWritable();
@Override
protected void map(LongWritable key, Text value, Context context)
throws IOException, InterruptedException {
// 每一行数据变成键值对
String lineValue = value.toString();
// 数据切割
String[] strs = lineValue.split(" ");
mapOutputKey.set(strs[0]);
mapOutputValue.set(strs[1]);
context.write(mapOutputKey, mapOutputValue);
}
}
// reducer,map的输出就是我们reduce的输入
public static class SortReducer extends Reducer<Text, IntWritable, Text, IntWritable>{
private Text outputKey = new Text();
@Override
protected void reduce(Text key, Iterable<IntWritable> values,
Context context) throws IOException, InterruptedException {
//第一种方式
//用一个list,将key相同的value的值存储起来 <a,<4,2>>,<b,<3,1>>
List<IntWritable> valueList = new ArrayList<IntWritable>();
//迭代之后将值放进去
for(IntWritable value:values) {
valueList.add(value); //<a,4,2>
}
Collections.sort(valueList);//资源开销过大 // 排序
for(Integer value : valuesList) {
context.write(key, new IntWritable(value));
}
}
}
//driver:任务相关设置
public int run(String[] args) throws Exception{
// 获取相关配置
Configuration conf = this.getConf();
Job job = Job.getInstance(conf, this.getClass().getSimpleName());
job.setJarByClass(SortMapReduce.class);
// 设置input
Path inpath = new Path(args[0]);
FileInputFormat.addInputPath(job, inpath);
// 设置output
Path outpath = new Path(args[1]);
FileOutputFormat.setOutputPath(job, outpath);
// 设置map
job.setMapperClass(SortMapper.class);
job.setMapOutputKeyClass(PairWritable.class);
job.setMapOutputValueClass(IntWritable.class);
// shuffle 优化
job.setPartitionerClass(FirstPartitioner.class);
// job.setSortComparatorClass(cls);
// job.setCombinerClass(cls);
job.setGroupingComparatorClass(FirstGroupingComparator.class);
//设置 reduce
job.setReducerClass(SortReducer.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(IntWritable.class);
/ /将job提交给Yarn
boolean isSuccess = job.waitForCompletion(true);
return isSuccess?0:1;
}
public static void main(String[] args) throws Exception {
Configuration conf = new Configuration();
//保存的输入输出路径
args = new String[]{
"hdfs://lee01.cniao5.com:8020/user/root/mapreduce/sort/input",
"hdfs://lee01.cniao5.com:8020/user/root/mapreduce/sort/output"
};
//将任务跑起来
//int statas = new WordCountMapReduce().run(args);
int statas = ToolRunner.run(conf, new SortMapReduce(), args);
//关闭我们的job
System.exit(statas);
}
}
第二种方法 shuffle阶段 <key,value> -> <<key,value>,value> -> <key,value>
构造数据类型. PairWritable
/**
 * Composite key for secondary sort: orders by {@code first} (String),
 * then by {@code second} (int). Implements Hadoop serialization plus
 * the natural ordering used during the shuffle sort.
 */
public class PairWritable implements WritableComparable<PairWritable> {
    private String first;
    private int second;

    /** No-arg constructor required by Hadoop for deserialization. */
    public PairWritable() {
    }

    public PairWritable(String first, int second) {
        this.set(first, second);
    }

    public void set(String first, int second) {
        this.setFirst(first);
        this.setSecond(second);
    }

    public String getFirst() {
        return first;
    }

    public void setFirst(String first) {
        this.first = first;
    }

    public int getSecond() {
        return second;
    }

    public void setSecond(int second) {
        this.second = second;
    }

    /** Serializes first as UTF, then second as a 4-byte int (order matters for the raw comparator). */
    public void write(DataOutput out) throws IOException {
        out.writeUTF(first);
        out.writeInt(second);
    }

    /** Deserializes in the same order as {@link #write}. */
    public void readFields(DataInput in) throws IOException {
        this.first = in.readUTF();
        this.second = in.readInt();
    }

    /** Primary order: first; tie-break: second. */
    public int compareTo(PairWritable o) {
        int comp = this.first.compareTo(o.getFirst());
        if (0 != comp) {
            return comp;
        }
        // Integer.compare avoids boxing two Integers just to compare them.
        return Integer.compare(this.second, o.getSecond());
    }

    @Override
    public int hashCode() {
        final int prime = 31;
        int result = 1;
        result = prime * result + ((first == null) ? 0 : first.hashCode());
        result = prime * result + second;
        return result;
    }

    @Override
    public boolean equals(Object obj) {
        if (this == obj)
            return true;
        if (obj == null)
            return false;
        if (getClass() != obj.getClass())
            return false;
        PairWritable other = (PairWritable) obj;
        if (first == null) {
            if (other.first != null)
                return false;
        } else if (!first.equals(other.first))
            return false;
        if (second != other.second)
            return false;
        return true;
    }

    @Override
    public String toString() {
        return "PairWritable [first=" + first + ", second=" + second + "]";
    }
}
map reduce实现. SortMapReduce
public class SortMapReduce extends Configured implements Tool {
    /**
     * Approach 2: secondary sort via the shuffle. The composite PairWritable key
     * carries both fields so the framework sorts records by (letter, number).
     */
    public static class SortMapper extends Mapper<LongWritable, Text, PairWritable, IntWritable> {
        // Reused output objects — Hadoop idiom to avoid per-record allocation.
        private PairWritable mapOutputKey = new PairWritable();
        private IntWritable mapOutputValue = new IntWritable();

        @Override
        protected void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            String lineValue = value.toString();
            String[] strs = lineValue.split(" ");
            // Skip malformed lines.
            if (2 != strs.length) {
                return;
            }
            // Parse the numeric field once and reuse it for both key and value.
            int second = Integer.parseInt(strs[1]);
            mapOutputKey.set(strs[0], second);
            mapOutputValue.set(second);
            context.write(mapOutputKey, mapOutputValue);
        }
    }

    /**
     * Reducer: with FirstGroupingComparator installed, all keys sharing the same
     * first field form one group, and the values arrive already sorted by the
     * composite key — no in-memory sort needed here.
     */
    public static class SortReducer extends Reducer<PairWritable, IntWritable, Text,
            IntWritable> {
        private Text outputKey = new Text();

        @Override
        protected void reduce(PairWritable key, Iterable<IntWritable> values,
                Context context) throws IOException, InterruptedException {
            for (IntWritable value : values) {
                // Emit only the first field as the output key.
                outputKey.set(key.getFirst());
                context.write(outputKey, value);
            }
        }
    }

    /** Driver: job configuration. */
    public int run(String[] args) throws Exception {
        Configuration conf = this.getConf();
        Job job = Job.getInstance(conf, this.getClass().getSimpleName());
        job.setJarByClass(SortMapReduce.class);
        // input
        Path inpath = new Path(args[0]);
        FileInputFormat.addInputPath(job, inpath);
        // output
        Path outpath = new Path(args[1]);
        FileOutputFormat.setOutputPath(job, outpath);
        // map
        job.setMapperClass(SortMapper.class);
        job.setMapOutputKeyClass(PairWritable.class);
        job.setMapOutputValueClass(IntWritable.class);
        // shuffle: partition and group by the first field only, so records with
        // the same letter reach the same reducer call, sorted by the full key.
        job.setPartitionerClass(FirstPartitioner.class);
        job.setGroupingComparatorClass(FirstGroupingComparator.class);
        // reduce
        job.setReducerClass(SortReducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);
        // submit the job to YARN and wait for completion
        boolean isSuccess = job.waitForCompletion(true);
        return isSuccess ? 0 : 1;
    }

    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        // Hard-coded input/output paths (course example).
        args = new String[]{
            "hdfs://lee01.cniao5.com:8020/user/root/mapreduce/sort/input",
            "hdfs://lee01.cniao5.com:8020/user/root/mapreduce/sort/output"
        };
        // Run the job and exit with its status code.
        int statas = ToolRunner.run(conf, new SortMapReduce(), args);
        System.exit(statas);
    }
}
关于shuffle优化. FirstPartitioner, 以原始的 a,b,c的key作为排序依据. 不是以PairWritable进行排序切分的依据.
/**
 * Partitions composite keys by the first field only, so every record with the
 * same letter is routed to the same reducer regardless of its numeric part.
 */
public class FirstPartitioner extends Partitioner<PairWritable, IntWritable> {
    @Override
    public int getPartition(PairWritable key, IntWritable value,
            int numPartitions) {
        int hash = key.getFirst().hashCode();
        // Mask the sign bit so the partition index is always non-negative.
        return (hash & Integer.MAX_VALUE) % numPartitions;
    }
}
构建分组的方式 FirstGroupingComparator. 这也是发生在 shuffle阶段.
public class FirstGroupingComparator implements RawComparator<PairWritable> {
public int compare(PairWritable o1, PairWritable o2) {
return o1.getFirst().compareTo(o2.getFirst());
}
// b 表示要比较的两个字节数组
// s 第一个字节数组的进行比较的尾部位置, 字节数组中字节的偏移量.
// l 第一个字节比到组合PairWritable的前一个字节. l‐4 是表示去除最后一个字节所占内存的大小, 因为int
类型占用 4 个字节的大小.
public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) {
return WritableComparator.compareBytes(b1, 0, l1 ‐ 4, b2, 0, l2 ‐ 4);
}
}