MapReduce求TopN的三种方法

本文采用三种方式对movie数据进行TopN排序
第一种是直接排序,在ReduceTask中进行排序
第二种是利用TreeSet排序,该方式利用小顶堆和集合去重的原理:每过来一个数据,就跟堆顶(当前最小)的数据进行比较,如果比最小的大,则踢掉最小的换成新的,否则直接跳过该数据,以此对数据进行排序。
第三种是利用MapReduce高级API编程,自定义分区器和分组比较器。

数据样本:

{"movie":"1193","rate":"5","timeStamp":"978300760","uid":"1"}
{"movie":"661","rate":"3","timeStamp":"978302109","uid":"1"}
{"movie":"914","rate":"3","timeStamp":"978301968","uid":"1"}
{"movie":"3408","rate":"4","timeStamp":"978300275","uid":"1"}
{"movie":"2355","rate":"5","timeStamp":"978824291","uid":"1"}
{"movie":"1197","rate":"3","timeStamp":"978302268","uid":"1"}
{"movie":"1287","rate":"5","timeStamp":"978302039","uid":"1"}
{"movie":"2804","rate":"5","timeStamp":"978300719","uid":"1"}
{"movie":"594","rate":"4","timeStamp":"978302268","uid":"1"}
{"movie":"919","rate":"4","timeStamp":"978301368","uid":"1"}
{"movie":"595","rate":"5","timeStamp":"978824268","uid":"1"}
{"movie":"938","rate":"4","timeStamp":"978301752","uid":"1"}
{"movie":"2398","rate":"4","timeStamp":"978302281","uid":"1"}
{"movie":"2918","rate":"4","timeStamp":"978302124","uid":"1"}

分析结果 : 以Top2为例

MovieBean [movie=1, rate=5, timeStamp=959639915, uid=5524]
MovieBean [movie=1, rate=5, timeStamp=966904109, uid=3527]
MovieBean [movie=10, rate=5, timeStamp=971922432, uid=2899]
MovieBean [movie=10, rate=5, timeStamp=975867811, uid=606]
MovieBean [movie=100, rate=5, timeStamp=964115248, uid=4626]
MovieBean [movie=100, rate=5, timeStamp=965387978, uid=4277]
MovieBean [movie=1000, rate=5, timeStamp=976240118, uid=474]
MovieBean [movie=1000, rate=5, timeStamp=966468743, uid=3644]
MovieBean [movie=1002, rate=5, timeStamp=962597546, uid=4981]
MovieBean [movie=1002, rate=5, timeStamp=965314044, uid=4225]
MovieBean [movie=1003, rate=5, timeStamp=974698008, uid=1908]
MovieBean [movie=1003, rate=5, timeStamp=963248386, uid=4745]
MovieBean [movie=1004, rate=5, timeStamp=965581802, uid=3997]
MovieBean [movie=1004, rate=5, timeStamp=957671549, uid=5862]

方式一 : 

直接利用MapReduce框架做 , 在ReduceTask中进行排序 , 也是最简单的一种 . 该方式的核心代码就是Reduce里面的排序代码

代码实现:

1 . MovieBean

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
 
import org.apache.hadoop.io.Writable;
/**
 * Writable hadoop 序列化接口
 * @author root
 *
 */
public class MovieBean implements Writable{
    //{"movie":"1193","rate":"5","timeStamp":"978300760","uid":"1"}
    private String movie;
    private int rate;
    private String timeStamp;
    private String uid;
    @Override
    public void write(DataOutput out) throws IOException {
        out.writeUTF(movie);
        out.writeInt(rate);
        out.writeUTF(timeStamp);
        out.writeUTF(uid);
        
    }
    @Override
    public void readFields(DataInput in) throws IOException {
        movie = in.readUTF();
        rate = in.readInt();
        timeStamp = in.readUTF();
        uid = in.readUTF();
    }
    
    
    public String getMovie() {
        return movie;
    }
    public void setMovie(String movie) {
        this.movie = movie;
    }
    public int getRate() {
        return rate;
    }
    public void setRate(int rate) {
        this.rate = rate;
    }
    public String getTimeStamp() {
        return timeStamp;
    }
    public void setTimeStamp(String timeStamp) {
        this.timeStamp = timeStamp;
    }
    public String getUid() {
        return uid;
    }
    public void setUid(String uid) {
        this.uid = uid;
    }
    @Override
    public String toString() {
        return "MovieBean [movie=" + movie + ", rate=" + rate + ", timeStamp=" + timeStamp + ", uid=" + uid + "]";
    }
    public void set(MovieBean movieBean) {
        this.movie = movieBean.getMovie();
        this.rate = movieBean.getRate();
        this.timeStamp = movieBean.getTimeStamp();
        this.uid = movieBean.getUid();
        
    }
    public void set(String string, int i, String string2, String string3) {
        // TODO Auto-generated method stub
        
    }
 
}

2 . TopN1

import java.io.File;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
import java.util.List;
 
import org.apache.commons.io.FileUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
 
import com.alibaba.fastjson.JSON;
 
 
public class TopN1 {
    public static class MapTask extends Mapper<LongWritable, Text, Text, MovieBean>{
        @Override
        protected void map(LongWritable key, Text value, Mapper<LongWritable, Text, Text, MovieBean>.Context context)
                throws IOException, InterruptedException {
            try {
                MovieBean movieBean = JSON.parseObject(value.toString(), MovieBean.class);
                String movie = movieBean.getMovie();
                context.write(new Text(movie), movieBean);
            } catch (Exception e) {
                
            }
        }
    }
    
    public static class ReduceTask extends Reducer<Text, MovieBean, MovieBean, NullWritable>{
        @Override
        protected void reduce(Text movieId, Iterable<MovieBean> movieBeans,
                Reducer<Text, MovieBean, MovieBean, NullWritable>.Context context)
                throws IOException, InterruptedException {
            List<MovieBean> list = new ArrayList<>();
            
            for (MovieBean movieBean : movieBeans) {
                MovieBean movieBean2 = new MovieBean();
                movieBean2.set(movieBean);
                list.add(movieBean2);//
            }
            Collections.sort(list, new Comparator<MovieBean>() {
 
                @Override
                public int compare(MovieBean o1, MovieBean o2) {
                    
                    return o2.getRate() - o1.getRate();
                }
            });
            for (int i = 0; i < Math.min(20, list.size()); i++) {
                context.write(list.get(i), NullWritable.get());
            }
        }
    }
    
    public static void main(String[] args) throws Exception{
        Configuration conf = new Configuration();
        
        Job job = Job.getInstance(conf, "avg");
        
        //设置map和reduce,以及提交的jar
        job.setMapperClass(MapTask.class);
        job.setReducerClass(ReduceTask.class);
        job.setJarByClass(TopN1.class);
        
        //设置输入输出类型
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(MovieBean.class);
        
        job.setOutputKeyClass(MovieBean.class);
        job.setOutputValueClass(NullWritable.class);
        
        //输入和输出目录
        FileInputFormat.addInputPath(job, new Path("E:/data/rating.json"));
        FileOutputFormat.setOutputPath(job, new Path("E:\\data\\out\\topN1"));
        
        //判断文件是否存在
        File file = new File("E:\\data\\out\\topN1");
        if(file.exists()){
            FileUtils.deleteDirectory(file);
        }
        
        //提交任务
        boolean completion = job.waitForCompletion(true);
        System.out.println(completion?"你很优秀!!!":"滚去调bug!!");
        
    }
}
 

方式二 : 

该方式利用小顶堆和集合去重的原理:每过来一个数据,就跟堆顶(当前最小)的数据进行比较,如果比最小的大,则踢掉最小的换成新的,否则直接跳过该数据,以此对数据进行排序。

代码实现 : 

1 . MovieBean同上

2.TopN2

package hadoop_day06.zhang.movieTopn.N2;
 
import java.io.File;
import java.io.IOException;
import java.util.Comparator;
import java.util.TreeSet;
 
import org.apache.commons.io.FileUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
 
import com.alibaba.fastjson.JSON;
 
public class TopN2 {
 
    public static class MapTask extends Mapper<LongWritable, Text, Text, MovieBean>{
        
        @Override
        protected void map(LongWritable key, Text value, Mapper<LongWritable, Text, Text, MovieBean>.Context context)
                throws IOException, InterruptedException {
            try {
                MovieBean movieBean = JSON.parseObject(value.toString(), MovieBean.class);
                String movie = movieBean.getMovie();
                context.write(new Text(movie), movieBean);
            } catch (Exception e) {
                // TODO: handle exception
            }
        }
    }
    
    public static class ReduceTask extends Reducer<Text, MovieBean, MovieBean, NullWritable>{
        @Override
        protected void reduce(Text movieId, Iterable<MovieBean> movieBeans,
                Reducer<Text, MovieBean, MovieBean, NullWritable>.Context context)
                throws IOException, InterruptedException {
            TreeSet<MovieBean> tree = new TreeSet<>(new Comparator<MovieBean>() {
 
                @Override
                public int compare(MovieBean o1, MovieBean o2) {
                    if (o1.getRate() - o2.getRate() == 0) {
                        return o1.getUid().compareTo(o2.getUid());
                    } else {
                        return o1.getRate() - o2.getRate();
                    }
                }
            });
            for (MovieBean movieBean : movieBeans) {
                MovieBean movieBean2 = new MovieBean();
                movieBean2.set(movieBean);
                if (tree.size() <= 2) {
                    tree.add(movieBean2);
                } else {
                    MovieBean first = tree.first();
                    if(first.getRate() < movieBean2.getRate()) {
                        //做替换
                        tree.remove(first);
                        tree.add(movieBean2);
                    }
                }
            }
            for (MovieBean movieBean : tree) {
                context.write(movieBean, NullWritable.get());
            }
        }
        
    }
    
    public static void main(String[] args) throws Exception{
        Configuration conf = new Configuration();
        
        Job job = Job.getInstance(conf, "topN2");
        
        //设置map和reduce,以及提交的jar
        job.setMapperClass(MapTask.class);
        job.setReducerClass(ReduceTask.class);
        job.setJarByClass(TopN2.class);
        
        //设置输入输出类型
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(MovieBean.class);
        
        job.setOutputKeyClass(MovieBean.class);
        job.setOutputValueClass(NullWritable.class);
        
        //输入和输出目录
        FileInputFormat.addInputPath(job, new Path("E:/data/rating.json"));
        FileOutputFormat.setOutputPath(job, new Path("E:\\data\\out\\topN2"));
        
        //判断文件是否存在
        File file = new File("E:\\data\\out\\topN2");
        if(file.exists()){
            FileUtils.deleteDirectory(file);
        }
        
        //提交任务
        boolean completion = job.waitForCompletion(true);
        System.out.println(completion?"你很优秀!!!":"滚去调bug!!");
        
    }
}

运行结果 : 

MovieBean [movie=1, rate=5, timeStamp=968563309, uid=3205]
MovieBean [movie=1, rate=5, timeStamp=966904109, uid=3527]
MovieBean [movie=1, rate=5, timeStamp=959639915, uid=5524]
MovieBean [movie=10, rate=5, timeStamp=976898286, uid=1329]
MovieBean [movie=10, rate=5, timeStamp=971922432, uid=2899]
MovieBean [movie=10, rate=5, timeStamp=975867811, uid=606]
MovieBean [movie=100, rate=5, timeStamp=966291689, uid=3700]
MovieBean [movie=100, rate=5, timeStamp=965387978, uid=4277]
MovieBean [movie=100, rate=5, timeStamp=964115248, uid=4626]
MovieBean [movie=1000, rate=4, timeStamp=968541342, uid=3224]
MovieBean [movie=1000, rate=5, timeStamp=966468743, uid=3644]
MovieBean [movie=1000, rate=5, timeStamp=976240118, uid=474]
MovieBean [movie=1002, rate=5, timeStamp=974833560, uid=1242]
MovieBean [movie=1002, rate=5, timeStamp=965314044, uid=4225]
MovieBean [movie=1002, rate=5, timeStamp=962597546, uid=4981]
MovieBean [movie=1003, rate=5, timeStamp=974698008, uid=1908]
MovieBean [movie=1003, rate=5, timeStamp=965710118, uid=3927]
MovieBean [movie=1003, rate=5, timeStamp=963248386, uid=4745]
MovieBean [movie=1004, rate=5, timeStamp=965581802, uid=3997]
MovieBean [movie=1004, rate=5, timeStamp=962936941, uid=4807]
MovieBean [movie=1004, rate=5, timeStamp=957671549, uid=5862]
MovieBean [movie=1005, rate=5, timeStamp=970346351, uid=3032]
MovieBean [movie=1005, rate=5, timeStamp=964461162, uid=4572]
MovieBean [movie=1005, rate=5, timeStamp=964036670, uid=4633]

方式三 : 


利用MR编程框架中的高级API

要用自定义类型Bean作为map输出的KEY,则需要以下编程接口:

Bean 必须实现  WritableComparable 接口:   write()  readFields()   compareTo()

分区器:必须自定义一个分区器:

XPartitioner extends Partitioner<KEY,VALUE>,核心方法:getPartition(KEY k,VALUE v,int numReduceTasks)

分组比较器:必须自定义一个分组比较器

// Skeleton only, not compilable as written: `key` stands for the map-output key class,
// and the compare method still needs a return type and a body.
XGroupingComparator  extends WritableComparator{
    public XGroupingComparator(){
        // Hand the key class to WritableComparator; the second argument is its createInstances flag.
        super(key.class,true);
    }
    // Return 0 for two keys that should be passed to the same reduce() call.
    compare(WritableComparable a,WritableComparable b){
    
    };
}

代码实现 : 

1 . movieBean

package hadoop_day06.zhang.movieTopn.N3;
 
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
 
import org.apache.hadoop.io.WritableComparable;
 
/**
 * Writeable  hadoop的序列化接口
 * 能够排序
 * @author Administrator
 *
 */
 
 
public class MovieBean implements WritableComparable<MovieBean>{
    //{"movie":"1193","rate":"5","timeStamp":"978300760","uid":"1"}
    private String movie;
    private int rate;
    private String timeStamp;
    private String uid;
 
    @Override
    public int compareTo(MovieBean o) {
        if(o.getMovie().compareTo(this.getMovie())==0){
            return o.getRate() - this.getRate();
        }else{
            return o.getMovie().compareTo(this.getMovie());
        }
        //return 0;
    }
    
    public void set(String movie,int rate,String timeStamp,String uid){
        this.movie = movie;
        this.rate = rate;
        this.timeStamp = timeStamp;
        this.uid = uid;
    }
    
    @Override
    public void write(DataOutput out) throws IOException {
        out.writeUTF(movie);
        out.writeInt(rate);
        out.writeUTF(timeStamp);
        out.writeUTF(uid);
        
    }
    @Override
    public void readFields(DataInput in) throws IOException {
        movie = in.readUTF();
        rate = in.readInt();
        timeStamp = in.readUTF();
        uid = in.readUTF();
    }
    
    
    public String getMovie() {
        return movie;
    }
    public void setMovie(String movie) {
        this.movie = movie;
    }
    public int getRate() {
        return rate;
    }
    public void setRate(int rate) {
        this.rate = rate;
    }
    public String getTimeStamp() {
        return timeStamp;
    }
    public void setTimeStamp(String timeStamp) {
        this.timeStamp = timeStamp;
    }
    public String getUid() {
        return uid;
    }
    public void setUid(String uid) {
        this.uid = uid;
    }
    @Override
    public String toString() {
        return "MovieBean [movie=" + movie + ", rate=" + rate + ", timeStamp=" + timeStamp + ", uid=" + uid + "]";
    }
    public void set(MovieBean movieBean) {
        this.movie = movieBean.getMovie();
        this.rate = movieBean.getRate();
        this.timeStamp = movieBean.getTimeStamp();
        this.uid = movieBean.getUid();
        
    }
 
}
2 . MyPartition

package hadoop_day06.zhang.movieTopn.N3;
 
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.Partitioner;
 
/**
 * 分区的:
 * 把想要的数据分到相同的reduce里面
 * @author root
 *
 */
public class MyPartition extends Partitioner<MovieBean, NullWritable>{
    /**
     * munPartitions 代表有多少个reducetask
     * key map端输出的key
     * value map端输出的value
     */
    @Override
    public int getPartition(MovieBean key, NullWritable value, int numPartitions) {
        return (key.getMovie().hashCode() & Integer.MAX_VALUE)%numPartitions;
    }
 
}
3 . MyGroup

package hadoop_day06.zhang.movieTopn.N3;
 
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.WritableComparator;
/**
 * 分组:movieid相同的分到一起
 * @author root
 *
 */
public class MyGroup extends WritableComparator{
    
    public MyGroup() {
        super(MovieBean.class,true);
    }
    
    @SuppressWarnings("rawtypes")
    @Override
    public int compare(WritableComparable a, WritableComparable b) {
        MovieBean bean1 = (MovieBean)a;
        MovieBean bean2 = (MovieBean)b;
        return bean1.getMovie().compareTo(bean2.getMovie());
    }
 
}
4 . TopN3

package hadoop_day06.zhang.movieTopn.N3;
 
import java.io.File;
import java.io.IOException;
 
import org.apache.commons.io.FileUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.codehaus.jackson.map.ObjectMapper;
 
 
public class TopN3 {
    public static class MapTask extends Mapper<LongWritable, Text, MovieBean, NullWritable>{
        
        @Override
        protected void map(LongWritable key, Text value,
                Mapper<LongWritable, Text, MovieBean, NullWritable>.Context context)
                throws IOException, InterruptedException {
            try {
                ObjectMapper mapper = new ObjectMapper();
                MovieBean bean = mapper.readValue(value.toString(), MovieBean.class);
                context.write(bean, NullWritable.get());
            } catch (Exception e) {
                // TODO: handle exception
            }
        }
    }
    
    public static class ReduceTask extends Reducer<MovieBean, NullWritable, MovieBean, NullWritable>{
        @SuppressWarnings("unused")
        @Override
        protected void reduce(MovieBean key, Iterable<NullWritable> values,
                Reducer<MovieBean, NullWritable, MovieBean, NullWritable>.Context context)
                throws IOException, InterruptedException {
            int num = 0;
            //虽然是一个空的,但是key能够根据迭代进行相应的得到对应空值的结果
            for (NullWritable nullWritable : values) {
                if(num>=2){
                    break;
                }
                num++;
                context.write(key, NullWritable.get());
            }
        }
    }
    
    public static void main(String[] args) throws Exception{
        Configuration conf = new Configuration();
        
        Job job = Job.getInstance(conf, "topn3");
        
        //设置map和reduce,以及提交的jar
        job.setMapperClass(MapTask.class);
        job.setReducerClass(ReduceTask.class);
        job.setJarByClass(TopN3.class);
        //job.setNumReduceTasks(2);
        job.setPartitionerClass(MyPartition.class);
        job.setGroupingComparatorClass(MyGroup.class);
        
        
        //设置输入输出类型
        job.setMapOutputKeyClass(MovieBean.class);
        job.setMapOutputValueClass(NullWritable.class);
        
        job.setOutputKeyClass(MovieBean.class);
        job.setOutputValueClass(NullWritable.class);
        
        //输入和输出目录
        FileInputFormat.addInputPath(job, new Path("E:/data/rating.json"));
        FileOutputFormat.setOutputPath(job, new Path("E:\\data\\out\\topN3"));
        
        //判断文件是否存在
        File file = new File("E:\\data\\out\\topN3");
        if(file.exists()){
            FileUtils.deleteDirectory(file);
        }
        
        //提交任务
        boolean completion = job.waitForCompletion(true);
        System.out.println(completion?"你很优秀!!!":"滚去调bug!!");
    }
}
运行结果 : 只截取一段

MovieBean [movie=999, rate=5, timeStamp=967983748, uid=3842]
MovieBean [movie=999, rate=5, timeStamp=972348071, uid=5317]
MovieBean [movie=998, rate=5, timeStamp=974731896, uid=1825]
MovieBean [movie=998, rate=5, timeStamp=970350907, uid=3032]
MovieBean [movie=997, rate=5, timeStamp=967397690, uid=3418]
MovieBean [movie=997, rate=5, timeStamp=965242917, uid=4557]
MovieBean [movie=996, rate=5, timeStamp=971903343, uid=2900]
MovieBean [movie=996, rate=5, timeStamp=976490369, uid=306]
MovieBean [movie=994, rate=5, timeStamp=959456139, uid=5553]
MovieBean [movie=994, rate=5, timeStamp=968002575, uid=3305]
MovieBean [movie=993, rate=5, timeStamp=970048313, uid=4277]
MovieBean [movie=993, rate=4, timeStamp=984153990, uid=331]
MovieBean [movie=992, rate=5, timeStamp=963976969, uid=4645]
MovieBean [movie=992, rate=4, timeStamp=966033503, uid=3868]
MovieBean [movie=991, rate=5, timeStamp=975900929, uid=593]
MovieBean [movie=991, rate=5, timeStamp=977243479, uid=1501]
MovieBean [movie=990, rate=4, timeStamp=965294965, uid=4277]
MovieBean [movie=990, rate=4, timeStamp=967071268, uid=3491]
MovieBean [movie=99, rate=5, timeStamp=959938819, uid=5452]
MovieBean [movie=99, rate=5, timeStamp=974762428, uid=1408]
MovieBean [movie=989, rate=5, timeStamp=974693867, uid=1915]
MovieBean [movie=988, rate=5, timeStamp=974738718, uid=1587]
MovieBean [movie=988, rate=5, timeStamp=971769566, uid=2913]

 

代码实现 : 
1 . movieBean
--------------------- 
作者:俊杰梓 
来源:CSDN 
原文:https://blog.csdn.net/weixin_35353187/article/details/82025204 
版权声明:本文为博主原创文章,转载请附上博文链接!

猜你喜欢

转载自blog.csdn.net/qq_43193797/article/details/86367610