文章目录
一 摘要
摘自描述线程池比较好的一段话:
- 线程池(Thread Pool)是一种基于池化思想管理线程的工具,经常出现在多线程服务器中,如MySQL。
- 线程过多会带来额外的开销,其中包括创建销毁线程的开销、调度线程的开销等等,同时也降低了计算机的整体性能。线程池维护多个线程,等待监督管理者分配可并发执行的任务。这种做法,一方面避免了处理任务时创建销毁线程开销的代价,另一方面避免了线程数量膨胀导致的过分调度问题,保证了对内核的充分利用。
二 线程池的好处
- 降低资源消耗:通过池化技术重复利用已创建的线程,降低线程创建和销毁造成的损耗。
- 提高响应速度:任务到达时,无需等待线程创建即可立即执行。
- 提高线程的可管理性:线程是稀缺资源,如果无限制创建,不仅会消耗系统资源,还会因为线程的不合理分布导致资源调度失衡,降低系统的稳定性。使用线程池可以进行统一的分配、调优和监控。
- 提供更多更强大的功能:线程池具备可拓展性,允许开发人员向其中增加更多的功能。比如延时定时线程池ScheduledThreadPoolExecutor,就允许任务延期执行或定期执行。
三 线程池解决的问题
1、频繁申请/销毁资源和调度资源,将带来额外的消耗,可能会非常巨大。
2、对资源无限申请缺少抑制手段,易引发系统资源耗尽的风险。
3、系统无法合理管理内部的资源分布,会降低系统的稳定性。
四 ThreadPoolExecutor总体设计
Java中的线程池核心实现类是ThreadPoolExecutor,继承关系如下图:
今天只聊上图中标记的部分,顶层Executor是一个执行接口,其子接口ExecutorService是一个提交接口,从名字上来看就知道Executor接口中定义的是execute方法,而ExecutorService接口中定义的是submit方法,再往下,用一个抽象类AbstractExecutorService实现了ExecutorService接口,其作用就是把上述的执行和提交接口进行合并;最后是ThreadPoolExecutor类,其继承于AbstractExecutorService抽象类。
五 ThreadPoolExecutor核心参数
5.1 corePoolSize
- 在创建了线程池后,默认情况下,线程池中并没有任何线程,而是等待有任务到来才创建线程去执行任务,(除非调用了prestartAllCoreThreads()或者prestartCoreThread()方法,从这2个方法的名字就可以看出,是预创建线程的意思,即在没有任务到来之前就创建corePoolSize个线程或者一个线程)。
- 默认情况下,在创建了线程池后,线程池中的线程数为0,当有任务来之后,就会创建一个线程去执行任务,当线程池中的线程数目达到corePoolSize后,就会把到达的任务放到缓存队列当中。核心线程在allowCoreThreadTimeout被设置为true时会超时退出,默认情况下不会退出。
5.2 maximumPoolSize
- 线程池中允许的最大线程数,线程池中的当前线程数目不会超过该值。当线程数大于或等于核心线程,且任务队列已满时,线程池会创建新的线程,直到线程数量达到maxPoolSize。如果线程数已等于maxPoolSize,且任务队列已满,则调用拒绝策略来处理任务。
5.3 keepAliveTime
- 当线程空闲时间达到keepAliveTime,该线程会退出,直到线程数量等于corePoolSize。当线程池中线程数超过corePoolSize,并且没有配置allowCoreThreadTimeOut=true,空闲时间超过keepAliveTime的线程会退出,保持线程池中线程数为corePoolSize。当设置allowCoreThreadTimeOut=true时,任何空闲时间超过keepAliveTime的线程都会被销毁。
5.4 unit
- 表示keepAliveTime的单位。
5.5 workQueue
- 一个阻塞队列,用来存储等待执行的任务,这个参数的选择也很重要,会对线程池的运行过程产生重大影响,一般来说,这里的阻塞队列有以下几种选择:
ArrayBlockingQueue
LinkedBlockingQueue
SynchronousQueue
PriorityBlockingQueue
- ArrayBlockingQueue和PriorityBlockingQueue使用较少,一般使用LinkedBlockingQueue和Synchronous。线程池的排队策略与BlockingQueue有关。
5.6 threadFactory
- 执行程序创建新线程时使用的工厂。
5.7 handler
- 表示当拒绝处理任务时的策略,有以下四种取值:
ThreadPoolExecutor.AbortPolicy:丢弃任务并抛出RejectedExecutionException异常;
ThreadPoolExecutor.DiscardPolicy:也是丢弃任务,但是不抛出异常;
ThreadPoolExecutor.DiscardOldestPolicy:丢弃队列最前面的任务,然后重新尝试执行任务(重复此过程);
ThreadPoolExecutor.CallerRunsPolicy:由调用线程处理该任务。
六 ThreadPoolExecutor参数设置
6.1 corePoolSize
- 每个任务需要tasktime秒处理,则每个线程每秒可处理1/tasktime个任务。系统每秒有tasks个任务需要处理,则需要的线程数为:tasks/(1/tasktime),即taskstasktime个线程数。假设系统每秒任务数为[100,1000],每个任务耗时0.1秒,则需要1000.1至1000*0.1,即[10,100]个线程。那么corePoolSize应该设置为大于10,具体数字最好根据8020原则,即80%情况下系统每秒任务数,若系统80%的情况下第秒任务数小于200,最多时为1000,则corePoolSize可设置为20。
6.2 queueCapacity
- 任务队列的长度要根据核心线程数,以及系统对任务响应时间的要求有关。队列长度可以设置为(corePoolSize/tasktime)*responsetime: (20/0.1)*2=400,即队列长度可设置为400。
队列长度设置过大,会导致任务响应时间过长,切忌以下写法:
LinkedBlockingQueue queue = new LinkedBlockingQueue();
- 这实际上是将队列长度设置为Integer.MAX_VALUE,将会导致线程数量永远为corePoolSize,再也不会增加,当任务数量陡增时,任务响应时间也将随之陡增。
6.3 maxPoolSize
- 当系统负载达到最大值时,核心线程数已无法按时处理完所有任务,这时就需要增加线程。每秒200个任务需要20个线程,那么当每秒达到1000个任务时,则需要(1000-queueCapacity)*(20/200),即60个线程,可将maxPoolSize设置为60。
6.4 keepAliveTime
- 线程数量只增加不减少也不行。当负载降低时,可减少线程数量,如果一个线程空闲时间达到keepAliveTime,该线程就退出。默认情况下线程池最少会保持corePoolSize个线程。
6.5 allowCoreThreadTimeout
- 默认情况下核心线程不会退出,可通过将该参数设置为true,让核心线程也退出。
总结: 以上关于线程数量的计算并没有考虑CPU的情况。若结合CPU的情况,比如,当线程数量达到50时,CPU达到100%,则将maxPoolSize设置为60也不合适,此时若系统负载长时间维持在每秒1000个任务,则超出线程池处理能力,应设法降低每个任务的处理时间(tasktime)。
七 ThreadPoolExecutor源码
7.1 ThreadPoolExecutor构造方法
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue) {
this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
Executors.defaultThreadFactory(), defaultHandler);
}
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory) {
this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
threadFactory, defaultHandler);
}
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
RejectedExecutionHandler handler) {
this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
Executors.defaultThreadFactory(), handler);
}
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler) {
if (corePoolSize < 0 ||
maximumPoolSize <= 0 ||
maximumPoolSize < corePoolSize ||
keepAliveTime < 0)
throw new IllegalArgumentException();
if (workQueue == null || threadFactory == null || handler == null)
throw new NullPointerException();
this.acc = System.getSecurityManager() == null ?
null :
AccessController.getContext();
this.corePoolSize = corePoolSize;
this.maximumPoolSize = maximumPoolSize;
this.workQueue = workQueue;
this.keepAliveTime = unit.toNanos(keepAliveTime);
this.threadFactory = threadFactory;
this.handler = handler;
}
可以看到前三个方法最终都调用了最后一个、参数列表最长的那个方法,在这个方法中给七个属性赋值。创建线程池对象,强烈建议通过使用ThreadPoolExecutor的构造方法创建,不要使用Executors,至于建议的理由上文中也有说过,这里再引用阿里《Java开发手册》中的一段描述。
【强制】线程池不允许使用Executors创建,建议通过ThreadPoolExecutor的方式创建,这样的处理方式让写的同学更加明确线程池的运行规则,规避资源耗尽的风险。
说明:Executors返回的线程池对象的弊端如下:
- FixedThreadPool和SingleThreadPool:
允许的请求队列长度为Integet.MAX_VALUE,可能会堆积大量的请求从而导致OOM; - CachedThreadPool:
允许创建线程数量为Integet.MAX_VALUE,可能会创建大量的线程,从而导致OOM。
了解了线程池ThreadPoolExecutor的基本构造,接下来编写一段代码看看如何使用,样例代码中的参数仅为了配合原理解说使用。
package concurrency.threadPool;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicInteger;
public class BuildThreadPoolExecutor {
public static void main(String[] args) {
BuildThreadPoolExecutor buildThreadPoolExecutor = new BuildThreadPoolExecutor();
buildThreadPoolExecutor.test();
}
public void test() {
ThreadFactory threadFactory = new MyThreadFactory("交易核心线程");
// 设置线程池的核心线程数为3,最大线程数为6,线程空闲时间为30,线程空闲时间单位为秒
// 阻塞队伍为11的LinkedBlockingQueue
// 自定义线程工厂和丢弃任务并抛出RejectedExecutionException拒绝策略
Executor executor = new ThreadPoolExecutor(3, 6, 30, TimeUnit.SECONDS,
new LinkedBlockingQueue<Runnable>(11), threadFactory, new ThreadPoolExecutor.AbortPolicy());
for (int i = 0; i <= 20; i++) {
executor.execute(new Runnable() {
@Override
public void run() {
System.out.println(Thread.currentThread().getName());
}
});
}
}
class MyThreadFactory implements ThreadFactory {
private final String namePrefix;
private final AtomicInteger nextId = new AtomicInteger(1);
MyThreadFactory(String featureOfGroup) {
this.namePrefix = featureOfGroup + ",线程编号:";
}
@Override
public Thread newThread(Runnable r) {
String name = namePrefix + nextId.getAndIncrement();
Thread thread = new Thread(null, r, name, 0);
return thread;
}
}
}
7.2 ThreadPoolExecutor工作原理
1、通过execute方法提交任务时,如果当前线程池大小PoolSize小于corePoolSize,则创建新线程执行任务;
2、如果当前线程池大小PoolSize达到corePoolSize,且等待队列未满,则进入等待队列;
3、如果当前线程池大小PoolSize大于corePoolSize且小于maximumPoolSize,且等待队列已满,则新提交的任务将通过创建新线程执行;
4、如果当前线程池大小PoolSize大于corePoolSize且已经达到maximumPoolSize,且等待队列已满,则调用拒绝策略来处理该任务;
**总结:**线程池里的每个线程执行完任务后不会立即退出,而是会去检查下等待队列里是否还有线程任务需要执行,如果在keepAliveTime里等不到新的任务了,那么线程会退出。
7.3 ThreadPoolExecutor全局常量和方法
public class ThreadPoolExecutor extends AbstractExecutorService {
// 初始化线程池状态为RUNNING、线程池数量为0。
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
// 32 - 3 = 29
private static final int COUNT_BITS = Integer.SIZE - 3;
// 线程池允许的最大数量(2^29)-1 ---------> 000 11111 1111 1111 1111 1111 1111 1111
private static final int CAPACITY = (1 << COUNT_BITS) - 1;
// runState is stored in the high-order bits
//在ThreadPoolExecutor中使用32位二进制数来表示线程池的状态和线程池中线程数量,其中前3位表示线程池状态,后29位表示线程池中线程数
// 111 00000 0000 0000 0000 0000 0000 0000
private static final int RUNNING = -1 << COUNT_BITS;
// 000 00000 0000 0000 0000 0000 0000 0000
private static final int SHUTDOWN = 0 << COUNT_BITS;
// 001 00000 0000 0000 0000 0000 0000 0000
private static final int STOP = 1 << COUNT_BITS;
// 010 00000 0000 0000 0000 0000 0000 0000
private static final int TIDYING = 2 << COUNT_BITS;
// 011 00000 0000 0000 0000 0000 0000 0000
private static final int TERMINATED = 3 << COUNT_BITS;
// Packing and unpacking ctl
// c & (111 00000 0000 0000 0000 0000 0000 0000) ---> 线程池的状态
private static int runStateOf(int c) {
return c & ~CAPACITY; }
// c & (000 11111 1111 1111 1111 1111 1111 1111) ---> 线程池中线程数
private static int workerCountOf(int c) {
return c & CAPACITY; }
// return state | capacity ----> 计算ctlOf新值,也就是线程池状态和线程池个数
private static int ctlOf(int rs, int wc) {
return rs | wc; }
}
线程池处在不同的状态时,它的处理能力是不同的。
线程池不同状态之间的转换时机及转换关系如下图。
7.4 ThreadPoolExecutor的execute方法
通过之前的内容描述,我们使用ThreadPoolExecutor的execute方法提交任务,所以从execute的源码入手。
public void execute(Runnable command) {
// 如果任务为null,则抛出空指针异常
if (command == null)
throw new NullPointerException();
// 获取线程池状态和线程池数量的组合值
int c = ctl.get();
// 如果当前线程池中线程数小于核心线程数,则创建新线程执行任务
if (workerCountOf(c) < corePoolSize) {
// 创建新线程执行任务,创建成功则返回,否则再次获取线程池状态和线程池数量的组合值
if (addWorker(command, true))
return;
// 再次获取线程池状态和线程池数量的组合值
c = ctl.get();
}
// 如果线程池是运行状态,且可以添加任务到阻塞队列
if (isRunning(c) && workQueue.offer(command)) {
// 再次获取线程池状态和线程池数量的组合值,因为这个时候线程池的状态可能改变了
int recheck = ctl.get();
// 如果线程池已经不是运行状态了,且可以从阻塞队列中删除任务,则执行拒绝策略
if (! isRunning(recheck) && remove(command))
// 执行拒绝策略
reject(command);
// 如果线程池中线程数等于0,则新创建一个线程
else if (workerCountOf(recheck) == 0)
addWorker(null, false);
}
// 如果阻塞队列满了,新创建线程也失败了,则执行拒绝策略
else if (!addWorker(command, false))
reject(command);
}
execute方法执行流程图:
7.5 ThreadPoolExecutor的addWorker方法
private boolean addWorker(Runnable firstTask, boolean core) {
retry:
for (;;) {
// 自旋 while(true)
int c = ctl.get();
int rs = runStateOf(c);
// 如果当前线程池的状态不允许创建线程,则返回false
// Check if queue empty only if necessary.
if (rs >= SHUTDOWN &&
! (rs == SHUTDOWN &&
firstTask == null &&
! workQueue.isEmpty()))
return false;
// 通过自循环尝试获取创建新线程的机会
for (;;) {
int wc = workerCountOf(c);
// 如果线程数大于线程池最大容量,或者大于核心线程数,或者大于最大线程数,返回false
if (wc >= CAPACITY ||
wc >= (core ? corePoolSize : maximumPoolSize))
return false;
// 尝试增加线程数失败了,回到起点重来
if (compareAndIncrementWorkerCount(c)) // CAS操作
break retry;
c = ctl.get(); // Re-read ctl
// 线程池状态发生改变了,回到起点再来
if (runStateOf(c) != rs)
continue retry;
// else CAS failed due to workerCount change; retry inner loop
}
}
boolean workerStarted = false;
boolean workerAdded = false;
Worker w = null;
try {
// 线程池中线程和任务被封装到Worker中,这些Worker存在HashSet集合中
w = new Worker(firstTask);
// 取出封装的线程
final Thread t = w.thread;
if (t != null) {
// 使用ReentrantLock控制并发
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
// Recheck while holding lock.
// Back out on ThreadFactory failure or if
// shut down before lock acquired.
int rs = runStateOf(ctl.get());
// 如果线程池状态处于STOP/TIDYING/TERMINATED,或者线程池状态SHUTDOWN且当前任
// 务是null,则抛出异常,否则将封装好的Worker添加到HashSet中
if (rs < SHUTDOWN ||
(rs == SHUTDOWN && firstTask == null)) {
if (t.isAlive()) // precheck that t is startable
throw new IllegalThreadStateException();
workers.add(w);
// 记录线程池历史最大值,largestPoolSize <=?maximumPoolSize.
int s = workers.size();
if (s > largestPoolSize)
largestPoolSize = s;
workerAdded = true;
}
} finally {
mainLock.unlock();
}
// 添加成功,则执行线程的start方法,启动线程执行任务
if (workerAdded) {
t.start();
workerStarted = true;
}
}
} finally {
if (! workerStarted)
addWorkerFailed(w);
}
return workerStarted;
}
addWorker方法执行流程图:
源码中将线程和任务封装到了Worker中,然后将Worker添加到HashSet集合中,添加成功后通过线程对象的start方法启动线程执行任务,既然这样那我们就来看看上图代码中的w = new Worker(firstTask)到底是如何执行的。
private final class Worker
extends AbstractQueuedSynchronizer
implements Runnable
{
/**
* This class will never be serialized, but we provide a
* serialVersionUID to suppress a javac warning.
*/
private static final long serialVersionUID = 6138294804551838833L;
/** Thread this worker is running in. Null if factory fails. */
final Thread thread;
/** Initial task to run. Possibly null. */
Runnable firstTask;
/** Per-thread task counter */
volatile long completedTasks;
/**
* Creates with given first task and thread from ThreadFactory.
* @param firstTask the first task (null if none)
*/
Worker(Runnable firstTask) {
setState(-1); // inhibit interrupts until runWorker
this.firstTask = firstTask;
this.thread = getThreadFactory().newThread(this);
}
/** Delegates main run loop to outer runWorker */
public void run() {
// 调用任务处
runWorker(this);
}
Worker这个工作线程,实现了Runnable接口,并持有一个线程thread,一个初始化的任务firstTask。thread是在调用构造方法时通过ThreadFactory来创建的线程,可以用来执行任务;firstTask用它来保存传入的第一个任务,这个任务可以有也可以为null。如果这个值是非空的,那么线程就会在启动初期立即执行这个任务,也就对应核心线程创建时的情况;如果这个值是null,那么就需要创建一个线程去执行任务列表(workQueue)中的任务,也就是非核心线程的创建。
Worker执行任务的模型如下图所示:
线程池需要管理线程的生命周期,需要在线程长时间不运行的时候进行回收。线程池使用一张Hash表去持有线程的引用,这样可以通过添加引用、移除引用这样的操作来控制线程的生命周期。这个时候重要的就是如何判断线程是否在运行。
Worker是通过继承AQS,使用AQS来实现独占锁这个功能。没有使用可重入锁ReentrantLock,而是使用AQS,为的就是实现不可重入的特性去反应线程现在的执行状态。
- lock方法一旦获取了独占锁,表示当前线程正在执行任务中。
- 如果正在执行任务,则不应该中断线程。
- 如果该线程现在不是独占锁的状态,也就是空闲的状态,说明它没有在处理任务,这时可以对该线程进行中断。
- 线程池在执行shutdown方法或tryTerminate方法时会调用interruptIdleWorkers方法来中断空闲的线程,interruptIdleWorkers方法会使用tryLock方法来判断线程池中的线程是否是空闲状态;如果线程是空闲状态则可以安全回收。
在线程回收过程中就使用到了这种特性,回收过程如下图所示:
Worker继承了AbstractQueuedSynchronizer,并且实现了Runnable接口,看到这里很清楚了任务最终由Worker中的run方法执行,而run方法里调用了runWorker方法,所以重点还是runWorker方法。
7.6 ThreadPoolExecutor的runWorker方法
final void runWorker(Worker w) {
Thread wt = Thread.currentThread();
Runnable task = w.firstTask;
w.firstTask = null;
w.unlock(); // allow interrupts
boolean completedAbruptly = true;
try {
// 从阻塞队列中不断获取任务,如果任务不为空则执行
while (task != null || (task = getTask()) != null) {
w.lock();
// If pool is stopping, ensure thread is interrupted;
// if not, ensure thread is not interrupted. This
// requires a recheck in second case to deal with
// shutdownNow race while clearing interrupt
if ((runStateAtLeast(ctl.get(), STOP) ||
(Thread.interrupted() &&
runStateAtLeast(ctl.get(), STOP))) &&
!wt.isInterrupted())
wt.interrupt();
try {
beforeExecute(wt, task);
Throwable thrown = null;
try {
task.run();
} catch (RuntimeException x) {
thrown = x; throw x;
} catch (Error x) {
thrown = x; throw x;
} catch (Throwable x) {
thrown = x; throw new Error(x);
} finally {
afterExecute(task, thrown);
}
} finally {
task = null;
w.completedTasks++;
w.unlock();
}
}
completedAbruptly = false;
} finally {
// 获取不到任务时,Worker从HashSet中清除操作,主动回收自己
processWorkerExit(w, completedAbruptly);
}
}
runWorker执行流程图:
线程回收的工作是在processWorkerExit方法完成的。
在runWorker方法中,使用循环,通过getTask方法,不断从阻塞队列中获取任务执行,如果任务不为空则执行任务,这里实现了线程的复用,不断的获取任务执行,不用重新创建线程;队列中获取的任务为null,则将Worker从HashSet集合中清除,注意这个清除就是空闲线程的回收。那getTask何时返回null?接着看getTask源码。
7.7 ThreadPoolExecutor的getTask方法
private Runnable getTask() {
boolean timedOut = false; // Did the last poll() time out?
for (;;) {
int c = ctl.get();
int rs = runStateOf(c);
// 当线程池是RUNNING,并且阻塞队列是空的,返回null
if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
decrementWorkerCount();
return null;
}
// 获取线程池中的线程数
int wc = workerCountOf(c);
// 如果配置了allowCoreThreadTimeOut = true,或者线程池中线程数大于核心线程数,timed为true,
// 这个timed是指在规定的时间内等待去获取阻塞队列中任务,这个规定的时间就是指定线程存活时间keepAliveTime
boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
if ((wc > maximumPoolSize || (timed && timedOut))
&& (wc > 1 || workQueue.isEmpty())) {
if (compareAndDecrementWorkerCount(c))
return null;
continue;
}
try {
// 如果timed为true,则在规定的时间内等待获取阻塞队列中的任务
// 否则,获取任务时阻塞等待任务的到来
Runnable r = timed ?
workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
workQueue.take();
// 如果获取的任务不是null,则返回
// 否则继续循环获取,注意此时timedOut=true,继续下一次循环会返回null
if (r != null)
return r;
timedOut = true;
} catch (InterruptedException retry) {
timedOut = false;
}
}
}
getTask方法执行流程图:
到这里,线程池中线程是如何执行任务、如何复用线程,以及线程空闲时间超限如何判断都已经清楚了。
八 Demo(addWorker->runWorker)
从上面的源码可以发现addWork中,获取Worker中thread,thread是调用start方法,可是为什么会触发runWorker方法呢?这里我写了个Demo进行了一下,突然就明白了其中的流程了。
package concurrency.threadPool;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.locks.AbstractQueuedSynchronizer;
/**
* 线程池addWork方法中调用start到runWork方法
*/
public class TestRunWork
extends AbstractQueuedSynchronizer
implements Runnable {
final Thread thread;
Runnable firstTask;
private volatile ThreadFactory threadFactory;
public TestRunWork(Runnable firstTask, ThreadFactory threadFactory) {
this.threadFactory = threadFactory;
setState(-1);
this.firstTask = firstTask;
this.thread = this.getThreadFactory().newThread(this);
}
public ThreadFactory getThreadFactory() {
return threadFactory;
}
@Override
public void run() {
runWork();
}
private void runWork() {
firstTask.run();
System.out.println("runWork方法执行成功");
}
public static void main(String[] args) {
Runnable task = ()->{
System.out.println("我是一个Runnable任务");
};
TestRunWork testRunWork = new TestRunWork(task, new MyThreadFactory());
Thread thread1 = testRunWork.thread;
thread1.start();
}
static class MyThreadFactory implements ThreadFactory {
@Override
public Thread newThread(Runnable r) {
Thread thread = new Thread(null, r, "xxxx", 0);
return thread;
}
}
}
打印结果:
我是一个Runnable任务
runWork方法执行成功
原因在于Worker中构造方法调用ThreadFactory的newThread方法传入参数是自身,最后start的时候运行的是自身的run方法。
总结:线程池工作原理和底层实现原理是面试必问的考题,所以,这块是一定要掌握的,遇到看不懂的源码一定要多去模拟。