1. rpc请求中怎么分发请求方法
方式一: 直接使用反射, 通过方法名和参数类型查找到对应的方法, 再反射调用
实际使用中的示例, hadoop的实现, 具体可参见 http://jimmee.iteye.com/blog/1206598 例如:
org.apache.hadoop.ipc.RPC
// Reflection-based dispatch (excerpt): the target method is looked up on the
// protocol class using the method name and parameter classes carried in the
// Invocation, then invoked on `instance` with the request's parameters.
public Writable call(Class<?> protocol, Writable param, long receivedTime)
    throws IOException {
  try {
    Invocation call = (Invocation)param;
    if (verbose) log("Call: " + call);

    // Resolve the method from the name and parameter classes sent by the caller.
    Method method = protocol.getMethod(call.getMethodName(), call.getParameterClasses());
    method.setAccessible(true);

    long startTime = System.currentTimeMillis();
    // The actual reflective call.
    Object value = method.invoke(instance, call.getParameters());
    .......
}
方式二: 使用一个标记值来区分, 例如, 如果readInt()=1, 则表示method1, 若readInt()=2, 则表示method2
现实中的示例, 同样是hadoop, 例如:
org.apache.hadoop.hdfs.server.datanode.DataXceiver
* Read/write data from/to the DataXceiveServer. */ public void run() { DataInputStream in=null; try { in = new DataInputStream( new BufferedInputStream(NetUtils.getInputStream(s), SMALL_BUFFER_SIZE)); short version = in.readShort(); if ( version != DataTransferProtocol.DATA_TRANSFER_VERSION ) { throw new IOException( "Version Mismatch" ); } boolean local = s.getInetAddress().equals(s.getLocalAddress()); byte op = in.readByte(); // Make sure the xciver count is not exceeded int curXceiverCount = datanode.getXceiverCount(); if (curXceiverCount > dataXceiverServer.maxXceiverCount) { throw new IOException("xceiverCount " + curXceiverCount + " exceeds the limit of concurrent xcievers " + dataXceiverServer.maxXceiverCount); } long startTime = DataNode.now(); switch ( op ) { case DataTransferProtocol.OP_READ_BLOCK: readBlock( in ); datanode.myMetrics.readBlockOp.inc(DataNode.now() - startTime); if (local) datanode.myMetrics.readsFromLocalClient.inc(); else datanode.myMetrics.readsFromRemoteClient.inc(); break; case DataTransferProtocol.OP_WRITE_BLOCK: ..... }
方式三: thrift的实现方式: 每个接口方法对应一个处理类(ProcessFunction), 并按方法名注册到 processMap 中; 该方法的所有参数封装成一个参数类(例如 query_args).
public static class Processor<I extends Iface> extends org.apache.thrift.TBaseProcessor implements org.apache.thrift.TProcessor { private static final Logger LOGGER = LoggerFactory.getLogger(Processor.class.getName()); public Processor(I iface) { super(iface, getProcessMap(new HashMap<String, org.apache.thrift.ProcessFunction<I, ? extends org.apache.thrift.TBase>>())); } protected Processor(I iface, Map<String, org.apache.thrift.ProcessFunction<I, ? extends org.apache.thrift.TBase>> processMap) { super(iface, getProcessMap(processMap)); } private static <I extends Iface> Map<String, org.apache.thrift.ProcessFunction<I, ? extends org.apache.thrift.TBase>> getProcessMap(Map<String, org.apache.thrift.ProcessFunction<I, ? extends org.apache.thrift.TBase>> processMap) { // 一个接口对应一个类 processMap.put("query", new query()); ..... } // 接口的参数 public static class query_args implements org.apache.thrift.TBase<query_args, query_args._Fields>, java.io.Serializable, Cloneable { private static final org.apache.thrift.protocol.TStruct STRUCT_DESC = new org.apache.thrift.protocol.TStruct("query_args");
执行过程:
(1) 线程池处理连接(题外话: 这里的实现会先接受连接, 再交给线程池处理, 连接实际上是在线程池中排队; 如果都是长连接, 则后面的连接会等很久很久)
// Accept loop (excerpt): every accepted client transport is wrapped in a
// WorkerProcess and handed to the executor service.
setServing(true);
while (!stopped_) {
  // Declared inside the loop, so the counter starts at 0 on every iteration.
  int failureCount = 0;
  try {
    TTransport client = serverTransport_.accept();
    WorkerProcess wp = new WorkerProcess(client);
    executorService_.execute(wp);
  } catch (TTransportException ttx) {
    // Accept failures are counted and logged only while the server is not stopped.
    if (!stopped_) {
      ++failureCount;
      LOGGER.warn("Transport error occurred during acceptance of message.", ttx);
    }
  }
}
(2) 读取消息:
public boolean process(TProtocol in, TProtocol out) throws TException { TMessage msg = in.readMessageBegin(); // 方法名称 ProcessFunction fn = processMap.get(msg.name); if (fn == null) { TProtocolUtil.skip(in, TType.STRUCT); in.readMessageEnd(); TApplicationException x = new TApplicationException(TApplicationException.UNKNOWN_METHOD, "Invalid method name: '"+msg.name+"'"); out.writeMessageBegin(new TMessage(msg.name, TMessageType.EXCEPTION, msg.seqid)); x.write(out); out.writeMessageEnd(); out.getTransport().flush(); return true; } // 方法处理 fn.process(msg.seqid, in, out, iface); return true; } public final void process(int seqid, TProtocol iprot, TProtocol oprot, I iface) throws TException { T args = getEmptyArgsInstance(); try { // 读取参数值 args.read(iprot); } catch (TProtocolException e) { iprot.readMessageEnd(); TApplicationException x = new TApplicationException(TApplicationException.PROTOCOL_ERROR, e.getMessage()); oprot.writeMessageBegin(new TMessage(getMethodName(), TMessageType.EXCEPTION, seqid)); x.write(oprot); oprot.writeMessageEnd(); oprot.getTransport().flush(); return; } iprot.readMessageEnd(); // 实际方法执行 TBase result = getResult(iface, args); oprot.writeMessageBegin(new TMessage(getMethodName(), TMessageType.REPLY, seqid)); result.write(oprot); oprot.writeMessageEnd(); oprot.getTransport().flush(); } protected query_result getResult(I iface, query_args args) throws org.apache.thrift.TException { query_result result = new query_result(); try { result.success = iface.query(args.domainName, args.query, args.mode); } catch (OperationException excp) { result.excp = excp; } return result; }