This article is aimed at Hadoop beginners. The code below is explained in detail so you can understand every line and get hands-on with Hadoop right away.
First, we need to add three Hadoop library jars to the classpath: commons-cli-1.2.jar, hadoop-common-2.6.5.jar, and hadoop-mapreduce-client-core-2.6.5.jar.
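If you are building outside an IDE, compilation might look like the sketch below. The jar locations, the classes output directory, and the source path are assumptions for illustration; depending on your Hadoop distribution you may also need additional transitive jars on the classpath:

    javac -cp commons-cli-1.2.jar:hadoop-common-2.6.5.jar:hadoop-mapreduce-client-core-2.6.5.jar \
        -d classes com/sun/wordcount/WordCount.java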
Here is the WordCount code with comments:
package com.sun.wordcount;

import java.io.IOException;
import java.util.StringTokenizer;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;

/*
 * context carries data and other runtime state between stages:
 * the map phase writes its key/value pairs into context, which hands them
 * to the Reducer for reducing; after the reduce phase processes the data,
 * it writes the results back into context, and Hadoop writes them to HDFS.
 */
public class WordCount {

    // Extend Mapper, with map input types <Object, Text> and output types <Text, IntWritable>
    public static class WCMapper extends Mapper<Object, Text, Text, IntWritable> {

        // one represents a single occurrence of a word
        private final static IntWritable one = new IntWritable(1);
        // word stores the token split off from the line
        private Text word = new Text();

        @Override
        protected void map(Object key, Text value, Context context)
                throws IOException, InterruptedException {
            // Tokenize the input line
            StringTokenizer itr = new StringTokenizer(value.toString());
            // Loop until there are no tokens left
            while (itr.hasMoreTokens()) {
                // Store the next token in word
                word.set(itr.nextToken());
                // Emit the pair <word, 1>
                context.write(word, one);
            }
        }
    }

    // Extend Reducer, with reduce input types <Text, IntWritable> and output types <Text, IntWritable>
    public static class WCReducer extends Reducer<Text, IntWritable, Text, IntWritable> {

        // result records the word frequency
        private IntWritable result = new IntWritable();

        @Override
        protected void reduce(Text key, Iterable<IntWritable> values, Context context)
                throws IOException, InterruptedException {
            int sum = 0;
            for (IntWritable val : values) {
                sum += val.get();
            }
            result.set(sum);
            // Collect the result
            context.write(key, result);
        }
    }

    public static void main(String[] args)
            throws IOException, ClassNotFoundException, InterruptedException {
        Configuration conf = new Configuration();
        // Check the command line.
        // GenericOptionsParser is a command-line parser that recognizes the standard
        // Hadoop options, letting an application easily specify the namenode,
        // the jobtracker, and other extra configuration resources.
        String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
        // Exit if there are not exactly two remaining arguments
        if (otherArgs.length != 2) {
            System.err.println("Usage: wordcount <in> <out>");
            // Abnormal exit
            System.exit(2);
        }
        // Configure the job name
        Job job = Job.getInstance(conf, "Test");
        job.setNumReduceTasks(1);
        // Configure the job's classes
        job.setJarByClass(WordCount.class);  // package the jar based on where WordCount is located
        job.setMapperClass(WCMapper.class);  // set the Mapper
        //job.setCombinerClass(WCReducer.class);  // enable when data volume is large and bandwidth becomes the bottleneck
        job.setReducerClass(WCReducer.class);  // set the Reducer
        job.setOutputKeyClass(Text.class);   // set the job's output key type (used by both map and reduce output here)
        job.setOutputValueClass(IntWritable.class);  // set the job's output value type
        // Specify the job's input and output paths; the output directory must not
        // already exist, otherwise the job fails with an error
        FileInputFormat.addInputPath(job, new Path(otherArgs[0]));
        FileOutputFormat.setOutputPath(job, new Path(otherArgs[1]));
        // main waits for the job to complete before exiting
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}
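To make the data flow concrete, here is a plain-Java sketch that simulates what the mapper, the shuffle, and the reducer each do for one sample line. It does not use Hadoop at all; the sample input string and the class name WordCountTrace are made up for illustration, and in a real job the framework performs the grouping ("shuffle") step between map and reduce for you:

    import java.util.*;

    public class WordCountTrace {
        public static void main(String[] args) {
            String line = "hello world hello hadoop";   // assumed sample input line

            // Map phase: emit <word, 1> for every token, like WCMapper.map()
            List<Map.Entry<String, Integer>> mapped = new ArrayList<>();
            StringTokenizer itr = new StringTokenizer(line);
            while (itr.hasMoreTokens()) {
                mapped.add(new AbstractMap.SimpleEntry<>(itr.nextToken(), 1));
            }
            System.out.println("map output:    " + mapped);
            // map output:    [hello=1, world=1, hello=1, hadoop=1]

            // Shuffle phase: group the values by key, as Hadoop does between phases
            Map<String, List<Integer>> grouped = new TreeMap<>();
            for (Map.Entry<String, Integer> kv : mapped) {
                grouped.computeIfAbsent(kv.getKey(), k -> new ArrayList<>()).add(kv.getValue());
            }
            System.out.println("after shuffle: " + grouped);
            // after shuffle: {hadoop=[1], hello=[1, 1], world=[1]}

            // Reduce phase: sum each key's value list, like WCReducer.reduce()
            Map<String, Integer> reduced = new TreeMap<>();
            for (Map.Entry<String, List<Integer>> kv : grouped.entrySet()) {
                int sum = 0;
                for (int v : kv.getValue()) {
                    sum += v;
                }
                reduced.put(kv.getKey(), sum);
            }
            System.out.println("reduce output: " + reduced);
            // reduce output: {hadoop=1, hello=2, world=1}
        }
    }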
Once the code is written, package it into a jar, put the input file and the jar onto the Hadoop cluster, and run it. Be careful that the output directory must not already exist, otherwise the job fails with an error.
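A run might look like the following sketch. The jar name wordcount.jar, the local file input.txt, and the HDFS paths are all hypothetical and depend on your environment:

    jar cf wordcount.jar -C classes .                                # package the compiled classes
    hadoop fs -mkdir -p /user/hadoop/wordcount/input                 # hypothetical HDFS input directory
    hadoop fs -put input.txt /user/hadoop/wordcount/input            # upload the local input file
    hadoop jar wordcount.jar com.sun.wordcount.WordCount \
        /user/hadoop/wordcount/input /user/hadoop/wordcount/output   # output dir must not exist yet
    hadoop fs -cat /user/hadoop/wordcount/output/part-r-00000        # view the single reducer's result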