MapReduce写代码的流程,以及需要继承的超类

package tq;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.ScanPerformanceEvaluation.MyMapper;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.mapred.FileOutputFormat;
import org.apache.hadoop.mapred.TestMiniMRClientCluster.MyReducer;
import org.apache.hadoop.mapreduce.Job;

import wordcount.MyCombiner;

public class TianQi {
	public static void main(String[] args) throws IOException {
		//设置配置项
		Configuration conf = new Configuration();
		Job job = Job.getInstance(conf);
		
		//设置
		job.setJarByClass(TianQi.class);
		job.setJobName("sdfjsk");
		
		//设置读取文件的路径
		Path filein = new Path("jk");
		FileInputFormat.addInputPath(job, filein);
		
		
		//设置文件的输出的路径
		Path fileout = new Path("fdsd");
		if(fileout.getFileSystem(conf).exists(fileout)) {
			fileout.getFileSystem(conf).delete(fileout,true);
		}
		FileOutputFormat.setOutputPath(job, fileout);
		
		//设置文件的读入的格式 MyInputFormat extends InputFormat.class
		job.setInputFormatClass(MyInputFormat.class);
//		Multiple markers at this line
//		- The method setInputFormatClass(Class<? extends InputFormat>) in the type Job is not applicable for the arguments 
//		 (Class<MyInputFormat>)
//		- MyInputFormat cannot be resolved to a type
		
		//设置文件读出的格式 MyOutFormat extends OutputFormat.class
		job.setOutputFormatClass(MyOutFormat.class);
//		Multiple markers at this line
//		- The method setOutputFormatClass(Class<? extends OutputFormat>) in the type Job is not applicable for the arguments 
//		 (Class<MyOutFormat>)
//		- MyOutFormat cannot be resolved to a type
		
		//设置map端 mymapper.class extends mapper.class
		job.setMapperClass(MyMapper.class);
		
		//设置map端输出的格式
		job.setMapOutputKeyClass(Text.class);
		job.setMapOutputValueClass(IntWritable);
		
		//设置comparator排序规则
		job.setSortComparatorClass(MySortComparator.class);
//		Multiple markers at this line
//		- The method setSortComparatorClass(Class<? extends RawComparator>) in the type Job is not applicable for the 
//		 arguments (Class<MySortComparator>)
//		- MySortComparator cannot be resolved to a type
		
		//设置partition分区  Mypartition extends Partitoner
		job.setPartitionerClass(MyPartition.class);
//		Multiple markers at this line
//		- MyPartition cannot be resolved to a type
//		- The method setPartitionerClass(Class<? extends Partitioner>) in the type Job is not applicable for the arguments 
//		 (Class<MyPartition>)
		
		
		//设置map端的预聚合 MyCombiner.class extends Reducer.class
		job.setCombinerClass(MyCombiner.class);
//		The method setCombinerClass(Class<? extends Reducer>) in the type Job is not applicable for the arguments (Class<MyCombiner>)
		
		//设置
		job.setGroupingComparatorClass(MyGroup.class);
//		Multiple markers at this line
//		- The method setGroupingComparatorClass(Class<? extends RawComparator>) in the type Job is not applicable for the 
//		 arguments (Class<MyGroup>)
//		- MyGroup cannot be resolved to a type
		
		//设置reduce端
		job.setReducerClass(MyReducer.class);
		
		//设置reduce端的输出的key
		job.setOutputKeyClass(Text.class);
		
		//设置reduce端的输出的value
		job.setOutputValueClass(IntWritable);
		
		//设置map端的task的个数
		job.setNumReduceTasks(2);
		
		/**
		 * 总结:
		 * 一、设置conf configuration conf = new configuration
		 * 		Job job = Job.getInstance(conf)
		 * 二、设置文件名和jobName
		 * 		job.getJarbyclass()
		 * 		job.setJobName()
		 * 三、设置文件的输入路径和输出路径
		 * 		FileInputFormat.addInputPATH
		 * 		FileOutputFormat.setoutputpath
		 * 			if(filleout.getfilesystem(conf).exists(fileout))
		 * 				fileout.getfileoutsystem(conf).delete(fileout)
		 * 四、设置文件读入的类型
		 * 		job.setFileInputFormat extends inputFormat.class
		 * 五、设置文件的读出的类型
		 * 		job.setFileOutFormatclass extends outputFormat.class
		 * 
		 * 六、设置文件map端
		 * 		job.setMapperclass extends mapper
		 * 七、设置map端的输出的key
		 * 		job.setmapoutputkeyclass		 		
		 * 八、设置map端的输出的value
		 * 		job.setmapoutputvalueclass
		 * 九、设置排序sort
		 * 		job.setsortComparator() extends RawComparator()
		 * 十、设置排序
		 * 		job.setGroupingComparatorclass extends RawComparator.class
		 * 十一、设置partition
		 * 		job.setpartitionclass extends partitioner
		 * 十二、设置reduce
		 * 		job.setReducerclass extends reducer.class
		 * 十三、设置reduce端的输出的key
		 * 		job.setoutputkeyclass
		 * 十四、设置reduce端的输出的value
		 * 		job.setoutputvalueclass
		 * 十五、设置reduce端task的个数
		 * 		job.setNumoofReduceTask()
		 * 十六、最终设置job.waitforcomplettion(true)
		 * 
		 */		
	}
}

总结:

/**
		     * 总结:
		 * 一、设置conf configuration conf = new configuration
		 * 		Job job = Job.getInstance(conf)
		 * 二、设置jar包所在的类和jobName
		 * 		job.setJarByClass()
		 * 		job.setJobName()
		 * 三、设置文件的输入路径和输出路径
		 * 		FileInputFormat.addInputPath
		 * 		FileOutputFormat.setOutputPath
		 * 			if(fileout.getFileSystem(conf).exists(fileout))
		 * 				fileout.getFileSystem(conf).delete(fileout, true)
		 * 四、设置文件读入的类型
		 * 		job.setInputFormatClass,自定义类 extends InputFormat
		 * 五、设置文件的读出的类型
		 * 		job.setOutputFormatClass,自定义类 extends OutputFormat
		 * 
		 * 六、设置map端
		 * 		job.setMapperClass,自定义类 extends Mapper
		 * 七、设置map端的输出的key
		 * 		job.setMapOutputKeyClass
		 * 八、设置map端的输出的value
		 * 		job.setMapOutputValueClass
		 * 九、设置排序sort
		 * 		job.setSortComparatorClass,自定义比较器需实现 RawComparator
		 * 十、设置分组
		 * 		job.setGroupingComparatorClass,自定义比较器需实现 RawComparator
		 * 十一、设置partition
		 * 		job.setPartitionerClass,自定义类 extends Partitioner
		 * 十二、设置reduce
		 * 		job.setReducerClass,自定义类 extends Reducer
		 * 十三、设置reduce端的输出的key
		 * 		job.setOutputKeyClass
		 * 十四、设置reduce端的输出的value
		 * 		job.setOutputValueClass
		 * 十五、设置reduce端task的个数
		 * 		job.setNumReduceTasks()
		 * 十六、最终提交作业并等待完成:job.waitForCompletion(true)
		 * 
		 */		

MapReduce写代码的流程:

分为以下几个部分:

一、公共设置(四种):

1、设置conf

Configuration conf = new Configuration();

Job job = Job.getInstance(conf);

2、设置类名

job.setJarByClass(TianQi.class);

job.setJobName("sdfds")

3、设置文件的读入路径和读出路径

Path filein = new Path("sdfs")

FileInputFormat.addInputPath(job, filein);

Path fileout = new Path("dfdjs")

if (fileout.getFileSystem(conf).exists(fileout)) {
    fileout.getFileSystem(conf).delete(fileout, true);

}

FileOutputFormat.setOutputPath(job, fileout);

4、设置文件的读入格式和读出格式

job.setInputFormatClass(MyInputFormat.class),其中 MyInputFormat extends InputFormat

job.setOutputFormatClass(MyOutFormat.class),其中 MyOutFormat extends OutputFormat

二、设置map端

1、设置map端

job.setMapperClass(MyMapper.class),其中 MyMapper extends Mapper

2、设置map端的输出key和value的类型

job.setMapOutputKeyClass(Text.class)

job.setMapOutputValueClass(IntWritable.class)

三、设置map端输出之后

1、设置排序和分组

job.setSortComparatorClass(MySortComparator.class),比较器需实现 RawComparator

job.setGroupingComparatorClass(MyGroup.class),比较器需实现 RawComparator

2、设置分区

job.setPartitionerClass(MyPartition.class),其中 MyPartition extends Partitioner

3、设置map端的预聚合

job.setCombinerClass(MyCombiner.class),其中 MyCombiner extends Reducer

四、设置reduce端

1、job.setReducerClass(MyReducer.class),其中 MyReducer extends Reducer

2、设置reduce端的输出

job.setOutputKeyClass(Text.class)

job.setOutputValueClass(IntWritable.class)

五、所有的都结束之后

1、设置reduce端task的个数

job.setNumReduceTasks(2)

2、提交作业并等待完成:job.waitForCompletion(true)

猜你喜欢

转载自blog.csdn.net/wyqwilliam/article/details/84669576