1 Put
直接将数据导入到Hbase中 , 底层使用RPC请求 , 每次put都会RPC请求,效率并不高
Table tb_dml = conn.getTable(TableName.valueOf("tb_dml"));
Put rk0011 = new Put(Bytes.toBytes("rk0011"));
rk0011.addColumn("cf1".getBytes(), "name".getBytes(), "guanyu".getBytes());
rk0011.addColumn("cf1".getBytes(), "gender".getBytes(), "M".getBytes());
Put rk0010 = new Put(Bytes.toBytes("rk0010"));
rk0010.addColumn("cf1".getBytes(), "age".getBytes(), Bytes.toBytes(23));
rk0010.addColumn("cf1".getBytes(), "name".getBytes(), "guanyu".getBytes());
// 插入多行数据
ArrayList<Put> puts = new ArrayList<>();
puts.add(rk0011);
puts.add(rk0010);
// 插入数据
tb_dml.put(puts);
2 BufferedMutator
使用缓存批次的形式发送RPC请求 , 减少RPC的请求次数 , 插入数据的效率会比put方式效率高, 但是对于大量的静态数据也并不适合!
BufferedMutator bm = conn.getBufferedMutator(TableName.valueOf("tb_dml")); Put rk0011 = new Put(Bytes.toBytes("rk0014")); rk0011.addColumn("cf1".getBytes(), "name".getBytes(), "guanyu".getBytes()); rk0011.addColumn("cf1".getBytes(), "gender".getBytes(), "M".getBytes()); bm.mutate(rk0011); bm.flush();
3 BulkLoad
用于已经存在一批巨量静态数据的情况!如果不用bulkloader工具,则只能用rpc请求,一条一条地通过rpc提交给regionserver去插入,效率极其低下!
使用 Bulk Load 方式由于利用了 HBase 的数据信息是按照特定格式存储在 HDFS 里的这一特性,直接在 HDFS 中生成持久化的 HFile 数据格式文件,然后完成巨量数据快速入库的操作,配合 MapReduce 完成这样的操作,不占用 Regionserver 资源,不会产生巨量的写入 I/O,所以需要较少的 CPU 和网络资源。
Bulk Load 的实现原理是通过一个 MapReduce Job 来生成一个 HBase 的HFile 格式文件,用来形成一个特殊的 HBase 数据表,然后直接将数据文件加载到运行的集群中。与使用HBase API相比,使用Bulkload导入数据占用更少的CPU和网络资源。
3.1 shell脚本方式
将CSV的文件转换长Hfile文件
将Hflie导入到表中
1,zss,M,34
2,lss,M,33
3,mby,M,29
4,zhoushen,M,24
5,dengzq,F,28
生成Hflie文件
hbase org.apache.hadoop.hbase.mapreduce.ImportTsv -Dimporttsv.separator=, -Dimporttsv.columns='HBASE_ROW_KEY,cf:name,cf:gender,cf:age' -Dimporttsv.bulk.output=/csv/output tb_imp_user /csv/input
导入数据到hbase的表中
hbase org.apache.hadoop.hbase.mapreduce.LoadIncrementalHFiles /csv/output tb_imp_user
查看数据
hbase(main):001:0> scan "tb_imp_user"
ROW COLUMN+CELL
1 column=cf:age, timestamp=1598769675367, value=34
1 column=cf:gender, timestamp=1598769675367, value=M
1 column=cf:name, timestamp=1598769675367, value=zss
2 column=cf:age, timestamp=1598769675367, value=33
2 column=cf:gender, timestamp=1598769675367, value=M
2 column=cf:name, timestamp=1598769675367, value=lss
3 column=cf:age, timestamp=1598769675367, value=29
3 column=cf:gender, timestamp=1598769675367, value=M
3 column=cf:name, timestamp=1598769675367, value=mby
4 column=cf:age, timestamp=1598769675367, value=24
4 column=cf:gender, timestamp=1598769675367, value=M
4 column=cf:name, timestamp=1598769675367, value=zhoushen
5 column=cf:age, timestamp=1598769675367, value=28
5 column=cf:gender, timestamp=1598769675367, value=F
5 column=cf:name, timestamp=1598769675367, value=dengzq
3.2 Java程序方式
使用Mr程序的自定义输入和输出文件类型
- 输入Hfile文件 输出普通的文件 表-->普通文件(数据导出)
- 输入Hfile文件 输出Hfile文件 表-->表
- 输入普通文件 输出Hflie 将普通数据导入到Hbase中
3.2.1 将json数据导入到Hbase表中
/**
* @Author: 多易教育-行哥
* @Date:Create:in 2020/8/30 0030
* @Description:
*/
public class LoadFileData2HbaseTable {
/**
* 处理每行数据 生成rowkey和movieBean
*/
static class LoadFileData2HbaseTableMapper extends Mapper<LongWritable, Text, Text, MovieBean> {
Gson gs = new Gson();
Text k = new Text();
@Override
protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
try {
String line = value.toString();
MovieBean mb = gs.fromJson(line, MovieBean.class);
// 设计rowkey主键
String movie = mb.getMovie();
String timeStamp = mb.getTimeStamp();
String rowkey = StringUtils.leftPad(movie, 5, '0') + "_" + timeStamp;
k.set(rowkey);
context.write(k, mb);
} catch (Exception e) {
e.printStackTrace();
}
}
}
static class LoadFileData2HbaseTableReducer extends TableReducer<Text, MovieBean, ImmutableBytesWritable> {
@Override
protected void reduce(Text key, Iterable<MovieBean> values, Context context) throws IOException, InterruptedException {
String rowkey = key.toString();
for (MovieBean mb : values) {
// 获取四个属性
String movie = mb.getMovie();
double rate = mb.getRate();
String timeStamp = mb.getTimeStamp();
String uid = mb.getUid();
Put put = new Put(Bytes.toBytes(rowkey));
put.addColumn("info".getBytes(), "movie".getBytes(), Bytes.toBytes(movie));
put.addColumn("info".getBytes(), "rate".getBytes(), Bytes.toBytes(rate));
put.addColumn("info".getBytes(), "timeStamp".getBytes(), Bytes.toBytes(timeStamp));
put.addColumn("info".getBytes(), "uid".getBytes(), Bytes.toBytes(uid));
context.write(null, put);
}
}
}
public static void main(String[] args) throws Exception {
Configuration conf = HBaseConfiguration.create();
conf.set("hbase.zookeeper.quorum","doit01:2181,doit02:2181,doit03:2181");
Job job = Job.getInstance(conf);
job.setMapperClass(LoadFileData2HbaseTableMapper.class);
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(MovieBean.class);
FileInputFormat.setInputPaths(job,new Path("d:/data/movie/input"));
TableMapReduceUtil.initTableReducerJob("tb_mr_movie" , LoadFileData2HbaseTableReducer.class,job);
job.waitForCompletion(true) ;
}
}