前一篇介绍了ZooKeeper的client和server之间session是如何建立的。在DataMonitor的例子中,DataMonitor通过exists异步接口和server端交互,本文将介绍exists操作是如何完成的。
DataMonitor发起exists操作
/**
 * Asynchronous {@code exists}: builds the request header and body, then hands them
 * to the client connection's send queue. The result is delivered later through
 * {@code cb}.
 *
 * @param path    path of the node to check
 * @param watcher if non-null, the request is sent with its watch flag set
 * @param cb      callback passed along with the queued request
 * @param ctx     opaque caller context passed along with the queued request
 */
public void exists(final String path, Watcher watcher, StatCallback cb, Object ctx) {
    // NOTE(review): the article elides the start of the method here; clientPath,
    // serverPath and wcb used below are presumably set up in the elided part.
    ......
    // Request header for the exists operation
    RequestHeader h = new RequestHeader();
    h.setType(ZooDefs.OpCode.exists);
    // Request body
    ExistsRequest request = new ExistsRequest();
    request.setPath(serverPath);
    request.setWatch(watcher != null);
    // Note: the response record handed to the queue is a SetDataResponse,
    // not an ExistsResponse.
    SetDataResponse response = new SetDataResponse();
    // Add to the send queue
    cnxn.queuePacket(h, new ReplyHeader(), request, response, cb, clientPath, serverPath, ctx, wcb);
}
添加过程
/**
 * Wraps a request in a {@code Packet} and appends it to {@code outgoingQueue}.
 *
 * If the connection state is not alive, or the client is already closing, the
 * packet is passed to {@code conLossPacket} instead of being queued. A
 * {@code closeSession} request that is accepted sets {@code closing}, so later
 * calls take the {@code conLossPacket} path.
 *
 * @return the packet that was created, whether it was queued or not
 */
Packet queuePacket(RequestHeader h, ReplyHeader r, Record request, Record response,
        AsyncCallback cb, String clientPath, String serverPath, Object ctx,
        WatchRegistration watchRegistration) {
    Packet packet = null;
    // Note that we do not generate the Xid for the packet yet. It is
    // generated later at send-time, by an implementation of ClientCnxnSocket::doIO(),
    // where the packet is actually sent.
    synchronized (outgoingQueue) {
        // Wrap the request in a Packet
        packet = new Packet(h, r, request, response, watchRegistration);
        packet.cb = cb;
        packet.ctx = ctx;
        packet.clientPath = clientPath;
        packet.serverPath = serverPath;
        if (!state.isAlive() || closing) {
            conLossPacket(packet);
        } else {
            // If the client is asking to close the session then
            // mark as closing
            if (h.getType() == OpCode.closeSession) {
                closing = true;
            }
            // Add to the send queue
            outgoingQueue.add(packet);
        }
    }
    // Wake up the client socket so the packet is handled promptly
    // (the original note says this wakes the Selector).
    sendThread.getClientCnxnSocket().wakeupCnxn();
    return packet;
}
接下来还是回到SendThread的发送过程,之前Session建立时已经分析过,这里有一点要注意下:
//除ping和auth之外的业务请求,发送前需要设置xid(客户端请求序号,用于把响应和请求按序对应起来;注意它不是事务id,即zxid)
if ((p.requestHeader != null) && (p.requestHeader.getType() != OpCode.ping) && (p.requestHeader.getType() != OpCode.auth)) { p.requestHeader.setXid(cnxn.getXid()); }
接下来Server端IO线程拿到请求,处理,过程和之前session建立时一样,就不赘述了。变化的是后续的处理链。
PrepRequestProcessor预处理
//All the rest don't need to create a Txn - just verify session //读请求,不需要创建事务,只是检查了下session是否还在,此时事务头和事务体都是null case OpCode.sync: case OpCode.exists: case OpCode.getData: case OpCode.getACL: case OpCode.getChildren: case OpCode.getChildren2: case OpCode.ping: case OpCode.setWatches: zks.sessionTracker.checkSession(request.sessionId, request.getOwner()); break;
SyncRequestProcessor处理逻辑之前已经分析过了,这里就挑重点说一下
//试图将其写log,由于ExistsRequest并不是一个事务型请求,所以这里直接返回false,也就是说ExistsRequest不会被记录到log文件中
zks.getZKDatabase().append(si)
接下来由FinalRequestProcessor处理,由于不是事务型请求,省了很多步骤,直接进入switch处理:
case OpCode.exists: { lastOp = "EXIS"; // TODO we need to figure out the security requirement for this! ExistsRequest existsRequest = new ExistsRequest(); //反序列化 ByteBufferInputStream.byteBuffer2Record(request.request, existsRequest); String path = existsRequest.getPath(); if (path.indexOf('\0') != -1) { throw new KeeperException.BadArgumentsException(); } //拿对应node的状态,并设置是否watch Stat stat = zks.getZKDatabase().statNode(path, existsRequest .getWatch() ? cnxn : null); //结果 rsp = new ExistsResponse(stat); break; } ...... //当前处理zxid long lastZxid = zks.getZKDatabase().getDataTreeLastProcessedZxid(); //构造相应头 ReplyHeader hdr = new ReplyHeader(request.cxid, lastZxid, err.intValue()); zks.serverStats().updateLatency(request.createTime); cnxn.updateStatsForResponse(request.cxid, lastZxid, lastOp, request.createTime, System.currentTimeMillis()); try { //写回响应 cnxn.sendResponse(hdr, rsp, "response");
statNode过程
//此处的watcher就是对应ServerCnxn,代表是哪个client,server端需要notify的时候,直接往对应ServerCnxn写数据即可 public Stat statNode(String path, Watcher watcher) throws KeeperException.NoNodeException { Stat stat = new Stat(); DataNode n = nodes.get(path); if (watcher != null) { //对于exists请求,需要监听data变化事件,添加watcher dataWatches.addWatch(path, watcher); } if (n == null) { throw new KeeperException.NoNodeException(); } synchronized (n) { n.copyStat(stat); return stat; } }
好了,以上server端就完成了ExistsRequest的处理了。接下来client端SendThread收到ExistsResponse进行处理
if (sockKey.isReadable()) { int rc = sock.read(incomingBuffer); ...... else if (!initialized) { readConnectResult(); enableRead(); if (findSendablePacket(outgoingQueue, cnxn.sendThread.clientTunneledAuthenticationInProgress()) != null) { // Since SASL authentication has completed (if client is configured to do so), // outgoing packets waiting in the outgoingQueue can now be sent. enableWrite(); } lenBuffer.clear(); incomingBuffer = lenBuffer; updateLastHeard(); initialized = true; } //此时client session已经建立,init完成 else { sendThread.readResponse(incomingBuffer); lenBuffer.clear(); incomingBuffer = lenBuffer; updateLastHeard(); } } }
具体读取:
/**
 * Reads one reply from {@code incomingBuffer}, matches it against the packet at
 * the head of {@code pendingQueue}, fills in that packet's reply header and
 * response, and finishes the packet.
 *
 * {@code finishPacket} runs in a {@code finally} block, so the dequeued packet
 * is finished even when the xid check fails.
 *
 * @param incomingBuffer buffer holding the serialized reply
 * @throws IOException if {@code pendingQueue} is empty, or if the reply's xid
 *                     differs from the xid of the packet at the head of the queue
 */
void readResponse(ByteBuffer incomingBuffer) throws IOException {
    ByteBufferInputStream bbis = new ByteBufferInputStream(
            incomingBuffer);
    BinaryInputArchive bbia = BinaryInputArchive.getArchive(bbis);
    ReplyHeader replyHdr = new ReplyHeader();
    // Read the reply header first; replies carrying special xids are handled
    // first (that part is elided in this excerpt).
    replyHdr.deserialize(bbia, "header");
    ......
    // Take the packet at the head of the queue, i.e. the request queued earlier
    // (here the ExistsRequest). Article note: client and server each process
    // requests on a single thread through queues, so replies are assumed to
    // arrive in request order.
    Packet packet;
    synchronized (pendingQueue) {
        if (pendingQueue.size() == 0) {
            throw new IOException("Nothing in the queue, but got " + replyHdr.getXid());
        }
        packet = pendingQueue.remove();
    }
    /*
     * Since requests are processed in order, we better get a response
     * to the first request!
     */
    try {
        // Check that the reply belongs to this packet; on mismatch the packet
        // is marked CONNECTIONLOSS before the exception is thrown.
        if (packet.requestHeader.getXid() != replyHdr.getXid()) {
            packet.replyHeader.setErr(
                    KeeperException.Code.CONNECTIONLOSS.intValue());
            throw new IOException("Xid out of order. Got Xid " + replyHdr.getXid() + " with err " + + replyHdr.getErr() + " expected Xid " + packet.requestHeader.getXid() + " for a packet with details: " + packet );
        }
        packet.replyHeader.setXid(replyHdr.getXid());
        packet.replyHeader.setErr(replyHdr.getErr());
        packet.replyHeader.setZxid(replyHdr.getZxid());
        // Update the latest zxid known to this client
        if (replyHdr.getZxid() > 0) {
            lastZxid = replyHdr.getZxid();
        }
        // Deserialize the response body, only when a response record was
        // supplied and the reply carries no error.
        if (packet.response != null && replyHdr.getErr() == 0) {
            packet.response.deserialize(bbia, "response");
        }
        if (LOG.isDebugEnabled()) {
            LOG.debug("Reading reply sessionid:0x" + Long.toHexString(sessionId) + ", packet:: " + packet);
        }
    } finally {
        // Finish the packet: mainly watcher registration, callbacks and
        // event dispatch.
        finishPacket(packet);
    }
}
finishPacket过程
/**
 * Completes a packet after its reply has been processed: registers its watch
 * (if any), marks it finished, and either wakes threads waiting on the packet
 * or hands it to the event thread.
 *
 * @param p the packet to finish
 */
private void finishPacket(Packet p) {
    // Register the watcher, passing along the reply's error code.
    // NOTE(review): the original note says an exists request is registered
    // into dataWatches; since the error code is passed to register(), the
    // target set may depend on it — confirm against WatchRegistration.
    if (p.watchRegistration != null) {
        p.watchRegistration.register(p.replyHeader.getErr());
    }
    // Synchronous API (no callback): wake up the caller thread(s) waiting
    // on the packet.
    if (p.cb == null) {
        synchronized (p) {
            p.finished = true;
            p.notifyAll();
        }
    }
    // Asynchronous request: queue the packet on the event thread.
    else {
        p.finished = true;
        eventThread.queuePacket(p);
    }
}
EventThread端
else if (p.response instanceof ExistsResponse || p.response instanceof SetDataResponse || p.response instanceof SetACLResponse) { StatCallback cb = (StatCallback) p.cb; //如果成功,增加node stat的回调参数 if (rc == 0) { if (p.response instanceof ExistsResponse) { cb.processResult(rc, clientPath, p.ctx, ((ExistsResponse) p.response) .getStat()); } else if (p.response instanceof SetDataResponse) { cb.processResult(rc, clientPath, p.ctx, ((SetDataResponse) p.response) .getStat()); } else if (p.response instanceof SetACLResponse) { cb.processResult(rc, clientPath, p.ctx, ((SetACLResponse) p.response) .getStat()); } } //如果响应失败,stat为null else { cb.processResult(rc, clientPath, p.ctx, null); } }
在DataMonitor例子中,它本身就是一个StatCallback
public void processResult(int rc, String path, Object ctx, Stat stat) { boolean exists; switch (rc) { case Code.Ok: exists = true; break; case Code.NoNode: exists = false; break; case Code.SessionExpired: case Code.NoAuth: dead = true; listener.closing(rc); return; default: // Retry errors zk.exists(znode, true, this, null); return; }
Exists过程大致就是上面描述的,主要注意点:
1.客户端Request发送完之后会进入Pending队列,等待响应之后拿出来继续处理
2.同步接口是使用Packet.wait()实现的
3.server端exists操作不是事务型的操作,不会写入log
4.server端的watcher就是一个客户端连接ServerCnxn,代表一个客户端,notify的时候直接往连接里写数据即可