Zookeeper
zookeeper介绍
ZooKeeper 是一个为分布式应用所设计的分布的、开源的协调服务。分布式的应用可以建立在同步配置管理、选举、分布式锁、分组和命名等服务的更高级别的实现的基础之上。 ZooKeeper 意欲设计一个易于编程的环境,它的文件系统使用我们所熟悉的目录树结构。
ZooKeeper的节点是通过像树样的结构来维护的,并且每个节点通过路径来标识访问。除此之外,每个节点还拥有些信息包括:数据、数据长度、创建时间、修改时间等等。从这样既含有数据,又作为路径表标的节点的特点中,可以看出, ZooKeeper的节点既可以被看做是个文件件,又可以被看做是个目录,它同时具有两者的特点。通常将zookeeper的节点称为znode。
znode分类
持久节点
(PERSISTENT)持久节点,是指在节点创建后,就一直存在,直到有删除操作来主动清除这个节点——不会因为创建该节点的客户端会话失效而消失。持久顺序节点
(PERSISTENT_SEQUENTIAL)类节点的基本特性和持久节点类型是一致的。额外的特性是,在ZK中,每个父节点会为他的第一级子节点维护一份时序,会记录每个子节点创建的先后顺序。临时节点
(EPHEMERAL)与持久节点不同的是,临时节点的生命周期和客户端会话绑定。也就是说,如果客户端会话失效,那么这个节点就会自动被清除掉。临时顺序节点
(EPHEMERAL_SEQUENTIAL)类节点的基本特性和临时节点类型是一致的。额外的特性是,在ZK中,每个父节点会为他的第一级子节点维护一份时序,会记录每个子节点创建的先后顺序。节点监测
:客户端可以监测znode节点的变化。Zonode节点的变化触发相应的事件,然后清除对该节点的监测。当监测一个znode节点时候,Zookeeper会发送通知给监测节点。
Zookeeper使用场景
安装zookeeper
- 安装JDK 配置环境变量
[root@centos ~]# rpm -ivh jdk-8u171-linux-x64.rpm
Preparing... ########################################### [100%]
1:jdk1.8 ########################################### [100%]
Unpacking JAR files...
tools.jar...
plugin.jar...
javaws.jar...
deploy.jar...
rt.jar...
jsse.jar...
charsets.jar...
localedata.jar...
[root@centos ~]# vi /root/.bashrc
JAVA_HOME=/usr/java/latest
CLASSPATH=.
PATH=$PATH:$JAVA_HOME/bin
export JAVA_HOME
export CLASSPATH
export PATH
[root@centos ~]# source /root/.bashrc
[root@centos ~]# jps
37928 Jps
- 关闭防火墙
[root@centos ~]# service iptables stop
iptables: Setting chains to policy ACCEPT: filter [ OK ]
iptables: Flushing firewall rules: [ OK ]
iptables: Unloading modules: [ OK ]
[root@centos ~]# chkconfig iptables off
[root@centos ~]# chkconfig --list | grep iptables
iptables 0:off 1:off 2:off 3:off 4:off 5:off 6:off
- 配置主机名和ip映射关系
[root@centos ~]# ifconfig
eth0 Link encap:Ethernet HWaddr 00:0C:29:6F:D0:0B
inet addr:`192.168.128.128` Bcast:192.168.128.255 Mask:255.255.255.0
inet6 addr: fe80::20c:29ff:fe6f:d00b/64 Scope:Link
UP BROADCAST RUNNING MULTICAST MTU:1500 Metric:1
RX packets:314142 errors:0 dropped:0 overruns:0 frame:0
TX packets:41722 errors:0 dropped:0 overruns:0 carrier:0
collisions:0 txqueuelen:1000
RX bytes:422136402 (402.5 MiB) TX bytes:5280457 (5.0 MiB)
lo Link encap:Local Loopback
inet addr:127.0.0.1 Mask:255.0.0.0
inet6 addr: ::1/128 Scope:Host
UP LOOPBACK RUNNING MTU:16436 Metric:1
RX packets:2049524 errors:0 dropped:0 overruns:0 frame:0
TX packets:2049524 errors:0 dropped:0 overruns:0 carrier:0
collisions:0 txqueuelen:0
RX bytes:1717925027 (1.5 GiB) TX bytes:1717925027 (1.5 GiB
[root@`centos` ~]# vi /etc/hosts
127.0.0.1 localhost localhost.localdomain localhost4 localhost4.localdomain4
::1 localhost localhost.localdomain localhost6 localhost6.localdomain6
192.168.128.128 centos
[root@centos ~]# cat /etc/sysconfig/network 主机名配置(重启centos)
NETWORKING=yes
HOSTNAME=centos
- 配置安装zookeeper&启动&关闭
[root@centos ~]# tar -zxf zookeeper-3.4.6.tar.gz -C /usr/
[root@centos ~]# cd /usr/zookeeper-3.4.6/
[root@centos zookeeper-3.4.6]# cp conf/zoo_sample.cfg conf/zoo.cfg
[root@centos zookeeper-3.4.6]# vi conf/zoo.cfg
tickTime=2000 #监测会话超时2*tickTime
dataDir=/root/zkdata #zookeeper数据目录
clientPort=2181 #服务端口
[root@centos ~]# mkdir /root/zkdata
[root@centos zookeeper-3.4.6]# ./bin/zkServer.sh start zoo.cfg #启动
JMX enabled by default
Using config: /usr/zookeeper-3.4.6/bin/../conf/zoo.cfg
Starting zookeeper ... STARTED
[root@centos zookeeper-3.4.6]# ./bin/zkServer.sh stop zoo.cfg #关闭
JMX enabled by default
Using config: /usr/zookeeper-3.4.6/bin/../conf/zoo.cfg
Stopping zookeeper ... STOPPED
[root@centos zookeeper-3.4.6]# ./bin/zkServer.sh status zoo.cfg #查看状态
JMX enabled by default
Using config: /usr/zookeeper-3.4.6/bin/../conf/zoo.cfg
Mode: standalone
[root@centos zookeeper-3.4.6]# jps
38605 QuorumPeerMain
zookeeper的节点操作
[root@centos zookeeper-3.4.6]# ./bin/zkCli.sh -server centos:2181
[zk: centos:2181(CONNECTED) 0] help
ZooKeeper -server host:port cmd args
stat path [watch]
set path data [version]
ls path [watch]
ls2 path [watch]
printwatches on|off
delete path [version]
rmr path
get path [watch]
create [-s][-e] path data acl
quit
close
connect host:port
Java 操作Zookeeper(zookeeper、zkclient、curator)
- node 基本操作
- 节点监测
zkClient连接zookeeper
<dependency>
<groupId>com.101tec</groupId>
<artifactId>zkclient</artifactId>
<version>0.8</version>
</dependency>
private ZkClient client=null;
@Before
public void before(){
client=new ZkClient("192.168.128.128:2181");
assertNotNull(client);
}
@Test
public void testCreate(){
String path = client.create("/zpark", "zhangsan", CreateMode.PERSISTENT);
assertEquals("/zpark",path);
}
@Test
public void testDelete(){
boolean flag = client.delete("/zpark");
assertTrue(flag);
}
@Test
public void testUpdate(){
client.writeData("/zpark","哈哈哈");
}
@Test
public void testRead(){
Object o = client.readData("/zpark");
assertNotNull(o);
}
@Test
public void testCreateMany(){
client.createPersistent("/rpc/com.baizhi.service.IUserService/providers/192.168.128.128:9998",true);
}
@Test
public void testGetChild(){
List<String> children = client.getChildren("/rpc/com.baizhi.service.IUserService/providers");
for (String child : children) {
System.out.println(child);
}
}
@Test
public void testExists(){
boolean exists = client.exists("/zpark");
System.out.println(exists);
}
@Test
public void testChildChangeListener() throws InterruptedException {
client.subscribeChildChanges("/rpc/com.baizhi.service.IUserService/providers", new IZkChildListener() {
@Override
public void handleChildChange(String path, List<String> children) throws Exception {
System.out.println(path);
for (String child : children) {
System.out.println(child);
}
}
});
Thread.sleep(10000);
}
@Test
public void testNodeDataChange() throws InterruptedException {
client.subscribeDataChanges("/zpark", new IZkDataListener() {
@Override
public void handleDataChange(String path, Object o) throws Exception {
System.out.println("数据改变:"+o);
}
@Override
public void handleDataDeleted(String path) throws Exception {
System.out.println("节点被删除:"+path);
}
});
Thread.sleep(10000000);
}
@After
public void after(){
client.close();
}
curator framework
<dependency>
<groupId>org.apache.curator</groupId>
<artifactId>curator-framework</artifactId>
<version>2.7.1</version>
</dependency>
<dependency>
<groupId>org.apache.curator</groupId>
<artifactId>curator-recipes</artifactId>
<version>2.7.1</version>
</dependency>
private CuratorFramework client;
@Before
public void before(){
RetryPolicy retryPolicy = new RetryNTimes(3,1000);
//new ExponentialBackoffRetry(1000, 3);
client = CuratorFrameworkFactory.newClient("192.168.128.128:2181", retryPolicy);
client.start();
}
@Test
public void testCreate() throws Exception {
String path = client.create().withMode(CreateMode.PERSISTENT).forPath("/zpark");
System.out.println(path);
}
@Test
public void testDelete() throws Exception {
client.delete().forPath("/zpark");
}
@Test
public void testUpdate() throws Exception {
client.setData().forPath("/zpark","lisi".getBytes());
}
@Test
public void testGetData() throws Exception {
byte[] bytes = client.getData().forPath("/zpark");
System.out.println(new String(bytes));
}
@Test
public void testCreateMany() throws Exception {
String path = client.create().creatingParentsIfNeeded().forPath("/rpc/com.baizhi.service.IUserService/providers/node2");
System.out.println(path);
}
@Test
public void testGetMany() throws Exception {
List<String> children = client.getChildren().forPath("/rpc/com.baizhi.service.IUserService/providers");
for (String child : children) {
System.out.println(child);
}
}
@Test
public void testNodeDataChange() throws Exception {
final NodeCache nodeCache=new NodeCache(client,"/zpark");
nodeCache.start();
nodeCache.getListenable().addListener(new NodeCacheListener() {
@Override
public void nodeChanged() throws Exception {
byte[] data = nodeCache.getCurrentData().getData();
System.out.println(new String(data));
}
});
Thread.sleep(1000000);
nodeCache.close();
}
@Test
public void testNodeChildChange() throws Exception {
PathChildrenCache cache = new PathChildrenCache(client,"/zpark",true);
cache.start();
cache.getListenable().addListener(new PathChildrenCacheListener() {
@Override
public void childEvent(CuratorFramework curatorFramework, PathChildrenCacheEvent event) throws Exception {
PathChildrenCacheEvent.Type type = event.getType();
switch (type){
case CHILD_ADDED:{
ChildData data = event.getData();
System.out.println("add:" + event.getData());
break;
}case CHILD_UPDATED:
System.out.println("update:" + event.getData());
break;
case CHILD_REMOVED:
System.out.println("remove:" + event.getData());
break;
default:
break;
}
}
});
Thread.sleep(1000000);
cache.close();
}
@Test
public void testDistributed() throws Exception {
InterProcessMutex lock = new InterProcessMutex(client, "/rpc/com.baizhi.service.IUserService#update");
if (lock.acquire(1, TimeUnit.DAYS)) {
try {
// do some work inside of the critical section here
System.out.println("------------");
}
finally {
lock.release();
}
}
}
@After
public void after(){
client.close();
}
分布式锁
参考:https://blog.csdn.net/kiss_the_sun/article/details/50221463
https://www.jianshu.com/p/6618471f6e75