Preface
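The previous post covered how the client initializes and establishes its connection to the server. Both parts are closely tied to SendThread. This post covers the response phase, which involves both SendThread and EventThread.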
Receiving responses
For SendThread, receiving a response comes down to the readResponse method, which the previous post covered in detail. The main flow is:
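- deserialize the response;
- dispatch on the reply header: ping, auth and SASL replies are handled on the spot and the method returns. Apart from an AuthFailed event on authentication failure, nothing goes into the waitingEvents queue;
- if the reply is a notification from the server (xid -1), it is an event and is added to the queue;
- otherwise, take the matching already-sent packet off pendingQueue and complete it.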
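The dispatch above can be modeled in a few lines. This is a simplified, hypothetical sketch, not ZooKeeper's code. The xid values -2 (ping), -4 (auth) and -1 (notification) follow ZooKeeper's conventions. ReplyRouter, Route and the integer-only pendingXids queue are made up for illustration, and SASL handling is left out. The server answers ordinary requests in the order they were sent, so a reply must match the head of the pending queue. The sketch signals a mismatch with an IllegalStateException.

```java
import java.util.ArrayDeque;
import java.util.Queue;

// Hypothetical, simplified model of readResponse's routing by xid.
// Only the xid constants follow ZooKeeper; all names are illustrative.
class ReplyRouter {
    enum Route { PING, AUTH, NOTIFICATION, PENDING_PACKET }

    static final int NOTIFICATION_XID = -1; // server-pushed watch event
    static final int PING_XID = -2;         // ping reply: log and return
    static final int AUTH_XID = -4;         // auth reply: may queue AuthFailed

    // xids of requests already sent and awaiting replies, in send order
    final Queue<Integer> pendingXids = new ArrayDeque<>();

    Route route(int xid) {
        if (xid == PING_XID) return Route.PING;
        if (xid == AUTH_XID) return Route.AUTH;
        if (xid == NOTIFICATION_XID) return Route.NOTIFICATION;
        // Ordinary replies come back in send order, so they must match
        // the oldest pending request.
        Integer expected = pendingXids.poll();
        if (expected == null || expected != xid) {
            throw new IllegalStateException(
                "Xid out of order: got " + xid + ", expected " + expected);
        }
        return Route.PENDING_PACKET;
    }

    public static void main(String[] args) {
        ReplyRouter router = new ReplyRouter();
        router.pendingXids.add(1);
        router.pendingXids.add(2);
        System.out.println(router.route(-2)); // PING
        System.out.println(router.route(1));  // PENDING_PACKET
        System.out.println(router.route(-1)); // NOTIFICATION
    }
}
```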
The handling of ping replies deserves a closer look.
if (replyHdr.getXid() == -2) { // ping reply: receiving it at all means the server is reachable
// -2 is the xid for pings
if (LOG.isDebugEnabled()) {
LOG.debug("Got ping response for sessionid: 0x"
+ Long.toHexString(sessionId)
+ " after "
+ ((System.nanoTime() - lastPingSentNs) / 1000000)
+ "ms"); // just log it
}
return;
}
if (replyHdr.getXid() == -4) { // reply header of an auth request
// -4 is the xid for AuthPacket
if(replyHdr.getErr() == KeeperException.Code.AUTHFAILED.intValue()) { // on auth failure, queue an AuthFailed event for EventThread
state = States.AUTH_FAILED;
eventThread.queueEvent( new WatchedEvent(Watcher.Event.EventType.None,
Watcher.Event.KeeperState.AuthFailed, null) );
}
if (LOG.isDebugEnabled()) {
LOG.debug("Got auth sessionid:0x"
+ Long.toHexString(sessionId)); // log
}
return;
}
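As mentioned earlier, ping and auth requests are never added to pendingQueue; their replies are handled as soon as they arrive. One point is worth stressing: readResponse plays no part in the client's heartbeat detection. It only receives the reply and logs it. What actually decides whether to drop the connection is the send loop described in the previous post, which checks how long it has been since the client last heard from the server. So how does the client know when it last heard from the server? doIO, the method that handles the client's I/O with the server, calls updateLastHeard() whenever it receives a message from the server.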
void updateLastHeard() {
this.lastHeard = now;
}
As you can see, this method simply records when the client last received something from the server.
to = readTimeout - clientCnxnSocket.getIdleRecv();
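to is the configured readTimeout minus the time elapsed since the client last received data from the server (getIdleRecv()). While the client is connected, SendThread checks this value: once it drops to zero or below, the connection is considered timed out and an exception is thrown. This raises a new question: how does the server know whether it is still connected to the client? That will be covered separately in a later post on sessions.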
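The timeout arithmetic can be sketched with a hypothetical IdleRecvTimer class. It takes the current time as an explicit argument, so the behavior is deterministic rather than tied to a real clock. updateLastHeard() and getIdleRecv() mirror the methods quoted above, and remaining() stands in for the computation of to.

```java
// Hypothetical sketch of SendThread's idle-receive timeout arithmetic.
// Times are milliseconds supplied by the caller instead of a real clock.
class IdleRecvTimer {
    private long now;
    private long lastHeard; // when the last message arrived from the server

    IdleRecvTimer(long start) {
        this.now = start;
        this.lastHeard = start;
    }

    void updateNow(long t) { this.now = t; }

    // mirrors updateLastHeard(): called after every reply is read
    void updateLastHeard() { this.lastHeard = now; }

    // mirrors getIdleRecv(): time since we last heard from the server
    long getIdleRecv() { return now - lastHeard; }

    // the value called "to" in the text; <= 0 means the connection timed out
    long remaining(long readTimeout) { return readTimeout - getIdleRecv(); }

    public static void main(String[] args) {
        IdleRecvTimer timer = new IdleRecvTimer(0);
        long readTimeout = 2000;
        timer.updateNow(1500);
        System.out.println(timer.remaining(readTimeout)); // 500
        timer.updateLastHeard();                          // a ping reply arrives at t=1500
        timer.updateNow(4000);
        System.out.println(timer.remaining(readTimeout)); // -500: timed out
    }
}
```

Any reply, including a ping reply, refreshes lastHeard. A healthy connection therefore keeps the remaining budget positive, and the budget only runs out when nothing at all arrives for a full readTimeout.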
Events added to the waiting queue
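From the analysis so far, SendThread sends Packets to the server and puts some of them in pendingQueue to wait for replies. When replies come back from the server, it puts events into the waiting queue. EventThread's main job is to process those events. First, here is a summary of which events get added to the waiting queue.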
- Auth failure events.
// -4 is the xid for AuthPacket
if(replyHdr.getErr() == KeeperException.Code.AUTHFAILED.intValue()) { //ClientCnxn 756
state = States.AUTH_FAILED;
eventThread.queueEvent( new WatchedEvent(Watcher.Event.EventType.None,
Watcher.Event.KeeperState.AuthFailed, null) );
}
- Watcher-triggered events
eventThread.queueEvent( we );//ClientCnxn 794
- SASL failure events
// An authentication error occurred when the SASL client tried to initialize:
// for Kerberos this means that the client failed to authenticate with the KDC.
// This is different from an authentication error that occurs during communication
// with the Zookeeper server, which is handled below.
LOG.warn("SASL configuration failed: " + e + " Will continue connection to Zookeeper server without "
+ "SASL authentication, if Zookeeper server allows it.");
eventThread.queueEvent(new WatchedEvent(//ClientCnxn 1012
Watcher.Event.EventType.None,
Watcher.Event.KeeperState.AuthFailed, null));
- SASL authentication events
eventThread.queueEvent(new WatchedEvent(//ClientCnxn 1094
Watcher.Event.EventType.None,
authState,null));
- Disconnection events
eventThread.queueEvent(new WatchedEvent(//ClientCnxn 1175 1188
Event.EventType.None,
Event.KeeperState.Disconnected,
null));
- Session expiration events
eventThread.queueEvent(new WatchedEvent(//ClientCnxn 1280
Watcher.Event.EventType.None,
Watcher.Event.KeeperState.Expired, null));
- Connection events (ConnectedReadOnly for a read-only connection, otherwise SyncConnected)
KeeperState eventState = (isRO) ?
KeeperState.ConnectedReadOnly : KeeperState.SyncConnected;
eventThread.queueEvent(new WatchedEvent( //ClientCnxn 1309
Watcher.Event.EventType.None,
eventState, null));
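To some extent, these seven kinds of events are notifications about the connection to the server, so EventThread must handle them. In practice, many packets sent by SendThread (create, getData and so on) also end up in the waiting queue, but only under one condition: the request was made through the asynchronous API.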
EventThread
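EventThread is an event-handling thread inside the client's ClientCnxn. It handles client-side events and fires the Watchers the client has registered. Its waitingEvents queue temporarily holds the Objects that need to be fired, including registered Watchers and the AsyncCallback callbacks registered through the asynchronous APIs. EventThread keeps taking Objects from waitingEvents and determines their concrete type (Watcher or AsyncCallback). It then calls process or processResult accordingly to fire the event or run the callback.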
Code structure:
Fields:
private final LinkedBlockingQueue<Object> waitingEvents =
new LinkedBlockingQueue<Object>();
/** This is really the queued session state until the event
* thread actually processes the event and hands it to the watcher.
* But for all intents and purposes this is the state.
*/
private volatile KeeperState sessionState = KeeperState.Disconnected;
private volatile boolean wasKilled = false;
private volatile boolean isRunning = false;
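The key field here is waitingEvents, a FIFO queue; it is the waiting queue mentioned earlier. The following sections look at each kind of object the queue handles.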
Queuing server notifications
public void queueEvent(WatchedEvent event) {
if (event.getType() == EventType.None
&& sessionState == event.getState()) { // a None-type event that repeats the current session state is not queued
return;
}
sessionState = event.getState(); // record the new session state
// materialize the watchers based on the event
WatcherSetEventPair pair = new WatcherSetEventPair( // pair the watchers to trigger with the event (covered earlier)
watcher.materialize(event.getState(), event.getType(),
event.getPath()),
event); // materialize() selects watchers by state, event type and path
// queue the pair (watch set & event) for later processing
waitingEvents.add(pair); // enqueue for EventThread to process
}
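The filter at the top of queueEvent is what keeps repeated state notifications out of the queue. The following is a minimal sketch using hypothetical stand-ins for KeeperState, EventType and WatchedEvent. Watcher materialization is omitted, so the queue holds bare events rather than WatcherSetEventPair objects.

```java
import java.util.concurrent.LinkedBlockingQueue;

// Hypothetical sketch of the filter at the top of EventThread.queueEvent.
// KeeperState/EventType/Event are simplified stand-ins for ZooKeeper's types.
class StateEventQueue {
    enum KeeperState { Disconnected, SyncConnected, Expired }
    enum EventType { None, NodeDataChanged }

    static final class Event {
        final EventType type;
        final KeeperState state;
        final String path;

        Event(EventType type, KeeperState state, String path) {
            this.type = type;
            this.state = state;
            this.path = path;
        }
    }

    final LinkedBlockingQueue<Event> waitingEvents = new LinkedBlockingQueue<>();
    // starts as Disconnected, like EventThread.sessionState
    private volatile KeeperState sessionState = KeeperState.Disconnected;

    void queueEvent(Event event) {
        // a pure state event (type None) that repeats the current state is dropped
        if (event.type == EventType.None && sessionState == event.state) {
            return;
        }
        sessionState = event.state;
        waitingEvents.add(event);
    }

    public static void main(String[] args) {
        StateEventQueue q = new StateEventQueue();
        q.queueEvent(new Event(EventType.None, KeeperState.Disconnected, null));   // dropped
        q.queueEvent(new Event(EventType.None, KeeperState.SyncConnected, null));  // queued
        q.queueEvent(new Event(EventType.None, KeeperState.SyncConnected, null));  // dropped
        q.queueEvent(new Event(EventType.NodeDataChanged, KeeperState.SyncConnected, "/a")); // queued
        System.out.println(q.waitingEvents.size()); // 2
    }
}
```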
Queuing packets from asynchronous requests
private void finishPacket(Packet p) {
if (p.watchRegistration != null) {
p.watchRegistration.register(p.replyHeader.getErr());
}
if (p.cb == null) { // synchronous call
synchronized (p) {
p.finished = true;
p.notifyAll(); // A synchronous call blocks in submitRequest and has no callback, so the packet is never queued; just wake the waiting caller.
// the waiting side in submitRequest:
//synchronized (packet) {
// while (!packet.finished) {
// packet.wait();
// }
//}
}
} else { // asynchronous call
p.finished = true;
eventThread.queuePacket(p); // hand the packet to EventThread, which will run its callback
}
}
The code above explains why only packets from asynchronous calls are added to the queue.
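The two completion paths can be reproduced with plain wait/notify and a queue. This sketch uses a hypothetical Packet class in which a Consumer<String> plays the role of the AsyncCallback. awaitSync mirrors the waiting loop in submitRequest quoted in the comment above.

```java
import java.util.concurrent.LinkedBlockingQueue;
import java.util.function.Consumer;

// Hypothetical sketch of finishPacket's two completion paths.
// Packet is a stand-in; a Consumer<String> plays the role of an AsyncCallback.
class CompletionDemo {
    static final class Packet {
        boolean finished;           // guarded by the packet's monitor on the sync path
        String result;
        final Consumer<String> cb;  // null for synchronous calls

        Packet(Consumer<String> cb) { this.cb = cb; }
    }

    final LinkedBlockingQueue<Packet> waitingEvents = new LinkedBlockingQueue<>();

    // called on the I/O side once the reply has been read
    void finishPacket(Packet p, String result) {
        p.result = result;
        if (p.cb == null) {          // synchronous: wake the blocked caller
            synchronized (p) {
                p.finished = true;
                p.notifyAll();
            }
        } else {                     // asynchronous: hand off to the event thread
            p.finished = true;
            waitingEvents.add(p);
        }
    }

    // what a synchronous caller does after submitting its request
    static String awaitSync(Packet p) throws InterruptedException {
        synchronized (p) {
            while (!p.finished) {    // re-check after every wakeup
                p.wait();
            }
            return p.result;
        }
    }

    public static void main(String[] args) throws InterruptedException {
        CompletionDemo demo = new CompletionDemo();

        Packet sync = new Packet(null);
        Thread io = new Thread(() -> demo.finishPacket(sync, "sync-ok"));
        io.start();
        System.out.println(awaitSync(sync));   // sync-ok
        io.join();

        Packet async = new Packet(r -> System.out.println("callback: " + r));
        demo.finishPacket(async, "async-ok");
        Packet next = demo.waitingEvents.take(); // an event thread would do this
        next.cb.accept(next.result);             // callback: async-ok
    }
}
```

The while loop around wait() matters for two reasons. It guards against spurious wakeups. It also covers the case where the reply arrives before the caller starts waiting: finished is already true, so the caller never blocks.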
public void queuePacket(Packet packet) {
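if (wasKilled) { // has EventThread already received eventOfDeath?
synchronized (waitingEvents) {
if (isRunning) waitingEvents.add(packet); // still draining the queue: enqueue
else processEvent(packet); // thread has exited: process directly in the caller's thread
}
} else {
waitingEvents.add(packet); // add to the waiting queue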
}
}
Two flags here, wasKilled and isRunning, need some explanation. Both are updated in EventThread's run method.
@Override
public void run() {
try {
isRunning = true;
while (true) {
Object event = waitingEvents.take(); // take the head of the queue, blocking while empty
if (event == eventOfDeath) { // eventOfDeath means EventThread should shut down
wasKilled = true; // only set the flag; the thread does not stop yet
} else {
processEvent(event); // an ordinary event: process it
}
if (wasKilled)
synchronized (waitingEvents) { // when shutting down, clear isRunning only after the queue is drained
if (waitingEvents.isEmpty()) {
isRunning = false;
break;
}
}
}
} catch (InterruptedException e) {
LOG.error("Event thread exiting due to interruption", e);
}
LOG.info("EventThread shut down for session: 0x{}",
Long.toHexString(getSessionId()));
}
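The shutdown protocol is the classic poison-pill pattern. A sentinel object is queued, the consumer keeps draining until the queue is empty, and anything submitted afterwards is processed by the caller. The sketch below reproduces queuePacket and run in a self-contained class. MiniEventThread, EVENT_OF_DEATH and the processed list are hypothetical names, and processEvent only records the object instead of invoking a callback.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.LinkedBlockingQueue;

// Hypothetical, self-contained sketch of EventThread's poison-pill shutdown.
// processEvent only records the object; a real EventThread would run callbacks.
class MiniEventThread extends Thread {
    private static final Object EVENT_OF_DEATH = new Object();
    private final LinkedBlockingQueue<Object> waitingEvents = new LinkedBlockingQueue<>();
    private volatile boolean wasKilled = false;
    private volatile boolean isRunning = false;
    final List<Object> processed = Collections.synchronizedList(new ArrayList<>());

    void queuePacket(Object packet) {
        if (wasKilled) {
            synchronized (waitingEvents) {
                if (isRunning) waitingEvents.add(packet); // still draining: enqueue
                else processEvent(packet);                // thread has exited: run inline
            }
        } else {
            waitingEvents.add(packet);
        }
    }

    void queueEventOfDeath() { waitingEvents.add(EVENT_OF_DEATH); }

    private void processEvent(Object event) { processed.add(event); }

    @Override
    public void run() {
        try {
            isRunning = true;
            while (true) {
                Object event = waitingEvents.take();
                if (event == EVENT_OF_DEATH) {
                    wasKilled = true;  // mark for shutdown but keep draining
                } else {
                    processEvent(event);
                }
                if (wasKilled) {
                    synchronized (waitingEvents) {
                        if (waitingEvents.isEmpty()) { // exit only once drained
                            isRunning = false;
                            break;
                        }
                    }
                }
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    public static void main(String[] args) throws InterruptedException {
        MiniEventThread t = new MiniEventThread();
        t.queuePacket("a");
        t.queuePacket("b");
        t.queueEventOfDeath();
        t.start();
        t.join();
        t.queuePacket("c");              // thread is gone: processed inline
        System.out.println(t.processed); // [a, b, c]
    }
}
```

Consider a caller that already sees wasKilled == true. It does its isRunning check and its add inside the same synchronized (waitingEvents) block as the thread's final isEmpty() check. Its packet is therefore either drained by the thread or processed inline, never stranded in a queue nobody reads.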
All event handling happens in processEvent. It covers two things: invoking watchers once they are triggered, and invoking the callbacks of the various asynchronous APIs.
private void processEvent(Object event) {
try {
if (event instanceof WatcherSetEventPair) { // a watcher notification
// each watcher will process the event
WatcherSetEventPair pair = (WatcherSetEventPair) event;
for (Watcher watcher : pair.watchers) {
try {
watcher.process(pair.event); // invoke the watcher
} catch (Throwable t) {
LOG.error("Error while calling watcher ", t);
}
}
} else { // callback of an asynchronous API
Packet p = (Packet) event;
int rc = 0;
String clientPath = p.clientPath;
if (p.replyHeader.getErr() != 0) {
rc = p.replyHeader.getErr();
}
if (p.cb == null) {
LOG.warn("Somehow a null cb got to EventThread!");
} else if (p.response instanceof ExistsResponse
|| p.response instanceof SetDataResponse
|| p.response instanceof SetACLResponse) {
...
} else if (p.response instanceof GetDataResponse) {
...
} else if (p.response instanceof GetACLResponse) {
...
} else if (p.response instanceof GetChildrenResponse) {
...
} else if (p.response instanceof GetChildren2Response) {
...
} else if (p.response instanceof CreateResponse) {
...
} else if (p.response instanceof MultiResponse) {
...
} else if (p.cb instanceof VoidCallback) {
...
}
}
} catch (Throwable t) {
LOG.error("Caught unexpected throwable", t);
}
}
The response types checked in each branch make it clear that the callbacks of all the asynchronous APIs run here.
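The two branches of processEvent can be sketched with hypothetical stand-in types. A WatcherSetEventPair fans one event out to every watcher, and each call is isolated so that one failing watcher cannot block the rest. An async Packet invokes its single callback with the result code. The original catches Throwable, while the sketch narrows this to RuntimeException. The per-response-type branches are collapsed into one Callback interface.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch of processEvent's two branches, with stand-in types.
class DispatchDemo {
    interface Watcher { void process(String event); }
    interface Callback { void processResult(int rc, String path); }

    static final class WatcherSetEventPair {
        final List<Watcher> watchers;
        final String event;
        WatcherSetEventPair(List<Watcher> watchers, String event) {
            this.watchers = watchers;
            this.event = event;
        }
    }

    static final class Packet {
        final int err;           // 0 on success, else an error code
        final String clientPath;
        final Callback cb;
        Packet(int err, String clientPath, Callback cb) {
            this.err = err;
            this.clientPath = clientPath;
            this.cb = cb;
        }
    }

    static void processEvent(Object event) {
        if (event instanceof WatcherSetEventPair) {
            WatcherSetEventPair pair = (WatcherSetEventPair) event;
            for (Watcher watcher : pair.watchers) {
                try {
                    watcher.process(pair.event);
                } catch (RuntimeException e) {
                    // one failing watcher must not stop the others
                    System.err.println("Error while calling watcher: " + e);
                }
            }
        } else {
            Packet p = (Packet) event;
            if (p.cb == null) {
                System.err.println("Somehow a null cb got to EventThread!");
            } else {
                p.cb.processResult(p.err, p.clientPath);
            }
        }
    }

    public static void main(String[] args) {
        List<String> log = new ArrayList<>();
        List<Watcher> watchers = new ArrayList<>();
        watchers.add(e -> { throw new IllegalStateException("boom"); });
        watchers.add(e -> log.add("watcher saw " + e));
        processEvent(new WatcherSetEventPair(watchers, "NodeDataChanged /a"));
        processEvent(new Packet(0, "/a", (rc, path) -> log.add("rc=" + rc + " path=" + path)));
        System.out.println(log); // [watcher saw NodeDataChanged /a, rc=0 path=/a]
    }
}
```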
Queuing EventThread's eventOfDeath
public void queueEventOfDeath() {
waitingEvents.add(eventOfDeath);
}
EventThread is shut down in only two cases:
- When (re)connecting to the server fails because the session has expired, i.e. the negotiated session timeout is not positive:
void onConnected(int _negotiatedSessionTimeout, long _sessionId,
byte[] _sessionPasswd, boolean isRO) throws IOException {
negotiatedSessionTimeout = _negotiatedSessionTimeout; // session timeout negotiated with the server
if (negotiatedSessionTimeout <= 0) { // non-positive timeout: the session has expired
state = States.CLOSED;
eventThread.queueEvent(new WatchedEvent(
Watcher.Event.EventType.None,
Watcher.Event.KeeperState.Expired, null));
eventThread.queueEventOfDeath(); // shut down EventThread
...
}
...
}
- When the client disconnects from the server:
/**
* Shutdown the send/event threads. This method should not be called
* directly - rather it should be called as part of close operation. This
* method is primarily here to allow the tests to verify disconnection
* behavior.
*/
public void disconnect() {
if (LOG.isDebugEnabled()) {
LOG.debug("Disconnecting client for session: 0x"
+ Long.toHexString(getSessionId()));//log
}
sendThread.close(); // close SendThread
eventThread.queueEventOfDeath(); // shut down EventThread
}
Further thoughts
ZooKeeper's session mechanism
References
《从Paxos到Zookeeper》 (From Paxos to ZooKeeper)
http://www.cnblogs.com/leesf456/p/6098255.html
https://www.jianshu.com/p/4a1902a44439