注:本文基于dubbo版本v2.6.1
1.介绍
当我们在使用dubbo的时候,是可以通过调整线程池来达到调优的效果,我们可以在<dubbo:protocol>
标签中使用用threadpool属性选择自己想要使用的线程池,通过threads属性配置服务线程数,queues属性配置使用的队列。例如:
<dubbo:protocol name="dubbo" threadpool="fixed" queues="100" threads="100"/>
具体其他配置属性请参考:官方文档
dubbo线程在dubbo-common模块的threadpool包下面,我们可以看一下它的包结构
接下来我们就一起看下具体源码
2.ThreadPool接口
@SPI("fixed")
public interface ThreadPool {
/**
* Thread pool
* @param url URL contains thread parameter
* @return thread pool
*/
@Adaptive({Constants.THREADPOOL_KEY})
Executor getExecutor(URL url);
}
我们可以看到ThreadPool接口是个扩展点,然后默认实现是fixed,然后里面有个getExecutor方法,被@Adaptive注解修饰。在dubbo中ThreadPool有4个实现类,分别是:
- CachedThreadPool 缓存线程池,超过keepAliveTime时间删除,使用的时候再创建
- FixedThreadPool 固定线程数量线程池,一旦建立,一直持有。
- LimitedThreadPool 可伸缩线程池,线程只增长不收缩。
- EagerThreadPool 当core线程数忙的时候,创建新线程,而不是将任务放入阻塞队列。这个使用自己队列TaskQueue。
3.CachedThreadPool
public class CachedThreadPool implements ThreadPool {
@Override
public Executor getExecutor(URL url) {
String name = url.getParameter(Constants.THREAD_NAME_KEY, Constants.DEFAULT_THREAD_NAME);
int cores = url.getParameter(Constants.CORE_THREADS_KEY, Constants.DEFAULT_CORE_THREADS);
int threads = url.getParameter(Constants.THREADS_KEY, Integer.MAX_VALUE);
int queues = url.getParameter(Constants.QUEUES_KEY, Constants.DEFAULT_QUEUES);
int alive = url.getParameter(Constants.ALIVE_KEY, Constants.DEFAULT_ALIVE);
//带有keepalive的
return new ThreadPoolExecutor(cores, threads, alive, TimeUnit.MILLISECONDS,
queues == 0 ? new SynchronousQueue<Runnable>() :
(queues < 0 ? new LinkedBlockingQueue<Runnable>()
: new LinkedBlockingQueue<Runnable>(queues)),
new NamedInternalThreadFactory(name, true), new AbortPolicyWithReport(name, url));
}
}
- core:核心线程数,默认是0;
- maxThread:最大线程数,默认是Integer.MAX_VALUE,可以看作是无限大。
- queues:如果queues=0,使用SynchronousQueue,如果是小于0,就是个new LinkedBlockingQueue 队列,这个队列大小是Integer.MAX_VALUE,这个查看LinkedBlockingQueue源码可以看到。如果是queues大于0 ,就创建queues大小的LinkedBlockingQueue,默认是0 。
- keepalive:最大空闲时间,默认是 60 * 1000ms ,也就是1分钟。
我们可以看到CachedThreadPool使用默认参数的话,就会无限创建线程,然后超过空闲时间,线程就会被销毁,然后再使用的时候,就会再创建。
4.FixedThreadPool
/**
* Creates a thread pool that reuses a fixed number of threads
*
* @see java.util.concurrent.Executors#newFixedThreadPool(int)
*/
public class FixedThreadPool implements ThreadPool {
@Override
public Executor getExecutor(URL url) {
String name = url.getParameter(Constants.THREAD_NAME_KEY, Constants.DEFAULT_THREAD_NAME);
// 默认是200个线程
int threads = url.getParameter(Constants.THREADS_KEY, Constants.DEFAULT_THREADS);
int queues = url.getParameter(Constants.QUEUES_KEY, Constants.DEFAULT_QUEUES);
return new ThreadPoolExecutor(threads, threads, 0, TimeUnit.MILLISECONDS,
queues == 0 ? new SynchronousQueue<Runnable>() :
(queues < 0 ? new LinkedBlockingQueue<Runnable>()
: new LinkedBlockingQueue<Runnable>(queues)),
new NamedInternalThreadFactory(name, true), new AbortPolicyWithReport(name, url));
}
}
- core:核心线程数,默认是200
- maxThreads: 最大线程数。默认是200
- queues:当queues<0,使用一个无限大的LinkedBlockingQueue队列,当queues>=0 的时候创建queues大小的LinkedBlockingQueue。默认是0,也是创建0大小的LinkedBlockingQueue队列。
- keepalive:直接就是0,表示不销毁。
我们可以看出FixedThreadPool 创建固定大小的线程池,默认是200,期间不会销毁,使用了FixedThreadPool线程池,keepalive配置也没有作用。
5.LimitedThreadPool
/**
* Creates a thread pool that creates new threads as needed until limits reaches. This thread pool will not shrink
* automatically.
*
*
* 可以控制基本线程数 与 最大线程数的
*/
public class LimitedThreadPool implements ThreadPool {
@Override
public Executor getExecutor(URL url) {
String name = url.getParameter(Constants.THREAD_NAME_KEY, Constants.DEFAULT_THREAD_NAME);
int cores = url.getParameter(Constants.CORE_THREADS_KEY, Constants.DEFAULT_CORE_THREADS);
int threads = url.getParameter(Constants.THREADS_KEY, Constants.DEFAULT_THREADS);
int queues = url.getParameter(Constants.QUEUES_KEY, Constants.DEFAULT_QUEUES);
return new ThreadPoolExecutor(cores, threads, Long.MAX_VALUE, TimeUnit.MILLISECONDS,
queues == 0 ? new SynchronousQueue<Runnable>() :
(queues < 0 ? new LinkedBlockingQueue<Runnable>()
: new LinkedBlockingQueue<Runnable>(queues)),
new NamedInternalThreadFactory(name, true), new AbortPolicyWithReport(name, url));
}
}
- core:核心线程数,默认是0
- maxThreads: 最大线程数,默认是200
- queues:当queues=0的时候使用SynchronousQueue,当queues < 0的时候创建一个无限大小LinkedBlockingQueue队列,当queues>0的时候,创建queues大小的LinkedBlockingQueue队列。默认是0
- keepalive:直接就是Long.MAX_VALUE。
我们可以看到keepalive直接是最大的,也就是线程可以增大,但是不会收缩,原因是防止大流量请求过来,还得现创建线程。
6.EagerThreadPool
/**
* EagerThreadPool
* When the core threads are all in busy,
* create new thread instead of putting task into blocking queue.
*/
public class EagerThreadPool implements ThreadPool {
@Override
public Executor getExecutor(URL url) {
String name = url.getParameter(Constants.THREAD_NAME_KEY, Constants.DEFAULT_THREAD_NAME);
int cores = url.getParameter(Constants.CORE_THREADS_KEY, Constants.DEFAULT_CORE_THREADS);
int threads = url.getParameter(Constants.THREADS_KEY, Integer.MAX_VALUE);
int queues = url.getParameter(Constants.QUEUES_KEY, Constants.DEFAULT_QUEUES);
int alive = url.getParameter(Constants.ALIVE_KEY, Constants.DEFAULT_ALIVE);
// init queue and executor
TaskQueue<Runnable> taskQueue = new TaskQueue<Runnable>(queues <= 0 ? 1 : queues);
EagerThreadPoolExecutor executor = new EagerThreadPoolExecutor(cores,
threads,
alive,
TimeUnit.MILLISECONDS,
taskQueue,
new NamedInternalThreadFactory(name, true),
new AbortPolicyWithReport(name, url));
taskQueue.setExecutor(executor);
return executor;
}
}
我们可以上面的注释,当core线程都很忙的时候,创建新线程,而不是将任务放入阻塞队列。
EagerThreadPool与上面几个线程池不一样的地方就是使用了自定义的EagerThreadPoolExecutor与自定义的taskQueue。我们先来看下线程池参数。
- core:核心线程数,默认0
- maxThreads:最大线程数,默认是Integer.MAX_VALUE
- queue: 当queue>0的时候创建大小queue的TaskQueue,queue<=0的时候,就是0大小TaskQueue。默认0
- keepalive:空闲时间,默认60 * 1000ms,也就是一分钟。
我们来看下TaskQueue队列源码
/**
* TaskQueue in the EagerThreadPoolExecutor
* It offer a task if the executor's submittedTaskCount less than currentPoolThreadSize
* or the currentPoolThreadSize more than executor's maximumPoolSize.
* That can make the executor create new worker
* when the task num is bigger than corePoolSize but less than maximumPoolSize.
*/
public class TaskQueue<R extends Runnable> extends LinkedBlockingQueue<Runnable> {
private static final long serialVersionUID = -2635853580887179627L;
private EagerThreadPoolExecutor executor;
public TaskQueue(int capacity) {
super(capacity);
}
public void setExecutor(EagerThreadPoolExecutor exec) {
executor = exec;
}
@Override
public boolean offer(Runnable runnable) {
if (executor == null) {
throw new RejectedExecutionException("The task queue does not have executor!");
}
int currentPoolThreadSize = executor.getPoolSize();
// have free worker. put task into queue to let the worker deal with task.
if (executor.getSubmittedTaskCount() < currentPoolThreadSize) {
return super.offer(runnable);
}
// return false to let executor create new worker.
if (currentPoolThreadSize < executor.getMaximumPoolSize()) {
return false;
}
// currentPoolThreadSize >= max
return super.offer(runnable);
}
/**
* retry offer task
*
* @param o task
* @return offer success or not
* @throws RejectedExecutionException if executor is terminated.
*/
public boolean retryOffer(Runnable o, long timeout, TimeUnit unit) throws InterruptedException {
if (executor.isShutdown()) {
throw new RejectedExecutionException("Executor is shutdown!");
}
return super.offer(o, timeout, unit);
}
}
我们可以看到TaskQueue 继承LinkedBlockingQueue队列,然后重写了offer方法。我们看下这个方法,首先判断EagerThreadPoolExecutor对象是否存在,然后判断线程池当前的任务小于线程池大小,就说明空闲等待的线程,这时候,将任务放入队列,然后让线程去处理。如果当前的任务有很多,这时候判断当前线程数小于最大线程数的时候让线程池去创建线程,这些条件都不满足的时候才往队列里扔。
我们可以缕缕这个流程,当core有闲着的线程的时候,扔队列中让空闲线程处理,没有空闲线程的时候先创建线程,直到线程数到达最大线程数,这时候才会往队列里面扔。我们在源码中看到getSubmittedTaskCount这个方法,这个方法其实dubbo自定义实现ThreadPoolExecutor来维护的一个计数器。我们可以看下EagerThreadPoolExecutor的源码。
public class EagerThreadPoolExecutor extends ThreadPoolExecutor {
/**
* task count
*/
private final AtomicInteger submittedTaskCount = new AtomicInteger(0);
public EagerThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit, TaskQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler) {
super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory, handler);
}
/**
* @return current tasks which are executed
*/
public int getSubmittedTaskCount() {
return submittedTaskCount.get();
}
@Override
protected void afterExecute(Runnable r, Throwable t) {
submittedTaskCount.decrementAndGet();
}
@Override
public void execute(Runnable command) {
if (command == null) {
throw new NullPointerException();
}
// do not increment in method beforeExecute!
submittedTaskCount.incrementAndGet();
try {
super.execute(command);
} catch (RejectedExecutionException rx) {
// retry to offer the task into queue.
final TaskQueue queue = (TaskQueue) super.getQueue();
try {
if (!queue.retryOffer(command, 0, TimeUnit.MILLISECONDS)) {
submittedTaskCount.decrementAndGet();
throw new RejectedExecutionException("Queue capacity is full.");
}
} catch (InterruptedException x) {
submittedTaskCount.decrementAndGet();
throw new RejectedExecutionException(x);
}
} catch (Throwable t) {
// decrease any way
submittedTaskCount.decrementAndGet();
}
}
}
我们可以看到这个类里面维护了一个AtomicInteger类型的计数器。并重写了execute方法跟afterExecute方法。当来一个任务的时候,计数器先自增长1,然后任务交给父类处理,处理完会调用afterExecute方法,计数器自减1。父类拒绝的时候,重新往队列里offer,没成功的话计数器自减1,并抛出拒绝策略。中断异常的时候也是减1,抛出拒绝策略。父类抛出其他异常的时候也都是减1。