Java代码：
/**
 * Mapper that condenses one input line of tab-separated integers into partial statistics.
 *
 * <p>For every line that contains at least one number it emits a single record under the
 * constant key {@code "1"}; the value is the space-separated triple
 * {@code "<sum> <count> <sum of squares>"} for that line.
 */
public class WCMapper extends Mapper<LongWritable, Text, Text, Text> {
    /**
     * Folds the integers of one line into their sum, count and sum of squares.
     *
     * @param key     the input key supplied by the framework (not used)
     * @param value   one line of text holding integers separated by tab characters
     * @param context sink for the partial-statistics record
     * @throws NumberFormatException if a non-empty field is not a valid {@code int}
     */
    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        long sum = 0;
        long squareSum = 0;
        long count = 0;
        // Accumulate the sum, the sum of squares and the count of the numbers on this line.
        for (String field : value.toString().split("\t")) {
            String token = field.trim();
            if (token.isEmpty()) {
                // Blank lines and empty fields carry no number; skip instead of failing the task.
                continue;
            }
            // Widen to long BEFORE squaring: int * int would overflow for |n| > 46340.
            long number = Integer.parseInt(token);
            sum += number;
            squareSum += number * number;
            count++;
        }
        if (count == 0) {
            // Nothing to report for this line.
            return;
        }
        String partial = sum + " " + count + " " + squareSum;
        context.write(new Text("1"), new Text(partial));
    }
}
/**
 * Reducer that merges partial statistics records into one overall average.
 *
 * <p>Each incoming value is expected to be the space-separated triple
 * {@code "<sum> <count> <sum of squares>"}. The reducer writes a single record whose key is
 * the label {@code "平均数为:"} and whose value is the average (total sum / total count).
 */
public class WCReducer extends Reducer<Text, Text, Text, Text> {
    /**
     * Adds up all partial sums and counts received for one key and emits their quotient.
     *
     * @param word    the grouping key of the incoming records (not used in the output)
     * @param value   the partial-statistics records for this key
     * @param context sink for the resulting average record
     * @throws NumberFormatException          if a field of a record is not numeric
     * @throws ArrayIndexOutOfBoundsException if a record has fewer than three fields
     */
    @Override
    protected void reduce(Text word, Iterable<Text> value, Context context) throws IOException, InterruptedException {
        // Primitive accumulators: a boxed Double would be unboxed and re-boxed on every iteration.
        double sum = 0.0;
        long count = 0;
        // NOTE(review): the sum of squares is accumulated (and thereby validated) but not
        // emitted; the variance output is not implemented in this reducer.
        BigInteger squareSum = BigInteger.ZERO;
        // Accumulate the total sum and the total count over all partial records.
        for (Text one : value) {
            String[] fields = one.toString().split(" ");
            sum += Double.parseDouble(fields[0]);
            count += Long.parseLong(fields[1]);
            squareSum = squareSum.add(new BigInteger(fields[2]));
        }
        if (count == 0) {
            // The average of zero numbers is undefined; emit nothing rather than "NaN".
            return;
        }
        // Compute the average.
        String avg = String.valueOf(sum / count);
        context.write(new Text("平均数为:"), new Text(avg));
    }
}
/**
 * Command-line entry point that configures and submits the averaging job
 * ({@link WCMapper} followed by {@link WCReducer}).
 */
public class WCDriver {
    /**
     * Configures the job, runs it to completion and exits with a status code.
     *
     * @param args {@code args[0]} is the input path, {@code args[1]} is the output path
     */
    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
        if (args.length < 2) {
            // Fail with a readable message instead of an ArrayIndexOutOfBoundsException.
            System.err.println("Usage: WCDriver <input path> <output path>");
            System.exit(2);
        }
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf);
        job.setJarByClass(WCDriver.class);

        job.setMapperClass(WCMapper.class);
        job.setReducerClass(WCReducer.class);

        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(Text.class);
        job.setOutputKeyClass(Text.class);
        // WCReducer is declared as Reducer<Text, Text, Text, Text>, so the final value type
        // is Text; declaring DoubleWritable here contradicted what the reducer writes.
        job.setOutputValueClass(Text.class);

        // The mapper writes every record under the same key, so one reduce task sees them all.
        job.setNumReduceTasks(1);

        FileInputFormat.setInputPaths(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));

        boolean succeeded = job.waitForCompletion(true);
        System.exit(succeeded ? 0 : 1);
    }
}
Python代码:
#!/usr/bin/python
# -*- coding: utf-8 -*-
import sys
# On the map side, compute the sum and the count of all numbers read by this mapper.
def map():
    """Read whitespace-separated integers from stdin and print their sum and count.

    Every line of standard input is split on whitespace and each token is parsed
    with ``int``. After all input has been consumed, one line of the form
    ``<sum>\\t<count>`` is written to standard output.

    NOTE(review): the original indentation was lost; the single ``print`` is placed
    after the input loop because the comment above says the totals cover all data
    read by this mapper - confirm against the intended behavior.

    Raises:
        ValueError: if a token is not a valid integer literal.
    """
    # Named ``total`` so the built-in ``sum`` is not shadowed.
    total = 0
    count = 0
    for line in sys.stdin:
        # str.split() without arguments already ignores leading/trailing whitespace.
        for word in line.split():
            total += int(word)
            count += 1
    print('%d\t%d' % (total, count))


if __name__ == '__main__':
    map()
#!/usr/bin/python
# -*- coding: utf-8 -*-
import sys
def reduce():
    """Merge ``<sum>\\t<count>`` records from stdin and print the overall average.

    Each non-blank input line must hold two tab-separated integers: a partial sum
    and the number of values it covers. After all input has been consumed, one
    line consisting of the label, a tab and the average is written to standard
    output. When the total count is zero, nothing is written to standard output
    and a note goes to standard error instead.

    Raises:
        ValueError: if a field is not a valid integer literal.
        IndexError: if a non-blank line has fewer than two tab-separated fields.
    """
    # Named ``total`` so the built-in ``sum`` is not shadowed.
    total = 0
    count = 0
    for line in sys.stdin:
        record = line.strip()
        if not record:
            # Blank lines carry no data; int('') would otherwise raise ValueError.
            continue
        fields = record.split('\t')
        total += int(fields[0])
        count += int(fields[1])
    if count == 0:
        # The average of zero numbers is undefined; avoid ZeroDivisionError.
        sys.stderr.write('reduce: no numbers received, average is undefined\n')
        return
    # float() forces true division even when run by a Python 2 interpreter,
    # where int / int would silently floor the average.
    average = float(total) / count
    print('%s\t%s' % ("平均数:", str(average)))


if __name__ == '__main__':
    reduce()