Writing data to HBase with MapReduce
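The job below writes into an HBase table named "tab" with a column family "f1", so that table must exist before the job is submitted. It can be created in the hbase shell or with a small client program; here is a minimal sketch, assuming the same 0.94-era HBase client API the job code uses (the class name CreateTabTable is only illustrative):

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.client.HBaseAdmin;

public class CreateTabTable {
    public static void main(String[] args) throws Exception {
        Configuration conf = HBaseConfiguration.create();
        conf.set("hbase.zookeeper.quorum", "localhost");
        HBaseAdmin admin = new HBaseAdmin(conf);
        // create table "tab" with the single column family "f1" used by the job
        if (!admin.tableExists("tab")) {
            HTableDescriptor desc = new HTableDescriptor("tab");
            desc.addFamily(new HColumnDescriptor("f1"));
            admin.createTable(desc);
        }
        admin.close();
    }
}

The MapReduce job itself: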

package com.sun.hbase;


import java.io.IOException;


import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
import org.apache.hadoop.hbase.mapreduce.TableReducer;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
/**
 * Write data into HBase via MapReduce.
 *
 * @author asheng
 * Run directly. The input is the text file(s) under "/home/asheng/Hbase/in/", in the format:
1 name1
2 name2
3 name3
4 name4
5 name5
 * Result (hbase shell scan of the target table):
 *
 ROW     COLUMN+CELL
 1    column=f1:qualifier, timestamp=1373940859569, value=name1
 2    column=f1:qualifier, timestamp=1373940859569, value=name2
 3    column=f1:qualifier, timestamp=1373940859569, value=name3
 4    column=f1:qualifier, timestamp=1373940859569, value=name4
 5    column=f1:qualifier, timestamp=1373940859569, value=name5
 */
public class WriteDataToHBase {
/**
 * Mapper: splits each input line "rowkey value" on a single space and
 * emits (rowkey, value) as Text pairs.
 */
public static class THMapper extends Mapper<LongWritable, Text, Text, Text> {
    @Override
    public void map(LongWritable key, Text value, Context context) {
        String[] items = value.toString().split(" ");
        String k = items[0];
        String v = items[1];
        System.out.println("key:" + k + "," + "value:" + v);
        try {
            context.write(new Text(k), new Text(v));
        } catch (IOException e) {
            e.printStackTrace();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}
/**
 * Reducer: a TableReducer whose output key is an ImmutableBytesWritable (the row key)
 * and whose output value is a Put that is written into the HBase table.
 */
public static class THReducer extends TableReducer<Text, Text, ImmutableBytesWritable> {
    @Override
    public void reduce(Text key, Iterable<Text> value, Context context) {
        String k = key.toString();
        String v = value.iterator().next().toString(); // the input data has exactly one value per key
        Put putrow = new Put(k.getBytes());
        // column family "f1", qualifier "qualifier", cell value v
        putrow.add("f1".getBytes(), "qualifier".getBytes(), v.getBytes());
        try {
            // use k.getBytes() (from the String) rather than key.getBytes():
            // Text.getBytes() returns the reused backing buffer, which may
            // contain stale bytes beyond getLength()
            context.write(new ImmutableBytesWritable(k.getBytes()), putrow);
        } catch (IOException e) {
            e.printStackTrace();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}
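/**
 * Driver: reads the text input with TextInputFormat, runs THMapper and THReducer,
 * and lets TableMapReduceUtil.initTableReducerJob wire the reducer output
 * to the HBase table "tab".
 */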
public static class THDriver extends Configured implements Tool{
    @Override
    public int run(String[] arg0) throws Exception 
     {
        Configuration conf = HBaseConfiguration.create();
        conf.set("hbase.zookeeper.quorum", "localhost"); // note: no trailing dot in the property name
        
        Job job = new Job(conf,"Txt-to-Hbase");
        job.setJarByClass(WriteDataToHBase.class);
        
        Path in = new Path("/home/asheng/Hbase/in/");
        
        job.setInputFormatClass(TextInputFormat.class);
        FileInputFormat.addInputPath(job, in);
        
        job.setMapperClass(THMapper.class);
        job.setReducerClass(THReducer.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(Text.class);
        
        TableMapReduceUtil.initTableReducerJob("tab", THReducer.class, job);
        
        return job.waitForCompletion(true) ? 0 : 1;
    }
}
public static void main(String [] args) throws Exception
 {
        int mr = ToolRunner.run(new Configuration(),new THDriver(),args);
        System.exit(mr);
  }
}
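After the job finishes, the table can be checked with a scan in the hbase shell, as shown in the header comment. A client-side check works too; a minimal sketch, again assuming the 0.94-era client API (the class name ScanTab is only illustrative):

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.util.Bytes;

public class ScanTab {
    public static void main(String[] args) throws Exception {
        Configuration conf = HBaseConfiguration.create();
        conf.set("hbase.zookeeper.quorum", "localhost");
        HTable table = new HTable(conf, "tab");
        ResultScanner scanner = table.getScanner(new Scan());
        for (Result r : scanner) {
            // print "rowkey -> value of f1:qualifier" for every row
            byte[] v = r.getValue(Bytes.toBytes("f1"), Bytes.toBytes("qualifier"));
            System.out.println(Bytes.toString(r.getRow()) + " -> " + Bytes.toString(v));
        }
        scanner.close();
        table.close();
    }
}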
 


Reposted from oaksun.iteye.com/blog/1942219