CAS只是一种思想,就是比较并替换。它有三个操作数:内存值V,旧的预期值A,要修改的新值B。只有当预期值A和内存值V相同的时候,才将内存值V修改为新值B,并返回ture。否则什么都不做,返回false。CAS一定要与volatile变量配合使用,这样才能保证线程每次拿到的变量都是主内存中最新的那个值,否则,旧的预期值A对某个线程来说,可能永远是一个不会变的值A。
实现:Actomic原子类就是使用的CAS机制。例如i++操作,假如i的初始值为0,线程1来尝试修改i的值,从0修改到1。首先线程1会用旧的预期值0与内存值i(此时也等于0)作比较,比较结果一致,修改i的值为新值1,i的值变为1。此时又来了一个线程2也尝试修改i的值,从0修改到1,首先线程2也会用旧的预期值0与内存值i(此时也等于1)作比较,比较结果不一致,什么都不做,直接返回false,表示修改失败。
优点:CAS操作是抱着乐观态度进行的,它总是认为自己可以成功完成操作。当多个线程同时使用CAS操作一个变量时,只有一个会胜出,并成功更新,其余均会失败。失败的线程不会被挂起,仅是被告知失败,并允许再次尝试,当然也允许失败的线程放弃操作。基于这样的原理,CAS操作即使没有锁,也可以发现其他线程对当前线程的干扰,并进行恰当的处理。
缺点:CAS存在ABA问题,比如一个人账户里有100块钱,他想取50但是银行出现错误,同时执行了两次减少50的操作,但他自己只发出了一次指令。当第一次执行的时候采用CAS机制取出50块钱,但他的妈妈从别的操作系统给他又转了50块,这样账户里的钱又是100块,那么采用CAS机制就会继续执行扣款操作,这样就是ABA问题,丢了50块钱。
解决方案:ABA问题可以对每个值添加一个版本号来判断。
CAS优化:
比如AtomicInteger是通过无限不停地循环去修改value,直到修改成功为止。但是在并发数非常高的情况下,可能有很多线程会不停的自旋,进入一个无限重复的循环中。这些线程不停地获取值,然后发起CAS操作,但是发现这个值被别人改过了,于是再次进入下一个循环,获取值,发起CAS操作又失败了,再次进入下一个循环。导致大量线程空循环,自旋转,性能和效率都不是特别好。
于是在JDK8之后出现了LongAdder,其父类是Striped64,它就是尝试使用分段CAS以及自动分段迁移的方式来大幅度提升多线程高并发执行CAS操作的性能!
在LongAdder的底层实现中,有一个base变量,首先多个线程会CAS修改这个base变量,对其进行累加操作。但是,一旦线程并发的数量过高,就会采取分段CAS机制,也就是内部会搞一个Cell数组,每个数组单元都是一个数值分段。这时,让大量的线程分别去对不同Cell内部的value值进行CAS累加操作,这样就把CAS计算压力分散到了不同的Cell分段数值中了!这样就可以大幅度的降低多线程并发更新同一个数值时出现的无限循环的问题,大幅度提升了多线程并发更新数值的性能和效率!
而且他内部实现了自动分段迁移的机制,也就是如果某个Cell的value执行CAS失败了,那么就会自动去找另外一个Cell分段内的value值进行CAS操作。这样也解决了线程空旋转、自旋不停等待执行CAS操作的问题,让一个线程过来执行CAS时可以尽快的完成这个操作。
最后,如果你要从LongAdder中获取当前累加的总值,就会把base值和所有Cell分段数值加起来返回给你。
分段CAS核心源码:
final void longAccumulate(long x, LongBinaryOperator fn,
boolean wasUncontended) {
int h;
if ((h = getProbe()) == 0) { //获取PROBE变量,探针变量,与当前运行的线程相关,不同线程不同
ThreadLocalRandom.current(); //初始化PROBE变量,和getProbe都使用Unsafe类提供的原子性操作。
h = getProbe();
wasUncontended = true;
}
boolean collide = false;
for (;;) { //cas经典无限循环,不断尝试
Cell[] as; Cell a; int n; long v;
if ((as = cells) != null && (n = as.length) > 0) { // cells不为null,并且数组size大于0
//表示cells已经初始化了
if ((a = as[(n - 1) & h]) == null) { //通过与操作计算出来需要操作的Cell对象的坐标
if (cellsBusy == 0) { //volatile 变量,用来实现spinLock,来在初始化和resize cells数组时使用。
//当cellsBusy为0时,表示当前可以对cells数组进行操作。
Cell r = new Cell(x);//将x值直接赋值给Cell对象
if (cellsBusy == 0 && casCellsBusy()) {//如果这个时候cellsBusy还是0
//就cas将其设置为非0,如果成功了就是获得了spinLock的锁.可以对cells数组进行操作.
//如果失败了,就会再次执行一次循环
boolean created = false;
try {
Cell[] rs; int m, j;
//判断cells是否已经初始化,并且要操作的位置上没有cell对象.
if ((rs = cells) != null &&
(m = rs.length) > 0 &&
rs[j = (m - 1) & h] == null) {
rs[j] = r; //将之前创建的值为x的cell对象赋值到cells数组的响应位置.
created = true;
}
} finally {
//经典的spinLock编程技巧,先获得锁,然后try finally将锁释放掉
//将cellBusy设置为0就是释放锁.
cellsBusy = 0;
}
if (created)
break; //如果创建成功了,就是使用x创建了新的cell对象,也就是新创建了一个分担热点的value
continue;
}
}
collide = false; //未发生碰撞
}
else if (!wasUncontended)//是否已经发生过一次cas操作失败
wasUncontended = true; //设置成true,以便第二次进入下一个else if 判断
else if (a.cas(v = a.value, ((fn == null) ? v + x :
fn.applyAsLong(v, x))))
//fn是操作类型,如果是空,就是相加,所以让a这个cell对象中的value值和x相加,然后在cas设置,如果成果
//就直接返回
break;
else if (n >= NCPU || cells != as)
//如果cells数组的大小大于系统的可获得处理器数量或在as不再和cells相等.
collide = false; // At max size or stale
else if (!collide)
collide = true;
else if (cellsBusy == 0 && casCellsBusy()) {
//再次获得cellsBusy这个spinLock,对数组进行resize
try {
if (cells == as) {//要再次检测as是否等于cells以免其他线程已经对cells进行了操作.
Cell[] rs = new Cell[n << 1]; //扩容一倍
for (int i = 0; i < n; ++i)
rs[i] = as[i];
cells = rs;//赋予cells一个新的数组对象
}
} finally {
cellsBusy = 0;
}
collide = false;
continue;
}
h = advanceProbe(h);//由于使用当前探针变量无法操作成功,所以重新设置一个,再次尝试
}
else if (cellsBusy == 0 && cells == as && casCellsBusy()) {
//cells数组未初始化,获得cellsBusy lock,来初始化
boolean init = false;
try { // Initialize table
if (cells == as) {
Cell[] rs = new Cell[2];
rs[h & 1] = new Cell(x); //设置x的值为cell对象的value值
cells = rs;
init = true;
}
} finally {
cellsBusy = 0;
}
if (init)
break;
}//如果初始化数组失败了,那就再次尝试一下直接cas base变量,如果成功了就直接返回
else if (casBase(v = base, ((fn == null) ? v + x :
fn.applyAsLong(v, x))))
break; // Fall back on using base
}
}