// Handle whichever readiness events are set on the selection key: first the
// read side (data arriving from the server), then the write side (packets
// queued in outgoingQueue).

// ---- Read side: the channel reports readable data ----
if (sockKey.isReadable()) {
    int rc = sock.read(incomingBuffer);
    if (rc < 0) {
        // A negative count from read() is treated as the stream having ended.
        throw new EndOfStreamException("Unable to read additional data from server sessionid 0x"
                + Long.toHexString(sessionId)
                + ", likely server has closed socket");
    }
    // Only process the buffer once it has been filled completely; a partial
    // read leaves it as-is for a later readable event.
    if (!incomingBuffer.hasRemaining()) {
        incomingBuffer.flip();
        if (incomingBuffer == lenBuffer) {
            // We were reading into the length buffer and it is now full.
            // presumably readLength() switches incomingBuffer to a payload
            // buffer of the announced size -- confirm in readLength().
            recvCount++;
            readLength();
        } else if (!initialized) {
            // The connection has not been initialized yet, so this payload
            // is handled as the result of the connect request.
            // NOTE(review): debug tracing to stdout; consider removing it or
            // routing it through the project logger.
            System.out.println("没有初始化");
            readConnectResult(); // read the result of the connection attempt
            enableRead();
            if (findSendablePacket(outgoingQueue,
                    cnxn.sendThread.clientTunneledAuthenticationInProgress()) != null) {
                // Since SASL authentication has completed (if client is configured to do so),
                // outgoing packets waiting in the outgoingQueue can now be sent.
                enableWrite();
            }
            // Go back to reading into the length buffer for the next message.
            lenBuffer.clear();
            incomingBuffer = lenBuffer;
            updateLastHeard();
            initialized = true;
        } else {
            // Already initialized: hand the payload to the send thread as a
            // response, then go back to reading into the length buffer.
            sendThread.readResponse(incomingBuffer);
            lenBuffer.clear();
            incomingBuffer = lenBuffer;
            updateLastHeard();
        }
    }
}

// ---- Write side: the channel is ready for writing ----
if (sockKey.isWritable()) {
    synchronized (outgoingQueue) {
        Packet p = findSendablePacket(outgoingQueue,
                cnxn.sendThread.clientTunneledAuthenticationInProgress());
        if (p != null) {
            // NOTE(review): debug tracing to stdout; consider removing it or
            // routing it through the project logger.
            Record request = p.request;
            if (request != null) {
                System.out.println(request.getClass());
            }
            System.out.println("package------>" + p.toString());
            updateLastSend();
            // If we already started writing p, p.bb will already exist
            if (p.bb == null) {
                // Headers of type ping and auth do not get an xid assigned
                // here; every other request header does.
                if ((p.requestHeader != null)
                        && (p.requestHeader.getType() != OpCode.ping)
                        && (p.requestHeader.getType() != OpCode.auth)) {
                    p.requestHeader.setXid(cnxn.getXid());
                }
                p.createBB();
            }
            // Send the bytes held in the packet's buffer to the server.
            sock.write(p.bb);
            // Once the buffer has been written completely, remove the packet
            // from the outgoing queue and (except for ping/auth) add it to
            // pendingQueue to wait for the server's response.
            if (!p.bb.hasRemaining()) {
                sentCount++;
                outgoingQueue.removeFirstOccurrence(p);
                if (p.requestHeader != null
                        && p.requestHeader.getType() != OpCode.ping
                        && p.requestHeader.getType() != OpCode.auth) {
                    synchronized (pendingQueue) {
                        // The response is picked up later on the read side.
                        pendingQueue.add(p);
                    }
                }
            }
        }
        if (outgoingQueue.isEmpty()) {
            // No more packets to send: turn off write interest flag.
            // Will be turned on later by a later call to enableWrite(),
            // from within ZooKeeperSaslClient (if client is configured
            // to attempt SASL authentication), or in either doIO() or
            // in doTransport() if not.
            disableWrite();
        } else if (!initialized && p != null && !p.bb.hasRemaining()) {
            // On initial connection, write the complete connect request
            // packet, but then disable further writes until after
            // receiving a successful connection response. If the
            // session is expired, then the server sends the expiration
            // response and immediately closes its end of the socket. If
            // the client is simultaneously writing on its end, then the
            // TCP stack may choose to abort with RST, in which case the
            // client would never receive the session expired event. See
            // http://docs.oracle.com/javase/6/docs/technotes/guides/net/articles/connection_release.html
            disableWrite();
        } else {
            // Just in case
            enableWrite();
        }
    }
}
1. pendingQueue 保存的是等待服务器返回结果的 Packet:这些 Packet 已经通过 socket 发送到服务端,但还没有收到服务器的响应。
2. sockKey.isReadable() 为 true 表示可以从 socket channel 中读数据,也就是说服务器已经返回数据了。