线程拾遗篇
前言
这一篇主要是对之前多线程源码中遗漏的部分进行一个补充,主要包括:
Callable
和FutureTask
condition
CountDownLatch(闭锁)
CyclicBarrier(栅栏)
Semaphore(信号量)
7月20日更新:
volatile
synchronized
final
有返回值的线程
Callable
之前在学习线程池的时候我们发现ThreadPoolExecutor
本身并没有submit
方法,而是在抽象类中采用了适配器模式,将Runnable
转化为了Callable
接口。
我们先来看Callable
接口,问我把它和Runnable
接口放在一起以便做一个比较。
可以看到run
是一个虚方法,是没有返回值的,但是call
方法不但有返回值,还能够抛出异常。
@FunctionalInterface
public interface Callable<V> {
V call() throws Exception;
}
@FunctionalInterface
public interface Runnable {
public abstract void run();
}
FutureTask
我们直接来看Callable
的应用——FutureTask
。
继承和接口
还是依照惯例先来看继承和接口,FutureTask
实现了RunnableFuture
接口,而这个接口又继承于Runnable
和Future<V>
。那么这个Future<V>
是什么呢?
public class FutureTask<V> implements RunnableFuture<V>
public interface RunnableFuture<V> extends Runnable, Future<V> {
void run();
}
总的来说Future是一个包装类,里面保存了线程执行的结果和状态,调用get方法会阻塞至线程退出(执行完毕、异常)或者超时。
public interface Future<V> {
boolean cancel(boolean mayInterruptIfRunning);
boolean isCancelled();
boolean isDone();
V get() throws InterruptedException, ExecutionException;
V get(long timeout, TimeUnit unit)
throws InterruptedException, ExecutionException, TimeoutException;
}
成员变量
成员变量和Thread
都有状态
//线程状态
private volatile int state;
private static final int NEW = 0;
private static final int COMPLETING = 1;
private static final int NORMAL = 2;
private static final int EXCEPTIONAL = 3;
private static final int CANCELLED = 4;
private static final int INTERRUPTING = 5;
private static final int INTERRUPTED = 6;
/** 底层调用的Callable,运行结束后清空 */
private Callable<V> callable;
/** 返回结果*/
private Object outcome; // non-volatile, protected by state reads/writes
/** 当前执行线程的引用,用来取消或者中断 */
private volatile Thread runner;
/** Treiber stack of waiting threads */
private volatile WaitNode waiters;
构造函数
FutureTask
不但可以传入callable
也可以传入runnable
,如果传入的是后者的话会通过Executors.callable
方法进行转化
public FutureTask(Callable<V> callable) {
if (callable == null)
throw new NullPointerException();
this.callable = callable;
this.state = NEW; // ensure visibility of callable
}
public FutureTask(Runnable runnable, V result) {
this.callable = Executors.callable(runnable, result);
this.state = NEW; // ensure visibility of callable
}
执行器里的方法就是我们之前所说的适配器模式,适配器继承了Callable
接口,使用适配器模式需要把返回的对象也一并传入进去。
public static <T> Callable<T> callable(Runnable task, T result) {
if (task == null)
throw new NullPointerException();
return new RunnableAdapter<T>(task, result);
}
static final class RunnableAdapter<T> implements Callable<T> {
final Runnable task;
final T result;
RunnableAdapter(Runnable task, T result) {
this.task = task;
this.result = result;
}
public T call() {
task.run();
return result;
}
}
使用
代码来源:https://blog.csdn.net/qq_40685275/article/details/99838677
public static void main(String[] args){
FutureTask<Integer> futureTask = new FutureTask<Integer>(new Task());
Thread thread = new Thread(futureTask);
thread.setName("Task thread");
thread.start();
int result = 0;
try {
// 5. 调用get()方法获取任务结果,如果任务没有执行完成则阻塞等待
result = futureTask.get();
} catch (Exception e) {
e.printStackTrace();
}
System.out.println("result is " + result);
}
// 1. 继承Callable接口,实现call()方法,泛型参数为要返回的类型
static class Task implements Callable<Integer> {
@Override
public Integer call() throws Exception {
int result = 0;
for(int i = 0; i < 100;++i) {
result += i;
}
Thread.sleep(3000);
return result;
}
Condition
Condition
本质上是一个接口,我们直接来看接口中的方法。
public interface Condition {
//使当前线程加入 await() 等待队列中,并释放当锁,当其他线程调用signal()会重新请求锁。与Object.wait()类似。
void await() throws InterruptedException;
//调用该方法的前提是,当前线程已经成功获得与该条件对象绑定的重入锁,否则调用该方法时会抛出IllegalMonitorStateException。
//调用该方法后,结束等待的唯一方法是其它线程调用该条件对象的signal()或signalALL()方法。等待过程中如果当前线程被中断,该方法仍然会继续等待,同时保留该线程的中断状态。
void awaitUninterruptibly();
// 调用该方法的前提是,当前线程已经成功获得与该条件对象绑定的重入锁,否则调用该方法时会抛出IllegalMonitorStateException。
//nanosTimeout指定该方法等待信号的的最大时间(单位为纳秒)。若指定时间内收到signal()或signalALL()则返回nanosTimeout减去已经等待的时间;
//若指定时间内有其它线程中断该线程,则抛出InterruptedException并清除当前线程的打断状态;若指定时间内未收到通知,则返回0或负数。
long awaitNanos(long nanosTimeout) throws InterruptedException;
boolean await(long time, TimeUnit unit) throws InterruptedException;
boolean awaitUntil(Date deadline) throws InterruptedException;
//唤醒一个在 await()等待队列中的线程。与Object.notify()相似
void signal();
void signalAll();
}
一般来说Condition
需要配合Lock
使用,一般Object.wait()
适用于监听对象,而Condition.await()
适用于AQS同步。
代码来源:https://www.cnblogs.com/gemine/p/9039012.html
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
public class ConditionUseCase {
public Lock lock = new ReentrantLock();
public Condition condition = lock.newCondition();
public static void main(String[] args) {
ConditionUseCase useCase = new ConditionUseCase();
ExecutorService executorService = Executors.newFixedThreadPool (2);
executorService.execute(new Runnable() {
@Override
public void run() {
useCase.conditionWait();
}
});
executorService.execute(new Runnable() {
@Override
public void run() {
useCase.conditionSignal();
}
});
}
public void conditionWait() {
lock.lock();
try {
System.out.println(Thread.currentThread().getName() + "拿到锁了");
System.out.println(Thread.currentThread().getName() + "等待信号");
condition.await();
System.out.println(Thread.currentThread().getName() + "拿到信号");
}catch (Exception e){
}finally {
lock.unlock();
}
}
public void conditionSignal() {
lock.lock();
try {
Thread.sleep(5000);
System.out.println(Thread.currentThread().getName() + "拿到锁了");
condition.signal();
System.out.println(Thread.currentThread().getName() + "发出信号");
}catch (Exception e){
}finally {
lock.unlock();
}
}
}
同步工具
CountDownLatch
CountDownLatch
的代码非常短,原理也非常简单,所以我们大致的粗看一下。
与ReentrantLock
类似CountDownLatch
在内部也实现了一个sync
的同步器,但是主要使用的是共享模式
private static final class Sync extends AbstractQueuedSynchronizer {
private static final long serialVersionUID = 4982264981922014374L;
Sync(int count) {
setState(count);
}
int getCount() {
return getState();
}
protected int tryAcquireShared(int acquires) {
//尝试获取共享锁
return (getState() == 0) ? 1 : -1;
}
//尝试释放锁,当减为0的时候会调用AQS中的doRelease方法
//在doRelease方法中会唤醒队列头部节点
protected boolean tryReleaseShared(int releases) {
// Decrement count; signal when transition to zero
for (;;) {
int c = getState();
if (c == 0)
return false;
int nextc = c-1;
if (compareAndSetState(c, nextc))
return nextc == 0;
}
}
}
构造方法中仅仅就是对Sync
同步器进行了初始化,设置了一个不为0的state
。
public CountDownLatch(int count) {
if (count < 0) throw new IllegalArgumentException("count < 0");
this.sync = new Sync(count);
}
再来看await()
和countDown()
方法:
public void await() throws InterruptedException {
//尝试获取共享锁,如果被中断则抛出异常
sync.acquireSharedInterruptibly(1);
}
public void countDown() {
sync.releaseShared(1);
}
到了这里就不难猜出CountDownLatch
的实现原理:
- 初始化的时候设置
state
为正整数 - 当调用
await()
方法的时候发现state
不为0,无法获得共享锁,就加入节等待队列。 - 每次调用
countDown()
方法都会通过CAS减少state
,当state
为0的时候,唤醒头部节点。 - 头部节点发现自己是一个共享节点,向后传递唤醒操作,队列中的节点全部被唤醒。
CyclicBarrier
CyclicBarrier
不同于之前说过的同步工具,其内部并没有实现一个Sync
的同步队列,而是通过可重入锁ReentrantLock
来实现的。
成员变量
private final ReentrantLock lock = new ReentrantLock();
//条件唤醒
private final Condition trip = lock.newCondition();
//参与线程的总数
private final int parties;
//如果想要在释放所有线程之前做些什么,可以传入这个参数
private final Runnable barrierCommand;
//指示器,用来指示是否需要破坏栅栏
private Generation generation = new Generation();
//还需要等待的线程数
private int count;
构造方法
public CyclicBarrier(int parties, Runnable barrierAction) {
if (parties <= 0) throw new IllegalArgumentException();
this.parties = parties;
this.count = parties;
this.barrierCommand = barrierAction;
}
public CyclicBarrier(int parties) {
this(parties, null);
}
await
CyclicBarrier
是让所有线程调用await()
方法,调用的线程会被阻塞在那边,当所有的线程都到达了await()
方法之后才会继续执行。
所以我们直接来看他的await()
方法,await()
方法的只是调用了dowait()
方法。
public int await() throws InterruptedException, BrokenBarrierException {
try {
return dowait(false, 0L);
} catch (TimeoutException toe) {
throw new Error(toe); // cannot happen
}
}
private int dowait(boolean timed, long nanos)
throws InterruptedException, BrokenBarrierException,
TimeoutException {
final ReentrantLock lock = this.lock;
//获取锁
lock.lock();
try {
final Generation g = generation;
//打破栅栏
if (g.broken)
throw new BrokenBarrierException();
//如果一个线程被中断,那么释放其他的线程
if (Thread.interrupted()) {
breakBarrier();
throw new InterruptedException();
}
//被调用说明一个线程达到了await方法处
int index = --count;
//如果需要等待的线程数为0,释放所有线程
if (index == 0) { // tripped
boolean ranAction = false;
try {
final Runnable command = barrierCommand;
if (command != null)
command.run();
ranAction = true;
//唤醒其他线程,重置状态
nextGeneration();
return 0;
} finally {
if (!ranAction)
breakBarrier();
}
}
// 否则进行循环等待
for (;;) {
try {
if (!timed)
//阻塞,等待唤醒
//这个时候会让出锁
trip.await();
else if (nanos > 0L)
nanos = trip.awaitNanos(nanos);
} catch (InterruptedException ie) {
if (g == generation && ! g.broken) {
breakBarrier();
throw ie;
} else {
// We're about to finish waiting even if we had not
// been interrupted, so this interrupt is deemed to
// "belong" to subsequent execution.
Thread.currentThread().interrupt();
}
}
if (g.broken)
throw new BrokenBarrierException();
if (g != generation)
return index;
if (timed && nanos <= 0L) {
breakBarrier();
throw new TimeoutException();
}
}
} finally {
lock.unlock();
}
}
private void nextGeneration() {
// signal completion of last generation
trip.signalAll();
// set up next generation
count = parties;
generation = new Generation();
}
所以CyclicBarrier
的原理也不难理解,通过可重入锁来确保栅栏同时只有一个线程能够持有,当线程发现自己不是最后一个线程的时候会调用await()
方法,让出锁,等到所有线程都到达之后采用Condition.signalAll()
来唤醒所有的等待线程。
Semaphore
内部类
由于Semaphore
的成员变量只有一个Sync
对象,所以我们直接来看它是如何实现同步器的。
可以看到信号量也是一个基于共享锁的同步器。
与之前学习过的读写锁不同,在信号量中,state
代表还允许多少线程进入,而非当前有多少线程持有锁。
abstract static class Sync extends AbstractQueuedSynchronizer {
private static final long serialVersionUID = 1192457210091910933L;
//设置资源
Sync(int permits) {
setState(permits);
}
final int getPermits() {
return getState();
}
final int nonfairTryAcquireShared(int acquires) {
for (;;) {
int available = getState();
int remaining = available - acquires;
//尝试获取资源,如果剩余资源数小于0,会进入AQS
if (remaining < 0 ||
compareAndSetState(available, remaining))
return remaining;
}
}
protected final boolean tryReleaseShared(int releases) {
for (;;) {
int current = getState();
int next = current + releases;
if (next < current) // overflow
throw new Error("Maximum permit count exceeded");
if (compareAndSetState(current, next))
return true;
}
}
final void reducePermits(int reductions) {
for (;;) {
int current = getState();
int next = current - reductions;
if (next > current) // underflow
throw new Error("Permit count underflow");
if (compareAndSetState(current, next))
return;
}
}
final int drainPermits() {
for (;;) {
int current = getState();
if (current == 0 || compareAndSetState(current, 0))
return current;
}
}
}
构造方法
信号量也存在公平和非公平两种方式。
public Semaphore(int permits) {
sync = new NonfairSync(permits);
}
public Semaphore(int permits, boolean fair) {
sync = fair ? new FairSync(permits) : new NonfairSync(permits);
}
通过上面的代码可以总结一下信号量的原理:设置一个状态用于标志还可以进入的线程数,如果剩余数量小于0则进入AQS中等待。
当释放一个资源的时候,由于是共享锁,会唤醒队列中所有等待的线程,线程再次竞争,没有获取到资源的继续等待。
关键字
这两个关键字也是面试中的常客,但是遗憾的是他们没有源码可以看。
所以就只好从《Java并发编程的艺术》这本书上面摘抄一段从底层理解这两个关键字的话。
volatile
volatile在内存中的实现主要是内存屏障,通过内屏障会禁止指令重排序,通过happens-before原则确定两个操作之间的先后顺序。但是它不能保证原子性,如果执行i++
的操作,即使使用volatile修饰,还是有可能另外一个线程的读操作发生在这个线程的自增操作之前。
而在CPU层面的实现就要通过CPU的缓存、嗅探来说。
对于CPU来说,线程是最小的调度单位,每个线程都运行在一个核心上,每个核心有自己的运算器和寄存器以及L1、L2级缓存。
当核心想要执行一条指令的时候,首先要获得两个需要进行运算的变量,CPU会从三级缓存逐级找下去,如果都不在就去内存中读取数据,然后缓存在三级缓存中(对于intel CPU来说,L1、L2缓存是一个核心独占的,L3级缓存是一个CPU中的核心共享的)。
volatile在转化为指令的时候会被加上LOCK指令,在最近的CPU中是锁住缓存行,在修改完成之后会写回内存。当缓存行被修改之后会通过MESI(intel CPU,缓存一致协议),告知其他核心的嗅探该缓存行已经失效,需要从内存中重新读取。
除此之外,还有一个有趣的内容是缓存总是一行一行读取的,一行缓存是64个字节,所以当两个4字节的变量处在同一个缓存行中的时候,其中一个volatile变量被修改会导致整个缓存行无效。
所以针对这种情况,java中有一个叫做LinkedTransferQueue
的并发队列,它对每一个变量都进行了填充,使得一个变量可以填充满一整行缓存,这样一个变量发生变化的时候不会影响其他缓存行中的变量,使得并发操作快上很多。
synchronized
synchronized关键字的底层实现是由JVM完成的,根据JVM规范,JVM基于进入和退出Monitor对象来实现方法和代码块的同步。
代码块是通过monitorenter
和monitorenter
两条指令实现的,而方法同步的具体方式在JVM规范中没有说明。
JVM要保证每一个monitorenter
指令都有一个monitorexit
指令对应,当一个线程运行到monitorenter
之后就会尝试获取目标对象的锁。
通过synchronized可以实现多线程中的原子性,一个操作不会被其他操作打断。
原子性的另外一个实现方式是通过CAS,Java中的CAS是通过处理器提供的CMPXCHG指令实现的。
偏向锁
不是锁 是标签,把自己的线程ID号存放在对象头中
对象内存结构
- markWord 8字节
- 类型指针,指向对象的类 4字节
- 实例数据 取决于成员变量,无成员 0字节
- 对齐 补充为8的倍数
所以一个Object对象为16字节
markWord中最重要的就是锁信息。
偏向锁有一个启动延时,默认为4000ms,在JVM刚启动的过程中是不会给对象上偏向锁的,因为这个时候产生的对象大多是需要多线程竞争的。
为什么要有延时?
偏向锁在明确知道是多线程的时候反而会导致性能下降
JVM启动过程中一定是多线程竞争的
偏向锁启动之后会有一个匿名偏向的概念
只有偏向锁和无锁状态下对象才有分代年龄,一共4位,cms默认最大值为6,其他一般为15,当升级成轻量级和重量级锁之后就没分代年龄信息了,为什么?因为synchronized持有的对象是GC root。
markWord中一共有多种锁状态,当锁状态为11的时候说明是CMS过程中标记信息。
偏向锁降级
轻量级锁
自旋
为什么线程多的时候要改成重量级锁?
自旋切换上下文消耗太大
重量级锁
OS维护队列
final
final可以用来修饰成员变量和方法,修饰方法的时候表示这个方法不能被继承和覆盖,但是可以重载。
这里主要说final修饰成员变量时候的作用。
如果final修饰的是一个在编译期就可以确定的成员变量,比如final i = 1;
,那么这个成员变量就是一个编译期常量,但是如果是final i = random.nextInt();
那么必须要等到运行的时候才能确定这个变量的值。
如果一个成员变量同时使用了final和static进行修饰那么,这个成员变量必须在一开始就初始化,否则编译期不予通过。
对于final来说有两种禁止重排序的情况,先来说禁止写final域的重排序。
禁止写排序的作用主要是:禁止final域的写操作被重排序在构造函数之外,这个操作是是通过内存屏障实现的。
对于禁止读final域的重排序:final指令禁止所有读final域的操作重排序到读对象的引用之前。
CAS原理
CAS的全称是Compare-And-Swap,直译就是比较并交换。这是一条CPU指令,JVM只是封装了汇编调用,AtomicInteger之类的原子类就是使用了这些分装后的接口(UnSafe类)。
CAS的优势在于当并发竞争较少的时候速度较快,消耗资源较少,但是他也不是没有任何问题的。
ABA问题
CAS中存在一个ABA问题,当一个值原来是A,变成了B,最后又变回A,那么使用CAS检测的时候会认为它没有变化并且更新,这就可能会带来一些错误。
解决这个问题的方法也很简单,就是给每个值加上版本号,比较的时候比较版本号而非真实的值。
循环开销大
如果CAS长时间不成功就会一直自旋浪费CPU资源,后来的JVM增加了自适应自旋锁的功能,会根据等待时间来调整自旋次数。
多变量问题
当使用CAS更新一个变量的时候我们可以保证这个变量更新的原子性,但是如果涉及到多个变量就无法保证了,一般采用的方式就是把多个共享变量合并成一个变量来操作。
UnSafe类
Unsafe是位于sun.misc包下的一个类,主要提供一些用于执行低级别、不安全操作的方法,如直接访问系统内存资源、自主管理内存资源等,这些方法在提升Java运行效率、增强Java语言底层资源操作能力方面起到了很大的作用。但由于Unsafe类使Java语言拥有了类似C语言指针一样操作内存空间的能力,这无疑也增加了程序发生相关指针问题的风险。在程序中过度、不正确使用Unsafe类会使得程序出错的概率变大,使得Java这种安全的语言变得不再“安全”,因此对Unsafe的使用一定要慎重。 这个类尽管里面的方法都是 public 的,但是并没有办法使用它们,JDK API 文档也没有提供任何关于这个类的方法的解释。总而言之,对于 Unsafe 类的使用都是受限制的,只有授信的代码才能获得该类的实例,当然 JDK 库里面的类是可以随意使用的。
著作权归https://www.pdai.tech所有。
链接:https://www.pdai.tech/md/java/thread/java-thread-x-juc-AtomicInteger.html
Unsafe方法除了提供了CAS的方法,也提供了一系列修改内存地址中的内容从而直接修改对象字段的功能。
LockSupport
LockSupport是用来创建锁和其他同步线程的基本线程阻塞原语。简而言之,当调用LockSupport.park
时,表示当前线程会等待直到获得许可,当调用LockSupport.unpark()
方法的时候,好让此线程继续运行。
LockSupport和wait/notify的区别在于,如果先调用notify再调用wait,会导致这个对象永远被锁定,但是先调用unpark再调用park仍然可以释放线程。除此之外,线程中断也会引发LockSupport的释放。
Condition.await()底层就是调用了LockSupport.park()来实现阻塞当前线程的。
后记
这两天先对之前的一些知识点进行查漏补缺,对于一些比较常问的知识点一定要深入到其底层实现,这样才能在面试的时候占得先机。
在字节面试完之后不论如何都要进行spring的源码学习了。