Hadoop-common之Client(1)

Client的主要包含下面几个内部类:

       主要的几个类说明: 

  • 1. Call,表示一次rpc的调用请求
  • 2. Connection,表示一个client与server之间的连接,一个连接一个线程启动
  • 3. ConnectionId:连接的标记(包括server地址,协议,其他一些连接的配置项信息)

Connection类:

         主要属性说明:

  • private InetSocketAddress server; //IPC服务器地址
  • private final ConnectionId remoteId;//连接标识
  • private Socket socket = null; //TCP连接的Socket对象
  • private DataInputStream in;
  • private DataOutputStream out;
  • private Hashtable<Integer, Call> calls = new Hashtable<Integer, Call>();//当前正在处理的远程调用
  • private AtomicLong lastActivity = new AtomicLong();//IPC连接的最后一次通信时间
  • private AtomicBoolean shouldCloseConnection = new AtomicBoolean();//连接关闭标识
  • private IOException closeException;//导致IPC连接关闭的异常

        主要方法介绍:

            touch方法,更新最后一次通信时间

            addCall方法:将远程调用放入calls集合中,并且唤醒waitForWork方法

               setupConnection方法:创建Socket连接,创建过程中失败会进行重连直到达到次数限制     

private synchronized void setupConnection() throws IOException {
      short ioFailures = 0;
      short timeoutFailures = 0;
      //死循环,除非创建socke连接成功,或者重连次数达到限制抛出异常
      while (true) {
        try {
          //创建一个TCP Socket对象
          this.socket = socketFactory.createSocket();
          this.socket.setTcpNoDelay(tcpNoDelay);
          this.socket.setKeepAlive(true);
          
          /*
           * Bind the socket to the host specified in the principal name of the
           * client, to ensure Server matching address of the client connection
           * to host name in principal passed.
           */
          //通过HOST名称连接
          UserGroupInformation ticket = remoteId.getTicket();
          if (ticket != null && ticket.hasKerberosCredentials()) {
            KerberosInfo krbInfo = 
              remoteId.getProtocol().getAnnotation(KerberosInfo.class);
            if (krbInfo != null && krbInfo.clientPrincipal() != null) {
              String host = 
                SecurityUtil.getHostFromPrincipal(remoteId.getTicket().getUserName());
              
              // If host name is a valid local address then bind socket to it
              InetAddress localAddr = NetUtils.getLocalInetAddress(host);
              if (localAddr != null) {
                this.socket.bind(new InetSocketAddress(localAddr, 0));
              }
            }
          }
          //连接到服务器
          NetUtils.connect(this.socket, server, connectionTimeout);
          if (rpcTimeout > 0) {
            pingInterval = rpcTimeout;  // rpcTimeout overwrites pingInterval
          }
          //设置超时时间
          this.socket.setSoTimeout(pingInterval);
          return;
        } catch (ConnectTimeoutException toe) {
          /* Check for an address change and update the local reference.
           * Reset the failure counter if the address was changed
           */
          //检查地址是否变了,如果变了则更新,且重置失败计数器
          if (updateAddress()) {
            timeoutFailures = ioFailures = 0;
          }
          //关闭当前连接,并计算失败次数是否超过超时重试次数,如果是抛出异常
          handleConnectionTimeout(timeoutFailures++,
              maxRetriesOnSocketTimeouts, toe);
        } catch (IOException ie) {
          //检查地址是否变了,如果变了则更新,且重置失败计数器
          if (updateAddress()) {
            timeoutFailures = ioFailures = 0;
          }
        //关闭当前连接,并计算失败次数是否超过失败重试次数,如果是抛出异常
          handleConnectionFailure(ioFailures++, ie);
        }
      }
    }

            writeConnectionHeader方法:往服务端发送头信息,服务端根据头信息协议版本检查,接口检查,权限检查           

/**
     * Write the connection header - this is sent when connection is established
     * +----------------------------------+
     * |  "hrpc" 4 bytes                  |      
     * +----------------------------------+
     * |  Version (1 byte)                |
     * +----------------------------------+
     * |  Service Class (1 byte)          |
     * +----------------------------------+
     * |  AuthProtocol (1 byte)           |      
     * +----------------------------------+
     */
    private void writeConnectionHeader(OutputStream outStream)
        throws IOException {
      DataOutputStream out = new DataOutputStream(new BufferedOutputStream(outStream));
      // Write out the header, version and authentication method
      out.write(RpcConstants.HEADER.array());//IPCd的魔数hrpc
      out.write(RpcConstants.CURRENT_VERSION);//版本
      out.write(serviceClass);//请求的类
      out.write(authProtocol.callId);//权限协议
      out.flush();
    }

             waitForWork方法:这是一个阻塞方法   

private synchronized boolean waitForWork() {
      //calls为空&&连接没标识关闭&&client运行中
      if (calls.isEmpty() && !shouldCloseConnection.get()  && running.get())  {
        long timeout = maxIdleTime-
              (Time.now()-lastActivity.get());
        if (timeout>0) {
          try {
            wait(timeout);//阻塞timeout时间,或者被唤醒(addCall方法中会调用)
          } catch (InterruptedException e) {}
        }
      }
    //calls不为空&&连接没标识关闭&&client运行中
      if (!calls.isEmpty() && !shouldCloseConnection.get() && running.get()) {
        return true;
      } else if (shouldCloseConnection.get()) {
        return false;
      } else if (calls.isEmpty()) { //连接空闲,就关闭
        markClosed(null);
        return false;
      } else { // get stopped but there are still pending requests 
        markClosed((IOException)new IOException().initCause(
            new InterruptedException()));
        return false;
      }
    }

            run方法:接受服务端返回,从此方法可以看出IPC连接可以复用,run方法会一直接收返回直到关闭  

public void run() {
      if (LOG.isDebugEnabled())
        LOG.debug(getName() + ": starting, having connections " 
            + connections.size());

      try {
    	//阻塞,直到连接
        while (waitForWork()) {//wait here for work - read or close connection
          receiveRpcResponse();//接受服务端返回数据
        }
      } catch (Throwable t) {
        // This truly is unexpected, since we catch IOException in receiveResponse
        // -- this is only to be really sure that we don't leave a client hanging
        // forever.
        LOG.warn("Unexpected error reading responses on connection " + this, t);
        markClosed(new IOException("Error reading responses", t));
      }
      
      close();
      
      if (LOG.isDebugEnabled())
        LOG.debug(getName() + ": stopped, remaining connections "
            + connections.size());
    }

           receiveRpcResponse方法:接收服务器返回数据

  

 private void receiveRpcResponse() {
      if (shouldCloseConnection.get()) {
        return;
      }
      touch();//更新最后通信时间
      
      try {
    	//数据长度
        int totalLen = in.readInt();
        //返回头信息
        RpcResponseHeaderProto header = 
            RpcResponseHeaderProto.parseDelimitedFrom(in);
        checkResponse(header);//校验头信息

        int headerLen = header.getSerializedSize();
        headerLen += CodedOutputStream.computeRawVarint32Size(headerLen);

        int callId = header.getCallId();
        if (LOG.isDebugEnabled())
          LOG.debug(getName() + " got value #" + callId);

        Call call = calls.get(callId);
        RpcStatusProto status = header.getStatus();
        if (status == RpcStatusProto.SUCCESS) {
          Writable value = ReflectionUtils.newInstance(valueClass, conf);
          value.readFields(in);                 // read value
          calls.remove(callId);//从calls中删除本次Call
          call.setRpcResponse(value);
          
          // verify that length was correct
          // only for ProtobufEngine where len can be verified easily
          //校验长度正确性,当返回数据是ProtobufRpcEngine.RpcWrapper类型
          if (call.getRpcResponse() instanceof ProtobufRpcEngine.RpcWrapper) {
            ProtobufRpcEngine.RpcWrapper resWrapper = 
                (ProtobufRpcEngine.RpcWrapper) call.getRpcResponse();
            //总长度!=头报文长度+数据报文长度就是有问题
            if (totalLen != headerLen + resWrapper.getLength()) { 
              throw new RpcClientException(
                  "RPC response length mismatch on rpc success");
            }
          }
        } else { // Rpc Request failed
          // Verify that length was correct
         //请求失败,总长度应该等于报文头长度
          if (totalLen != headerLen) {
            throw new RpcClientException(
                "RPC response length mismatch on rpc error");
          }
          //异常类名
          final String exceptionClassName = header.hasExceptionClassName() ?
                header.getExceptionClassName() : 
                  "ServerDidNotSetExceptionClassName";
          //错误信息
          final String errorMsg = header.hasErrorMsg() ? 
                header.getErrorMsg() : "ServerDidNotSetErrorMsg" ;
          //错误码
          final RpcErrorCodeProto erCode = 
                    (header.hasErrorDetail() ? header.getErrorDetail() : null);
          if (erCode == null) {
             LOG.warn("Detailed error code not set by server on rpc error");
          }
          RemoteException re = 
              ( (erCode == null) ? 
                  new RemoteException(exceptionClassName, errorMsg) :
              new RemoteException(exceptionClassName, errorMsg, erCode));
          if (status == RpcStatusProto.ERROR) {
            calls.remove(callId);//从calls中删除本次Call
            call.setException(re);
          } else if (status == RpcStatusProto.FATAL) {
            // Close the connection
        	//请求失败关闭连接connection
            markClosed(re);
          }
        }
      } catch (IOException e) {
        markClosed(e);
      }
    }

猜你喜欢

转载自h140465.iteye.com/blog/2270104