MapReduce,组合式,迭代式,链式(转载) MapReduce,组合式,迭代式,链式(转载)

MapReduce,组合式,迭代式,链式

 

前面介绍一些怎样用户类制定自己的类,来达到减少中间数据:http://www.cnblogs.com/liqizhou/archive/2012/05/14/2499498.html

1.迭代式mapreduce

    一些复杂的任务难以用一次mapreduce处理完成,需要多次mapreduce才能完成任务,例如Pagrank,Kmeans算法都需要多次的迭代,关于mapreduce迭代在mahout中运用较多。有兴趣的可以参考一下mahout的源码。

     在map/reduce迭代过程中,思想还是比较简单,就像类似for循环一样,前一个mapreduce的输出结果,作为下一个mapreduce的输入,任务完成后中间结果都可以删除。如代码所以:

Java代码   收藏代码
  1. Configuration conf1 = new Configuration();  
  2. Job job1 = new Job(conf1,"job1");  
  3. .....  
  4. FileInputFormat.addInputPath(job1,InputPaht1);  
  5. FileOutputFromat.setOoutputPath(job1,Outpath1);  
  6. job1.waitForCompletion(true);  
  7. //sub Mapreduce  
  8. Configuration conf2 = new Configuration();  
  9. Job job2 = new Job(conf1,"job1");  
  10. .....  
  11. FileInputFormat.addInputPath(job2,Outpath1);  
  12. FileOutputFromat.setOoutputPath(job2,Outpath2);  
  13. job2.waitForCompletion(true);  
  14. //sub Mapreduce  
  15. Configuration conf3 = new Configuration();  
  16. Job job3 = new Job(conf1,"job1");  
  17. .....  
  18. FileInputFormat.addInputPath(job3,Outpath2);  
  19. FileOutputFromat.setOoutputPath(job3,Outpath3);  
  20. job3.waitForCompletion(true);  
  21. .....  

 

下面列举一个mahout怎样运用mapreduce迭代的,下面的代码快就是mahout中kmeans的算法的代码,在main函数中用一个while循环来做mapreduce的迭代,其中:runIteration()是一次mapreduce的过程。

但个人感觉现在的mapreduce迭代设计不太满意的地方。

1. 每次迭代,如果所有Job(task)重复创建,代价将非常高。

2.每次迭代,数据都写入本地和读取本地,I/O和网络传输的代价比较大。

好像Twister和Haloop的模型能过比较好的解决这些问题,但他们抽象度不够高,支持的计算有限。

期待着下个版本hadoop更好的支持迭代算法。

Java代码   收藏代码
  1. //main function  
  2. while (!converged && iteration <= maxIterations) {  
  3.       log.info("K-Means Iteration {}", iteration);  
  4.       // point the output to a new directory per iteration  
  5.       Path clustersOut = new Path(output, AbstractCluster.CLUSTERS_DIR + iteration);  
  6.       converged = runIteration(conf, input, clustersIn, clustersOut, measure.getClass().getName(), delta);  
  7.       // now point the input to the old output directory  
  8.       clustersIn = clustersOut;  
  9.       iteration++;  
  10. }  
  11.   
  12.   private static boolean runIteration(Configuration conf,  
  13.                                       Path input,  
  14.                                       Path clustersIn,  
  15.                                       Path clustersOut,  
  16.                                       String measureClass,  
  17.                                       String convergenceDelta)  
  18.     throws IOException, InterruptedException, ClassNotFoundException {  
  19.   
  20.     conf.set(KMeansConfigKeys.CLUSTER_PATH_KEY, clustersIn.toString());  
  21.     conf.set(KMeansConfigKeys.DISTANCE_MEASURE_KEY, measureClass);  
  22.     conf.set(KMeansConfigKeys.CLUSTER_CONVERGENCE_KEY, convergenceDelta);  
  23.   
  24.     Job job = new Job(conf, "KMeans Driver running runIteration over clustersIn: " + clustersIn);  
  25.     job.setMapOutputKeyClass(Text.class);  
  26.     job.setMapOutputValueClass(ClusterObservations.class);  
  27.     job.setOutputKeyClass(Text.class);  
  28.     job.setOutputValueClass(Cluster.class);  
  29.   
  30.     job.setInputFormatClass(SequenceFileInputFormat.class);  
  31.     job.setOutputFormatClass(SequenceFileOutputFormat.class);  
  32.     job.setMapperClass(KMeansMapper.class);  
  33.     job.setCombinerClass(KMeansCombiner.class);  
  34.     job.setReducerClass(KMeansReducer.class);  
  35.   
  36.     FileInputFormat.addInputPath(job, input);  
  37.     FileOutputFormat.setOutputPath(job, clustersOut);  
  38.   
  39.     job.setJarByClass(KMeansDriver.class);  
  40.     HadoopUtil.delete(conf, clustersOut);  
  41.     if (!job.waitForCompletion(true)) {  
  42.       throw new InterruptedException("K-Means Iteration failed processing " + clustersIn);  
  43.     }  
  44.     FileSystem fs = FileSystem.get(clustersOut.toUri(), conf);  
  45.   
  46.     return isConverged(clustersOut, conf, fs);  
  47.   }  

 

2.依赖关系组合式MapReduce

我们可以设想一下MapReduce有3个子任务job1,job2,job3构成,其中job1和job2相互独立,job3要在job1和job2完成之后才执行。这种关系就叫复杂数据依赖关系的组合时mapreduce。hadoop为这种组合关系提供了一种执行和控制机制,hadoop通过job和jobControl类提供具体的编程方法。Job除了维护子任务的配置信息,还维护子任务的依赖关系,而jobControl控制整个作业流程,把所有的子任务作业加入到JobControl中,开启JobControl的线程即可运行程序。

要注意的地方就是hadoop的JobControl类实现了线程Runnable接口。我们需要实例化一个线程来让它启动。直接调用JobControl的run()方法,线程将无法结束。

下面给出伪代码:

Java代码   收藏代码
  1. Configuration job1conf = new Configuration();  
  2. Job job1 = new Job(job1conf,"Job1");  
  3. .........//job1 其他设置  
  4. Configuration job2conf = new Configuration();  
  5. Job job2 = new Job(job2conf,"Job2");  
  6. .........//job2 其他设置  
  7. Configuration job3conf = new Configuration();  
  8. Job job3 = new Job(job3conf,"Job3");  
  9. .........//job3 其他设置  
  10. job3.addDepending(job1);//设置job3和job1的依赖关系  
  11. job3.addDepending(job2);  
  12. JobControl JC = new JobControl("123");  
  13. JC.addJob(job1);//把三个job加入到jobcontorl中  
  14. JC.addJob(job2);  
  15. JC.addJob(job3);  
  16. Thread jcThread = new Thread(JC);  
  17. jcThread.start();  

 

3.链式MapReduce

首先看一下例子,来说明为什么要有链式MapReduce,假设在统计单词是,会出现这样的词,make,made,making等,他们都属于一个词,在单词累加的时候,都归于一个词。解决的方法为用一个单独的Mapreduce任务可以实现,单增加了多个Mapreduce作业,将增加整个作业处理的周期,还增加了I/O操作,因而处理效率不高。

一个较好的办法就是在核心的MapReduce之外,增加一个辅助的Map过程,然后将这个辅助的Map过程和核心的Mapreudce过程合并为一个链式的Mapreduce,从而完成整个作业。hadoop提供了专门的链式ChainMapper和ChainReducer来处理链式任务,ChainMapper允许一个Map任务中添加多个Map的子任务,ChainReducer可以在Reducer执行之后,在加入多个Map的子任务。其调用形式如下:

Java代码   收藏代码
  1. ChainMapper.addMapper(...);  
  2.     ChainReducer.addMapper(...);  
  3.     //addMapper()调用的方法形式如下:  
  4.     public static void addMapper(JOb job,  
  5.             Class<? extends Mapper> mclass,  
  6.             Class<?> inputKeyClass,  
  7.             Class<?> inputValueClass,  
  8.             Class<?> outputKeyClass,  
  9.             Class<?> outputValueClass,  
  10.             Configuration conf  
  11.     ){  
  12.     }  

 

其中,ChainReducer专门提供了一个setRreducer()方法来设置整个作业唯一的Reducer。

note:这些Mapper和Reducer之间传递的键和值都必须保持一致。

下面举个例子:用ChainMapper把Map1加如并执行,然后用ChainReducer把Reduce和Map2加入到Reduce过程中。代码如下:Map1.class 要实现map方法

Java代码   收藏代码
  1. public void function throws IOException {  
  2.         Configuration conf = new Configuration();  
  3.         Job job = new Job(conf);  
  4.         job.setJobName("ChianJOb");  
  5.         // 在ChainMapper里面添加Map1  
  6.         Configuration map1conf = new Configuration(false);  
  7.         ChainMapper.addMapper(job, Map1.class, LongWritable.class, Text.class,  
  8.                 Text.class, Text.classtrue, map1conf);  
  9.         // 在ChainReduce中加入Reducer,Map2;  
  10.         Configuration reduceConf = new Configuration(false);  
  11.         ChainReducer.setReducer(job, Reduce.class, LongWritable.class,  
  12.                 Text.class, Text.class, Text.classtrue, map1conf);  
  13.         Configuration map2Conf = new Configuration();  
  14.         ChainReducer.addMapper(job, Map2.class, LongWritable.class, Text.class,  
  15.                 Text.class, Text.classtrue, map1conf);  
  16.         job.waitForCompletion(true);  
  17.     }  

 

MapReduce,组合式,迭代式,链式

 

前面介绍一些怎样用户类制定自己的类,来达到减少中间数据:http://www.cnblogs.com/liqizhou/archive/2012/05/14/2499498.html

1.迭代式mapreduce

    一些复杂的任务难以用一次mapreduce处理完成,需要多次mapreduce才能完成任务,例如Pagrank,Kmeans算法都需要多次的迭代,关于mapreduce迭代在mahout中运用较多。有兴趣的可以参考一下mahout的源码。

     在map/reduce迭代过程中,思想还是比较简单,就像类似for循环一样,前一个mapreduce的输出结果,作为下一个mapreduce的输入,任务完成后中间结果都可以删除。如代码所以:

Java代码   收藏代码
  1. Configuration conf1 = new Configuration();  
  2. Job job1 = new Job(conf1,"job1");  
  3. .....  
  4. FileInputFormat.addInputPath(job1,InputPaht1);  
  5. FileOutputFromat.setOoutputPath(job1,Outpath1);  
  6. job1.waitForCompletion(true);  
  7. //sub Mapreduce  
  8. Configuration conf2 = new Configuration();  
  9. Job job2 = new Job(conf1,"job1");  
  10. .....  
  11. FileInputFormat.addInputPath(job2,Outpath1);  
  12. FileOutputFromat.setOoutputPath(job2,Outpath2);  
  13. job2.waitForCompletion(true);  
  14. //sub Mapreduce  
  15. Configuration conf3 = new Configuration();  
  16. Job job3 = new Job(conf1,"job1");  
  17. .....  
  18. FileInputFormat.addInputPath(job3,Outpath2);  
  19. FileOutputFromat.setOoutputPath(job3,Outpath3);  
  20. job3.waitForCompletion(true);  
  21. .....  

 

下面列举一个mahout怎样运用mapreduce迭代的,下面的代码快就是mahout中kmeans的算法的代码,在main函数中用一个while循环来做mapreduce的迭代,其中:runIteration()是一次mapreduce的过程。

但个人感觉现在的mapreduce迭代设计不太满意的地方。

1. 每次迭代,如果所有Job(task)重复创建,代价将非常高。

2.每次迭代,数据都写入本地和读取本地,I/O和网络传输的代价比较大。

好像Twister和Haloop的模型能过比较好的解决这些问题,但他们抽象度不够高,支持的计算有限。

期待着下个版本hadoop更好的支持迭代算法。

Java代码   收藏代码
  1. //main function  
  2. while (!converged && iteration <= maxIterations) {  
  3.       log.info("K-Means Iteration {}", iteration);  
  4.       // point the output to a new directory per iteration  
  5.       Path clustersOut = new Path(output, AbstractCluster.CLUSTERS_DIR + iteration);  
  6.       converged = runIteration(conf, input, clustersIn, clustersOut, measure.getClass().getName(), delta);  
  7.       // now point the input to the old output directory  
  8.       clustersIn = clustersOut;  
  9.       iteration++;  
  10. }  
  11.   
  12.   private static boolean runIteration(Configuration conf,  
  13.                                       Path input,  
  14.                                       Path clustersIn,  
  15.                                       Path clustersOut,  
  16.                                       String measureClass,  
  17.                                       String convergenceDelta)  
  18.     throws IOException, InterruptedException, ClassNotFoundException {  
  19.   
  20.     conf.set(KMeansConfigKeys.CLUSTER_PATH_KEY, clustersIn.toString());  
  21.     conf.set(KMeansConfigKeys.DISTANCE_MEASURE_KEY, measureClass);  
  22.     conf.set(KMeansConfigKeys.CLUSTER_CONVERGENCE_KEY, convergenceDelta);  
  23.   
  24.     Job job = new Job(conf, "KMeans Driver running runIteration over clustersIn: " + clustersIn);  
  25.     job.setMapOutputKeyClass(Text.class);  
  26.     job.setMapOutputValueClass(ClusterObservations.class);  
  27.     job.setOutputKeyClass(Text.class);  
  28.     job.setOutputValueClass(Cluster.class);  
  29.   
  30.     job.setInputFormatClass(SequenceFileInputFormat.class);  
  31.     job.setOutputFormatClass(SequenceFileOutputFormat.class);  
  32.     job.setMapperClass(KMeansMapper.class);  
  33.     job.setCombinerClass(KMeansCombiner.class);  
  34.     job.setReducerClass(KMeansReducer.class);  
  35.   
  36.     FileInputFormat.addInputPath(job, input);  
  37.     FileOutputFormat.setOutputPath(job, clustersOut);  
  38.   
  39.     job.setJarByClass(KMeansDriver.class);  
  40.     HadoopUtil.delete(conf, clustersOut);  
  41.     if (!job.waitForCompletion(true)) {  
  42.       throw new InterruptedException("K-Means Iteration failed processing " + clustersIn);  
  43.     }  
  44.     FileSystem fs = FileSystem.get(clustersOut.toUri(), conf);  
  45.   
  46.     return isConverged(clustersOut, conf, fs);  
  47.   }  

 

2.依赖关系组合式MapReduce

我们可以设想一下MapReduce有3个子任务job1,job2,job3构成,其中job1和job2相互独立,job3要在job1和job2完成之后才执行。这种关系就叫复杂数据依赖关系的组合时mapreduce。hadoop为这种组合关系提供了一种执行和控制机制,hadoop通过job和jobControl类提供具体的编程方法。Job除了维护子任务的配置信息,还维护子任务的依赖关系,而jobControl控制整个作业流程,把所有的子任务作业加入到JobControl中,开启JobControl的线程即可运行程序。

要注意的地方就是hadoop的JobControl类实现了线程Runnable接口。我们需要实例化一个线程来让它启动。直接调用JobControl的run()方法,线程将无法结束。

下面给出伪代码:

Java代码   收藏代码
  1. Configuration job1conf = new Configuration();  
  2. Job job1 = new Job(job1conf,"Job1");  
  3. .........//job1 其他设置  
  4. Configuration job2conf = new Configuration();  
  5. Job job2 = new Job(job2conf,"Job2");  
  6. .........//job2 其他设置  
  7. Configuration job3conf = new Configuration();  
  8. Job job3 = new Job(job3conf,"Job3");  
  9. .........//job3 其他设置  
  10. job3.addDepending(job1);//设置job3和job1的依赖关系  
  11. job3.addDepending(job2);  
  12. JobControl JC = new JobControl("123");  
  13. JC.addJob(job1);//把三个job加入到jobcontorl中  
  14. JC.addJob(job2);  
  15. JC.addJob(job3);  
  16. Thread jcThread = new Thread(JC);  
  17. jcThread.start();  

 

3.链式MapReduce

首先看一下例子,来说明为什么要有链式MapReduce,假设在统计单词是,会出现这样的词,make,made,making等,他们都属于一个词,在单词累加的时候,都归于一个词。解决的方法为用一个单独的Mapreduce任务可以实现,单增加了多个Mapreduce作业,将增加整个作业处理的周期,还增加了I/O操作,因而处理效率不高。

一个较好的办法就是在核心的MapReduce之外,增加一个辅助的Map过程,然后将这个辅助的Map过程和核心的Mapreudce过程合并为一个链式的Mapreduce,从而完成整个作业。hadoop提供了专门的链式ChainMapper和ChainReducer来处理链式任务,ChainMapper允许一个Map任务中添加多个Map的子任务,ChainReducer可以在Reducer执行之后,在加入多个Map的子任务。其调用形式如下:

Java代码   收藏代码
  1. ChainMapper.addMapper(...);  
  2.     ChainReducer.addMapper(...);  
  3.     //addMapper()调用的方法形式如下:  
  4.     public static void addMapper(JOb job,  
  5.             Class<? extends Mapper> mclass,  
  6.             Class<?> inputKeyClass,  
  7.             Class<?> inputValueClass,  
  8.             Class<?> outputKeyClass,  
  9.             Class<?> outputValueClass,  
  10.             Configuration conf  
  11.     ){  
  12.     }  

 

其中,ChainReducer专门提供了一个setRreducer()方法来设置整个作业唯一的Reducer。

note:这些Mapper和Reducer之间传递的键和值都必须保持一致。

下面举个例子:用ChainMapper把Map1加如并执行,然后用ChainReducer把Reduce和Map2加入到Reduce过程中。代码如下:Map1.class 要实现map方法

Java代码   收藏代码
  1. public void function throws IOException {  
  2.         Configuration conf = new Configuration();  
  3.         Job job = new Job(conf);  
  4.         job.setJobName("ChianJOb");  
  5.         // 在ChainMapper里面添加Map1  
  6.         Configuration map1conf = new Configuration(false);  
  7.         ChainMapper.addMapper(job, Map1.class, LongWritable.class, Text.class,  
  8.                 Text.class, Text.classtrue, map1conf);  
  9.         // 在ChainReduce中加入Reducer,Map2;  
  10.         Configuration reduceConf = new Configuration(false);  
  11.         ChainReducer.setReducer(job, Reduce.class, LongWritable.class,  
  12.                 Text.class, Text.class, Text.classtrue, map1conf);  
  13.         Configuration map2Conf = new Configuration();  
  14.         ChainReducer.addMapper(job, Map2.class, LongWritable.class, Text.class,  
  15.                 Text.class, Text.classtrue, map1conf);  
  16.         job.waitForCompletion(true);  
  17.     }  

 

猜你喜欢

转载自wangjin161.iteye.com/blog/2079580