此类为outgoingQueue请求发送队列,提供服务并生成心跳, 它还产生了ReadThread。
class SendThread extends Thread {
//上次发送ping的时间纳秒数
private long lastPingSentNs;
//底层Socket通信的接口,默认实现是ClientCnxnSocketNIO
private final ClientCnxnSocket clientCnxnSocket;
//使用当前时间为种子生成[0,System.nanoTime())内随机整数序列:
private Random r = new Random(System.nanoTime());
private boolean isFirstConnect = true;
//读取服务器端的响应
void readResponse(ByteBuffer incomingBuffer) throws IOException {
ByteBufferInputStream bbis = new ByteBufferInputStream(
incomingBuffer);
BinaryInputArchive bbia = BinaryInputArchive.getArchive(bbis);
ReplyHeader replyHdr = new ReplyHeader();
replyHdr.deserialize(bbia, "header");
//如果接收到的是ping响应,则直接return返回
if (replyHdr.getXid() == -2) {
// -2 is the xid for pings
return;
}
//如果接收到的是auth响应,进一步处理后,return返回
if (replyHdr.getXid() == -4) {
// -4 is the xid for AuthPacket
if(replyHdr.getErr() == KeeperException.Code.AUTHFAILED.intValue()) {
state = States.AUTH_FAILED;
eventThread.queueEvent( new WatchedEvent(Watcher.Event.EventType.None,
Watcher.Event.KeeperState.AuthFailed, null) );
}
return;
}
//如果接收到的是一个事件,则将该时间放入到事件队列waitingEvent中(之后EventThread会去处理),return返回
if (replyHdr.getXid() == -1) {
// -1 means notification
//反序列化WatcherEvent
WatcherEvent event = new WatcherEvent();
event.deserialize(bbia, "response");
......省略部分代码......
//将WatcherEvent还原成WatchedEvent
WatchedEvent we = new WatchedEvent(event);
if (LOG.isDebugEnabled()) {
LOG.debug("Got " + we + " for sessionid 0x"
+ Long.toHexString(sessionId));
}
//queueEvent方法,先根据事件找到跟事件绑定的Watcher集合,
//然后封装成WatcherSetEventPair对象放入到waitingEvent队列中,等待之后的处理
eventThread.queueEvent( we );
return;
}
......省略部分代码......
//如果接收到的是一个正常的请求响应(指的是Create、GetData、Exist等操作请求),
//则从pendingQueue队列中取出头部的Packet元素进行相应的处理
Packet packet;
synchronized (pendingQueue) {
if (pendingQueue.size() == 0) {
throw new IOException("Nothing in the queue, but got "
+ replyHdr.getXid());
}
packet = pendingQueue.remove();
}
//校验服务器端响应中包含的XID值来确保请求处理的顺序性
try {
if (packet.requestHeader.getXid() != replyHdr.getXid()) {
packet.replyHeader.setErr(
KeeperException.Code.CONNECTIONLOSS.intValue());
throw new IOException("Xid out of order. Got Xid "
+ replyHdr.getXid() + " with err " +
+ replyHdr.getErr() +
" expected Xid "
+ packet.requestHeader.getXid()
+ " for a packet with details: "
+ packet );
}
packet.replyHeader.setXid(replyHdr.getXid());
packet.replyHeader.setErr(replyHdr.getErr());
packet.replyHeader.setZxid(replyHdr.getZxid());
if (replyHdr.getZxid() > 0) {
lastZxid = replyHdr.getZxid();
}
//如果服务器端的响应没有错误,则对响应体response进行反序列化成相应的Response对象
if (packet.response != null && replyHdr.getErr() == 0) {
packet.response.deserialize(bbia, "response");
}
} finally {
//finishPacket方法会从packet中解析出注册的WatchRegistration对象并
//得到Watcher对象转交给ZKWatchManager,会根据类型保存到dataWatches或者existWatches或者childWatches中
finishPacket(packet);
}
}
SendThread(ClientCnxnSocket clientCnxnSocket) {
super(makeThreadName("-SendThread()"));
state = States.CONNECTING;
this.clientCnxnSocket = clientCnxnSocket;
setUncaughtExceptionHandler(uncaughtExceptionHandler);
setDaemon(true);
}
......省略部分代码......
//与服务器连接成功后就会调用该方法
void primeConnection() throws IOException {
LOG.info("Socket connection established to "
+ clientCnxnSocket.getRemoteSocketAddress()
+ ", initiating session");
isFirstConnect = false;
long sessId = (seenRwServerBefore) ? sessionId : 0;
//构造ConnectRequest请求,该请求表示客户端试图与服务器创建一个会话
//将该请求封装成Packet对象,放入到outgoingQueue队列的队头
ConnectRequest conReq = new ConnectRequest(0, lastZxid,
sessionTimeout, sessId, sessionPasswd);
synchronized (outgoingQueue) {
// We add backwards since we are pushing into the front
// Only send if there's a pending watch
// TODO: here we have the only remaining use of zooKeeper in
// this class. It's to be eliminated!
if (!disableAutoWatchReset) {
List<String> dataWatches = zooKeeper.getDataWatches();
List<String> existWatches = zooKeeper.getExistWatches();
List<String> childWatches = zooKeeper.getChildWatches();
if (!dataWatches.isEmpty()
|| !existWatches.isEmpty() || !childWatches.isEmpty()) {
SetWatches sw = new SetWatches(lastZxid,
prependChroot(dataWatches),
prependChroot(existWatches),
prependChroot(childWatches));
RequestHeader h = new RequestHeader();
h.setType(ZooDefs.OpCode.setWatches);
h.setXid(-8);
Packet packet = new Packet(h, new ReplyHeader(), sw, null, null);
outgoingQueue.addFirst(packet);
}
}
for (AuthData id : authInfo) {
outgoingQueue.addFirst(new Packet(new RequestHeader(-4,
OpCode.auth), null, new AuthPacket(0, id.scheme,
id.data), null, null));
}
outgoingQueue.addFirst(new Packet(null, null, conReq,
null, null, readOnly));
}
clientCnxnSocket.enableReadWriteOnly();
if (LOG.isDebugEnabled()) {
LOG.debug("Session establishment request sent on "
+ clientCnxnSocket.getRemoteSocketAddress());
}
}
......省略部分代码......
//向服务器发送ping命令方法
private void sendPing() {
lastPingSentNs = System.nanoTime();
RequestHeader h = new RequestHeader(-2, OpCode.ping);
queuePacket(h, null, null, null, null, null, null, null, null);
}
private InetSocketAddress rwServerAddress = null;
private final static int minPingRwTimeout = 100;
private final static int maxPingRwTimeout = 60000;
private int pingRwTimeout = minPingRwTimeout;
// Set to true if and only if constructor of ZooKeeperSaslClient
// throws a LoginException: see startConnect() below.
private boolean saslLoginFailed = false;
//开始与服务器进行连接
private void startConnect() throws IOException {
state = States.CONNECTING;
//从HostProvider中随机取出一个地址
InetSocketAddress addr;
if (rwServerAddress != null) {
addr = rwServerAddress;
rwServerAddress = null;
} else {
addr = hostProvider.next(1000);
}
setName(getName().replaceAll("\\(.*\\)",
"(" + addr.getHostName() + ":" + addr.getPort() + ")"));
......省略部分代码......
//委托给ClientCnxnSocket去创建与该ZooKeeper服务器之间的TCP连接
clientCnxnSocket.connect(addr);
}
......省略部分代码......
@Override
public void run() {
clientCnxnSocket.introduce(this,sessionId);
clientCnxnSocket.updateNow();
clientCnxnSocket.updateLastSendAndHeard();
int to;
long lastPingRwServer = System.currentTimeMillis();
//state表示ZooKeeper客户端状态,CONNECTING, ASSOCIATING, CONNECTED, CONNECTEDREADONLY,
//CLOSED, AUTH_FAILED, NOT_CONNECTED
//只要不是CLOSED和AUTH_FAILED,都表示客户端值alive的,就一直进行循环发送数据
while (state.isAlive()) {
try {
//如果clientCnxnSocket没有连接上服务器,则进行连接
if (!clientCnxnSocket.isConnected()) {
if(!isFirstConnect){
try {
Thread.sleep(r.nextInt(1000));
} catch (InterruptedException e) {
LOG.warn("Unexpected exception", e);
}
}
// don't re-establish connection if we are closing
if (closing || !state.isAlive()) {
break;
}
startConnect();
clientCnxnSocket.updateLastSendAndHeard();
}
//如果客户端连接上服务器
if (state.isConnected()) {
// determine whether we need to send an AuthFailed event.
if (zooKeeperSaslClient != null) {
boolean sendAuthEvent = false;
if (zooKeeperSaslClient.getSaslState() == ZooKeeperSaslClient.SaslState.INITIAL) {
try {
zooKeeperSaslClient.initialize(ClientCnxn.this);
} catch (SaslException e) {
LOG.error("SASL authentication with Zookeeper Quorum member failed: " + e);
state = States.AUTH_FAILED;
sendAuthEvent = true;
}
}
KeeperState authState = zooKeeperSaslClient.getKeeperState();
if (authState != null) {
if (authState == KeeperState.AuthFailed) {
// An authentication error occurred during authentication with the Zookeeper Server.
state = States.AUTH_FAILED;
sendAuthEvent = true;
} else {
if (authState == KeeperState.SaslAuthenticated) {
sendAuthEvent = true;
}
}
}
if (sendAuthEvent == true) {
eventThread.queueEvent(new WatchedEvent(
Watcher.Event.EventType.None,
authState,null));
}
}
to = readTimeout - clientCnxnSocket.getIdleRecv();
} else {
to = connectTimeout - clientCnxnSocket.getIdleRecv();
}
if (to <= 0) {
throw new SessionTimeoutException(
"Client session timed out, have not heard from server in "
+ clientCnxnSocket.getIdleRecv() + "ms"
+ " for sessionid 0x"
+ Long.toHexString(sessionId));
}
//如果客户端连接上了,则判断是否需要发送心跳检测包并更新上次发送信息
if (state.isConnected()) {
int timeToNextPing = readTimeout / 2
- clientCnxnSocket.getIdleSend();
if (timeToNextPing <= 0) {
sendPing();
clientCnxnSocket.updateLastSend();
} else {
if (timeToNextPing < to) {
to = timeToNextPing;
}
}
}
//如果处于只读模式,则寻找可读写服务器,pingRwServer()
if (state == States.CONNECTEDREADONLY) {
long now = System.currentTimeMillis();
int idlePingRwServer = (int) (now - lastPingRwServer);
if (idlePingRwServer >= pingRwTimeout) {
lastPingRwServer = now;
idlePingRwServer = 0;
pingRwTimeout =
Math.min(2*pingRwTimeout, maxPingRwTimeout);
pingRwServer();
}
to = Math.min(to, pingRwTimeout - idlePingRwServer);
}
......省略部分代码......
//通过clientCnxnSocket.doTransport从outgoingQueue队列中取出队列头部的Packet进行发送
//并根据情况将该Packet放入到pendingQueue队列中
clientCnxnSocket.doTransport(to, pendingQueue, outgoingQueue, ClientCnxn.this);
} catch (Throwable e) {
if (closing) {
if (LOG.isDebugEnabled()) {
// closing so this is expected
LOG.debug("An exception was thrown while closing send thread for session 0x"
+ Long.toHexString(getSessionId())
+ " : " + e.getMessage());
}
break;
} else {
// this is ugly, you have a better way speak up
if (e instanceof SessionExpiredException) {
LOG.info(e.getMessage() + ", closing socket connection");
} else if (e instanceof SessionTimeoutException) {
LOG.info(e.getMessage() + RETRY_CONN_MSG);
} else if (e instanceof EndOfStreamException) {
LOG.info(e.getMessage() + RETRY_CONN_MSG);
} else if (e instanceof RWServerFoundException) {
LOG.info(e.getMessage());
} else {
LOG.warn(
"Session 0x"
+ Long.toHexString(getSessionId())
+ " for server "
+ clientCnxnSocket.getRemoteSocketAddress()
+ ", unexpected error"
+ RETRY_CONN_MSG, e);
}
cleanup();
if (state.isAlive()) {
eventThread.queueEvent(new WatchedEvent(
Event.EventType.None,
Event.KeeperState.Disconnected,
null));
}
clientCnxnSocket.updateNow();
clientCnxnSocket.updateLastSendAndHeard();
}
}
}
cleanup();
clientCnxnSocket.close();
if (state.isAlive()) {
eventThread.queueEvent(new WatchedEvent(Event.EventType.None,
Event.KeeperState.Disconnected, null));
}
ZooTrace.logTraceMessage(LOG, ZooTrace.getTextTraceLevel(),
"SendThread exitedloop.");
}
//寻找可读写ReadWrite的服务器
private void pingRwServer() throws RWServerFoundException {
String result = null;
InetSocketAddress addr = hostProvider.next(0);
LOG.info("Checking server " + addr + " for being r/w." +
" Timeout " + pingRwTimeout);
try {
Socket sock = new Socket(addr.getHostName(), addr.getPort());
sock.setSoLinger(false, -1);
sock.setSoTimeout(1000);
sock.setTcpNoDelay(true);
sock.getOutputStream().write("isro".getBytes());
sock.getOutputStream().flush();
sock.shutdownOutput();
BufferedReader br = new BufferedReader(
new InputStreamReader(sock.getInputStream()));
result = br.readLine();
sock.close();
br.close();
} catch (ConnectException e) {
// ignore, this just means server is not up
} catch (IOException e) {
// some unexpected error, warn about it
LOG.warn("Exception while seeking for r/w server " +
e.getMessage(), e);
}
if ("rw".equals(result)) {
pingRwTimeout = minPingRwTimeout;
// save the found address so that it's used during the next
// connection attempt
rwServerAddress = addr;
throw new RWServerFoundException("Majority server found at "
+ addr.getHostName() + ":" + addr.getPort());
}
}
private void cleanup() {
clientCnxnSocket.cleanup();
synchronized (pendingQueue) {
for (Packet p : pendingQueue) {
conLossPacket(p);
}
pendingQueue.clear();
}
synchronized (outgoingQueue) {
for (Packet p : outgoingQueue) {
conLossPacket(p);
}
outgoingQueue.clear();
}
}
//当ClientCnxnSocket与服务器建立了连接后,就会调用该回调方法
void onConnected(int _negotiatedSessionTimeout, long _sessionId,
byte[] _sessionPasswd, boolean isRO) throws IOException {
negotiatedSessionTimeout = _negotiatedSessionTimeout;
if (negotiatedSessionTimeout <= 0) {
state = States.CLOSED;
eventThread.queueEvent(new WatchedEvent(
Watcher.Event.EventType.None,
Watcher.Event.KeeperState.Expired, null));
eventThread.queueEventOfDeath();
throw new SessionExpiredException(
"Unable to reconnect to ZooKeeper service, session 0x"
+ Long.toHexString(sessionId) + " has expired");
}
if (!readOnly && isRO) {
LOG.error("Read/write client got connected to read-only server");
}
readTimeout = negotiatedSessionTimeout * 2 / 3;
connectTimeout = negotiatedSessionTimeout / hostProvider.size();
hostProvider.onConnected();
sessionId = _sessionId;
sessionPasswd = _sessionPasswd;
state = (isRO) ?
States.CONNECTEDREADONLY : States.CONNECTED;
seenRwServerBefore |= !isRO;
LOG.info("Session establishment complete on server "
+ clientCnxnSocket.getRemoteSocketAddress()
+ ", sessionid = 0x" + Long.toHexString(sessionId)
+ ", negotiated timeout = " + negotiatedSessionTimeout
+ (isRO ? " (READ-ONLY mode)" : ""));
KeeperState eventState = (isRO) ?
KeeperState.ConnectedReadOnly : KeeperState.SyncConnected;
eventThread.queueEvent(new WatchedEvent(
Watcher.Event.EventType.None,
eventState, null));
}
......省略部分代码......
public void sendPacket(Packet p) throws IOException {
clientCnxnSocket.sendPacket(p);
}
}