什么是线程池
线程池是一种多线程处理的形式,通过把处理的任务添加到队列中,然后在创建线程后自动执行这些任务。线程池可以同时执行多个任务,如果任务队列已经满了,则新来的任务就会排队等待,线程池线程的数量永远不会大于既定最大值。
Exectors API
Java类库提供了4个静态方法来创建一个线程池:
newFixedThreadPool 创建一个固定长度的线程池,当到达线程最大数量时,线程池的规模将不再变化。
newCachedThreadPool 创建一个可缓存的线程池,如果当前线程池的规模超出了处理需求,将回收空的线程;当需求增加时,会增加线程数量;线程池规模无限制。
newSingleThreadPoolExecutor 创建一个单线程的Executor,确保任务对了,串行执行
newScheduledThreadPool 创建一个固定长度的线程池,而且以延迟或者定时的方式来执行,类似Timer;
一个线程池包括以下四个基本组成部分:
1、线程池管理器(ThreadPool):用于创建并管理线程池,包括 创建线程池,销毁线程池,添加新任务;
2、工作线程(PoolWorker):线程池中线程,在没有任务时处于等待状态,可以循环的执行任务;
3、任务接口(Task):每个任务必须实现的接口,以供工作线程调度任务的执行,它主要规定了任务的入口,任务执行完后的收尾工作,任务的执行状态等;
4、任务队列(taskQueue):用于存放没有处理的任务。提供一种缓冲机制。
几种不同的ExecutorService线程池对象
newCachedThreadPool()
-
缓存型池子,先查看池中有没有以前建立的线程,如果有,就reuse.如果没有,就建一个新的线程加入池中
-
缓存型池子通常用于执行一些生存期很短的异步型任务
-
因此在一些面向连接的daemon型SERVER中用得不多。
newFixedThreadPool()
-
newFixedThreadPool与cacheThreadPool差不多,也是能reuse就用,但不能随时建新的线程
-
其独特之处:任意时间点,最多只能有固定数目的活动线程存在,此时如果有新的线程要建立,只能放在另外的队列中等待,直到当前的线程中某个线程终止直接被移出池子
ScheduledThreadPool()
-
调度型线程池
-
这个池子里的线程可以按schedule依次delay执行,或周期执行
SingleThreadExecutor
-
单例线程,任意时间池中只能有一个线程
-
用的是和cache池和fixed池相同的底层池,但线程数目是1-1,0秒IDLE(无IDLE)
____________________________________________________________________________
线程池的数据结构 核心类ThreadPoolExecutor
worker*:工作类,一个worker代表启动了一个线程,它启动后会循环执行workQueue里面的所有任务
workQueue:任务队列,用于存放待执行的任务
keepAliveTime:线程活动保持时间,线程池的工作线程空闲后,保持存活的时间。
线程池原理:预先启动一些线程,线程无限循环从任务队列中获取一个任务进行执行,直到线程池被关闭。如果某个线程因为执行某个任务发生异常而终止,那么重新创建一个新的线程而已。如此反复。
java.uitl.concurrent.ThreadPoolExecutor 类是线程池中最核心的一个类, ThreadPoolExecutor继承了AbstractExecutorService类,并提供了四个构造器。
public ThreadPoolExecutor(int corePoolSize, //核心池的大小
int maximumPoolSize, //线程池中可以容纳的最大线程的数量
//线程池中除了核心线程之外的其他的最长可以保留的时间,因为在线程池中,除了核心线程即使
//在无任务的情况下也不能被清除,其余的都是有存活时间的,意思就是非核心线程可以保留的最长的空闲时间
long keepAliveTime,
TimeUnit unit, //就是计算最长的空闲时间的一个单位,
BlockingQueue<Runnable> workQueue, //就是等待队列,任务可以储存在任务队列中等待被
//执行,执行的是FIFIO原则(先进先出)
ThreadFactory threadFactory, //就是创建线程的线程工厂
RejectedExecutionHandler handler) //拒绝处理任务时的策略
我们可以看出,线程池中
corePoolSize就是线程池中的核心线程数量,这几个核心线程,只是在没有用的时候,也不会被回收
maximumPoolSize就是线程池中可以容纳的最大线程的数量
keepAliveTime,就是线程池中除了核心线程之外的其他的最长可以保留的时间,因为在线程池中,除了核心线程即使在无任务的情况下也不能被清 除,其余的都是有存活时间的,意思就是非核心线程可以保留的最长的空闲时间,
util,就是计算这个时间的一个单位。
workQueue,就是等待队列,任务可以储存在任务队列中等待被执行,执行的是FIFIO原则(先进先出)。
threadFactory,就是创建线程的线程工厂。
handler,是一种拒绝策略,我们可以在任务满了之后,拒绝执行某些任务。
下面解释下一下构造器中各个参数的含义:
corePoolSize:核心池的大小,这个参数跟后面讲述的线程池的实现原理有非常大的关系。在创建了线程池后,默认情况下,线程池中并没有任何线程,而是等待有任务到来才创建线程去执行任务,除非调用了prestartAllCoreThreads()或者prestartCoreThread()方法,从这2个方法的名字就可以看出,是预创建线程的意思,即在没有任务到来之前就创建
corePoolSize个线程或者一个线程。默认情况下,在创建了线程池后,线程池中的线程数为0,当有任务来之后,就会创建一个线程去执行任务,当线程池中的线程数目达到corePoolSize后,就会把到达的任务放到缓存队列当中;
maximumPoolSize:线程池最大线程数,这个参数也是一个非常重要的参数,它表示在线程池中最多能创建多少个线程;
keepAliveTime:表示线程没有任务执行时最多保持多久时间会终止。默认情况下,只有当线程池中的线程数大于corePoolSize时,keepAliveTime才会起作用,直到线程池中的线程数不大于corePoolSize,即当线程池中的线程数大于corePoolSize时,如果一个线程空闲的时间达到keepAliveTime,则会终止,直到线程池中的线程数不超过corePoolSize。但是如果调用了 allowCoreThreadTimeOut(boolean) 方法,在线程池中的线程数不大于corePoolSize时,keepAliveTime参数也会起作用,直到线程池中的线程数为0;
unit:参数keepAliveTime的时间单位
workQueue:一个阻塞队列,用来存储等待执行的任务,这个参数的选择也很重要,会对线程池的运行过程产生重大影响,一般来说,这里阻塞队列有以 下几种选择:
threadFactory:线程工厂,主要用来创建线程;
handler:拒绝处理任务时的策略
____________________________________________________________________________
____________________________________________________________________________
步骤一(对应图1号箭头,对应源码第一个if代码块): 线程池中线程数小于corePoolSize ,就会创建新线程。
步骤二(对应图2号箭头,对应第二个if代码块): 线程池中线程数等于corePoolSize,这个任务就会保存到BlockingQueue。
步骤三(对应图3号箭头,对应第三个if代码块): BlockingQueue也满了,线程池中线程数小于 maximumPoolSize时候就会再次创建新的线程。
步骤四(对应图4号箭头,对应第三个if代码块): BlockingQueue满了, 线程池中线程数等于 maximumPoolSize时候就会执行饱和策略。
____________________________________________________________________________
看一下任务从提交到最终执行完毕经历了哪些过程。*
在ThreadPoolExecutor类中,
最核心的任务提交方法是execute()方法**,虽然通过submit也可以提交任务,但是实际上submit方法里面最终调用的还是execute()方法,所以我们只需要研究execute()方法的实现原理即可:
public void execute(Runnable command) {
if (command == null) //判断提交的任务command是否为null
throw new NullPointerException();
if (poolSize >= corePoolSize || !addIfUnderCorePoolSize(command)) {
//如果线程池中当前线程数不小于核心池大小则继续
//或执行完addIfUnderCorePoolSize这个方法返回false
if (runState == RUNNING && workQueue.offer(command)) {
//如果当前线程池处于RUNNING状态,则将任务放入任务缓存队列
if (runState != RUNNING || poolSize == 0)
//防止在将此任务添加进任务缓存队列的同时其他线程突然调用shutdown或者shutdownNow方法关闭 //了线程池的一种应急措施。如果是这样就执行:
ensureQueuedTaskHandled(command);
ensureQueuedTaskHandled(command);
//从名字可以看出是保证 添加到任务缓存队列中的任务得到处理。
}
else if (!addIfUnderMaximumPoolSize(command))
//如果当前线程池不处于RUNNING状态或者任务放入缓存队列失败,则执行addIfUnder....
//如果执行addIfUnder....方法失败,则执行reject()方法进行任务拒绝处理。
reject(command); // is shutdown or saturated
}
}
到这里,大部分朋友应该对任务提交给线程池之后到被执行的整个过程有了一个基本的了解,
下面总结一下:
1)首先,要清楚corePoolSize*和maximumPoolSize*的含义;
2)其次,要知道Worker*是用来起到什么作用的;(工作类,一个worker代表启动了一个线程,它启动后会循环执行workQueue里面的所有任务)
3)要知道任务提交给线程池之后的处理策略,这里总结一下主要有4点:
-
如果当前线程池中的线程数目小于corePoolSize,则每来一个任务,就会创建一个线程去执行这个任务;
-
如果当前线程池中的线程数目>=corePoolSize,则每来一个任务,会尝试将其添加到任务缓存队列当中,若添加成功,则该任务会等待空闲线程将其取出去执行;若添加失败(一般来说是任务缓存队列已满),则会尝试创建新的线程去执行这个任务;
-
如果当前线程池中的线程数目达到maximumPoolSize,则会采取任务拒绝策略进行处理;
-
如果线程池中的线程数量大于 corePoolSize时,如果某线程空闲时间超过keepAliveTime,线程将被终止,直至线程池中的线程数目不大于corePoolSize;如果允许为核心池中的线程设置存活时间,那么核心池中的线程空闲时间超过keepAliveTime,线程也会被终止。
线程池任务submit及执行流程
线程池中的线程初始化
默认情况下,创建线程池之后,线程池中是没有线程的,需要提交任务之后才会创建线程。
实际中如果需要线程池创建之后立即创建线程,可以通过以下两个方法办到:
prestartCoreThread():初始化一个核心线程;
prestartAllCoreThreads():初始化所有核心线程
任务缓存队列及排队策略
在前面我们多次提到了任务缓存队列,即workQueue,它用来存放等待执行的任务。
workQueue的类型为BlockingQueue<Runnable>,通常可以取下面三种类型:
1)ArrayBlockingQueue:基于数组的先进先出队列,此队列创建时必须指定大小;
2)LinkedBlockingQueue:基于链表的先进先出队列,如果创建时没有指定此队列大小,则默认为Integer.MAX_VALUE;
3)synchronousQueue:这个队列比较特殊,它不会保存提交的任务,而是将直接新建一个线程来执行新来的任务。
任务拒绝策略
当线程池的任务缓存队列已满并且线程池中的线程数目达到maximumPoolSize,如果还有任务到来就会采取任务拒绝策略,通常有以下四种策略:
-
ThreadPoolExecutor.AbortPolicy:丢弃任务并抛出RejectedExecutionException异常。
-
ThreadPoolExecutor.DiscardPolicy:也是丢弃任务,但是不抛出异常。
-
ThreadPoolExecutor.DiscardOldestPolicy:丢弃队列最前面的任务,然后重新尝试执行任务(重复此过程)
-
ThreadPoolExecutor.CallerRunsPolicy:由调用线程处理该任务
线程的销毁
keepAliveTime:代表的就是线程空闲后多久后销毁,线程的销毁是通过worker的getTask()来实现的。
一般来说,Worker会循环获取getTask(),如果getTask()返回null则工作线程worker终结
线程池的关闭
-
ThreadPoolExecutor提供了两个方法,用于线程池的关闭,分别是shutdown()和shutdownNow(),其中:
-
shutdown():不会立即终止线程池,而是要等所有任务缓存队列中的任务都执行完后才终止,但再也不会接受新的任务
-
shutdownNow():立即终止线程池,并尝试打断正在执行的任务,并且清空任务缓存队列,返回尚未执行的任务
____________________________________________________________________________
线程池容量的动态调整
ThreadPoolExecutor提供了动态调整线程池容量大小的方法:setCorePoolSize()和setMaximumPoolSize(),
-
setCorePoolSize:设置核心池大小
-
setMaximumPoolSize:设置线程池最大能创建的线程数目大小
当上述参数从小变大时,ThreadPoolExecutor进行线程赋值,还可能立即创建新的线程来执行任务。
如何合理配置线程池的大小
一般需要根据任务的类型来配置线程池大小:
如果是CPU密集型任务,就需要尽量压榨CPU,参考值可以设为 NCPU+1
如果是IO密集型任务,参考值可以设置为2*NCPU
当然,这只是一个参考值,具体的设置还需要根据实际情况进行调整,比如可以先将线程池大小设置为参考值,再观察任务运行情况和系统负载、资源利用率来进行适当调整。
缓存线程池的参数如何设置
参数的设置跟系统的负载有直接的关系,系统负载相关参数:
tasks,每秒需要处理的最大任务数
tasktime,处理每个任务所需要的时间
responsetime,系统允许任务最大的响应时间,比如每个任务的响应时间不得超过2秒
corePoolSize:
每个任务需要tasktime秒处理,则每个线程每秒可以处理1/tasktime个任务。系统每秒有tasks个任务,所以需要的线程数为tasks/(1/tasktime),即tasks*tasktime个线程数。
假设系统每秒任务数为100-1000,每个任务耗时0.1秒,那么需要10-100个线程, 核心线程数应设置为大于10,具体数字最好根据8020原则,即80%情况下系统每秒任务数,若系统80%情况下每秒任务数小于200,最多时为1000,那么可以设置为20。
maxPoolSize:
当线程数大于或等于核心线程,且任务队列已满时,线程池会创建新的线程,直到线程数量达到maxPoolSize。如果线程数等于最大线程数,则已经超出线程池的处理能力,线程池会拒绝处理任务而抛出异常。
当系统负载达到最大值的时候,核心线程数已经无法按时处理完所有任务,这时候就需要增加线程。每秒200个任务需要20个线程,当每秒达到1000个任务的时候,需要(1000-queueCapacity)*(20/200)=60个线程,可以将最大线程数设置为60。
queueCapacity:
任务队列容量。从maxPoolSize的描述上可以看出,任务队列的容量会影响到线程的变化,因此任务队列的长度也需要恰当的设置。
任务队列的长度要根据核心线程数,以及系统对任务响应时间的要求有关。队列长度可以设置为:
(corePoolSize/tasktime)*responsetime:20/0.1*2=400,即队列长度可以设置为400。
设计一个线程池
____________________________________________________________________________
好处 : 线程是稀缺资源,如果被无限制的创建,不仅会消耗系统资源,还会降低系统的稳定性,合理的使用线程池对线程进行统一分配、调优和监控,有以下好处:
1、降低资源消耗;
2、提高响应速度;
3、提高线程的可管理性。
Java1.5中引入的Executor框架把任务的提交和执行进行解耦,只需要定义好任务,然后提交给线程池,而不用关心该任务是如何执行、被哪个线程执行,以及什么时候执行。
线程池的优点是可总结为以下三点:
-
线程复用
-
控制最大并发数
-
管理线程
1.线程复用:
在ThreadPoolExecutor主要Worker类来控制线程的复用。Worker是一个Runnable,同时拥有一个thread,这个thread就是要开启的线程,在新建Worker对象时同时新建一个Thread对象,同时将Worker自己作为参数传入TThread,这样当Thread的start()方法调用时,运行的实际上是Worker的run()方法,接着到runWorker()中,有个while循环,一直从getTask()里得到Runnable对象,顺序执行。
2. 控制最大并发数:
根据corePoolSize和maximumPoolSize来控制最大并发数。大致过程可用下图表示。
3.管理线程:
在ThreadPoolExecutor有个ctl的AtomicInteger变量。通过这一个变量保存了两个内容:
-
所有线程的数量
-
每个线程所处的状态
其中低29位存线程数,高3位存runState,通过位运算来得到不同的值。
线程池的工作过程如下:
1.线程池刚创建时,里面没有一个线程。任务队列是作为参数传进来的。不过,就算队列里面有任务,线程池也不会马上执行它们。
2.当调用 execute() 方法添加一个任务时,线程池会做如下判断:
-
如果正在运行的线程数量小于 corePoolSize,那么马上创建线程运行这个任务;
-
如果正在运行的线程数量大于或等于 corePoolSize,那么将这个任务放入队列;
-
如果这时候队列满了,而且正在运行的线程数量小于 maximumPoolSize,那么还是要创建非核心线程立刻运行这个任务;
-
如果队列满了,而且正在运行的线程数量大于或等于 maximumPoolSize,那么线程池会抛出异常RejectExecutionException。
3.当一个线程完成任务时,它会从队列中取下一个任务来执行。
4.当一个线程无事可做,超过一定的时间(keepAliveTime)时,线程池会判断,如果当前运行的线程数大于 corePoolSize,那么这个线程就被停掉。所以线程池的所有任务 完成后,它最终会收缩到 corePoolSize 的大小。