内容:
假设一个年级有两个班级,数据分别存放在 class1.csv 和 class2.csv 中,求该年级的数学成绩平均值。数据第一列为学号,第二列为数学成绩。要求:必须使用 Combiner 类,且最终只输出一行数据,该行仅包含一个平均值。
代码实现:
1、Mapper
public class myMapper extends Mapper<LongWritable, Text, NullWritable, FloatWritable> {
FloatWritable v=new FloatWritable();
public void map(LongWritable ikey, Text ivalue, Context context) throws IOException, InterruptedException {
//从输入数据中获取每一个文件中的每一行的值,并且把文本内容转换成String
String line=ivalue.toString();
//对每一行数据进行切分
String[] words=line.split(",");
//业务处理
float score=Float.parseFloat((words[1]));
v.set(score);
context.write(NullWritable.get(), v);
}
}
2、Reducer
public class myReducer extends Reducer<NullWritable, FloatWritable, NullWritable, FloatWritable> {
public void reduce(NullWritable n, Iterable<FloatWritable> values, Context context) throws IOException, InterruptedException {
// process values
float total=0;
int count=0;
//统计文本的数据数量和总和,求平均值
for (FloatWritable val : values) {
count++;
total +=val.get();
}
context.write(NullWritable.get(), new FloatWritable(total/count));
}
}
3、Driver
public class myDriver {
public static void main(String[] args) throws Exception {
//获取配置信息,job对象实例
Configuration conf = new Configuration();
Job job = Job.getInstance(conf);
//指定需要使用Combiner,以及用哪个类作为Combiner的逻辑
job.setCombinerClass(myReducer.class);
job.setJarByClass(myDriver.class);
//指定业务job要使用mapper/Reduce业务类
// TODO: specify a mapper
job.setMapperClass(myMapper.class);
// TODO: specify a reducer
job.setReducerClass(myReducer.class);
//指定mapper输出数据的K,V类型
job.setMapOutputKeyClass(NullWritable.class);
job.setMapOutputValueClass(FloatWritable.class);
//最终输出数据的K,V类型
// TODO: specify output types
job.setOutputKeyClass(NullWritable.class);
job.setOutputValueClass(FloatWritable.class);
Path outpath=new Path("hdfs://192.168.8.129:9000/hdfstest/paixu/output1");
String inpath="hdfs://192.168.8.129:9000/hdfstest/paixu/";
String[] puts=new String[] {inpath+"class1.csv",inpath+"class2.csv"};
Path[] inpaths=new Path[puts.length];
for(int i=0;i<puts.length;i++) {
inpaths[i]=new Path(puts[i]);
}
//指定job的输入原始文件所在目录
// TODO: specify input and output DIRECTORIES (not files)
FileInputFormat.setInputPaths(job, inpaths);
FileOutputFormat.setOutputPath(job, outpath);
boolean result=job.waitForCompletion(true);
System.exit(result?0:1);
}
}
4、结果