第一步:定义我们的mapper类
//bulkLoad needs only a Mapper: it converts each HDFS text line into an HFile-bound Put.
//Expected input format per line: rowkey \t name \t age
public static class BulkLoaddata extends Mapper<LongWritable, Text, ImmutableBytesWritable, Put> {

    /**
     * Encodes a string with an explicit charset. The original no-arg
     * {@code getBytes()} uses the platform default charset, which can differ
     * between cluster nodes and silently corrupt row keys and values.
     */
    private static byte[] utf8(String s) {
        return s.getBytes(java.nio.charset.StandardCharsets.UTF_8);
    }

    /**
     * Emits one Put per well-formed input line, keyed by the row key.
     *
     * @param key     byte offset of the line in the input split (unused)
     * @param value   one tab-separated line: rowkey, name, age
     * @param context MapReduce context receiving (rowkey, Put) pairs
     */
    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        String[] fields = value.toString().split("\t");
        // Skip malformed lines instead of failing the whole task with
        // ArrayIndexOutOfBoundsException (original indexed fields[0..2] blindly).
        if (fields.length < 3) {
            return;
        }
        byte[] rowkey = utf8(fields[0]);
        Put put = new Put(rowkey);
        put.addColumn(utf8("f1"), utf8("name"), utf8(fields[1]));
        put.addColumn(utf8("f1"), utf8("age"), utf8(fields[2]));
        context.write(new ImmutableBytesWritable(rowkey), put);
    }
}
第二步:开发我们的main程序入口类
/**
 * Configures and runs the bulk-load MapReduce job: reads tab-separated user
 * records from HDFS and writes HFiles pre-split to match the regions of the
 * HBase table {@code myuser}.
 *
 * @param args unused command-line arguments
 * @return 0 if the job succeeded, 1 otherwise
 * @throws Exception if connecting to HBase or running the job fails
 */
@Override
public int run(String[] args) throws Exception {
    Configuration conf = new Configuration();
    conf.set("hbase.zookeeper.quorum", "hadoop01:2181,hadoop02:2181,hadoop03:2181");
    TableName tableName = TableName.valueOf("myuser");
    // try-with-resources: the original leaked the Connection, Table and
    // RegionLocator, leaving ZooKeeper sessions open after the job finished.
    try (Connection connection = ConnectionFactory.createConnection(conf);
         Table table = connection.getTable(tableName);
         // Region locations are needed so HFiles are split on region boundaries.
         RegionLocator regionLocator = connection.getRegionLocator(tableName)) {
        Job job = Job.getInstance(conf, "BulkLoadMap");
        job.setJarByClass(BulkLoadMap.class);
        // Input is plain text on HDFS.
        job.setInputFormatClass(TextInputFormat.class);
        // NOTE(review): input uses host node01 while the quorum/output use
        // hadoop01 — confirm which NameNode host is correct for this cluster.
        TextInputFormat.addInputPath(job, new Path("hdfs://node01:8020/tmp/user.txt"));
        job.setMapperClass(BulkLoaddata.class);
        job.setMapOutputKeyClass(ImmutableBytesWritable.class);
        job.setMapOutputValueClass(Put.class);
        // Output is HFiles ready for incremental bulk load.
        job.setOutputFormatClass(HFileOutputFormat2.class);
        HFileOutputFormat2.configureIncrementalLoad(job, table, regionLocator);
        HFileOutputFormat2.setOutputPath(job, new Path("hdfs://hadoop01:8020/tmp/output"));
        return job.waitForCompletion(true) ? 0 : 1;
    }
}
/**
 * Entry point: runs the bulk-load job via ToolRunner and propagates its exit
 * status to the shell. The original stored the return code in an unused local,
 * so the process always exited 0 even when the job failed.
 */
public static void main(String[] args) throws Exception {
    int exitCode = ToolRunner.run(new BulkLoadMap(), args);
    System.exit(exitCode);
}
}