MapReduce: computing the average length of words that begin with each letter
Writing a MapReduce program mainly comes down to writing the Map function, the Reduce function, and the main (driver) function; this example additionally needs a custom Partitioner and a Combiner.
The Java code is as follows:
package section1;
import java.io.IOException;
import java.util.StringTokenizer;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.DoubleWritable;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Partitioner;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;
/**
*Requirement: compute the average length of the words in the input text (a word's length is simply its number of characters).
*map: before emitting, check whether the first character of the word is a letter; if it is not, do not emit the word.
*partition: partition by the word's first letter, which gives 26 partitions.
*combine: sum the occurrence count of each word.
*Define 26 reducers: reducer 0 only receives words starting with 'a', reducer 1 only words starting with 'b', and so on.
*reduce: accumulate the total length and the total count (total length of one word = its length * its count; overall total length = sum over all words; overall total count = sum of all word counts).
*Finally, reduce emits average length = total length / total count.
*This yields 26 result files, each containing a single number: the average length of the words starting with that letter.
*So each of these four parts (map, partition, combine, reduce) needs its own code.
*Words are lower-cased in the map, so partitioning only looks at lowercase first letters.
*A small worked example of this arithmetic is given after the listing.
*/
public class mywordcount
{
public static class MapClass extends Mapper<Object, Text, Text, IntWritable>
{
private final static IntWritable one = new IntWritable(1);
private Text word = new Text();
/**
* Overrides the default map method: MapClass splits each line into words and writes each result pair <word, 1> to the Context.
* The Context is provided by the Hadoop framework and collects the output data of Map and Reduce.
* To implement the Map function, it is enough to write the output <key, value> pairs to the Context.
* The map input value is one line of text.
*/
public void map(Object key, Text value, Context context) throws IOException, InterruptedException
{
StringTokenizer itr = new StringTokenizer(value.toString());//split the line on whitespace
while (itr.hasMoreTokens())
{
String tmp = itr.nextToken();
if((tmp.charAt(0) >= 'a' && tmp.charAt(0) <= 'z') || (tmp.charAt(0) >= 'A' && tmp.charAt(0) <= 'Z'))//only keep tokens whose first character is a letter
{
word.set(tmp.toLowerCase());
context.write(word, one);//emit the pair <word, 1>
}
}
}
}
/**
* Each returned value identifies one partition, so there are 26 partitions, one per lowercase first letter.
*/
public static class PartitionClass extends Partitioner<Text,IntWritable>
{
@Override
public int getPartition(Text key, IntWritable value, int numPartitions)
{
//the map has already lower-cased the word, so the first character is between 'a' and 'z'
int result = (key.charAt(0) - 'a') % numPartitions;
return result;
}
}
public static class CombinerClass extends Reducer<Text,IntWritable,Text,IntWritable>
{
private IntWritable result = new IntWritable();
/**
* Overrides the default reduce method: for each key it iterates over the associated values and processes them.
* Here processing means adding up the counts for the same key, which gives the number of occurrences of that word in this combiner's input.
* The key is a word, and the values are the counts that were emitted for that word.
*/
public void reduce(Text key, Iterable<IntWritable> values,Context context) throws IOException, InterruptedException
{
int sum = 0;
for (IntWritable val : values)
{
sum += val.get();//add up the counts for the same word
}
result.set(sum);
context.write(key, result);//write the summed count for this word
}
}
public static class ReduceClass extends Reducer<Text,IntWritable,Text,DoubleWritable>
{
private double avgLengthCharacter[] = new double[26];
private int sameFirstCharacterCount[] = new int[26];
private int sameFirstWordLengthCount[] = new int[26];
public void reduce(Text key, Iterable<IntWritable> values,Context context) throws IOException, InterruptedException
{
int count = 0;
int length = key.getLength();//byte length of the word; equal to the character count for ASCII words
int index = 0;
for (IntWritable val : values)
{
count += val.get();//the combiner has already pre-summed, so add up the partial counts instead of counting values
}
index = key.charAt(0) - 'a';
sameFirstCharacterCount[index] = sameFirstCharacterCount[index] + count;
sameFirstWordLengthCount[index] = sameFirstWordLengthCount[index] + length*count;
}
/** After all keys of this partition have been processed, compute and emit the average length for each first letter this reducer has seen. */
protected void cleanup(Context context) throws IOException, InterruptedException
{
for (int index = 0;index < 26;index++)
{
if (sameFirstCharacterCount[index] != 0)
{
avgLengthCharacter[index] = (double)sameFirstWordLengthCount[index]/sameFirstCharacterCount[index];//average length = total length / total count
char outKeyChar = (char)('a' + index);
String outKeyString = String.valueOf(outKeyChar);
context.write(new Text(outKeyString), new DoubleWritable(avgLengthCharacter[index]));
}
}
}
}
/**In Hadoop a unit of computation is called a job; a Configuration object controls how the job is run
* @param args
* @throws Exception
*/
public static void main(String[] args) throws Exception
{
Configuration conf = new Configuration();//load the default settings; a Configuration object encapsulates the client or server configuration
String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
if (otherArgs.length != 2)
{
System.err.println("Usage: wordcount <in> <out>");
System.exit(2);
}
Job job = new Job(conf, "word average length");//create the job object that controls this computation
job.setJarByClass(mywordcount.class);//set the jar via the driver class
job.setNumReduceTasks(26);//use 26 reduce tasks, one per initial letter
job.setMapperClass(MapClass.class);//set the mapper class
job.setCombinerClass(CombinerClass.class);//set the combiner class
job.setReducerClass(ReduceClass.class);//set the reducer class
job.setPartitionerClass(PartitionClass.class);//set the partitioner class
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(IntWritable.class);
job.setOutputKeyClass(Text.class);//the final (reduce) output key type is Text
job.setOutputValueClass(DoubleWritable.class);//the final (reduce) output value type is DoubleWritable
//input path, passed as a program argument (e.g. via the run/debug configuration's arguments); it may be a single file, a directory (all files in the directory become input), or a file pattern
FileInputFormat.addInputPath(job, new Path(otherArgs[0]));
//output path, passed as a program argument; the directory must not exist before the job runs, otherwise the job fails with an error
FileOutputFormat.setOutputPath(job, new Path(otherArgs[1]));
//the last step of every job configuration; exit code 0 means the job completed successfully, 1 means it did not
System.exit(job.waitForCompletion(true) ? 0 : 1);
}
}
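To make the partition and reduce arithmetic concrete, here is a minimal standalone sketch, separate from the job itself; the sample words and counts are made up for illustration. It mirrors what PartitionClass and ReduceClass.cleanup compute for words starting with 'a':
public class AvgLengthSketch
{
    public static void main(String[] args)
    {
        // hypothetical sample: among words starting with 'a',
        // "apple" (length 5) occurs 2 times and "ant" (length 3) occurs 1 time
        String[] words  = {"apple", "ant"};
        int[]    counts = {2, 1};

        int totalLength = 0; // sum of (word length * word count)
        int totalCount  = 0; // sum of word counts
        for (int i = 0; i < words.length; i++)
        {
            totalLength += words[i].length() * counts[i];
            totalCount  += counts[i];
        }

        // same arithmetic as PartitionClass: 'a' - 'a' = 0, so these words go to reducer 0
        int partition = words[0].charAt(0) - 'a';

        // same arithmetic as ReduceClass.cleanup: (5*2 + 3*1) / 3 = 13 / 3 ≈ 4.33
        double avg = (double) totalLength / totalCount;

        System.out.println("partition = " + partition + ", average length = " + avg);
    }
}
With 26 reduce tasks, the job's output directory contains part-r-00000 through part-r-00025; part-r-00000 holds the average for words starting with 'a', part-r-00001 for 'b', and so on. The job is run in the usual way, e.g. hadoop jar mywordcount.jar section1.mywordcount <input path> <output path> (the jar name and paths here are placeholders).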