Another exercise from Hadoop in Action~
Exercise:
Compute the inner product of two vectors, i.e. the sum of the element-wise products v1[i] * v2[i]. For example:
v1 = [1 2 3]
v2 = [2 3 4]
inner product = 1*2 + 2*3 + 3*4 = 2 + 6 + 12 = 20
My input file (each line holds one element of v1 and the corresponding element of v2):
1.0 2.0
3.0 4.0
1 1
That is:
v1 = [1 3 1]
v2 = [2 4 1]
Result: 1*2 + 3*4 + 1*1 = 2 + 12 + 1 = 15
Approach:
For each line, read the two corresponding elements of the two vectors and compute their product; the summation is then completed in the Reduce step.
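Tracing the sample input: the single map task computes 1.0*2.0 = 2.0, 3.0*4.0 = 12.0 and 1*1 = 1.0, accumulating map_sum = 15.0 as it goes. cleanup() then emits the single pair ("sum", 15.0), and the combine step simply passes that total through to the output.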
Note:
If, in the main function, you register Reduce.class as the reducer in addition to setCombinerClass(Reduce.class), the final result comes out wrong, because the static sum field survives between the combine and the reduce invocation (they share one JVM in local/standalone mode) and the total is added twice!
The computed result would then be 30 instead of 15! A combiner-safe variant is sketched after the code below.
The code:
package hadoop_in_action_exersice;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.DoubleWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class InnerProduct {

    private static final Text SUM = new Text("sum");

    public static class Map extends Mapper<LongWritable, Text, Text, DoubleWritable> {

        // Running total of the per-line products; emitted once in cleanup()
        private static double map_sum = 0.0;

        public void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            // Each input line holds one element of v1 and one element of v2
            String[] arr = value.toString().split(" ");
            try {
                double v1 = Double.parseDouble(arr[0]);
                double v2 = Double.parseDouble(arr[1]);
                map_sum += v1 * v2;
            } catch (Exception e) {
                e.printStackTrace();
            }
        }

        @Override
        protected void cleanup(Context context)
                throws IOException, InterruptedException {
            System.out.println("!!!" + map_sum); // debug output
            context.write(SUM, new DoubleWritable(map_sum));
        }
    }

    public static class Reduce extends Reducer<Text, DoubleWritable, Text, DoubleWritable> {

        // Static, so it survives across invocations within one JVM --
        // the source of the double-counting described above
        private static double sum = 0;

        public void reduce(Text key, Iterable<DoubleWritable> values, Context context)
                throws IOException, InterruptedException {
            if (key.toString().equals(SUM.toString())) {
                for (DoubleWritable v : values) {
                    sum += v.get();
                }
                context.write(key, new DoubleWritable(sum));
            }
        }
    }

    public static void main(String[] args) {
        Configuration conf = new Configuration();
        try {
            Job job = new Job(conf, "inner product");
            job.setJarByClass(InnerProduct.class);
            job.setMapperClass(Map.class);
            job.setCombinerClass(Reduce.class);
            // job.setReducerClass(Reduce.class); // must not be set as well,
            // or the sum is accumulated one extra time and the result is wrong
            job.setOutputKeyClass(Text.class);
            job.setOutputValueClass(DoubleWritable.class);
            FileInputFormat.setInputPaths(job,
                    new Path("/home/hadoop/DataSet/Hadoop/Exercise/InnerProduct"));
            FileOutputFormat.setOutputPath(job,
                    new Path("/home/hadoop/DataSet/Hadoop/Exercise/InnerProduct-output"));
            System.out.println(job.waitForCompletion(true));
        } catch (IOException e) {
            e.printStackTrace();
        } catch (ClassNotFoundException e) {
            e.printStackTrace();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}
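As an aside, the double-counting goes away if the reducer keeps no state across calls. Below is a minimal sketch (my own variant, not from the book — the name StatelessReduce is made up) in which each reduce() call sums only the values handed to it, so the same class can safely be registered as both combiner and reducer:

    // A stateless reducer: sums only the values passed to each reduce() call,
    // so running it once as a combiner and again as a reducer still yields 15
    public static class StatelessReduce
            extends Reducer<Text, DoubleWritable, Text, DoubleWritable> {

        public void reduce(Text key, Iterable<DoubleWritable> values, Context context)
                throws IOException, InterruptedException {
            double sum = 0.0; // local variable, reset for every call
            for (DoubleWritable v : values) {
                sum += v.get();
            }
            context.write(key, new DoubleWritable(sum));
        }
    }

With this version, main() could call both job.setCombinerClass(StatelessReduce.class) and job.setReducerClass(StatelessReduce.class) without the total being added twice: the combiner pre-sums each map task's output, and the reducer sums the partial sums.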