Writing data to HBase with MapReduce

The job below reads plain-text files from HDFS and writes each line into the HBase table "tab" via a TableReducer.

package com.sun.hbase;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
import org.apache.hadoop.hbase.mapreduce.TableReducer;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

/**
 * Writes data into HBase via MapReduce.
 *
 * @author asheng
 *
 * Run directly against the text files under "/home/asheng/Hbase/in/", formatted as:
 *   1 name1
 *   2 name2
 *   3 name3
 *   4 name4
 *   5 name5
 *
 * Scanning the target table afterwards yields:
 *   ROW   COLUMN+CELL
 *   1     column=f1:qualifier, timestamp=1373940859569, value=name1
 *   2     column=f1:qualifier, timestamp=1373940859569, value=name2
 *   3     column=f1:qualifier, timestamp=1373940859569, value=name3
 *   4     column=f1:qualifier, timestamp=1373940859569, value=name4
 *   5     column=f1:qualifier, timestamp=1373940859569, value=name5
 */
public class WriteDataToHBase {

    /** Splits each input line into (row key, value) and hands the pair to the reducer. */
    public static class THMapper extends Mapper<LongWritable, Text, Text, Text> {
        @Override
        public void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            String[] items = value.toString().split(" ");
            String k = items[0];
            String v = items[1];
            System.out.println("key:" + k + ",value:" + v);
            context.write(new Text(k), new Text(v));
        }
    }

    /**
     * Emits an ImmutableBytesWritable row key together with a Put per record.
     */
    public static class THReducer extends TableReducer<Text, Text, ImmutableBytesWritable> {
        @Override
        public void reduce(Text key, Iterable<Text> values, Context context)
                throws IOException, InterruptedException {
            String k = key.toString();
            // The input data guarantees exactly one value per key.
            String v = values.iterator().next().toString();
            Put putrow = new Put(Bytes.toBytes(k));
            // Column family "f1", qualifier "qualifier", cell value v.
            putrow.add(Bytes.toBytes("f1"), Bytes.toBytes("qualifier"), Bytes.toBytes(v));
            context.write(new ImmutableBytesWritable(Bytes.toBytes(k)), putrow);
        }
    }

    public static class THDriver extends Configured implements Tool {
        @Override
        public int run(String[] args) throws Exception {
            Configuration conf = HBaseConfiguration.create();
            // Point the client at the ZooKeeper quorum of the HBase cluster.
            conf.set("hbase.zookeeper.quorum", "localhost");
            Job job = new Job(conf, "Txt-to-Hbase");
            job.setJarByClass(WriteDataToHBase.class);

            Path in = new Path("/home/asheng/Hbase/in/");
            job.setInputFormatClass(TextInputFormat.class);
            FileInputFormat.addInputPath(job, in);

            job.setMapperClass(THMapper.class);
            job.setMapOutputKeyClass(Text.class);
            job.setMapOutputValueClass(Text.class);
            // initTableReducerJob also registers THReducer as the job's reducer
            // and wires its output to HBase table "tab".
            TableMapReduceUtil.initTableReducerJob("tab", THReducer.class, job);
            return job.waitForCompletion(true) ? 0 : 1;
        }
    }

    public static void main(String[] args) throws Exception {
        int mr = ToolRunner.run(new Configuration(), new THDriver(), args);
        System.exit(mr);
    }
}
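The job assumes that the table "tab" with column family "f1" already exists; initTableReducerJob configures the output format but does not create the table. Below is a minimal sketch, assuming the same 0.94-era HBase client API and a localhost ZooKeeper quorum as above, that creates the table up front and then scans it to verify what the job wrote (the class name SetupAndVerify is made up for illustration):

package com.sun.hbase;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.client.HBaseAdmin;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.util.Bytes;

public class SetupAndVerify {
    public static void main(String[] args) throws Exception {
        Configuration conf = HBaseConfiguration.create();
        conf.set("hbase.zookeeper.quorum", "localhost");

        // Create table "tab" with column family "f1" if it does not exist yet.
        HBaseAdmin admin = new HBaseAdmin(conf);
        if (!admin.tableExists("tab")) {
            HTableDescriptor desc = new HTableDescriptor("tab");
            desc.addFamily(new HColumnDescriptor("f1"));
            admin.createTable(desc);
        }
        admin.close();

        // Scan the table to inspect what the MapReduce job wrote.
        HTable table = new HTable(conf, "tab");
        ResultScanner scanner = table.getScanner(new Scan());
        for (Result r : scanner) {
            byte[] v = r.getValue(Bytes.toBytes("f1"), Bytes.toBytes("qualifier"));
            System.out.println(Bytes.toString(r.getRow()) + " => " + Bytes.toString(v));
        }
        scanner.close();
        table.close();
    }
}

After a successful run, the scan should print one row per input line (1 => name1, 2 => name2, and so on), matching the scan output shown in the class comment above.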
Reposted from oaksun.iteye.com/blog/1942219