package example2; import java.io.IOException; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.HBaseConfiguration; import org.apache.hadoop.hbase.KeyValue; import org.apache.hadoop.hbase.client.Put; import org.apache.hadoop.hbase.io.ImmutableBytesWritable; import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil; import org.apache.hadoop.hbase.mapreduce.TableOutputFormat; import org.apache.hadoop.hbase.mapreduce.TableReducer; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.Mapper; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.util.GenericOptionsParser; //Administrator public class ImportFromFileExample { public static class ImportMapper extends Mapper<LongWritable, Text, ImmutableBytesWritable,Text >{ @Override protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException { context.write(new ImmutableBytesWritable(Bytes.toBytes(key.get())), value); } } public static class Reducer1 extends TableReducer<ImmutableBytesWritable, Text, ImmutableBytesWritable> { private byte[] family=null; private byte[]qualifier=null; @Override protected void setup(Context context) throws IOException, InterruptedException { String column=context.getConfiguration().get("conf.column"); byte[][]colkey=KeyValue.parseColumn(Bytes.toBytes(column)); family=colkey[0]; if(colkey.length>1){ qualifier=colkey[1]; } } public void reduce(ImmutableBytesWritable key, Iterable<Text> values, Context context) throws IOException, InterruptedException { String valueCon=null; for(Text text:values){ valueCon+=text.toString(); } Put put = new Put(key.get()); put.add(family, qualifier, Bytes.toBytes(valueCon)); context.write(key, put); } } /** * @param args */ public static void main(String[] args)throws Exception { Configuration 
conf=HBaseConfiguration.create(); String []argArray=new GenericOptionsParser(conf, args).getRemainingArgs(); if(argArray.length!=1){ System.exit(1); } conf.set("conf.column", "family1:text"); Job job=new Job(conf,"import from hdfs to hbase"); job.setJarByClass(ImportFromFileExample.class); job.setMapperClass(ImportMapper.class); job.setOutputFormatClass(TableOutputFormat.class); job.getConfiguration().set(TableOutputFormat.OUTPUT_TABLE, "testtable"); job.setMapOutputKeyClass(ImmutableBytesWritable.class); job.setMapOutputValueClass(Text.class); job.setOutputKeyClass(ImmutableBytesWritable.class); job.setOutputValueClass(Put.class); TableMapReduceUtil.initTableReducerJob("testtable", Reducer1.class, job); FileInputFormat.addInputPaths(job, argArray[0]); System.exit(job.waitForCompletion(true) ? 0 : 1); } }
// 从hadoop取出文件写入hbase表中 — reads a file out of HDFS and writes it into an HBase table.
// Source: liyonghui160com.iteye.com/blog/2176640