Client的主要包含下面几个内部类:
主要的几个类说明:
- 1. Call,表示一次rpc的调用请求
- 2. Connection,表示一个client与server之间的连接,一个连接一个线程启动
- 3. ConnectionId:连接的标记(包括server地址,协议,其他一些连接的配置项信息)
Connection类:
主要属性说明:
- private InetSocketAddress server; //IPC服务器地址
- private final ConnectionId remoteId;//连接标识
- private Socket socket = null; //TCP连接的Socket对象
- private DataInputStream in;
- private DataOutputStream out;
- private Hashtable<Integer, Call> calls = new Hashtable<Integer, Call>();//当前正在处理的远程调用
- private AtomicLong lastActivity = new AtomicLong();//IPC连接的最后一次通信时间
- private AtomicBoolean shouldCloseConnection = new AtomicBoolean();//连接关闭标识
- private IOException closeException;//导致IPC连接关闭的异常
主要方法介绍:
touch方法,更新最后一次通信时间
addCall方法:将远程调用放入calls集合中,并且唤醒waitForWork方法
setupConnection方法:创建Socket连接,创建过程中失败会进行重连直到达到次数限制
private synchronized void setupConnection() throws IOException { short ioFailures = 0; short timeoutFailures = 0; //死循环,除非创建socke连接成功,或者重连次数达到限制抛出异常 while (true) { try { //创建一个TCP Socket对象 this.socket = socketFactory.createSocket(); this.socket.setTcpNoDelay(tcpNoDelay); this.socket.setKeepAlive(true); /* * Bind the socket to the host specified in the principal name of the * client, to ensure Server matching address of the client connection * to host name in principal passed. */ //通过HOST名称连接 UserGroupInformation ticket = remoteId.getTicket(); if (ticket != null && ticket.hasKerberosCredentials()) { KerberosInfo krbInfo = remoteId.getProtocol().getAnnotation(KerberosInfo.class); if (krbInfo != null && krbInfo.clientPrincipal() != null) { String host = SecurityUtil.getHostFromPrincipal(remoteId.getTicket().getUserName()); // If host name is a valid local address then bind socket to it InetAddress localAddr = NetUtils.getLocalInetAddress(host); if (localAddr != null) { this.socket.bind(new InetSocketAddress(localAddr, 0)); } } } //连接到服务器 NetUtils.connect(this.socket, server, connectionTimeout); if (rpcTimeout > 0) { pingInterval = rpcTimeout; // rpcTimeout overwrites pingInterval } //设置超时时间 this.socket.setSoTimeout(pingInterval); return; } catch (ConnectTimeoutException toe) { /* Check for an address change and update the local reference. * Reset the failure counter if the address was changed */ //检查地址是否变了,如果变了则更新,且重置失败计数器 if (updateAddress()) { timeoutFailures = ioFailures = 0; } //关闭当前连接,并计算失败次数是否超过超时重试次数,如果是抛出异常 handleConnectionTimeout(timeoutFailures++, maxRetriesOnSocketTimeouts, toe); } catch (IOException ie) { //检查地址是否变了,如果变了则更新,且重置失败计数器 if (updateAddress()) { timeoutFailures = ioFailures = 0; } //关闭当前连接,并计算失败次数是否超过失败重试次数,如果是抛出异常 handleConnectionFailure(ioFailures++, ie); } } }
writeConnectionHeader方法:往服务端发送头信息,服务端根据头信息协议版本检查,接口检查,权限检查
/** * Write the connection header - this is sent when connection is established * +----------------------------------+ * | "hrpc" 4 bytes | * +----------------------------------+ * | Version (1 byte) | * +----------------------------------+ * | Service Class (1 byte) | * +----------------------------------+ * | AuthProtocol (1 byte) | * +----------------------------------+ */ private void writeConnectionHeader(OutputStream outStream) throws IOException { DataOutputStream out = new DataOutputStream(new BufferedOutputStream(outStream)); // Write out the header, version and authentication method out.write(RpcConstants.HEADER.array());//IPCd的魔数hrpc out.write(RpcConstants.CURRENT_VERSION);//版本 out.write(serviceClass);//请求的类 out.write(authProtocol.callId);//权限协议 out.flush(); }
waitForWork方法:这是一个阻塞方法
private synchronized boolean waitForWork() { //calls为空&&连接没标识关闭&&client运行中 if (calls.isEmpty() && !shouldCloseConnection.get() && running.get()) { long timeout = maxIdleTime- (Time.now()-lastActivity.get()); if (timeout>0) { try { wait(timeout);//阻塞timeout时间,或者被唤醒(addCall方法中会调用) } catch (InterruptedException e) {} } } //calls不为空&&连接没标识关闭&&client运行中 if (!calls.isEmpty() && !shouldCloseConnection.get() && running.get()) { return true; } else if (shouldCloseConnection.get()) { return false; } else if (calls.isEmpty()) { //连接空闲,就关闭 markClosed(null); return false; } else { // get stopped but there are still pending requests markClosed((IOException)new IOException().initCause( new InterruptedException())); return false; } }
run方法:接受服务端返回,从此方法可以看出IPC连接可以复用,run方法会一直接收返回直到关闭
public void run() { if (LOG.isDebugEnabled()) LOG.debug(getName() + ": starting, having connections " + connections.size()); try { //阻塞,直到连接 while (waitForWork()) {//wait here for work - read or close connection receiveRpcResponse();//接受服务端返回数据 } } catch (Throwable t) { // This truly is unexpected, since we catch IOException in receiveResponse // -- this is only to be really sure that we don't leave a client hanging // forever. LOG.warn("Unexpected error reading responses on connection " + this, t); markClosed(new IOException("Error reading responses", t)); } close(); if (LOG.isDebugEnabled()) LOG.debug(getName() + ": stopped, remaining connections " + connections.size()); }
receiveRpcResponse方法:接收服务器返回数据
private void receiveRpcResponse() { if (shouldCloseConnection.get()) { return; } touch();//更新最后通信时间 try { //数据长度 int totalLen = in.readInt(); //返回头信息 RpcResponseHeaderProto header = RpcResponseHeaderProto.parseDelimitedFrom(in); checkResponse(header);//校验头信息 int headerLen = header.getSerializedSize(); headerLen += CodedOutputStream.computeRawVarint32Size(headerLen); int callId = header.getCallId(); if (LOG.isDebugEnabled()) LOG.debug(getName() + " got value #" + callId); Call call = calls.get(callId); RpcStatusProto status = header.getStatus(); if (status == RpcStatusProto.SUCCESS) { Writable value = ReflectionUtils.newInstance(valueClass, conf); value.readFields(in); // read value calls.remove(callId);//从calls中删除本次Call call.setRpcResponse(value); // verify that length was correct // only for ProtobufEngine where len can be verified easily //校验长度正确性,当返回数据是ProtobufRpcEngine.RpcWrapper类型 if (call.getRpcResponse() instanceof ProtobufRpcEngine.RpcWrapper) { ProtobufRpcEngine.RpcWrapper resWrapper = (ProtobufRpcEngine.RpcWrapper) call.getRpcResponse(); //总长度!=头报文长度+数据报文长度就是有问题 if (totalLen != headerLen + resWrapper.getLength()) { throw new RpcClientException( "RPC response length mismatch on rpc success"); } } } else { // Rpc Request failed // Verify that length was correct //请求失败,总长度应该等于报文头长度 if (totalLen != headerLen) { throw new RpcClientException( "RPC response length mismatch on rpc error"); } //异常类名 final String exceptionClassName = header.hasExceptionClassName() ? header.getExceptionClassName() : "ServerDidNotSetExceptionClassName"; //错误信息 final String errorMsg = header.hasErrorMsg() ? header.getErrorMsg() : "ServerDidNotSetErrorMsg" ; //错误码 final RpcErrorCodeProto erCode = (header.hasErrorDetail() ? header.getErrorDetail() : null); if (erCode == null) { LOG.warn("Detailed error code not set by server on rpc error"); } RemoteException re = ( (erCode == null) ? new RemoteException(exceptionClassName, errorMsg) : new RemoteException(exceptionClassName, errorMsg, erCode)); if (status == RpcStatusProto.ERROR) { calls.remove(callId);//从calls中删除本次Call call.setException(re); } else if (status == RpcStatusProto.FATAL) { // Close the connection //请求失败关闭连接connection markClosed(re); } } } catch (IOException e) { markClosed(e); } }