文章目录
- 一、new Thread的弊端
- 二、 线程池的优势
- 三、ThreadPoolExecutor
- 四、阻塞队列BlockingQueue
- 4.1 BlockingQueue的核心方法
- 4.2 常见BlockingQueue
- 4.2.1 ArrayBlockingQueue有界
- 4.2.2 LinkedBlockingQueue无界
- 4.2.3 DelayQueue无界延时
- 4.2.4 PriorityBlockingQueue
- 4.2.5 SynchronousQueue无元素
- 4.2.6 LinkedBlockingDeque
- 4.3 阻塞队列的实现原理
- 五、Java提供的线程池类
- 六、源码分析
系统启动一个新线程的成本是比较高的,因为它涉及到与操作系统的交互。在这种情况下,使用线程池可以很好的提供性能,尤其是当程序中需要创建大量生存期很短暂的线程时,更应该考虑使用线程池。
与数据库连接池类似的是,线程池在系统启动时即创建大量空闲的线程,程序将一个Runnable对象传给线程池,线程池就会启动一条线程来执行该对象的run方法,当run方法执行结束后,该线程并不会死亡,而是再次返回线程池中成为空闲状态,等待执行下一个Runnable对象的run方法。
除此之外,使用线程池可以有效地控制系统中并发线程的数量,但系统中包含大量并发线程时,会导致系统性能剧烈下降,甚至导致JVM崩溃。而线程池的最大线程数参数可以控制系统中并发的线程不超过此数目。
在JDK1.5之前,开发者必须手动的实现自己的线程池,从JDK1.5之后,Java内建支持线程池。
与多线程并发的所有支持的类都在java.lang.concurrent包中。我们可以使用里面的类更加的控制多线程的执行。
一、new Thread的弊端
-
每次new Thread新建对象,性能差
-
线程缺乏统一管理,可能无限制的新建线程,相互竞争,有可能占用过多系统资源导致死机或者OOM(OutOfMemory)
-
缺少更多功能,如更多执行、定期执行、线程中断
二、 线程池的优势
- 重用存在的线程,减少对象创建、消亡的开销,性能好
- 可有效控制最大并发线程数,提高系统资源利用率,同时可以避免过多资源竞争,避免阻塞
- 提供定时执行、定期执行、单线程、并发数控制等功能
三、ThreadPoolExecutor
ThreadPoolExecutor是线程池的核心实现类,我们来看下他的构造方法
ThreadPoolExecutor(int corePoolSize
, int maximumPoolSize, long keepAliveTime
, TimeUnit unit
, BlockingQueue<Runnable> workQueue
, ThreadFactory threadFactory
, RejectedExecutionHandler handler)
- corePoolSize :核心线程数量
- 默认情况下(可预创建线程)线程池后线程池中的线程数为0,当有任务提交时才会创建线程;
- 如果当前运行的线程数小于 corePoolSize, 则直接创建一个新线程来运行任务;
- 如果多于或者等于 corePoolSize, 则不再创建;执行后续步骤
- 如果调用 prestartAllcoreThread方法,线程池会提前创建并启动所有的核心线程来等待任务
- 如果调用public boolean prestartCoreThread(),线程池会提前创建并启动一个核心线程来等待任务
- 默认情况下(可预创建线程)线程池后线程池中的线程数为0,当有任务提交时才会创建线程;
- workQueue :阻塞任务队列,存储等待执行的任务
- 如果当前线程数大于corePoolSize,则将任务添加到该阻塞队列;
- BlockingQueue只是一个接口,它所表达的是当队列为空或者已满的时候,需要阻塞以等待生产者/消费者协同操作并唤醒线程。其有很多不同的具体实现类,各有特点。有的可以规定队列的长度,也有一些则是无界的。
- 有三种取值,ArrayBlockQueue(基于数组的先进先出队列,创建时必须指定大小)、LinkedBlockingQueue(基于链表的先进先出队列,如果没有指定此队列大小,默认为Integer.MAX_VALUE)、SynchronousQueue(不会保存提交的任务,直接新建一个线程来执行新的任务)
- ArrayBlockingQueue :可以限定队列的长度,接收到任务的时候,如果没有达到corePoolSize的值,则新建线程(核心线程)执行任务,如果达到了,则入队等候,如果队列已满,则新建线程(非核心线程)执行任务,又如果总线程数到了maximumPoolSize,并且队列也满了,则发生错误
- LinkedBlockingQueue:
- 这个队列接收到任务的时候,如果当前线程数小于核心线程数,则新建线程(核心线程)处理任务;如果当前线程数等于核心线程数,则进入队列等待。
- 由于这个队列没有最大值限制,即所有超过核心线程数的任务都将被添加到队列中,这也就导致了maximumPoolSize的设定失效,因为总线程数永远不会超过corePoolSize
- SynchronousQueue:这个队列接收到任务的时候,会直接提交给线程处理,而不保留它,如果所有线程都在工作怎么办?那就新建一个线程来处理这个任务!所以为了保证不出现<线程数达到了maximumPoolSize而不能新建线程>的错误,使用这个类型队列的时候,maximumPoolSize一般指定成Integer.MAX_VALUE,即无限大
- DelayQueue:队列内元素必须实现Delayed接口,这就意味着你传进去的任务必须先实现Delayed接口。这个队列接收到任务时,首先先入队,只有达到了指定的延时时间,才会执行任务
- maximumPoolSize :线程允许创建的最大线程数
- 如果任务队列满了并且线程数小于 maximumPoolSize 则线程池仍旧会创建新的线程来完成任务
- keepAliveTime :非核心线程闲置的超时时间,超过时回收该线程
- 默认情况只有当线程池中的线程数大于corePoolSize时,keepAliveTIme才会起作用。
- 当线程池中的线程数大于corePoolSize,如果一个线程的空闲时间达到keepAliveTime,则会被终止
- 如果任务很多,并且每个任务的执行时间很短,则可以调大 keepAliveTime来提高非核心线程的存活时间来提高利用率
- 如果调用allowCoreThreadTimeOut(boolean)方法,在线程池中的线程数不大于corePoolSize时keepAliveTime参数也会起作用,直到线程池的线程数为0
- unit:keepAliveTime的时间单位,有7中取值,如:TimeUnit.DAYS; 天,可具体到纳秒
- threadFactory:线程工厂,用来创建线程
- 可以用线程工厂给每个创建出来的线程设置名字
- rejectHandler:饱和策略,当拒绝处理任务时的策略
- 当任务队列和线程池个数都满了时候,采取的策略,通常有四种取值
- ThreadPoolExecutor.AbortPolicy:表示无法处理新任务,丢弃任务并抛出RejectedExecutionException异常;
- ThreadPoolExecutor.DiscardPolicy:丢弃任务,但不抛出异常;
- ThreadPoolExecutor.DiscardOldestPolicy:丢弃队列最前面的任务,重新尝试执行任务(重复此过程);
- ThreadPoolExecutor.CallerRunsPolicy:由调用线程处理该任务
线程池的对新任务的处理流程可以如下图所示:
【线程池的对新任务的处理流程】
- 如果当前poolsize小于corePoolSize,创建新线程执行任务
- 如果当前poolsize大于corePoolsize,且等待队列未满,进入等待队列
- 如果当前poolsize大于corePoolsize且小于maximumPoolSize,且等待队列已满,创建新线程执行任务
- 如果当前poolsize大于corePoolSize且大于maximumPoolSize,且等待队列已满则用拒绝策略来处理该任务
- 线程池中的线程执行完任务后不会立刻退出,而是去检查等待队列是否有新的线程去执行,如果在keepAliveTime里等不到新任务,线程就会退出
3.1 ThreadPoolExecutor方法
- execute():提交任务,交给线程池执行
- submit():提交任务,能够返回执行结果 execute+Future
- shutdown():关闭线程池,等待任务都执行完
- shutdownNow():关闭线程池,不等待任务执行完
- getTaskCount():线程池已执行的和未执行的任务总数
- getCompletedTaskCount():已完成的任务数量
- getPoolSize():线程池当前线程数量
- getActiveCount():当前线程池正在执行任务的线程数量
3.2 线程池的种类
通过直接或着间接的配置ThreadPoolExecutor的参数可以创建不同的线程池对象,Java通过Executors(一个工具类,类似于TextUtils)提供了四种线程池:
3.2.1 FixedThreadPool 可重用固定线程数
public static ExecutorService newFixedThreadPool(int nThreads) {
return new ThreadPoolExecutor(nThreads, nThreads,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>());
}
- corePoolSize 和 maximumPoolSize都被设置成指定的参数nThreads,这就意味着 FixedThreadPool只有核心线程,并且数量是固定的,没有非核心线程
- keepAliveTime设置为0意味着多余线程会被立即终止。
- 因为不会有多余线程,其实这个参数没啥用
- 阻塞队列采用了 LinkedBlockingQueue 无界阻塞队列
- 由于这个队列没有最大值限制,即所有超过核心线程数的任务都将被添加到队列中,这也就导致了maximumPoolSize的设定失效,因为总线程数永远不会超过corePoolSize
【图】
当执行execute()方法时:
- 如果当前运行的线程数未达到 corePoolSize时,就创建新的线程来处理任务
- 否则,就将任务添加到 LinkedBlockingQueue。
- 当线程池有空闲线程时,则从任务队列中取任务执行
示例:
publicclass TestFixedThreadPool {
publicstaticvoid main(String[] args) {
//创建一个可重用固定线程数的线程池
ExecutorService pool = Executors.newFixedThreadPool(2);
//创建实现了Runnable接口对象,Thread对象当然也实现了Runnable接口
Thread t1 = new MyThread();
Thread t2 = new MyThread();
Thread t3 = new MyThread();
Thread t4 = new MyThread();
Thread t5 = new MyThread();
//将线程放入池中进行执行
pool.execute(t1);
pool.execute(t2);
pool.execute(t3);
pool.execute(t4);
pool.execute(t5);
//关闭线程池
pool.shutdown();
}
}
输出结果
pool-1-thread-1正在执行。。。
pool-1-thread-2正在执行。。。
pool-1-thread-1正在执行。。。
pool-1-thread-2正在执行。。。
pool-1-thread-1正在执行。。。
3.2.2 CachedThreadPool 无限线程数
每次提交的任务都会立即被执行
public static ExecutorService newCachedThreadPool() {
return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
60L, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>());
}
- corePoolSize为 0 和 maximumPoolSize被设置成Max,这就意味着 CachedThreadPool 没有核心线程,非核心线程的数量无限大
- keepAliveTime设置为60L意味着 空闲的线程最多等待60s
- 阻塞队列采用了 SynchronousQueue 不存储元素的阻塞队列,每一个插入操作必须等待另一个线程的移除操作
【图】
当执行execute()方法时:
- 执行 SynchronousQueue#offer 来提交任务,并查询是否有空闲线程执行 poll来移除任务
- 有则交给该线程执行
- 无则创建一个新线程执行
- 线程池的线程空闲时,会执行 SynchronousQueue#pool, 阻塞式等待新的提交
- 如果超过60s无新任务,则关闭该线程
示例:
publicclass TestCachedThreadPool {
publicstaticvoid main(String[] args) {
//创建一个可重用固定线程数的线程池
ExecutorService pool = Executors.newCachedThreadPool();
//创建实现了Runnable接口对象,Thread对象当然也实现了Runnable接口
Thread t1 = new MyThread();
Thread t2 = new MyThread();
Thread t3 = new MyThread();
Thread t4 = new MyThread();
Thread t5 = new MyThread();
//将线程放入池中进行执行
pool.execute(t1);
pool.execute(t2);
pool.execute(t3);
pool.execute(t4);
pool.execute(t5);
//关闭线程池
pool.shutdown();
}
}
输出结果:
pool-1-thread-2正在执行。。。
pool-1-thread-4正在执行。。。
pool-1-thread-3正在执行。。。
pool-1-thread-1正在执行。。。
pool-1-thread-5正在执行。。。
3.1.3 SingleThreadExecutor 单线程化
public static ExecutorService newSingleThreadExecutor() {
return new FinalizableDelegatedExecutorService
(new ThreadPoolExecutor(1, 1,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>()));
}
- corePoolSize 和 maximumPoolSize都被设置成1,这就意味着 SingleThreadExecutor 只有1个核心线程,没有非核心线程
- keepAliveTime设置为0意味着多余线程会被立即终止。
- 因为不会有多余线程,其实这个参数没啥用
- 阻塞队列采用了 LinkedBlockingQueue 无界阻塞队列
- 由于这个队列没有最大值限制,即所有超过核心线程数的任务都将被添加到队列中,这也就导致了maximumPoolSize的设定失效,因为总线程数永远不会超过corePoolSize
【图】
当执行execute()方法时:
- 如果当前运行的线程数未达到 corePoolSize时,也就是没有一个,就创建新的线程来处理任务
- 否则,就将任务添加到 LinkedBlockingQueue。
- 当线程池有空闲线程时,则从任务队列中取任务执行
示例:
publicclassMyThread extends Thread {
@Override
publicvoid run() {
System.out.println(Thread.currentThread().getName() + "正在执行。。。");
}
}
publicclassTestSingleThreadExecutor {
publicstaticvoid main(String[] args) {
//创建一个可重用固定线程数的线程池
ExecutorService pool = Executors. newSingleThreadExecutor();
//创建实现了Runnable接口对象,Thread对象当然也实现了Runnable接口
Thread t1 = new MyThread();
Thread t2 = new MyThread();
Thread t3 = new MyThread();
Thread t4 = new MyThread();
Thread t5 = new MyThread();
//将线程放入池中进行执行
pool.execute(t1);
pool.execute(t2);
pool.execute(t3);
pool.execute(t4);
pool.execute(t5);
//关闭线程池
pool.shutdown();
}
}
输出结果
pool-1-thread-1正在执行。。。
pool-1-thread-1正在执行。。。
pool-1-thread-1正在执行。。。
pool-1-thread-1正在执行。。。
pool-1-thread-1正在执行。。。
3.2.4 ScheduledThreadPool 定时周期性任务
public ScheduledThreadPoolExecutor(int corePoolSize) {
super(corePoolSize, Integer.MAX_VALUE,
DEFAULT_KEEPALIVE_MILLIS, MILLISECONDS,
new DelayedWorkQueue());
}
- corePoolSize为 固定值 和 maximumPoolSize被设置成Max,这就意味着 CachedThreadPool 有固定个核心线程,非核心线程的数量无限大
- keepAliveTime设置为60L意味着 空闲的线程最多等待DEFAULT_KEEPALIVE_MILLIS
- 阻塞队列采用了 DelayedWorkQueue 一个支持延时获取元素的无界队列。 创建元素式,可以指定元素的到达时间,只有到期才能被取走
【图】
示例:
publicclass TestScheduledThreadPoolExecutor {
publicstaticvoid main(String[] args) {
ScheduledThreadPoolExecutor exec = new ScheduledThreadPoolExecutor(1);
exec.scheduleAtFixedRate(new Runnable() {//每隔一段时间就触发异常
@Override
publicvoid run() {
//throw new RuntimeException();
System.out.println("================");
}
}, 1000, 5000, TimeUnit.MILLISECONDS);
exec.scheduleAtFixedRate(new Runnable() {//每隔一段时间打印系统时间,证明两者是互不影响的
@Override
publicvoid run() {
System.out.println(System.nanoTime());
}
}, 1000, 2000, TimeUnit.MILLISECONDS);
}
}
================
8384644549516
8386643829034
8388643830710
================
8390643851383
8392643879319
8400643939383
四、阻塞队列BlockingQueue
阻塞队列常用于生产者和消费者场景,生产者是往队列里添加元素的线程,消费者是从队列里拿元素的线程
- 当队列没有数据时,消费者端的所有线程会被自动阻塞(挂起),直到有数据放入队列
- 当队列数据满额时,生产者端的所有线程会被自动阻塞(挂起),直到有数据被取出队列
阻塞队列其实就是一个容器,盛放了这些元素,但是提供了一些特殊的API去访问这个容器,譬如实现阻塞
这也是我们在多线程环境下,为什么需要BlockingQueue的原因。作为BlockingQueue的使用者,我们再也不需要关心什么时候需要阻塞线程,什么时候需要唤醒线程,因为这一切BlockingQueue都给你一手包办了。
BlockingQueue不光实现了一个完整队列所具有的基本功能,同时在多线程环境下,他还自动管理了多线间的自动等待于唤醒功能,从而使得程序员可以忽略这些细节,关注更高级的功能。
4.1 BlockingQueue的核心方法
- 放入数据
- offer(anObject):表示如果可能的话,将anObject加到BlockingQueue里,即如果BlockingQueue可以容纳,则返回true,否则返回false.(本方法不阻塞当前执行方法的线程);
- offer(E o, long timeout, TimeUnit unit):可以设定等待的时间,如果在指定的时间内,还不能往队列中加入BlockingQueue,则返回失败。
- put(anObject):把anObject加到BlockingQueue里,如果BlockQueue没有空间,则调用此方法的线程被阻断直到BlockingQueue里面有空间再继续.
- 获取数据
- poll(time):取走BlockingQueue里排在首位的对象,若不能立即取出,则可以等time参数规定的时间,取不到时返回null;
- poll(long timeout, TimeUnit unit):从BlockingQueue取出一个队首的对象,如果在指定时间内,队列一旦有数据可取,则立即返回队列中的数据。否则知道时间超时还没有数据可取,返回失败。
- take():取走BlockingQueue里排在首位的对象,若BlockingQueue为空,阻断进入等待状态直到BlockingQueue有新的数据被加入;
- drainTo():一次性从BlockingQueue获取所有可用的数据对象(还可以指定获取数据的个数),通过该方法,可以提升获取数据效率;不需要多次分批加锁或释放锁。
4.2 常见BlockingQueue
【图】
4.2.1 ArrayBlockingQueue有界
基于数组的阻塞队列实现,在ArrayBlockingQueue内部,维护了一个定长数组,以便缓存队列中的数据对象,这是一个常用的阻塞队列,除了一个定长数组外,ArrayBlockingQueue内部还保存着两个整形变量,分别标识着队列的头部和尾部在数组中的位置。
ArrayBlockingQueue在生产者放入数据和消费者获取数据,都是共用同一个锁对象,由此也意味着两者无法真正并行运行,这点尤其不同于LinkedBlockingQueue;按照实现原理来分析,ArrayBlockingQueue完全可以采用分离锁,从而实现生产者和消费者操作的完全并行运行。Doug Lea之所以没这样去做,也许是因为ArrayBlockingQueue的数据写入和获取操作已经足够轻巧,以至于引入独立的锁机制,除了给代码带来额外的复杂性外,其在性能上完全占不到任何便宜。
ArrayBlockingQueue和LinkedBlockingQueue间还有一个明显的不同之处在于,前者在插入或删除元素时不会产生或销毁任何额外的对象实例,而后者则会生成一个额外的Node对象。这在长时间内需要高效并发地处理大批量数据的系统中,其对于GC的影响还是存在一定的区别。
而在创建ArrayBlockingQueue时,我们还可以控制对象的内部锁是否采用公平锁,默认采用非公平锁。
4.2.2 LinkedBlockingQueue无界
基于链表的阻塞队列,同ArrayListBlockingQueue类似,其内部也维持着一个数据缓冲队列(该队列由一个链表构成)
当生产者往队列中放入一个数据时,队列会从生产者手中获取数据,并缓存在队列内部,而生产者立即返回;只有当队列缓冲区达到最大值缓存容量时(LinkedBlockingQueue可以通过构造函数指定该值),才会阻塞生产者队列,直到消费者从队列中消费掉一份数据,生产者线程会被唤醒,反之对于消费者这端的处理也基于同样的原理
而LinkedBlockingQueue之所以能够高效的处理并发数据,还因为其对于生产者端和消费者端分别采用了独立的锁来控制数据同步,这也意味着在高并发的情况下生产者和消费者可以并行地操作队列中的数据,以此来提高整个队列的并发性能。
作为开发者,我们需要注意的是,如果构造一个LinkedBlockingQueue对象,而没有指定其容量大小,LinkedBlockingQueue会默认一个类似无限大小的容量(Integer.MAX_VALUE),这样的话,如果生产者的速度一旦大于消费者的速度,也许还没有等到队列满阻塞产生,系统内存就有可能已被消耗殆尽了。
4.2.3 DelayQueue无界延时
是一个支持延时获取元素的无界阻塞队列。队列使用PriorityQueue来实现。队列中的元素必须实现Delayed接口,在创建元素时可以指定多久才能从队列中获取当前元素。只有在延迟期满时才能从队列中提取元素。我们可以将DelayQueue运用在以下应用场景:
- 缓存系统的设计:可以用DelayQueue保存缓存元素的有效期,使用一个线程循环查询DelayQueue,一旦能从DelayQueue中获取元素时,表示缓存有效期到了。
- 定时任务调度:使用DelayQueue保存当天将会执行的任务和执行时间,一旦从DelayQueue中获取到任务就开始执行,从比如TimerQueue就是使用DelayQueue实现的。
4.2.4 PriorityBlockingQueue
基于优先级的阻塞队列(优先级的判断通过构造函数传入的Compator对象来决定),但需要注意的是PriorityBlockingQueue并不会阻塞数据生产者,而只会在没有可消费的数据时,阻塞数据的消费者。因此使用的时候要特别注意,生产者生产数据的速度绝对不能快于消费者消费数据的速度,否则时间一长,会最终耗尽所有的可用堆内存空间。在实现PriorityBlockingQueue时,内部控制线程同步的锁采用的是公平锁。
4.2.5 SynchronousQueue无元素
- 是一个不存储元素的阻塞队列。每一个put操作必须等待一个take操作,否则不能继续添加元素。
- 一种无缓冲的等待队列,类似于无中介的直接交易,有点像原始社会中的生产者和消费者,生产者拿着产品去集市销售给产品的最终消费者,而消费者必须亲自去集市找到所要商品的直接生产者,如果一方没有找到合适的目标,那么对不起,大家都在集市等待。相对于有缓冲的BlockingQueue来说,少了一个中间经销商的环节(缓冲区),如果有经销商,生产者直接把产品批发给经销商,而无需在意经销商最终会将这些产品卖给那些消费者,由于经销商可以库存一部分商品,因此相对于直接交易模式,总体来说采用中间经销商的模式会吞吐量高一些(可以批量买卖);但另一方面,又因为经销商的引入,使得产品从生产者到消费者中间增加了额外的交易环节,单个产品的及时响应性能可能会降低。
队列本身并不存储任何元素,非常适合于传递性场景,比如在一个线程中使用的数据,传递给另外一个线程使用,SynchronousQueue的吞吐量高于LinkedBlockingQueue 和 ArrayBlockingQueue。
声明一个SynchronousQueue有两种不同的方式,它们之间有着不太一样的行为。公平模式和非公平模式的区别:
如果采用公平模式:SynchronousQueue会采用公平锁,并配合一个FIFO队列来阻塞多余的生产者和消费者,从而体系整体的公平策略;
但如果是非公平模式(SynchronousQueue默认):SynchronousQueue采用非公平锁,同时配合一个LIFO队列来管理多余的生产者和消费者,而后一种模式,如果生产者和消费者的处理速度有差距,则很容易出现饥渴的情况,即可能有某些生产者或者是消费者的数据永远都得不到处理。
4.2.6 LinkedBlockingDeque
是一个由链表结构组成的双向阻塞队列。所谓双向队列指的你可以从队列的两端插入和移出元素。
双端队列因为多了一个操作队列的入口,在多线程同时入队时,也就减少了一半的竞争。
相比其他的阻塞队列,LinkedBlockingDeque多了addFirst,addLast,offerFirst,offerLast,peekFirst,peekLast等方法
以First单词结尾的方法,表示插入,获取(peek)或移除双端队列的第一个元素。以Last单词结尾的方法,表示插入,获取或移除双端队列的最后一个元素。
另外插入方法add等同于addLast,移除方法remove等效于removeFirst。但是take方法却等同于takeFirst,不知道是不是Jdk的bug,使用时还是用带有First和Last后缀的方法更清楚。
4.3 阻塞队列的实现原理
其实阻塞队列实现阻塞同步的方式很简单,使用的就是是lock锁的多条件(condition)阻塞控制
下面是Jdk 1.7中ArrayBlockingQueue部分代码
public ArrayBlockingQueue(int capacity, boolean fair) {
if (capacity <= 0)
throw new IllegalArgumentException();
//创建数组
this.items = new Object[capacity];
//创建锁和阻塞条件
lock = new ReentrantLock(fair);
notEmpty = lock.newCondition();
notFull = lock.newCondition();
}
//添加元素的方法
public void put(E e) throws InterruptedException {
checkNotNull(e);
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try {
while (count == items.length)
notFull.await();
//如果队列不满就入队
enqueue(e);
} finally {
lock.unlock();
}
}
//入队的方法
private void enqueue(E x) {
final Object[] items = this.items;
items[putIndex] = x;
if (++putIndex == items.length)
putIndex = 0;
count++;
notEmpty.signal();
}
//移除元素的方法
public E take() throws InterruptedException {
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try {
while (count == 0)
notEmpty.await();
return dequeue();
} finally {
lock.unlock();
}
}
//出队的方法
private E dequeue() {
final Object[] items = this.items;
@SuppressWarnings("unchecked")
E x = (E) items[takeIndex];
items[takeIndex] = null;
if (++takeIndex == items.length)
takeIndex = 0;
count--;
if (itrs != null)
itrs.elementDequeued();
notFull.signal();
return x;
五、Java提供的线程池类
【图Java提供的线程池类】
- Executor是一个顶层接口,在它里面只声明了一个方法execute(Runnable),返回值为void,参数为Runnable类型,从字面意思可以理解,就是用来执行传进去的任务的;
- ExecutorService接口继承了Executor接口,并声明了一些方法:submit、invokeAll、invokeAny以及shutDown等;
- 抽象类AbstractExecutorService实现了ExecutorService接口,基本实现了ExecutorService中声明的所有方法;
- ThreadPoolExecutor继承了类AbstractExecutorService。
- execute()方法实际上是Executor中声明的方法,在ThreadPoolExecutor进行了具体的实现,这个方法是ThreadPoolExecutor的核心方法,通过这个方法可以向线程池提交一个任务,交由线程池去执行。
- submit()在ExecutorService中声明的方法,在AbstractExecutorService就已经有了具体的实现,在ThreadPoolExecutor中并没有对其进行重写,这个方法也是用来向线程池提交任务的,但是它和execute()方法不同,它能够返回任务执行的结果
- 去看submit()方法的实现,会发现它实际上还是调用的execute()方法,只不过它利用了Future来获取任务执行结果(Future相关内容将在下一篇讲述)。
- shutdown()
- shutdownNow()
5.1 Executors类
JDK1.5中提供Executors工厂类来产生连接池,该工厂类中包含如下的几个静态工程方法来创建连接池:
- public static ExecutorService newFixedThreadPool(int nThreads):创建一个可重用的、具有固定线程数的线程池。
- public static ExecutorService newSingleThreadExecutor():创建一个只有单线程的线程池,它相当于newFixedThreadPool方法是传入的参数为1
- public static ExecutorService newCachedThreadPool():创建一个具有缓存功能的线程池,系统根据需要创建线程,这些线程将会被缓存在线程池中。
- public static ScheduledExecutorService newSingleThreadScheduledExecutor:创建只有一条线程的线程池,他可以在指定延迟后执行线程任务
- public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize):创建具有指定线程数的线程池,它可以再指定延迟后执行线程任务,corePoolSize指池中所保存的线程数,即使线程是空闲的也被保存在线程池内。
上面的几个方法都有一个重载的方法,多传入一个ThreadFactory参数的重载方法,使用的比较少。
5.2 ExecutorService类
可以看到上面的5个方法中,前面3个方法的返回值都是一个ExecutorService对象。该ExecutorService对象就代表着一个尽快执行线程的线程池(只要线程池中有空闲线程立即执行线程任务),程序只要将一个Runnable对象或Callable对象提交给该线程池即可,该线程就会尽快的执行该任务。
ExecutorService有几个重要的方法:
- boolean isShutdown()
- 如果此执行程序已关闭,则返回 true。
- boolean isTerminated()
- 如果关闭后所有任务都已完成,则返回 true。
- void shutdown()
- 启动一次顺序关闭,执行以前提交的任务,但不接受新任务。
- List shutdownNow()
- 试图停止所有正在执行的活动任务,暂停处理正在等待的任务,并返回等待执行的任务列表。
- Future submit(Callable task)
- 提交一个返回值的任务用于执行,返回一个表示任务的未决结果的 Future。
- Future<?> submit(Runnable task)
- 提交一个 Runnable 任务用于执行,并返回一个表示该任务的 Future。
- Future submit(Runnable task, T result)
- 提交一个 Runnable 任务用于执行,并返回一个表示该任务的 Future。
ScheduleExecutorService类是ExecutorService类的子类。所以,它里面也有直接提交任务的submit方法,并且新增了一些延迟任务处理的方法:
- ScheduledFuture schedule(Callable callable, long delay, TimeUnit unit)
- 创建并执行在给定延迟后启用的 ScheduledFuture。
- ScheduledFuture<?> schedule(Runnable command, long delay, TimeUnit unit)
- 创建并执行在给定延迟后启用的一次性操作。
- ScheduledFuture<?> scheduleAtFixedRate(Runnable command, long initialDelay, long period, TimeUnit unit)
- 创建并执行一个在给定初始延迟后首次启用的定期操作,后续操作具有给定的周期;也就是将在 initialDelay 后开始执行,然后在 initialDelay+period 后执行,接着在 initialDelay + 2 * period 后执行,依此类推。
- ScheduledFuture<?> scheduleWithFixedDelay(Runnable command, long initialDelay, long delay, TimeUnit unit)
- 创建并执行一个在给定初始延迟后首次启用的定期操作,随后,在每一次执行终止和下一次执行开始之间都存在给定的延迟。
六、源码分析
下面我们来深入解析一下线程池的具体实现原理,将从下面几个方面讲解:
1.线程池状态
2.任务的执行
3.线程池中的线程初始化
4.任务缓存队列及排队策略
5.任务拒绝策略
6.线程池的关闭
7.线程池容量的动态调整
6.1 线程池状态
在 ThreadPoolExecutor 中定义了一个volatile变量,另外定义了几个static final变量表示线程池的各个状态:
volatile int runState;
static final int RUNNING = 0;
static final int SHUTDOWN = 1;
static final int STOP = 2;
static final int TERMINATED = 3;
runState表示当前线程池的状态,它是一个volatile变量用来保证线程之间的可见性;
下面的几个static final变量表示runState可能的几个取值。
- 当创建线程池后,初始时,线程池处于RUNNING状态;
- 如果调用了shutdown()方法,则线程池处于SHUTDOWN状态,此时线程池不能够接受新的任务,它会等待所有任务执行完毕;
- 如果调用了shutdownNow()方法,则线程池处于STOP状态,此时线程池不能接受新的任务,并且会去尝试终止正在执行的任务;
- 当线程池处于SHUTDOWN或STOP状态,并且所有工作线程已经销毁,任务缓存队列已经清空或执行结束后,线程池被设置为TERMINATED状态。
6.2 任务的执行
6.2.1 重要成员变量
一个线程池包括以下四个基本组成部分:
- 线程池管理器(ThreadPool):用于创建并管理线程池,包括 创建线程池,销毁线程池,添加新任务;
- 工作线程(PoolWorker):线程池中线程,在没有任务时处于等待状态,可以循环的执行任务;
- 任务接口(Task):每个任务必须实现的接口,以供工作线程调度任务的执行,它主要规定了任务的入口,任务执行完后的收尾工作,任务的执行状态等;
- 任务队列(taskQueue):用于存放没有处理的任务。提供一种缓冲机制。
private volatile int corePoolSize; //核心池的大小(即线程池中的线程数目大于这个参数时,提交的任务会被放进任务缓存队列)
private final BlockingQueue<Runnable> workQueue; //任务缓存队列,用来存放等待执行的任务
private volatile int maximumPoolSize; //线程池最大能容忍的线程数
private volatile long keepAliveTime; //线程存活时间
private volatile boolean allowCoreThreadTimeOut; //是否允许为核心线程设置存活时间
private volatile RejectedExecutionHandler handler; //任务拒绝策略
private volatile ThreadFactory threadFactory; //线程工厂,用来创建线程
private int largestPoolSize; //用来记录线程池中曾经出现过的最大线程数
private long completedTaskCount; //用来记录已经执行完毕的任务个数
private volatile int poolSize; //线程池中当前的线程数
private final HashSet<Worker> workers = new HashSet<Worker>(); //用来存放工作集
private final ReentrantLock mainLock = new ReentrantLock(); //线程池的主要状态锁,对线程池状态(比如线程池大小、runState等)的改变都要使用这个锁
Worker工作线程
它既实现了Runnable,同时也是一个AQS ( AbstractQueuedSynchronizer )
封装了3样东西,Runnable类的首个任务对象,执行的线程thread和完成的任务数(volatile)completedTasks。
privatefinal class Worker extends AbstractQueuedSynchronizer implements Runnable{
final Thread thread;
Runnable firstTask;
volatile long completedTasks;
final void runWorker(Worker w) {
Thread wt = Thread.currentThread();
Runnable task = w.firstTask;
w.firstTask = null;
w.unlock();// allow interrupts
booleancompletedAbruptly = true;
try{
while(task != null|| (task = getTask()) != null) {//是否是第一次执行任务,或者从队列中可以获取到任务。
w.lock();
// If pool is stopping, ensure thread is interrupted;
// if not, ensure thread is not interrupted. This
// requires a recheck in second case to deal with
// shutdownNow race while clearing interrupt
if((runStateAtLeast(ctl.get(), STOP) ||
(Thread.interrupted() &&
runStateAtLeast(ctl.get(), STOP))) &&
!wt.isInterrupted())
wt.interrupt();
try{
beforeExecute(wt, task);//获取到任务后,执行任务开始前操作钩子。
Throwable thrown = null;
try{
task.run();//执行任务。
}catch(RuntimeException x) {
thrown = x; throwx;
}catch(Error x) {
thrown = x; throwx;
}catch(Throwable x) {
thrown = x; thrownew Error(x);
}finally{
afterExecute(task, thrown);//执行任务后钩子。
}
}finally{
task = null;
w.completedTasks++;
w.unlock();
}
}
completedAbruptly = false;
}finally{
processWorkerExit(w, completedAbruptly);
}
}
}
这两个钩子(beforeExecute,afterExecute)允许我们自己继承线程池,做任务执行前后处理。
runWorker这段代码实际上就是执行提交给线程池执行的Runnable任务的实际内容。其中,值得注意的有以下几点:
- 线程开始执行前,需要对worker加锁,完成一个任务后执行unlock()
- 在任务执行前后,执行beforeExecute()和afterExecute()方法
- 记录任务执行中的异常后,继续抛出
- 每个任务完成后,会记录当前线程完成的任务数
- 当worker执行完一个任务的时候,包括初始任务firstTask,会调用getTask()继续获取任务,这个方法调用是可以阻塞的
- 线程退出,执行processWorkerExit(w, completedAbruptly)处理
Worker线程的复用和任务的获取getTask()
在上一段代码中,也就是runWorker()方法,任务的执行过程是嵌套在while循环语句块中的。每当一个任务执行完毕,会从头开始做下一次循环执行,实现了空闲线程的复用。而要执行的任务则是来自于getTask()方法:
public class ThreadPoolExecutor extends AbstractExecutorService {
private Runnable getTask() {
booleantimedOut = false;// Did the last poll() time out?
retry:
for(;;) {
intc = ctl.get();
intrs = runStateOf(c);
// Check if queue empty only if necessary.
if(rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
decrementWorkerCount();
returnnull;
}
booleantimed; // Are workers subject to culling?
for(;;) {
intwc = workerCountOf(c);
timed = allowCoreThreadTimeOut || wc > corePoolSize;
if(wc <= maximumPoolSize && ! (timedOut && timed))
break;
if(compareAndDecrementWorkerCount(c))
returnnull;
c = ctl.get();
// Re-read ctl
if(runStateOf(c) != rs)
continueretry;
// else CAS failed due to workerCount change; retry inner loop
}
try{
Runnable r = timed ?
workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
workQueue.take();
if(r != null)
returnr;
timedOut = true;
}catch(InterruptedException retry) {
timedOut = false;
}
}
}
}
getTask()实际上是从工作队列(workQueue)中取提交进来的任务。这个workQueue是一个BlockingQueue,通常当队列中没有新任务的时候,则getTask()会阻塞
- 另外,还有定时阻塞这样一段逻辑:如果从队列中取任务是计时的,则用poll()方法,并设置等待时间为keepAlive,否则调用阻塞方法take()。当poll()超时,则获取到的任务为null,timeOut设置为 true。
- 这段代码也是放在一个for(;;)循环中,前面有判断超时的语句,如果超时,则return null。这意味着runWorker()方法的while循环结束,线程将退出,执行processWorkerExit()方法。
processWorkerExit线程池线程数的维护和线程的退出处理
这个方法最主要就是从workers的Set中remove掉一个多余的线程。
private void processWorkerExit(Worker w, booleancompletedAbruptly) {
if(completedAbruptly) // If abrupt, then workerCount wasn't adjusted
decrementWorkerCount();
finalReentrantLock mainLock = this.mainLock;
mainLock.lock();
try{
completedTaskCount += w.completedTasks;
workers.remove(w);
}finally{
mainLock.unlock();
}
tryTerminate();
intc = ctl.get();
if(runStateLessThan(c, STOP)) {
if(!completedAbruptly) {
intmin = allowCoreThreadTimeOut ? 0: corePoolSize;
if(min == 0&& ! workQueue.isEmpty())
min = 1;
if(workerCountOf(c) >= min)
return;// replacement not needed
}
addWorker(null,false);
}
}
这个方法的第二个参数是判断是否在runWorker()中正常退出了循环向下执行,如果不是,说明在执行任务的过程中出现了异常,completedAbruptly为true,线程直接退出,需要直接对活动线程数减1 。
之后,加锁统计完成的任务数,并从workers这个集合中移除当前worker。
执行tryTerminate(),这个方法后面会详细说,主要就是尝试将线程池推向TERMINATED状态。
最后比较当前线程数是不是已经低于应有的线程数,如果这个情况发生,则添加无任务的空Worker到线程池中待命。
6.2.2 execute()方法
在ThreadPoolExecutor类中,最核心的任务提交方法是execute()方法,虽然通过submit也可以提交任务,但是实际上submit方法里面最终调用的还是execute()方法,所以我们只需要研究execute()方法的实现原理即可:
public void execute(Runnable command) {
if(command == null)
thrownew NullPointerException();
intc = ctl.get();
if(workerCountOf(c) < corePoolSize) {//1
if(addWorker(command, true))
return;
c = ctl.get();
}
if(isRunning(c) && workQueue.offer(command)) {//2
intrecheck = ctl.get();
if(! isRunning(recheck) && remove(command))
reject(command);
elseif (workerCountOf(recheck) == 0)
addWorker(null,false);
}
elseif (!addWorker(command, false))//3
reject(command);
- workerCountOf方法根据ctl的低29位,得到线程池的当前线程数,如果线程数小于corePoolSize,则执行addWorker方法创建新的线程执行任务;
- 判断线程池是否在运行,如果在,任务队列是否允许插入,插入成功再次验证线程池是否运行,如果不在运行,移除插入的任务,然后抛出拒绝策略。如果在运行,没有线程了,就启用一个线程。
- 如果添加非核心线程失败,就直接拒绝了。
【图execute()方法】
addWorker()的实现
在上面提交任务的时候,会出现开辟新的线程来执行,这会调用addWorker()方法。
private boolean addWorker(Runnable firstTask, booleancore) {
retry:
for(;;) {
intc = ctl.get();
intrs = runStateOf(c);
// Check if queue empty only if necessary.
if(rs >= SHUTDOWN &&
! (rs == SHUTDOWN &&
firstTask == null&&
! workQueue.isEmpty()))
returnfalse;
for(;;) {
intwc = workerCountOf(c);
if(wc >= CAPACITY ||
wc >= (core ? corePoolSize : maximumPoolSize))
returnfalse;
if(compareAndIncrementWorkerCount(c))
breakretry;
c = ctl.get(); // Re-read ctl
if(runStateOf(c) != rs)
continueretry;
// else CAS failed due to workerCount change; retry inner loop
}
}
booleanworkerStarted = false;
booleanworkerAdded = false;
Worker w = null;
try{
finalReentrantLock mainLock = this.mainLock;
w = newWorker(firstTask);
finalThread t = w.thread;
if(t != null) {
mainLock.lock();
try{
// Recheck while holding lock.
// Back out on ThreadFactory failure or if
// shut down before lock acquired.
intc = ctl.get();
intrs = runStateOf(c);
if(rs < SHUTDOWN ||
(rs == SHUTDOWN && firstTask == null)) {
if(t.isAlive()) // precheck that t is startable
thrownew IllegalThreadStateException();
workers.add(w);
ints = workers.size();
if(s > largestPoolSize)
largestPoolSize = s;
workerAdded = true;
}
}finally{
mainLock.unlock();
}
if(workerAdded) {
t.start();
workerStarted = true;
}
}
}finally{
if(! workerStarted)
addWorkerFailed(w);
}
returnworkerStarted;
}
第一段从第3行到第26行,是双层无限循环,尝试增加线程数到ctl变量,并且做一些比较判断,如果超出线程数限定或者ThreadPoolExecutor的状态不符合要求,则直接返回false,增加worker失败。
第二段从第28行开始到结尾,把firstTask这个Runnable对象传给Worker构造方法,赋值给Worker对象的task属性。Worker对象把自身(也是一个Runnable)封装成一个Thread对象赋予Worker对象的thread属性。锁住整个线程池并实际增加worker到workers的HashSet对象当中。成功增加后开始执行t.start(),就是worker的thread属性开始运行,实际上就是运行Worker对象的run方法。Worker的run()方法实际上调用了ThreadPoolExecutor的runWorker()方法。