前面介绍了原子类java多线程之CAS操作AtomicXXX原子操作类,ABA问题以及原子更新字段类详解,接下来我们看看JDK8中新增的LongAdder
LongAdder
1.思想
-
无竞争只更新base 的值
-
有竞争时,采用分段思想,每个线程都映射到不同的Cell中去更新
最终结果是base+各个子cell的值
因此当竞争激烈时,对一个值进行修改,冲突的概率很高,需要不断CAS,导致大量的时间在自旋,但如果采用分段思想将一个值分为多个值,分散压力,性能就会变高。
public long sum() {
Cell[] as = cells; Cell a;
long sum = base;
if (as != null) {
for (int i = 0; i < as.length; ++i) {
if ((a = as[i]) != null)
sum += a.value;
}
}
return sum;
}
2.源码分析
public class LongAdder extends Striped64 implements Serializable {
public LongAdder() {
}
}
可以看到LongAdder
继承了Striped64
LongAdder中的 base
、Cell
类、longAccumulate
方法都是在Striped64
中定义。
2.1 Striped64
属性和内部类
//使用Contended注解修饰,解决了value的伪共享问题
@sun.misc.Contended static final class Cell {
//volatile修饰的值,就是要存储的long类型的值
volatile long value;
Cell(long x) { value = x; }
//CAS更新value
final boolean cas(long cmp, long val) {
return UNSAFE.compareAndSwapLong(this, valueOffset, cmp, val);
}
// Unsafe mechanics
private static final sun.misc.Unsafe UNSAFE;
//value的内存偏移地址
private static final long valueOffset;
static {
try {
//获取unsafe实例
UNSAFE = sun.misc.Unsafe.getUnsafe();
Class<?> ak = Cell.class;
//将value的内存偏移地址保存到valueOffset
valueOffset = UNSAFE.objectFieldOffset
(ak.getDeclaredField("value"));
} catch (Exception e) {
throw new Error(e);
}
}
}
/**CPUS的数量,要限制表大小 */
static final int NCPU = Runtime.getRuntime().availableProcessors();
/**
* 当存在过竞争后,每个Cell数组每个元素存储各个段位的值
* 如果为非空,说明存在过竞争,且大小为2的幂,
*/
transient volatile Cell[] cells;
/**
* 基本值,主要在没有争用时使用,也用作表初始化过程中的后备。通过CAS更新。
*/
transient volatile long base;
/**
* 用于初始化和调整表的大小或创建单元时使用的自旋锁(通过CAS锁定)。
无需阻塞锁;当锁不可用时,线程会尝试其他cell或base。
*/
transient volatile int cellsBusy;
几个属性的内存偏移地址:
// Unsafe mechanics
private static final sun.misc.Unsafe UNSAFE;
private static final long BASE;
private static final long CELLSBUSY;
private static final long PROBE;
static {
try {
UNSAFE = sun.misc.Unsafe.getUnsafe();
Class<?> sk = Striped64.class;
BASE = UNSAFE.objectFieldOffset
(sk.getDeclaredField("base"));
CELLSBUSY = UNSAFE.objectFieldOffset
(sk.getDeclaredField("cellsBusy"));
Class<?> tk = Thread.class;
PROBE = UNSAFE.objectFieldOffset
(tk.getDeclaredField("threadLocalRandomProbe"));
} catch (Exception e) {
throw new Error(e);
}
}
PROBE
通过ThreadLocalRandom维护的“线程”探针字段用作每个线程的哈希码。
LongAdder方法
add
public void add(long x) {
Cell[] as; long b, v; int m; Cell a;
if ((as = cells) != null //1.cells数组非空,说明存在多线程竞争
|| !casBase(b = base, b + x)) {//2.如果cells为空,则就cas更新base值,如果失败,说明base已经被另外的线程动过
//3.字面义,为true说明没有竞争,为false就是多个线程在竞争操作
boolean uncontended = true;
//满足if的条件,都要调用longAccumulate处理
if (as == null //4. cells为空,即上面cas更新base失败
|| (m = as.length - 1) < 0 //5.celss长度小于1,emm跟4差不多意思
||(a = as[getProbe() & m]) == null ||//6.当前线程维护的cell为空
!(uncontended = a.cas(v = a.value, v + x)))//7.cas更新当前线程维护的cell的值,如果更新失败,即uncontented为false,说明有多个线程在竞争
longAccumulate(x, null, uncontended);
}
}
getProbe()
获取当前线程的hash码,就是该线程要维护Cell数组的哪个元素。
static final int getProbe() {
return UNSAFE.getInt(Thread.currentThread(), PROBE);
}
longAccumulate()
该方法在Striped64
中
final void longAccumulate(long x, LongBinaryOperator fn,
boolean wasUncontended) {
int h;
/*
类注释中有说明,若当前cell的probe没有初始化时,为0值,就要将其初始化为尽量不与其他对象冲突的值
*/
if ((h = getProbe()) == 0) {
//调用ThreadLocalRandom初始化
ThreadLocalRandom.current(); // force initialization
//初始化后重新获取probe
h = getProbe();
//probe是0,那就说明是没有竞争,置为true
wasUncontended = true;
}
//
boolean collide = false; // True if last slot nonempty
for (;;) {
Cell[] as; Cell a; int n; long v;
if ((as = cells) != null && (n = as.length) > 0) {//cells已经初始化
if ((a = as[(n - 1) & h]) == null) {//当前线程维护的cell段未初始化
if (cellsBusy == 0) { // 为0时,说明没有线程获取该锁去创建cell或者在扩容cells,则以要增加的值x作为当前cell维护的值,尝试创建新的cell
Cell r = new Cell(x); // Optimistically create
if (cellsBusy == 0 &&
casCellsBusy()) {//cellBusy为0时,尝试CAS更新其为1,即尝试获取该自选锁
//是否成功创建cell
boolean created = false;
try { // Recheck under lock
Cell[] rs; int m, j;
if ((rs = cells) != null &&//cells非空
(m = rs.length) > 0 &&
rs[j = (m - 1) & h] == null) {//且当前cell为空
rs[j] = r;//将新的cell让在该位置
created = true;//创建成功
}
} finally {
//重置为0,释放该自选所
cellsBusy = 0;
}
//如果创建成功,就break退出循环
if (created)
break;
//如果创建失败,即当前cell非空则继续
continue;
}
}
//没有出现竞争,不需要扩容
collide = false;
}
else if (!wasUncontended) // 前面add方法cas更新value失败
wasUncontended = true; // 重置为true,后面会重hash,做简单自旋
else if (a.cas(v = a.value, ((fn == null) ? v + x :
fn.applyAsLong(v, x))))//再次对当前线程维护的cell更新值,作累加,CAS成功就退出
break;
else if (n >= NCPU //cells数组长度超过了CPU核数
|| cells != as)//或cells数组已经扩容
collide = false; //置为false,表明不要扩容 ,就不会走到下面那个else if
else if (!collide)//cas失败且没有满足上面的条件,说明有竞争
collide = true;//置为true,这样下次就可以直接进入下一个else if进行扩容
else if (cellsBusy == 0 && casCellsBusy()) {//尝试获取锁
try {
if (cells == as) { // 如果还没有其他线程扩容,则进行扩容2倍
Cell[] rs = new Cell[n << 1];
for (int i = 0; i < n; ++i)
rs[i] = as[i];
cells = rs;
}
} finally {
//释放锁
cellsBusy = 0;
}
//竞争解决
collide = false;
continue; // Retry with expanded table
}
//更新probe值
h = advanceProbe(h);
}
//cells为空,因此要初始化数组,cas获取cellsBusy锁
else if (cellsBusy == 0 && cells == as && casCellsBusy()) {
//是否初始化成功
boolean init = false;
try { // Initialize table
if (cells == as) {
Cell[] rs = new Cell[2];
rs[h & 1] = new Cell(x);
cells = rs;
//初始化成功
init = true;
}
} finally {
//释放锁
cellsBusy = 0;
}
//初始化成功则退出
if (init)
break;
}
//CAS获取cellsBusy锁失败,说明其他线程在更新当前线程维护的cell或扩容cells,则cas更新base值,更新成功则退出
else if (casBase(v = base, ((fn == null) ? v + x :
fn.applyAsLong(v, x))))
break; // Fall back on using base
}
}
流程:
- 如果add中cas更新base失败,说明存在竞争,则在该方法中先通过probe判断当前线程的hash是否为0,如果为0则强制初始化probe;此时cells为空,尝试获取锁对cells进行初始化,并将新值的x放在该cell中,退出
- 如果初始化为cas获取
cellsBusy
锁失败,说明有线程在创建cell或进行初始化,则尝试更新base
,如果更新成功则退出,否则继续循环。 - 如果cells数组已经初始化,若当前线程维护的cell未初始化,则尝试获取锁创建该cell对象,值为新增的x,如果创建成功则退出;否则其他线程已经创建,继续循环
- 否则如果当前线程维护的cell已经创建,则CAS更新,如果成功则退出;否则说明有多个线程在竞争,对当前线程重hash后继续
- 4失败后会再进行一次CAS更新,如果还是失败,就会进行尝试获取锁扩容(上一次已经将扩容标识置为true),扩容大小时2倍(n << 1),扩容成功会重新计算线程的hash,再次进行CAS更新