线程模型
Dubbo默认使用Netty作为其低层的网络通讯框架,提供者服务在启动时会执行NettyServer的doOpen()
方法:
@Override
protected void doOpen() throws Throwable {
//Accept ThreadPool
ExecutorService boss =
Executors.newCachedThreadPool(new NamedThreadFactory("NettyServerBoss", true));
//IO ThreadPool
ExecutorService worker =
Executors.newCachedThreadPool(new NamedThreadFactory("NettyServerWorker", true));
//创建ServerBootstrap实例
ChannelFactory channelFactory = new NioServerSocketChannelFactory(boss, worker,
getUrl().getPositiveParameter(Constants.IO_THREADS_KEY, Constants.DEFAULT_IO_THREADS));
bootstrap = new ServerBootstrap(channelFactory);
//......省略部分代码
}
从代码中我们可以看到Netty使用了两级的线程池,分别是EventLoopGroup(boss)
和EventLoopGroup(worker)
,boss线程池主要负责接受并处理客户端的请求操作,接受请求之后将请求分发给worker线程池来处理,worker线程池根据请求不同的事件回调Handler对应的方法。下图是Netty的线程模型:
boss
和worker
的线程组称为IO线程,Handler绑定在IO线程上,如果事件处理的逻辑能在Handler上迅速完成,并且不会发起新的 IO 请求,比如只是在内存中记个标识,则直接在 IO 线程上处理更快,因为减少了线程池调度(线程上下文切换)。但如果事件处理逻辑较慢,或者需要发起新的 IO 请求,比如需要查询数据库,则必须让Handler将这些任务派发到业务线程池(子线程),否则Handler绑定的 IO 线程将被长时间阻塞,导致不能及时接受其它请求。
派发策略Dispatcher
所以Dubbo提供了多种策略让用户来决定将什么样的消息派发到业务线程池处理:
-
all
默认方式,所有消息都派发到线程池,包括请求,响应,连接事件,断开事件,心跳等。 -
direct
所有消息都不派发到线程池,全部在 IO 线程上直接执行。 -
message
只有请求响应消息派发到线程池,其它连接断开事件,心跳等消息,直接在 IO 线程上执行。 -
execution
只有请求消息派发到线程池,不含响应,响应和其它连接断开事件,心跳等消息,直接在 IO 线程上执行。 -
connection
在 IO 线程上,将连接断开事件放入队列,有序逐个执行,其它消息派发到线程池。
配置方式很简单,通过dispatcher
指定派发策略,通过threadpool
和threads
来指定业务线程池:
<dubbo:protocol name="dubbo" dispatcher="all" threadpool="fixed" threads="100" />
业务线程池配置ThreadPool
- fixed 固定大小线程池,启动时建立线程,不关闭,一直持有。(缺省)
- cached 缓存线程池,空闲一分钟自动删除,需要时重建。
- limited 可伸缩线程池,但池中的线程数只会增长不会收缩。只增长不收缩的目的是为了避免收缩时突然来了大流量引起的性能问题。
- eager 优先创建Worker线程池。在任务数量大于corePoolSize但是小于maximumPoolSize时,优先创建Worker来处理任务。当任务数量大于maximumPoolSize时,将任务放入阻塞队列中。阻塞队列充满时抛出RejectedExecutionException。(相比于cached:cached在任务数量超过maximumPoolSize时直接抛出异常而不是将任务放入阻塞队列)
源码追踪
默认Dubbo会使用all
策略,将所有的消息都派发到业务线程池处理,我们来看下all
策略实现类AllChannelHandler
的源码:
public class AllChannelHandler extends WrappedChannelHandler {
//......省略部分代码
/**接收到连接*/
@Override
public void connected(Channel channel) throws RemotingException {
ExecutorService cexecutor = getExecutorService();
//交给业务线程池处理
cexecutor.execute(new ChannelEventRunnable(channel, handler, ChannelState.CONNECTED));
}
/**断开连接*/
@Override
public void disconnected(Channel channel) throws RemotingException {
ExecutorService cexecutor = getExecutorService();
//交给业务线程池处理
cexecutor.execute(new ChannelEventRunnable(channel, handler, ChannelState.DISCONNECTED));
}
/**接收到数据*/
@Override
public void received(Channel channel, Object message) throws RemotingException {
ExecutorService cexecutor = getExecutorService();
//交给业务线程池处理
cexecutor.execute(new ChannelEventRunnable(channel, handler, ChannelState.RECEIVED, message));
}
/**发生异常*/
@Override
public void caught(Channel channel, Throwable exception) throws RemotingException {
ExecutorService cexecutor = getExecutorService();
//交给业务线程池处理
cexecutor.execute(new ChannelEventRunnable(channel, handler, ChannelState.CAUGHT, exception));
}
}
服务接口多版本
Dubbo提供了可以配置服务接口版本号的功能,使用version
属性配置在<dubbo:service/>
和<dubbo:reference/>
标签上,即可显示指定服务接口版本。一旦配置了版本号之后,消费者服务只能调用具有相同版本的提供者服务。
<dubbo:service interface="xxx.xxx.xx.XXX" version="1.0.0" />
<dubbo:reference interface="xxx.xxx.xx.XXX" version="1.0.0" />
该功能主要是为了解决当一个接口实现方(提供者服务)在代码迭代过程中出现不兼容旧版本的情况时会出现调用失败的问题。我们可以通过使用版本号过渡,逐步将各个服务升级到最新版本。
1、假设现在有四个服务(两个提供者、两个消费者),他们的版本号均是1.0,那么目前消费者服务可以调用到两个提供者服务中的任意一个:
2、提供者服务版本迭代出现接口不兼容后,将版本号修改为2.0,那么此时消费者服务无法调用到该2.0版本的提供者服务:
3、将消费者服务依次全部升级到2.0版本,调用最新版的提供者服务:
4、将剩下的低版本提供者服务都升级到最新版本:
开发阶段直连调试
大型项目在研发阶段通常会有多人同时参与,有时候为了快速联调、提升研发效率,大家都共用一个注册中心。共用一个注册中心意味着同一个服务接口会有多个提供者服务,这时候如果你想本地开发想断点调试,启动消费者服务去调用远程接口,调用的提供者服务就不一定是自己本地机器上的服务,那么如何才能够只调用本地的提供者服务呢?
Dubbo提供了直连提供者服务的功能,直接绕过注册中心,测试指定的提供者服务。这种方式又被称为点对点直连方式,将以服务接口为单位,忽略注册中心的提供者列表。分两步完成:
第一步:处于本地调试阶段的提供者服务接口还不稳定,使用register="false"
标签属性声明不往注册中心注册服务。
<dubbo:registry protocol="zookeeper" address="192.168.0.xxx:2181" register="false"/>
第二步:消费者服务使用url
标签属性指定服务接口直连本地提供者服务。
<dubbo:reference id="xxx" interface="xxx.xxx.Xxx" url="dubbo://localhost:20881">
回声测试
有时候需要根据提供者服务是否可用来做一些特殊的业务处理,dubbo为此提供了回声测试功能,可用于检测提供者服务是否可用。所有服务自动实现 EchoService 接口,只需将任意服务引用强制转型为 EchoService,即可使用。
消费者服务xml配置:
<dubbo:reference id="userService" interface="com.xxx.UserService" />
回声测试代码:
EchoService echoService = (EchoService)context.getBean("userService");
Object status = echoService.$echo("ok");
assert(status.equals("OK"));