API调用
工作中更常用的绝对是通过HBase的API来调用实现类似HBase shell
的操作。
环境准备
IDEA + Maven + HBase
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>com.sowhat.demo</groupId>
<artifactId>hbasetest</artifactId>
<version>1.0-SNAPSHOT</version>
<!-- 属性的形式指定JDK版本 -->
<properties>
<maven.compiler.source>1.8</maven.compiler.source>
<maven.compiler.target>1.8</maven.compiler.target>
</properties>
<!-- 添加若干依赖-->
<dependencies>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-common</artifactId>
<version>2.7.2</version>
</dependency>
<dependency>
<groupId>org.apache.hbase</groupId>
<artifactId>hbase-client</artifactId>
<version>1.3.1</version>
</dependency>
<dependency>
<groupId>org.apache.hbase</groupId>
<artifactId>hbase-server</artifactId>
<version>1.3.1</version>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-assembly-plugin</artifactId>
<!-- 主要是定制化打包
https://www.cnblogs.com/fnlingnzb-learner/p/10537228.html-->
<version>3.0.0</version>
<configuration>
<archive>
<manifest>
<mainClass>com.sowhat.demo.TestHBase</mainClass>
</manifest>
<!-- 指定main方法所在文件 -->
</archive>
<descriptorRefs>
<descriptorRef>jar-with-dependencies</descriptorRef>
<!--将依赖的第三方jar包打包到jar中,这样方便我们发布可执行的jar包。 -->
</descriptorRefs>
</configuration>
<executions>
<execution>
<id>make-assembly</id>
<!--名字任意 -->
<phase>package</phase>
<!-- 绑定到package生命周期阶段上 -->
<goals>
<goal>single</goal>
<!-- 只运行一次 -->
</goals>
</execution>
</executions>
</plugin>
</plugins>
</build>
</project>
Commons
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.*;
import org.apache.hadoop.hbase.client.*;
import org.apache.hadoop.hbase.util.Bytes;
import java.io.IOException;
import java.util.ArrayList;
private static Connection connection; // 创建连接
private static Admin admin; // admin 对象
static
{
try
{
//创建配置信息 其中ZK的配置一般要跟 HBase/conf/hbase-site.xml里 hbase.zookeeper.quorum 配置一样
Configuration configuration = HBaseConfiguration.create();
configuration.set("hbase.zookeeper.quorum", "hadoop102,hadoop103,hadoop104");
//创建连接
connection = ConnectionFactory.createConnection(configuration);
//创建Admin对象
admin = connection.getAdmin();
} catch (IOException e)
{
e.printStackTrace();
}
}
public static void close() // 用完后合理关闭
{
try
{
if (admin != null)
{
admin.close();
}
} catch (IOException e)
{
e.printStackTrace();
}
try
{
if (connection != null)
{
connection.close();
}
} catch (IOException e)
{
e.printStackTrace();
}
}
API操作HBase
1. 判断表是否存在
//1.判断表是否存在 老版本方法,已经弃用
public static boolean isTableExistOld(String tableName) throws IOException
{
//创建配置信息
HBaseConfiguration configuration = new HBaseConfiguration();
//给配置信息添加参数
configuration.set("hbase.zookeeper.quorum", "hadoop102,hadoop103,hadoop104");
//创建HBase客户端
HBaseAdmin admin = new HBaseAdmin(configuration);
//执行判断
boolean exists = admin.tableExists(tableName);
//关闭连接
admin.close();
return exists;
}
//1.判断表是否存在
public static boolean isTableExistNew(String tableName) throws IOException
{
//执行判断
return admin.tableExists(TableName.valueOf(tableName));
}
2. 创建表
创建表的时候必须制定起码一个列族。
//2.创建表 多个列族
public static void createTable(String tableName, String... cfs) throws IOException
{
//判断列族个数
if (cfs.length < 1)
{
System.out.println("请设置正确的列族信息!!!");
return;
}
//判读表是否存在
if (isTableExistNew(tableName))
{
System.out.println("表" + tableName + "已存在!!!");
return;
}
//创建表描述器
HTableDescriptor hTableDescriptor = new HTableDescriptor(TableName.valueOf(tableName));
//循环添加列族信息
for (String cf : cfs)
{
//创建列族描述器
HColumnDescriptor hColumnDescriptor = new HColumnDescriptor(cf);
//添加列族信息
hTableDescriptor.addFamily(hColumnDescriptor);
}
//创建表
admin.createTable(hTableDescriptor);
}
3. 删除表
//3.删除表
public static void dropTable(String tableName) throws IOException
{
//判断表是否存在
if (!isTableExistNew(tableName))
{
System.out.println("表" + tableName + "不存在!!!");
return;
}
//使表下线
admin.disableTable(TableName.valueOf(tableName));
//删除表操作
admin.deleteTable(TableName.valueOf(tableName));
}
4. 创建namespace
//4.创建命名空间
public static void createNS(String nameSpace) throws IOException
{
//创建命名空间描述器 addConfiguration可选择性添加,
NamespaceDescriptor namespaceDescriptor = NamespaceDescriptor.create(nameSpace).addConfiguration("createTime", System.currentTimeMillis() + "").build();
//创建命名空间
try
{
admin.createNamespace(namespaceDescriptor);
} catch (NamespaceExistException e)
{
System.out.println("命名空间" + nameSpace + "已存在!!!");
} catch (IOException e)
{
e.printStackTrace();
}
}
5. 删除namespace
//5.删除命名空间
public static void deleteNS(String nameSpace)
{
try
{
//执行删除操作
admin.deleteNamespace(nameSpace);
} catch (IOException e)
{
e.printStackTrace();
}
}
6. 插入数据
一个RowKey对应一个Put,可创建
//6.插入数据:put "stu","1001","info:name","qiangquan"
public static void putData(String tableName, String rowKey, String cf, String cn, String value) throws IOException
{
//获取表对象
Table table = connection.getTable(TableName.valueOf(tableName));
//创建Put对象 一个RowKey 对应一个 Put,可以构建多个Put来配合多个RowKey,
Put put = new Put(Bytes.toBytes(rowKey));
//给Put对象添加数据 可插入 1~N条数据
put.addColumn(Bytes.toBytes(cf), Bytes.toBytes(cn), Bytes.toBytes(value));
put.addColumn(Bytes.toBytes(cf), Bytes.toBytes("sex"), Bytes.toBytes("male"));
put.addColumn(Bytes.toBytes(cf), Bytes.toBytes("addr"), Bytes.toBytes("SZ"));
//插入数据 支持table.put(List<put>)
table.put(put);
//关闭表连接
table.close();
}
7. get查询
get 'stu','1001'
get 'stu','1001','info'
get 'stu','1001','info:name'
根据上面的查询不同传入不同参数,选择性配置即可,一个Get,
//7.查询数据(get):get "stu","1001","info:name"
public static void getData(String tableName, String rowKey, String cf, String cn) throws IOException
{
//1. 获取表对象
Table table = connection.getTable(TableName.valueOf(tableName));
//2. 创建Get对象 跟Put对应
Get get = new Get(Bytes.toBytes(rowKey));
//2.1 指定查询的列族 get 'stu','1001','info' 这个跟下面的根据参数不同选择性使用哦
get.addFamily(Bytes.toBytes(cf));
//2.2 指定查询的列族 get 'stu','1001','info:name'
get.addColumn(Bytes.toBytes(cf), Bytes.toBytes(cn));
//2.3 设置获取数据的最大版本数
get.setMaxVersions();
//3. 执行查询
Result result = table.get(get);
//4. 解析result get 'sut','1002' 这样的格式 解析一行的若干数据
for (Cell cell : result.rawCells())
{
System.out.println("RowKey: " + Bytes.toString(CellUtil.cloneRow(cell)) +
",CF: " + Bytes.toString(CellUtil.cloneFamily(cell)) +
",CN: " + Bytes.toString(CellUtil.cloneQualifier(cell)) +
",Value: " + Bytes.toString(CellUtil.cloneValue(cell)) +
",TimeStamp:" + cell.getTimestamp());
}
//
//关闭连接
table.close();
}
当然也可以同时查询多个rowKeys
//8.获取数据(get),批量获取 若干 rowKeys
public static void getData(String tableName, String... rowKeys) throws IOException
{
//获取表对象
Table table = connection.getTable(TableName.valueOf(tableName));
//创建存放Get对象的集合
ArrayList<Get> gets = new ArrayList<>();
//循环创建Get对象
for (String rowKey : rowKeys)
{
//创建Get对象
Get get = new Get(Bytes.toBytes(rowKey));
//将Get对象放入gets
gets.add(get);
}
//获取数据结果
Result[] results = table.get(gets);
//解析results
for (Result result : results)
{
//解析result
for (Cell cell : result.rawCells())
{
System.out.println("RowKey:" + Bytes.toString(CellUtil.cloneRow(cell)) +
",CF:" + Bytes.toString(CellUtil.cloneFamily(cell)) +
",CN:" + Bytes.toString(CellUtil.cloneQualifier(cell)) +
",Value:" + Bytes.toString(CellUtil.cloneValue(cell)));
}
}
//关闭连接
table.close();
}
8. scan 获取数据
scan tableName
//9.扫描表数据(scan)
public static void scanTable(String tableName) throws IOException
{
//获取表对象
Table table = connection.getTable(TableName.valueOf(tableName));
//创建一个Scan对象 还可以传入 startRow endRow 还可以添加filter
Scan scan = new Scan();
// Scan scan = new Scan(Bytes.toBytes("1001"),Bytes.toBytes("1002"));
//扫描表获取数据
ResultScanner resultScanner = table.getScanner(scan);
//遍历resultScanner
for (Result result : resultScanner)
{
//解析result
for (Cell cell : result.rawCells())
{
System.out.println("RowKey:" + Bytes.toString(CellUtil.cloneRow(cell)) +
",CF:" + Bytes.toString(CellUtil.cloneFamily(cell)) +
",CN:" + Bytes.toString(CellUtil.cloneQualifier(cell)) +
",Value:" + Bytes.toString(CellUtil.cloneValue(cell)));
}
}
//关闭连接
table.close();
}
9. 删除数据
delete rowkey 删除的ColumnFamily 删除多个版本。
delete rowkey cf 删除的ColumnFamily 删除多个版本。
delete rowkey cf cn {column,columns}
重点
: 跟踪HBase的删除源码你会发现最终还是进入到了添加数据到业务逻辑代码中,无非就是 以前我们put数据到时候是value,现在变成了type = Column 跟ColumnFamily而已。
//10.删除数据
public static void deleteData(String tableName, String rowKey, String cf, String cn) throws IOException
{
//1. 获取表对象
Table table = connection.getTable(TableName.valueOf(tableName));
//2. 创建一个Delete对象 delete 跟deleteall 都调用此窗口 deleteall 'stu','1001'
Delete delete = new Delete(Bytes.toBytes(rowKey));
//2.1 指定删除数据的列族和列,默认删除最新时间戳的数据,带时间戳就是删除 指定列指定版本
// 如果插入了 name=old,name=new 但是么有flush,则删除new 后还可以查询到old,
// 如果 name=old,name=new flush后 删除new 就查询不到old了,慎用 这个方法。
delete.addColumn(Bytes.toBytes(cf), Bytes.toBytes(cn),123);
//2.2 不带时间戳则删除所有版本,带时间戳则删除所有小于时间戳的版本
delete.addColumns(Bytes.toBytes(cf), Bytes.toBytes(cn));
//指定删除数据的列族 delete 'stu','1001','info'
delete.addFamily(Bytes.toBytes(cf));
//执行删除数据操作
table.delete(delete);
//关闭连接
table.close();
}
// 删除多行数据
public static void deleteMultiRow(String tableName, String... rows) throws IOException{
HTable hTable = new HTable(conf, tableName);
List<Delete> deleteList = new ArrayList<Delete>();
for(String row : rows){
Delete delete = new Delete(Bytes.toBytes(row));
deleteList.add(delete);
}
hTable.delete(deleteList);
hTable.close();
}
10 . main
public static void main(String[] args) throws IOException
{
//判断表是否存在(旧)
System.out.println(isTableExistOld("dddas"));
//判断表是否存在(新)
System.out.println(isTableExistNew("bbb"));
//创建表
createTable("fruit", "info");
//删除表
dropTable("bigdata");
//创建命名空间
createNS("aaa");
//删除命名空间
deleteNS("bigdata");
//插入数据
putData("aaa", "1006", "info", "name", "xinxin");
//获取一行数据
getData("aaa", "1001", "1002", "1003", "1005", "1006");
//扫描全表
scanTable("fruit");
//删除数据
deleteData("aaa", "1006", "info2", "name");
close();
}
可在本地执行,也可以打包后在可访问HBase服务的集群上jar方式执行。
MapReduce操作HBase
HBase就把他当做大数据中的数据库即可,然后任何可以分析HBase的引擎比如MR,Hive,spark链接上HBase都可以实现控制。
接下来吧MR需要操作HBase的若干Jar放到MR中。
- 查看MR操作HBase数据需要那些Jar包
$ bin/hbase mapredcp
---显示出如果用MR来调用Hbase依赖的若干jar
/usr/local/src/hbase/hbase-2.2.2/lib/shaded-clients/hbase-shaded-mapreduce-....
- 环境变量的导入
- 执行环境变量的导入(临时生效,在命令行执行下述操作)
$ export HBASE_HOME=/opt/module/hbase
$ export HADOOP_HOME=/opt/module/hadoop-2.7.2
$ export HADOOP_CLASSPATH= `${HBASE_HOME}/bin/hbase mapredcp` // 这里是将变量赋值给mapredcp
- 永久生效:在/etc/profile 配置
export HBASE_HOME=/opt/module/hbase
export HADOOP_HOME=/opt/module/hadoop-2.7.2
并在 Hadoop里的etc里面找到 hadoop-env.sh 中配置:(注意:在 for 循环之后配)
export HADOOP_CLASSPATH=$HADOOP_CLASSPATH:/opt/module/hbase/lib/*
然后重启Hadoop跟HBase
bin/stop-hbase.sh
hadoop 全部关闭
上述配置后文件全部分`发到所以节点`,
然后重启Hadoop跟Hbase
1. 官方案例
实现 MR跟 HBase 之间交互数据。
案例一: 读取HBase数据到MR中
统计 Student 表中有多少行数据,在HBase目录下执行如下代码,。
/usr/local/src/hadoop/hadoop-3.1.3/bin/yarn jar /usr/local/src/hbase/hbase-2.2.2/lib/hbase-server-2.2.2.jar rowcounter sowhat
---- 跟在hbase shell 执行 count 'sowhat'一样
案例二:使用 MapReduce 将本地数据导入到 HBase
- 在本地创建一个tsv格式的文件:fruit.tsv
1001 Apple Red
1002 Pear Yellow
1003 Pineapple Yellow
- 创建HBase表
hbase(main):001:0> create 'fruit','info'
- 在HDFS中创建input_fruit文件夹并上传fruit.tsv文件
$ /opt/module/hadoop-2.7.2/bin/hdfs dfs -mkdir /input_fruit/
$ /opt/module/hadoop-2.7.2/bin/hdfs dfs -put fruit.tsv /input_fruit/
- 执行MapReduce到HBase的fruit表中
$ /opt/module/hadoop-2.7.2/bin/yarn jar lib/hbase-server-1.3.1.jar importtsv \
-Dimporttsv.columns=HBASE_ROW_KEY,info:name,info:color fruit \
hdfs://hadoop102:9000/input_fruit
- 使用scan命令查看导入后的结果
hbase(main):001:0> scan ‘fruit’
2. 自定义实现
1. 读HDFS写到HBase
目标
:实现将HDFS中的数据写入到HBase表中。
HDFS2HBaseMapper
package com.atguigu.mr2;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.yarn.webapp.hamlet.Hamlet;
import java.io.IOException;
public class HDFS2HBaseMapper extends Mapper<LongWritable, Text, NullWritable, Put> {
@Override
protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
//获取一行数据并切分
String[] fields = value.toString().split("\t");
//创建Put对象
Put put = new Put(Bytes.toBytes(fields[0]));
//给Put对象赋值
put.addColumn(Bytes.toBytes("info"), Bytes.toBytes("name"), Bytes.toBytes(fields[1]));
put.addColumn(Bytes.toBytes("info"), Bytes.toBytes("color"), Bytes.toBytes(fields[2]));
//写出
context.write(NullWritable.get(), put);
}
}
HDFS2HBaseReducer
package com.atguigu.mr2;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.mapreduce.TableReducer;
import org.apache.hadoop.io.NullWritable;
import java.io.IOException;
public class HDFS2HBaseReducer extends TableReducer<NullWritable, Put, NullWritable> {
@Override
protected void reduce(NullWritable key, Iterable<Put> values, Context context) throws IOException, InterruptedException {
//遍历写出
for (Put value : values) {
context.write(key, value);
}
}
}
HDFS2HBaseDriver
package com.atguigu.mr2;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
public class HDFS2HBaseDriver extends Configuration implements Tool {
//声明配置信息
private Configuration configuration;
@Override
public int run(String[] args) throws Exception {
//1.创建Job对象
Job job = Job.getInstance(configuration);
//2.设置主类
job.setJarByClass(HDFS2HBaseDriver.class);
//3.设置Mapper
job.setMapperClass(HDFS2HBaseMapper.class);
//4.设置Mapper的输出类型
job.setMapOutputKeyClass(NullWritable.class);
job.setMapOutputValueClass(Put.class);
//5.设置Reducer
TableMapReduceUtil.initTableReducerJob(args[1], HDFS2HBaseReducer.class, job);
//6.设置输入路径
FileInputFormat.setInputPaths(job, new Path(args[0]));
//7.提交任务
boolean result = job.waitForCompletion(true);
return result ? 0 : 1;
}
@Override
public void setConf(Configuration conf) {
configuration = conf;
}
@Override
public Configuration getConf() {
return configuration;
}
public static void main(String[] args) {
//创建配置信息
Configuration configuration = new Configuration();
//运行
try {
int run = ToolRunner.run(configuration, new HDFS2HBaseDriver(), args);
System.exit(run);
} catch (Exception e) {
e.printStackTrace();
}
}
}
2. HBase to HBase
目标:将fruit表中的一部分数据,通过MR迁入到fruit_mr表中。
package com.atguigu.mr1;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableMapper;
import org.apache.hadoop.hbase.util.Bytes;
import java.io.IOException;
// 读取HBase 数据
public class FruitMapper extends TableMapper<ImmutableBytesWritable, Put> {
@Override
protected void map(ImmutableBytesWritable key, Result value, Context context) throws IOException, InterruptedException {
//创建Put对象
Put put = new Put(key.get());
//遍历value(一行数据)
for (Cell cell : value.rawCells()) {
if ("name".equals(Bytes.toString(CellUtil.cloneQualifier(cell)))) {
//给put设置值
put.add(cell);
}
}
//写出
context.write(key, put);
}
}
---
package com.atguigu.mr1;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableReducer;
import org.apache.hadoop.mapreduce.Reducer;
import java.io.IOException;
public class FruitReducer extends TableReducer<ImmutableBytesWritable, Put, ImmutableBytesWritable> {
@Override
protected void reduce(ImmutableBytesWritable key, Iterable<Put> values, Context context) throws IOException, InterruptedException {
//遍历写出
for (Put value : values) {
context.write(key, value);
}
}
}
---
package com.atguigu.mr1;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
public class FruitDriver extends Configuration implements Tool {
//声明配置信息
Configuration configuration;
@Override
public int run(String[] args) throws Exception {
//1.创建Job对象
Job job = Job.getInstance(configuration);
//2.设置主类
job.setJarByClass(FruitDriver.class);
//3.设置Mapper类
TableMapReduceUtil.initTableMapperJob("sowhat",
new Scan(),
FruitMapper.class,
ImmutableBytesWritable.class,
Put.class,
job);
//4.设置Reducer类
TableMapReduceUtil.initTableReducerJob("sowhat1412",
FruitReducer.class,
job);
//5.提交
boolean result = job.waitForCompletion(true);
return result ? 0 : 1;
}
@Override
public void setConf(Configuration conf) {
configuration = conf;
}
@Override
public Configuration getConf() {
return configuration;
}
public static void main(String[] args) {
//创建配置信息
Configuration configuration = HBaseConfiguration.create();
configuration.set("hbase.zookeeper.quorum", "host-10-100-34-111,host-10-100-34-120,host-10-100-34-140");
try {
int result = ToolRunner.run(configuration, new FruitDriver(), args);
System.exit(result);
} catch (Exception e) {
e.printStackTrace();
}
}
}
PS
: 在集群上操作时候我们把HBase若干jar放到了Hadoop环境变量中,而本地开发的时候可不干,是因为Hbase的依赖中包含了Hadoop的核心代码。
并且本地开发的话,一般要把Hbase集群中的hbase-site.xml
文件加载到IDEA的resource中,来帮助本地代码找到ZK集群。当然你也可以选择把ZK的配置写入到配置文件中。如上。windows运行遇到问题 依赖
Hive操作HBase
我们可以把Hive跟HBase进行关联起来,然后Hive中的数据不再由HDFS存储而是存储到HBase中,并且,关联后Hive中添加数据在HBase中可看到,HBase中添加数据Hive也可看到。
Hive
- 数据仓库
Hive的本质其实就相当于将HDFS中已经存储的文件在Mysql中做了一个双射关系,以方便使用HQL去管理查询。 - 用于数据分析、清洗
Hive适用于离线的数据分析和清洗,延迟较高。 - 基于HDFS、MapReduce
Hive存储的数据依旧在DataNode上,编写的HQL语句终将是转换为MapReduce代码执行。 - 分析框架,元数据信息给MySQL
HBase
- 数据库
是一种面向列族存储的非关系型数据库。 - 用于存储结构化和非结构化的数据(其实更多是结构化,MongoDB来存非结构化多一些)
适用于单表非关系型数据的存储,不适合做关联查询 sum ,avg等
,类似JOIN等操作。 - 基于HDFS
数据持久化存储的体现形式是Hfile,存放于DataNode中,被ResionServer以region的形式进行管理。 - 延迟较低,接入在线业务使用
面对大量的企业数据,HBase可以直线单表大量数据的存储,同时提供了高效的数据访问速度
。 - HBase 存储框架 自己保持元数据信息。
- 数据量不够大其实用 Redis,ES跟更好点。
HBase与Hive集成使用
尖叫提示
:HBase与Hive的集成在最新的两个版本中无法兼容。所以,我们只能含着泪勇敢的重新编译:hive-hbase-handler-1.2.2.jar,兼容后相当于Hive的数据存储在HBase中了。兼容配置百度即可。
环境准备
因为我们后续可能会在操作Hive的同时对HBase也会产生影响,所以Hive需要持有操作HBase的Jar,那么接下来拷贝Hive所依赖的Jar包(或者使用软连接的形式)。
export HBASE_HOME=/opt/module/hbase
export HIVE_HOME=/opt/module/hive
ln -s $HBASE_HOME/lib/hbase-common-1.3.1.jar $HIVE_HOME/lib/hbase-common-1.3.1.jar
ln -s $HBASE_HOME/lib/hbase-server-1.3.1.jar $HIVE_HOME/lib/hbase-server-1.3.1.jar
ln -s $HBASE_HOME/lib/hbase-client-1.3.1.jar $HIVE_HOME/lib/hbase-client-1.3.1.jar
ln -s $HBASE_HOME/lib/hbase-protocol-1.3.1.jar $HIVE_HOME/lib/hbase-protocol-1.3.1.jar
ln -s $HBASE_HOME/lib/hbase-it-1.3.1.jar $HIVE_HOME/lib/hbase-it-1.3.1.jar
ln -s $HBASE_HOME/lib/htrace-core-3.1.0-incubating.jar $HIVE_HOME/lib/htrace-core-3.1.0-incubating.jar
ln -s $HBASE_HOME/lib/hbase-hadoop2-compat-1.3.1.jar $HIVE_HOME/lib/hbase-hadoop2-compat-1.3.1.jar
ln -s $HBASE_HOME/lib/hbase-hadoop-compat-1.3.1.jar $HIVE_HOME/lib/hbase-hadoop-compat-1.3.1.jar
同时在hive-site.xml中修改zookeeper的属性,如下:
<property>
<name>hive.zookeeper.quorum</name>
<value>hadoop102,hadoop103,hadoop104</value>
<description>The list of ZooKeeper servers to talk to. This is only needed for read/write locks.</description>
</property>
<property>
<name>hive.zookeeper.client.port</name>
<value>2181</value>
<description>The port of ZooKeeper servers to talk to. This is only needed for read/write locks.</description>
</property>
1. 案例一
目标
:建立Hive表,关联HBase表,插入数据到Hive表的同时能够影响HBase表。
分步实现: 根据位置关系进行映射,有点Hive跟neo4j的映射。
- 在Hive中创建表同时关联HBase
CREATE TABLE hive_hbase_emp_table(
empno int,
ename string,
job string,
mgr int,
hiredate string,
sal double,
comm double,
deptno int)
STORED BY 'org.apache.hadoop.hive.hbase.HBaseStorageHandler'
WITH SERDEPROPERTIES ("hbase.columns.mapping" = ":key,info:ename,info:job,info:mgr,info:hiredate,info:sal,info:comm,info:deptno")
TBLPROPERTIES ("hbase.table.name" = "hbase_emp_table");
提示
:完成之后,可以分别进入Hive和HBase查看,都生成了对应的表
2. 在Hive中创建临时中间表,用于load文件中的数据
提示
:不能
将数据直接load进Hive所关联HBase的那张表中
CREATE TABLE emp(
empno int,
ename string,
job string,
mgr int,
hiredate string,
sal double,
comm double,
deptno int)
row format delimited fields terminated by '\t';
- 向Hive中间表中load数据,结果是失败的!
hive> load data local inpath '/home/admin/softwares/data/emp.txt' into table emp;
- 通过insert命令将中间表中的数据导入到Hive关联HBase的那张表中,成功后数据就导入到HBase中了,记得Hbase的flush操作。
hive> insert into table hive_hbase_emp_table select * from emp;
- 查看Hive以及关联的HBase表中是否已经成功的同步插入了数据
hive> select * from hive_hbase_emp_table;
hbase> scan ‘hbase_emp_table’
2. 更常用案例二
目标
:在HBase中已经
存储了某一张表hbase_emp_table,然后在Hive中创建一个外部表来关联HBase中的hbase_emp_table这张表,使之可以借助Hive来分析HBase这张表中的数据。
注:该案例2紧跟案例1的脚步,所以完成此案例前,请先完成案例1。
- 在Hive中创建
外部表
CREATE EXTERNAL TABLE relevance_hbase_emp(
empno int,
ename string,
job string,
mgr int,
hiredate string,
sal double,
comm double,
deptno int)
STORED BY
'org.apache.hadoop.hive.hbase.HBaseStorageHandler'
WITH SERDEPROPERTIES ("hbase.columns.mapping" =
":key,info:ename,info:job,info:mgr,info:hiredate,info:sal,info:comm,info:deptno")
TBLPROPERTIES ("hbase.table.name" = "hbase_emp_table");
- 关联后就可以使用Hive函数进行一些分析操作了
hive (default)> select * from relevance_hbase_emp;