Hbase总结
概述
hbase是bigtable的开源java版本。是建立在hdfs之上,提供高可靠性、高性能、列存储、可伸缩、实时读写nosql的数据库系统。它介于nosql和RDBMS之间,仅能通过主键(row key)和主键的range来检索数据,仅支持单行事务(可通过hive支持来实现多表join等复杂操作)。
Hbase特点
- 主要用来存储结构化和半结构化的松散数据。
- Hbase查询数据功能很简单,不支持join等复杂操作,不支持复杂的事务(行级的事务)
- Hbase中支持的数据类型:byte[]
4、与hadoop一样,Hbase目标主要依靠横向扩展,通过不断增加廉价的商用服务器,来增加计算和存储能力。
5、基于列存储的分布式数据库
6、没有真正的索引,行顺序存储,也没有所谓的索引膨胀问题。
7、自动分区,表增长时,自动分区到新的节点上。
8、线性扩展和区域会自动重新平衡,运行RegionServer,达到负载均衡的目的。
9、容错和普通商用的硬件支持。这点同hadoop类似。
行的特点
HBase中的表一般有这样的特点:
1、大:一个表可以有上十亿行,上百万列
2、面向列:面向列(族)的存储和权限控制,列(族)独立检索。
3、稀疏:对于为空(null)的列,并不占用存储空间,因此,表可以设计的非常稀疏。
表结构逻辑图
HBase以表的形式存储数据。表有行和列组成。列划分为若干个列族(column family)
Row Key 的设计
row key的设计原则应当遵循以下几点
(1)rowkey唯一原则
必须在设计上保证其唯一性,rowkey是按照二进制字节数组排序存储的,因此,设计rowkey的时候,要充分利用这个排序的特点,将经常读取的数据存储到一块,将最近可能会被访问的数据放到一块。所以设计rwo key时尽量把体现业务特征的信息、业务上有唯一性的信息编进row key。
(2)rowkey长度原则
rowkey是一个二进制码流,可以是任意字符串,最大长度 64kb ,实际应用中一般为10-100byte,以byte[] 形式保存,一般设计成定长。建议越短越好,不要超过16个字节,2个原因——
原因1:
数据的持久化文件HFile中是按照(Key,Value)存储的,如果rowkey过长,例如超过100byte,那么1000万行的记录计算,仅row key就需占用100*1000万=10亿byte,近1Gb。这样会极大影响HFile的存储效率!
原因2:
MemStore将缓存部分数据到内存,若 rowkey字段过长,内存的有效利用率就会降低,就不能缓存更多的数据,从而降低检索效率。
目前操作系统都是64位系统,内存8字节对齐,控制在16个字节,8字节的整数倍利用了操作系统的最佳特性。
(3)rowkey散列原则
如果rowkey按照时间戳的方式递增,不要将时间放在二进制码的前面,建议将rowkey的高位作为散列字段,由程序随机生成,低位放时间字段,这样将提高数据均衡分布在每个RegionServer,以实现负载均衡的几率。如果没有散列字段,首字段直接是时间信息,所有的数据都会集中在一个RegionServer上,这样在数据检索的时候负载会集中在个别的RegionServer上,造成热点问题,会降低查询效率。
hbase的预分区设计以及热点问题
知道hbase数据表的key的分布情况,就可以在建表的时候对hbase进行region的预分区。这样做的好处是防止大数据量插入的热点问题,提高数据插入的效率,在读数据时热点能分不到不同的机器上,提高性能
我测试的RowKey是:编号+名字+代号
hbase(main):001:0> create 't2', {NAME => 'f', VERSIONS => 1, COMPRESSION => 'SNAPPY'},{SPLITS => ['05', '10', '15', '20','25','30','35','40','45','50','55','60','65','70','75','80','85','90','95']}
后面会跟着一个"|",是因为在ASCII码中,"|"的值是124,大于所有的数字和字母等符号
插入数据,NAME => 'f'越短越好,占用空间小
hbase(main):002:0> put 't2','05|xiaoli|02','f:01','xiaowang'
查看数据
hbase(main):003:0> scan 't2'
ROW COLUMN+CELL
05|xiaoli|02 column=f:01, timestamp=1510289689857, value=xiaowang
05|xiaozhang|01 column=f:, timestamp=1510285968252, value=xiaozhang
24|xiaozhang|01 column=f:, timestamp=1510286017448, value=xiaozhang
25|xiaozhang|01 column=f:, timestamp=1510285975659, value=xiaozhang
4 row(s) in 0.4910 seconds
查看分区情况
查看分布情况
列族
hbase表中的每个列,都归属与某个列族。列族是表的schema的一部分(而列不是),必须在使用表之前定义。
列名都以列族作为前缀。例如courses:history , courses:math 都属于 courses 这个列族。
访问控制、磁盘和内存的使用统计都是在列族层面进行的。
列族越多,在取一行数据时所要参与IO、搜寻的文件就越多,所以,如果没有必要,不要设置太多的列族
总结设计列族:
1、一般不建议设计多个列族
2、数据块的缓存的设计
3、激进缓存设计
4、布隆过滤器的设计(可以提高随机读取的速度)
5、生产日期的设计
6、列族压缩
7、单元时间版本
列的设计详见:http://blog.csdn.net/shenfuli/article/details/50589437
布隆过滤器:http://blog.csdn.net/opensure/article/details/46453681
时间戳
HBase中通过row和columns确定的为一个存贮单元称为cell。每个 cell都保存着同一份数据的多个版本。版本通过时间戳来索引。时间戳的类型是 64位整型。时间戳可以由hbase(在数据写入时自动 )赋值,此时时间戳是精确到毫秒的当前系统时间。时间戳也可以由客户显式赋值。如果应用程序要避免数据版本冲突,就必须自己生成具有唯一性的时间戳。每个 cell中,不同版本的数据按照时间倒序排序,即最新的数据排在最前面。
为了避免数据存在过多版本造成的的管理 (包括存贮和索引)负担,hbase提供了两种数据版本回收方式:
1、保存数据的最后n个版本
2、保存最近一段时间内的版本(设置数据的生命周期TTL)。
用户可以针对每个列族进行设置。
Hbase集群搭建
软件下载:链接:http://pan.baidu.com/s/1kU7YuVl 密码:t9z1 如果无法下载请联系作者。
1-1)、环境搭建
[root@hadoop1 local]# zkServer.sh status
ZooKeeper JMX enabled by default
Using config: /usr/local/zookeeper-3.4.8/bin/../conf/zoo.cfg
Mode: follower
[root@hadoop2 ~]# zkServer.sh status
ZooKeeper JMX enabled by default
Using config: /usr/local/zookeeper-3.4.8/bin/../conf/zoo.cfg
Mode: follower
[root@hadoop3 ~]# zkServer.sh status
ZooKeeper JMX enabled by default
Using config: /usr/local/zookeeper-3.4.8/bin/../conf/zoo.cfg
Mode: leader
[root@hadoop1 local]# tar -zxvf hbase-1.2.1-bin.tar.gz
[root@hadoop1 local]# mv hbase-1.2.1 hbase
[root@hadoop1 hbase]# cd conf/
[root@hadoop1 conf]# cat hbase-env.sh
# The java implementation to use. Java 1.7+ required.
export JAVA_HOME=/home/jdk1.7
# Tell HBase whether it should manage it's own instance of Zookeeper or not.
export HBASE_MANAGES_ZK=true
修改为
export HBASE_MANAGES_ZK=false
如果内存大的话建议添加以下配置项:
添加堆内存(16G)
export HBASE_HEAPSIZE=16384
添加配置参数
export HBASE_OPTS="$HBASE_OPTS -XX:+UseConcMarkSweepGC -XX:ErrorFile=/var/log/hbase/hs_err_pid%p.log -Djava.io.tmpdir=/tmp"
export HBASE_MASTER_OPTS="$HBASE_MASTER_OPTS -Xmx32512m $JDK_DEPENDED_OPTS"
export HBASE_REGIONSERVER_OPTS="$HBASE_REGIONSERVER_OPTS -Xmn1632m -XX:CMSInitiatingOccupancyFraction=70 -Xms8192m -Xmx8192m $JDK_DEPENDED_OPTS"
[root@hadoop1 conf]# vi hbase-site.xml
<configuration>
<property>
<!-- hbase在hdfs上的储存路径 -->
<name>hbase.rootdir</name>
<value>hdfs://hadoop1:9000/hbase</value>
</property>
<property>
<!-- 指定hbase是分布式 -->
<name>hbase.cluster.distributed</name>
<value>true</value>
</property>
<property>
<!-- 指定zookeeper的地址 -->
<name>hbase.zookeeper.quorum</name>
<value>hadoop1:2181,hadoop2:2181,hadoop3:2181</value>
</property>
</configuration>
[root@hadoop1 conf]# vi regionservers
hadoop1
hadoop2
hadoop3
修改back-master指定备用的节点
[root@hadoop1 conf]# vi backup-masters
Hadoop2
[root@hadoop1 bin]# vi /etc/profile
加入以下配置:
export HBASE_HOME=/usr/local/hbase
export PATH=$PATH:$HBASE_HOME/bin
[root@hadoop1 bin]# source /etc/profile
1-2)、拷贝hbase到其他节点
[root@hadoop1 local]# scp -r hbase/ hadoop2:/usr/local/
[root@hadoop1 local]# scp -r hbase/ hadoop3:/usr/local/
1-3)、同步时间
[root@hadoop2 ~]# date -s "2016-10-01 12:14:30"
Sat Oct 1 12:14:30 PDT 2016
[root@hadoop2 ~]# clock -w
1-4)、启动所有的Hbase进程
先启动zookeeper 与hdfs
[root@hadoop1 conf]# start-hbase.sh
starting master, logging to /usr/local/hbase/logs/hbase-root-master-hadoop1.out
hadoop2: starting regionserver, logging to /usr/local/hbase/bin/../logs/hbase-root-regionserver-hadoop2.out
hadoop3: starting regionserver, logging to /usr/local/hbase/bin/../logs/hbase-root-regionserver-hadoop3.out
hadoop1: starting regionserver, logging to /usr/local/hbase/bin/../logs/hbase-root-regionserver-hadoop1.out
hadoop2: starting master, logging to /usr/local/hbase/bin/../logs/hbase-root-master-hadoop2.out
注意启动的顺序以及启动的进程
1-5)、查看进程
[root@hadoop1 conf]# jps
3645 NodeManager
3101 NameNode
4935 HMaster
3194 DataNode
3374 SecondaryNameNode
5135 Jps
3550 ResourceManager
5058 HRegionServer
2906 QuorumPeerMain
[root@hadoop2 ~]# jps
3437 HRegionServer
3794 Jps
3509 HMaster
2797 DataNode
2906 NodeManager
2707 QuorumPeerMain
[root@hadoop3 ~]# jps
3675 Jps
2952 DataNode
3497 HRegionServer
2861 QuorumPeerMain
3062 NodeManager
1-6)、查看信息
Hadoop1的信息
Hadoop2的信息
建议多看一下http://hadoop1:16010/master-status上面的信息,在上面web界面中可以看出
- 在master创建时系统创建了两个表,分别是hbase:meta和hbase:namespace
- 可以看出当前的备用是那台机器
- 内存的使用情况
有兴趣的可以查看一下配置信息:
http://hadoop1:16030/conf
Zookeeper的信息
[root@hadoop1 local]# hadoop fs -ls /hbase
Found 7 items
drwxr-xr-x - root supergroup 0 2016-09-30 23:51 /hbase/.tmp
drwxr-xr-x - root supergroup 0 2016-09-30 23:51 /hbase/MasterProcWALs
drwxr-xr-x - root supergroup 0 2016-09-30 23:51 /hbase/WALs
drwxr-xr-x - root supergroup 0 2016-09-30 23:51 /hbase/data
-rw-r--r-- 3 root supergroup 42 2016-09-30 23:51 /hbase/hbase.id
-rw-r--r-- 3 root supergroup 7 2016-09-30 23:51 /hbase/hbase.version
drwxr-xr-x - root supergroup 0 2016-10-01 00:02 /hbase/oldWALs
[root@hadoop1 local]# hadoop fs -ls /hbase/WALs
Found 3 items
drwxr-xr-x - root supergroup 0 2016-09-30 23:51 /hbase/WALs/hadoop1,16020,1475304647153
drwxr-xr-x - root supergroup 0 2016-09-30 23:51 /hbase/WALs/hadoop2,16020,1475304645888
drwxr-xr-x - root supergroup 0 2016-09-30 23:51 /hbase/WALs/hadoop3,16020,1475304638450
[root@hadoop1 local]# hadoop fs -ls /hbase/oldWALs
WAL是保存regionServer的日志,日志分为旧日志与新日志。
[root@hadoop1 local]# hadoop fs -ls /hbase/WALs
Found 3 items
drwxr-xr-x - root supergroup 0 2017-02-03 18:40 /hbase/WALs/hadoop1,16020,1486176018126
drwxr-xr-x - root supergroup 0 2017-02-03 18:40 /hbase/WALs/hadoop2,16020,1486176023458
drwxr-xr-x - root supergroup 0 2017-02-03 18:40 /hbase/WALs/hadoop3,16020,1486176007473
[root@hadoop1 local]# hadoop fs -cat /hbase/WALs/hadoop1,16020,1486176018126
[root@hadoop1 local]# hadoop fs -cat /hbase/WALs/hadoop1,16020,1486176018126/hadoop1%2C16020%2C1486176018126..meta.1486176042878.meta
PWAL"ProtobufLogWriter*5org.apache.hadoop.hbase.regionserver.wal.WALCellCodec#
1588230740_x0012_
hbase:meta 瞫8€*N
METAFAMILYHBASE::REGION_EVENTZþƺ_x0012_
hbase:meta
1588230740 *
********************
[root@hadoop1 local]# hadoop fs -cat /hbase/WALs/hadoop1,16020,1486176018126/hadoop1%2C16020%2C1486176018126.default.1486176038828
PWAL"ProtobufLogWriter*5org.apache.hadoop.hbase.regionserver.wal.WALCellCodec>
1a2f9300d38edb133fed95603add617d_x0012_hbase:namespace ¥™̪œ
METAFAMILYHBASE::REGION_EVENTZþ̥_x0012_hbase:namespace 1a2f9300d38edb133fed95603add617d *
******************************
可以看出保存了很多的元数据的信息
1-7)、其他启动HMaster的方式
[root@hadoop1 ~]# ./hbase-daemon.sh start master
基本的SHELL命令(ruby语言)
详细的可以查看:http://hbase.apache.org/book.html
[root@hadoop1 bin]# ./hbase shell
SLF4J: Class path contains multiple SLF4J bindings.
SLF4J: Found binding in [jar:file:/usr/local/hbase/lib/slf4j-log4j12-1.7.5.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: Found binding in [jar:file:/usr/local/hadoop-2.6.4/share/hadoop/common/lib/slf4j-log4j12-1.7.5.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an explanation.
SLF4J: Actual binding is of type [org.slf4j.impl.Log4jLoggerFactory]
HBase Shell; enter 'help<RETURN>' for list of supported commands.
Type "exit<RETURN>" to leave the HBase Shell
Version 1.2.1, r8d8a7107dc4ccbf36a92f64675dc60392f85c015, Wed Mar 30 11:19:21 CDT 2016
hbase(main):001:0> l
查看帮助信息
Hbase命令汇总:https://learnhbase.wordpress.com/2013/03/02/hbase-shell-commands/comment-page-1/#comment-367
hbase(main):008:0> create help
HBase Shell, version 1.2.1, r8d8a7107dc4ccbf36a92f64675dc60392f85c015, Wed Mar 30 11:19:21 CDT 2016
Type 'help "COMMAND"', (e.g. 'help "get"' -- the quotes are necessary) for help on a specific command.
Commands are grouped. Type 'help "COMMAND_GROUP"', (e.g. 'help "general"') for help on a command group.
COMMAND GROUPS:
Group name: general
Commands: status, table_help, version, whoami
Group name: ddl
Commands: alter, alter_async, alter_status, create, describe, disable, disable_all, drop, drop_all, enable, enable_all, exists, get_table, is_disabled, is_enabled, list, locate_region, show_filters
Group name: namespace
Commands: alter_namespace, create_namespace, describe_namespace, drop_namespace, list_namespace, list_namespace_tables
Group name: dml
Commands: append, count, delete, deleteall, get, get_counter, get_splits, incr, put, scan, truncate, truncate_preserve
Group name: tools
Commands: assign, balance_switch, balancer, balancer_enabled, catalogjanitor_enabled, catalogjanitor_run, catalogjanitor_switch, close_region, compact, compact_rs, flush, major_compact, merge_region, move, normalize, normalizer_enabled, normalizer_switch, split, trace, unassign, wal_roll, zk_dump
Group name: replication
Commands: add_peer, append_peer_tableCFs, disable_peer, disable_table_replication, enable_peer, enable_table_replication, list_peers, list_replicated_tables, remove_peer, remove_peer_tableCFs, set_peer_tableCFs, show_peer_tableCFs
Group name: snapshots
Commands: clone_snapshot, delete_all_snapshot, delete_snapshot, list_snapshots, restore_snapshot, snapshot
Group name: configuration
Commands: update_all_config, update_config
Group name: quotas
Commands: list_quotas, set_quota
Group name: security
Commands: grant, list_security_capabilities, revoke, user_permission
Group name: procedures
Commands: abort_procedure, list_procedures
Group name: visibility labels
Commands: add_labels, clear_auths, get_auths, list_labels, set_auths, set_visibility
SHELL USAGE:
Quote all names in HBase Shell such as table and column names. Commas delimit
command parameters. Type <RETURN> after entering a command to run it.
Dictionaries of configuration used in the creation and alteration of tables are
Ruby Hashes. They look like this:
{'key1' => 'value1', 'key2' => 'value2', ...}
and are opened and closed with curley-braces. Key/values are delimited by the
'=>' character combination. Usually keys are predefined constants such as
NAME, VERSIONS, COMPRESSION, etc. Constants do not need to be quoted. Type
'Object.constants' to see a (messy) list of all constants in the environment.
If you are using binary keys or values and need to enter them in the shell, use
double-quote'd hexadecimal representation. For example:
hbase> get 't1', "key\x03\x3f\xcd"
hbase> get 't1', "key\003\023\011"
hbase> put 't1', "test\xef\xff", 'f1:', "\x01\x33\x40"
000
The HBase shell is the (J)Ruby IRB with the above HBase-specific commands added.
For more on the HBase Shell, see http://hbase.apache.org/book.html
ERROR: Table name must be of type String
Here is some help for this command:
Creates a table. Pass a table name, and a set of column family
specifications (at least one), and, optionally, table configuration.
Column specification can be a simple string (name), or a dictionary
(dictionaries are described below in main help output), necessarily
including NAME attribute.
Examples:
Create a table with namespace=ns1 and table qualifier=t1
hbase> create 'ns1:t1', {NAME => 'f1', VERSIONS => 5}
Create a table with namespace=default and table qualifier=t1
hbase> create 't1', {NAME => 'f1'}, {NAME => 'f2'}, {NAME => 'f3'}
hbase> # The above in shorthand would be the following:
hbase> create 't1', 'f1', 'f2', 'f3'
hbase> create 't1', {NAME => 'f1', VERSIONS => 1, TTL => 2592000, BLOCKCACHE => true}
hbase> create 't1', {NAME => 'f1', CONFIGURATION => {'hbase.hstore.blockingStoreFiles' => '10'}}
Table configuration options can be put at the end.
Examples:
hbase> create 'ns1:t1', 'f1', SPLITS => ['10', '20', '30', '40']
hbase> create 't1', 'f1', SPLITS => ['10', '20', '30', '40']
hbase> create 't1', 'f1', SPLITS_FILE => 'splits.txt', OWNER => 'johndoe'
hbase> create 't1', {NAME => 'f1', VERSIONS => 5}, METADATA => { 'mykey' => 'myvalue' }
hbase> # Optionally pre-split the table into NUMREGIONS, using
hbase> # SPLITALGO ("HexStringSplit", "UniformSplit" or classname)
hbase> create 't1', 'f1', {NUMREGIONS => 15, SPLITALGO => 'HexStringSplit'}
hbase> create 't1', 'f1', {NUMREGIONS => 15, SPLITALGO => 'HexStringSplit', REGION_REPLICATION => 2, CONFIGURATION => {'hbase.hregion.scan.loadColumnFamiliesOnDemand' => 'true'}}
hbase> create 't1', {NAME => 'f1', DFS_REPLICATION => 1}
You can also keep around a reference to the created table:
hbase> t1 = create 't1', 'f1'
Which gives you a reference to the table named 't1', on which you can then
call methods.
仔细查看一下上边的提示,对于用ruby语言操作表还是比较有好处的、、、、
按照官网上给的实例试一下:
http://hbase.apache.org/book.html
A)、简单语句操作
hbase(main):001:0> create 't_user_info',{NAME => 'base_info'}, {NAME => 'extra_info'}
0 row(s) in 1.7730 seconds
=> Hbase::Table - t_user_info
详细信息请查看:
http://blog.csdn.net/xfg0218/article/details/54861973
hbase(main):012:0> create 't_user_info',{NAME => 'base_info'}, {NAME => 'extra_info'}
0 row(s) in 1.2480 seconds
=> Hbase::Table - t_user_info
hbase(main):013:0> put 't_user_info','zhang-001','base_info:name','xiaozhang'
0 row(s) in 0.0190 seconds
hbase(main):014:0> put 't_user_info','zhang-001','base_info:age','18'
0 row(s) in 0.0090 seconds
hbase(main):015:0> put 't_user_info','zhang-002','base_info:name','xiaowang'
0 row(s) in 0.0180 seconds
hbase(main):016:0> put 't_user_info','zhang-002','base_info:age','20'
0 row(s) in 0.0110 seconds
hbase(main):017:0> scan 't_user_info'
ROW COLUMN+CELL
zhang-001 column=base_info:age, timestamp=1486179529235, value=18
zhang-001 column=base_info:name, timestamp=1486179516656, value=xiaozhang
zhang-002 column=base_info:age, timestamp=1486179562265, value=20
zhang-002 column=base_info:name, timestamp=1486179550995, value=xiaowang
2 row(s) in 0.0330 seconds
hbase(main):018:0> get 't_user_info','zhang-001'
COLUMN CELL
base_info:age timestamp=1486179529235, value=18
base_info:name timestamp=1486179516656, value=xiaozhang
2 row(s) in 0.0430 seconds
hbase(main):019:0> enable 't_user_info'
0 row(s) in 0.0290 seconds
hbase(main):020:0> disable 't_user_info'
0 row(s) in 2.3140 seconds
hbase(main):021:0> drop 't_user_info'
0 row(s) in 1.2650 seconds
hbase(main):003:0> scan 't_user_info',{LIMIT=>5}
1-9)、按照个数查询数据的总个数(现在是按照1000000统计一次)
A)、使用命令查看
hbase(main):004:0> count 't_user_info',INTERVAL=>1000000
B)、使用hadoop的命令查看
此操作需要正确的安装hadoop
# hbase org.apache.hadoop.hbase.mapreduce.RowCounter t_user_info
**************
HBase Counters
BYTES_IN_REMOTE_RESULTS=0
BYTES_IN_RESULTS=427780
MILLIS_BETWEEN_NEXTS=1617
NOT_SERVING_REGION_EXCEPTION=0
NUM_SCANNER_RESTARTS=0
NUM_SCAN_RESULTS_STALE=0
REGIONS_SCANNED=1
REMOTE_RPC_CALLS=0
REMOTE_RPC_RETRIES=0
RPC_CALLS=103
RPC_RETRIES=0
org.apache.hadoop.hbase.mapreduce.RowCounter$RowCounterMapper$Counters
ROWS=10000
**************************
hbase(main):007:0> truncate 't_user_info'
Truncating 'xiaoxu' table (it may take a while):
- Disabling table...
- Truncating table...
0 row(s) in 9.7300 seconds
1-11)、按照过滤条件查找数据
hbase(main):003:0> scan 'test_hbase', {LIMIT => 4, STARTROW => 'row1'}
ROW COLUMN+CELL
row1 column=cf:a, timestamp=1519872514241, value=value1
row10 column=cf:a, timestamp=1519872514342, value=value10
row100 column=cf:a, timestamp=1519872515388, value=value100
row1000 column=cf:a, timestamp=1519872521158, value=value1000
LIMIT:数据的个数
STARTROW:STARTROW 的匹配规则
B)、复杂语句操作
hbase(main):007:0> create 'user', 'user1, 'user2’
0 row(s) in 1.3200 seconds
=> Hbase::Table - user
hbase(main):008:0> put 'user','row1','user1:name','xiaowang'
0 row(s) in 0.1500 seconds
hbase(main):009:0> put 'user','row1','user1:age','18'
0 row(s) in 0.0270 seconds
hbase(main):010:0> put 'user','row2','user1:name','xiaoli'
0 row(s) in 0.0140 seconds
hbase(main):011:0> put 'user','row2','user1:age','20'
0 row(s) in 0.0120 seconds
hbase(main):025:0> get 'user','row1'
COLUMN CELL
info1:age timestamp=1475312493873, value=20
info1:name timestamp=1475312458605, value=xiaozhang
info1:sex timestamp=1475312518279, value=\xE7\x94\xB7
3 row(s) in 0.0320 seconds
hbase(main):026:0> put 'user','row1','data1:name','xiaozhang'
0 row(s) in 0.0110 seconds
hbase(main):027:0> put 'user','row1','data1:sex','nan'
0 row(s) in 0.0090 seconds
hbase(main):028:0> put 'user','row1','data1:age','21'
0 row(s) in 0.0120 seconds
hbase(main):030:0> get 'user','row1'
COLUMN CELL
data1:age timestamp=1475312711967, value=21
data1:name timestamp=1475312676383, value=xiaozhang
data1:sex timestamp=1475312700959, value=nan
info1:age timestamp=1475312493873, value=20
info1:name timestamp=1475312458605, value=xiaozhang
info1:sex timestamp=1475312518279, value=\xE7\x94\xB7
6 row(s) in 0.0760 seconds
hbase(main):031:0> scan 'user'
ROW COLUMN+CELL
hb1 column=data1:name, timestamp=1475312198163, value=zhangsan
hb1 column=info1:name, timestamp=1475312152590, value=zhangsan
hb2 column=data1:name, timestamp=1475312218259, value=lisi
hb2 column=info1:name, timestamp=1475312171289, value=lisi
row1 column=data1:age, timestamp=1475312711967, value=21
row1 column=data1:name, timestamp=1475312676383, value=xiaozhang
row1 column=data1:sex, timestamp=1475312700959, value=nan
row1 column=info1:age, timestamp=1475312493873, value=20
row1 column=info1:name, timestamp=1475312458605, value=xiaozhang
row1 column=info1:sex, timestamp=1475312518279, value=\xE7\x94\xB7
hbase(main):032:0> delete 'user','row1','info1:name'
注意以上的删除的信息,是删除最后面的一条
hbase(main):040:0> truncate 'user'
Truncating 'user' table (it may take a while):
- Disabling table...
- Truncating table...
0 row(s) in 4.2500 seconds
hbase(main):041:0> scan 'user'
ROW COLUMN+CELL
0 row(s) in 0.3570 seconds
hbase(main):042:0> disable 'user'
0 row(s) in 2.2700 seconds
hbase(main):043:0> alter 'user',NAME=>'f2'
Updating all regions with the new schema...
1/1 regions updated.
Done.
0 row(s) in 5.3170 seconds
hbase(main):044:0> enable 'user'
0 row(s) in 1.2890 seconds
hbase(main):049:0> disable 'hb'
0 row(s) in 4.3040 seconds
hbase(main):050:0> drop 'hb'
0 row(s) in 1.2890 seconds
C)、查看zookeeper保存的表的信息
D)、查看HDFS上的Hbase保存的数据
[root@hadoop2 /]# hadoop fs -cat /hbase/data/default/user/82870eae37e88ea6b9b9c1a7ac56ee42/user1/cd38577990ab42daae7cbff4b5a9468f
DATABLK*'#ÿÿÿÿÿÿÿÿ@Drow1user1ageZ¢!21褂LMFBLK2ÿÿÿÿÿÿÿÿ@#D•B鈄XROOT2)%ÿÿÿÿÿÿÿÿ@FHrow1user1ageZ¢!綍®IDXROOT2o@!nC¦ㅉLEINF2¡ÿÿÿÿÿÿÿÿ@¾PBUF—
BLOOM_FILTER_TYPE_x0012_ROW
DELETE_FAMILY_COUNT_x0012_
ARLIEST_PUT_TSZ¢!
*****************************
E)、把Hbase的信息导出到HDFS中
[root@hadoop1 hadoop]#hbase org.apache.hadoop.hbase.mapreduce.Export xiaoxu /tmp/xiaoxu
F)、查看集群的状态
hbase(main):001:0> status
1 active master, 1 backup masters, 14 servers, 0 dead, 9.2143 average load
hbase> status
hbase> status 'simple'
hbase> status 'summary'
hbase> status 'detailed'
详细的信息查看:http://blog.csdn.net/xfg0218/article/details/78674316
G)、查看当前Hbase的版本
hbase(main):003:0> version
1.1.2.2.6.0.3-8, r3307790b5a22cf93100cad0951760718dee5dec7, Sat Apr 1 21:41:47 UTC 2017
H)、查看当前登录的用户信息
hbase(main):004:0> whoami
admin/[email protected] (auth:KERBEROS)
groups: admin
I)、disable相匹配的表
hbase(main):006:0> disable_all 'test*'
J)、删除所有的表
hbase(main):006:0> drop_all 'test*'
K)、开启所有的表
hbase(main):006:0> enable_all 'test*'
L)、常用Hbase命令总结
常用Hbase命令请查看
http://blog.csdn.net/xfg0218/article/details/78655990
M)、修改表的名字
hbase shell> disable 'tableName'
hbase shell> snapshot 'tableName', 'tableSnapshot'
hbase shell> clone_snapshot 'tableSnapshot', 'newTableName'
hbase shell> delete_snapshot 'tableSnapshot'
hbase shell> drop 'tableName'
Hbase 插入大量数据脚本
1-1)、手动创建Hbase表
hbase(main):006:0> create 't1','info','date'
0 row(s) in 2.8610 seconds
=> Hbase::Table - t1
1-2)、编写Hbase的put数据的脚本
# vi hbase-put.sh
#!/bin/sh
for i in {1..1000000}
do
hbase shell << EOF
put 't1','$i','info:name','zhangsan$1'
EOF
done
1-3)、编写hbase的scan脚本
# vi hbase-scan.sh
#!/bin/sh
while [ true ]; do
hbase shell <<EOF
scan 't1'
EOF
done
HBase集群数据迁移方案
1-1)、静态迁移方案
- 、在hbase停止的状态下进行数据的迁移。
B)、采用Hadoop distcp方式,将以上目录的内容,迁移到另一个集群。
将hbase存储在hdfs上面的数据目录全部从当前集群拷贝至目标集群hbase对应的hdfs目录
# hadoop distcp -f /hbase/data/ hdfs://192.168.199.131:9000/hbase/data/
-f : 需要复制的文件列表
hadoop distcp 参数详解:http://blog.csdn.net/xfg0218/article/details/78517542
或官方介绍参考:http://hadoop.apache.org/docs/r1.0.4/cn/distcp.html
# hbase hbck -fixAssignments -fixMeta
缺点:不太灵活
1-2)、动态迁移方案
修改hbase-site.xml配置,增加hbase.replication属性,
增加表属性REPLICATION_SCOPE属性。
add_peer增加一个从集群。
1-1)、执行命令
#hbase org.apache.hadoop.hbase.mapreduce.CopyTable --peer.adr=192.168.199.131:2181:/hbase_table
此命令是把所有的表都备份出来,并且Zookeeper的znode是hbase_table
1-2)、优缺点
1、拷贝完成,不需要重启机器,在new cluster中就可以看到该表。
2、稳定性不是很好,待考虑
1-1)、执行命令
在旧的集群上执行
#hbase org.apache.hadoop.hbase.mapreduce.Export table1 hdfs://192.168.199.131:9000/hbase_table
在新的集群中执行
#hbase org.apache.hadoop.hbase.mapreduce.Import table1 hdfs://192.168.199.131:9000/hbase_table
1-2)、优缺点
1、一定要写绝对路径,不能写相对路径。
2、在import前,需要将表事先在new cluster中创建好。
1-3)、手动方式
# hadoop fs -get /apps/hbase/ /home/hbase_back
# scp -r /home/hbase_back [email protected]:/home/hbase_back
# hadoop fs -put /home/hbase_back /apps/hbase/
# hbase hbck -fixMeta
查看该表的meta数据:
hbase(main):001:0> scan 'hbase:meta'
E)、重新分配数据到各RegionServer
# hbase hbck -fixAssignments
因为是执行命令所以比较灵活,安全,没有兼容的问题
HBase Snapshot进行快照备份
HBase以往数据的备份基于distcp或者copyTable等工具,这些备份机制或多或少对当前的online数据读写存在一定的影响,Snapshot提供了一种快速的数据备份方式,无需进行数据copy。
详细介绍请查看:http://blog.csdn.net/xfg0218/article/details/78518417
官网介绍:http://hbase.apache.org/0.94/book/ops.snapshots.html
1-1)、修改配置
在0.95之后默认开启snapshot功能,之前版本的需要手动开启对snapshot的支持,修改hbas-site.xml文件添加
<property>
<name>hbase.snapshot.enabled</name>
<value>true</value>
</property>
1-2)、好看数据
hbase(main):002:0> scan 'hbase_test'
ROW COLUMN+CELL
1234 column=age:18, timestamp=1508137454097, value=zhangsan
12345 column=age:18, timestamp=1508137513581, value=zhangsan
123456 column=age:19, timestamp=1508137616816, value=lisi
test column=age:20, timestamp=1508139661503, value=wangwu
4 row(s) in 0.1290 seconds
1-3)、snapshot 操作
hbase(main):004:0> snapshot 'hbase_test','hbase_snapshot'
0 row(s) in 0.3720 seconds
1-4)、列出当前所有得快照
hbase(main):005:0> list_snapshots
SNAPSHOT TABLE + CREATION TIME
hbase_snapshot hbase_test (Mon Nov 13 12:59:20 +0800 2017)
1 row(s) in 0.0260 seconds
=> ["hbase_snapshot"]
1-5)、基于快照,clone一个新表
hbase(main):006:0> clone_snapshot 'hbase_snapshot','new_hbase_snapshot'
0 row(s) in 0.3910 seconds
1-6)、删除快照信息
hbase(main):010:0> delete_snapshot 'hbase_snapshot'
0 row(s) in 0.0200 seconds
1-7)、基于快照恢复表
hbase(main):014:0> disable 'hbase_test'
0 row(s) in 2.2580 seconds
hbase(main):015:0> restore_snapshot 'hbase_snapshot'
0 row(s) in 0.2780 seconds
1-8)、快照复制到其他的集群中
#hbase class org.apache.hadoop.hbase.snapshot.tool.ExportSnapshot -snapshotMySnapshot -copy-to hdfs:///192.168.199.131:8082/hbase -mappers 16
Hbase代码开发
Jar 包可以去hbase的安装目录下的lib目录查找。
1-1)、基本增删改查java实现
package HbaseProject;
import java.util.ArrayList;
import java.util.Iterator;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellScanner;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Delete;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.filter.Filter;
import org.apache.hadoop.hbase.filter.PageFilter;
import org.apache.hadoop.hbase.regionserver.BloomType;
import org.apache.hadoop.hbase.util.Bytes;
import org.junit.Before;
import org.junit.Test;
public class HbaseDemo {
private Configuration conf = null;
private Connection conn = null;
@Before
public void init() throws Exception {
conf = HBaseConfiguration.create();
// 对于hbase的客户端来说,只需要知道hbase所使用的zookeeper集群地址就可以了
// 因为hbase的客户端找hbase读写数据完全不用经过hmaster
conf.set("hbase.zookeeper.quorum",
"hadoop1:2181,hadoop2:2181,hadoop3:2181");
conn = ConnectionFactory.createConnection(conf);
}
/**
* 建表
*
* @throws Exception
*/
@Test
public void testCreate() throws Exception {
// 获取一个表管理器
Admin admin = conn.getAdmin();
// 构造一个表描述器,并指定表名
HTableDescriptor htd = new HTableDescriptor(
TableName.valueOf("java_test"));
// 构造一个列族描述器,并指定列族名
HColumnDescriptor hcd1 = new HColumnDescriptor("base_info");
// 为该列族设定一个布隆过滤器类型参数/版本数量
hcd1.setBloomFilterType(BloomType.ROW).setVersions(1, 3);
// 构造第二个列族描述器,并指定列族名
HColumnDescriptor hcd2 = new HColumnDescriptor("extra_info");
hcd2.setBloomFilterType(BloomType.ROW).setVersions(1, 3);
// 将列族描述器添加到表描述器中
htd.addFamily(hcd1).addFamily(hcd2);
admin.createTable(htd);
admin.close();
conn.close();
}
/**
* 删除表
*
* @throws Exception
*/
@Test
public void testDrop() throws Exception {
Admin admin = conn.getAdmin();
admin.disableTable(TableName.valueOf("t_user_info"));
admin.deleteTable(TableName.valueOf("t_user_info"));
admin.close();
conn.close();
}
/**
* 修改表定义(schema)
*
* @throws Exception
*/
@Test
public void testModify() throws Exception {
Admin admin = conn.getAdmin();
// 修改已有的ColumnFamily
HTableDescriptor table = admin.getTableDescriptor(TableName
.valueOf("t_user_info"));
HColumnDescriptor f2 = table.getFamily("extra_info".getBytes());
f2.setBloomFilterType(BloomType.ROWCOL);
// 添加新的ColumnFamily
table.addFamily(new HColumnDescriptor("other_info"));
admin.modifyTable(TableName.valueOf("t_user_info"), table);
admin.close();
conn.close();
}
/**
* 插入/修改 数据 DML
*
* @throws Exception
*/
@Test
public void testPut() throws Exception {
Table table = conn.getTable(TableName.valueOf("t_user_info"));
ArrayList<Put> puts = new ArrayList<Put>();
// 构建一个put对象(kv),指定其行键
Put put01 = new Put(Bytes.toBytes("user001"));
put01.addColumn(Bytes.toBytes("base_info"), Bytes.toBytes("username"),
Bytes.toBytes("zhangsan"));
Put put02 = new Put("user001".getBytes());
put02.addColumn(Bytes.toBytes("base_info"), Bytes.toBytes("password"),
Bytes.toBytes("123456"));
Put put03 = new Put("user002".getBytes());
put03.addColumn(Bytes.toBytes("base_info"), Bytes.toBytes("username"),
Bytes.toBytes("lisi"));
put03.addColumn(Bytes.toBytes("extra_info"), Bytes.toBytes("married"),
Bytes.toBytes("false"));
Put put04 = new Put("zhang_sh_01".getBytes());
put04.addColumn(Bytes.toBytes("base_info"), Bytes.toBytes("username"),
Bytes.toBytes("zhang01"));
put04.addColumn(Bytes.toBytes("extra_info"), Bytes.toBytes("married"),
Bytes.toBytes("false"));
Put put05 = new Put("zhang_sh_02".getBytes());
put05.addColumn(Bytes.toBytes("base_info"), Bytes.toBytes("username"),
Bytes.toBytes("zhang02"));
put05.addColumn(Bytes.toBytes("extra_info"), Bytes.toBytes("married"),
Bytes.toBytes("false"));
Put put06 = new Put("liu_sh_01".getBytes());
put06.addColumn(Bytes.toBytes("base_info"), Bytes.toBytes("username"),
Bytes.toBytes("liu01"));
put06.addColumn(Bytes.toBytes("extra_info"), Bytes.toBytes("married"),
Bytes.toBytes("false"));
Put put07 = new Put("zhang_bj_01".getBytes());
put07.addColumn(Bytes.toBytes("base_info"), Bytes.toBytes("username"),
Bytes.toBytes("zhang03"));
put07.addColumn(Bytes.toBytes("extra_info"), Bytes.toBytes("married"),
Bytes.toBytes("false"));
Put put08 = new Put("zhang_bj_01".getBytes());
put08.addColumn(Bytes.toBytes("base_info"), Bytes.toBytes("username"),
Bytes.toBytes("zhang04"));
put08.addColumn(Bytes.toBytes("extra_info"), Bytes.toBytes("married"),
Bytes.toBytes("false"));
puts.add(put01);
puts.add(put02);
puts.add(put03);
puts.add(put04);
puts.add(put05);
puts.add(put06);
puts.add(put07);
puts.add(put08);
table.put(puts);
table.close();
conn.close();
}
// 读取数据 ---get:一次读一行
@Test
public void testGet() throws Exception {
Table table = conn.getTable(TableName.valueOf("t_user_info"));
// 构造一个get查询参数对象,指定要get的是哪一行
Get get = new Get("user001".getBytes());
Result result = table.get(get);
CellScanner cellScanner = result.cellScanner();
while (cellScanner.advance()) {
Cell current = cellScanner.current();
byte[] familyArray = current.getFamilyArray();
byte[] qualifierArray = current.getQualifierArray();
byte[] valueArray = current.getValueArray();
System.out.print(new String(familyArray, current.getFamilyOffset(),
current.getFamilyLength()));
System.out.print(":"
+ new String(qualifierArray, current.getQualifierOffset(),
current.getQualifierLength()));
System.out.println(" "
+ new String(valueArray, current.getValueOffset(), current
.getValueLength()));
}
table.close();
conn.close();
}
/**
* 删除表中的列数据
*
* @throws Exception
*/
@Test
public void testDel() throws Exception {
Table t_user_info = conn.getTable(TableName.valueOf("t_user_info"));
Delete delete = new Delete("user001".getBytes());
delete.addColumn("base_info".getBytes(), "password".getBytes());
t_user_info.delete(delete);
t_user_info.close();
conn.close();
}
/**
* scan 批量查询数据
*
* @throws Exception
*/
@Test
public void testScan() throws Exception {
Table t_user_info = conn.getTable(TableName.valueOf("t_user_info"));
Scan scan = new Scan(Bytes.toBytes("liu_sh_01"),
Bytes.toBytes("zhang_bj_01" + "\000"));
ResultScanner scanner = t_user_info.getScanner(scan);
Iterator<Result> iter = scanner.iterator();
while (iter.hasNext()) {
Result result = iter.next();
CellScanner cellScanner = result.cellScanner();
while (cellScanner.advance()) {
Cell current = cellScanner.current();
byte[] familyArray = current.getFamilyArray();
byte[] valueArray = current.getValueArray();
byte[] qualifierArray = current.getQualifierArray();
byte[] rowArray = current.getRowArray();
System.out.println(new String(rowArray, current.getRowOffset(),
current.getRowLength()));
System.out.print(new String(familyArray, current
.getFamilyOffset(), current.getFamilyLength()));
System.out.print(":"
+ new String(qualifierArray, current
.getQualifierOffset(), current
.getQualifierLength()));
System.out.println(" "
+ new String(valueArray, current.getValueOffset(),
current.getValueLength()));
}
System.out.println("-----------------------");
}
}
@Test
public void testFilter() throws Exception {
// 针对行键的前缀过滤器
/*
* Filter pf = new PrefixFilter(Bytes.toBytes("liu")); testScan(pf);
*/
// 行过滤器
/*
* RowFilter rf1 = new RowFilter(CompareOp.LESS, new
* BinaryComparator(Bytes.toBytes("user002"))); RowFilter rf2 = new
* RowFilter(CompareOp.EQUAL, new SubstringComparator("00"));
* testScan(rf1); System.out.println("**********"); testScan(rf2);
*/
// 针对指定一个列的value来过滤
/*
* SingleColumnValueFilter scvf = new
* SingleColumnValueFilter("base_info".getBytes(),
* "password".getBytes(), CompareOp.GREATER, "123456".getBytes());
* scvf.setFilterIfMissing(true); // 如果指定的列缺失,则也过滤掉 testScan(scvf);
*/
/*
* ByteArrayComparable comparator1 = new
* RegexStringComparator("^zhang"); ByteArrayComparable comparator2 =
* new SubstringComparator("ang"); SingleColumnValueFilter scvf = new
* SingleColumnValueFilter("base_info".getBytes(),
* "username".getBytes(), CompareOp.EQUAL, comparator1); testScan(scvf);
*/
// 针对列族名的过滤器 返回结果中只会包含满足条件的列族中的数据
/*
* FamilyFilter ff1 = new FamilyFilter(CompareOp.EQUAL, new
* BinaryComparator(Bytes.toBytes("inf"))); FamilyFilter ff2 = new
* FamilyFilter(CompareOp.EQUAL, new
* BinaryPrefixComparator(Bytes.toBytes("base"))); testScan(ff2);
*/
// 针对列名的过滤器 返回结果中只会包含满足条件的列的数据
/*
* QualifierFilter qf = new QualifierFilter(CompareOp.EQUAL, new
* BinaryComparator(Bytes.toBytes("password"))); QualifierFilter qf2 =
* new QualifierFilter(CompareOp.EQUAL, new
* BinaryPrefixComparator(Bytes.toBytes("us"))); testScan(qf);
*/
// 跟SingleColumnValueFilter结果不同,只返回符合条件的该column
/*
* ColumnPrefixFilter cf = new ColumnPrefixFilter("passw".getBytes());
* testScan(cf);
*/
/*
* byte[][] prefixes = new byte[][] {
* Bytes.toBytes("username"),Bytes.toBytes("password") };
* MultipleColumnPrefixFilter mcf = new
* MultipleColumnPrefixFilter(prefixes); testScan(mcf);
*/
/*
* FamilyFilter ff2 = new FamilyFilter(CompareOp.EQUAL, new
* BinaryPrefixComparator(Bytes.toBytes("base"))); ColumnPrefixFilter cf
* = new ColumnPrefixFilter("passw".getBytes()); FilterList filterList =
* new FilterList(Operator.MUST_PASS_ALL); filterList.addFilter(ff2);
* filterList.addFilter(cf); testScan(filterList);
*/
}
public void testScan(Filter filter) throws Exception {
Table t_user_info = conn.getTable(TableName.valueOf("t_user_info"));
Scan scan = new Scan();
scan.setFilter(filter);
ResultScanner scanner = t_user_info.getScanner(scan);
Iterator<Result> iter = scanner.iterator();
while (iter.hasNext()) {
Result result = iter.next();
CellScanner cellScanner = result.cellScanner();
while (cellScanner.advance()) {
Cell current = cellScanner.current();
byte[] familyArray = current.getFamilyArray();
byte[] valueArray = current.getValueArray();
byte[] qualifierArray = current.getQualifierArray();
byte[] rowArray = current.getRowArray();
System.out.println(new String(rowArray, current.getRowOffset(),
current.getRowLength()));
System.out.print(new String(familyArray, current
.getFamilyOffset(), current.getFamilyLength()));
System.out.print(":"
+ new String(qualifierArray, current
.getQualifierOffset(), current
.getQualifierLength()));
System.out.println(" "
+ new String(valueArray, current.getValueOffset(),
current.getValueLength()));
}
System.out.println("-----------------------");
}
}
/**
* 分页查询
*
* @throws Exception
*/
@Test
public void pageScan() throws Exception {
final byte[] POSTFIX = new byte[] { 0x00 };
Table table = conn.getTable(TableName.valueOf("t_user_info"));
Filter filter = new PageFilter(3); // 一次需要获取一页的条数
byte[] lastRow = null;
int totalRows = 0;
while (true) {
Scan scan = new Scan();
scan.setFilter(filter);
if (lastRow != null) {
byte[] startRow = Bytes.add(lastRow, POSTFIX); // 设置本次查询的起始行键
scan.setStartRow(startRow);
}
ResultScanner scanner = table.getScanner(scan);
int localRows = 0;
Result result;
while ((result = scanner.next()) != null) {
System.out.println(++localRows + ":" + result);
totalRows++;
lastRow = result.getRow();
}
scanner.close();
if (localRows == 0)
break;
Thread.sleep(2000);
}
System.out.println("total rows:" + totalRows);
}
public static void main(String[] args) throws Exception {
HbaseDemo demo = new HbaseDemo();
demo.init();
demo.testScan();
}
}
1-2)、过滤器查询
引言:过滤器的类型很多,但是可以分为两大类 : 比较过滤器,专用过滤器
过滤器的作用是在服务端判断数据是否满足条件,然后只将满足条件的数据返回给客户端;
hbase过滤器的比较运算符:
LESS <
LESS_OR_EQUAL <=
EQUAL =
NOT_EQUAL <>
GREATER_OR_EQUAL >=
GREATER >
NO_OP 排除所有
Hbase过滤器的比较器(指定比较机制):
BinaryComparator 按字节索引顺序比较指定字节数组,采用Bytes.compareTo(byte[])
BinaryPrefixComparator 跟前面相同,只是比较左端的数据是否相同
NullComparator 判断给定的是否为空
BitComparator 按位比较
RegexStringComparator 提供一个正则的比较器,仅支持 EQUAL 和非EQUAL
SubstringComparator 判断提供的子串是否出现在value中。
1-3)Hbase的过滤器分类
A)、比较过滤器
1-1)、行键过滤器RowFilter
Filter filter1 = new RowFilter(CompareOp.LESS_OR_EQUAL,
new BinaryComparator(Bytes.toBytes("row-22")));
scan.setFilter(filter1);
1-2)、列族过滤器FamilyFilter
Filter filter1 = new FamilyFilter(CompareFilter.CompareOp.LESS,
new BinaryComparator(Bytes.toBytes("colfam3")));
scan.setFilter(filter1);
1-3)、列过滤器QualifierFilter
filter = new QualifierFilter(CompareFilter.CompareOp.LESS_OR_EQUAL,
new BinaryComparator(Bytes.toBytes("col-2")));
scan.setFilter(filter1);
1-4)、值过滤器 ValueFilter
Filter filter = new ValueFilter(CompareFilter.CompareOp.EQUAL,
new SubstringComparator(".4"));
scan.setFilter(filter1);
B)、专用过滤器
1-1)、单列值过滤器 SingleColumnValueFilter
----会返回满足条件的整行
SingleColumnValueFilter filter = new SingleColumnValueFilter(
Bytes.toBytes("colfam1"), Bytes.toBytes("col-5"),
CompareFilter.CompareOp.NOT_EQUAL, new SubstringComparator(
"val-5"));
filter.setFilterIfMissing(true); // 如果不设置为true,则那些不包含指定column的行也会返回
scan.setFilter(filter1);
1-2)、SingleColumnValueExcludeFilter
与上相反
1-3)、前缀过滤器 PrefixFilter----针对行键
Filter filter = new PrefixFilter(Bytes.toBytes("row1"));
scan.setFilter(filter1);
1-4)、列前缀过滤器 ColumnPrefixFilter
Filter filter = new ColumnPrefixFilter(Bytes.toBytes("qual2"));
scan.setFilter(filter1);
1-5)、分页过滤器 PageFilter
package HbaseProject;
public class HbasePageFilter {
public static void main(String[] args) throws Exception {
Configuration conf = HBaseConfiguration.create();
conf.set("hbase.zookeeper.quorum",
"hadoop1:2181,hadoop2:2181,hadoop3:2181");
String tableName = "testfilter";
String cfName = "f1";
final byte[] POSTFIX = new byte[] { 0x00 };
HTable table = new HTable(conf, tableName);
Filter filter = new PageFilter(3);
byte[] lastRow = null;
int totalRows = 0;
while (true) {
Scan scan = new Scan();
scan.setFilter(filter);
if (lastRow != null) {
// 注意这里添加了POSTFIX操作,用来重置扫描边界
byte[] startRow = Bytes.add(lastRow, POSTFIX);
scan.setStartRow(startRow);
}
ResultScanner scanner = table.getScanner(scan);
int localRows = 0;
Result result;
while ((result = scanner.next()) != null) {
System.out.println(localRows++ + ":" + result);
totalRows++;
lastRow = result.getRow();
}
scanner.close();
if (localRows == 0)
break;
}
System.out.println("total rows:" + totalRows);
}
/**
* 多种过滤条件的使用方法
*
* @throws Exception
*/
@Test
public void testScan() throws Exception{
HTable table = new HTable(conf, "person_info".getBytes());
Scan scan = new Scan(Bytes.toBytes("person_rk_bj_zhang_000001"), Bytes.toBytes("person_rk_bj_zhang_000002"));
//前缀过滤器----针对行键
Filter filter = new PrefixFilter(Bytes.toBytes("rk"));
//行过滤器 ---针对行键
ByteArrayComparable rowComparator = new BinaryComparator(Bytes.toBytes("person_rk_bj_zhang_000001"));
RowFilter rf = new RowFilter(CompareOp.LESS_OR_EQUAL, rowComparator);
/**
* 假设rowkey格式为:创建日期_发布日期_ID_TITLE
* 目标:查找 发布日期 为 2014-12-21 的数据
* sc.textFile("path").flatMap(line=>line.split("\t")).map(x=>(x,1)).reduceByKey(_+_).map((_(2),_(1))).sortByKey().map((_(2),_(1))).saveAsTextFile("")
*
*
*/
rf = new RowFilter(CompareOp.EQUAL , new SubstringComparator("_2014-12-21_"));
//单值过滤器1完整匹配字节数组
new SingleColumnValueFilter("base_info".getBytes(), "name".getBytes(), CompareOp.EQUAL, "zhangsan".getBytes());
//单值过滤器2 匹配正则表达式
ByteArrayComparable comparator = new RegexStringComparator("zhang.");
new SingleColumnValueFilter("info".getBytes(), "NAME".getBytes(), CompareOp.EQUAL, comparator);
//单值过滤器3匹配是否包含子串,大小写不敏感
comparator = new SubstringComparator("wu");
new SingleColumnValueFilter("info".getBytes(), "NAME".getBytes(), CompareOp.EQUAL, comparator);
//键值对元数据过滤-----family过滤----字节数组完整匹配
FamilyFilter ff = new FamilyFilter(
CompareOp.EQUAL ,
new BinaryComparator(Bytes.toBytes("base_info")) //表中不存在inf列族,过滤结果为空
);
//键值对元数据过滤-----family过滤----字节数组前缀匹配
ff = new FamilyFilter(
CompareOp.EQUAL ,
new BinaryPrefixComparator(Bytes.toBytes("inf")) //表中存在以inf打头的列族info,过滤结果为该列族所有行
);
//键值对元数据过滤-----qualifier过滤----字节数组完整匹配
filter = new QualifierFilter(
CompareOp.EQUAL ,
new BinaryComparator(Bytes.toBytes("na")) //表中不存在na列,过滤结果为空
);
filter = new QualifierFilter(
CompareOp.EQUAL ,
new BinaryPrefixComparator(Bytes.toBytes("na")) //表中存在以na打头的列name,过滤结果为所有行的该列数据
);
//基于列名(即Qualifier)前缀过滤数据的ColumnPrefixFilter
filter = new ColumnPrefixFilter("na".getBytes());
//基于列名(即Qualifier)多个前缀过滤数据的MultipleColumnPrefixFilter
byte[][] prefixes = new byte[][] {Bytes.toBytes("na"), Bytes.toBytes("me")};
filter = new MultipleColumnPrefixFilter(prefixes);
//为查询设置过滤条件
scan.setFilter(filter);
scan.addFamily(Bytes.toBytes("base_info"));
//一行
// Result result = table.get(get);
//多行的数据
ResultScanner scanner = table.getScanner(scan);
for(Result r : scanner){
/**
for(KeyValue kv : r.list()){
String family = new String(kv.getFamily());
System.out.println(family);
String qualifier = new String(kv.getQualifier());
System.out.println(qualifier);
System.out.println(new String(kv.getValue()));
}
*/
//直接从result中取到某个特定的value
byte[] value = r.getValue(Bytes.toBytes("base_info"), Bytes.toBytes("name"));
System.out.println(new String(value));
}
table.close();
}
}
hbase内部原理
1-1)、系统架构
A)、Client
包含访问hbase的接口,client维护着一些cache来加快对hbase的访问,比如regione的位置信息。
B)、Zookeeper
1、保证任何时候,集群中只有一个master
2、存贮所有Region的寻址入口----root表在哪台服务器上。
3、实时监控Region Server的状态,将Region server的上线和下线信息实时通知给Master
4、存储Hbase的schema,包括有哪些table,每个table有哪些column family
C)、Master职责
1、为Region server分配region
2、负责region server的负载均衡
3、发现失效的region server并重新分配其上的region
4、HDFS上的垃圾文件回收
5、处理schema更新请求
D)、Region Server职责
1、Region server维护Master分配给它的region,处理对这些region的IO请求
2、Region server负责切分在运行过程中变得过大的region,可以看到,client访问hbase上数据的过程并不需要master参与(寻址访问zookeeper和region server,数据读写访问regione server),master仅仅维护者table和region的元数据信息,负载很低。
1-2)、物理存储
A)、整体结构
1、Table中的所有行都按照row key的字典序排列。
2、Table 在行的方向上分割为多个H region。
3、region按大小分割的(默认10G),每个表一开始只有一个region,随着数据不断插入表,region不断增大,当增大到一个阀值的时候,Hregion就会等分会两个新的Hregion。当table中的行不断增多,就会有越来越多的Hregion。
4、Hregion是Hbase中分布式存储和负载均衡的最小单元。最小单元就表示不同的Hregion可以分布在不同的HRegion server上。但一个Hregion是不会拆分到多个server上的。
5、HRegion虽然是负载均衡的最小单元,但并不是物理存储的最小单元。
事实上,HRegion由一个或者多个Store组成,每个store保存一个column family。
每个Strore又由一个memStore和0至多个StoreFile组成。如上图
B)、Srore File & HFile结构
StoreFile以HFile格式保存在HDFS上。
附:HFile的格式为:
1、首先HFile文件是不定长的,长度固定的只有其中的两块:Trailer和FileInfo。正如图中所示的,Trailer中有指针指向其他数 据块的起始点。
2、 File Info中记录了文件的一些Meta信息,例如:AVG_KEY_LEN, AVG_VALUE_LEN, LAST_KEY, COMPARATOR, MAX_SEQ_ID_KEY等。
3、 Data Index和Meta Index块记录了每个Data块和Meta块的起始点。
4、 Data Block是HBase I/O的基本单元,为了提高效率,HRegionServer中有基于LRU的Block Cache机制。每个Data块的大小可以在创建一个Table的时候通过参数指定,大号的Block有利于顺序Scan,小号Block利于随机查询。 每个Data块除了开头的Magic以外就是一个个KeyValue对拼接而成, Magic内容就是一些随机数字,目的是防止数据损坏。
HFile里面的每个KeyValue对就是一个简单的byte数组。但是这个byte数组里面包含了很多项,并且有固定的结构。我们来看看里面的具体结构:
5、 开始是两个固定长度的数值,分别表示Key的长度和Value的长度。紧接着是Key,开始是固定长度的数值,表示RowKey的长度,紧接着是 RowKey,然后是固定长度的数值,表示Family的长度,然后是Family,接着是Qualifier,然后是两个固定长度的数值,表示Time Stamp和Key Type(Put/Delete)。Value部分没有这么复杂的结构,就是纯粹的二进制数据了。
HFile分为六个部分:
Data Block 段–保存表中的数据,这部分可以被压缩。
Meta Block 段 (可选的)–保存用户自定义的kv对,可以被压缩。
File Info 段–Hfile的元信息,不被压缩,用户也可以在这一部分添加自己的元信息。
Data Block Index 段–Data Block的索引。每条索引的key是被索引的block的第一条记录的key。
Meta Block Index段 (可选的)–Meta Block的索引。
Trailer–这一段是定长的。保存了每一段的偏移量,读取一个HFile时,会首先 读取Trailer,Trailer保存了每个段的起始位置(段的Magic Number用来做安全check),然后,DataBlock Index会被读取到内存中,这样,当检索某个key时,不需要扫描整个HFile,而只需从内存中找到key所在的block,通过一次磁盘io将整个 block读取到内存中,再找到需要的key。DataBlock Index采用LRU机制淘汰。
HFile的Data Block,Meta Block通常采用压缩方式存储,压缩之后可以大大减少网络IO和磁盘IO,随之而来的开销当然是需要花费cpu进行压缩和解压缩。
目标Hfile的压缩支持两种方式:Gzip,Lzo。
C)、Memstore与Storefile
一个region由多个store组成,每个store包含一个列族的所有数据
Store包括位于内存的memstore和位于硬盘的storefile
写操作先写入memstore,当memstore中的数据量达到某个阈值,Hregionserver启动flashcache进程写入storefile,每次写入形成单独一个storefile
当storefile大小超过一定阈值后,会把当前的region分割成两个,并由Hmaster分配给相应的region服务器,实现负载均衡
客户端检索数据时,先在memstore找,找不到再找storefile
D)、HLog(WAL log)
WAL 意为Write ahead log(http://en.wikipedia.org/wiki/Write-ahead_logging),类似mysql中的binlog,用来 做灾难恢复只用,Hlog记录数据的所有变更,一旦数据修改,就可以从log中进行恢复。
每个Region Server维护一个Hlog,而不是每个Region一个。这样不同region(来自不同table)的日志会混在一起,这样做的目的是不断追加单个文件相对于同时写多个文件而言,可以减少磁盘寻址次数,因此可以提高对table的写性能。带来的麻烦是,如果一台region server下线,为了恢复其上的region,需要将region server上的log进行拆分,然后分发到其它region server上进行恢复。
HLog文件就是一个普通的Hadoop Sequence File:
1、HLog Sequence File 的Key是HLogKey对象,HLogKey中记录了写入数据的归属信息,除了table和region名字外,同时还包括 sequence number和timestamp,timestamp是”写入时间”,sequence number的起始值为0,或者是最近一次存入文件系统中sequence number。
2、HLog Sequece File的Value是HBase的KeyValue对象,即对应HFile中的KeyValue,可参见上文描述。
寻址机制
1-1)、寻址示意图
1-2)、-ROOT-和.META.表结构
.META.行记录结构
1-3)、寻址流程
现在假设我们要从Table2里面插寻一条RowKey是RK10000的数据。那么我们应该遵循以下步骤:
1. 从.META.表里面查询哪个Region包含这条数据。
2. 获取管理这个Region的RegionServer地址。
3. 连接这个RegionServer, 查到这条数据。
系统如何找到某个row key (或者某个 row key range)所在的region
bigtable 使用三层类似B+树的结构来保存region位置。
第一层是保存zookeeper里面的文件,它持有root region的位置。
第二层root region是.META.表的第一个region其中保存了.META.表其它region的位置。通过root region,我们就可以访问.META.表的数据。
.META.是第三层,它是一个特殊的表,保存了hbase中所有数据表的region 位置信息。
说明:
1 root region永远不会被split,保证了最需要三次跳转,就能定位到任意region 。
2.META.表每行保存一个region的位置信息,row key 采用表名+表的最后一行编码而成。
3 为了加快访问,.META.表的全部region都保存在内存中。
4 client会将查询过的位置信息保存缓存起来,缓存不会主动失效,因此如果client上的缓存全部失效,则需要进行最多6次网络来回,才能定位到正确的region(其中三次用来发现缓存失效,另外三次用来获取位置信息)。
1-4)、读写过程
A)、读请求过程:
1、客户端通过zookeeper以及root表和meta表找到目标数据所在的regionserver
2、联系regionserver查询目标数据
3、regionserver定位到目标数据所在的region,发出查询请求
4、region先在memstore中查找,命中则返回
5、如果在memstore中找不到,则在storefile中扫描(可能会扫描到很多的storefile----bloomfilter)
B)、写请求过程:
1、client向region server提交写请求
2、region server找到目标region
3、region检查数据是否与schema一致
4、如果客户端没有指定版本,则获取当前系统时间作为数据版本
5、将更新写入WAL log
6、将更新写入Memstore
7、判断Memstore的是否需要flush为Store文件。
细节描述:
hbase使用MemStore和StoreFile存储对表的更新。
数据在更新时首先写入Log(WAL log)和内存(MemStore)中,MemStore中的数据是排序的,当MemStore累计到一定阈值时,就会创建一个新的MemStore,并 且将老的MemStore添加到flush队列,由单独的线程flush到磁盘上,成为一个StoreFile。于此同时,系统会在zookeeper中记录一个redo point,表示这个时刻之前的变更已经持久化了。
当系统出现意外时,可能导致内存(MemStore)中的数据丢失,此时使用Log(WAL log)来恢复checkpoint之后的数据。
StoreFile是只读的,一旦创建后就不可以再修改。因此Hbase的更新其实是不断追加的操作。当一个Store中的StoreFile达到一定的阈值后,就会进行一次合并(minor_compact, major_compact),将对同一个key的修改合并到一起,形成一个大的StoreFile,当StoreFile的大小达到一定阈值后,又会对 StoreFile进行split,等分为两个StoreFile。
由于对表的更新是不断追加的,compact时,需要访问Store中全部的 StoreFile和MemStore,将他们按row key进行合并,由于StoreFile和MemStore都是经过排序的,并且StoreFile带有内存中索引,合并的过程还是比较快。
Region管理
1-1)、Region分配
任何时刻,一个region只能分配给一个region server。master记录了当前有哪些可用的region server。以及当前哪些region分配给了哪些region server,哪些region还没有分配。当需要分配的新的region,并且有一个region server上有可用空间时,master就给这个region server发送一个装载请求,把region分配给这个region server。region server得到请求后,就开始对此region提供服务。
1-2)、Region server上线
master使用zookeeper来跟踪region server状态。当某个region server启动时,会首先在zookeeper上的server目录下建立代表自己的znode。由于master订阅了server目录上的变更消息,当server目录下的文件出现新增或删除操作时,master可以得到来自zookeeper的实时通知。因此一旦region server上线,master能马上得到消息。
1-3)、Region server下线
当region server下线时,它和zookeeper的会话断开,zookeeper而自动释放代表这台server的文件上的独占锁。master就可以确定:
1、region server和zookeeper之间的网络断开了。
2、region server挂了。
无论哪种情况,region server都无法继续为它的region提供服务了,此时master会删除server目录下代表这台region server的znode数据,并将这台region server的region分配给其它还活着的同志。
1-4)、Master工作机制
A)、master上线
master启动进行以下步骤:
1、从zookeeper上获取唯一一个代表active master的锁,用来阻止其它master成为master。
2、扫描zookeeper上的server父节点,获得当前可用的region server列表。
3、和每个region server通信,获得当前已分配的region和region server的对应关系。
4、扫描.META.region的集合,计算得到当前还未分配的region,将他们放入待分配region列表。
B)、master下线
由于master只维护表和region的元数据,而不参与表数据IO的过程,master下线仅导致所有元数据的修改被冻结(无法创建删除表,无法修改表的schema,无法进行region的负载均衡,无法处理region 上下线,无法进行region的合并,唯一例外的是region的split可以正常进行,因为只有region server参与),表的数据读写还可以正常进行。因此master下线短时间内对整个hbase集群没有影响。
从上线过程可以看到,master保存的信息全是可以冗余信息(都可以从系统其它地方收集到或者计算出来)
因此,一般hbase集群中总是有一个master在提供服务,还有一个以上的‘master’在等待时机抢占它的位置。
Hbase高级应用
1-1)、建表高级属性
下面几个shell 命令在hbase操作中可以起到很到的作用,且主要体现在建表的过程中,看下面几个create 属性
A)、BloomFilter默认是NONE 是否使用布隆过虑及使用何种方式
布隆过滤可以每列族单独启用。
使用 HColumnDescriptor.setBloomFilterType(NONE | ROW | ROWCOL) 对列族单独启用布隆。
1、Default = ROW 对行进行布隆过滤。
2、 ROW,行键的哈希在每次插入行时将被添加到布隆。
3、对 ROWCOL,行键 + 列族 + 列族修饰的哈希将在每次插入行时添加到布隆
使用方法: create 'table',{BLOOMFILTER =>'ROW'} 启用布隆过滤可以节省读磁盘过程,可以有助于降低读取延迟
B)、Version 的版本
VERSIONS 默认是1 这个参数的意思是数据保留1个 版本,如果我们认为我们的数据没有这么大的必要保留这么多,随时都在更新,而老版本的数据对我们毫无价值,那将此参数设为1 能节约2/3的空间
使用方法: create 'table',{VERSIONS=>'2'}
附:MIN_VERSIONS => '0'是说在compact操作执行之后,至少要保留的版本
C)、Compression 命令
COMPRESSION 默认值是NONE 即不使用压缩
1、这个参数意思是该列族是否采用压缩,采用什么压缩算法
2、使用方法: create 'table',{NAME=>'info',COMPRESSION=>'SNAPPY'}
3、建议采用SNAPPY压缩算法
HBase中,在Snappy发布之前(Google 2011年对外发布Snappy),采用的LZO算法,目标是达到尽可能快的压缩和解压速度,同时减少对CPU的消耗;
在Snappy发布之后,建议采用Snappy算法(参考《HBase: The Definitive Guide》),具体可以根据实际情况对LZO和Snappy做过更详细的对比测试后再做选择。
Algorithm |
% remaining |
Encoding |
Decoding |
GZIP |
13.4% |
21 MB/s |
118 MB/s |
LZO |
20.5% |
135 MB/s |
410 MB/s |
Zippy/Snappy |
22.2% |
172 MB/s |
409 MB/s |
如果建表之初没有压缩,后来想要加入压缩算法,可以通过alter修改schema
D)、Alter 命令
使用方法:
如 修改压缩算法
disable 'table'
alter 'table',{NAME=>'info',COMPRESSION=>'snappy'}
enable 'table'
但是需要执行major_compact 'table' 命令之后 才会做实际的操作。
E)、TTL 命令
默认是 2147483647 即:Integer.MAX_VALUE 值大概是68年
这个参数是说明该列族数据的存活时间,单位是s
这个参数可以根据具体的需求对数据设定存活时间,超过存过时间的数据将在表中不在显示,待下次major compact的时候再彻底删除数据
注意的是TTL设定之后 MIN_VERSIONS=>'0' 这样设置之后,TTL时间戳过期后,将全部彻底删除该family下所有的数据,如果MIN_VERSIONS 不等于0那将保留最新的MIN_VERSIONS个版本的数据,其它的全部删除,比如MIN_VERSIONS=>'1' 届时将保留一个最新版本的数据,其它版本的数据将不再保存。
F)、Describe 命令
describe 'table' 这个命令查看了create table 的各项参数或者是默认值。
G)、Disable_all 命令
disable_all 'toplist.*' disable_all 支持正则表达式,并列出当前匹配的表的如下:
toplist_a_total_1001 toplist_a_total_1002 toplist_a_total_1008
toplist_a_total_1009
toplist_a_total_1019 toplist_a_total_1035
...
Disable the above 25 tables (y/n)? 并给出确认提示
H)、Drop_all 命令
drop_all 这个命令和disable_all的使用方式是一样的
hbase 表预分区----手动分区
默认情况下,在创建HBase表的时候会自动创建一个region分区,当导入数据的时候,所有的HBase客户端都向这一个region写数据,直到这个region足够大了才进行切分。一种可以加快批量写入速度的方法是通过预先创建一些空的regions,这样当数据写入HBase时,会按照region分区情况,在集群内做数据的负载均衡。
命令方式:
create 't1', 'f1', {NUMREGIONS => 15, SPLITALGO => 'HexStringSplit'}
也可以使用api的方式:
bin/hbase org.apache.hadoop.hbase.util.RegionSplitter test_table HexStringSplit -c 10 -f info
参数:
test_table是表名
HexStringSplit 是split 方式
-c 是分10个region
-f 是family
可在UI上查看结果,如图:
这样就可以将表预先分为15个区,减少数据达到storefile 大小的时候自动分区的时间消耗,并且还有以一个优势,就是合理设计rowkey 能让各个region 的并发请求平均分配(趋于均匀) 使IO 效率达到最高,但是预分区需要将filesize 设置一个较大的值,设置哪个参数呢 hbase.hregion.max.filesize 这个值默认是10G 也就是说单个region 默认大小是10G
这个参数的默认值在0.90 到0.92到0.94.3各版本的变化:256M--1G--10G
但是如果MapReduce Input类型为TableInputFormat 使用hbase作为输入的时候,就要注意了,每个region一个map,如果数据小于10G 那只会启用一个map 造成很大的资源浪费,这时候可以考虑适当调小该参数的值,或者采用预分配region的方式,并将检测如果达到这个值,再手动分配region。
1-2)、hbase应用案例看行键设计
表结构设计
A)、列族数量的设定
以用户信息为例,可以将必须的基本信息存放在一个列族,而一些附加的额外信息可以放在另一列族;
B)、行键的设计
语音详单:
13877889988-20150625
13877889988-20150625
13877889988-20150626
13877889988-20150626
13877889989
13877889989
13877889989
----将需要批量查询的数据尽可能连续存放
CMS系统----多条件查询
尽可能将查询条件关键词拼装到rowkey中,查询频率最高的条件尽量往前靠
20150230-zhangsan-category…
20150230-lisi-category…
(每一个条件的值长度不同,可以通过做定长映射来提高效率)
参考:《hbase 实战》----详细讲述了facebook /GIS等系统的表结构设计
1-3)、Hbase和Mapreduce结合
为什么需要用mapreduce去访问hbase的数据?
——加快分析速度和扩展分析能力
Mapreduce访问hbase数据作分析一定是在离线分析的场景下应用
1-4)、从Hbase中读取数据、分析,写入Hdfs
public static void main(String[] args) throws Exception {
/**
* public abstract class TableMapper<KEYOUT, VALUEOUT> extends
* Mapper<ImmutableBytesWritable, Result, KEYOUT, VALUEOUT> { }
*
*/
public class HbaseReader {
public static String flow_fields_import = "flow_fields_import";
static class HdfsSinkMapper extends TableMapper<Text, NullWritable> {
@Override
protected void map(ImmutableBytesWritable key, Result value,
Context context) throws IOException,
InterruptedException {
byte[] bytes = key.copyBytes();
String phone = new String(bytes);
byte[] urlbytes = value.getValue("f1".getBytes(),
"url".getBytes());
String url = new String(urlbytes);
context.write(new Text(phone + "\t" + url),
NullWritable.get());
}
}
static class HdfsSinkReducer extends
Reducer<Text, NullWritable, Text, NullWritable> {
@Override
protected void reduce(Text key, Iterable<NullWritable> values,
Context context) throws IOException,
InterruptedException {
context.write(key, NullWritable.get());
}
}
public static void main(String[] args) throws Exception {
Configuration conf = HBaseConfiguration.create();
conf.set("hbase.zookeeper.quorum", "spark01");
Job job = Job.getInstance(conf);
job.setJarByClass(HbaseReader.class);
// job.setMapperClass(HdfsSinkMapper.class);
Scan scan = new Scan();
TableMapReduceUtil.initTableMapperJob(flow_fields_import, scan,
HdfsSinkMapper.class, Text.class, NullWritable.class,
job);
job.setReducerClass(HdfsSinkReducer.class);
FileOutputFormat.setOutputPath(job, new Path(
"c:/hbasetest/output"));
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(NullWritable.class);
job.waitForCompletion(true);
}
}
}
1-5)、从Hdfs中读取数据写入Hbase
/**
* public abstract class TableReducer<KEYIN, VALUEIN, KEYOUT> extends
* Reducer<KEYIN, VALUEIN, KEYOUT, Writable> { }
*
* @author @****.cn
*
*/
public class HbaseSinker {
public static String flow_fields_import = "flow_fields_import";
static class HbaseSinkMrMapper extends
Mapper<LongWritable, Text, FlowBean, NullWritable> {
@Override
protected void map(LongWritable key, Text value, Context context)
throws IOException, InterruptedException {
String line = value.toString();
String[] fields = line.split("\t");
String phone = fields[0];
String url = fields[1];
FlowBean bean = new FlowBean(phone, url);
context.write(bean, NullWritable.get());
}
}
static class HbaseSinkMrReducer
extends
TableReducer<FlowBean, NullWritable, ImmutableBytesWritable> {
@Override
protected void reduce(FlowBean key,
Iterable<NullWritable> values, Context context)
throws IOException, InterruptedException {
Put put = new Put(key.getPhone().getBytes());
put.add("f1".getBytes(), "url".getBytes(), key.getUrl()
.getBytes());
context.write(new ImmutableBytesWritable(key.getPhone()
.getBytes()), put);
}
}
public static void main(String[] args) throws Exception {
Configuration conf = HBaseConfiguration.create();
conf.set("hbase.zookeeper.quorum", "spark01");
HBaseAdmin hBaseAdmin = new HBaseAdmin(conf);
boolean tableExists = hBaseAdmin
.tableExists(flow_fields_import);
if (tableExists) {
hBaseAdmin.disableTable(flow_fields_import);
hBaseAdmin.deleteTable(flow_fields_import);
}
HTableDescriptor desc = new HTableDescriptor(
TableName.valueOf(flow_fields_import));
HColumnDescriptor hColumnDescriptor = new HColumnDescriptor(
"f1".getBytes());
desc.addFamily(hColumnDescriptor);
hBaseAdmin.createTable(desc);
Job job = Job.getInstance(conf);
job.setJarByClass(HbaseSinker.class);
job.setMapperClass(HbaseSinkMrMapper.class);
TableMapReduceUtil.initTableReducerJob(flow_fields_import,
HbaseSinkMrReducer.class, job);
FileInputFormat.setInputPaths(job,
new Path("c:/hbasetest/data"));
job.setMapOutputKeyClass(FlowBean.class);
job.setMapOutputValueClass(NullWritable.class);
job.setOutputKeyClass(ImmutableBytesWritable.class);
job.setOutputValueClass(Mutation.class);
job.waitForCompletion(true);
}
}
1-6)、Coprocessor -- 协处理器
协处理器有两种:observer和endpoint
Observer允许集群在正常的客户端操作过程中可以有不同的行为表现
Endpoint允许扩展集群的能力,对客户端应用开放新的运算命令
A)、Observer协处理器
1-1)、正常put请求的流程
1-2)、加入Observer协处理后的put流程
1、客户端发出put请求
2、该请求被分派给合适的RegionServer和region
3、coprocessorHost拦截该请求,然后在该表上登记的每个RegionObserver上调用prePut()
4、如果没有被prePut()拦截,该请求继续送到region,然后进行处理
5、region产生的结果再次被CoprocessorHost拦截,调用postPut()
6、假如没有postPut()拦截该响应,最终结果被返回给客户端
1-3)、Observer的类型
RegionObs——这种Observer钩在数据访问和操作阶段,所有标准的数据操作命令都可以被pre-hooks和post-hooks拦截
WALObserver——WAL所支持的Observer;可用的钩子是pre-WAL和post-WAL
MasterObserver——钩住DDL事件,如表创建或模式修改
1-4)、Observer应用场景示例
见下节;
Endpoint—参考《Hbase 权威指南》
1-7、二级索引
row key在HBase中是以B+ tree结构化有序存储的,所以scan起来会比较效率。单表以row key存储索引,column value存储id值或其他数据 ,这就是Hbase索引表的结构。
由于HBase本身没有二级索引(Secondary Index)机制,基于索引检索数据只能单纯地依靠RowKey,为了能支持多条件查询,开发者需要将所有可能作为查询条件的字段一一拼接到RowKey中,这是HBase开发中极为常见的做法
比如,现在有一张1亿的用户信息表,建有出生地和年龄两个索引,我想得到一个条件是在杭州出生,年龄为20岁的按用户id正序排列前10个的用户列表。
有一种方案是,系统先扫描出生地为杭州的索引,得到一个用户id结果集,这个集合的规模假设是10万。然后扫描年龄,规模是5万,最后merge这些用户id,去重,排序得到结果。
这明显有问题,如何改良?
保证出生地和年龄的结果是排过序的,可以减少merge的数据量?但Hbase是按row key排序,value是不能排序的。
变通一下——将用户id冗余到row key里?OK,这是一种解决方案了,这个方案的图示如下:
merge时提取交集就是所需要的列表,顺序是靠索引增加了_id,以字典序保证的。
B)、 按索引查询种类建立组合索引
在方案1的场景中,想象一下,如果单索引数量多达10个会怎么样?10个索引,就要merge 10次,性能可想而知。
解决这个问题需要参考RDBMS的组合索引实现。
比如出生地和年龄需要同时查询,此时如果建立一个出生地和年龄的组合索引,查询时效率会高出merge很多。
当然,这个索引也需要冗余用户id,目的是让结果自然有序。结构图示如下:
这个方案的优点是查询速度非常快,根据查询条件,只需要到一张表中检索即可得到结果list。缺点是如果有多个索引,就要建立多个与查询条件一一对应的组合索引
而索引表的维护如果交给应用客户端,则无疑增加了应用端开发的负担
通过协处理器可以将索引表维护的工作从应用端剥离
C)、利用Observer自动维护索引表示例
在社交类应用中,经常需要快速检索各用户的关注列表t_guanzhu,同时,又需要反向检索各种户的粉丝列表t_fensi,为了实现这个需求,最佳实践是建立两张互为反向的表:
1-1)、正向关注
一个表为正向索引关注表 “t_guanzhu”:
Rowkey: A-B
f1:From
f1:To
1-2)、反向关注
另一个表为反向索引粉丝表:“t_fensi”:
Rowkey: B—A
f1:From
f1:To
插入一条关注信息时,为了减轻应用端维护反向索引表的负担,可用Observer协处理器实现:
1-8)、编写自定义RegionServer
package HbaseProject;
public class InverIndexCoprocessor extends BaseRegionObserver {
@Override
public void prePut(ObserverContext<RegionCoprocessorEnvironment> e,
Put put, WALEdit edit, Durability durability) throws IOException {
// set configuration
Configuration conf = HBaseConfiguration.create();
// need conf.set...
HTable table = new HTable(conf, "t_fensi");
Cell fromCell = put.get("f1".getBytes(), "From".getBytes()).get(0);
Cell toCell = put.get("f1".getBytes(), "To".getBytes()).get(0);
byte[] valueArray = fromCell.getValue();
String from = new String(valueArray);
valueArray = toCell.getValue();
String to = new String(valueArray);
Put putIndex = new Put((to + "-" + from).getBytes());
putIndex.add("f1".getBytes(), "From".getBytes(), from.getBytes());
putIndex.add("f1".getBytes(), "To".getBytes(), to.getBytes());
table.put(putIndex);
table.close();
}
}
[root@hadoop1 ~]# hadoop fs -put fensiguanzhu.jar /demo/
修改t_fensi的schema,注册协处理器
hbase(main):017:0> alter ' t_fensi ',METHOD => 'table_att','coprocessor'=>'hdfs://spark01:9000/demo/ fensiguanzhu.jar|cn.****.bigdata.hbasecoprocessor. InverIndexCoprocessor|1001|'
Updating all regions with the new schema...
0/1 regions updated.
1/1 regions updated.
Done.
D)、检查是否注册成功
hbase(main):018:0> describe 'ff'
DESCRIPTION ENABLED
'ff', {TABLE_ATTRIBUTES => {coprocessor$1 => 'hdfs://spark01:9000/demo/fensiguanzhu.jar|cn.****.bi true
gdata.hbasecoprocessor.TestCoprocessor|1001|'}, {NAME => 'f1', DATA_BLOCK_ENCODING => 'NONE', BLOOMF
ILTER => 'ROW', REPLICATION_SCOPE => '0', VERSIONS => '1', COMPRESSION => 'NONE', MIN_VERSIONS => '0
', TTL => '2147483647', KEEP_DELETED_CELLS => 'false', BLOCKSIZE => '65536', IN_MEMORY => 'false', B
LOCKCACHE => 'true'}, {NAME => 'f2', DATA_BLOCK_ENCODING => 'NONE', BLOOMFILTER => 'ROW', REPLICATIO
N_SCOPE => '0', VERSIONS => '1', COMPRESSION => 'NONE', MIN_VERSIONS => '0', TTL => '2147483647', KE
EP_DELETED_CELLS => 'false', BLOCKSIZE => '65536', IN_MEMORY => 'false', BLOCKCACHE => 'true'}
1 row(s) in 0.0250 seconds
可以看出block的大小是65536,也就是65M,这样可以快速的查找数据。
YCSB 测试HBase的性能
参考资料:
https://github.com/brianfrankcooper/YCSB/issues/548
https://github.com/brianfrankcooper/YCSB/wiki/Core-Properties
http://blog.csdn.net/xfg0218/article/details/79232280
1-1)、软件下载
https://github.com/brianfrankcooper/YCSB/wiki
1-2)、使用请查看
https://github.com/brianfrankcooper/YCSB/wiki/Getting-Started
1-3)、测试步骤
A)、清除前查看内存
# free -m
B)、执行清除
#sync
# echo 1 > /proc/sys/vm/drop_caches
# echo 2 > /proc/sys/vm/drop_caches
# echo 3 > /proc/sys/vm/drop_caches
C)、清除后查看
# free -g
D)、创建hbase表
#n_splits=200
#create 'usertable','cf',{SPLITS=> (1..n_splits).map{|i| "user#{1000000000000000000+i*(9955555555555555555-1000000000000000000)/n_splits}"}}
#alter 'usertable',{NAME=>'cf',COMPRESSION=>'SNAPPY'}
#enable 'usertable'
#vi workload12.sh
#time ./ycsb load hbase10 -P ../workloads/workloada -p table=usertable -cp /usr/hdp/2.5.3.0-37/hbase/conf -p columnfamily=cf -p fieldcount=10 -p fieldlength=100 -p recordcount=2000000 -p insertorder=hashed -p insertstart=0 -threads 200 -s
******************************
[OVERALL], RunTime(ms), 792190.0
[OVERALL], Throughput(ops/sec), 25246.468650197552
[TOTAL_GCS_PS_Scavenge], Count, 842.0
[TOTAL_GC_TIME_PS_Scavenge], Time(ms), 4913.0
[TOTAL_GC_TIME_%_PS_Scavenge], Time(%), 0.6201795023921028
[TOTAL_GCS_PS_MarkSweep], Count, 1.0
[TOTAL_GC_TIME_PS_MarkSweep], Time(ms), 61.0
[TOTAL_GC_TIME_%_PS_MarkSweep], Time(%), 0.007700172938310254
[TOTAL_GCs], Count, 843.0
[TOTAL_GC_TIME], Time(ms), 4974.0
[TOTAL_GC_TIME_%], Time(%), 0.6278796753304131
[CLEANUP], Operations, 400.0
[CLEANUP], AverageLatency(us), 315.5375
[CLEANUP], MinLatency(us), 1.0
[CLEANUP], MaxLatency(us), 123327.0
[CLEANUP], 95thPercentileLatency(us), 13.0
[CLEANUP], 99thPercentileLatency(us), 36.0
[INSERT], Operations, 2.0E7
[INSERT], AverageLatency(us), 7833.6129202
[INSERT], MinLatency(us), 425.0
[INSERT], MaxLatency(us), 713215.0
[INSERT], 95thPercentileLatency(us), 28335.0
[INSERT], 99thPercentileLatency(us), 83071.0
[INSERT], Return=OK, 20000000
real 13m13.820s
user 21m35.234s
sys 12m6.654s
Hbase 常见错误问题
1-1)、Hbase日志频繁flush
查看HBase监控,发现集群频繁做压缩,甚至主压缩
[regionserver60020.periodicFlusher] regionserver.HRegionServer: regionserver60020.periodicFlusher requesting flush for region CS50_record_appendix,1E911544023001001092443090+1473091200000+1+1473004800000+092507,1483927310369.db76fd1f8e62c525e07034e8d0eaf0c2. after a delay of 7687
[MemStoreFlusher.1] regionserver.HRegion: Finished memstore flush of ~8.5 M/8902368, currentsize=0/0 for region CG40_RRT,shuyuan2013 +1390025229426+E902428585+2014020626000G67120314013AO0104 ,1399857203189.15f9814a28cd3669aa2e5ba64f499daa. in 299ms, sequenceid=2808429647, compaction requested=false
**************************
hbase.hregion.memstore.block.multiplier: 12
hbase.regionserver.global.memstore.upperLimit: 0.6
hbase.regionserver.global.memstore.lowerLimit : 0.55
periodicFlusher是定时检查是否需要执行flush,由hbase.regionserver.optionalcacheflushinterval参数控制。从日志来看,这造成频繁flush,并产生大量小的storefile。
把该参数设置为0,并且调整如下参数
hbase.hregion.memstore.block.multiplier: 12
hbase.regionserver.global.memstore.upperLimit: 0.6
hbase.regionserver.global.memstore.lowerLimit : 0.55
目前情况有缓解,但是hlog会不断增长,导致too many hlog,从而堵塞写入,不过影响大概是几十秒。
[regionserver60020.logRoller] wal.FSHLog: Too many hlogs: logs=33, maxlogs=32; forcing flush of 157 regions(s)
导致这样原因在于一个节点的region数据太多,max hlog需要根据regionserver数调整下。
Phoenix映射Hbase数据表
1-1)、创建内部表
使用Phoenix映射HBase表时,在Phoenix使用create table来创建内部表时,如果对数据进行修改,hbase中表中的数据也会发生改变,如果在Phoenix删除表hbase中的表也会删除,如果不想删除Hbase中的表请查看如下创建视图一节
hbase(main):007:0> create 'hbase_phoenix','info'
hbase(main):009:0> put 'hbase_phoenix', 'row001','info:name','phoenix'
hbase(main):008:0> put 'hbase_phoenix', 'row002','info:name','hbase'
hbase(main):010:0> scan 'hbase_phoenix'
ROW COLUMN+CELL
row001 column=info:name, timestamp=1518078552951, value=phoenix
row002 column=info:name, timestamp=1518078543724, value=hbase
2 row(s) in 0.0470 seconds
0: jdbc:phoenix:> create table "hbase_phoenix"("ROW" varchar primary key, "info"."name" varchar);
2 rows affected (7.01 seconds)
在这里创建表时需要注意列族以及列名需要用双引号括起来,因为phoenix对大小写特别敏感,如果不加双印,默认的是大写
0: jdbc:phoenix:> select * from "hbase_phoenix";
+---------+----------+
| ROW | name |
+---------+----------+
| row001 | phoenix |
| row002 | hbase |
+---------+----------+
2 rows selected (0.058 seconds)
查询表中的数据时也需要注意引号问题
查看:https://phoenix.apache.org/
或查看Hbase导数据的几种方式章节中的Phoneix导入Hbase数据详解
1-2)、创建视图
Phoenix创建的视图是只读的,所以只能用来做查询,无法通过视图对源数据进行修改等操作。而且相比于直接创建映射表,视图的查询效率会低,原因是:创建映射表的时候,Phoenix会在表中创建一些空的键值对,而这些空键值对的存在则可以用来提高查询效率。
如果在Phoenix中使用创建视图的形式,如果在Phoenix中删除视图,Hbase则不会删除表,建议使用此方时映射Hbase数据。
hbase(main):017:0> create 'hbase_phoenix_view','0'
hbase(main):018:0>put 'hbase_phoenix_view','row1','0:name','1'
hbase(main):019:0>put 'hbase_phoenix_view','row1','0:aeg','2'
hbase(main):020:0>put 'hbase_phoenix_view','row2','0:name','1'
hbase(main):021:0>put 'hbase_phoenix_view','row2','0:aeg','2'
0: jdbc:phoenix:> create view "hbase_phoenix_view"(empid varchar primary key,"0"."name" varchar,"0"."age" varchar);
0: jdbc:phoenix:> select * from "hbase_phoenix_view";
+--------+-------+------+
| EMPID | name | age |
+--------+-------+------+
| row1 | 1 | 2 |
| row2 | 1 | 2 |
+--------+-------+------+
Hbase数据质量检查
HBaseFsck(hbck)是一个用于检查区域一致性和表完整性问题并修复损坏的HBase的工具。它工作在两种基本模式 - 只读不一致识别模式和多阶段读写修复模式。
详细监察部周请查看:http://blog.csdn.net/xfg0218/article/details/79542284
1-1)、检查HBase集群是否损坏
$ hbase hbck
**********************
Table InsureRRT is okay.
Number of regions: 1
Deployed on: alpha-cn-01.cars.com,16020,1520927144895
0 inconsistencies detected.
Status: OK
2018-03-13 03:59:58,773 INFO [main] zookeeper.ZooKeeper: Session: 0x1621e4ea3880024 closed
2018-03-13 03:59:58,773 INFO [main-EventThread] zookeeper.ClientCnxn: EventThread shut down
2018-03-13 03:59:58,773 INFO [main] client.ConnectionManager$HConnectionImplementation: Closing master protocol: MasterService
2018-03-13 03:59:58,774 INFO [main] client.ConnectionManager$HConnectionImplementation: Closing zookeeper sessionid=0x1621e4ea3880025
2018-03-13 03:59:58,789 INFO [main] zookeeper.ZooKeeper: Session: 0x1621e4ea3880025 closed
2018-03-13 03:59:58,789 INFO [main-EventThread] zookeeper.ClientCnxn: EventThread shut down
1-2)、使用Hbase的-details选项将报告更多细节
$ hbase hbck -details
***************
0 inconsistencies detected.
Status: OK
2018-03-13 04:38:43,731 INFO [main] zookeeper.ZooKeeper: Session: 0x1621e4ea3880028 closed
2018-03-13 04:38:43,732 INFO [main] client.ConnectionManager$HConnectionImplementation: Closing master protocol: MasterService
2018-03-13 04:38:43,732 INFO [main] client.ConnectionManager$HConnectionImplementation: Closing zookeeper sessionid=0x1621e4ea3880029
2018-03-13 04:38:43,732 INFO [main-EventThread] zookeeper.ClientCnxn: EventThread shut down
2018-03-13 04:38:43,760 INFO [main] zookeeper.ZooKeeper: Session: 0x1621e4ea3880029 closed
2018-03-13 04:38:43,760 INFO [main-EventThread] zookeeper.ClientCnxn: EventThread shut down
1-3)、检测某个表的健康情况
$ hbase hbck test_hbase_hbase test_hbase
test_hbase_hbase与test_hbase 代表表名
********************
0 inconsistencies detected.
Status: OK
2018-03-13 04:42:37,911 INFO [main] zookeeper.ZooKeeper: Session: 0x1621e4ea388002d closed
2018-03-13 04:42:37,911 INFO [main] client.ConnectionManager$HConnectionImplementation: Closing master protocol: MasterService
2018-03-13 04:42:37,911 INFO [main] client.ConnectionManager$HConnectionImplementation: Closing zookeeper sessionid=0x1621e4ea388002e
2018-03-13 04:42:37,912 INFO [main-EventThread] zookeeper.ClientCnxn: EventThread shut down
2018-03-13 04:42:37,935 INFO [main] zookeeper.ZooKeeper: Session: 0x1621e4ea388002e closed
2018-03-13 04:42:37,936 INFO [main-EventThread] zookeeper.ClientCnxn: EventThread shut down
详细日志:http://blog.csdn.net/xfg0218/article/details/79542565
Hbase导数据的几种方式
测试数据请到Blog中下载:http://blog.csdn.net/xfg0218/article/details/51712157
1-1)、hive-hbase-handler导数据
A)、反编译JAR包
http://www.apache.org/dyn/closer.cgi/hive/选择apache-hive-1.2.1-src.tar.gz点击下载之后使用MyEclipse进行反编译,或者使用作者反编译好的JAR 链接:http://pan.baidu.com/s/1hscaORi 密码:wv6p
放在/opt/hive-1.2/lib/下记得备份之前的JAR包
B)、修改配置文件
在hive的conf目录下修改一下文件
[root@skycloud1 conf]# vi hive-site.xml
在之前的基础上添加以下内容
<property>
<name>hive.aux.jars.path</name>
<value>file:///opt/hive-1.2/lib/hive-hbase-handler-1.2.1.jar,file:///opt/hive-1.2/lib/guava-14.0.1.jar,file:///opt/hbase-1.2.1/lib/hbase-common-1.2.1.jar,file:///opt/hbase-1.2.1/lib/hbase-client-1.2.1.jar,file:///opt/hive-1.2/lib/zookeeper-3.4.6.jar</value>
</property>
<property>
<name>hbase.zookeeper.quorum</name>
<value>skycloud1:2181,skycloud2:2181,skycloud3:2181</value>
</property>
C)、创建Hive的表结构
Hive > create table hive_hbase_test(id int,name string,age int);
D)、插入数据
hive> insert into hive_hbae_test(id,name,age) values(1,"xiaozhang","18");
hive> insert into hive_hbase_test(id,name,age) values(2,"xiaowang","19");
E)、查看Hive中的数据
hive> select * from hive_hbase_test;
OK
1 xiaozhang 18
2 xiaowang 19
Time taken: 0.081 seconds, Fetched: 2 row(s)
F)、映射Hbase的表
Hive > create table hive_hbase_pro(row_key string,id bigint,name string,age int)
STORED BY "org.apache.hadoop.hive.hbase.HBaseStorageHandler" WITH SERDEPROPERTIES
("hbase.columns.mapping" = ":key,info:id,info:name,info:age")
TBLPROPERTIES ("hbase.table.name"="hive_hbase_pro");
Hive > create external table hive_hbase_pro(row_key string,id bigint,name string,age int)
STORED BY "org.apache.hadoop.hive.hbase.HBaseStorageHandler" WITH SERDEPROPERTIES
("hbase.columns.mapping" = ":key,info:id,info:name,info:age")
TBLPROPERTIES ("hbase.table.name"="hive_hbase_pro");
说明:org.apache.hadoop.hive.hbase.HBaseStorageHandler是Hbase的储存方式
Hbase.columns.mapping:是作为Hbase的映射rowkey与列族
hive_hbase_pro:映射给Hbase的表名字
external : 可创建外表,Hbase中已经存在的表映射到hive用词操作,区别请查看一下解释
G)、查看HBase中表结构
hbase(main):020:0> describe 'hive_hbase_pro'
Table hive_hbase_pro is ENABLED
hive_hbase_pro
COLUMN FAMILIES DESCRIPTION
{NAME => 'info', BLOOMFILTER => 'ROW', VERSIONS => '1', IN_MEMORY => 'false', KEEP_DELETED_CELLS => 'FALSE', DATA_BLOCK_ENCODING =>
'NONE', TTL => 'FOREVER', COMPRESSION => 'NONE', MIN_VERSIONS => '0', BLOCKCACHE => 'true', BLOCKSIZE => '65536', REPLICATION_SCOP
E => '0'}
1 row(s) in 0.0170 seconds
H)、插入到映射Hbase表中的数据
-- 关闭hbase的WAL,有点事提高了写入速度,缺点是如果出现错误无法查找日志
hive> set hive.hbase.wal.enabled=false;
-- 开启大量导入配置
hive> set hive.hbase.bulk=true;
-- 设置扫描的缓存
hive> set hbase.client.scanner.caching=1000000;
hive> insert overwrite table hive_hbase_pro select id as row_key,id,name,age from hive_hbase_test;
I)、查看映射表的数据
hive> select * from hive_hbase_pro;
OK
1 1 xiaozhang 18
2 2 xiaowang 19
Time taken: 0.121 seconds, Fetched: 2 row(s)
J)、查看Hbase表中的数据
hbase(main):021:0> scan 'hive_hbase_pro'
ROW COLUMN+CELL
1 column=info:age, timestamp=1510126017074, value=18
1 column=info:id, timestamp=1510126017074, value=1
1 column=info:name, timestamp=1510126017074, value=xiaozhang
2 column=info:age, timestamp=1510126016682, value=19
2 column=info:id, timestamp=1510126016682, value=2
2 column=info:name, timestamp=1510126016682, value=xiaowang
2 row(s) in 0.0420 seconds
K)、映射Hbase已经存在的表
Hive > create external table hive_hbase_xiaoxu(row_key string,id bigint,name string,age int)
STORED BY "org.apache.hadoop.hive.hbase.HBaseStorageHandler" WITH SERDEPROPERTIES
("hbase.columns.mapping" = ":key,info:id,info:name,info:age")
TBLPROPERTIES ("hbase.table.name"="hive_hbase_pro");
L)、查看Hive表中的结构
hive> desc hive_hbase_xiaoxu;
OK
row_key string from deserializer
id bigint from deserializer
name string from deserializer
age int from deserializer
Time taken: 0.357 seconds, Fetched: 4 row(s)
M)、查看数据
hive> select * from hive_hbase_xiaoxu;
OK
1 1 xiaozhang 18
2 2 xiaowang 19
N)、总结
使用hive-hbase-handler往hbase中插入数据是按照一条一条的的形式插入的,速度是比较慢的,如果数量级在百万千万级别机器比较好的情况下可以使用这种方式,执行的速度大概在每妙2-3W之间
O)、官网说明
https://cwiki.apache.org/confluence/display/Hive/HBaseIntegration
P)、增量更新与增量更新的说明
增量更新,建立的是Hive外表;而全量覆盖建立的是Hive内部表;
增量更新,必须先创建HBase表;而全量覆盖不需要事先建立HBase表;
增量更新,是在原有的HBase表的基础上新增数据,不改变原有数据;而全量覆盖则会覆盖原有数据
Q)、Hive 读取Hbase数据的说明
- 优点
可以方便的使用SQL的形式查看Hbase中的数据,也可以利用MapReduce的优势针对HBase存储的大量内容进行离线的计算和分析
- 缺点
查询速度性能的损失,hive有这样的功能, 他支持通过类似sql语句的语法来操作hbase中的数据, 但是速度慢
1-2)、Bulkload方式导数据
优势:
1、BulkLoad 不会写 WAL,也不会产生 flush 以及 split
2、如果我们大量调用 PUT 接口插入数据,可能会导致大量的 GC 操作。如果没有对Hbase的表进行预分区,会导致单太机器的热点问题,
严重时甚至可能会对 HBase 节点的稳定性造成影响,采用 BulkLoad 无此顾虑。
- 过程中没有大量的接口调用消耗性能。
A)、按照需要的数据的字段把数据导出到HDFS中
hive> insert overwrite directory "/tmp/sp_addr_bulktable" row format delimited FIELDS terminated by '\t' select sa.ID,sa.PLACE_CODE,sa.PLACE_NAME from xiaoxu.sp_address sa;
Query ID = root_20170403234442_c34e1570-f478-4c8b-bacf-83485f94b567
Total jobs = 3
Launching Job 1 out of 3
Number of reduce tasks is set to 0 since there's no reduce operator
Starting Job = job_1491287068852_0001, Tracking URL = http://hadoop1:8088/proxy/application_1491287068852_0001/
Kill Command = /opt/hadoop-2.6.4/bin/hadoop job -kill job_1491287068852_0001
Hadoop job information for Stage-1: number of mappers: 1; number of reducers: 0
2017-04-03 23:45:28,478 Stage-1 map = 0%, reduce = 0%
2017-04-03 23:46:01,357 Stage-1 map = 100%, reduce = 0%, Cumulative CPU 1.46 sec
MapReduce Total cumulative CPU time: 1 seconds 460 msec
Ended Job = job_1491287068852_0001
Stage-3 is selected by condition resolver.
Stage-2 is filtered out by condition resolver.
Stage-4 is filtered out by condition resolver.
Moving data to: hdfs://mycluster/tmp/sp_addr_bulktable/.hive-staging_hive_2017-04-03_23-44-42_657_7591325811272483144-1/-ext-10000
Moving data to: /tmp/sp_addr_bulktable
MapReduce Jobs Launched:
Stage-Stage-1: Map: 1 Cumulative CPU: 1.46 sec HDFS Read: 250195 HDFS Write: 166508 SUCCESS
Total MapReduce CPU Time Spent: 1 seconds 460 msec
OK
Time taken: 81.082 seconds
B)、利用importtsv命令生成Hfile文件
[root@skycloud1 conf]# HADOOP_CLASSPATH=`hbase classpath` hadoop jar /opt/hbase-1.2.1/lib/hbase-server-1.2.1.jar importtsv -Dimporttsv.columns=HBASE_ROW_KEY,sp_address:ID,sp_address:PLACE_CODE,sp_address:PLACE_NAME -Dimporttsv.bulk.output="/tmpbulkdata/sp_addr_data" sp_address_bulkload "/tmp/sp_addr_bulktable"
详细的执行过程可以查看:http://blog.csdn.net/xfg0218/article/details/69063014
资料请查看:http://hbase.apache.org/book.html#importtsv
C)、查看Hbase中的表
hbase(main):011:0> list
TABLE
sp_address_bulkload
2 row(s) in 0.1430 seconds
=> ["sp_address_bulkload",]
D)、把Hfile文件导入到Hbase中
方式一:
[root@skycloud1 conf]# HADOOP_CLASSPATH=`hbase classpath` hadoop jar /opt/hbase-1.2.1/lib/hbase-server-1.2.1.jar completebulkload "/tmpbulkdata/sp_addr_data" sp_address_bulkload
详细的执行过程可以查看:http://blog.csdn.net/xfg0218/article/details/69063137
方式二:
[root@skycloud1 conf]# export HADOOP_CLASSPATH=`hbase classpath`
[root@skycloud1 conf]# yarn jar /opt/hbase-1.2.1/lib/hbase-server-1.2.1.jar completebulkload completebulkload "/tmpbulkdata/sp_addr_data" sp_address_bulkload
E)、总结
在这几种导数据的速度上这种方式是最快的,原理是按照Hfile进行的,一次性处理多条数据,建议使用这种方式。本次测试由于是自己的虚拟机所以会比较慢,在真是环境中会相当快的快,我们测试的是4亿多条的数据,20分钟搞定。
官网介绍:https://cwiki.apache.org/confluence/display/Hive/HBaseBulkLoad
1-3)、Phoneix导入Hbase数据
Phoenix 官网 :https://phoenix.apache.org/pig_integration.html
在下载时注意版本的问题。
A)、安装Phoneix
[root@hadoop1 bin]# chmod a+x apache-phoenix-4.8.2-HBase-1.2-bin.tar.gz
[root@hadoop1 bin]# tar -zxvf apache-phoenix-4.8.2-HBase-1.2-bin.tar.gz
[root@hadoop1 bin]# mv apache-phoenix-4.9.0-HBase-1.1-bin phoenix-4.8.2-HBase-1.2
[root@hadoop1 bin]# cd phoenix-4.8.2-HBase-1.2/
移动phoenix的以下的JAR到Hbase集群中
[root@hadoop1 bin]# cp phoenix-core-4.8.2-HBase-1.2.jar phoenix-4.8.2-HBase-1.2-server.jar /opt/hbase-1.2.1/lib/
复制到其他的机器中:
[root@hadoop1 bin]# scp -r phoenix-core-4.8.2-HBase-1.2.jar phoenix-4.8.2-HBase-1.2-server.jar hadoop2:/opt/hbase-1.2.1/lib/
[root@hadoop1 bin]# scp -r phoenix-core-4.8.2-HBase-1.2.jar phoenix-4.8.2-HBase-1.2-server.jar hadoop3:/opt/hbase-1.2.1/lib/
复制Hbase的hbase-site.xml和Hadoop的core-site.xml和hdfs-site.xml到phoenix的bin目录下:
[root@hadoop1 bin]# cd /opt/hbase-1.2.1/conf/
[root@hadoop1 bin]# cp hbase-site.xml /opt/phoenix-4.8.2-HBase-1.2/bin/
[root@hadoop1 bin]# cd /opt/hadoop-2.6.4/etc/hadoop/
[root@hadoop1 bin]# cp core-site.xml hdfs-site.xml /opt/phoenix-4.8.2-HBase-1.2/bin/
B)、启动Phoneix
[root@hadoop1 bin]# cd /opt/phoenix-4.8.2-HBase-1.2/
[root@hadoop1 bin]# chmod 777 psql.py sqlline.py
重启Hbase集群使配置文件生效
[root@hadoop1 bin]# ./sqlline.py
Setting property: [incremental, false]
Setting property: [isolation, TRANSACTION_READ_COMMITTED]
issuing: !connect jdbc:phoenix: none none org.apache.phoenix.jdbc.PhoenixDriver
Connecting to jdbc:phoenix:
SLF4J: Class path contains multiple SLF4J bindings.
SLF4J: Found binding in [jar:file:/opt/phoenix-4.9.0-HBase-1.1/phoenix-4.9.0-HBase-1.1-client.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: Found binding in [jar:file:/opt/hadoop-2.6.4/share/hadoop/common/lib/slf4j-log4j12-1.7.5.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an explanation.
17/04/07 00:25:29 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Connected to: Phoenix (version 4.8.2)
Driver: PhoenixEmbeddedDriver (version 4.9)
Autocommit status: true
Transaction isolation: TRANSACTION_READ_COMMITTED
Building list of tables and columns for tab-completion (set fastconnect to true to skip)...
87/87 (100%) Done
Done
sqlline version 1.2.0
0: jdbc:phoenix:>
也可以制定端口运行:
[root@hadoop1 bin]# sqlline.py hadoop1:2181
Hadoop1:主机的名字
2181:当前执行的端口
C)、修改超时时间
[root@hadoop1 bin]# cd /opt/phoenix-4.9.0-HBase-1.1/bin
[root@hadoop1 bin]# vi hbase-site.xml
追加一下配置文件
<!-- 修改超时时间 -->
<property>
<name>phoenix.query.timeoutMs</name>
<value>3600000</value>
</property>
<property>
<name>hbase.rpc.timeout</name>
<value>3600000</value>
</property>
D)、Phoneix常用命令
0: jdbc:phoenix:> !tables
可以看出有比较好的列的名字与分割线
0: jdbc:phoenix:> !describe "STATS"
0: jdbc:phoenix:> DROP TABLE “STATS”
Phoneix支持常用的SQL语句,不过在查询时使用””与不适用””的区别。
INTEGER 整形
UNSIGNED_INT 无符号整形
BIGINT 长整形
UNSIGNED_LONG 无符号长整形
TINYINT 短整形
UNSIGNED_TINYINT 无符号短整型
SMALLINT 小整形
UNSIGNED_SMALLINT 无符号短整型
FLOAT 浮点型
UNSIGNED_FLOAT 无符号浮点型
DOUBLE 双精度浮点型
UNSIGNED_DOUBLE 无符号双精度浮点型
DECIMAL 长精度双精度浮点型
BOOLEAN 布尔类型
TIME 时间类型
DATE 日期类型
TIMESTAMP 时间戳类型
UNSIGNED_TIME 无符号时间类型
UNSIGNED_DATE 无符号日期类型
UNSIGNED_TIMESTAMP 无符号时间戳类型
VARCHAR 字符串类型
CHAR 字符类型
BINARY 二进制类型
VARBINARY 可变长二进制类型
ARRAY 数组类型
A)、聚合函数
AVG:求平均,如果没有返回NULL
SUM:求和函数
COUNT:求行数,如果指定某列,则返回该列非空个数,如果为*或1,则返回所有行,加上distinct则返回不相同的行数
MAX:求最大值
MIN:求最小值
PERCENTILE_CONT:指定
PERCENTILE_DISC:指定占比的列具体值是多少
PERCENT_RANK:指定值占的百分比,PERCENT_RANK( 39 ) WITHINGROUP (ORDER BY id ASC)
STDDEV_SAMP:样本标准差
STDDEV_POP:总体标准差
B)、支持的字符串函数
SUBSTR:取子串,默认是基于1的,如果想基于0,则指定0,如果指定为负数,则是从字符串结尾算起
TRIM:去除字符串头尾空格
LTRIM:去除字符串左侧空格
RTRIM:去除字符串右侧空格
LENGTH:返回字符串长度
REGEXP_SUBSTR:通过指定正则表达式获取子串
REGEXP_REPLACE:正则替换
UPPER:大写转换
LOWER:小写转换
REVERSE:字符串反转
TO_CHAR:将日期、时间、时间戳或数字格式化为一个字符串。默认日期格式为yyyy-MM-dd HH:mm:ss,数字格式为#,##0.###。
C)、支持的时间、日期函数
ROUND:四舍五入
TRUNC:截断
TO_DATE:转换为date类型
CURRENT_DATE:返回RS上当前日期
CURRENT_TIME:返回RS上当前时间
D)、支持的时间、日期函数
TO_NUMBER:转换日期、时间、时间戳为一个数字,可接受格式化串
COALESCE:指定默认值,如果相应值为null
E)、往Hbase中导入数据
1-1)、数据从hive中导出成phoenix支持的csv格式
hive> insert overwrite directory '/tmp/sp_address' row format delimited FIELDS TERMINATED BY ',' select * from sp_address;
[root@hadoop1 bin]# hadoop fs -du -h -s /tmp/sp_address
234.0 K /tmp/sp_address
1-3)、在phoenix中创建表
创建表在Hbase,必须制定主键
0: jdbc:phoenix:> create table sp_address(id integer primary key,place_type varchar,place_code varchar,place_name varchar,up_place_code varchar);
No rows affected (3.185 seconds)
[root@hadoop1 phoenix-4.9.0-HBase-1.1]# HADOOP_CLASSPATH=/opt/hbase-1.2.1/lib/hbase-protocol-1.2.1.jar:/etc/hbase/conf/ hadoop jar /opt/phoenix-4.8.2-HBase-1.2/phoenix-4.8.2-HBase-1.2-client.jar org.apache.phoenix.mapreduce.CsvBulkLoadTool --table SP_ADDRESS --input /tmp/sp_address/*
***********************************
详细的运行日志请查看:http://blog.csdn.net/xfg0218/article/details/69669632
1-4)、Hbase导出数据
[root@hadoop1 ~]# vi exportHbase.sh
# get current path
# get current path
SCRIPT_DIR=`cd $(dirname $0) && pwd`
# export data
echo "scan 'portrayal',{LIMIT=>10}" | hbase shell > $SCRIPT_DIR/ExportHbase.txt
查看数据
[root@hadoop1 ~]# tail -n 5 ExportHbase.txt
00000075d9d93dc17e163d5c6dd335f8 column=tag:es_jcsx_rksx_xm, timestamp=1491033321055, value=***
00000075d9d93dc17e163d5c6dd335f8 column=tag:es_jcsx_rksx_xz, timestamp=1491033321055, value=\xE7\x8B\xAE\xE5\xAD\x90
00000075d9d93dc17e163d5c6dd335f8 column=tag:es_jcsx_rksx_zjlx, timestamp=1491033321055, value=1
10 row(s) in 0.4260 seconds
B)、Import/Export导出到序列化的文件
hbase(main):008:0> create 'xiaoxu','cf'
0 row(s) in 4.2890 seconds
=> Hbase::Table - xiaoxu
hbase(main):009:0> put 'xiaoxu','001','cf:name','xiaozhang'
0 row(s) in 0.1870 seconds
hbase(main):010:0> put 'xiaoxu','001','cf:age','18'
0 row(s) in 0.0340 seconds
hbase(main):011:0> scan 'xiaoxu'
ROW COLUMN+CELL
001 column=cf:age, timestamp=1491364070907, value=18
001 column=cf:name, timestamp=1491364050527, value=xiaozhang
1 row(s) in 0.0970 seconds
hbase(main):012:0>
[root@hadoop1 ~]#hbase org.apache.hadoop.hbase.mapreduce.Export xiaoxu /xiaoxu/test-output-001
***************************
详细的执行过程请查看:http://blog.csdn.net/xfg0218/article/details/69231258
[[email protected] ~/xiaoxu]$ hadoop fs -cat test-output-001/part-m-00000
SEQ1org.apache.hadoop.hbase.io.ImmutableBytesWritable%org.apache.hadoop.hbase.client.ResultР5ƀ¹z,N001F
001cfage ᱫ(218
#
001cfname ࠬ桳+(2 xiaozhang
因为是序列化的数据所以会乱码
hbase(main):014:0> truncate 'xiaoxu'
Truncating 'xiaoxu' table (it may take a while):
- Disabling table...
- Truncating table...
0 row(s) in 3.6910 seconds
hbase(main):015:0> scan 'xiaoxu'
ROW COLUMN+CELL
0 row(s) in 0.3180 seconds
[root@hadoop1 ~]#hbase org.apache.hadoop.hbase.mapreduce.Import xiaoxu /xiaoxu/test-output-001
***************************
详细的导入的过程请查看:http://blog.csdn.net/xfg0218/article/details/69231415
hbase(main):016:0> scan 'xiaoxu'
ROW COLUMN+CELL
001 column=cf:age, timestamp=1491364070907, value=18
001 column=cf:name, timestamp=1491364050527, value=xiaozhang
1 row(s) in 0.0110 seconds
C)、利用pig从HBase中导出csv格式文件
官网:http://mirror.bit.edu.cn/apache/pig/
或者下载: 链接:http://pan.baidu.com/s/1bpmu0px 密码:gw05
[root@hadoop1 opt]# chmod a+x pig-0.16.0-src.tar.gz
[root@hadoop1 opt]# tar -zxvf pig-0.16.0-src.tar.gz
[root@hadoop1 opt]# vi /etc/profile
export PIG_HOME=/opt/pig-0.16.0
export PIG_CLASSPATH=$HADOOP_HOME
export PATH=$PATH:$PIG_HOME/bin:$PIG_CLASSPATH
[root@hadoop1 opt]# pig -help
Cannot locate pig-core-h2.jar. do 'ant -Dhadoopversion=23 jar', and try again
[root@hadoop1 opt]# vi ExportHbase.pig
REGISTER /opt/hbase-1.2.1/lib/htrace-core-3.1.0-incubating.jar;
REGISTER /opt/pig-0.16.0/lib/piggybank.jar;
x=LOAD 'hbase://sp_address_src' USING org.apache.pig.backend.hadoop.hbase.HBaseStorage('
sp_address:place_code,
sp_address:place_type
','-loadKey true');
STORE x INTO 'sp_address.csv' USING PigStorage(',');
-loadKey true' : 是显示主键,以,为分割并把文件导出到HDFS的sp_address.csv中
[root@hadoop1 opt]# pig -x mapreduce ExportHbase.pig
D)、使用Phoenix 导出csv文件
脚本一:table查询方式
[root@hadoop1 testSh]# vi example1.pig
REGISTER /opt/phoenix-4.8.2-HBase-1.2/phoenix-4.8.2-HBase-1.2-client.jar;
rows = load 'hbase://table/sp_address_orc' USING org.apache.phoenix.pig.PhoenixHBaseLoader('hadoop1:2181,hadoop2:2181,hadoop3:2181');
STORE rows INTO 'sp_address_orc.csv' USING PigStorage(',');
脚本二:query查询方式
[root@hadoop1 testSh]# vi example2.pig
REGISTER /opt/phoenix-4.8.2-HBase-1.2/phoenix-4.8.2-HBase-1.2-client.jar;
rows = load 'hbase://query/SELECT * FROM SP_ADDRESS' USING org.apache.phoenix.pig.PhoenixHBaseLoader('hadoop1,hadoop2,hadoop3:2181');
STORE rows INTO 'SP_ADDRESS.csv' USING PigStorage(',');
运行脚本:
[root@hadoop1 testSh]# pig -x local example1.pig
详细运行日志请查看:http://blog.csdn.net/xfg0218/article/details/69675774