We have some large files that need to be stored in HBase. The idea: first upload the files to HDFS, read <key,value> pairs in the map phase, and then write those pairs into HBase in the reduce phase.
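Before touching any Hadoop classes, the per-line parsing that the Mapper below performs can be sketched in plain Java. The space-separated "rowkey value" layout is an assumption about the input files, and LineParser is a hypothetical helper, not part of Hadoop:

```java
// Minimal sketch of the line format the Mapper expects:
// each input line is "rowkey value", separated by a single space.
public class LineParser {
    public static String[] parse(String line) {
        String[] items = line.split(" ");
        // items[0] becomes the HBase row key, items[1] the cell value
        return new String[] { items[0], items[1] };
    }

    public static void main(String[] args) {
        String[] kv = parse("row1 hello");
        System.out.println(kv[0] + " -> " + kv[1]); // row1 -> hello
    }
}
```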
package test;

import java.io.IOException;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class MapperClass extends Mapper<LongWritable, Text, Text, Text> {
    @Override
    public void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        // Each input line is "rowkey value", separated by a space
        String[] items = value.toString().split(" ");
        String k = items[0];
        String v = items[1];
        context.write(new Text(k), new Text(v));
    }
}
The Reducer class, whose job is to write the key/value pairs into the HBase table:
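Stripped of the HBase types, the reduce step simply concatenates every value that arrives for one key. A plain-Java sketch of that accumulation (note the buffer must be instantiated, not left null, or appending throws a NullPointerException):

```java
import java.util.Arrays;
import java.util.List;

public class ConcatSketch {
    // Plain-Java stand-in for the reduce body: join all values for one key.
    public static String concat(Iterable<String> values) {
        StringBuilder sb = new StringBuilder(); // must be created, not null
        for (String v : values) {
            sb.append(v);
        }
        return sb.toString();
    }

    public static void main(String[] args) {
        List<String> values = Arrays.asList("a", "b", "c");
        System.out.println(concat(values)); // abc
    }
}
```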
package test;

import java.io.IOException;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableReducer;
import org.apache.hadoop.io.Text;

public class ReducerClass extends TableReducer<Text, Text, ImmutableBytesWritable> {
    @Override
    public void reduce(Text key, Iterable<Text> values, Context context)
            throws IOException, InterruptedException {
        String k = key.toString();
        // The StringBuffer must be instantiated; appending to null throws NPE
        StringBuffer str = new StringBuffer();
        for (Text value : values) {
            str.append(value.toString());
        }
        String v = str.toString();
        // One cell in column family "fam1", qualifier "name"
        Put putrow = new Put(k.getBytes());
        putrow.add("fam1".getBytes(), "name".getBytes(), v.getBytes());
        // The Put must actually be emitted, or nothing reaches HBase
        context.write(new ImmutableBytesWritable(k.getBytes()), putrow);
    }
}
As shown above, ReducerClass extends TableReducer, whereas in plain Hadoop a reducer extends Reducer. Its prototype is TableReducer<KeyIn, Values, KeyOut>, so the output key type written to HBase is ImmutableBytesWritable.
Keeping the Map, Reduce, and Job configuration in separate classes makes the structure clear; Mahout uses the same layout.
package test;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.util.Tool;

public class Driver extends Configured implements Tool {
    @Override
    public int run(String[] arg0) throws Exception {
        Configuration conf = HBaseConfiguration.create();
        // Property name is "hbase.zookeeper.quorum" (no trailing dot)
        conf.set("hbase.zookeeper.quorum", "localhost");
        Job job = new Job(conf, "Hbase");
        job.setJarByClass(TxtHbase.class);
        Path in = new Path(arg0[0]);
        job.setInputFormatClass(TextInputFormat.class);
        FileInputFormat.addInputPath(job, in);
        job.setMapperClass(MapperClass.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(Text.class);
        TableMapReduceUtil.initTableReducerJob("table", ReducerClass.class, job);
        return job.waitForCompletion(true) ? 0 : 1;
    }
}
Note that the Driver never calls job.setReducerClass(); instead, TableMapReduceUtil.initTableReducerJob("table", ReducerClass.class, job) wires in the reduce class (and also configures TableOutputFormat as the job's output format).
The main class:
package test;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.util.ToolRunner;

public class TxtHbase {
    public static void main(String[] args) throws Exception {
        // ToolRunner parses generic options and invokes Driver.run()
        System.exit(ToolRunner.run(new Configuration(), new Driver(), args));
    }
}
Reading data back out is simpler: write a Mapper that reads the <key,value> pairs.
package test;

import java.io.IOException;
import java.util.Map.Entry;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapred.TableMap;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.MapReduceBase;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reporter;

public class MapperClass extends MapReduceBase implements TableMap<Text, Text> {
    static final String NAME = "GetDataFromHbaseTest";
    private Configuration conf;

    public void map(ImmutableBytesWritable row, Result values,
            OutputCollector<Text, Text> output, Reporter reporter)
            throws IOException {
        StringBuilder sb = new StringBuilder();
        // Walk every qualifier/value pair in column family "fam1"
        for (Entry<byte[], byte[]> value : values.getFamilyMap(
                "fam1".getBytes()).entrySet()) {
            byte[] cell = value.getValue();
            if (cell != null) {
                // byte[].toString() would print the array reference,
                // so decode the bytes with new String(...) instead
                sb.append(new String(value.getKey())).append(new String(cell));
            }
        }
        output.collect(new Text(row.get()), new Text(sb.toString()));
    }
}
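The qualifier+value concatenation inside that map() can be sketched without HBase types, using a sorted String map in place of Result.getFamilyMap (which returns qualifiers in byte order). FamilyMapSketch is a hypothetical helper for illustration only:

```java
import java.util.Map;
import java.util.TreeMap;

public class FamilyMapSketch {
    // Stand-in for iterating Result.getFamilyMap(family): append each
    // qualifier followed by its cell value, as the read Mapper does.
    public static String flatten(Map<String, String> familyMap) {
        StringBuilder sb = new StringBuilder();
        for (Map.Entry<String, String> e : familyMap.entrySet()) {
            if (e.getValue() != null) {
                sb.append(e.getKey()).append(e.getValue());
            }
        }
        return sb.toString();
    }

    public static void main(String[] args) {
        Map<String, String> fam = new TreeMap<>(); // sorted, like getFamilyMap
        fam.put("name", "alice");
        fam.put("age", "30");
        System.out.println(flatten(fam)); // age30namealice
    }
}
```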
The job must be configured through initTableMapJob, whose signature is:

TableMapReduceUtil.initTableMapJob(
    String table,
    String columns,
    Class<? extends TableMap> mapper,
    Class<? extends org.apache.hadoop.io.WritableComparable> outputKeyClass,
    Class<? extends org.apache.hadoop.io.Writable> outputValueClass,
    org.apache.hadoop.mapred.JobConf job,
    boolean addDependencyJars)
package test;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.mapred.TableMapReduceUtil;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.lib.NullOutputFormat;
import org.apache.hadoop.util.Tool;

public class Driver extends Configured implements Tool {
    @Override
    public int run(String[] arg0) throws Exception {
        Configuration conf = HBaseConfiguration.create();
        conf.set("hbase.zookeeper.quorum", "localhost");
        // The mapper implements the old mapred TableMap interface,
        // so configure the job with a JobConf and initTableMapJob
        JobConf job = new JobConf(conf, TxtHbase.class);
        job.setJobName("Hbase");
        // arg0[0] carries the space-separated column list, e.g. "fam1:name"
        TableMapReduceUtil.initTableMapJob("table", arg0[0],
                MapperClass.class, Text.class, Text.class, job);
        // This demo discards the map output; swap in a real output format as needed
        job.setOutputFormat(NullOutputFormat.class);
        JobClient.runJob(job);
        return 0;
    }
}
The main class:
package test;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.util.ToolRunner;

public class TxtHbase {
    public static void main(String[] args) throws Exception {
        System.exit(ToolRunner.run(new Configuration(), new Driver(), args));
    }
}