package com.averscore; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; import org.apache.hadoop.util.GenericOptionsParser; public class Entry { @SuppressWarnings("deprecation") public static void main(String arg[]) throws Exception { Configuration conf = new Configuration(); String[] otherarg = new GenericOptionsParser(conf, arg).getRemainingArgs(); if (otherarg.length != 2) { System.out.println("error!"); System.exit(2); } Job job = new Job(conf, "averscore"); job.setJarByClass(Entry.class); job.setMapperClass(ScoreMapper.class); job.setReducerClass(ScoreReducer.class); job.setOutputKeyClass(Text.class); job.setOutputValueClass(IntWritable.class); FileInputFormat.addInputPath(job, new Path(arg[0])); FileOutputFormat.setOutputPath(job, new Path(arg[1])); System.exit(job.waitForCompletion(true) ? 0 : 1); } } package com.averscore; import java.io.IOException; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Mapper; public class ScoreMapper extends Mapper<Object, Text, Text, IntWritable> { private final static IntWritable num = new IntWritable(); private Text word = new Text(); public void map(Object key, Text value, Context context) throws IOException, InterruptedException { String []line=value.toString().split(" "); word.set(line[0]); num.set(Integer.parseInt(line[1])); context.write(word, num); } } package com.averscore; import java.io.IOException; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Reducer; public class ScoreReducer extends Reducer<Text, IntWritable, Text, IntWritable> { private static IntWritable averscore = new IntWritable(); public void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException { int cnt = 0; int sum = 0; for (IntWritable val : values) { sum += val.get(); cnt++; } int score=sum/cnt; averscore.set(score); context.write(key, averscore); } }
设计思路
Map输入key值是偏移量,value是字符串。首先按行划分(用参数“\n”),对每一行进行处理,按照空格划分。将姓名和成绩分开,输出的key值是姓名,value值是转换为整数的成绩(Integer.parseInt(String))。