前言
最近在看 Java 并发编程的相关源码。发现了一个词 —— Treiber stack。这篇文章介绍一下什么是 Treiber stack 以及其核心算法 CAS。
比如 FutureTask 中:
/** Treiber stack of waiting threads */
private volatile WaitNode waiters;
一、Treiber stack 的由来
Treiber Stack在 R. Kent Treiber在1986年的论文 Systems Programming: Coping with Parallelism (系统编程:并行拷贝)中首次出现。它是一种无锁并发栈,其无锁的特性是基于 CAS(Compare and swap:比较再交换)算法所实现的原子操作实现的。
二、CAS
1. 什么是CAS
CAS:Compare and Swap,即比较再交换。是 乐观锁 的一种 无锁实现 算法。
算法涉及到三个操作数:
- 需要读写的内存位置V
- 需要进行比较的预期值A
- 需要写入的新值U
CAS算法解析:CAS具体执行时,当且仅当预期值A符合内存地址V中存储的值时,就用新值U替换掉旧值,并写入到内存地址V中。否则不做更新。
过程如下图:
2. Java 中的 CAS
在 Java15 并发编程中,有一些基于基础数据类型的 CAS 实现,在 java.util.concurrent.atomic.* 下。这里以 AtomicBoolean 为例:
public class AtomicBoolean implements java.io.Serializable {
private static final long serialVersionUID = 4654671469794556979L;
//VarHandle 之后会写一篇文章对比 Unsafe 的访问方法
//可以先粗浅的理解为,获取到了 AtomicBoolean 实例的变量。
private static final VarHandle VALUE;
static {
try {
MethodHandles.Lookup l = MethodHandles.lookup();
VALUE = l.findVarHandle(AtomicBoolean.class, "value", int.class);
} catch (ReflectiveOperationException e) {
throw new ExceptionInInitializerError(e);
}
}
public final boolean compareAndSet(boolean expectedValue, boolean newValue) {
return VALUE.compareAndSet(this,
(expectedValue ? 1 : 0),
(newValue ? 1 : 0));
}
}
跳转至 VarHandle 的类中:
public final native
@MethodHandle.PolymorphicSignature
@HotSpotIntrinsicCandidate
boolean compareAndSet(Object... args);
public final native
@MethodHandle.PolymorphicSignature
@HotSpotIntrinsicCandidate
boolean weakCompareAndSet(Object... args);
我们发现是个 native 方法,也就是说 Java 在外部实现的底层操作。具体的 c 实现,之后在探究。
3. CAS 的问题
- ABA问题
- 线程1、线程2取得主内存值 A ,分别拷贝到自己的内存段中,执行计算;
- 在线程2计算过程中,线程1率先完成计算并将更新的值 B 更新到主内存;
- 在线程2计算过程中,加入线程3;
- 在线程2计算过程中,线程3拷贝主内存值 B,并完成计算将更新的值 A 更新至主内存;
- 此时线程2完成计算,根据 CAS 算法,线程2将更新的值 C 更新到主内存;
此问题会导致,强数据意义的业务场景,数据错误。
例如:两个用户 A、B 在电商平台上抢一台 iphone,用户 A 抢到了,用户 B 网不好实际没有抢到,但是在此时用户 C 退货了一台。最后结果就是用户A、B都抢到了iphone,但实际的业务意义与数据却出现了偏差。
- 长时间线程自旋昂贵
如果CPU(单核)没有抢占式调度器(即通过时钟中断一个线程,运行其他线程),线程自旋是不会释放CPU资源的,会一直访问目标内存段。短时间线程自旋是一个相当划算的选择,但如果多个线程长时间自旋,会对 CPU 造成负担。
三、FutureTask 的无锁并发栈实现
回到最开始:
/** Treiber stack of waiting threads */
private volatile WaitNode waiters;
我们先来看一下 WaitNode 是个啥:
static final class WaitNode {
volatile Thread thread;
volatile WaitNode next;
WaitNode() {
thread = Thread.currentThread(); }
}
WaitNode 是 FutureTask 中的一个内部类,结构来看是一个 单向链表 。
我们再来检索一下,waiters变量的使用函数,先暂时不纠结其业务意义是什么,既然是一个栈,那就看一下最关键的几个步骤:
- 节点入栈,指针指向顶部节点:
q入栈时,要检查 next 指针是否指向 waiters ,如果是其他节点,证明线程不安全了(被篡改),queued = false,并下次循环重新尝试。
private int awaitDone(boolean timed, long nanos)
throws InterruptedException {
//……………………
else if (!queued)
queued = WAITERS.weakCompareAndSet(this, q.next = waiters, q);
//……………………
}
- 节点出栈,指针指向顶部节点:
q出栈时,检查当前 waiters 是否是 q,如不是证明线程不安全(被篡改),自旋阻塞等待。
private void removeWaiter(WaitNode node) {
//……………………
s = q.next;
//……………………
else if (!WAITERS.compareAndSet(this, q, s))
continue retry;
//……………………
}
- 栈清空:
直接指针指向 null,gc会回收在内存中飘荡的 waiter 链表。
private void finishCompletion() {
//……………………
if (WAITERS.weakCompareAndSet(this, q, null)) {
//……………………
}
//……………………
}
以上,就是无锁并发栈的实现,核心为 CAS 算法操作的栈实现,仔细体会。
关于 compareAndSet 与 weakCompareAndSet 的区别,可以看我的下一篇文章。