目录
zookeeper的功能和应用场景图示:
zookeeper集群结构示意图:
zookeeper案例示意图:
Hadoop之Zookeeper简介及安装
Zookeeper简介
Zookeeper: 是一个分布式的、开源的程序协调服务,是 hadoop 项目下的一个子项目,一个基础组件。
主要提供2个服务:
1)为客户端管理少量的数据kv(采用树状目录结构->为了保证高吞吐低延迟[每个节点存放数据最大容量1M])
2)为客户端监听指定数据节点的状态,并在数据节点发生变化时,通知客户端.
zookeeper的数据存储机制
数据存储形式
zookeeper中对用户的数据采用kv形式存储
只是zk有点特别:
key:是以路径的形式表示的,那就以为着,各key之间有父子关系,比如
/ 是顶层key
用户建的key只能在/ 下作为子节点,比如建一个key:/aa 这个key可以带value数据
也可以建一个key:/bb
也可以建key:/aa/xx
zookeeper中,对每一个数据key,称作一个znode
最终形成的时一个树状的目录结构。
znode类型
zookeeper中的znode有多种类型:
1、PERSISTENT 持久的:创建者就算跟集群断开联系,该类节点也会持久存在与zk集群中
2、EPHEMERAL 短暂的:创建者一旦跟集群断开联系,zk就会将这个节点删除
3、SEQUENTIAL 带序号的:这类节点,zk会自动拼接上一个序号,而且序号是递增的
组合类型:
PERSISTENT :持久不带序号
EPHEMERAL :短暂不带序号
PERSISTENT 且 SEQUENTIAL :持久且带序号
EPHEMERAL 且 SEQUENTIAL :短暂且带序号
zookeeper的集群部署
安装包下载:https://zookeeper.apache.org/releases.html
1、上传安装包到集群服务器
2、解压
3、修改配置文件
进入zookeeper的安装目录的conf目录
cp zoo_sample.cfg zoo.cfg
vi zoo.cfg
# The number of milliseconds of each tick
tickTime=2000
initLimit=10
syncLimit=5
dataDir=/root/zkdata
clientPort=2181
#autopurge.purgeInterval=1
server.1=hdp20-01:2888:3888
server.2=hdp20-02:2888:3888
server.3=hdp20-03:2888:3888
对3台节点,都创建目录 mkdir /root/zkdata
对3台节点,在工作目录中生成myid文件,但内容要分别为各自的id:1,2,3
hdp20-01上: echo 1 > /root/zkdata/myid
hdp20-02上: echo 2 > /root/zkdata/myid
hdp20-03上: echo 3 > /root/zkdata/myid
4、从hdp20-01上scp安装目录到其他两个节点
scp -r zookeeper-3.4.6/ hdp20-02$PWD
scp -r zookeeper-3.4.6/ hdp20-03:$PWD
5、启动zookeeper集群
zookeeper没有提供自动批量启动脚本,需要手动一台一台地起zookeeper进程
在每一台节点上,运行命令:
bin/zkServer.sh start
启动后,用jps应该能看到一个进程:QuorumPeerMain
但是,光有进程不代表zk已经正常服务,需要用命令检查状态:
bin/zkServer.sh status
能看到角色模式:为leader或follower,即正常了。
6、自建zookeeper集群批量启动脚本:
vi zkmanage.sh
#!/bin/bash
for host in hdp-01 hdp-02 hdp-03
do
echo "${host}:${1}ing..."
ssh $host "source /etc/profile;/root/apps/apache-zookeeper-3.5.7-bin/bin/zkServer.sh $1"
done
sleep 2
for host in hdp-01 hdp-02 hdp-03
do
echo "${host}:status----"
ssh $host "source /etc/profile;/root/apps/apache-zookeeper-3.5.7-bin/bin/zkServer.sh status"
done
sh zkmanage.sh
or chmod +x zkmanage.sh ./zkmanage.sh
zookeeper的命令行客户端操作
数据管理功能:
创建节点:create /aaa 'ppppp'
查看节点下的子节点: ls /aaa
获取节点的value:get /aaa
修改节点的value:set /aaa 'mmmmm'
删除节点:rmr /aaa
数据监听功能
ls /aaa watch
## 查看/aaa的子节点的同时,注册了一个监听“节点的子节点变化事件”的监听器
get /aaa watch
## 获取/aaa的value的同时,注册了一个监听“节点value变化事件”的监听器
注意:注册的监听器在正常收到一次所监听的事件后,就失效
zookeeper客户端api示例:
// zookeeper客户端API
public class ZookeeperClientDemo {
ZooKeeper zk = null;
@Before
public void init() throws Exception{
// 构造一个连接zookeeper的客户端对象
zk = new ZooKeeper("hdp-01:2181,hdp-02:2181,hdp-03:2181", 2000, null);
}
@Test
public void testCreate() throws Exception{
// 参数1:要创建的节点路径 参数2:数据 参数3:访问权限 参数4:节点类型
String create = zk.create("/eclipse", "hello eclipse".getBytes(), Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
System.out.println(create);
zk.close();
}
@Test
public void testUpdate() throws Exception {
// 参数1:节点路径 参数2:数据 参数3:所要修改的版本,-1代表任何版本
zk.setData("/eclipse", "我爱你".getBytes("UTF-8"), -1);
zk.close();
}
@Test
public void testGet() throws Exception {
// 参数1:节点路径 参数2:是否要监听 参数3:所要获取的数据的版本,null表示最新版本
byte[] data = zk.getData("/eclipse", false, null);
System.out.println(new String(data,"UTF-8"));
zk.close();
}
@Test
public void testListChildren() throws Exception {
// 参数1:节点路径 参数2:是否要监听
// 注意:返回的结果中只有子节点名字,不带全路径
List<String> children = zk.getChildren("/cc", false);
for (String child : children) {
System.out.println(child);
}
zk.close();
}
@Test
public void testRm() throws InterruptedException, KeeperException{
zk.delete("/eclipse", -1);
zk.close();
}
}
// 监听节点事件
public class ZookeeperWatchDemo {
ZooKeeper zk = null;
@Before
public void init() throws Exception {
// 构造一个连接zookeeper的客户端对象
zk = new ZooKeeper("hdp-01:2181,hdp-02:2181,hdp-03:2181", 2000, new Watcher() {
@Override
public void process(WatchedEvent event) {
if (event.getState() == KeeperState.SyncConnected && event.getType() == EventType.NodeDataChanged) {
System.out.println(event.getPath()); // 收到的事件所发生的节点路径
System.out.println(event.getType()); // 收到的事件的类型
System.out.println("赶紧换照片,换浴室里面的洗浴套装....."); // 收到事件后,我们的处理逻辑
try {
zk.getData("/mygirls", true, null);
} catch (KeeperException | InterruptedException e) {
e.printStackTrace();
}
}else if(event.getState() == KeeperState.SyncConnected && event.getType() == EventType.NodeChildrenChanged){
System.out.println("子节点变化了......");
}
}
});
}
@Test
public void testGetWatch() throws Exception {
byte[] data = zk.getData("/mygirls", true, null); // 监听节点数据变化
List<String> children = zk.getChildren("/mygirls", true); //监听节点的子节点变化事件
System.out.println(new String(data, "UTF-8"));
Thread.sleep(Long.MAX_VALUE);
}
}