Hadoop 实例11 二次排序讲解
在 Hadoop 2.x 及以后的版本中,二次排序通过以下三个设置来实现:
job.setPartitionerClass(Partitioner p);
// 在map阶段对map的输出数据进行分区操作,每个分区对应一个reducer。
job.setSortComparatorClass(RawComparator c);
//如果没有通过job.setSortComparatorClass设置key比较函数类,则使用key的实现的compareTo方法。
job.setGroupingComparatorClass(RawComparator c);
//设置分组比较函数类:比较结果为0的key被视为同一组,交给同一次reduce()调用处理;如果没有设置,则使用排序比较函数进行分组。
hadoop的接口
#1.Writable===>自定义Value
实现 Writable(Hadoop 序列化接口)接口的类,可以作为 MapReduce 的 value 进行数据传输。
#2.WritableComparable===>自定义key===>MapReduce的key要去重、排序
如果自定义的类要作为 MapReduce 的 key,必须实现 WritableComparable 接口。
2.1 通过equals方法判断key是否相同
2.2 hashCode 使用默认分区HashPartitioner时,会调用key的hashCode方法
2.3 compareTo 使用默认排序时,会调用key的compareTo方法
//判断对象是否是同一个对象,当该对象作为MapReduce输出的key进行比较
数据(KeyValueTextInputFormat 默认以制表符分隔 key 和 value,因此每行中时间与温度之间应为制表符)
1949-10-01 14:21:02 34c
1949-10-02 14:01:02 36c
1950-01-01 11:21:02 32c
1950-10-01 12:21:02 37c
1951-12-01 12:21:02 23c
1950-10-02 12:21:02 41c
1950-10-03 12:21:02 27c
1951-07-01 12:21:02 45c
1951-07-02 12:21:02 46c
1951-07-03 12:21:03 47c
1 自定义key
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import org.apache.hadoop.io.WritableComparable;
public class MyKey implements WritableComparable<MyKey>{
private int year;
private int month;
private double hot;
public int getYear() {
return year;
}
public void setYear(int year) {
this.year = year;
}
public int getMonth() {
return month;
}
public void setMonth(int month) {
this.month = month;
}
public double getHot() {
return hot;
}
public void setHot(double hot) {
this.hot = hot;
}
public void readFields(DataInput arg0) throws IOException {
this.year=arg0.readInt();
this.month=arg0.readInt();
this.hot=arg0.readDouble();
}
public void write(DataOutput arg0) throws IOException {
arg0.writeInt(year);
arg0.writeInt(month);
arg0.writeDouble(hot);
}
//判断对象是否是同一个对象,当该对象作为输出的key
public int compareTo(MyKey o) {
int r1 =Integer.compare(this.year, o.getYear());
if(r1==0){
int r2 =Integer.compare(this.month, o.getMonth());
if(r2==0){
return Double.compare(this.hot, o.getHot());
}else{
return r2;
}
}else{
return r1;
}
}
/* (non-Javadoc)
* @see java.lang.Object#hashCode()
*/
@Override
public int hashCode() {
final int prime = 31;
int result = 1;
long temp;
temp = Double.doubleToLongBits(hot);
result = prime * result + (int) (temp ^ (temp >>> 32));
result = prime * result + month;
result = prime * result + year;
return result;
}
/* (non-Javadoc)
* @see java.lang.Object#equals(java.lang.Object)
*/
@Override
public boolean equals(Object obj) {
if (this == obj)
return true;
if (obj == null)
return false;
if (getClass() != obj.getClass())
return false;
MyKey other = (MyKey) obj;
if (Double.doubleToLongBits(hot) != Double.doubleToLongBits(other.hot))
return false;
if (month != other.month)
return false;
if (year != other.year)
return false;
return true;
}
}
2自定义Partition
import org.apache.hadoop.io.DoubleWritable;
import org.apache.hadoop.mapreduce.lib.partition.HashPartitioner;
/**
* Mapper每输出一个数据执行一次partition
* @author hadoop
*
*/
public class MyPartitioner extends HashPartitioner<MyKey, DoubleWritable>{
//执行时间越短越好
public int getPartition(MyKey key, DoubleWritable value, int numReduceTasks) {
return (key.getYear()-1949)%numReduceTasks;
}
}
3自定义分组
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.WritableComparator;
/**
* 确定一个reduce要处理的数据
* 分组实质上也是比较
* @author hadoop
*
*/
public class MyGroup extends WritableComparator{
public MyGroup(){
super(MyKey.class,true);
}
public int compare(WritableComparable a, WritableComparable b) {
MyKey k1 =(MyKey) a;
MyKey k2 =(MyKey) b;
int r1 =Integer.compare(k1.getYear(), k2.getYear());
if(r1==0){
return Integer.compare(k1.getMonth(), k2.getMonth());
}else{
return r1;
}
}
}
4自定义排序
/**
* shuffer 过程排序使用
* @author hadoop
*
*/
public class MySort extends WritableComparator{
public MySort(){
super(MyKey.class,true);
}
public int compare(WritableComparable a, WritableComparable b) {
MyKey k1 =(MyKey) a;
MyKey k2 =(MyKey) b;
int r1 =Integer.compare(k1.getYear(), k2.getYear());
if(r1==0){
int r2 =Integer.compare(k1.getMonth(), k2.getMonth());
if(r2==0){
return -Double.compare(k1.getHot(), k2.getHot());
}else{
return r2;
}
}else{
return r1;
}
}
}
5 MapReduce主体代码
import java.io.IOException;
import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.Calendar;
import java.util.Date;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.DoubleWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.KeyValueTextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
public class RunJob {
public static void main(String[] args) {
Configuration config =new Configuration();
config.set("fs.defaultFS", "hdfs://node1:8020");
config.set("yarn.resourcemanager.hostname", "node1");
// config.set("mapred.jar", "C:\\Users\\Administrator\\Desktop\\wc.jar");
// config.set("mapreduce.input.keyvaluelinerecordreader.key.value.separator", ",");
try {
FileSystem fs =FileSystem.get(config);
Job job =Job.getInstance(config);
job.setJarByClass(RunJob.class);
job.setJobName("weather");
job.setMapperClass(WeatherMapper.class);
job.setReducerClass(WeatherReducer.class);
job.setMapOutputKeyClass(MyKey.class);
job.setMapOutputValueClass(DoubleWritable.class);
job.setPartitionerClass(MyPartitioner.class);//
job.setSortComparatorClass(MySort.class);//
job.setGroupingComparatorClass(MyGroup.class);//
job.setNumReduceTasks(3);
job.setInputFormatClass(KeyValueTextInputFormat.class);
FileInputFormat.addInputPath(job, new Path("/usr/input/weather"));
Path outpath =new Path("/usr/output/weather");
if(fs.exists(outpath)){
fs.delete(outpath, true);
}
FileOutputFormat.setOutputPath(job, outpath);
boolean f= job.waitForCompletion(true);
if(f){
}
} catch (Exception e) {
e.printStackTrace();
}
}
//key:每行第一个隔开符左边为key,右边为value
static class WeatherMapper extends Mapper<Text, Text, MyKey, DoubleWritable>{
SimpleDateFormat sdf =new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
NullWritable v =NullWritable.get();
protected void map(Text key, Text value,
Context context)
throws IOException, InterruptedException {
try {
Date date =sdf.parse(key.toString());
Calendar c =Calendar.getInstance();
c.setTime(date);
int year =c.get(Calendar.YEAR);
int month =c.get(Calendar.MONTH);
double hot =Double.parseDouble(value.toString().substring(0, value.toString().lastIndexOf("c")));
MyKey k =new MyKey();
k.setYear(year);
k.setMonth(month);
k.setHot(hot);
context.write(k, new DoubleWritable(hot));
} catch (Exception e) {
e.printStackTrace();
}
}
}
static class WeatherReducer extends Reducer<MyKey, DoubleWritable, Text, NullWritable>{
protected void reduce(MyKey arg0, Iterable<DoubleWritable> arg1,
Context arg2)
throws IOException, InterruptedException {
int i=0;
for(DoubleWritable v :arg1){
i++;
String msg =arg0.getYear()+"\t"+arg0.getMonth()+"\t"+v.get();
arg2.write(new Text(msg), NullWritable.get());
if(i==3){
break;
}
}
}
}
}