1. 前言
前面的文章分析了Dubbo Provider是如何处理RPC调用请求的,整个处理链路是清晰了,但是关于线程模型却一笔带过,Dispatcher也只是简单介绍了一下,本篇文章会全面分析Provider线程模型。
Dubbo线程可以分为两大类,一类是用于处理底层网络通信的IO线程,一类是处理业务逻辑的业务线程,也可称作Dubbo线程。IO线程以Netty为例,又细分为Boss和Worker,Boss专门用于处理IO连接,Worker用于处理IO读写。
本文会从NettyServer的创建开始说起,先看看线程是如何创建并分工的。再聊聊Dispatcher,看看Dubbo是如何将消息派发到具体线程池的。
2. IO线程
之前的文章已经分析过,Provider在暴露服务的时候会创建NettyServer,在它的构造函数里就会自动创建IO线程和业务线程,我们先看IO线程的创建过程。
public NettyServer(URL url, ChannelHandler handler) throws RemotingException {
super(ExecutorUtil.setThreadName(url, SERVER_THREAD_POOL_NAME), ChannelHandlers.wrap(handler, url));
}
复制代码
NettyServer会调用父类构造函数,父类会从Url里解析出host和ip构建InetSocketAddress对象,然后调用子类的doOpen()
创建并启动服务端。
NettyServer以Netty作为网络传输层框架,那自然是创建ServerBootstrap了。熟悉Netty的同学应该知道,ServerBootstrap需要绑定两个EventLoopGroup,分别是BossGroup和WorkerGroup,它俩本质上就是两个线程池,前者专门用于处理IO连接,再将连接分发给后者,由后者处理IO读写。 因为采用了Nio+多路复用技术,处理IO连接一个线程就足够了。而对于IO读写,往往需要更多的线程,线程数可以通过参数iothreads
进行设置,默认取CPU核心数+1,但最多不会超过32。
bootstrap = new ServerBootstrap();
// 1 bossThread
bossGroup = NettyEventLoopFactory.eventLoopGroup(1, "NettyServerBoss");
// workerThreads CPU核心数+1 最大不会超过32
workerGroup = NettyEventLoopFactory.eventLoopGroup(
getUrl().getPositiveParameter(IO_THREADS_KEY, Constants.DEFAULT_IO_THREADS),
"NettyServerWorker");
bootstrap.group(bossGroup, workerGroup);
复制代码
至此,IO线程就创建完毕了,它们的职责有处理客户端的连接、网络IO的数据读写、消息的编解码、心跳包处理、甚至是Body的反序列化等等。一般来说,针对很快就可以处理完的纯内存操作,可以优先考虑在IO线程执行,这样可以避免线程调度带来的额外开销。但是,对于需要查询数据库等耗时的操作,请派发到业务线程池处理,IO线程是非常宝贵的资源,线程数本来就不多,一旦IO线程发生阻塞,线程被很快被占满,导致新的请求不能被处理。
3. 业务线程
一般来说,你写的Service逻辑,就是在业务线程上执行的。业务线程池也是在NettyServer构造函数里被创建的,代码在父类AbstractServer的构造函数里。
public AbstractServer(URL url, ChannelHandler handler) throws RemotingException {
doOpen();
executor = executorRepository.createExecutorIfAbsent(url);
}
复制代码
线程池的创建依赖于ExecutorRepository接口,通过SPI加载,目前只有一个默认实现DefaultExecutorRepository。创建的线程池会缓存到Map容器中,采用双层嵌套Map结构,外层Key代表线程池服务于Consumer还是Provider,因为一个应用可能即使服务提供者也是消费者;内层Key是端口Port,同一个端口暴露的服务共用一个线程池。
private ConcurrentMap<String, ConcurrentMap<Integer, ExecutorService>> data = new ConcurrentHashMap<>();
复制代码
createExecutor()
方法用户创建线程池,它依赖ThreadPool接口,通过SPI的方式自适应创建。
private ExecutorService createExecutor(URL url) {
return (ExecutorService) ExtensionLoader.getExtensionLoader(ThreadPool.class)
.getAdaptiveExtension().getExecutor(url);
}
复制代码
Dubbo目前提供了四种线程池,Provider默认fixed,Consumer默认cached。
线程池类型 | 说明 |
---|---|
cached | 缓存线程池,默认1分钟 |
fixed | 固定大小线程池 |
limited | 线程数只会增长,不会收缩,避免突发流量引起性能问题。 |
eager | 优先创建线程处理任务,而不是将任务放到队列 |
Dubbo提供了几个可配置的参数:
参数 | 说明 |
---|---|
threadname | 线程名称前缀 |
corethreads | 核心线程数 |
threads | 最大线程数 |
queues | 任务队列大小 |
alive | 线程保持活跃的时间,默认1分钟 |
我们以FixedThreadPool为例,默认的线程数是200,它会创建核心线程数和最大线程数都为200的线程池,线程不会被销毁,且任务队列的长度为0,这意味着请求不会被堆积,并发的上限就是200。
public Executor getExecutor(URL url) {
// 线程名称前缀
String name = url.getParameter(THREAD_NAME_KEY, DEFAULT_THREAD_NAME);
// 线程数
int threads = url.getParameter(THREADS_KEY, DEFAULT_THREADS);
// 队列大小,默认0
int queues = url.getParameter(QUEUES_KEY, DEFAULT_QUEUES);
// 线程空闲不会被销毁
return new ThreadPoolExecutor(threads, threads, 0, TimeUnit.MILLISECONDS,
queues == 0 ? new SynchronousQueue<Runnable>() :
(queues < 0 ? new LinkedBlockingQueue<Runnable>()
: new LinkedBlockingQueue<Runnable>(queues)),
new NamedInternalThreadFactory(name, true), new AbortPolicyWithReport(name, url));
}
复制代码
可以通过参数threadname
设置线程名称前缀,默认是DubboServerHandler
,然后再拼上Address加序号。如果你要观察业务线程,只需要留意下面这种格式的线程即可。
DubboServerHandler-192.168.206.142:20880-thread-1
DubboServerHandler-192.168.206.142:20880-thread-2
复制代码
至此,业务线程也创建完毕了,它们的职责有Body反序列化、业务逻辑的处理等。方法的返回值最终会被封装为Response,然后由IO线程发送给Consumer。
4. Dispatcher
IO线程和业务线程都创建好了,那么Dubbo针对请求,什么时候应该让哪类线程处理呢?这就是Dubbo的线程池派发器Dispatcher的职责了。 Dispatcher接口定义很简单,它本身并不具备线程派发的能力,而是通过SPI自适应的方式,加载具有线程派发能力的ChannelHandler,再交给它去处理。
@SPI(AllDispatcher.NAME)
public interface Dispatcher {
@Adaptive({Constants.DISPATCHER_KEY, "dispather", "channel.handler"})
ChannelHandler dispatch(ChannelHandler handler, URL url);
}
复制代码
Dubbo提供了五种线程派发策略,默认策略是all,即所有消息都会被派发到业务线程池执行。
策略 | 说明 |
---|---|
all | 所有消息都派发到线程池,包括请求,响应,连接事件,断开事件等 |
direct | 所有消息都不派发到线程池,全部在 IO 线程上直接执行 |
message | 只有请求和响应消息派发到线程池,其它消息均在 IO 线程上执行 |
execution | 只有请求消息派发到线程池,其它消息均在 IO 线程上执行 |
connection | 在 IO 线程上,将连接断开事件放入队列,有序逐个执行,其它消息派发到线程池 |
这里以AllChannelHandler为例,它对于所有的事件,都会创建ChannelEventRunnable,通过异步任务的方式提交到业务线程池执行,IO线程到此就结束了它的工作。以下是received()
方法代码:
public void received(Channel channel, Object message) throws RemotingException {
// 获取业务线程池
ExecutorService executor = getPreferredExecutorService(message);
try {
// 提交异步任务,处理消息 IO线程到此结束
executor.execute(new ChannelEventRunnable(channel, handler, ChannelState.RECEIVED, message));
} catch (Throwable t) {
throw new ExecutionException(message, channel, getClass() + " error when process received event .", t);
}
}
复制代码
之前的文章有分析过服务端ChannelHandler的包装过程,如下所示:
AbstractPeer#received
>>MultiMessageHandler#received
>>>>HeartbeatHandler#received
>>>>>>AllChannelHandler#received
>>>>>>>>DecodeHandler#received
>>>>>>>>>>HeaderExchangeHandler#received
>>>>>>>>>>>>DubboProtocol.requestHandler#reply
复制代码
通过今天的分析,我们已经知道,在AllChannelHandler之前,消息是在IO线程上处理的,之后是在业务线程上处理的。
5. 总结
Dubbo线程可以分为IO线程和业务线程两大类,IO线程主要负责连接的建立、网络IO读写、消息编解码、处理心跳,业务线程主要是负责Body反序列化、Service业务逻辑的执行。 Provider服务暴露的时候会创建NettyServer,服务端启动时,会创建ServerBootstrap并绑定bossGroup和workerGroup,这里会创建1个Boss线程,N个Worker线程用于处理IO事件。同时还会创建业务线程池ExecutorService,默认会创建200个线程数的Fixed线程池,且任务队列长度为0,这意味着Provider不具备请求堆积的能力,Dubbo希望此时应该立即失败,让Consumer重试其它节点。 线程派发是通过Dispatcher接口,它通过SPI的方式加载具备线程派发能力的ChannelHandler,ChannelHandler会根据派发策略选择是在IO线程上执行,还是提交到业务线程池执行。