目录
一、什么是线程池
二、使用方法
三、原理分析
一、什么是线程池?
线程池就是是一种多线程处理形式,里边可以有很多线程的池子,通过线程池可以对多条线程进行维护管理,进行有效合理的调度和复用等,节省系统和cpu资源,提升性能。
二、使用
一个非常非常简单的例子,容易理解,为后面做铺垫。
public class MyClass {
public static void main(String[] args) {
ExecutorService es = Executors.newFixedThreadPool(5);
for (int i = 0; i < 10; i++){
es.submit(new MyRunnable());
}
es.shutdown();
}
}
class MyRunnable implements Runnable{
@Override
public void run() {
while (true){
System.out.println("thread id is: " + Thread.currentThread().getId());
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
thread id is: 13
thread id is: 12
thread id is: 14
thread id is: 15
thread id is: 16
thread id is: 16
thread id is: 15
thread id is: 13
thread id is: 12
thread id is: 14
thread id is: 16
...
newFixedThreadPool:该方法返回一个固定线程数量的线程池。
我们虽然创建了10条线程丢到线程池里,但我们发现线程id只是从12-16共五条线程,这是因为我们创建的FixedThreadPool设置了最大同时运行线程数目5,而且当这5个运行完才会运行其他线程
我们来改一下代码:
public class MyClass {
public static void main(String[] args) {
ExecutorService es = Executors.newFixedThreadPool(5);
for (int i = 0; i < 10; i++){
es.submit(new MyRunnable());
}
es.shutdown();
}
}
class MyRunnable implements Runnable{
public boolean isStop = false;
int i = 0;
@Override
public void run() {
while (!isStop){
if (i++ == 3){
isStop = true;
}else {
System.out.println("thread name is: "+ Thread.currentThread().getName()+"---thread id is: " + Thread.currentThread().getId());
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
}
thread name is: pool-1-thread-1---thread id is: 12
thread name is: pool-1-thread-4---thread id is: 15
thread name is: pool-1-thread-5---thread id is: 16
thread name is: pool-1-thread-2---thread id is: 13
thread name is: pool-1-thread-3---thread id is: 14
thread name is: pool-1-thread-1---thread id is: 12
thread name is: pool-1-thread-3---thread id is: 14
thread name is: pool-1-thread-4---thread id is: 15
thread name is: pool-1-thread-5---thread id is: 16
thread name is: pool-1-thread-2---thread id is: 13
thread name is: pool-1-thread-4---thread id is: 15
thread name is: pool-1-thread-5---thread id is: 16
thread name is: pool-1-thread-1---thread id is: 12
thread name is: pool-1-thread-3---thread id is: 14
thread name is: pool-1-thread-2---thread id is: 13
thread name is: pool-1-thread-2---thread id is: 13
thread name is: pool-1-thread-3---thread id is: 14
thread name is: pool-1-thread-1---thread id is: 12
thread name is: pool-1-thread-4---thread id is: 15
thread name is: pool-1-thread-5---thread id is: 16
thread name is: pool-1-thread-2---thread id is: 13
thread name is: pool-1-thread-3---thread id is: 14
thread name is: pool-1-thread-5---thread id is: 16
thread name is: pool-1-thread-4---thread id is: 15
thread name is: pool-1-thread-1---thread id is: 12
thread name is: pool-1-thread-4---thread id is: 15
thread name is: pool-1-thread-3---thread id is: 14
thread name is: pool-1-thread-5---thread id is: 16
thread name is: pool-1-thread-1---thread id is: 12
thread name is: pool-1-thread-2---thread id is: 13
一条线程运行3次,但我们发现如果有线程完成了,下一条线程进来时线程id和名字都是一样的,这是因为:
直接看总结去吧哈哈
三、原理分析
ExecutorService es = Executors.newFixedThreadPool(5);
先看看这个句子涉及到了那几个类:
public interface ExecutorService extends Executor {
void shutdown();
List<Runnable> shutdownNow();
boolean isShutdown();
boolean isTerminated();
boolean awaitTermination(long var1, TimeUnit var3) throws InterruptedException;
<T> Future<T> submit(Callable<T> var1);
<T> Future<T> submit(Runnable var1, T var2);
Future<?> submit(Runnable var1);
<T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> var1) throws InterruptedException;
<T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> var1, long var2, TimeUnit var4) throws InterruptedException;
<T> T invokeAny(Collection<? extends Callable<T>> var1) throws InterruptedException, ExecutionException;
<T> T invokeAny(Collection<? extends Callable<T>> var1, long var2, TimeUnit var4) throws InterruptedException, ExecutionException, TimeoutException;
}
public interface Executor {
void execute(Runnable var1);
}
public class Executors {
ExecutorService就是Executor 的加强版,看上一篇可以了解Future、Runnable、Callable和有无返回值,和各种方法的区别,不多说。
ExecutorService里还定义了以下常用的方法如
shutdown() invokeAny()
其中shutdown()和shutdownNow()的区别在于前者会等待线程的执行完毕在shutdown。
然后就是Executors,其是一个工厂类,里边提供很多工厂方法来创建不同类型的线程池。
public static ExecutorService newFixedThreadPool(int var0) {
return new ThreadPoolExecutor(var0, var0, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue());
}
public static ExecutorService newWorkStealingPool(int var0) {
return new ForkJoinPool(var0, ForkJoinPool.defaultForkJoinWorkerThreadFactory, (UncaughtExceptionHandler)null, true);
}
public static ExecutorService newWorkStealingPool() {
return new ForkJoinPool(Runtime.getRuntime().availableProcessors(), ForkJoinPool.defaultForkJoinWorkerThreadFactory, (UncaughtExceptionHandler)null, true);
}
public static ExecutorService newFixedThreadPool(int var0, ThreadFactory var1) {
return new ThreadPoolExecutor(var0, var0, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue(), var1);
}
public static ExecutorService newSingleThreadExecutor() {
return new Executors.FinalizableDelegatedExecutorService(new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue()));
}
public static ExecutorService newSingleThreadExecutor(ThreadFactory var0) {
return new Executors.FinalizableDelegatedExecutorService(new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue(), var0));
}
public static ExecutorService newCachedThreadPool() {
return new ThreadPoolExecutor(0, 2147483647, 60L, TimeUnit.SECONDS, new SynchronousQueue());
}
public static ExecutorService newCachedThreadPool(ThreadFactory var0) {
return new ThreadPoolExecutor(0, 2147483647, 60L, TimeUnit.SECONDS, new SynchronousQueue(), var0);
}
public static ScheduledExecutorService newSingleThreadScheduledExecutor() {
return new Executors.DelegatedScheduledExecutorService(new ScheduledThreadPoolExecutor(1));
}
public static ScheduledExecutorService newSingleThreadScheduledExecutor(ThreadFactory var0) {
return new Executors.DelegatedScheduledExecutorService(new ScheduledThreadPoolExecutor(1, var0));
}
public static ScheduledExecutorService newScheduledThreadPool(int var0) {
return new ScheduledThreadPoolExecutor(var0);
}
public static ScheduledExecutorService newScheduledThreadPool(int var0, ThreadFactory var1) {
return new ScheduledThreadPoolExecutor(var0, var1);
}
常用线程池:
//创建单核心的线程池
newSingleThreadExecutor();
//创建固定核心数的线程池
newFixedThreadPool(5);
//创建一个按照计划规定执行的线程池
newScheduledThreadPool(2);
//创建一个自动增长的线程池
newCachedThreadPool();
//创建一个具有抢占式操作的线程池
newWorkStealingPool();
...
都可见名之意
那为什么创建完成的线程池都能被ExecutorService接受呢
看个类和接口的关系图就懂啦:
现在我们拿出个创建线程池的方法来分析一下吧
public static ExecutorService newFixedThreadPool(int var0) {
return new ThreadPoolExecutor(var0, var0, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue());
}
执行了ThreadPoolExecutor的构造方法:
public ThreadPoolExecutor(int var1, int var2, long var3, TimeUnit var5, BlockingQueue<Runnable> var6, ThreadFactory var7, RejectedExecutionHandler var8) {
this.ctl = new AtomicInteger(ctlOf(-536870912, 0));
this.mainLock = new ReentrantLock();
this.workers = new HashSet();
this.termination = this.mainLock.newCondition();
if (var1 >= 0 && var2 > 0 && var2 >= var1 && var3 >= 0L) {
if (var6 != null && var7 != null && var8 != null) {
this.acc = System.getSecurityManager() == null ? null : AccessController.getContext();
this.corePoolSize = var1;
this.maximumPoolSize = var2;
this.workQueue = var6;
this.keepAliveTime = var5.toNanos(var3);
this.threadFactory = var7;
this.handler = var8;
} else {
throw new NullPointerException();
}
} else {
throw new IllegalArgumentException();
}
}
可以发现用传进来的参数进行对线程池的实例化
这些参数都非常重要:
- corePoolSize:核心线程数
- maximumPoolSize:最大线程数
- workQueue:等待队列
- keepAliveTime:多于线程的生命
- threadFactory :线程工厂,用于创建线程
- handler:拒绝策略,即当线程池和队列都满时,对继续进来的任务采取的措施。
我们现在已经构造好一个线程池了,接下来就是我们的submit()提交任务了:
for (int i = 0; i < 10; i++){
es.submit(new MyRunnable());
}
看看究竟做了什么事:
跟进去来到了
public Future<?> submit(Runnable var1) {
if (var1 == null) {
throw new NullPointerException();
} else {
RunnableFuture var2 = this.newTaskFor(var1, (Object)null);
this.execute(var2);
return var2;
}
}
由于我们传进来的时Runnable所以它帮我们封装成了RunnableFuture,RunnableFuture继承了Future,所以这里调用了用ThreadPoolExecutor的execute()方法后就返回,并用Future接收
接下来再看ThreadPoolExecutor的execute()方法:
public void execute(Runnable var1) {
if (var1 == null) {//如何任务为空,抛异常
throw new NullPointerException();
} else {
int var2 = this.ctl.get();//获取线程池状态
if (workerCountOf(var2) < this.corePoolSize) {//如果当前工作的线程小于核心线程,尝试调用addWorker()开启新线程运行任务
if (this.addWorker(var1, true)) {
return;
}
var2 = this.ctl.get();//获取新的状态
}
if (isRunning(var2) && this.workQueue.offer(var1)) {//如果正在运行,则添加任务到workQueue阻塞队列
int var3 = this.ctl.get();//获取新状态二次校验
if (!isRunning(var3) && this.remove(var1)) {//如果当前线程池状态不是RUNNING则从队列删除任务,并执行拒绝策略
this.reject(var1);
} else if (workerCountOf(var3) == 0) {//线程池为空,添加空线程
this.addWorker((Runnable)null, false);
}
} else if (!this.addWorker(var1, false)) {//满了则新增空线程,新增失败则执行拒绝策略
this.reject(var1);
}
}
}
这里很明显重要方法是addWorkder ():
private boolean addWorker(Runnable var1, boolean var2) {
while(true) {
int var3 = this.ctl.get();
int var4 = runStateOf(var3);
if (var4 >= 0 && (var4 != 0 || var1 != null || this.workQueue.isEmpty())) {
return false;
}
while(true) {
int var5 = workerCountOf(var3);
if (var5 >= 536870911 || var5 >= (var2 ? this.corePoolSize : this.maximumPoolSize)) {
return false;
}
if (this.compareAndIncrementWorkerCount(var3)) {
boolean var18 = false;
boolean var19 = false;
ThreadPoolExecutor.Worker var20 = null;
try {
var20 = new ThreadPoolExecutor.Worker(var1);
Thread var6 = var20.thread;
if (var6 != null) {
ReentrantLock var7 = this.mainLock;
var7.lock();
try {
int var8 = runStateOf(this.ctl.get());
if (var8 < 0 || var8 == 0 && var1 == null) {
if (var6.isAlive()) {
throw new IllegalThreadStateException();
}
this.workers.add(var20);
int var9 = this.workers.size();
if (var9 > this.largestPoolSize) {
this.largestPoolSize = var9;
}
var19 = true;
}
} finally {
var7.unlock();
}
if (var19) {
var6.start();
var18 = true;
}
}
} finally {
if (!var18) {
this.addWorkerFailed(var20);
}
}
return var18;
}
var3 = this.ctl.get();
if (runStateOf(var3) != var4) {
break;
}
}
}
}
这里各种名字都是var n 我贴大佬的吧:
https://www.cnblogs.com/huangjuncong/p/10031525.html
private boolean addWorker(Runnable firstTask, boolean core) {
retry:
for (;;) {
int c = ctl.get();
int rs = runStateOf(c);
//(6) 检查队列是否只在必要时为空
if (rs >= SHUTDOWN &&
! (rs == SHUTDOWN &&
firstTask == null &&
! workQueue.isEmpty()))
return false;
//(7)循环cas增加线程个数
for (;;) {
int wc = workerCountOf(c);
//(7.1)如果线程个数超限则返回false
if (wc >= CAPACITY ||
wc >= (core ? corePoolSize : maximumPoolSize))
return false;
//(7.2)cas增加线程个数,同时只有一个线程成功
if (compareAndIncrementWorkerCount(c))
break retry;
//(7.3)cas失败了,则看线程池状态是否变化了,变化则跳到外层循环重试重新获取线程池状态,否者内层循环重新cas。
c = ctl.get(); // Re-read ctl
if (runStateOf(c) != rs)
continue retry;
}
}
//(8)到这里说明cas成功了
boolean workerStarted = false;
boolean workerAdded = false;
Worker w = null;
try {
//(8.1)创建worker
final ReentrantLock mainLock = this.mainLock;
w = new Worker(firstTask);
final Thread t = w.thread;
if (t != null) {
//(8.2)加独占锁,为了workers同步,因为可能多个线程调用了线程池的execute方法。
mainLock.lock();
try {
//(8.3)重新检查线程池状态,为了避免在获取锁前调用了shutdown接口
int c = ctl.get();
int rs = runStateOf(c);
if (rs < SHUTDOWN ||
(rs == SHUTDOWN && firstTask == null)) {
if (t.isAlive()) // precheck that t is startable
throw new IllegalThreadStateException();
//(8.4)添加任务
workers.add(w);
int s = workers.size();
if (s > largestPoolSize)
largestPoolSize = s;
workerAdded = true;
}
} finally {
mainLock.unlock();
}
//(8.5)添加成功则启动任务
if (workerAdded) {
t.start();
workerStarted = true;
}
}
} finally {
if (! workerStarted)
addWorkerFailed(w);
}
return workerStarted;
}
都差不多,
进行一系列逻辑后最重要的是执行了
w = new Worker(firstTask)和t.start() 这了两个方法
第一个:Worker是一个内部类,实现了Runnable接口:
private final class Worker extends AbstractQueuedSynchronizer implements Runnable {
private static final long serialVersionUID = 6138294804551838833L;
final Thread thread;
Runnable firstTask;
volatile long completedTasks;
Worker(Runnable var2) { //构造方法拿到我们传进来的任务
this.setState(-1);
this.firstTask = var2;
this.thread = ThreadPoolExecutor.this.getThreadFactory().newThread(this);
}
...
}
构造方法先把我们的任务变成自己的firstTask,之后通过ThreadFactory创建了线程,赋值给了thread,然后回去,t拿到了就t.start(),执行什么呢?其实就是run()方法
我们知道一个任务重要的在run()方法,这个实现了Runnable的Worke,也是个可执行任务哦,所以我们从它的run()方法看
public void run() {
ThreadPoolExecutor.this.runWorker(this);
}
其调用了runWorker()
final void runWorker(ThreadPoolExecutor.Worker var1) {
Thread var2 = Thread.currentThread();
Runnable var3 = var1.firstTask;
var1.firstTask = null;
var1.unlock();
boolean var4 = true;
try {
while(var3 != null || (var3 = this.getTask()) != null) {//取等待队列的任务或者添加自己携带的任务 , 当task == null就会阻塞。
var1.lock();
if ((runStateAtLeast(this.ctl.get(), 536870912) || Thread.interrupted() && runStateAtLeast(this.ctl.get(), 536870912)) && !var2.isInterrupted()) {
var2.interrupt();
}
try {
this.beforeExecute(var2, var3);
Object var5 = null;
try {
var3.run();//执行我们自己写的run方法
} catch (RuntimeException var28) {
var5 = var28;
throw var28;
} catch (Error var29) {
var5 = var29;
throw var29;
} catch (Throwable var30) {
var5 = var30;
throw new Error(var30);
} finally {
this.afterExecute(var3, (Throwable)var5);
}
} finally {
var3 = null;//执行完后设为空,
++var1.completedTasks;
var1.unlock();
}
}
var4 = false;
} finally {
this.processWorkerExit(var1, var4);
}
}
当我们的task不为空的时后进行循环,为空的或就用getTask()去workQueue获取,如果又为空,那就阻塞了。所以说啊,复用就在这体现了。每个线程执行完任务后就会去取任务,而不是销毁重创
getTask():
private Runnable getTask() {
boolean var1 = false;
while(true) {
int var2 = this.ctl.get();
int var3 = runStateOf(var2);
if (var3 >= 0 && (var3 >= 536870912 || this.workQueue.isEmpty())) {
this.decrementWorkerCount();
return null;
}
int var4 = workerCountOf(var2);
boolean var5 = this.allowCoreThreadTimeOut || var4 > this.corePoolSize;
if (var4 <= this.maximumPoolSize && (!var5 || !var1) || var4 <= 1 && !this.workQueue.isEmpty()) {
try {
Runnable var6 = var5 ? (Runnable)this.workQueue.poll(this.keepAliveTime, TimeUnit.NANOSECONDS) : (Runnable)this.workQueue.take();
if (var6 != null) {
return var6;
}
var1 = true;
} catch (InterruptedException var7) {
var1 = false;
}
} else if (this.compareAndDecrementWorkerCount(var2)) {
return null;
}
}
}
总结
简单的例子:
比如说我建了个线程池为5个核心大小的固定线程池
线程池一开始丢进来5个任务时,会创建线程并执行这个任务,这时候已经达到核心线程数,如果再往里丢任务,通常会直接进入workQueue等待队列
当这5个线程中的某个执行完后会不停往workQueue取任务来执行,并没有创建新线程!取不到这个线程就阻塞,while循环一直getTask()我们新丢进去任务。getTask()还会判断等待时间是否超时来回收这个线程可以看下runWorker()processWorkerExit()就知道了。