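目标：将 HDFS 上路径 /hbase/input/user.txt 中的数据文件转换成 HFile 格式，然后通过 bulk load 导入到 HBase 的 myuser2 表中。（注意：下文示例代码默认读取的是 /tmp/user.txt、使用的表名是 myuser，运行前请将路径和表名与实际环境统一；另外，下面的 run 方法只负责生成 HFile，真正把 HFile 加载进表还需要单独执行一次 bulk load，例如使用 HBase 的 LoadIncrementalHFiles 工具。）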

第一步：定义 Mapper 类

 //bulkLoad  只写一个MAP代码即可
    //将HDFS数据转换成HFile
    /**
     * Converts each tab-separated input line {@code rowkey<TAB>name<TAB>age} into an HBase
     * {@link Put} that writes columns {@code f1:name} and {@code f1:age}, emitted under the row key
     * so that {@code HFileOutputFormat2} can turn the output into HFiles (bulk load needs only
     * this map phase).
     *
     * <p>Lines with fewer than three fields, or with an empty row key (for example a blank
     * trailing line), are skipped and counted under the {@code BulkLoad / MALFORMED_LINES}
     * counter instead of failing the whole task.
     */
    public static class BulkLoaddata extends Mapper<LongWritable, Text, ImmutableBytesWritable, Put> {
        // Encoded once with an explicit charset; String.getBytes() would use the platform default.
        private static final byte[] FAMILY = "f1".getBytes(java.nio.charset.StandardCharsets.UTF_8);
        private static final byte[] NAME = "name".getBytes(java.nio.charset.StandardCharsets.UTF_8);
        private static final byte[] AGE = "age".getBytes(java.nio.charset.StandardCharsets.UTF_8);

        @Override
        protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
            String[] fields = value.toString().split("\t");
            // Short lines would otherwise throw ArrayIndexOutOfBoundsException, and an empty row
            // key is rejected by the Put constructor; count them and move on.
            if (fields.length < 3 || fields[0].isEmpty()) {
                context.getCounter("BulkLoad", "MALFORMED_LINES").increment(1);
                return;
            }
            byte[] rowkey = fields[0].getBytes(java.nio.charset.StandardCharsets.UTF_8);
            Put put = new Put(rowkey);
            put.addColumn(FAMILY, NAME, fields[1].getBytes(java.nio.charset.StandardCharsets.UTF_8));
            put.addColumn(FAMILY, AGE, fields[2].getBytes(java.nio.charset.StandardCharsets.UTF_8));
            context.write(new ImmutableBytesWritable(rowkey), put);
        }
    }

第二步：编写 main 程序入口类

/**
 * Configures and runs a MapReduce job that reads tab-separated text, maps each line to a
 * {@link Put} via {@link BulkLoaddata}, and writes HFiles laid out to match the target table's
 * current regions (via {@code HFileOutputFormat2.configureIncrementalLoad}).
 *
 * <p>This method only produces the HFiles. It does not load them into the table; a separate
 * bulk-load step (for example HBase's {@code LoadIncrementalHFiles} tool) is still required.
 *
 * @param args optional overrides: {@code args[0]} input path, {@code args[1]} HFile output
 *     directory, {@code args[2]} target table name; any missing argument falls back to the
 *     built-in default
 * @return 0 if the job completed successfully, 1 otherwise
 * @throws Exception if connecting to HBase or submitting/running the job fails
 */
@Override
    public int run(String[] args) throws Exception {
        String[] params = args == null ? new String[0] : args;
        // NOTE(review): the default input uses host node01 while the output path and the
        // ZooKeeper quorum use hadoop01 -- confirm which NameNode host is correct.
        String inputPath = params.length > 0 ? params[0] : "hdfs://node01:8020/tmp/user.txt";
        String outputPath = params.length > 1 ? params[1] : "hdfs://hadoop01:8020/tmp/output";
        TableName tableName = TableName.valueOf(params.length > 2 ? params[2] : "myuser");

        // Build on the ToolRunner-supplied configuration so generic options (-D key=value) are
        // honoured, and layer HBase's default/site resources on top of it.
        Configuration base = getConf() != null ? getConf() : new Configuration();
        Configuration conf = org.apache.hadoop.hbase.HBaseConfiguration.create(base);
        conf.set("hbase.zookeeper.quorum", "hadoop01:2181,hadoop02:2181,hadoop03:2181");

        // Connection, Table and RegionLocator are Closeable; release them once the job is done.
        try (Connection connection = ConnectionFactory.createConnection(conf);
             Table table = connection.getTable(tableName);
             // Region location info of the target table, used to partition the HFiles.
             RegionLocator regionLocator = connection.getRegionLocator(tableName)) {

            Job job = Job.getInstance(conf, "BulkLoadMap");
            job.setJarByClass(BulkLoadMap.class);

            // Input: plain text, one record per line.
            job.setInputFormatClass(TextInputFormat.class);
            TextInputFormat.addInputPath(job, new Path(inputPath));
            job.setMapperClass(BulkLoaddata.class);
            job.setMapOutputKeyClass(ImmutableBytesWritable.class);
            job.setMapOutputValueClass(Put.class);

            // Output: HFiles. configureIncrementalLoad also installs the sorting reducer and a
            // total-order partitioner aligned with the table's regions.
            job.setOutputFormatClass(HFileOutputFormat2.class);
            HFileOutputFormat2.configureIncrementalLoad(job, table, regionLocator);
            HFileOutputFormat2.setOutputPath(job, new Path(outputPath));

            return job.waitForCompletion(true) ? 0 : 1;
        }
    }
    /**
     * Command-line entry point. Runs {@link #run(String[])} through {@link ToolRunner}, so generic
     * Hadoop options (for example {@code -D key=value}) are parsed first, then exits the JVM with
     * the returned status code so that shells and schedulers can detect a failed job.
     */
    public static void main(String[] args) throws Exception {
        int exitCode = ToolRunner.run(new BulkLoadMap(), args);
        System.exit(exitCode);
    }
}

发布了93 篇原创文章 · 获赞 288 · 访问量 18万+

猜你喜欢

转载自blog.csdn.net/qq_45765882/article/details/103706029