Zookeeper 实战
主要内容
Zookeeper 简介
Zookeeper 存储结构
监听通知
安装 Zookeeper
Zookeeper 常用命令
使用 Java API 操作 Zookeeper
Zookeeper 实战
一、 Zookeeper 简介
1 什么是 Zookeeper
Zookeeper 官网:http://zookeeper.apache.org/
Zookeeper 是 Apache 的一个分布式服务框架,是 Apache Hadoop 的一个子项目。官方 文档上这么解释 Zookeeper,它主要是用来解决分布式应用中经常遇到的一些数据管理问题, 如:统一命名服务、状态同步服务、集群管理、分布式应用配置项的管理等。
简单来说 zookeeper=文件系统+监听通知机制。
二、 Zookeeper 存储结构
1 Znode
在 Zookeeper 中,znode 是一个跟 Unix 文件系统路径相似的节点,可以向节点存储 数据或者获取数据。
Zookeeper 底层是一套数据结构。这个存储结构是一个树形结构,其上的每一个节点, 我们称之为“znode”
Zookeeper 中的数据是按照“树”结构进行存储的。而且 znode 节点还分为 4 中不同 的类型。
每一个 znode 默认能够存储 1MB 的数据(对于记录状态性质的数据来说,够了)
可以使用 zkCli 命令,登录到 Zookeeper 上,并通过 ls、create、delete、get、set 等命令操作这些 znode 节点。
2 Znode 节点类型
2.1 PERSISTENT-持久化目录节点
客户端与 zookeeper 断开连接后,该节点依旧存在。
2.2 PERSISTENT_SEQUENTIAL-持久化顺序编号目录节点
客户端与 zookeeper 断开连接后,该节点依旧存在,只是 Zookeeper 给该节点名称进行 顺序编号。
2.3 EPHEMERAL-临时目录节点
客户端与 zookeeper 断开连接后,该节点被删除。
2.4 EPHEMERAL_SEQUENTIAL-临时顺序编号目录节点
客户端与 zookeeper 断开连接后,该节点被删除,只是 Zookeeper 给该节点名称进行顺 序编号。
三、 监听通知机制
Zookeeper 是使用观察者设计模式
来设计的。当客户端注册监听它关心的目录节点时, 当目录节点发生变化(数据改变、被删除、子目录节点增加删除)时,Zookeeper 会通知客户端。
四、 安装 zookeeper
官方资源包可在 zookeeper.apache.com 站点中下载。这里安装版本为:3.6.0。
1 安装单机版
1.1 安装 Linux
1.2 安装 JDK
配置环境变量
export JAVA_HOME=/usr/local/jdk
export CLASSPATH=.:$JAVA_HOME/lib/dt.jar:$JAVA_HOME/lib/tools.jar
export PATH=$JAVA_HOME/bin:$PATH
1.3 上传 Zookeeper
1.4 解压 Zookeeper 压缩包
[root@localhost temp]# tar -zxf zookeeper-3.6.0.tar.gz
[root@localhost temp]# cp zookeeper-3.6.0 /usr/local/zookeeper -r
1.5 Zookeeper 目录结构
1. bin:放置运行脚本和工具脚本,
2. conf:zookeeper 默认读取配置的目录,里面会有默认的配置文件
3. docs:zookeeper 相关的文档
4. lib:zookeeper 核心的 jar
5. logs:zookeeper 日志
1.6配置 Zookeeper
Zookeeper 在启动时默认的去 conf 目录下查找一个名称为 zoo.cfg 的配置文件。
在 zookeeper 应用目录中有子目录 conf。其中有配置文件模板:zoo_sample.cfg
cp zoo_sample.cfg zoo.cfg。zookeeper 应用中的配置文件为 conf/zoo.cfg。
修改配置文件 zoo.cfg - 设置数据缓存路径
1.7启动 Zookeeper
默认加载配置文件:./zkServer.sh start:默认的会去 conf 目录下加载 zoo.cfg 配置文件。
指定加载配置文件:./zkServer.sh start 配置文件的路径
。
1.8停止 Zookeeper
./zkServer.sh stop
1.9查看 Zookeeper 状态
./zkServer.sh status
1.10使用客户端连接单机版 Zookeeper
1.10.1 连接方式一
bin/zkCli.sh
默认连接地址为本机地址,默认连接端口为 2181
Ctrl+C 退出连接
1.10.2 连接方式二
bin/zkCli.sh -server ip:port 连接指定 IP 地址与端口
2 安装集群版
2.1 Zookeeper 集群说明
2.1.1 Zookeeper 集群中的角色
Zookeeper 集群中的角色主要有以下三类
2.2集群安装
使用 3 个 Zookeeper 应用搭建一个伪集群。应用部署位置是:192.168.233.130。客户端 监听端口分别为:2181、2182、2183。投票选举端口分别为 2881/3881、2882/3882、2883/3883。
tar -zxf zookeeper-3.6.0.tar.gz
将解压后的 Zookeeper 应用目录重命名,便于管理
mv zookeeper-3.6.0 zookeeper01
2.2.1 提供数据缓存目录
在 zookeeper01 应用目录中,创建 data 目录,用于缓存应用运行数据
cd zookeeper01
mkdir data
2.2.2 复制应用
复制两份 Zookeeper 应用。用于模拟集群中的 3 个节点。
cp -r zookeeper01 zookeeper02
cp -r zookeeper01 zookeeper03
2.2.3 提供配置文件
在 zookeeper 应用目录中有子目录 conf。其中有配置文件模板:zoo_sample.cfg
cp zoo_sample.cfg zoo.cfg
zookeeper 应用中的配置文件为 conf/zoo.cfg
。
2.2.4 修改配置文件 zoo.cfg - 设置数据缓存路径
dataDir 参数值为应用运行缓存数据保存目录。
2.2.5 提供应用唯一标识
在 Zookeeper 集群中,每个节点需要一个唯一标识。这个唯一标识要求是自然数。且唯 一标识保存位置是:数据缓存目录(dataDir=/usr/local/zookeeper/data
)的myid
文件中。其中“数据缓存目录”为配置文件 zoo.cfg 中的配置参数
在 data 目录中创建文件myid : touch myid
为应用提供唯一标识。本环境中使用 1、2、3 作为每个节点的唯一标识。
vi myid
简化方式为: echo [唯一标识] >> myid
。 echo 命令为回声命令,系统会将命令发送的 数据返回。'>>'
为定位,代表系统回声数据指定发送到什么位置。 此命令代表系统回声数 据发送到 myid 文件中。 如果没有文件则创建文件。
2.2.6 修改配置文件 zoo.cfg - 设置监听客户端、投票、选举端口
vim zoo.cfg
clientPort=2181 #服务端口根据应用做对应修改,zk01-2181,zk02-2182,zk03-2183
server.1=192.168.233.130:2881:3881
server.2=192.168.233.130:2882:3882
server.3=192.168.233.130:2883:3883
2.3编写启动、关闭集群脚本
在 Linux 中可以使用 chmod 命令为文件授权。
chmod 777 文件名
777 表示为文件分配可读,可写,可执行权限。
2.3.1 启动 Zookeeper 集群脚本
zookeeper01/bin/zkServer.sh start
zookeeper02/bin/zkServer.sh start
zookeeper03/bin/zkServer.sh start
2.3.2 关闭 Zookeeper 集群脚本
zookeeper01/bin/zkServer.sh stop
zookeeper02/bin/zkServer.sh stop
zookeeper03/bin/zkServer.sh stop
2.4连接集群
可以使用任何节点中的客户端工具连接集群中的任何节点。
./zkCli.sh -server 192.168.233.130:2183
五、 Zookeeper 常用命令
1 ls 命令
ls /path
使用 ls 命令查看 zookeeper 中的内容。在 ZooKeeper 控制台客户端中,没有默认列表功 能,必须指定要列表资源的位置。 如:ls /
或者ls /path
2 create 命令
create [-e] [-s] /path [data]
使用 create 命令创建一个新的 Znode。create [-e] [-s] path data -
创建节点,如: create /test 123
创建一个/test
节点,节点携带数据信息 123。create -e /test 123
创建一个临时节 点/test
,携带数据为 123,临时节点只在当前会话生命周期中有效,会话结束节点自动删除。 create -s /test 123
创建一个顺序节点/test,携带数据 123,创建的顺序节点由 ZooKeeper 自 动为节点增加后缀信息,如-/test00000001
等。-e
和-s
参数可以联合使用。
3 get 命令
get [-s] /path
get 命令获取 Znode 中的数据。
get -s /path
-s
查看 Znode 详细信息
oldlu:存放的数据
cZxid:创建时 zxid(znode 每次改变时递增的事务 id)
ctime:创建时间戳
mZxid:最近一次更近的 zxid
mtime:最近一次更新的时间戳
pZxid:子节点的 zxid
cversion:子节点更新次数
dataversion:节点数据更新次数
aclVersion:节点 ACL(授权信息)的更新次数
ephemeralOwner:如果该节点为 ephemeral 节点(临时,生命周期与 session 一样),
ephemeralOwner 值表示与该节点绑定的 session id. 如果该节点不是
ephemeral 节点, ephemeralOwner 值为 0.
dataLength:节点数据字节数
numChildren:子节点数量
4 set 命令
set /path [data]
添加或修改 Znode 中的值
5 delete 命令
delete /path
删除 Znode。
六、 使用 Java API 操作 Zookeeper
1 创建 Znode
1.1创建项目
1.2修改 POM 文件添加依赖
该依赖为基于 Java 语言连接 Zookeeper 的客户端工具
<dependencies>
<dependency>
<groupId>org.apache.zookeeper</groupId>
<artifactId>zookeeper</artifactId>
<version>3.6.0</version>
</dependency>
</dependencies>
1.3创建 Znode 并添加数据
/**
* 操作Zookeeper的Znode
*/
public class ZnodeDemo implements Watcher {
public static void main(String[] args) throws KeeperException, InterruptedException, IOException {
//创建连接Zookeeper对象
ZooKeeper zooKeeper = new ZooKeeper("192.168.233.130:2181,192.168.233.130:2182,192.168.233.130:2183",150000,new ZnodeDemo());
//创建一个Znode
String path = zooKeeper.create("/bjsxt/test","oldlu".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT_SEQUENTIAL);
System.out.println(path);
}
/**
* 事件通知回调方法
* @param event
*/
@Override
public void process(WatchedEvent event) {
//获取连接事件
if(event.getState() == Event.KeeperState.SyncConnected){
System.out.println("连接成功");
}
}
}
2 获取 Znode 中的数据
2.1获取指定节点中的数据
byte[] data= zooKeeper.getData("/bjsxt/test0000000001",new ZnodeDemo(),new Stat());
System.out.println(new String(data));
2.2获取所有子节点中的数据
List<String> list = zooKeeper.getChildren("/bjsxt",new ZnodeDemo());
for(String path :list){
byte[] data= zooKeeper.getData("/bjsxt/"+path,new ZnodeDemo(),null);
System.out.println(new String(data));
}
3 设置 Znode 中的值
//设置Znode中的值
// -1:表示匹配任何版本
Stat stat = zooKeeper.setData("/bjsxt/test0000000001","bjsxt".getBytes(),-1);
System.out.println(stat);
4 删除 Znode
/**
* 操作Zookeeper的Znode
*/
public class ZnodeDemo implements Watcher {
public static void main(String[] args) throws KeeperException, InterruptedException, IOException {
//创建连接Zookeeper对象
ZooKeeper zooKeeper = new ZooKeeper("192.168.233.130:2181,192.168.233.130:2182,192.168.233.130:2183",150000,new ZnodeDemo());
//创建一个Znode
/* String path = zooKeeper.create("/bjsxt/test","oldlu".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT_SEQUENTIAL);
System.out.println(path);*/
//获取指定节点的数据
/*byte[] data= zooKeeper.getData("/bjsxt/test0000000001",new ZnodeDemo(),new Stat());
System.out.println(new String(data));*/
//获取指定节点中的所有子节点中的数据
/* List<String> list = zooKeeper.getChildren("/bjsxt",new ZnodeDemo());
for(String path :list){
byte[] data= zooKeeper.getData("/bjsxt/"+path,new ZnodeDemo(),null);
System.out.println(new String(data));
}*/
//设置Znode中的值
// -1:表示匹配任何版本
/* Stat stat = zooKeeper.setData("/bjsxt/test0000000001","bjsxt".getBytes(),-1);
System.out.println(stat);*/
zooKeeper.delete("/bjsxt/test0000000001",-1);
}
/**
* 事件通知回调方法
* @param event
*/
@Override
public void process(WatchedEvent event) {
//获取连接事件
if(event.getState() == Event.KeeperState.SyncConnected){
System.out.println("连接成功");
}
}
}
七、 Zookeeper 实战
实战案例介绍:使用 Zookeeper 与 RMI 技术实现一个 RPC 框架。
RPC:RPC(Remote Procedure Call)远程过程调用。
1 基于 RMI 实现远程方法调用
1.1RMI 简 介
RMI(Remote Method Invocation) 远程方法调用。
RMI 是从 JDK1.2 推出的功能,它可以实现在一个 Java 应用中可以像调用本地方法一样 调用另一个服务器中 Java 应用(JVM)中的内容。
RMI 是 Java 语言的远程调用,无法实现跨语言。
1.2执行流程
Registry(注册表)是放置所有服务器对象的命名空间。 每次服务端创建一个对象时,它 都会使用 bind()
或 rebind()
方法注册该对象。 这些是使用称为绑定名称的唯一名称注册的。
要调用远程对象,客户端需要该对象的引用。即通过服务端绑定的名称从注册表中获取 对象(lookup()
方法)。
1.3RMI 的 API 介绍
1.3.1 Remote 接口
java.rmi.Remote
定义了此接口为远程调用接口。如果接口被外部调用,需要继承此接 口。
1.3.2 RemoteException 类
java.rmi.RemoteException
继承了 Remote 接口的接口,如果方法是允许被远程调用的,需要抛出此异常。
1.3.3 UnicastRemoteObject 类
java.rmi.server.UnicastRemoteObject
此类实现了 Remote 接口和 Serializable 接口。
自定义接口实现类除了实现自定义接口还需要继承此类。
1.3.4 LocateRegistry 类
java.rmi.registry.LocateRegistry
可以通过 LocateRegistry 在本机上创建 Registry,通过特定的端口就可以访问这个 Registry。
1.3.5 Naming 类
java.rmi.Naming Naming
定义了发布内容可访问 RMI 名称。也是通过 Naming 获取到指定的远程方法。
1.4创建 Server 端
1.4.1 创建项目
1.4.2 创建接口
/**
* 定义允许远程调用接口,该接口必须要实现Remote接口
* 允许被远程调用的方法必须要抛出RemoteException
*/
public interface DemoService extends Remote {
String demo(String str) throws RemoteException;
}
1.4.3 创建接口实现类
/**
* 接口实现类必须要继承UnicastRemoteObject。
* 会自动添加构造方法,需要修改为public
*/
public class DemoServiceImpl extends UnicastRemoteObject implements DemoService {
public DemoServiceImpl() throws RemoteException {
}
@Override
public String demo(String str) throws RemoteException {
return "Hello RMI "+str;
}
}
1.4.4 编写主方法
public class DemoServer {
public static void main(String[] args) throws RemoteException, AlreadyBoundException, MalformedURLException {
//将对象实例化
DemoService demoService = new DemoServiceImpl();
//创建本地注册表
LocateRegistry.createRegistry(8888);
//将对象绑定到注册表中
Naming.bind("rmi://localhost:8888/demoService",demoService);
}
}
1.5创建 Client 端
1.5.1 创建项目
1.5.2 复制服务端接口
/**
* 定义允许远程调用接口,该接口必须要实现Remote接口
* 允许被远程调用的方法必须要抛出RemoteException
*/
public interface DemoService extends Remote {
String demo(String str) throws RemoteException;
}
1.5.3 创建主方法
public class ClientDemo {
public static void main(String[] args) throws RemoteException, NotBoundException, MalformedURLException {
DemoService demoService = (DemoService)Naming.lookup("rmi://localhost:8888/demoService");
String result = demoService.demo("Bjsxt");
System.out.println(result);
}
}
2 使用 Zookeeper 作为注册中心实现 RPC
2.1创建服务端
2.1.1 创建项目
2.1.2 修改 POM 文件添加依赖
<dependencies>
<dependency>
<groupId>org.apache.zookeeper</groupId>
<artifactId>zookeeper</artifactId>
<version>3.6.0</version>
</dependency>
</dependencies>
2.1.3 创建接口
public interface UsersService extends Remote {
String findUsers(String str) throws RemoteException;
}
2.1.4 创建接口实现类
public class UsersServiceImpl extends UnicastRemoteObject implements UsersService {
public UsersServiceImpl() throws RemoteException {
}
@Override
public String findUsers(String str) throws RemoteException {
return "Hello Zookeeper "+str;
}
}
2.1.5 编写主方法
public class ServerDemo implements Watcher {
public static void main(String[] args) throws IOException, AlreadyBoundException, KeeperException, InterruptedException {
UsersService usersService = new UsersServiceImpl();
LocateRegistry.createRegistry(8888);
String url ="rmi://localhost:8888/user";
Naming.bind(url,usersService);
//将url信息放到zookeeper的节点中
ZooKeeper zooKeeper = new ZooKeeper("192.168.233.130:2181,192.168.233.130:2182,192.168.233.130:2183",150000,new ServerDemo());
//创建Znode
zooKeeper.create("/bjsxt/service",url.getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
System.out.println("服务发布成功");
}
}
2.2创建客户端
2.2.1 创建项目
2.2.2 修改 POM 文件添加依赖
<dependencies>
<dependency>
<groupId>org.apache.zookeeper</groupId>
<artifactId>zookeeper</artifactId>
<version>3.6.0</version>
</dependency>
</dependencies>
2.2.3 创建接口
public interface UsersService {
String findUsers(String str);
}
2.2.4 编写主方法
public class ClientDemo implements Watcher {
public static void main(String[] args) throws IOException, KeeperException, InterruptedException, NotBoundException {
ZooKeeper zooKeeper = new ZooKeeper("192.168.233.130:2181,192.168.233.130:2182,192.168.233.130:2183",150000,new ClientDemo());
byte[] bytes = zooKeeper.getData("/bjsxt/service",new ClientDemo(),null);
String url = new String(bytes);
UsersService usersService = (UsersService) Naming.lookup(url);
String result = usersService.findUsers("Bjsxt");
System.out.println(result);
}
@Override
public void process(WatchedEvent event) {
if(event.getState() == Event.KeeperState.SyncConnected){
System.out.println("连接成功");
}
}
}