The question: how do you process structured data, such as a large volume of XML, in Hadoop?
The answer is below (unfortunately, it seems to work only with the old API, i.e. org.apache.hadoop.mapred):
package com.liangc.hadoop.mr;

import java.io.IOException;
import java.util.Iterator;

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.FileOutputFormat;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.MapReduceBase;
import org.apache.hadoop.mapred.Mapper;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reducer;
import org.apache.hadoop.mapred.Reporter;
import org.apache.hadoop.mapred.TextOutputFormat;
import org.apache.hadoop.mapred.lib.MultipleInputs;
import org.apache.hadoop.streaming.StreamInputFormat;
import org.apache.hadoop.streaming.StreamXmlRecordReader;

public class XmlMR {

    public static class MyMap extends MapReduceBase implements Mapper<Text, Text, Text, Text> {
        @Override
        public void map(Text key, Text value, OutputCollector<Text, Text> ctx, Reporter r) throws IOException {
            // StreamXmlRecordReader hands the whole <property>...</property>
            // fragment to the map as the key; the value is empty.
            System.out.println("map :::::: " + key.toString());
            ctx.collect(key, key);
        }
    }

    public static class MyReduce extends MapReduceBase implements Reducer<Text, Text, Text, Text> {
        @Override
        public void reduce(Text key, Iterator<Text> value, OutputCollector<Text, Text> ctx, Reporter r) throws IOException {
            StringBuffer sb = new StringBuffer();
            while (value.hasNext()) {
                Text v = value.next();
                System.out.println("reduce :::::: " + v.toString());
                sb.append(v.toString());
            }
            // Emit the fragment's length as the key and the fragment itself as the value.
            ctx.collect(new Text(key.getLength() + ""), new Text(sb.toString()));
        }
    }

    public static void main(String[] args) throws Exception {
        String input = args[0];
        String output = args[1];
        // Record boundaries: everything between these two tags becomes one record.
        String begin = "<property>";
        String end = "</property>";

        JobConf conf = new JobConf(XmlMR.class);
        conf.setJobName("xmlMR");
        conf.setOutputKeyClass(Text.class);
        conf.setOutputValueClass(Text.class);
        conf.setMapperClass(MyMap.class);
        conf.setReducerClass(MyReduce.class);
        conf.setInputFormat(StreamInputFormat.class);
        conf.setOutputFormat(TextOutputFormat.class);
        MultipleInputs.addInputPath(conf, new Path(input), StreamInputFormat.class, MyMap.class);
        FileOutputFormat.setOutputPath(conf, new Path(output));

        // Tell StreamInputFormat to use the XML record reader, and which tags
        // delimit a record.
        conf.set("stream.recordreader.class", StreamXmlRecordReader.class.getName());
        conf.set("stream.recordreader.begin", begin);
        conf.set("stream.recordreader.end", end);

        JobClient.runJob(conf);
    }
}
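A note on what is happening here: StreamInputFormat and StreamXmlRecordReader both come from the hadoop-streaming module, which is why the streaming jar has to be on the classpath even though this is a plain Java job. The record reader scans the input for the configured begin/end tags and hands each matched chunk to the map as one record; as far as I can tell the whole fragment arrives as the key with an empty value, which is why the mapper above collects the key itself. If you really need the new org.apache.hadoop.mapreduce API, Mahout ships an XmlInputFormat that does the same begin/end-tag scanning, though I have not wired it into this example.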
I uploaded the XML files under Hadoop's conf directory to HDFS as test data and ran the job with the following command (replace the paths with your own):
hadoop jar xmlMR.jar com.liangc.hadoop.mr.XmlMR /user/hadoop/conf/* /user/hadoop/conf/out/13032901
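As usual with FileOutputFormat, the output directory must not already exist or the job will fail at submission; if you rerun the job, clear the old output first with something like hadoop fs -rmr /user/hadoop/conf/out/13032901 (the old-API-era shell syntax), or pick a fresh output path each time as I did with the timestamp-style suffix above.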
The output:
......
300 <property>
<name>mapred.capacity-scheduler.default-init-accept-jobs-factor</name>
<value>10</value>
<description>The default multipe of (maximum-system-jobs * queue-capacity)
used to determine the number of jobs which are accepted by the scheduler.
</description>
</property>
355 <property>
<name>mapred.capacity-scheduler.default-maximum-active-tasks-per-queue</name>
<value>200000</value>
<description>The default maximum number of tasks, across all jobs in the
queue, which can be initialized concurrently. Once the queue's jobs exceed
this limit they will be queued on disk.
</description>
</property>
......
I was excited to get this result, because once the records come out like this, I can apply whatever processing I want to the data in MapReduce and derive any result I need.
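For example, since each record is one complete <property> fragment, the map function can parse it with the JDK's built-in DOM parser and emit (name, value) pairs instead of echoing the raw text. The ParseMap class below is my own sketch, not part of the job above; it assumes each fragment is well-formed XML:

package com.liangc.hadoop.mr;

import java.io.IOException;
import java.io.StringReader;
import javax.xml.parsers.DocumentBuilderFactory;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.MapReduceBase;
import org.apache.hadoop.mapred.Mapper;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reporter;
import org.w3c.dom.Document;
import org.xml.sax.InputSource;

// Hypothetical mapper: parses each <property> fragment and emits (name, value).
public class ParseMap extends MapReduceBase implements Mapper<Text, Text, Text, Text> {
    @Override
    public void map(Text key, Text value, OutputCollector<Text, Text> ctx, Reporter r) throws IOException {
        try {
            // The record arrives as the key: one complete <property>...</property> block.
            Document doc = DocumentBuilderFactory.newInstance().newDocumentBuilder()
                    .parse(new InputSource(new StringReader(key.toString())));
            String name = doc.getElementsByTagName("name").item(0).getTextContent();
            String val = doc.getElementsByTagName("value").item(0).getTextContent();
            ctx.collect(new Text(name), new Text(val));
        } catch (Exception e) {
            // Count and skip malformed fragments instead of failing the task.
            r.getCounter("XmlMR", "bad-records").increment(1);
        }
    }
}

Swap this class into the job (conf.setMapperClass and the MultipleInputs call) and the reducer then receives one group per property name, which is a much more useful starting point than raw fragments.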