MapRecue实例开发 ------ 编程篇(经典wordcount程序编写)

版权声明:个人原创,转载请标注! https://blog.csdn.net/Z_Date/article/details/83858242

目录

 

1、 编程步骤

2、 经典的wordcount程序编写

编写代码


1、 编程步骤

  1. 用户编写的程序分成三个部分:Mapper,Reducer,Driver(提交运行mr程序的客户端)

  2. Mapper的输入数据是KV对的形式(KV的类型可自定义)

  3. Mapper的输出数据是KV对的形式(KV的类型可自定义)

  4. Mapper中的业务逻辑写在map()方法中

  5. map()方法(maptask进程)对每一个<K,V>调用一次

  6. Reducer的输入数据类型对应Mapper的输出数据类型,也是KV

  7. Reducer的业务逻辑写在reduce()方法中

  8. Reducetask进程对每一组相同k的<k,v>组调用一次reduce()方法

  9. 用户自定义的Mapper和Reducer都要继承各自的父类

  10. 整个程序需要一个Drvier来进行提交,提交的是一个描述了各种必要信息的job对象

2、 经典的wordcount程序编写

  1. 需求:有一批文件(规模为TB级或者PB级),如何统计这些文件中所有单词出现次数

    如有三个文件,文件名是qf_course.txt、qf_stu.txt 和 qf_teacher

qf_course.txt内容:

php java linux
bigdata VR
C C++ java web
linux shell

qf_stu.txt内容:

tom jim lucy
lily sally
andy
tom jim sally

qf_teacher内容:

jerry Lucy tom
jim
  1. 方案

    • 分别统计每个文件中单词出现次数 - map()

    • 累加不同文件中同一个单词出现次数 - reduce()

  2. 实现代码

    • 创建一个简单的maven项目

    • 添加hadoop client依赖的jar,pom.xml主要内容如下:

<dependencies>
		<dependency>
			<groupId>org.apache.hadoop</groupId>
			<artifactId>hadoop-client</artifactId>
			<version>2.7.1</version>
		</dependency>
		
		<dependency>
			<groupId>junit</groupId>
			<artifactId>junit</artifactId>
			<version>4.11</version>
			<scope>test</scope>
		</dependency>		
</dependencies>
  • 编写代码

  • 自定义一个mapper类

  • import java.io.IOException;
    
      import org.apache.hadoop.io.IntWritable;
      import org.apache.hadoop.io.LongWritable;
      import org.apache.hadoop.io.Text;
      import org.apache.hadoop.mapreduce.Mapper;
    
      /**
       * Maper里面的泛型的四个类型从左到右依次是:
       * 
       * LongWritable KEYIN: 默认情况下,是mr框架所读到的一行文本的起始偏移量,Long,  类似于行号但是在hadoop中有自己的更精简的序列化接口,所以不直接用Long,而用LongWritable 
       * Text VALUEIN:默认情况下,是mr框架所读到的一行文本的内容,String,同上,用Text
       *
       * Text KEYOUT:是用户自定义逻辑处理完成之后输出数据中的key,在此处是单词,String,同上,用Text
       * IntWritable VALUEOUT:是用户自定义逻辑处理完成之后输出数据中的value,在此处是单词次数,Integer,同上,用IntWritable
       */
      public class WordcountMapper extends Mapper<LongWritable, Text, Text, IntWritable>{
    
      	/**
      	 * map阶段的业务逻辑就写在自定义的map()方法中
      	 * maptask会对每一行输入数据调用一次我们自定义的map()方法
      	 */
      	@Override
      	protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
      		
      		//将maptask传给我们的一行的文本内容先转换成String
      		String line = value.toString();
      		//根据空格将这一行切分成单词
      		String[] words = line.split(" ");
      	
      		/**
      		 *将单词输出为<单词,1> 
      		 *如<lily,1> <lucy,1>  <c,1> <c++,1> <tom,1> 
      		 */
      		for(String word:words){
      			//将单词作为key,将次数1作为value,以便于后续的数据分发,可以根据单词分发,以便于相同单词会到相同的reduce task
      			context.write(new Text(word), new IntWritable(1));
      		}
      	}
      }

    自定义一个reduce类

 import java.io.IOException;

  import org.apache.hadoop.io.IntWritable;
  import org.apache.hadoop.io.Text;
  import org.apache.hadoop.mapreduce.Reducer;

  /**
   * Reducer里面的泛型的四个类型从左到右依次是:
   * 	Text KEYIN: 对应mapper输出的KEYOUT
   * 	IntWritable VALUEIN: 对应mapper输出的VALUEOUT
   * 
   * 	KEYOUT, 是单词
   * 	VALUEOUT 是自定义reduce逻辑处理结果的输出数据类型,是总次数
   */
  public class WordcountReducer extends Reducer<Text, IntWritable, Text, IntWritable>{

  	/**
  	 * <tom,1>
  	 * <tom,1>
  	 * <linux,1>
  	 * <banana,1>
  	 * <banana,1>
  	 * <banana,1>
  	 * 入参key,是一组相同单词kv对的key
  	 * values是若干相同key的value集合
  	 * 如 <tom,[1,1]>   <linux,[1]>   <banana,[1,1,1]>
  	 */
  	@Override
  	protected void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {

  		int count=0;  //累加单词的出现的次数
  		
  		for(IntWritable value:values){
  			count += value.get();
  		}
  		context.write(key, new IntWritable(count));	
  	}	
  }

编写一个Driver类

 import org.apache.hadoop.conf.Configuration;
  import org.apache.hadoop.fs.Path;
  import org.apache.hadoop.io.IntWritable;
  import org.apache.hadoop.io.Text;
  import org.apache.hadoop.mapreduce.Job;
  import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
  import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

  /**
   * 相当于一个yarn集群的客户端
   * 需要在此封装我们的mr程序的相关运行参数,指定jar包
   * 最后提交给yarn
   */
  public class WordcountDriver {
  	/**
  	 * 该类是运行在hadoop客户端的,main一运行,yarn客户端就启动起来了,与yarn服务器端通信
  	 * yarn服务器端负责启动mapreduce程序并使用WordcountMapper和WordcountReducer类
  	 */
  	public static void main(String[] args) throws Exception {
  		
  		if (args == null || args.length == 0) {//此代码需要两个输入参数  第一个参数支持要处理的源文件;第二个参数是处理结果的输出路径
  			args = new String[2];
  			args[0] = "hdfs://192.168.18.64:9000/wordcount/input/";//路径都是 hdfs系统的文件路径
  			args[1] = "hdfs://192.168.18.64:9000/wordcount/output";
  		}
  		/**
  		 * 什么也不设置时,如果在安装了hadoop的机器上运行时,自动读取
  		 * /home/hadoop/app/hadoop-2.7.1/etc/hadoop/core-site.xml
  		 * 文件放入Configuration中
  		 */
  		Configuration conf = new Configuration();
  		Job job = Job.getInstance(conf);
  		
  		//指定本程序的jar包所在的本地路径
  		job.setJarByClass(WordcountDriver.class);
  		
  		//指定本业务job要使用的mapper/Reducer业务类
  		job.setMapperClass(WordcountMapper.class);
  		job.setReducerClass(WordcountReducer.class);
  		
  		//指定mapper输出数据的kv类型
  		job.setMapOutputKeyClass(Text.class);
  		job.setMapOutputValueClass(IntWritable.class);
  		
  		//指定最终输出的数据的kv类型
  		job.setOutputKeyClass(Text.class);
  		job.setOutputValueClass(IntWritable.class);
  		
  		//指定job的输入原始文件所在目录
  		FileInputFormat.setInputPaths(job, new Path(args[0]));
  		//指定job的输出结果所在目录
  		FileOutputFormat.setOutputPath(job, new Path(args[1]));
  		
  		//将job中配置的相关参数,以及job所用的java类所在的jar包,提交给yarn去运行
  		/*job.submit();*/
  		boolean res = job.waitForCompletion(true);
  		System.exit(res?0:1);	
  	}
  }

运行此程序的步骤

  1. 将此程序打包 名为wordcount.jar

    第一步

第二步

第三步

2、上传wordcount.jar到名为min1机器的/home/hadoop目录下

3、在hdfs上创建文件夹“/wordcount/input”,并将三个文件(qf_course.txt、qf_stu.txt 和 qf_teacher)上传到hdfs的“/wordcount/input”目录下

hadoop  fs  mkdir  -p  /wordcount/input
hadoop  fs  –put  qf_course.txt  /wordcount/input
hadoop  fs  –put  qf_stu.txt  /wordcount/input
hadoop  fs  –put  qf_teacher.txt  /wordcount/input

4、在/home/hadoop下启动wordcount.jar运行

hadoop jar wordcount.jar 包名.WordcountDriver /wordcount/input  /wordcount/output

5、在hadoop的/wordcount/output下生成两个文件 如下:

​ _SUCCESS //表示计算成功

​ part-r-00000 //处理结果文件

6、查看结果

hadoop fs -cat /wordcount/output/part-r-00000		#结果如下
	Hello 4
	ketty 2
	tom   2
	jim   1
	word  1

猜你喜欢

转载自blog.csdn.net/Z_Date/article/details/83858242