DataJoin is a jar package that Hadoop ships for handling multiple data sources; it is located under HADOOP_HOME/contrib/. To use the framework, besides importing the jar into your project, you also need to copy it into HADOOP_HOME/lib/ on every node of the Hadoop cluster.
Let us now look at how the DataJoin framework handles joins over multiple data sources.
To join records from different data sources, first, every record of every data source is given a data source tag (Tag); next, to distinguish the records within each source and to drive the join, every record is given a group key (GroupKey); then, the DataJoin library supplies a ready-made processing framework for both the Map phase and the Reduce phase, leaving only a few tasks for the programmer to complete. The processing flow is roughly as follows:
1. In the Map phase, every input record is wrapped into a tagged record: the framework obtains the Tag for the current input file, wraps the raw record, and computes its GroupKey; the map output is the pair (GroupKey, tagged record).
2. In the Reduce phase, all tagged records sharing the same GroupKey arrive at one reducer; the framework partitions them by Tag, forms the Cartesian product across the different Tags, and invokes combine() on every combination to produce a joined record.
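Suppose, purely for illustration, that we have two CSV sources keyed on a customer id (the file names and contents here are made up):

    customers.csv:  3,Jose,281-330-8004
    orders.csv:     3,A,12.95,02-Jun-2008

In the Map phase the first record is tagged with customers.csv and the second with orders.csv, and both receive the group key 3; in the Reduce phase the two records meet in the same group, and combine() merges them into 3,Jose,281-330-8004,A,12.95,02-Jun-2008.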
As the above flow shows, the data from the multiple sources is first turned into records that carry a Tag and a GroupKey. To use DataJoin we therefore have to implement the methods generateInputTag(String inputFile), generateTaggedMapOutput(Object value), and generateGroupKey(TaggedMapOutput aRecord). A new class also appears in this flow, namely the tagged record class, so we have to implement a custom record class (a subclass of TaggedMapOutput) as well. In the combine step the results of the Cartesian product are merged (which is also why DataJoin is called a reduce-side join), so we have to implement a combine(Object[] tags, Object[] values) method; note that this combine is something entirely different from the combiner in the MapReduce framework, and the two must not be confused. The complete example below joins CSV records from two input files on their first field:
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.contrib.utils.join.DataJoinMapperBase;
import org.apache.hadoop.contrib.utils.join.DataJoinReducerBase;
import org.apache.hadoop.contrib.utils.join.TaggedMapOutput;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.mapred.FileOutputFormat;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.TextInputFormat;
import org.apache.hadoop.mapred.TextOutputFormat;

public class DataJoin {

    public static class DataJoinMapper extends DataJoinMapperBase {
        // The Tag identifies which data source a record came from;
        // here we simply use the input file name.
        public Text generateInputTag(String inputFile) {
            return new Text(inputFile);
        }

        // The GroupKey is the join key: the first comma-separated field.
        public Text generateGroupKey(TaggedMapOutput aRecord) {
            return new Text(((Text) aRecord.getData()).toString().split(",")[0]);
        }

        // Wrap each raw record in a TaggedWritable carrying the source Tag.
        public TaggedMapOutput generateTaggedMapOutput(Object value) {
            TaggedWritable ret = new TaggedWritable((Text) value);
            ret.setTag(this.inputTag);
            return ret;
        }
    }

    public static class TaggedWritable extends TaggedMapOutput {
        private Writable data;

        // A no-argument constructor is required for deserialization.
        public TaggedWritable() {
            this.tag = new Text("");
            this.data = new Text("");
        }

        public TaggedWritable(Writable data) {
            this.tag = new Text("");
            this.data = data;
        }

        public void write(DataOutput out) throws IOException {
            this.tag.write(out);
            this.data.write(out);
        }

        // Fields must be read back in the same order they were written:
        // tag first, then data.
        public void readFields(DataInput in) throws IOException {
            this.tag.readFields(in);
            this.data.readFields(in);
        }

        public Writable getData() {
            return data;
        }

        public void setData(Writable data) {
            this.data = data;
        }
    }

    public static class DataJoinReducer extends DataJoinReducerBase {
        // combine() receives one record per data source. Groups that are not
        // covered by at least two sources are dropped (an inner join); the
        // remaining records are concatenated, stripping the duplicated join
        // key from every record after the first.
        @Override
        public TaggedMapOutput combine(Object[] tags, Object[] values) {
            if (tags.length < 2) {
                return null;
            }
            StringBuffer joinedStr = new StringBuffer();
            for (int i = 0; i < values.length; i++) {
                TaggedWritable tw = (TaggedWritable) values[i];
                String str = ((Text) tw.getData()).toString();
                if (i == 0) {
                    joinedStr.append(str);
                } else {
                    joinedStr.append(str.split(",", 2)[1]);
                }
                if (i < values.length - 1) {
                    joinedStr.append(",");
                }
            }
            TaggedWritable ret = new TaggedWritable(new Text(joinedStr.toString()));
            ret.setTag((Text) tags[0]);
            return ret;
        }
    }

    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        JobConf job = new JobConf(conf);
        job.setJarByClass(DataJoin.class);

        Path in = new Path(args[0]);
        FileInputFormat.addInputPath(job, in);
        Path out = new Path(args[1]);
        FileOutputFormat.setOutputPath(job, out);

        job.setMapperClass(DataJoinMapper.class);
        job.setReducerClass(DataJoinReducer.class);
        job.setInputFormat(TextInputFormat.class);
        job.setOutputFormat(TextOutputFormat.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(TaggedWritable.class);
        // Separator between key and value in the output file; defaults to Tab.
        job.set("mapred.textoutputformat.separator", "=");
        JobClient.runJob(job);
    }
}
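Assuming the jar is packaged as datajoin-example.jar (a made-up name) and the two illustrative input files above sit in an HDFS directory in, the job can be launched with hadoop jar datajoin-example.jar DataJoin in out. Because the reducer emits the GroupKey as the output key and the joined record as the output value, and the separator is set to "=", each output line should look roughly like 3=3,Jose,281-330-8004,A,12.95,02-Jun-2008.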