ThreadPoolExecutor实现原理

线程池的基本原理

线程池对任务的基本处理流程
我们先来看看线程池对线程的处理流程,如下图所示:
 
 
execute()执行示意图
execute()方法执行示意图:
 
 
线程池核心源码分析
构造方法分析
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler) {
if (corePoolSize < 0 ||
maximumPoolSize <= 0 ||
maximumPoolSize < corePoolSize ||
keepAliveTime < 0)
throw new IllegalArgumentException();
if (workQueue == null || threadFactory == null || handler == null)
throw new NullPointerException();
this.corePoolSize = corePoolSize;
this.maximumPoolSize = maximumPoolSize;
this.workQueue = workQueue;
this.keepAliveTime = unit.toNanos(keepAliveTime);
this.threadFactory = threadFactory;
this.handler = handler;
}
 
corePoolSize 线程池中恒定保持的线程数目(核心线程数),即使它们是空闲的线程池也会保持,除非设置了allowCoreThreadTimeOut参数为true。
maximumPoolSize 线程池中可以允许的最大线程数量。
keepAliveTime 当线程数大于核心线程数时,这个时间就是那些空闲线程在终止前能够等待新任务到来的最长时间。
unit keepAliveTime参数的单拉。
workQueue 这个阻塞队列用于存储通过execute()方法提交的即将被执行的实现了Runnable接口的任务。
threadFactory 线程池用来创建线程的工厂。
handler 当线程池与任务队列都满了,这个handler用来执行拒绝策略。
 
 
其它属性分析及拒绝策略
其它属性
 
largestPoolSize 线程池曾经达到过的最大线程数目,如果该值等于线程池的最大大小,则表明线程池曾经满过。
taskCount 向线程池提交过的任务数量,该值只增不减。
completedTaskCount 线程池曾经执行完成过的任务总数,该值也是只增不减少,且恒定小于或等于taskCount的值。
allowCoreThreadTimeOut 是否允许核心线程超时,该值默认为false。
 
拒绝策略(待完成)
 
 
 
线程池状态与控制变量
 
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
private static final int COUNT_BITS = Integer.SIZE - 3;
private static final int CAPACITY = (1 << COUNT_BITS) - 1;
 
// runState is stored in the high-order bits
private static final int RUNNING = -1 << COUNT_BITS;
private static final int SHUTDOWN = 0 << COUNT_BITS;
private static final int STOP = 1 << COUNT_BITS;
private static final int TIDYING = 2 << COUNT_BITS;
private static final int TERMINATED = 3 << COUNT_BITS;
 
// Packing and unpacking ctl
private static int runStateOf(int c) { return c & ~CAPACITY; }
private static int workerCountOf(int c) { return c & CAPACITY; }
private static int ctlOf(int rs, int wc) { return rs | wc; }
 

由上述代码可以看到线程池是由一个叫ctl的整型变量来存储线程的状态当前工作线程数,用32个比特位来同时存储线程状态与当前工作线程数,其中高3位用来存储线程状态(为什么是3位而不是2位,因为线程状态的数量有5种,2位不够用,4位以上又太多),低29位用来存储当前工作线程数。

 

调用流程分析(待完成)
 
线程池连接利用的实现原理分析(待完成)
 
线程池的关闭
线程池提供了两种方法来实现线程的关闭:
  1. shutdown:不能再提交任务,已经提交的任务可继续运行;
  2. shutdownNow:不能再提交任务,已经提交但未执行的任务不能运行,在运行的任务可继续运行,但会被中断,返回已经提交但未执行的任务。

 

我们先看看shutdown()方法的源码:

public void shutdown() {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
checkShutdownAccess();
advanceRunState(SHUTDOWN);
interruptIdleWorkers();
onShutdown(); // hook for ScheduledThreadPoolExecutor
} finally {
mainLock.unlock();
}
tryTerminate();
}

 

通过代码我们不能看出,这个方法只是将线程的状态设置了SHUTDOWN,即不再接受新任务的提交,然后中断那些空闲的workers,对于正在执行的workers,则未做任务处理。

 

再来看看shutdownNow()方法的源码:

public List<Runnable> shutdownNow() {
List<Runnable> tasks;
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
checkShutdownAccess();
advanceRunState(STOP);
interruptWorkers();
tasks = drainQueue();
} finally {
mainLock.unlock();
}
tryTerminate();
return tasks;
}

 

先将线程池状态设置为STOP,然后再尝试中断所有的workers(包含正在执行的workers),然后调用drainQueue()方法将任务队列中可能存在的待执行任务清除掉。

 
线程池的监控与扩展
线程池提供了下面一些接口来给我们获取线程池的一些相关数据:
 
int getCorePoolSize()
获取核心线程池的大小
 
int getMaximumPoolSize()
获取线程池的最大线程数
 
boolean allowsCoreThreadTimeOut()
获取核心线程是否会超时的设置参数
 
long getKeepAliveTime()
获取空闲线程等待时长的设置
 
getPoolSize()
获取线程池执行过的线程数目,在线程未销毁时该值只增不减
 
getActiveCount()
获取当前正在执行的线程大概数目
 
int getLargestPoolSize()
获取线程池曾经达到过的最大大小
 
long getTaskCount()
获取线程池还需要执行的任务数目
 
long getCompletedTaskCount()
获取线程池曾经执行完成过的任务数目
 
int getQueue()
获取任务队列的长度
 
同时如果我自己对线程池进行扩展,线程也还提供下面这些方法可以我们自己去实现:
 
void onShutdown()

调用线程池的shutdown()方法时会被调用,比如说我们想在线程结束时搞点飞机也可以

 
void beforeExecute()
每个线程开始执行前会调用该方法
 
void afterExecute()
每个线程执行结束后会调用该方法,这个方法和beforeExecute()方法一般用于我们要对线程干的事情进行统计,比如说干了多少什么,干了什么活,干活用了多少时间====。
 
void terminated()
线程池终止时会调用该方法,线程池被终止时我们也可以干点啥。。。
 

猜你喜欢

转载自www.cnblogs.com/chenchenxiaobao/p/10113661.html