- 概述
- Hadoop RPC
- Hadoop Client
- Hadoop Server
- Hadoop RPC的使用
- Yarn RPC
- 参考
概述
RPC(Remote Procedure Call)远程过程调用,它是一种通过网络从远程计算机程序上请求服务,而不需要了解底层网络技术的协议。RPC协议假定某些传输协议的存在,如TCP或UDP,为通信程序之间携带信息数据。在OSI网络通信模型中,RPC跨越了传输层和应用层。RPC使得开发包括网络分布式多程序在内的应用程序更加容易。[引用自百度百科].
RPC内部的结构一般如下图所示:
- RPC Client: RPC协议的调用方。
- RPC Server: 远程方法的提供方。
- RPC Proxy/Stub: 存在于客户端,因为RPC协议的”透明性”,需要存在一个Stub层封装RPC远程调用的过程实现,让客户端觉得是在本地调用方法一样。
- RPC Processor/Selector: 存在于服务端,由于服务器端某一个RPC接口的实现的特性(它并不知道自己是一个将要被RPC提供给第三方系统调用的服务),所以在RPC框架中应该有一种“负责执行RPC接口实现”的角色。它负责了包括:管理RPC接口的注册、判断客户端的请求权限、控制接口实现类的执行在内的各种工作。
- MessageProtocol: 由于一次交互都有服务端和客户端两端都能识别的,共同约定的格式。消息管理层负责对消息的编码和解码。同时要保证消息序列化的高效性。
- Transfer/Network: 负责管理RPC框架所使用的网络协议、网络IO模型。
- IDL: 接口定义语言,为跨语言的特性设计的通用的消息格式。
Hadoop RPC
Haddoop中的RPC有两种,一种是hadoop-common下的ipc.RPC类,还有一种是hadoop-yarn-common下的ipc.YarnRPC类。
RPC类中是对底层客户机-服务器网络模型的封装,以便为程序员提供一套简洁的接口,是Hadoop的底层核心组件。在Hadoop HDFS,MapReduce和HBase中有着广泛的使用。YarnRPC类是Yarn中使用的RPC类,其封装了hadoop-common下的RPC,并默认使用了protobuf作为序列化工具,在Yarn的协议中使用。
下面分析hadoop-common下的RPC.java类。首先展示这个类的Outline:
从outline中看到
- RpcKind: 内部枚举,展示了RPC框架将使用哪种Rpc引擎,其中包含了WritableRpcEngine和ProtobufRpcEngine,分别对应了不同序列化方式的RPC实现。
- RpcInvoker: 内部接口,官方注释为: Process a client call on the server side,即表示这是一个在服务端处理客户端请求的接口。
- getSuperInterfaces(Class
/**
* Get a protocol proxy that contains a proxy connection to a remote server
* and a set of methods that are supported by the server
*
* @param protocol protocol
* @param clientVersion client's version
* @param addr server address
* @param ticket security ticket
* @param conf configuration
* @param factory socket factory
* @param rpcTimeout max time for each rpc; 0 means no timeout
* @param connectionRetryPolicy retry policy
* @param fallbackToSimpleAuth set to true or false during calls to indicate if
* a secure client falls back to simple auth
* @return the proxy
* @throws IOException if any error occurs
*/
public static <T> ProtocolProxy<T> getProtocolProxy(Class<T> protocol,
long clientVersion,
InetSocketAddress addr,
UserGroupInformation ticket,
Configuration conf,
SocketFactory factory,
int rpcTimeout,
RetryPolicy connectionRetryPolicy,
AtomicBoolean fallbackToSimpleAuth)
throws IOException {
if (UserGroupInformation.isSecurityEnabled()) {//安全
SaslRpcServer.init(conf);
}
return getProtocolEngine(protocol, conf).getProxy(protocol, clientVersion,
addr, ticket, conf, factory, rpcTimeout, connectionRetryPolicy,
fallbackToSimpleAuth);
}
可以看刀这个方法是获取一个远程服务的代理,中间包含了连接和server的方法的代理。其中通过getProtocolEngine获取一个特定序列化协议的RpcEngine。
// return the RpcEngine configured to handle a protocol
static synchronized RpcEngine getProtocolEngine(Class<?> protocol,
Configuration conf) {
//从RpcEngine缓存中获取,如果不存在的话则读取配置文件通过反射机制创建一个rpcEngine
//默认是WritableRpcEngine
RpcEngine engine = PROTOCOL_ENGINES.get(protocol);
if (engine == null) {
Class<?> impl = conf.getClass(ENGINE_PROP+"."+protocol.getName(),
WritableRpcEngine.class);
engine = (RpcEngine)ReflectionUtils.newInstance(impl, conf);
PROTOCOL_ENGINES.put(protocol, engine);
}
return engine;
}
由于现在hadoop基本上都是用protobuf来序列化,下面从ProtobufRpcEngine中来分析getProxy这个方法。
@Override
@SuppressWarnings("unchecked")
public <T> ProtocolProxy<T> getProxy(Class<T> protocol, long clientVersion,
InetSocketAddress addr, UserGroupInformation ticket, Configuration conf,
SocketFactory factory, int rpcTimeout, RetryPolicy connectionRetryPolicy,
AtomicBoolean fallbackToSimpleAuth) throws IOException {
final Invoker invoker = new Invoker(protocol, addr, ticket, conf, factory,
rpcTimeout, connectionRetryPolicy, fallbackToSimpleAuth);
return new ProtocolProxy<T>(protocol, (T) Proxy.newProxyInstance(
protocol.getClassLoader(), new Class[]{protocol}, invoker), false);
}
//InvocationHandler
private static class Invoker implements RpcInvocationHandler {
//中间代码省略...
/**
* This constructor takes a connectionId, instead of creating a new one.
*/
private Invoker(Class<?> protocol, Client.ConnectionId connId,
Configuration conf, SocketFactory factory) {
this.remoteId = connId;
this.client = CLIENTS.getClient(conf, factory, RpcResponseWrapper.class);
this.protocolName = RPC.getProtocolName(protocol);
this.clientProtocolVersion = RPC.getProtocolVersion(protocol);
}
//中间代码省略...
//invoke()
@Override
public Object invoke(Object proxy, Method method, Object[] args)
throws ServiceException {
//中间代码省略...
RequestHeaderProto rpcRequestHeader = constructRpcRequestHeader(method);
//中间代码省略...
try {
val = (RpcResponseWrapper) client.call(RPC.RpcKind.RPC_PROTOCOL_BUFFER,
new RpcRequestWrapper(rpcRequestHeader, theRequest), remoteId,
fallbackToSimpleAuth);
} catch (Throwable e) {...}
}
}
总体上来说,getProxy中使用的是java中的动态代理。首先创建一个叫invoker的invocationHandler,里面包含了本次连接的id,客户端client等,以及重写了invoke()方法,在invoke()中将调用的method方法封装在rpcRequestHeader中,同时通过client.call()发送到服务端。创建好了invocationHandler后,再通过Proxy.newProxyInstance()创建代理类实例,根据生成的代理类实例,即可调用对应的方法。
- Builder
这个是RPC Server的一个构造者对象,可以通过RPC.Builder.build()方法构建一个服务器对象。类似代码如下:
Server server = new RPC.Builder(config).setProtocol(protocol).
setInstance(instance).setBindAddress(address).setPort(port).
setNumHandlers(default).setnumReaders(defaultReaders).
setQueueSizePerHandler(1).setVerbose(true).build() ;
server.start();
- Server
这个Server是RPC类对Hadoop Server的一个封装,通过Builder中的build()方法调用生成org.apache.hadoop.ipc.Server对象。
Hadoop Client
Client类的主要功能就是sendRequest和receiveResponse。首先来看看这个类的outline:
从outline中可以看到Client中主要有这么几个内部类:
- ClientExecutorServiceFactory 这个类主要是客户端为了发送rpc请求创建线程池的单例类,当创建客户端时,会创建这样一个线程池单例.
private final static ClientExecutorServiceFactory clientExcecutorFactory =
new ClientExecutorServiceFactory();
private static class ClientExecutorServiceFactory {
private int executorRefCount = 0;
private ExecutorService clientExecutor = null;
/**
* Get Executor on which IPC calls' parameters are sent.
* If the internal reference counter is zero, this method
* creates the instance of Executor. If not, this method
* just returns the reference of clientExecutor.
*
* @return An ExecutorService instance
*/
synchronized ExecutorService refAndGetInstance() {
if (executorRefCount == 0) {
clientExecutor = Executors.newCachedThreadPool(
new ThreadFactoryBuilder()
.setDaemon(true)
.setNameFormat("IPC Parameter Sending Thread #%d")
.build());
}
executorRefCount++;
return clientExecutor;
}
...省略部分代码...
}
- Call 这个类封装了一个RPC请求,其中包含了唯一的id,重复次数retry,发送请求rpcRequest,收到的结果rpcResponse,以及发送的状态error,done等。由于hadoop发送请求是异步的,所以需要id来确定不同的调用。
Connection 这个类封装了Client和Server之间连接的基本信息以及一些基本操作,如sendRpcRequest,receiveRpcRequest等。
- Connection类中维护了一个类型为Hashtable
public Writable call(RPC.RpcKind rpcKind, Writable rpcRequest,
ConnectionId remoteId, int serviceClass,
AtomicBoolean fallbackToSimpleAuth) throws IOException {
final Call call = createCall(rpcKind, rpcRequest);
Connection connection = getConnection(remoteId, call, serviceClass,
fallbackToSimpleAuth);
try {
connection.sendRpcRequest(call); // send the rpc request
} catch (RejectedExecutionException e) {
throw new IOException("connection has been closed", e);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
LOG.warn("interrupted waiting to send rpc request to server", e);
throw new IOException(e);
}
boolean interrupted = false;
synchronized (call) {
while (!call.done) {
try {
call.wait(); // wait for the result
} catch (InterruptedException ie) {
// save the fact that we were interrupted
interrupted = true;
}
}
if (interrupted) {
// set the interrupt flag now that we are done waiting
Thread.currentThread().interrupt();
}
if (call.error != null) {
if (call.error instanceof RemoteException) {
call.error.fillInStackTrace();
throw call.error;
} else { // local exception
InetSocketAddress address = connection.getRemoteAddress();
throw NetUtils.wrapException(address.getHostName(),
address.getPort(),
NetUtils.getHostname(),
0,
call.error);
}
} else {
return call.getRpcResponse();
}
}
}
Hadoop Server
Server即服务端。Hadoop Server为了保证高性能采用了很多提高并发处理能力的技术,其中有线程池、事件驱动以及使用了Reactor模式。
这里不对Reactor进行介绍了。只是贴一张Reactor模式的图:
ipc.Server的整体架构和上面的一致。由于篇幅关系,不再贴出Server的outline图。直接分析其中的实现吧。
Call 和客户端类似,将rpcRequest和rpcResponse以及连接信息封装起来,由Reader读取来自客户端的连接请求解析后组装而成,应该该是由于Reactor模式中将一次连接的操作分割为连接,读取,处理和写入等单元操作后,为了控制是同一个连接的操作而建立的类。
Listener 相当于Acceptor角色,整个Server只有一个Listener线程,负责用于监听来自客户端的请求。看看源码:
private class Listener extends Thread {
private ServerSocketChannel acceptChannel = null; //the accept channel
private Selector selector = null; //the selector that we use for the server
private Reader[] readers = null; //Reader
...省略中间代码...
public Listener() throws IOException {
address = new InetSocketAddress(bindAddress, port);
// Create a new server socket and set to non blocking mode
acceptChannel = ServerSocketChannel.open();
acceptChannel.configureBlocking(false);
// Bind the server socket to the local host and port
bind(acceptChannel.socket(), address, backlogLength, conf, portRangeConfig);
port = acceptChannel.socket().getLocalPort(); //Could be an ephemeral port
// create a selector;
selector= Selector.open();
readers = new Reader[readThreads];
for (int i = 0; i < readThreads; i++) {
Reader reader = new Reader(
"Socket Reader #" + (i + 1) + " for port " + port);
readers[i] = reader;
reader.start();
}
// Register accepts on the server socket with the selector.
acceptChannel.register(selector, SelectionKey.OP_ACCEPT);
this.setName("IPC Server listener on " + port);
this.setDaemon(true);
}
...
}
首先Listener类初始化时,会建立起socket连接,绑定相关的地址后,创建内部的Reader数组,同时打开Selector,在通道上建立对SelectionKey.OP_ACCEPT的监听。当Server创建Listener完毕并调用start方法时,Listener线程的run方法开始执行,其中主要是一个doAccept()方法
void doAccept(SelectionKey key) throws InterruptedException, IOException, OutOfMemoryError {
ServerSocketChannel server = (ServerSocketChannel) key.channel();
SocketChannel channel;
while ((channel = server.accept()) != null) {
channel.configureBlocking(false);
channel.socket().setTcpNoDelay(tcpNoDelay);
channel.socket().setKeepAlive(true);
Reader reader = getReader();
Connection c = connectionManager.register(channel);
key.attach(c); // so closeCurrentConnection can get the object
reader.addConnection(c);
}
}
这个方法主要是采用Round Robin轮询调度的方式获取一个Reader,并建立起一个Connection对象。Connection是指一个连接对象,Server将rpc连接的信息和操作封装成Connection。 Connection中的操作有处理读取的请求数据readAndProcess(),以及封装返回的应答数据等。通过ConnectionManager来管理这些Connection。同时Connection建立时会生成一个responsequeue对象,用于处理完请求后对应答的缓冲。
- Reader Reader是Listener中的一个内部类,当穿件Listener时,会创建一个Reader的数组,这些Reader分别负责接收来自客户端连接的Rpc请求。Reader线程中主要是执行doRunLoop()方法,首先会建立一个pendingConnections的Connection队列作为缓冲,防止当单个connection占用过多时间时对readingSelector产生的饥饿现象。然后建立对SelectionKey.OP_READ事件的监听,同时调用doRead()->readAndProcess()->processOneRpc()->processRpcRequest()方法,处理完connectionhead和connectionContext后,将客户端传来的rpcRequest信息封装成一个Call对象,然后将Call对象放置到callqueue中。callqueue作为Reader和Handler之间的缓存队列,防止当Reader产生过多Call时Handler线程处理不过来的情形。
private void processRpcRequest(RpcRequestHeaderProto header,
DataInputStream dis) throws WrappedRpcServerException,
InterruptedException {
...省略...
Writable rpcRequest;
try { //Read the rpc request 读取Rpc请求
rpcRequest = ReflectionUtils.newInstance(rpcRequestClass, conf);
rpcRequest.readFields(dis);
} catch (Throwable t) { // includes runtime exception from newInstance
LOG.warn("Unable to read call parameters for client " +
getHostAddress() + "on connection protocol " +
this.protocolName + " for rpcKind " + header.getRpcKind(), t);
String err = "IPC server unable to read call parameters: "+ t.getMessage();
throw new WrappedRpcServerException(
RpcErrorCodeProto.FATAL_DESERIALIZING_REQUEST, err);
}
...省略...
//封装为一个Call对象
Call call = new Call(header.getCallId(), header.getRetryCount(),
rpcRequest, this, ProtoUtil.convert(header.getRpcKind()),
header.getClientId().toByteArray(), traceSpan);
callQueue.put(call); // queue the call; maybe blocked here
incRpcCount(); // Increment the rpc count
}
- Handler 这个是处理请求的线程类,Server可以同时存在多个Handler线程,它们并行的从共享队列callqueue中读取Call对象,然后执行对应的调用函数之后,即将应答结果通过reponse.doRespond()返回给客户端。 下面是Handler线程运行时的部分代码:
@Override
public void run() {
...
while (running) {
TraceScope traceScope = null;
try {
final Call call = callQueue.take(); // 从callqueue中取用于处理的Call对象
...
try {//执行对应的调用函数,涉及到了用户权限
if (call.connection.user == null) {
value = call(call.rpcKind, call.connection.protocolName, call.rpcRequest,
call.timestamp);
} else {
value =
call.connection.user.doAs
(new PrivilegedExceptionAction<Writable>() {
@Override
public Writable run() throws Exception {
// make the call
return call(call.rpcKind, call.connection.protocolName,
call.rpcRequest, call.timestamp);
}
}
);
}
} catch (Throwable e) {
...
}
CurCall.set(null);
synchronized (call.connection.responseQueue) {
//封装好应答信息
setupResponse(buf, call, returnStatus, detailedErr,
value, errorClass, error);
...
//向responsequeue中增加数据,同时查看responsequeue长度是否为1,如果为1的话则直接向客户端发送应答。
responder.doRespond(call);
}
} catch (InterruptedException e) {
...
}
LOG.debug(Thread.currentThread().getName() + ": exiting");
}
- Responder 负责将应答返回给客户端。Responder创建时会打开一个writeSeletor用于监听channel中的SelectionKey.OP_WRITE事件。当Responder线程运行时,会相应的执行doRunLoop()->doAsyncWrite()->processResponse()方法来执行写的操作。
private void doAsyncWrite(SelectionKey key) throws IOException {
Call call = (Call)key.attachment();
if (call == null) {
return;
}
if (key.channel() != call.connection.channel) {
throw new IOException("doAsyncWrite: bad channel");
}
synchronized(call.connection.responseQueue) {
if (processResponse(call.connection.responseQueue, false)) {
try {
key.interestOps(0);
} catch (CancelledKeyException e) {
LOG.warn("Exception while changing ops : " + e);
}
}
}
}
private boolean processResponse(LinkedList<Call> responseQueue,
boolean inHandler) throws IOException {
boolean error = true;
boolean done = false; // there is more data for this channel.
int numElements = 0;
Call call = null;
try {
synchronized (responseQueue) {
//如果responsequeue已经处理完
numElements = responseQueue.size();
if (numElements == 0) {
error = false;
return true; // no more data for this channel.
}
call = responseQueue.removeFirst();
SocketChannel channel = call.connection.channel;
//尽量向通道写入数据
int numBytes = channelWrite(channel, call.rpcResponse);
if (numBytes < 0) {
return true;
}
if (!call.rpcResponse.hasRemaining()) {
call.rpcResponse = null;
call.connection.decRpcCount();
if (numElements == 1) { // last call fully processes.
done = true; // no more data for this channel.
} else {
done = false; // more calls pending to be sent.
}
if (LOG.isDebugEnabled()) {
LOG.debug(Thread.currentThread().getName() + ": responding to " + call
+ " Wrote " + numBytes + " bytes.");
}
} else {
//如果由于特殊原因(数据量过大或者网络波动),那么重新将Call放入responsequeue中,由Responder处理
// If we were unable to write the entire response out, then
// insert in Selector queue.
//
call.connection.responseQueue.addFirst(call);
if (inHandler) {
// set the serve time when the response has to be sent later
call.timestamp = Time.now();
incPending();
try {
writeSelector.wakeup();
channel.register(writeSelector, SelectionKey.OP_WRITE, call);
} catch (ClosedChannelException e) {
done = true;
} finally {
decPending();
}
}
}
error = false; // everything went off well
}
} finally {
if (error && call != null) {
LOG.warn(Thread.currentThread().getName()+", call " + call + ": output error");
done = true; // error. no more data for this channel.
closeConnection(call.connection);
}
}
return done;
}
从源码可以看到,当Handler没能将结果一次性返回给客户端时,会想writeSelector注册SelectionKey.OP_WRITE事件,进而有Responder采用异步方式处理发送这个结果。这样的好处是在处理一些大的请求任务时也兼容处理一些小的任务。下面展示了Server各个组件的处理流程:
Hadoop RPC的使用
这里介绍一下Hadoop RPC的使用。
- 首先定义一个RPC协议,这个自定义的协议必须继承VersionedProtocol。
/**
* 自定义的protocol协议
*/
public interface MyProtocol extends VersionedProtocol{
public static final long versionID = 1L ;
public String echo() throws IOException;
}
- 实现自定义的协议 。
public class MyProtocolImpl implements MyProtocol {
@Override
public long getProtocolVersion(String protocol, long clientVersion) throws IOException {
return MyProtocol.versionID;
}
@Override
public ProtocolSignature getProtocolSignature(String protocol, long clientVersion,
int clientMethodsHash)
throws IOException {
return new ProtocolSignature(MyProtocol.versionID, null);
}
@Override
public String echo() throws IOException {
Calendar cal = Calendar.getInstance() ;
Date date = cal.getTime() ;
SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd-hh-mm-ss") ;
return sdf.format(date) ;
}
}
- 使用RPC中的Builder构建一个Server 。
public class Server {
public Server() throws HadoopIllegalArgumentException, IOException {
Configuration conf = new Configuration() ;
org.apache.hadoop.ipc.RPC.Server server = new RPC.Builder(conf).
setProtocol(MyProtocol.class).setInstance(new MyProtocolImpl()).
setBindAddress("localhost").setPort(9000).setNumHandlers(5).build() ;
server.start();
}
public static void main(String[] args) throws HadoopIllegalArgumentException, IOException {
new Server() ;
}
}
- 构建Client 。
public class Client {
public Client() throws IOException {
InetSocketAddress addr = new InetSocketAddress("localhost", 9000) ;
MyProtocol proxy = RPC.getProxy(MyProtocol.class, MyProtocol.versionID, addr,
new Configuration()) ;
proxy.echo() ;
}
public static void main(String args...){
new Client() ;
}
}
在不同的进程中分别启动Server和Client,即可看到输出.
Yarn RPC
Yarn RPC是Hadoop Yarn将原有的序列化部分分隔开,将具体的RPC实现交给RpcEngine接口。如WritableRpcEngine和ProtobufRpcEngine分别采用的是hadoop自带的序列化框架和protobuf序列化框架实现的RPC。
Yarn提供一个对外的抽象类YarnRPC,具体由YarnRPC中的create(conf)方法实现,由参数yarn.ipc.rpc.class决定,默认值是HadoopYarnProtoRPC。
public static final String IPC_RPC_IMPL =
IPC_PREFIX + "rpc.class";
public static final String DEFAULT_IPC_RPC_IMPL =
"org.apache.hadoop.yarn.ipc.HadoopYarnProtoRPC";
...省略...
String clazzName = conf.get(YarnConfiguration.IPC_RPC_IMPL);
if (clazzName == null) {
clazzName = YarnConfiguration.DEFAULT_IPC_RPC_IMPL;
}
try {
return (YarnRPC) Class.forName(clazzName).newInstance();
} catch (Exception e) {
throw new YarnRuntimeException(e);
}
HadoopYarnProtoRPC提供了getProxy()和getServer()方法来生成客户端和服务端。其中都是通过RPC工厂提供器RpcFactoryProvider来生成RpcClientFactory和RpcServerFactory。当然默认的客户端和服务端都是采用protobuf来序列化的,如RpcClientFactoryPBImpl和RpcServerFactoryPBImpl。
public Object getProxy(Class protocol, InetSocketAddress addr,
Configuration conf) {
return RpcFactoryProvider.getClientFactory(conf).getClient(protocol, 1,
addr, conf);
}
public Server getServer(Class protocol, Object instance,
InetSocketAddress addr, Configuration conf,
SecretManager<? extends TokenIdentifier> secretManager,
int numHandlers, String portRangeConfig) {
return RpcFactoryProvider.getServerFactory(conf).getServer(protocol,
instance, addr, conf, secretManager, numHandlers, portRangeConfig);
}
其中RpcClientFactoryPBImpl这个客户端的工厂类会扫描包中路径impl.pb.client.*PBClientImpl的类,然后通过java的反射来生成类的实例。如client和ResourceManager之间通信的客户端协议ApplicationClientProtocolPBClientImpl类。其中生成客户端的代码为:
public ApplicationClientProtocolPBClientImpl(long clientVersion,
InetSocketAddress addr, Configuration conf) throws IOException {
RPC.setProtocolEngine(conf, ApplicationClientProtocolPB.class,
ProtobufRpcEngine.class);
proxy = RPC.getProxy(ApplicationClientProtocolPB.class, clientVersion, addr, conf);
}
还是采用了RPC.getProxy方法创建客户端。
同理,RpcServerFactoryPBImpl这个服务端的工厂类会扫描包路径下的impl.pb.service.*PBServiceImpl类,通过反射生成类的实例。如ResourceTrackerPBServiceImpl。然后获取该实例对应的协议类,调用createServer方法生成Server。
private Server createServer(Class<?> pbProtocol, InetSocketAddress addr, Configuration conf,
SecretManager<? extends TokenIdentifier> secretManager, int numHandlers,
BlockingService blockingService, String portRangeConfig) throws IOException {
RPC.setProtocolEngine(conf, pbProtocol, ProtobufRpcEngine.class);
RPC.Server server = new RPC.Builder(conf).setProtocol(pbProtocol)
.setInstance(blockingService).setBindAddress(addr.getHostName())
.setPort(addr.getPort()).setNumHandlers(numHandlers).setVerbose(false)
.setSecretManager(secretManager).setPortRangeConfig(portRangeConfig)
.build();
LOG.info("Adding protocol "+pbProtocol.getCanonicalName()+" to the server");
server.addProtocol(RPC.RpcKind.RPC_PROTOCOL_BUFFER, pbProtocol, blockingService);
return server;
}
也是采用了RPC.Builder.build()方法创建server。
参考
《Hadoop技术内幕深入解析YARN架构设计与实现原理》.董西成。
hadoop官网 http://hadoop.apache.org/docs/r2.6.5/