Using HBase
I. ZooKeeper
1. Installing ZooKeeper
(1) Upload and extract the archive;
(2) In zookeeper/conf/zoo.cfg (the file is zoo.cfg, usually copied from zoo_sample.cfg), check or change the data directory setting dataDir=/tmp/zookeeper;
(3) In zoo.cfg set:
clientPort=2181
dataDir=/usr/zookeeper-3.5.3-beta/datadir
syncLimit=5
tickTime=2000
initLimit=10
(4) At the end of the file add one line per server, e.g. server.1=192.168.80.12:2888:3888
The format is:
server.<the myid value you set in step (6)>=<server IP>:<peer communication port>:<leader election port>
(5) Create the data directory, e.g. mkdir -p /tmp/zookeeper, or whatever path dataDir points to;
(6) In that dataDir directory create a file named myid; its content must be a different number on each node.
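Putting steps (2) through (6) together, a complete zoo.cfg for a three-node ensemble might look like the sketch below. The IPs 192.168.80.13/14 and the paths are illustrative assumptions; adjust them to your hosts.

```
# zoo.cfg - example for a 3-node ensemble (hosts are placeholders)
tickTime=2000
initLimit=10
syncLimit=5
dataDir=/usr/zookeeper-3.5.3-beta/datadir
clientPort=2181
server.1=192.168.80.12:2888:3888
server.2=192.168.80.13:2888:3888
server.3=192.168.80.14:2888:3888
```

Then on the first node run `echo 1 > /usr/zookeeper-3.5.3-beta/datadir/myid`, on the second `echo 2 > ...`, and so on, matching the server.N lines.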
2. Starting ZooKeeper
(1) In zookeeper/bin, run ./zkServer.sh start to start the service (the service must be started separately on every machine);
(2) Run ./zkServer.sh status to check the server's state;
(3) Run ./zkCli.sh to start the client;
(4) Run help for a list of commands;
(5) Common commands: create, set, get, delete, ls, quit
ls -w and get -w register a one-shot watch: the client is notified when the node changes
create -s creates a sequential node (a monotonically increasing suffix is appended to the name)
create -e creates an ephemeral node (deleted automatically when the client session ends)
without -e, the created node is persistent
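An annotated zkCli.sh session exercising the commands above; the paths and values are made up for illustration:

```
create /app "hello"       # persistent node
create -s /app/task ""    # sequential: name gets a suffix like task0000000000
create -e /app/lock ""    # ephemeral: removed when this session ends
get -w /app               # read /app and watch it for the next change
set /app "world"          # fires the watch registered above
ls /app
delete /app/lock
quit
```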
3. Connecting to ZooKeeper from Java
ZooKeeper zooKeeper = new ZooKeeper("192.168.80.12:2181", 2000, null);
(the arguments are the connect string, the session timeout in milliseconds, and a default Watcher)
Nodes are then manipulated through the ZooKeeper API: create, setData, getData, getChildren.
Note: create takes an ACL list; Ids.OPEN_ACL_UNSAFE (open to everyone) is the usual choice, and ZooDefs.Ids also provides constants such as READ_ACL_UNSAFE and CREATOR_ALL_ACL.
A Watcher can also be supplied to the read methods:
new Watcher() {
    public void process(WatchedEvent watchedEvent) {
        // EventType.NodeChildrenChanged is the kind of change being watched
        // (other event types cover node creation, deletion, data changes, etc.)
        // KeeperState.SyncConnected is the current client connection state
        if (watchedEvent.getType() == Event.EventType.NodeChildrenChanged
                && watchedEvent.getState() == Event.KeeperState.SyncConnected) {
            try {
                getNode();
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    }
}
II. HBase
1. Installation
(1) Upload and extract
(2) Edit hbase/conf/hbase-env.sh:
set JAVA_HOME and HBASE_MANAGES_ZK (true lets HBase manage its own ZooKeeper; false means you run ZooKeeper yourself)
(3) Edit hbase/conf/hbase-site.xml:
hbase.tmp.dir
    Local-filesystem tmp directory. Mostly relevant in local mode, but worth setting explicitly in any case, because many other paths default to locations under it.
    Default: a directory under the system /tmp.
hbase.rootdir
    Directory shared by all RegionServers in the cluster, used to persist HBase's data. Normally an HDFS path, e.g. hdfs://namenode.example.org:9000/hbase. Set per deployment.
hbase.cluster.distributed
    Cluster mode: distributed or standalone. If false, the HBase and ZooKeeper processes run in the same JVM. Set to true in production.
    Default: false.
hbase.zookeeper.quorum
    The ZooKeeper quorum hosts, comma-separated.
    Default: localhost
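Putting the properties above together, a minimal hbase-site.xml for a distributed setup might look like this sketch; the HDFS URL and host list are illustrative assumptions:

```xml
<configuration>
  <!-- where HBase persists its data on HDFS (example namenode address) -->
  <property>
    <name>hbase.rootdir</name>
    <value>hdfs://master:9000/hbase</value>
  </property>
  <!-- run HBase and ZooKeeper as separate processes across the cluster -->
  <property>
    <name>hbase.cluster.distributed</name>
    <value>true</value>
  </property>
  <!-- comma-separated ZooKeeper quorum hosts (examples) -->
  <property>
    <name>hbase.zookeeper.quorum</name>
    <value>192.168.80.12,192.168.80.13</value>
  </property>
</configuration>
```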
(4) Edit hbase/conf/regionservers:
list the hostnames or IP addresses of the cluster's nodes, one per line
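For a three-node cluster, for example, the regionservers file is just one host per line (these addresses are placeholders):

```
192.168.80.12
192.168.80.13
192.168.80.14
```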
(5) In hbase/bin, run ./start-hbase.sh to start the cluster services (the web UI at http://<master-ip>:16010 shows the tables)
(6) Run ./hbase shell to enter the shell command interface
2. Using HBase
(1) Create a namespace: create_namespace 'ns' (list with list_namespace, delete with drop_namespace)
(2) Create a table: create 'ns:table_name', 'cf1', 'cf2' (truncate empties a table; to delete one, disable it and then drop it)
(3) List tables: list
(4) Insert data: put 'table_name', 'row_key', 'cf:qualifier', 'value'
(5) Read data: get 'table_name', 'row_key', 'cf:qualifier'
(6) help shows help for any command
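A short hbase shell session tying the commands above together; the namespace, table, and column names are made up:

```
create_namespace 'demo'
create 'demo:user', 'baseInfo'
put 'demo:user', 'zk001', 'baseInfo:age', '24'
get 'demo:user', 'zk001', 'baseInfo:age'
scan 'demo:user'
disable 'demo:user'
drop 'demo:user'
drop_namespace 'demo'
```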
3. Programming HBase from Java
(1) CRUD example:
public class MyHbase {
    Connection connection;
    /**
     * Obtain the connection
     * @throws IOException
     */
    @Before
    public void createConnection() throws IOException {
        Configuration configuration = HBaseConfiguration.create();
        // Required setting: tells the client which ZooKeeper quorum to contact.
        // The machine running this program must also be able to resolve the
        // cluster's hostnames (e.g. via its hosts file), or the connection fails!
        configuration.set("hbase.zookeeper.quorum", "192.168.80.12,192.168.80.13");
        connection = ConnectionFactory.createConnection(configuration);
    }
    /**
     * Create a table and define its schema
     * @throws IOException
     */
    public void createTables() throws IOException {
        Admin admin = connection.getAdmin();
        ColumnFamilyDescriptor build = ColumnFamilyDescriptorBuilder
                .newBuilder("name".getBytes())
                .setBlocksize(131072)
                .setMinVersions(1)
                .setMaxVersions(3)
                .build();
        TableDescriptor my_table = TableDescriptorBuilder
                .newBuilder(TableName.valueOf("my_table"))
                .addColumnFamily(build)
                .build();
        admin.createTable(my_table);
        admin.close();
        connection.close();
    }
    /**
     * Create a namespace
     * @throws IOException
     */
    public void createNameSpace() throws IOException {
        Admin admin = connection.getAdmin();
        NamespaceDescriptor namespaceDescriptor = NamespaceDescriptor.create("123123").build();
        admin.createNamespace(namespaceDescriptor);
        admin.close();
        connection.close();
    }
    /**
     * Write data
     * @throws IOException
     */
    public void putData() throws IOException {
        Table my_table = connection.getTable(TableName.valueOf("my_table"));
        Put put = new Put("zk001".getBytes()); // row key
        // the column family ("baseInfo" here) must already exist in the table
        put.addColumn("baseInfo".getBytes(), "age".getBytes(), Bytes.toBytes(24));
        my_table.put(put);
        my_table.close();
        connection.close();
    }
    /**
     * Read data
     * @throws IOException
     */
    public void getData() throws IOException {
        Table my_table = connection.getTable(TableName.valueOf("my_table"));
        /* Alternatively, cell by cell, which is more verbose:
        Get get = new Get("zk1".getBytes());
        Result result = my_table.get(get);
        CellScanner cellScanner = result.cellScanner();
        while (cellScanner.advance()) {
            Cell current = cellScanner.current();
            byte[] rowArray = current.getRowArray();
            byte[] familyArray = current.getFamilyArray();
            byte[] qualifierArray = current.getQualifierArray();
            System.out.println(Bytes.toString(rowArray, current.getRowOffset(), current.getRowLength()));
            System.out.println(Bytes.toString(familyArray, current.getFamilyOffset(), current.getFamilyLength()));
            System.out.println(Bytes.toString(qualifierArray, current.getQualifierOffset(), current.getQualifierLength()));
        } */
        Scan scan = new Scan();
        scan.withStartRow("zk001".getBytes()).withStopRow("zk005".getBytes(), true);
        ResultScanner scanner = my_table.getScanner(scan);
        // ResultScanner scanner = my_table.getScanner("name".getBytes()); // scans all rows of column family "name"
        Iterator<Result> iterator = scanner.iterator();
        while (iterator.hasNext()) {
            Result next = iterator.next();
            byte[] row = next.getRow();
            byte[] value = next.value();
            System.out.print(Bytes.toString(row));
            System.out.println(Bytes.toString(value));
        }
    }
}
(2) Bulk import into HBase
1) main method
public static void main(String[] args) throws InterruptedException, IOException, ClassNotFoundException {
    Configuration configuration1 = HBaseConfiguration.create();
    configuration1.set("hbase.zookeeper.quorum", "192.168.80.12");
    Job job = Job.getInstance(configuration1);
    job.setJarByClass(MyHadoop.class);
    job.setMapperClass(MySecondMapper.class);
    job.setMapOutputKeyClass(ImmutableBytesWritable.class);
    job.setMapOutputValueClass(Put.class);
    FileInputFormat.setInputPaths(job, new Path("hdfs://master:9000/INFO/Input"));
    FileOutputFormat.setOutputPath(job, new Path("hdfs://master:9000/INFO/Output"));
    Connection connection = ConnectionFactory.createConnection(configuration1);
    Table my_table = connection.getTable(TableName.valueOf("my_table"));
    HFileOutputFormat2.configureIncrementalLoad(job, my_table,
            connection.getRegionLocator(TableName.valueOf("my_table")));
    boolean b1 = job.waitForCompletion(true);
    // call bulkload to import the HFiles written to the job's output path
    LoadIncrementalHFiles load = new LoadIncrementalHFiles(configuration1);
    load.doBulkLoad(new Path("hdfs://master:9000/INFO/Output"), connection.getAdmin(),
            my_table, connection.getRegionLocator(TableName.valueOf("my_table")));
    // exit
    System.exit(b1 ? 0 : 1);
}
2) map method
protected void map(Text rowkey, IntWritable value, Context context) throws IOException, InterruptedException {
    // use the incoming key as the HBase row key
    ImmutableBytesWritable immutableBytesWritable = new ImmutableBytesWritable(rowkey.copyBytes());
    byte[] families = Bytes.toBytes("family"); // a column family already created in the table
    byte[] keys = Bytes.toBytes("count");      // column qualifier
    byte[] values = Bytes.toBytes(value.get()); // cell value
    Put put = new Put(rowkey.copyBytes());
    put.addColumn(families, keys, values);
    // put.addImmutable(families, keys, values); has the same effect
    context.write(immutableBytesWritable, put);
}