MapReduce: Computing the Average Length of Words Starting with a Given Letter

Writing a MapReduce program essentially comes down to writing the Map function, the Reduce function, and the main (driver) method.
The Java code is as follows:

package section1;

import java.io.IOException;
import java.util.StringTokenizer;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.DoubleWritable;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Partitioner;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;
/**
 * Goal: for the input text, compute the average length of words (a word's length is its number of characters).
 * The map phase only emits a word if its first character is a letter; everything else is dropped.
 * The partitioner assigns each word to a partition by its first letter, which gives 26 partitions.
 * The combiner sums the occurrence count of each word.
 * 26 reduce tasks are defined: reduce 0 receives only words starting with 'a', reduce 1 only words starting with 'b', and so on.
 * Each reducer accumulates the total length and total count (a word's total length = word length * word count;
 * total length = sum of the words' total lengths, total count = sum of the words' counts).
 * Finally, each reducer outputs average length = total length / total count.
 * This yields 26 result files, each containing a single number: the average length of words starting with one letter.
 * Each of these four stages (map, partition, combine, reduce) therefore needs its own code.
 * Words are lowercased in the map, so partitioning only deals with lowercase first letters.
 * (A small standalone example of this computation is given after the full listing.)
*/
public class mywordcount 
{
	
  public static class MapClass extends Mapper<Object, Text, Text, IntWritable>
  {
    
    private final static IntWritable one = new IntWritable(1);
    private Text word = new Text();
    /**
     * Overrides the default map method: splits each input line into words and writes each result <word, 1> to the Context.
     * The Context is provided by the Hadoop framework and collects the output of Map and Reduce;
     * implementing the map function only requires writing the <key, value> pairs to it.
     * The input value of map is one line of text.
     */
    public void map(Object key, Text value, Context context) throws IOException, InterruptedException 
    {
      StringTokenizer itr = new StringTokenizer(value.toString());//split the line on whitespace
      while (itr.hasMoreTokens()) 
      {
    	String tmp = itr.nextToken();
    	if(tmp.charAt(0)>='a' && tmp.charAt(0)<='z' || tmp.charAt(0) >='A' && tmp.charAt(0)<='Z')
    	{
    		word.set(tmp.toLowerCase());
            context.write(word, one);//emit the result <word, 1>
    	}
      }
    }
  }
  
  /** Each partition number corresponds to one first letter, giving 26 partitions. */
  public static class PartitionClass extends Partitioner<Text,IntWritable>
  {
	public int getPartition(Text key, IntWritable value, int numPartitions) 
	{
		int result = 0;
		result = (key.charAt(0) - 'a') % numPartitions;
		return result;
	}
  }
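  /* Illustration (added comment, not in the original post): with numPartitions = 26, as set via
   * job.setNumReduceTasks(26) below, "apple" goes to ('a' - 'a') % 26 = partition 0,
   * "banana" to partition 1, ..., "zebra" to partition 25. If the job were run with fewer
   * reduce tasks, the modulo would simply fold several letters into the same partition. */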
  
  public static class CombinerClass extends Reducer<Text,IntWritable,Text,IntWritable> 
  {
    private IntWritable result = new IntWritable();
    /**
     * Overrides the default reduce method: for each key it iterates over the values and sums them,
     * which yields the total number of occurrences of that word so far.
     * Here the key is a word produced by the map task, and the values are the counts emitted for that key.
     */
    public void reduce(Text key, Iterable<IntWritable> values,Context context) throws IOException, InterruptedException 
    {
        int sum = 0;
        for (IntWritable val : values) 
        {        	
            sum += val.get();//accumulate the counts for the same word
        }
        result.set(sum);
        context.write(key, result);//write the combined result
    }
  }
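  /* Note (added for clarity): a combiner is only an optimization and may run zero or more times,
   * so by the time values reach the reducer each value can already be a partial sum.
   * That is why the reducer below adds up val.get() instead of merely counting the values. */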
  
  public static class ReduceClass extends Reducer<Text,IntWritable,Text,DoubleWritable> 
  {
	  private double avgLengthCharacter[] = new double[26];
	  private int sameFirstCharacterCount[] = new int[26];
	  private int sameFirstWordLengthCount[] = new int[26];
	  
	  public void reduce(Text key, Iterable<IntWritable> values,Context context) throws IOException, InterruptedException 
	  {
	      int count = 0;
	      int length = key.getLength();
	      int index = 0;
	      
	      for (IntWritable val : values) 
	      {
	       count += val.get(); //accumulate this word's total count (each value may already be a partial sum from the combiner)
	      }
	       index = key.charAt(0) - 'a';
	       sameFirstCharacterCount[index] = sameFirstCharacterCount[index] + count;
	       sameFirstWordLengthCount[index] = sameFirstWordLengthCount[index] + length*count;  
	  }
	  
 /** In cleanup(), compute and emit the average word length for each first letter this reducer received. */
	  protected void cleanup(Context context) throws IOException, InterruptedException 
	  {
		 for (int index = 0; index < 26; index++)
		 {
		  if (sameFirstCharacterCount[index] != 0)
		  {
			  avgLengthCharacter[index] = (double)sameFirstWordLengthCount[index]/sameFirstCharacterCount[index];
			  int outKey = 'a' + index;
			  char outKeyChar = (char)outKey;
			  String outKeyString = String.valueOf(outKeyChar);
			  context.write(new Text(outKeyString), new DoubleWritable(avgLengthCharacter[index]));
		  }
		 }
	  }  

  }
  /** In Hadoop, one unit of computation is called a job; a Configuration object controls how the job is run.
   * @param args
   * @throws Exception
   */
  public static void main(String[] args) throws Exception 
  {
    Configuration conf = new Configuration();//use the default settings; the Configuration object encapsulates the client/server configuration
    String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
    if (otherArgs.length != 2) 
    {
      System.err.println("Usage: wordcount <in> <out>");
      System.exit(2);
    }
    
    Job job = Job.getInstance(conf, "word avg length");//create the job (the new Job(conf, ...) constructor is deprecated)
    job.setJarByClass(mywordcount.class);//set the class used to locate the job jar
    job.setNumReduceTasks(26);//use 26 reduce tasks, one per letter
    
    job.setMapperClass(MapClass.class);//set the mapper class
    job.setCombinerClass(CombinerClass.class);//set the combiner class
    job.setReducerClass(ReduceClass.class);//set the reducer class
    job.setPartitionerClass(PartitionClass.class);//set the partitioner class
    
    job.setMapOutputKeyClass(Text.class);
    job.setMapOutputValueClass(IntWritable.class);
    
    job.setOutputKeyClass(Text.class);//the job (reduce) output key type is Text
    job.setOutputValueClass(DoubleWritable.class);//the job (reduce) output value type is DoubleWritable
    
    
    //Input path, passed in via the program arguments (e.g. the Run/Debug Configuration's Arguments); it can be a single file, a directory (all files under it become input), or a file pattern
    FileInputFormat.addInputPath(job, new Path(otherArgs[0]));
    //Output path, also passed in via the arguments; this directory must not exist before the job runs, otherwise an error is raised
    FileOutputFormat.setOutputPath(job, new Path(otherArgs[1]));
    //The last step of every job setup: System.exit(0) means a normal exit, System.exit(1) means an abnormal exit
        
    System.exit(job.waitForCompletion(true) ? 0 : 1);
  }
}
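To make the expected result concrete, here is a small standalone Java sketch. It is my own illustration, not part of the original program: it does not use Hadoop, and the class name AvgLengthSketch and the sample sentence are made up for the example. It reproduces in memory what the mapper's letter filter, the partition index and the reducer's average formula do on a tiny input.

import java.util.Locale;

public class AvgLengthSketch
{
    public static void main(String[] args)
    {
        // Illustrative input; in the real job the text comes from the HDFS input path.
        String text = "Apple ant banana 42skipped bear";

        int[] totalLength = new int[26]; // sum of word lengths per first letter
        int[] totalCount  = new int[26]; // number of words per first letter

        for (String token : text.split("\\s+"))
        {
            char first = token.charAt(0);
            // Same filter as map(): keep only tokens whose first character is a letter.
            if ((first >= 'a' && first <= 'z') || (first >= 'A' && first <= 'Z'))
            {
                String word = token.toLowerCase(Locale.ROOT);
                int index = word.charAt(0) - 'a';      // same rule as the partitioner
                totalLength[index] += word.length();
                totalCount[index]  += 1;
            }
        }

        // Same formula as the reducer's cleanup(): average = total length / total count.
        for (int index = 0; index < 26; index++)
        {
            if (totalCount[index] != 0)
            {
                double avg = (double) totalLength[index] / totalCount[index];
                System.out.println((char) ('a' + index) + "\t" + avg);
            }
        }
        // Expected output for this sample:
        // a	4.0   ("apple" = 5, "ant" = 3)
        // b	5.0   ("banana" = 6, "bear" = 4)
    }
}

On a real cluster the MapReduce job above would typically be packaged into a jar and launched with something along the lines of hadoop jar <your-jar> section1.mywordcount <input path> <output path>. Because 26 reduce tasks are configured, the output directory contains 26 files (part-r-00000 through part-r-00025), each holding at most one line: the average length for the corresponding letter.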


Reposted from blog.csdn.net/qq_39905917/article/details/83187337