全局排序
需求:按用户每月使用的总流量从多到少(倒序)排序
接口->WritableComparable
排序是 Hadoop MapReduce 的默认行为:框架总会按 key 对数据排序,默认按字典顺序。
排序的分类:
1)部分排序
2)全排序
3)辅助排序
4)二次排序
封装类 FlowBean:实现 WritableComparable,在 compareTo 中直接完成排序
public class FlowBean implements WritalbeComparable<FlowBean>{
private long upFlow;
private long dfFlow;
private long flowsum;
pulibc FlowBean(){
}
public FlowBean(long upFlow, long dfFlow){
this.upFlow = upFlow;
this.dfFlow = dfFlow;
this.flowsum = upFlow+dfFlow;
}
setter
getter
@Override
public void readFields(DataInput in) throws Exception{
//反序列化
upFlow = in.readLong();
dfFlow = in.readLong();
flowsum = in.readLong();
}
@Override
public void write(DataOutput out) throws Exception{
//序列化
out.writeLong(upFlow);
out.writeLong(dfFlow);
out.writeLong(flowsum);
}
@Override
public int compareTo(FlowBean o){
return this.flowsum >o.getFolwSum()?-1:1;
}
@Override
public String toString(){
//倒序
return upFlow+"\t"+dfFlow+"\t"+flowsum;
}
}
public class FlowSortReducer extends Reducer<FlowBean, Text, Text,FlowBean>{
@Override
public void reduce<FlowBean key, Interater<Text> value, Context context>
throws IOException,InterruptException{
context.write(value.iterator().next(),key)
}
}
public class FlowSortMapper extends Mapper<LongWritable, Text, FlowBean, Text>{
@Override
public void map(LongWritable key, Text value, Context context)
throws IOException, InterruptException{
//1、获取一行数据
String line = value.toString();
//2、切割
String[] fields = line.split("\t");
//3、取出关键字段
long upFlow = Long.parseLong(fields[1]);
long dfFlow = Long.parseLong(fields[1]);
//4、写出到reduce 阶段
context.write(new FlowBean(upFlow,dfFlow), new Text(field[0]))
}
}
public class FlowSortDriver{
public static void main(String[] args){
Configuration conf = new Configuration();
Job job = Job.getInstance();
job.setJarByClass(FlowSortDriver.class);
job.setMapperClass(FlowSortMapper.class);
job.setReducerClass(FlowSortReducer.class);
job.setMapOutputKeyClass(FlowBean.class);
job.setMapOutputValueClass(Text.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(FlowBean.class);
FileInputFormat.setInputPaths(job,new Path("/wc/in"));
FileOutputFormat.setOutputPaths(job,new Paht("wc/out"));
System.out.pritnln(job.waitForCompletion(true)?0:1)
}
}
分区加排序
public class PhonenumPartitioner extends Partitioner<FlowBean, Text>{
@Override
public int getPartition(FlowBean key, Text value, int numPartitions){
//1、获取手机号前三位
String phoneNum = value.toString().substring(0,3);
//2、分区
int partitioner = 4;
if("135".equals(phoneNum)){
return 0;
}else if("137".equals(phoneNum)){
return 1;
}else ("138".equals(phoneNum)){
return 2;
}
return partitioner;
}
}
// Driver class additions: place these calls in the driver's main method
// before the job is submitted.
job.setPartitionerClass(PhonenumPartitioner.class);
// The reduce-task count must be greater than the largest partition index the
// custom partitioner can return; otherwise records routed to a missing
// partition fail the job.
// NOTE(review): verify 4 against the indices PhonenumPartitioner.getPartition returns.
job.setNumReduceTasks(4);