排序器
分组器
分区器,打成K,V,P的形式
buffer缓冲区是环形的,即环形缓冲区
环形缓冲区完了的时候,需要手动写一个比较器comparator
之后是combiner聚合
// Configuration layer: build the Hadoop configuration and the job that uses it.
// The 'true' argument asks Configuration to load the default resources.
Configuration conf=new Configuration(true);
// Hand conf to the job. The no-arg Job.getInstance() builds its own
// Configuration, so anything set on conf would be silently ignored by the job.
// Note that Job takes a copy of conf: later changes to conf are not seen by
// the job; use job.getConfiguration() for those.
Job job=Job.getInstance(conf);
// Locate the job jar via the class that contains the driver.
job.setJarByClass(MyMapReduce.class);
job.setJobName("ooxx");
// ---- Input side ----
// Input format; the text format is used by default when this is not set.
// The class passed here must extend InputFormat, otherwise the compiler reports:
//   The method setInputFormatClass(Class<? extends InputFormat>)
//   in the type Job is not applicable for the arguments (Class<sdf>)
job.setInputFormatClass(sdf.class);
// Descend into sub-directories of the input path.
FileInputFormat.setInputDirRecursive(job, true);
// Register the input path.
Path path = new Path("/user/path");
FileInputFormat.addInputPath(job, path);
// ---- Map side ----
// Value type emitted by the mapper.
job.setMapOutputValueClass(IntWritable.class);
// Key type emitted by the mapper. It must implement WritableComparable and
// provide its methods, which include serialization and deserialization.
job.setMapOutputKeyClass(TQ.class);
// Mapper implementation; it must extend Mapper and override map(), otherwise:
//   The method setMapperClass(Class<? extends Mapper>)
//   in the type Job is not applicable for the arguments (Class<ss>)
job.setMapperClass(MyMapper.class);
// ---- Handling of the map output (partition / sort / combine / group) ----

// Grouping comparator used on the reduce side; must extend WritableComparator
// (the setter accepts any RawComparator), otherwise:
//   The method setGroupingComparatorClass(Class<? extends RawComparator>)
//   in the type Job is not applicable for the arguments (Class<MyGroupCoomparator>)
job.setGroupingComparatorClass(MyGroupCoomparator.class);

// Combiner, i.e. map-side pre-aggregation; must extend Reducer, otherwise:
//   The method setCombinerClass(Class<? extends Reducer>)
//   in the type Job is not applicable for the arguments (Class<sdf>)
job.setCombinerClass(MyCombiner.class);

// Sort comparator; must extend WritableComparator and call the superclass
// constructor, for example:
//   public MyComparator() {
//       super(TQ.class,true);
//   }
// A class of the wrong type fails to compile with:
//   The method setSortComparatorClass(Class<? extends RawComparator>)
//   in the type Job is not applicable for the arguments (Class<sdf>)
job.setSortComparatorClass(MyComparator.class);

// Partitioner; must extend Partitioner, otherwise:
//   The method setPartitionerClass(Class<? extends Partitioner>)
//   in the type Job is not applicable for the arguments (Class<sdf>)
job.setPartitionerClass(MyPartitioner.class);
// ---- Reduce side ----
// NOTE(review): theClass is not defined anywhere in the visible snippet; it
// looks like a placeholder to be replaced with the concrete output types.

// Value type written by the reducer; depends on the business scenario.
job.setOutputValueClass(theClass);
// Key type written by the reducer; depends on the business scenario.
job.setOutputKeyClass(theClass);
// Reducer implementation; must extend Reducer
// (The method setReducerClass(Class<? extends Reducer>)).
job.setReducerClass(MyReducer.class);
// ---- Final output location ----
// Remove a pre-existing output directory first (recursive delete); the job's
// output specification check rejects an output path that already exists.
Path fileoutput = new Path("/data/napreduce");
if(fileoutput.getFileSystem(conf).exists(fileoutput)) {
    fileoutput.getFileSystem(conf).delete(fileoutput, true);
}
// setOutputPath takes the Job, not the Configuration.
FileOutputFormat.setOutputPath(job, fileoutput);
// Submit the job and block until it finishes; 'true' prints progress.
job.waitForCompletion(true);