MapReduce-Join中级优化-hadoop自带datajoin的解决方法

接着上一篇 《MapReuce-Join操作-初级优化》 这一篇博客继续说明MapReduce对于Join的操作,这里使用hadoop包中自带的datajoin包来处理,如果是hadoop1.x则包在${HADOOP_HOME}/contrib/datajoin文件夹下。如果是hadoop2.x则该包在${HADOOP_HOME}/share/hadoop/tools/lib下面把包引入工程中就可以使用了。
以下是本篇博客要处理的数据,为了我们前两篇进行用法上的比较,这里使用同样的数据:
uid,name,phoneid
1,tom,40
2,jack,20
3,seven,30
4,lee,10
5,smith,20
6,张三,10
7,李四,30
8,王五,20

goodid,name
10,苹果
20,三星
30,LG
40,华为

输出结果:
lee 苹果
张三 苹果
jack 三星
smith 三星
王五 三星
seven LG
李四 LG
tom 华为
下面说说datajoin包的基本用法:
首先来看看Map端的写法:
Map端要继承DataJoinMapperBase类
public abstract class DataJoinMapperBase extends JobBase 

并实现以下几个方法:

[java]  view plain  copy
  1. /** 
  2.    * Determine the source tag based on the input file name. 
  3.    *  
  4.    * @param inputFile 
  5.    * @return the source tag computed from the given file name. 
  6.    */  
  7.   protected abstract Text generateInputTag(String inputFile);  
  8.   
  9.   /** 
  10.    * Generate a tagged map output value. The user code can also perform 
  11.    * projection/filtering. If it decides to discard the input record when 
  12.    * certain conditions are met,it can simply return a null. 
  13.    *  
  14.    * @param value 
  15.    * @return an object of TaggedMapOutput computed from the given value. 
  16.    */  
  17.   protected abstract TaggedMapOutput generateTaggedMapOutput(Object value);  
  18.   
  19.   /** 
  20.    * Generate a map output key. The user code can compute the key 
  21.    * programmatically, not just selecting the values of some fields. In this 
  22.    * sense, it is more general than the joining capabilities of SQL. 
  23.    *  
  24.    * @param aRecord 
  25.    * @return the group key for the given record 
  26.    */  
  27.   protected abstract Text generateGroupKey(TaggedMapOutput aRecord);  
下面来看看configure()和map()函数的执行过程:

[java]  view plain  copy
  1. public void configure(JobConf job) {  
  2.     super.configure(job);  
  3.     this.job = job;  
  4.     this.inputFile = job.get(MRJobConfig.MAP_INPUT_FILE);  
  5.     //生成该map的数据的Tag  
  6.     this.inputTag = generateInputTag(this.inputFile);  
  7.   }  
  8.   
  9.   public void map(Object key, Object value,  
  10.                   OutputCollector output, Reporter reporter) throws IOException {  
  11.     if (this.reporter == null) {  
  12.       this.reporter = reporter;  
  13.     }  
  14.     //记录总记录条数  
  15.     addLongValue("totalCount"1);  
  16.     //把原始行记录成生一个TaggedMapOutput的对象  
  17.     TaggedMapOutput aRecord = generateTaggedMapOutput(value);  
  18.     if (aRecord == null) {  
  19.       //记录不合格的字条数  
  20.       addLongValue("discardedCount"1);  
  21.       return;  
  22.     }  
  23.     Text groupKey = generateGroupKey(aRecord);  
  24.     if (groupKey == null) {  
  25.       //记录分组键为空的记录条数  
  26.       addLongValue("nullGroupKeyCount"1);  
  27.       return;  
  28.     }  
  29.     //输出分组键和TaggedMapOutput的对象  
  30.     output.collect(groupKey, aRecord);  
  31.     addLongValue("collectedCount"1);  
  32.   }  
  33.   //主要功能为把map对象中对应的name的计数器加1  
  34.   protected Long addLongValue(Object name, long inc) {  
  35.     Long val = this.longCounters.get(name);  
  36.     Long retv = null;  
  37.     if (val == null) {  
  38.       retv = Long.valueOf(inc);  
  39.     } else {  
  40.       retv = Long.valueOf(val.longValue() + inc);  
  41.     }  
  42.     this.longCounters.put(name, retv);  
  43.     return retv;  
  44.   }  
以上知道了map()中的处理流程过后,
首先我们要创造一个用于传输的实体类,必须继承TaggedMapOutput,下面是参考代码以及注释:

[java]  view plain  copy
  1. import java.io.DataInput;  
  2. import java.io.DataOutput;  
  3. import java.io.IOException;  
  4. import org.apache.hadoop.contrib.utils.join.TaggedMapOutput;  
  5. import org.apache.hadoop.io.Text;  
  6. import org.apache.hadoop.io.Writable;  
  7. import org.apache.hadoop.util.ReflectionUtils;  
  8.   
  9. public class TaggedWritable extends TaggedMapOutput {  
  10.     /** 
  11.      * 这样定义报空以下导常: 
  12.      * Error: java.lang.NullPointerException 
  13.          *   at com.seven.mapreduce.join.TaggedWritable.readFields(TaggedWritable.java:32) 
  14.      *   org.apache.hadoop.contrib.utils.join.DataJoinReducerBase.regroup(DataJoinReducerBase.java:106) 
  15.      *   可以参考http://stackoverflow.com/questions/10201500/hadoop-reduce-side-join-using-datajoin 
  16.      *   解决。 
  17.      */  
  18.     private Writable data;  
  19.   
  20.     public TaggedWritable() {  
  21.         this.tag = new Text();  
  22.     }  
  23.     public TaggedWritable(Writable data) {   
  24.         this.tag = new Text("");  
  25.         this.data = data;    
  26.     }  
  27.     public void setData(Writable data) {  
  28.         this.data = data;  
  29.     }  
  30.     public void readFields(DataInput arg0) throws IOException {  
  31.         this.tag.readFields(arg0);  
  32.         String dataClz = arg0.readUTF();  
  33.         /** 
  34.          * 根据序列化时传入的类型进行反序列化 
  35.          */  
  36.         if (this.data == null  
  37.                 || !this.data.getClass().getName().equals(dataClz)) {  
  38.             try {  
  39.                 this.data = (Writable) ReflectionUtils.newInstance(Class.forName(dataClz), null);  
  40.             } catch (ClassNotFoundException e) {  
  41.                 e.printStackTrace();  
  42.             }  
  43.         }  
  44.         this.data.readFields(arg0);  
  45.     }  
  46.     public void write(DataOutput arg1) throws IOException {  
  47.         this.tag.write(arg1);  
  48.         /** 
  49.          * 写入类名,反序列化时可以用到 
  50.          */  
  51.         arg1.writeUTF(this.data.getClass().getName());  
  52.         this.data.write(arg1);  
  53.     }  
  54.     @Override  
  55.     public Writable getData() {  
  56.         return data;  
  57.     }  
  58. }  
下面就来编写Map端程序了:

[java]  view plain  copy
  1. import org.apache.hadoop.contrib.utils.join.DataJoinMapperBase;  
  2. import org.apache.hadoop.contrib.utils.join.TaggedMapOutput;  
  3. import org.apache.hadoop.io.Text;  
  4.   
  5. public class JoinMapper extends DataJoinMapperBase {  
  6.     @Override  
  7.     protected Text generateInputTag(String inputFile) {  
  8.         /** 
  9.          * 生成对应于该Map的Tag 
  10.          */  
  11.         String tagTmp = inputFile.substring(inputFile.lastIndexOf("/") + 1);  
  12.         return new Text(tagTmp);   
  13.     }  
  14.     @Override  
  15.     protected TaggedMapOutput generateTaggedMapOutput(Object value) {  
  16.         TaggedWritable retv = new TaggedWritable((Text) value);  
  17.         /** 
  18.          * 来自父类DataJoinMapperBase的变量,在config()方法中根据文件名初始化 
  19.          */  
  20.         retv.setTag(this.inputTag);    
  21.         return retv;  
  22.     }  
  23.     @Override  
  24.     protected Text generateGroupKey(TaggedMapOutput aRecord) {  
  25.         /** 
  26.          * 生成分组的键,如果是多个文件,但对应的列不同,则在这里根据inputTag来进行 
  27.          * 判断和控制 
  28.          */  
  29.         String line = ((Text) aRecord.getData()).toString();    
  30.         String[] tokens = line.split(",");   
  31.         String groupKey = null;  
  32.         if(this.inputTag.toString().equals("12")){  
  33.             groupKey = tokens[2];  
  34.         } else if (this.inputTag.toString().equals("122")){  
  35.             groupKey = tokens[0];  
  36.         }  
  37.         return new Text(groupKey);   
  38.     }  
  39. }  
下面实现reduce端的代码:这里不过多的介绍DataJoinReducerBase的具体执行过程,下一篇博客会单独的分析这个包的整个执行过程。
这里的代码编写方式是,直接继承DataJoinReducerBase并实现combine()方法就行。
[java]  view plain  copy
  1. import org.apache.hadoop.contrib.utils.join.DataJoinReducerBase;  
  2. import org.apache.hadoop.contrib.utils.join.TaggedMapOutput;  
  3. import org.apache.hadoop.io.Text;  
  4. public class JoinReducer extends DataJoinReducerBase {  
  5.     /** 
  6.      * combine方法用来筛选掉不需要的组合,获得所需的联结操作(内联结,左联结等)。并且 
  7.          * 将结果化为合适输出格式(如:字段排列,去重等) 
  8.      */  
  9.     @Override  
  10.     protected TaggedMapOutput combine(Object[] tags, Object[] values) {  
  11.         /** 
  12.          * 实现innerjoin的功能 
  13.          */  
  14.     if (tags.length < 2return null;      
  15.         String joinedStr = "";     
  16.         for (int i=0; i<values.length; i++) {    
  17.             if (i > 0) joinedStr += ",";    
  18.             TaggedWritable tw = (TaggedWritable) values[i];  
  19.             String line = ((Text) tw.getData()).toString();  
  20.             String[] tokens = line.split(",");  
  21.         /** 
  22.              * 根据tag的不同,把不同文件中的不同的字段取出进和join操作 
  23.              * 12为用户信息文件名   122为手机信息文件名 
  24.              */  
  25.             if(tw.getTag().equals("12")) {  
  26.                 joinedStr += tokens[1];  
  27.             } else {  
  28.                 joinedStr += tokens[1];  
  29.             }  
  30.         }    
  31.         TaggedWritable retv = new TaggedWritable(new Text(joinedStr));    
  32.         retv.setTag((Text) tags[0]);     
  33.         return retv;    
  34.     }  
  35. }  
启动程序:
[java]  view plain  copy
  1. import org.apache.hadoop.conf.Configuration;  
  2. import org.apache.hadoop.conf.Configured;  
  3. import org.apache.hadoop.fs.Path;  
  4. import org.apache.hadoop.io.Text;  
  5. import org.apache.hadoop.mapred.FileInputFormat;  
  6. import org.apache.hadoop.mapred.FileOutputFormat;  
  7. import org.apache.hadoop.mapred.JobClient;  
  8. import org.apache.hadoop.mapred.JobConf;  
  9. import org.apache.hadoop.mapred.TextInputFormat;  
  10. import org.apache.hadoop.mapred.TextOutputFormat;  
  11. import org.apache.hadoop.util.Tool;  
  12. import org.apache.hadoop.util.ToolRunner;  
  13. public class JobMain extends Configured implements Tool {  
  14.     public int run(String[] args) throws Exception {  
  15.         Configuration conf = getConf();    
  16.         JobConf job = new JobConf(conf, JobMain.class);    
  17.         Path in = new Path(args[0]);    
  18.         Path out = new Path(args[1]);    
  19.         FileInputFormat.setInputPaths(job, in);  
  20.         /** 
  21.          * 设置多个文件夹下面的文件进和JOIN操作 
  22.          */  
  23.         //FileInputFormat.setInputPaths(job, args[0]+ "," + args[1]);  
  24.         FileOutputFormat.setOutputPath(job, out);    
  25.         job.setJobName("DataJoin");    
  26.         job.setMapperClass(JoinMapper.class);    
  27.         job.setReducerClass(JoinReducer.class);    
  28.         job.setInputFormat(TextInputFormat.class);    
  29.         job.setOutputFormat(TextOutputFormat.class);    
  30.         job.setOutputKeyClass(Text.class);    
  31.         job.setOutputValueClass(TaggedWritable.class);    
  32.         job.set("mapred.textoutputformat.separator"",");    
  33.         JobClient.runJob(job);     
  34.         return 0;  
  35.     }  
  36.     public static void main(String[] args) throws Exception {  
  37.         int res = ToolRunner.run(  
  38.                 new Configuration(),    
  39.                 new JobMain(),    
  40.                 args);    
  41.         System.exit(res);   
  42.     }  
  43. }  
运行结果:

用户信息表

手机信息表

运行命令:

 ./hadoop jar mr.jar com.seven.mapreduce.join.JobMain /input/eight /output/night00

运行结果:


总结:

这是hadoop包中自带的join方式的使用,这是一个通用型的JOIN方法,如果熟练了可以快速的开发出JOIN功能,但在执行效率上还有可以提高的空间,下面一篇会说明《hadoop硬实战》中的对这一个功能的优化的实现。

猜你喜欢

转载自blog.csdn.net/qq_35036995/article/details/80473066