文章目录
在计算机系统中,
进程
是程序在一个数据集合上的运行过程,是系统进行资源分配和调度的基本单位,每个进程都拥有自己私有的地址空间,且进程之间是相互独立的。线程
是操作系统调度的最小单位,它可以说是进程的组成部分,在一个进程中可以创建多个线程,这些线程都拥有各自计数器、栈和局部变量等属性,但是不再拥有系统资源,它与进程中的其他线程共享该进程所拥有的全部资源,或称共享内存变量(堆内存)。线程是独立运行的,且采用抢占式的方式执行,它的生命周期包含5个阶段,即新建(New)、就绪(Runnable)、运行(Running)、阻塞(Blocked)和销毁(Destoryed)。
1. 多线程基础
所谓多线程,是指从软件或者硬件上实现多个线程并发
执行的技术,它的目的为在一个进程中使用多个线程同步完成多项子任务,以达到通过提高资源的使用效率来提高系统的效率。换句话来说,就是使用多线程可以减少程序的响应时间
,假如使用单线程恰好执行了一个耗时的任务,那么程序在这个任务结束前就无法再接收其他任务或者响应事件。相比于多进程,多线程有如下优点:
(1)线程创建和切换开销小,因为不用为每个线程分配系统资源;
(2)进程间不能共享内存,但线程间共享整个虚拟地址空间;
(3)多线程实现多任务并发效率高,因为线程创建和切换开销小;
(4)提高了CPU等资源的利用率,同时使程序便于理解和维护。
1.1 线程安全
虽说多线程并发能够大大提高程序的运行效率和资源的利用率,但是由于系统的线程调度具有一定的随机性
,当使用多个线程来访问同一个数据时,非常容易出现线程安全问题。换句话来说,就是多个线程访问同一个数据时可能会出现错误,这时候我们就需要某些同步机制
来确保每个线程访问这个数据都能够得到正确的结果,而常用的同步机制有Syschronize同步
、锁同步
以及volatile
同步。
1.1.1 Syschronize同步机制
Syschronize同步机制指的是使用Syschronize关键字
来实现多线程同步,分为同步代码块
、同步方法
,它的工作原理是通过引入同步监视器对象(或称对象锁)
来实现阻止多个线程对同一共享资源
进行并发访问,即当某个线程获得了这个对象锁时,才允许对这个共享资源进行访问,此时其他线程因为没有获得锁而无法操作共享资源,只有当获得锁的线程释放了锁,其他线程获得锁才能够访问共享资源。
(1)同步代码块
同步代码块是指程序中某段代码使用syschronize
关键字包裹的一段代码,且需要向这个关键字传入一个锁对象,该对象可以是任意类型对象。同步代码块的语法格式如下:
syschronize(obj) {
...
// 同步代码块
}
示例如下:
/** 单例模式
* author : jiangdg
* date : 2020/2/8 20:40
* desc : 双重检查模式
* version: 1.0
*/
public class SingleInstance {
// volatile保证instance的可见性和有序性
private static volatile SingleInstance instance;
private SingleInstance(){}
public static SingleInstance getInstance() {
if(instance == null) {
// 同步代码块
// 防止多线程访问出错,锁对象为SingleInstance的class对象
synchronized (SingleInstance.class) {
if(instance == null) {
instance = new SingleInstance();
}
}
}
return instance;
}
}
(2)同步方法
与同步代码块类似,同步方法仍然使用syschronize
关键字实现,只是该关键字修饰的是一个方法,对于同步方法而言,无须显示指定同步监视器(锁对象),因为这个锁对象为this,即其对象的本身。语法格式:
public synchronized void method() {
...
}
注:只要方法被synchronized关键字修饰即可,其他不作具体要求。
示例代码:
// HashTable的containsKey方法源码
public synchronized boolean containsKey(Object key) {
HashtableEntry<?,?> tab[] = table;
int hash = key.hashCode();
int index = (hash & 0x7FFFFFFF) % tab.length;
for (HashtableEntry<?,?> e = tab[index] ; e != null ; e = e.next) {
if ((e.hash == hash) && e.key.equals(key)) {
return true;
}
}
return false;
}
1.1.2 锁同步机制
锁同步机制是指对某一共享资源进行上锁,只允许获得锁的线程能够访问资源,当访问完毕后再释放锁,以便其他线程获得锁实现对资源的访问。实际上,上述的Syschronize同步也是一种锁同步机制,只不过是synchronized关键字自动提供了锁以及相关的条件
,即隐式锁。现在我们将介绍一种显示锁–ReentrantLock
,重入锁ReentrantLock引入于Java SE5.0,它表示该锁能够支持一个线程对资源的重复加锁
。语法格式如下:
Lock mLock = new ReentrantLock();
// 上锁
mLock.lock();
try {
...
} finally {
// 释放锁
mLock.unlock();
}
示例代码:
final ReentrantLock lock;
private final Condition notEmpty;
private final Condition notFull;
// ArrayBlockQueue的put方法源码
public void put(E e) throws InterruptedException {
Objects.requireNonNull(e);
// 获得锁
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try {
while (count == items.length)
notFull.await();
enqueue(e);
} finally {
// 释放锁
lock.unlock();
}
}
注:关于条件对象
notEmpty
和notFull
及其相关方法的作用将在1.3小节详讲。
1.1.3 volatile关键字
有时仅仅为了读写一个或两个实例域就使用同步的话,显得开销过大,而volatile关键字为实例域的同步访问提供了免锁机制
,即如果声明一个被volatile修饰的实例,那么编译器和虚拟机就知道该实例时可能被另一个线程并发更新,因为volatile可使实例具体可见性和有序性的特性,但是由于volatile无法保证原子性,因此使用volatile需要具备以下两个条件:
(1)对变量的写操作不会依赖于当前值;
(2)该变量没有包含在具有其他变量的不变式中。
使用volatile场景有很多种,除了1.1.1(1)小节中提到的双重检查单例模式
外,还有一种情况也经常用到volatile关键字来保证被修饰变量的可见性和有序性,即状态标志
。示例代码如下:
/** 录音线程
* author : jiangdg
* date : 2020/2/8 22:14
* desc :
* version: 1.0
*/
public class RecordAudioThread extends Thread {
private volatile boolean isExit = false;
@Override
public void run() {
while (! isExit) {
...
}
}
public void stopThread() {
isExit = true;
}
}
原子性、可见性、有序性是并发编程的3个特性。
- 原子性:对基本数据类型变量的读取和赋值操作是原子性操作,即
这些操作是不可被中断的,要么执行完毕,要么就不执行
。比如x=3是原子操作,而y=x和x++则不是,因为对于y=x来说,它包含了两个操作,即先读取x的值,再将x的值写入工作内存。- 可见性:可见性是指线程之间可见,即一个线程修改的状态对另外一个线程是立即可见的。当一个共享变量被volatile修饰时,它会保证修改的值立即被更新到主存,所以对其他线程是可见的。
- 有序性:有序性是指不允许编译器和处理器对指令进行重排序,虽然重排序过程不会影响到单线程执行的正确性,但是会影响到多线程并发执行的正确性,这时可以通过volatile来保证有序性,当然,也可以通过sysnchronized和Lock来保证有序性,因此同步机制只允许每个时刻只有一个线程执行同步代码。
1.2 同步与异步
所谓同步和异步,是指当某个操作(请求)发出后,是否需要等待结果才能继续执行其他操作
,即对于同步(sync)来说,当执行某个操作后,必须要等待操作返回的结果才能继续执行其他操作;而对于异步(async)来说,当执行某个操作后,不需要等待其结果返回照样能继续执行其他操作,且当有结果时会通过回调等方式通知调用者。对于同步和异步操作,我们接触得较多的就是使用OkHttp发起同步和异步请求,它们的区别如下:
- 同步请求
private void syncHttpGet() {
OkHttpClient okHttpClient=new OkHttpClient();
final Request request=new Request.Builder()
.url("https://blog.csdn.net/andrexpert")
.get()
.build();
final Call call = okHttpClient.newCall(request);
// 开启一个工作线程
// 发起同步请求,之后工作线程需要等待服务器响应结果才执行下一步操作
// 即执行 Log.e("同步结果---- ",response.body().string()+"");
new Thread(new Runnable() {
@Override
public void run() {
try {
Response response = call.execute();
Log.e("同步结果---- ",response.body().string()+"");
} catch (IOException e) {
e.printStackTrace();
}
}
}).start();
}
- 异步请求
private void asyncHttpGet() {
OkHttpClient okHttpClient=new OkHttpClient();
final Request request=new Request.Builder()
.url("https://blog.csdn.net/andrexpert")
.get()
.build();
final Call call = okHttpClient.newCall(request);
// 发起异步请求
Log.d("okhttp","-----开始发起异步请求");
call.enqueue(new Callback() {
@Override
public void onFailure(Call call, IOException e) {
Log.d("okhttp_error",e.getMessage());
}
@Override
public void onResponse(Call call, Response response) throws IOException {
Gson gson=new Gson();
Log.d("okhttp_success",response.body().string());
}
});
// 当执行完call.enqueue后,无需等待调用结果
// 会接着执行以下代码
Log.d("okhttp","-----完成发起异步请求,等待结果");
}
使用enqueue方法发起异步请求无需再开启子线程,因为该方法会自动将网络请求部分放入子线程中执行,当服务器返回响应结果时会通过接口回调的方式将结果通知请求方。因此,enqueue回调方法onResponse与onFailure都执行在子线程中,因此在这两个方法中不能进行UI更新操作。
1.3 阻塞与非阻塞
如果说同步/异步关注的是消息通知的机制
,那么阻塞/非阻塞关注的是线程等待消息通知时的状态
。所谓阻塞,是指调用结果返回之前,调用线程会被挂起并进入阻塞状态(放弃CPU的使用权,暂时停止运行
),只有在得到结果之后才会重新进入就绪状态(可运行状态,等待系统调度
);所谓非阻塞,是指在不能立刻得到结果之前,调用线程不会被挂起,即不会进入阻塞状态。通常,程序一般是在非阻塞状态下运行,只有在处理线程安全请求锁失败时才会使当前线程进入阻塞状态。因此,接下来,我们通过分析阻塞队列ArrayBlockingQueue
的put和take方法源码来分析阻塞状态的具体实现。相关源码如下:
- ArrayBlockingQueue#put方法
final Object[] items; // 内部数组,用于存储数据
final ReentrantLock lock; // 重入锁
private final Condition notEmpty; // 条件对象,即满足不为空条件
private final Condition notFull; // 条件对象,即满足不为满条件
public void put(E e) throws InterruptedException {
Objects.requireNonNull(e);
// 1. 获得锁
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try {
// 2. 当数组满时,无法插入
// 调用notFull的await方法将调用线程置于阻塞状态,放弃锁
while (count == items.length)
notFull.await();
// 数组未满,执行插入操作
// 唤醒读取线程
enqueue(e);
} finally {
// 2. 释放锁
lock.unlock();
}
}
从put()方法源码可知,该方法使用了重入锁ReentrantLock
来确保多线程并发访问安全,其具体操作流程为:首先,调用线程获得重入锁ReentrantLock对象并上锁;然后,判断阻塞队列内部数组是否已满,如果已满,则说明无法再继续插入数据,这里就会调用条件对象notFull的await()方法
阻塞调用线程,此时调用线程就会放弃获得的锁对象。如果未满,则执行插入数据操作,当插入成功后就会调用条件对象notEmpty的signal()方法
以唤醒调用了notEmpty的await方法的线程并执行读取操作;最后,释放锁。enqueue方法源码如下:
private void enqueue(E x) {
// assert lock.getHoldCount() == 1;
// assert items[putIndex] == null;
final Object[] items = this.items;
items[putIndex] = x;
if (++putIndex == items.length) putIndex = 0;
count++;
// notEmpty.signal唤醒调用了notEmpty的await方法的线程
// 即从阻塞状态变为就绪状态
notEmpty.signal();
}
- ArrayBlockingQueue#take方法
public E take() throws InterruptedException {
// 1. 获得锁
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try {
// 2. 判断当前队列是否为空,如果为空则阻塞调用线程
// 同时释放获得的锁
while (count == 0)
notEmpty.await();
// 如果不为空,执行读取操作
// 并唤醒插入线程
return dequeue();
} finally {
// 2. 释放锁
lock.unlock();
}
}
take方法的逻辑与put方法一致,也是经历了上锁、释放锁这么一个过程,唯一不同的就是当获得锁后,接下来的操作是判断当前队列是否为空,如果为空则调用条件对象notEmpty的await()
方法阻塞当前调用线程,同时释放锁。如果不为空,则调用dequeue方法执行读取操作,该方法又会调用条件对象notFull的signal()
方法唤醒之前调用了notFull的await方法的线程,以便完成插入操作。dequeue方法源码如下:
private E dequeue() {
// assert lock.getHoldCount() == 1;
// assert items[takeIndex] != null;
final Object[] items = this.items;
@SuppressWarnings("unchecked")
E x = (E) items[takeIndex];
items[takeIndex] = null;
if (++takeIndex == items.length) takeIndex = 0;
count--;
if (itrs != null)
itrs.elementDequeued();
// 唤醒插入线程
// 使其从阻塞状态转为就绪状态
notFull.signal();
return x;
}
实际上,除了重入锁能够使用条件对象newCodition实现线程通信,synchronize同步机制也可以通过调用Object的wait()、notify()和notifyAll()方法实现同等效果。其中,object的wait方法对应于Condition的await()方法,notify()和notifyAll()方法分别对应于Condition的signal()和signalAll()方法。
以迅雷下载电影为例,理解以下四种情况:
- 同步阻塞:发起下载操作后,什么也不做,一直看着迅雷的下载界面直到下载完毕;
- 同步非阻塞:发起下载操作后,可以浏览网页,但是要时不时看下迅雷是否下载完毕;
- 异步阻塞:发起下载操作后,什么也不做,闭目养神,当听到"叮"的一声说明下载完毕;
- 异步非阻塞:发起下载操作后,可以浏览网页,当听到"叮"一声说明下载完毕。
1.4 Java内存模型
Java虚拟机规范中试图定义一种Java内存模型(Java Memory Model,JMM)来屏蔽掉各种硬件和操作系统的内存访问差异
,以实现让Java程序在各种平台下都能达到一致的内存访问效果。换句话来说,就是Java内存模型定义了线程和主存之间的抽象关系:线程之间的共享变量存储在主存中,每个线程都有一个私有的本地内存,本地内存中存储了该线程共享变量的副本。Java内存模型控制线程之间的通信,它决定了一个线程对主存共享变量的写入何时对另一个线程可见需要注意的是,本地内存是Java内存模型的一个抽象概念,其并不真实存在,它涵盖了缓存、写缓冲区、寄存器等区域,而共享变量区域通常指堆内存存储的对象实例,即存在内存可见性
问题。Java内存模型的抽象示意图如下:
假设线程A与线程B之间通信,必须要经历以下两个步骤:
(1)线程A把线程A本地内存中更新过的共享变量刷新到主存中去;
(2)线程B到主存中去读取线程A之间已更新过的共享变量。
volatile关键字能够保证被修饰共享变量的可见性。
2. 线程池原理
在开发中,无论是请求网络还是IO操作,往往都需要开启一个子线程来异步处理任务,但是每个线程的创建和销毁都需要一定的开销。如果每次执行一个任务都需要开一个新线程去执行,则这些线程的创建和销毁将消耗大量的资源,并且线程都是相互独立的,很难对其进行控制,这时候就需要线程池来进行管理。
2.1 线程池
线程池是一种多线程处理形式,处理过程中将任务添加到队列,然后在创建线程后自动启动这些任务。换句话说,线程池就是一组已经创建好的,一直在等待任务执行的线程的集合
。线程池维护者多个线程,等待着系统分配可并发执行任务,因此可以避免在处理短时间任务时频繁创建于销毁线程带来性能消耗。
2.1.1 ThreadPoolExecutor
在Java 1.5中提供了Executor框架来实现线程池,该框架用于把任务的提交和执行解构,其中,任务的提交
交给Runnable或者Callable,而Executor框架用来处理任务
。Excutor框架中最核心的成员就是ThreadPoolExcutor
,它是线程池的核心实现类,通常我们就是通过这个类来创建一个线程池。它的构造方法有很多参数,下面就来看下这些参数的意义。构造方法原型如下:
ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue, // 阻塞队列
ThreadFactory threadFactory,
RejectedExecutionHandler handler)
参数意义如下:
corePoolSize
:核心线程数量。默认情况下线程池是空的,只有任务提交时才会创建核心线程,并且默认核心线程是一直存在于内存中,以等待执行提交的任务;maximumPoolSize
:线程池允许创建的最大线程数,即核心线程+非核心线程。如果核心线程使用完了且任务队列已满,但是线程数小于maximumPoolSize,线程池仍旧会创建新的线程来处理任务,这类线程有超时限制,闲置超时后会被自动回收,因此称为非核心线程;keepAliveTime
:非核心线程闲置的超时时间。当非核心线程闲置时间超过这个值,被会系统回收;unit
:keepAliveTime参数的时间单位,比如天(DAYS)、小时(HOURS)、分钟(MINITES)、秒(SECONDS)、毫秒(MILLSECONDS)等;workQueue
:任务队列。如果当前线程数大于corePoolSize,则将任务添加到任务队列中;threadFactory
:线程工厂。一般情况下无需设置该参数,默认即可。rejectedExecutionHandler
:饱和策略。当前任务队列和线程池都满了时所采取的策略,默认是AbortPolicy,表示无法处理新任务并抛出RejectedExecutionHandler异常。此外,还有3中策略,它们是CallerRunsPolicy、DiscardPolicy和DiscardOldestPolicy。
(1) 工作原理
线程池工作原理流程图如下:
具体步骤为:
(a) 提交任务后,线程池先判断当前线程数是否达到核心线程数(corePoolSize)
。如果未达到核心线程数,则创建核心线程处理任务;否则,就执行(b)操作;
(b)接着线程池判断任务队列(workQueue)
是否已满。如果没满,则将任务添加到任务队列中;如果已满,线程池就会判断当前线程数是否达到了最大线程数(maximumPoolSize)
。如果未达到,则创建非核心线程处理任务;否则,就执行饱和策略(rejectedExecutionHandler)
,默认是抛出rejectedExecutionHandler异常。
(2) 线程池种类
了解了ThreadPoolExecutor类后,我们就可以通过配置其构造方法实现各种符合项目需求的线程池,当然,Java的Executor框架为了便于线程池开发,也为我们封装了多种常用的线程池,这些线程池定义在Executors
类中,接下来,我们就简单分析下其中常用的线程池类型。
FixedThreadPool
:可重用固定线程数的线程池。
public static ExecutorService newFixedThreadPool(int nThreads) {
return new ThreadPoolExecutor(nThreads, nThreads,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>());
}
FixedThreadPool只有核心线程,并且数量是固定的,没有非核心线程。这些核心线程被创建后不会被回收,当线程数超过corePoolSize时,提交的任务就会被添加到无界阻塞队列(LinkedBlockingQueue
)中。当线程池有空闲线程时,则从任务队列中取任务执行。
SingleThreadExecutor
:仅有一个工作线程的线程池。
public static ExecutorService newSingleThreadExecutor() {
return new FinalizableDelegatedExecutorService
(new ThreadPoolExecutor(1, 1,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>()));
}
SingleThreadExecutor只有一个核心线程,最大线程数也只为1。当线程数超过corePoolSize时,提交的任务就会被添加到无界阻塞队列(LinkedBlockingQueue
)中。SingleThreadExecutor能确保所有的任务在一个线程中按照顺序逐一执行。
CachedThreadPool
:支持根据需要创建线程的线程池。
public static ExecutorService newCachedThreadPool() {
return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
60L, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>());
}
CachedThreadPool没有核心线程,但是非核心线程是无界的(Integer.MAX_VALUE
),且每个非核心线程的闲置超时时间为60s。CachedThreadPool使用了一个不存储元素的阻塞队列(SynchronousQueue
),每个插入操作必须等待另一个线程的移除操作,同样每次移除操作必须等待另一个线程的插入操作。CachedThreadPool比较适合大量的需要立即处理并且耗时较少的任务。
ScheduledThreadPool
:能实现定时和周期性任务的线程池
// \Android8.0\libcore\ojluni\src\main\java\java\util\concurrent
// \Executors.java
public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) {
return new ScheduledThreadPoolExecutor(corePoolSize);
}
// \Android8.0\libcore\ojluni\src\main\java\java\util\concurrent
// \ScheduledThreadPoolExecutor.java
public ScheduledThreadPoolExecutor(int corePoolSize) {
super(corePoolSize, Integer.MAX_VALUE,
DEFAULT_KEEPALIVE_MILLIS, MILLISECONDS,
new DelayedWorkQueue()); // 无界阻塞队列
}
ScheduledThreadPoolExecutor主要用于给定延时之后的运行任务或者定期处理任务,该类继承于ThreadPoolExecutor,因此它的构造方法最终调用的是ThreadPoolExecutor的构造方法。
2.1.2 源码解析
在2.1.1小节中,我们了解到创建一个线程池只需要实例化一个ThreadPoolExecutor对象即可,至于这个线程池应该具有什么特性,由ThreadPoolExecutor构造方法中传入的参数决定。该构造方法源码如下:
private final BlockingQueue<Runnable> workQueue;
private volatile RejectedExecutionHandler handler;
private static final RejectedExecutionHandler defaultHandler =
new AbortPolicy();
// 重载构造方法
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue) {
this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
Executors.defaultThreadFactory(), defaultHandler);
}
// 构造方法
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler) {
// 边界处理
if (corePoolSize < 0 ||
maximumPoolSize <= 0 ||
maximumPoolSize < corePoolSize ||
keepAliveTime < 0)
throw new IllegalArgumentException();
if (workQueue == null || threadFactory == null || handler == null)
throw new NullPointerException();
// 核心线程数
this.corePoolSize = corePoolSize;
// 最大线程数
this.maximumPoolSize = maximumPoolSize;
// 任务队列,即BlockingQueue类型的阻塞队列
this.workQueue = workQueue;
// 非核心线程闲置时长
this.keepAliveTime = unit.toNanos(keepAliveTime);
// 默认为Executors.defaultThreadFactory()
this.threadFactory = threadFactory;
// 饱和策略,默认为AbortPolicy
this.handler = handler;
}
ThreadPoolExecutor由多个重载构造方法,从该方法的源码可知,它的作用就是使用传入的参数进行赋值,当然,需要注意的是,对于饱和策略默认使用的是AbortPolicy,即如果当前线程数达到最大值时,直接抛出RejectedExecutionException异常。AbortPolicy类源码如下:
public static class AbortPolicy implements RejectedExecutionHandler {
public AbortPolicy() { }
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
throw new RejectedExecutionException("Task " + r.toString() +
" rejected from " +
e.toString());
}
}
当线程池创建完毕后,接着,我们分析下线程池是如何完成提交、执行任务(Runnable)的,其中提交任务
这个过程通过调用ThreadPoolExecutor的execute
方法实现。该方法源码如下:
public void execute(Runnable command) {
if (command == null)
throw new NullPointerException();
int c = ctl.get();
// 1. 如果当前工作线程数[workerCountOf(c)]小于corePoolSize
// 则创建一个新的线程执行任务,如果成功,直接返回
// 否则,再次调用ctl.get()以便再次需要用ctl做判断
if (workerCountOf(c) < corePoolSize) {
if (addWorker(command, true))
return;
c = ctl.get();
}
// 2. 线程池处于Running状态[isRunning(c)],且成功把任务添加到任务队列中
// 再次判断线程池处理非Running状态且任务从队列中删除,拒绝执行任务
// 否则,判断worker线程数量是否为0。
// 如果不为0,则execute返回;
// 如果为0,调用addWorker添加一个没有初始任务的worker线程
if (isRunning(c) && workQueue.offer(command)) {
int recheck = ctl.get();
if (! isRunning(recheck) && remove(command))
reject(command);
else if (workerCountOf(recheck) == 0)
addWorker(null, false);
}
// 3. 如果任务队列已满但当前工作线程数小于maxPoolSize,则尝试创建一个非核心创建来处理任务
// 如果失败,则拒绝执行任务,具体拒绝行为由饱和策略决定
else if (!addWorker(command, false))
reject(command);
}
从execute()方法源码可知,它的主要工作就是提交任务,分为三种情形:
(1)如果当前工作线程小于corePoolSize
,则创建新线程执行任务;
(2)如果当前工作线程大于corePoolSize
,则将任务添加到任务队列中;
(3)如果当前工作线程大于corePoolSize
且任务队列已满
,则创建非核心线程执行任务,如果线程数已经大于maxPoolSize
,则执行饱和策略,默认抛出RejectedExecutionException
异常。
接下来,我们看addWorker()
方法是如何添加worker线程的。
/** 添加worker线程
* firstTask:待提交的任务
* core:是否为核心线程
*/
private boolean addWorker(Runnable firstTask, boolean core) {
retry:
// 1. 获取线程池的状态,判断是否可添加worker线程
// 如果可添加,则worker数量+1
for (;;) {
int c = ctl.get();
// rs表示线程池状态
int rs = runStateOf(c);
// 判断线程池是否为可添加worker线程状态
// 如果为否,直接返回false
if (rs >= SHUTDOWN &&
! (rs == SHUTDOWN &&
firstTask == null &&
! workQueue.isEmpty()))
return false;
for (;;) {
// 当前工作线程数
int wc = workerCountOf(c);
// 如果当前线程数大于边界值[CAPACITY=(2^29)-1]
// 或者大于核心线程数corePoolSize(core=true时)
// 大于最大线程数maximumPoolSize(core=false时),直接返回false
if (wc >= CAPACITY ||
wc >= (core ? corePoolSize : maximumPoolSize))
return false;
// 尝试CAS操作对worker数量+1
// 如果成功,跳出循环
if (compareAndIncrementWorkerCount(c))
break retry;
// 重新获取ctl的值
c = ctl.get();
// 再次判断线程池状态是否变化
if (runStateOf(c) != rs)
continue retry;
}
}
// 2. 创建worker线程,使用重入锁,即线程安全
// 然后启动该worker线程
boolean workerStarted = false;
boolean workerAdded = false;
Worker w = null;
try {
// 创建worker线程
w = new Worker(firstTask);
final Thread t = w.thread;
if (t != null) {
// 对线程池上锁
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
int rs = runStateOf(ctl.get());
// 线程池为“可添加”worker线程状态
if (rs < SHUTDOWN ||
(rs == SHUTDOWN && firstTask == null)) {
if (t.isAlive())
throw new IllegalThreadStateException();
// 往线程池workers HashSet集合中
// 添加新创建的worker线程
workers.add(w);
int s = workers.size();
if (s > largestPoolSize)
largestPoolSize = s;
workerAdded = true;
}
} finally {
// 线程池解锁
mainLock.unlock();
}
// 启动worker线程
if (workerAdded) {
t.start();
workerStarted = true;
}
}
} finally {
// 如果启动worker线程失败(workerStarted=false)
// 执行addWorkerFailed方法,即将worker从worker集合中移除
// 且worker数量-1
if (! workerStarted)
addWorkerFailed(w);
}
return workerStarted;
}
从addWorker方法
源码可知,它的作用是根据线程池的状态(running状态
)和任务状态(firstTask!=null
)创建一个新的worker线程并将任务作为参数传入,然后再将这个创建的worker线程添加到一个HashSet集合中。接下来,我们就分析下内部类Worker
的实现原理。该内部类源码如下:
private final class Worker
extends AbstractQueuedSynchronizer
implements Runnable
{
private static final long serialVersionUID = 6138294804551838833L;
//利用ThreadFactory和 Worker这个Runnable创建的线程对象
final Thread thread;
Runnable firstTask;
volatile long completedTasks;
Worker(Runnable firstTask) {
// 将AQS的state置为-1,在runWoker()前不允许中断
setState(-1);
// 待执行的任务会以参数传入,并赋予firstTask
this.firstTask = firstTask;
// 用Worker这个Runnable创建Thread
this.thread = getThreadFactory().newThread(this);
}
public void run() {
//runWorker()是ThreadPoolExecutor的方法
runWorker(this);
}
// The value 0 represents the unlocked state. 0代表“没被锁定”状态
// The value 1 represents the locked state. 1代表“锁定”状态
protected boolean isHeldExclusively() {
return getState() != 0;
}
/**
* 尝试获取锁
* 重写AQS的tryAcquire(),AQS本来就是让子类来实现的
*/
protected boolean tryAcquire(int unused) {
if (compareAndSetState(0, 1)) {
setExclusiveOwnerThread(Thread.currentThread());
return true;
}
return false;
}
/**
* 尝试释放锁
* 不是state-1,而是置为0
*/
protected boolean tryRelease(int unused) {
setExclusiveOwnerThread(null);
setState(0);
return true;
}
public void lock() { acquire(1); }
public boolean tryLock() { return tryAcquire(1); }
public void unlock() { release(1); }
public boolean isLocked() { return isHeldExclusively(); }
/**
* 中断(如果运行)
* shutdownNow时会循环对worker线程执行
* 且不需要获取worker锁,即使在worker运行时也可以中断
*/
void interruptIfStarted() {
Thread t;
//如果state>=0、t!=null、且t没有被中断
//new Worker()时state==-1,说明不能中断
if (getState() >= 0 && (t = thread) != null && !t.isInterrupted()) {
try {
t.interrupt();
} catch (SecurityException ignore) {
}
}
}
}
从Wokrer内部类源码可知,Worker类本身既实现了Runnable,又继承了AbstractQueuedSynchronizer(以下简称AQS),所以其既是一个可执行的任务
,又可以达到锁的效果
。之所以Worker自己实现Runnable,并创建Thread,在firstTask外包一层,是因为要通过Worker控制中断,而firstTask这个工作任务只是负责执行业务。由于worker本身就是一个runnable任务,也就是不会用参数的firstTask创建线程,而是根据当前worker创建一个线程对象,当调用Thread的start方法(worker线程启动
)后当前worker.run()被调用,进而firstTask.run()被调用。因此,接下来就看下runWorker
是如何执行任务的,即如何执行firstTask.run()。该方法源码如下所示。
final void runWorker(Worker w) {
// 获取当前线程
Thread wt = Thread.currentThread();
// 获取任务
Runnable task = w.firstTask;
w.firstTask = null;
// 允许中断
w.unlock(); // allow interrupts
boolean completedAbruptly = true;
try {
// 如果传入任务不为空,则执行任务,即调用firstTask.run()
// 如果传入任务为空或者任务执行完毕,则通过getTask方法从任务队列中取出任务
while (task != null || (task = getTask()) != null) {
w.lock();
if ((runStateAtLeast(ctl.get(), STOP) ||
(Thread.interrupted() &&
runStateAtLeast(ctl.get(), STOP))) &&
!wt.isInterrupted())
wt.interrupt();
try {
beforeExecute(wt, task);
Throwable thrown = null;
try {
// 执行任务
task.run();
} catch (RuntimeException x) {
thrown = x; throw x;
} catch (Error x) {
thrown = x; throw x;
} catch (Throwable x) {
thrown = x; throw new Error(x);
} finally {
afterExecute(task, thrown);
}
} finally {
// 当前任务执行完毕
task = null;
w.completedTasks++;
w.unlock();
}
}
completedAbruptly = false;
} finally {
// 当没有任务可执行时,worker线程退出
processWorkerExit(w, completedAbruptly);
}
}
runWorker(Worker w)
执行流程如下:
(1)执行任务之前,首先调用Worker的unlocker()方法将state设置为0,即允许中断当前worker线程;
(2)开始执行firstTask,调用task.run(),在执行任务前会上锁wroker.lock(),在执行完任务后会解锁,为了防止在任务运行时被线程池一些中断操作中断。另外,在任务执行前后,可以根据业务场景自定义beforeExecute() 和 afterExecute()方法;
(3)无论在beforeExecute()、task.run()、afterExecute()发生异常上抛,都会导致worker线程终止,进入processWorkerExit()处理worker退出的流程;
(4)如正常执行完当前task后,会通过getTask()从阻塞队列中获取新任务,当队列中没有任务,且获取任务超时,那么当前worker也会进入退出流程。
2.2 手写线程池
/** 自定义线程池
* author : jiangdg
* date : 2020/2/1 16:40
* desc : 本例只是线程池的简单实现,即只有核心线程情况且不考虑任务溢出策略
* version: 1.0
*/
public class SimpleThreadPool {
// 阻塞队列,放置任务
private BlockingQueue<Runnable> mTaskQueue;
// 工作线程
private List<WorkerThread> mThreadList = new ArrayList();
public SimpleThreadPool(int poolSize, BlockingQueue<Runnable> taskQueue) {
this.mTaskQueue = taskQueue;
for(int i=0; i<poolSize; i++) {
WorkerThread thread = new WorkerThread();
thread.start();
mThreadList.add(thread);
}
}
public void execute(Runnable task) {
if(mTaskQueue == null)
throw new NullPointerException("taskQueue can not be null.");
// 向队列中添加任务
// 如果队列已满,则阻塞调用execute的线程直到有空间
try {
mTaskQueue.put(task);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
protected class WorkerThread extends Thread {
@Override
public void run() {
super.run();
// 当前线程无限循环从队列取任务
// 如果队列为空,则阻塞当前线程直接有任务
while (true) {
if(mTaskQueue == null)
throw new NullPointerException("taskQueue can not be null.");
try {
Runnable task = mTaskQueue.take();
task.run();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
}