Ascending sort:
MapReduce sorts by key on its own: map output is sorted locally within each map task, and the keys delivered to each reducer arrive in sorted order, so with a single reducer the output is globally sorted.
Let's first test with a single input file, defining only the Mapper (the default identity Reducer still runs with one reduce task, which is why the single output file comes out sorted by key).
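For example, assuming the input is a tab-separated file with one record per line (the same sample data that appears in the Javadoc comment below):
1	北京
2	天津
8	黑龙江
3	河北
5	内蒙古
4	山西
7	吉林
6	辽宁
The expected output lists these records in ascending key order, 1 through 8.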
package com.hnxy.mr.Sort;
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
/**
*
* @author 耀君 Ascending sort. Sample data: 1 北京 2 天津 8 黑龙江 3 河北 5 内蒙古 4 山西 7 吉林 6 辽宁
*/
public class SortMr extends Configured implements Tool {
public static class SortMrMapper extends Mapper<LongWritable, Text, LongWritable, Text> {
LongWritable outkey = new LongWritable();
Text outval = new Text();
String[] str = null;
@Override
protected void map(LongWritable key, Text value, Mapper<LongWritable, Text, LongWritable, Text>.Context context)
throws IOException, InterruptedException {
// split each line on tabs: expected format is "<number>\t<province>"
str = value.toString().split("\t");
if (null != str && str.length == 2) {
outkey.set(Long.parseLong(str[0]));
outval.set(str[1]);
context.write(outkey, outval);
}
}
}
@Override
public int run(String[] args) throws Exception {
Configuration conf = this.getConf();
Job job = Job.getInstance(conf, "job1");
job.setJarByClass(SortMr.class);
job.setMapperClass(SortMrMapper.class);
// job.setReducerClass(SortMrReducer.class);
job.setOutputKeyClass(LongWritable.class);
job.setOutputValueClass(Text.class);
// set input and output paths
Path in = new Path(args[0]);
Path out = new Path(args[1]);
FileInputFormat.addInputPath(job, in);
FileOutputFormat.setOutputPath(job, out);
// get a FileSystem handle; delete the output dir if it already exists
FileSystem fs = FileSystem.get(conf);
if (fs.exists(out)) {
fs.delete(out, true);
}
// run the job and report the outcome
boolean con = job.waitForCompletion(true);
if (con) {
System.out.println("Execution succeeded");
} else {
System.out.println("Execution failed");
}
return con ? 0 : 1;
}
public static void main(String[] args) throws Exception {
System.exit(ToolRunner.run(new SortMr(), args));
}
}
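For reference, one typical way to launch this driver after packaging it into a jar would be the following command (the jar name and HDFS paths here are placeholders, not taken from the original article):
hadoop jar sortmr.jar com.hnxy.mr.Sort.SortMr /data/provinces.txt /data/sort_out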
Run result:
That was a single input file. What if there are two input files? With the default single reducer, the records from both files are merged and the output is still globally sorted by key.
Run result:
Descending sort:
The default sort order is ascending; to get descending order, one approach is to customize the Reducer.
Code:
package com.hnxy.mr.Sort;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
/**
*
* @author 耀君 Sort. Sample data: 1 北京 2 天津 8 黑龙江 3 河北 ...... Records are sorted by key.
*/
public class DescSortMR extends Configured implements Tool {
public static class SortMRMapper extends Mapper<LongWritable, Text, LongWritable, Text> {
// reusable output key/value objects
String[] str = null;
LongWritable outkey = new LongWritable();
Text outval = new Text();
@Override
protected void map(LongWritable key, Text value, Mapper<LongWritable, Text, LongWritable, Text>.Context context)
throws IOException, InterruptedException {
// split each line on tabs: "<number>\t<province>"
str = value.toString().split("\t");
if (null != str && str.length == 2) {
outkey.set(Long.parseLong(str[0]));
outval.set(str[1]);
context.write(outkey, outval);
}
// System.out.println(outkey+" "+outval);
}
}
public static class SortMRMReducer extends Reducer<LongWritable, Text, LongWritable, Text> {
// buffers used to re-emit the records in reverse order in cleanup()
private LongWritable outkey = new LongWritable();
private Text outval = new Text();
private List<Long> keylist = new ArrayList<Long>();
private List<String> vallist = new ArrayList<String>();
@Override
protected void reduce(LongWritable key, Iterable<Text> value,
Reducer<LongWritable, Text, LongWritable, Text>.Context context)
throws IOException, InterruptedException {
// keys arrive in ascending order; cache each key and its first value (one value per key in this data set)
keylist.add(key.get());
vallist.add(value.iterator().next().toString());
}
@Override
protected void cleanup(Reducer<LongWritable, Text, LongWritable, Text>.Context context)
throws IOException, InterruptedException {
// walk the cached lists backwards to emit keys in descending order
for (int i = keylist.size() - 1; i >= 0; i--) {
outkey.set(keylist.get(i));
outval.set(vallist.get(i));
context.write(outkey, outval);
System.out.println(outkey + "--" + outval);
}
}
}
@Override
public int run(String[] args) throws Exception {
// get the Configuration object
Configuration conf = this.getConf();
// create the Job instance
Job job = Job.getInstance(conf, "job1");
job.setJarByClass(DescSortMR.class);
// set the Mapper and Reducer classes
job.setMapperClass(SortMRMapper.class);
job.setReducerClass(SortMRMReducer.class);
job.setOutputKeyClass(LongWritable.class);
job.setOutputValueClass(Text.class);
job.setMapOutputKeyClass(LongWritable.class);
job.setMapOutputValueClass(Text.class);
// set input and output paths
Path in = new Path(args[0]);
Path out = new Path(args[1]);
FileInputFormat.addInputPath(job, in);
// get a FileSystem handle; delete the output dir if it already exists
FileSystem fs = FileSystem.get(conf);
if (fs.exists(out)) {
fs.delete(out, true);
}
FileOutputFormat.setOutputPath(job, out);
// run the job and report the outcome
boolean con = job.waitForCompletion(true);
if (con) {
System.out.println("Execution succeeded");
} else {
System.out.println("Execution failed");
}
return con ? 0 : 1;
}
public static void main(String[] args) throws Exception {
System.exit(ToolRunner.run(new DescSortMR(), args));
}
}
Run result:
Implementing descending order this way is simple: buffer the map-side data in lists inside the Reducer, then iterate backwards (i--) and emit the entries one by one. Note that this buffers every record in memory, so it only suits small data sets.
Custom external comparator:
Besides customizing the Reducer, descending order can also be achieved with an external comparator: define a class that extends WritableComparator and override its comparison logic. This works just as well!
Code:
The custom comparator:
package com.hnxy.mr.Sort;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.WritableComparator;
/**
*
* @author 耀君
* Custom comparison rule
*
*/
public class DescSortComparator extends WritableComparator{
public DescSortComparator() {
// register LongWritable as the key type (true = create key instances for comparison)
super(LongWritable.class, true);
}
@Override
public int compare(WritableComparable a, WritableComparable b) {
LongWritable wa = (LongWritable) a;
LongWritable wb = (LongWritable) b;
// negate the natural (ascending) comparison to get descending order
return -wa.compareTo(wb);
}
}
Driver class code:
package com.hnxy.mr.Sort;
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
/**
*
* @author 耀君 Descending sort via an external comparator. Sample data: 1 北京 2 天津 8 黑龙江 3 河北 5 内蒙古 4 山西 7 吉林 6 辽宁
*/
public class SortMr2 extends Configured implements Tool {
public static class SortMrMapper extends Mapper<LongWritable, Text, LongWritable, Text> {
LongWritable outkey = new LongWritable();
Text outval = new Text();
String[] str = null;
@Override
protected void map(LongWritable key, Text value, Mapper<LongWritable, Text, LongWritable, Text>.Context context)
throws IOException, InterruptedException {
str = value.toString().split("\t");
if (null != str && str.length == 2) {
outkey.set(Long.parseLong(str[0]));
outval.set(str[1]);
context.write(outkey, outval);
}
}
}
@Override
public int run(String[] args) throws Exception {
Configuration conf = this.getConf();
Job job = Job.getInstance(conf, "job1");
job.setJarByClass(SortMr2.class);
job.setMapperClass(SortMrMapper.class);
// job.setReducerClass(SortMrReducer.class);
job.setOutputKeyClass(LongWritable.class);
job.setOutputValueClass(Text.class);
// register the custom sort comparator
job.setSortComparatorClass(DescSortComparator.class);
// set input and output paths
Path in = new Path(args[0]);
Path out = new Path(args[1]);
FileInputFormat.addInputPath(job, in);
FileOutputFormat.setOutputPath(job, out);
// get a FileSystem handle; delete the output dir if it already exists
FileSystem fs = FileSystem.get(conf);
if (fs.exists(out)) {
fs.delete(out, true);
}
// run the job and report the outcome
boolean con = job.waitForCompletion(true);
if (con) {
System.out.println("Execution succeeded");
} else {
System.out.println("Execution failed");
}
return con ? 0 : 1;
}
public static void main(String[] args) throws Exception {
System.exit(ToolRunner.run(new SortMr2(), args));
}
}
Run result:
There is one more approach: give the key itself a built-in comparator by writing a custom key type that implements the WritableComparable interface.
Custom serializable key with an internal comparator:
package com.hnxy.mr.Sort;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import org.apache.hadoop.io.WritableComparable;
public class KeySortWritable implements WritableComparable<KeySortWritable> {
// the key value to sort on
private Long outkey = 0L;
public Long getOutkey() {
return outkey;
}
public void setOutkey(Long outkey) {
this.outkey = outkey;
}
public KeySortWritable() {
// a no-arg constructor is required so Hadoop can instantiate the key via reflection
}
@Override
public void write(DataOutput out) throws IOException {
out.writeLong(outkey);
}
@Override
public void readFields(DataInput in) throws IOException {
this.outkey = in.readLong();
}
@Override
public int compareTo(KeySortWritable o) {
// negate the ascending comparison to sort in descending order
return -ascSort(o);
}
@Override
public String toString() {
return this.getOutkey().toString();
}
private int ascSort(KeySortWritable o) {
// ascending comparison; compare the long values (== on Long objects would compare references)
return Long.compare(this.getOutkey(), o.getOutkey());
}
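// --- Optional sketch, not part of the original code: if this key type were used
// with more than one reduce task, the default HashPartitioner would call
// hashCode() on it, so overriding hashCode()/equals() keeps equal keys in the
// same partition. ---
@Override
public int hashCode() {
return outkey.hashCode();
}
@Override
public boolean equals(Object obj) {
if (this == obj) {
return true;
}
if (!(obj instanceof KeySortWritable)) {
return false;
}
return this.outkey.equals(((KeySortWritable) obj).outkey);
}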
}
Driver class:
package com.hnxy.mr.Sort;
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
/**
*
* @author 耀君 Descending sort via a key type with a built-in comparator. Sample data: 1 北京 2 天津 8 黑龙江 3 河北 5 内蒙古 4 山西 7 吉林 6 辽宁
*/
public class SortMr3 extends Configured implements Tool {
public static class SortMrMapper extends Mapper<LongWritable, Text, KeySortWritable, Text> {
KeySortWritable outkey = new KeySortWritable();
Text outval = new Text();
String[] str = null;
@Override
protected void map(LongWritable key, Text value,
Mapper<LongWritable, Text, KeySortWritable, Text>.Context context)
throws IOException, InterruptedException {
str = value.toString().split("\t");
if (null != str && str.length == 2) {
outkey.setOutkey(Long.parseLong(str[0]));
outval.set(str[1]);
context.write(outkey, outval);
}
}
}
@Override
public int run(String[] args) throws Exception {
Configuration conf = this.getConf();
Job job = Job.getInstance(conf, "job1");
job.setJarByClass(SortMr3.class);
job.setMapperClass(SortMrMapper.class);
// job.setReducerClass(SortMrReducer.class);
job.setOutputKeyClass(KeySortWritable.class);
job.setOutputValueClass(Text.class);
// no external comparator needed; KeySortWritable defines its own ordering
// job.setSortComparatorClass(DescSortComparator.class);
// set input and output paths
Path in = new Path(args[0]);
Path out = new Path(args[1]);
FileInputFormat.addInputPath(job, in);
FileOutputFormat.setOutputPath(job, out);
// get a FileSystem handle; delete the output dir if it already exists
FileSystem fs = FileSystem.get(conf);
if (fs.exists(out)) {
fs.delete(out, true);
}
// run the job and report the outcome
boolean con = job.waitForCompletion(true);
if (con) {
System.out.println("Execution succeeded");
} else {
System.out.println("Execution failed");
}
return con ? 0 : 1;
}
public static void main(String[] args) throws Exception {
System.exit(ToolRunner.run(new SortMr3(), args));
}
}