前言
Watcher部分的代码量总的来说还是比较多的,但是整个逻辑流程还是相对来说比较清晰的。不过还是需要常在脑子里过一过,zk的watcher的相关的架构的设计还是挺精妙的。
从这一篇起开始说ZK client端-server端交互相关的代码,主要是从client本身,client和server的连接和会话以及server端这三个大点来说。这一篇主要说说大致流程和client端的初始化等。
结构
在网上看到了一张图片非常好的描述了zk工作的大致结构,对理解zk client端以至于整体的代码都挺有帮助的,这里贴出来:
上图主要描述了ZK Client和Server端互动的过程:
- client端把request传递到Zookeeper类中(以Packet形式);
- Zookeeper类处理request并放入outgoingqueue中(sendthread做的);
- sendthread把发出的request移到pendingqueue;
- 收到回复后,sendthread从pendingqueue中取出request,并生成event;
- eventthread处理event并触发watchManager中的watcher,调用callback。
Client端代码结构
其实client端的很多重要的类在之前说watcher,快照和log的时候就已经接触了很多了,这里也是系统地总结下。
其中主要几个类的功能:
Zookeeper:客户端核心类之一,也是入口;
ClientCnxn:客户端连接核心类,包含SendThread和EventThread两个线程。SendThread为I/O线程,主要负责Zookeeper客户端和服务器之间的网络I/O通信;EventThread为事件线程,主要负责对服务端事件进行处理;
ClientWatchManager:客户端watcher管理器;
HostProvider:客户端地址列表管理器。
上图是Zookeeper类及其相关的类的交互UML图,可以通过上图来理解下整个Zookeeper各个功能类之间的关系和协作流程。
主要流程
zk client和server端建立连接从client来说主要分为以下三个阶段:
- 初始化阶段:上面介绍的几个主要功能类的实例化;
- 创建阶段:启动及创建连接;
- 响应请求:响应及接收。
逐个介绍:
初始化阶段
从上图中能看出,第一步就是从Zookeeper类的实例化开始,我们选取一个Zookeeper类的构造器开始分析:
public ZooKeeper(String connectString, int sessionTimeout, Watcher watcher,
boolean canBeReadOnly)
throws IOException
{
LOG.info("Initiating client connection, connectString=" + connectString
+ " sessionTimeout=" + sessionTimeout + " watcher=" + watcher);
//设置默认watcher,之前讲watcher的时候说过
watchManager.defaultWatcher = watcher;
//负责解析配置的server地址串
//主要有两个功能:1.加chroot(默认prefix,之前有介绍过);2.读字符串并把多个server地址分开
ConnectStringParser connectStringParser = new ConnectStringParser(
connectString);
//根据之前的字符串解析hostname,ip等,并不一定会按照原来的顺序,在构造器中会将顺序打散
HostProvider hostProvider = new StaticHostProvider(
connectStringParser.getServerAddresses());
//实例化clientCnxn对象
cnxn = new ClientCnxn(connectStringParser.getChrootPath(),
hostProvider, sessionTimeout, this, watchManager,
getClientCnxnSocket(), canBeReadOnly);
//启动sendThread和eventThread
cnxn.start();
}
其实总结下可以看出整个初始化阶段分为四步:
为默认Watcher赋值
- 解析并设置Zookeeper服务器地址列表
- 实例化ClientCnxn对象
启动clientCnxn对象里的sendThread和eventThread线程。
在这里讲一下构造器中的几个重要类:
ConnectStringParser
public final class ConnectStringParser {
private static final int DEFAULT_PORT = 2181;//默认port
private final String chrootPath;//默认前缀
private final ArrayList<InetSocketAddress> serverAddresses = new ArrayList<InetSocketAddress>();//地址list
ConnectStringParser的构造器很简单,主要就是解析chrootPath和生成上面的serverAddresses地址列表。
StaticHostProvider
有一张图很好的形容了StaticHostProvider的工作原理。
在StaticHostProvider类中调用next方法会在循环队列中不断获取,特别要注意的是这个循环队列本身就已经是打乱过的。
在StaticHostProvider构造器中,把前面ConnectStringParser的server地址会再次解析一遍并生成一个队列(因为上一步解析的结果有的没有address),然后就会如下打乱。
Collections.shuffle(this.serverAddresses);
public InetSocketAddress next(long spinDelay) {
//这个部分主要是循环
++currentIndex;
if (currentIndex == serverAddresses.size()) {
currentIndex = 0;
}
//如果这一次的server地址和上一次一样,那么就睡眠spinDelay时间
if (currentIndex == lastIndex && spinDelay > 0) {
try {
Thread.sleep(spinDelay);
} catch (InterruptedException e) {
LOG.warn("Unexpected exception", e);
}
} else if (lastIndex == -1) {//如果是第一次访问,就不要等待
// We don't want to sleep on the first ever connect attempt.
lastIndex = 0;
}
return serverAddresses.get(currentIndex);
}
创建阶段
其实在sendThread和eventThread两个线程启动之后,创建和响应阶段也就开始了。具体的流程会再后面详细说,大致的流程是先从hostprovider获取server地址,然后建立连接并构造请求发送。
详细流程:
- 获取服务器地址(从hostprovider中可以获得),并建立TCP连接;
- 构造ConnectRequest请求。前面的TCP连接建立后,client和server的会话并没有完全建立。SendThread会根据响应的参数构造ConnectRequest,并包装成Packet对象放入outgoingqueue中发送到server端,这就是实际意义上的client和server的一个会话。这部分在之前的watcher发送时有提到。
- ClientCnxnSocket从queue中取出Packet并序列化部分属性发送到server。
这里先把几个基础且比较重要的部分说下:
sendThread
功能:
- 维护client和server的心跳连接,一旦失去连接会立即重连;
- 管理了客户端所有的请求发送和响应接收操作,其将上层客户端API操作转换成相应的请求协议并发送到服务端,并完成对同步调用的返回和异步调用的回调;
- 接受请求的返回并传递给eventThread去处理。
上面的图大致描述了outgoingqueue(客户端请求等待发送的队列)和pendingQueue(已经发送等待响应处理的队列)的关系。
EventThread
EventThread是客户端ClientCnxn内部的一个事件处理线程,负责客户端的事件处理,并触发客户端注册的Watcher监听。EventThread中的watingEvents队列用于临时存放那些需要被触发的Object,包括客户端注册的Watcher和异步接口中注册的回调器AsyncCallback。同时,EventThread会不断地从watingEvents中取出Object,识别具体类型(Watcher或AsyncCallback),并分别调用process和processResult接口方法来实现对事件的触发和回调。
Packet
其实之前就已经看过Packet的一些处理了,最重要的就是Packet序列化的时候createBB方法里只有部分属性序列化了,包括watcher在内的很多变量都没有序列化,这也是watcher轻量特性的保证。
outgoingqueue和pendingqueue之前提到了主要的作用,而他们内部放置的对象都是Packet。在发送时,sendThread从outgoingqueue取出Packet序列化(带有生成的请求序号XID在请求头中)并发送,然后这个Packet就被转移到pendingqueue中,等待响应处理。
响应阶段
同样的,响应阶段的代码也比较多,后面具体说,这里说下大致流程:
- ClientCnxnSocket接收到响应后,首先判断客户端状态是否初始化,若未初始化,那说明当前客户端与服务端之间正在进行会话创建并反序列化response,生成ConnectResponse(带有sessionid),然后会通知sendThread和HostProvider进行相应的设置;
- 如果为初始化状态,且收到的为事件,那么会反序列化为WatcherEvent,并放到EventThread的等待队列中;
- 如果是常规的请求,如getdata,exists等,那么会从pendingQueue中取出一个Packet来处理。
思考
Outgoingqueue, pendingQueue和EventThread的event等待队列关系:
outgoingqueue就是所有要发送的客户端的请求,pendingqueue就是发送过的等待响应的,如果客户端收到了server端的回复,就会从pendingqueue中取出请求Packet并处理;而event等待队列是为了处理server段主动发起的事件,也就是节点发生了change,server主动发送请求到客户端,client把这类的通知放到event等待队列中。
notification event,非notification event
客户端需要接受服务器发送过来的消息,第一种消息是类似于Watcher回掉这种的,我们叫做notification,他的特点是服务器主动发送消息给客户端的,比如客户端a在数据节点a上设置了getData监听,当客户端b修改了节点a后,服务器主动发送NodeDataChanged消息给客户端a。第二中消息是类似于create,getData这种,他们向服务器发送对应的请求后,然后将请求放进到pendingQueue中,然后等待服务器的响应,当接受到服务器的响应后,再从pendingQueue中取出请求,然后进行回掉。
参考
https://www.cnblogs.com/francisYoung/p/5225703.html 可以多看下理解
http://www.cnblogs.com/leesf456/p/6098255.html
https://www.jianshu.com/p/cbad04b12950
《Paxos到ZK》