Generating HFiles with MapReduce and bulk-loading them into an HBase table (best suited to an empty table holding a large volume of data in a single column family). Because bulk loading writes HFiles directly into HDFS and bypasses the normal write path (WAL and MemStore), it is far faster than importing the same data with Put requests.
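The HFileCreateMap mapper below expects tab-separated input lines of the form word<TAB>count. A hypothetical test.txt (sample data, not from the original post) could look like:

hello	3
world	5
hbase	1

Note that the target table (mr:test2 with column family cf) must already exist before the job runs, because HFileOutputFormat2.configureIncrementalLoad partitions the map output according to the table's current region boundaries.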
Code:
package com.test.bulkload;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.HFileOutputFormat2;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import java.io.IOException;
/**
 * @ClassName: HFileCreate
 * @Description: Generates HFiles with MapReduce for import into an HBase table
 *               (best suited to an empty table holding a large volume of data in a single column family).
 * @see After the HFiles are generated, run: hbase org.apache.hadoop.hbase.mapreduce.LoadIncrementalHFiles outputpath tablename
 * @author gy
 * @date 2018-07-18 09:18:22
 */
public class HFileCreate {
    public static class HFileCreateMap extends Mapper<LongWritable, Text, ImmutableBytesWritable, Put> {
        @Override
        public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
            // Input lines are tab-separated: word<TAB>count
            String[] valueStrSplit = value.toString().split("\t");
            if (valueStrSplit.length < 2) {
                return; // skip malformed lines instead of failing the task
            }
            String word = valueStrSplit[0];
            int count = Integer.parseInt(valueStrSplit[1]);
            // Use the word as the HBase row key
            byte[] rowKey = Bytes.toBytes(word);
            ImmutableBytesWritable rowKeyWritable = new ImmutableBytesWritable(rowKey);
            Put put = new Put(rowKey);
            put.addColumn(Bytes.toBytes("cf"), Bytes.toBytes("count"), Bytes.toBytes(count));
            context.write(rowKeyWritable, put);
        }
    }
    public static void main(String[] args) throws Exception {
        String tableNameString = "mr:test2"; // target table name
        String inputPath = "hdfs://<ip-address>/input/test.txt"; // input file
        String outputPath = "hdfs://<ip-address>/output/"; // output directory (must not exist yet)
        Configuration conf = HBaseConfiguration.create();
        conf.set("hbase.zookeeper.quorum", "<ip-address>");
        conf.set("hbase.zookeeper.property.clientPort", "<port>");
        conf.set("zookeeper.znode.parent", "/hbase");
        Connection conn = ConnectionFactory.createConnection(conf);
        Table table = conn.getTable(TableName.valueOf(tableNameString));
        try {
            Job job = Job.getInstance(conf, "HFileCreate");
            job.setJarByClass(HFileCreate.class);
            job.setMapperClass(HFileCreate.HFileCreateMap.class);
            job.setMapOutputKeyClass(ImmutableBytesWritable.class);
            job.setMapOutputValueClass(Put.class);
            // Disable speculative execution so duplicate task attempts cannot write duplicate HFiles
            job.setSpeculativeExecution(false);
            job.setReduceSpeculativeExecution(false);
            // Input/output formats
            job.setInputFormatClass(TextInputFormat.class);
            job.setOutputFormatClass(HFileOutputFormat2.class);
            FileInputFormat.setInputPaths(job, inputPath);
            FileOutputFormat.setOutputPath(job, new Path(outputPath));
            // Configures the sort reducer, the TotalOrderPartitioner, and per-family
            // compression/bloom settings based on the table's current region boundaries
            HFileOutputFormat2.configureIncrementalLoad(job, table,
                    conn.getRegionLocator(TableName.valueOf(tableNameString)));
            try {
                job.waitForCompletion(true);
            } catch (InterruptedException | ClassNotFoundException e) {
                e.printStackTrace();
            }
        } catch (IOException e) {
            e.printStackTrace();
        } finally {
            if (table != null) {
                table.close();
            }
            if (conn != null) {
                conn.close();
            }
        }
    }
}
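After the MapReduce job completes, the HFiles under the output directory still have to be handed off to the region servers, either with the LoadIncrementalHFiles command line shown in the Javadoc above or programmatically. A minimal sketch of the programmatic route, assuming the HBase 1.x client API and the same connection settings as in main (the class name HFileBulkLoad is ours, not from the original code):

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.mapreduce.LoadIncrementalHFiles;

// Hypothetical driver: programmatic equivalent of the LoadIncrementalHFiles CLI
public class HFileBulkLoad {
    public static void main(String[] args) throws Exception {
        Configuration conf = HBaseConfiguration.create();
        conf.set("hbase.zookeeper.quorum", "<ip-address>");
        conf.set("hbase.zookeeper.property.clientPort", "<port>");
        conf.set("zookeeper.znode.parent", "/hbase");
        TableName tableName = TableName.valueOf("mr:test2");
        try (Connection conn = ConnectionFactory.createConnection(conf);
             Table table = conn.getTable(tableName);
             Admin admin = conn.getAdmin()) {
            // Moves each generated HFile into the region that covers its key range;
            // the rows are visible to reads as soon as doBulkLoad returns
            new LoadIncrementalHFiles(conf).doBulkLoad(
                    new Path("hdfs://<ip-address>/output/"), admin, table,
                    conn.getRegionLocator(tableName));
        }
    }
}

Whichever route is used, the HFiles are moved into the table's storage directories, so the output directory is consumed by the load.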