1. 前言
我们经常在Java并发框架中用到线程池。关于线程池我们该了解哪些东西的。
- 线程池实现的基本原理
- 常用的线程池的介绍,以及其源码的实现。
2. 线程池的基本原理
其线程池执行execute()方法的过程如下:
- 如果当前运行的线程少于corePoolSize,则创建新线程来执行任务。
- 如果运行的线程等于或多于corePoolSize,则将任务加入BlockingQueue。
- 如果无法将任务加入BlockingQueue中,则创建新的线程来处理任务。
- 如果创建新线程使线程数量超过maximumPoolSize,任务将被拒绝。
源代码如下:
public void execute(Runnable command) {
if (command == null)
throw new NullPointerException();
int c = ctl.get();
//对应步骤1
if (workerCountOf(c) < corePoolSize) {
if (addWorker(command, true))
return;
c = ctl.get();
}
if (isRunning(c) && workQueue.offer(command)) {
int recheck = ctl.get();
if (! isRunning(recheck) && remove(command))
reject(command);
else if (workerCountOf(recheck) == 0)
addWorker(null, false);
}
else if (!addWorker(command, false))
reject(command);
}
还有一个方法是其他线程池经常调用的方法
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler) {
if (corePoolSize < 0 ||
maximumPoolSize <= 0 ||
maximumPoolSize < corePoolSize ||
keepAliveTime < 0)
throw new IllegalArgumentException();
if (workQueue == null || threadFactory == null || handler == null)
throw new NullPointerException();
this.acc = (System.getSecurityManager() == null)
? null
: AccessController.getContext();
this.corePoolSize = corePoolSize;
this.maximumPoolSize = maximumPoolSize;
this.workQueue = workQueue;
this.keepAliveTime = unit.toNanos(keepAliveTime);
this.threadFactory = threadFactory;
this.handler = handler;
}
上面代码有几个名词需要解释一下:
1.corePoolSize: 核心线程池大小
2.maximumPool: 最大线程池大小
3.workQueue: 用来暂存任务的工作队列
4.RejectedExecutionHandler: 拒绝策略
3. newFixedThreadPool
newFixedThreadPool是可重用的固定线程数的线程池,下面是其源代码:
public static ExecutorService newFixedThreadPool(int nThreads) {
return new ThreadPoolExecutor(nThreads, nThreads,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>());
}
由上我们可以看出corePoolSize和maximumPool的数值都为nThreads,其工作队列为LinkedBlockingQueue。
其运行步骤如下:
1.如果当前运行的线程数小于corePoolSize的时候,直接创建新线程执行任务。
2.其运行的线程数大于等于corePoolSize的时候,将任务加入LinkedBlockingQueue。
3.线程执行完会反复的从LinkedBlockingQueue中获取任务来执行。
4.newSingleThreadExecutor
newSingleThreadExecutor是一个单线程的线程池,它可以实现按指定的顺序运行线程。其源代码如下:
public static ExecutorService newSingleThreadExecutor() {
return new FinalizableDelegatedExecutorService
(new ThreadPoolExecutor(1, 1,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>()));
}
由源代码可以看出其corePoolSize和maximumPool的数值都设置为1,工作队列为无界对列LinkedBlockingQueue。
其运行步骤如下:
1.如果当前运行的线程数小于1的时候,直接创建新线程执行任务。
2.其运行的线程数大于等于corePoolSize的时候,将任务加入LinkedBlockingQueue。
3.线程执行完会反复的从LinkedBlockingQueue中获取任务来执行。
5.newCachedThreadPool
newCachedThreadPool是一个根据需要创建线程的一个线程池。其源代码如下:
public static ExecutorService newCachedThreadPool() {
return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
60L, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>());
}
由源代码可以看出其corePoolSize设置为0,,maximumPool的数值都设置为2^31-1,其工作队列为SynchronousQueue。
其运行步骤如下:
1.当需要建立新线程的时候,检查是否有空线程,有的话直接使用。没有的话创建新线程执行任务。
2.新创建的任务执行完后,会让空闲线程等待60秒
6. newScheduledThreadPool
newScheduledThreadPool是一个定长的可以周期执行任务的线程池。其源代码如下:
public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) {
return new ScheduledThreadPoolExecutor(corePoolSize);
}
public ScheduledThreadPoolExecutor(int corePoolSize) {
//ScheduledThreadPoolExecutor继承于ThreadPoolExecutor,所以super是执行
//ThreadPoolExecutor的构造函数。
super(corePoolSize, Integer.MAX_VALUE,
DEFAULT_KEEPALIVE_MILLIS, MILLISECONDS,
new DelayedWorkQueue());
}
其实现周期执行任务的关键是实用的工作队列为DelayedWorkQueue,它可以以时间为优先级来进行排序。