CAS(Compare And Swap):
CAS(比较与交换,Compare and swap) 是一种有名的无锁算法。CAS, CPU指令,在大多数处理器架构,包括IA32、Space中采用的都是CAS指令,CAS的语义是“我认为V的值应该为A,如果是,那么将V的值更新为B,否则不修改并告诉V的值实际为多少”,CAS是项 乐观锁 技术,当多个线程尝试使用CAS同时更新同一个变量时,只有其中一个线程能更新变量的值,而其它线程都失败,失败的线程并不会被挂起,而是被告知这次竞争中失败,并可以再次尝试。CAS有3个操作数,内存值V,旧的预期值A,要修改的新值B。当且仅当预期值A和内存值V相同时,将内存值V修改为B,否则什么都不做。在JAVA中,sun.misc.Unsafe
类提供了硬件级别的原子操作来实现这个CAS。 java.util.concurrent
包下的大量类都使用了这个 Unsafe.java
类的CAS操作。
java.util.concurrent.atomic中的AtomicXXX(eg. AtomicInteger.java
,AtomicBoolean
,AtomicLong
),都使用了这些底层的JVM支持为数字类型的引用类型提供一种高效的CAS操作,而在java.util.concurrent中的大多数类在实现时都直接或间接的使用了这些原子变量类,这些原子变量都调用了 sun.misc.Unsafe 类库里面的 CAS算法,用CPU指令来实现无锁自增。
下面以 AtomicInteger.java
的部分实现来大致讲解下这些原子类的实现,JDK源码:
// 获取当前值 public final int get() { return value; } // 设置值为 newValue public final void set(int newValue) { value = newValue; } //返回旧值,并设置新值为 newValue public final int getAndSet(int newValue) { /** * 这里使用for循环不断通过CAS操作来设置新值 * CAS实现和加锁实现的关系有点类似乐观锁和悲观锁的关系 * */ for (;;) { int current = get(); if (compareAndSet(current, newValue)) return current; } } // 原子的设置新值为update, expect为期望的当前的值 public final boolean compareAndSet(int expect, int update) { return unsafe.compareAndSwapInt(this, valueOffset, expect, update); } // 获取当前值current,并设置新值为current+1 public final int getAndIncrement() { for (;;) { int current = get(); int next = current + 1; if (compareAndSet(current, next)) return current; } }
我们可以用这种方法替代锁实现多线程计数器问题。
但是在多线程下会出现ABA的问题:引用网友的图片说明ABA问题:
AQS(AbstractQueuedSynchronizer):
AQS(AbstractQueuedSynchronizer),即队列同步器,AQS是JDK下提供的一套用于实现基于FIFO等待队列的阻塞锁和相关的同步器的一个同步框架。这个抽象类被设计为作为一些可用原子int值来表示状态的同步器的基类。它是构建锁或者其他同步组件的基础框架(如ReentrantLock、ReentrantReadWriteLock、Semaphore等),如果你有看过类似 CountDownLatch 类的源码实现,会发现其内部有一个继承了 AbstractQueuedSynchronizer 的内部类 Sync 。可见 CountDownLatch 是基于AQS框架来实现的一个同步器。类似的同步器在JUC下还有不少。
AQS使用一个int类型的成员变量state来表示同步状态,当state>0时表示已经获取了锁,当state = 0时表示释放了锁。它提供了三个方法(getState()、setState(int newState)、compareAndSetState(int expect,int update))来对同步状态state进行操作,当然AQS可以确保对state的操作是安全的。
如JDK的文档中所说,使用AQS来实现一个同步器需要覆盖实现如下几个方法,并且使用getState,setState,compareAndSetState这几个方法来设置获取状态
boolean tryAcquire(int arg) boolean tryRelease(int arg) int tryAcquireShared(int arg) boolean tryReleaseShared(int arg) boolean isHeldExclusively()
以上方法不需要全部实现,根据获取的锁的种类可以选择实现不同的方法,支持独占(排他)获取锁的同步器应该实现tryAcquire、 tryRelease、isHeldExclusively而支持共享获取的同步器应该实现tryAcquireShared、tryReleaseShared、isHeldExclusively。下面以 CountDownLatch 举例说明基于AQS实现同步器, CountDownLatch 用同步状态持有当前计数,countDown方法调用 release从而导致计数器递减;当计数器为0时,解除所有线程的等待;await调用acquire,如果计数器为0,acquire 会立即返回,否则阻塞。通常用于某任务需要等待其他任务都完成后才能继续执行的情景。源码如下:
public class CountDownLatch { /** * Synchronization control For CountDownLatch. * Uses AQS state to represent count. */ private static final class Sync extends AbstractQueuedSynchronizer { private static final long serialVersionUID = 4982264981922014374L; Sync(int count) { setState(count); } int getCount() { return getState(); } //重写 protected int tryAcquireShared(int acquires) { return (getState() == 0) ? 1 : -1; } //重写 protected boolean tryReleaseShared(int releases) { // Decrement count; signal when transition to zero for (;;) { int c = getState(); if (c == 0) return false; int nextc = c-1; if (compareAndSetState(c, nextc)) return nextc == 0; } } } private final Sync sync; /** * Constructs a {@code CountDownLatch} initialized with the given count. * * @param count the number of times {@link #countDown} must be invoked * before threads can pass through {@link #await} * @throws IllegalArgumentException if {@code count} is negative */ public CountDownLatch(int count) { if (count < 0) throw new IllegalArgumentException("count < 0"); this.sync = new Sync(count); } public void await() throws InterruptedException { sync.acquireSharedInterruptibly(1); } public boolean await(long timeout, TimeUnit unit) throws InterruptedException { return sync.tryAcquireSharedNanos(1, unit.toNanos(timeout)); } public void countDown() { sync.releaseShared(1); } /** * Returns the current count. * * <p>This method is typically used for debugging and testing purposes. * * @return the current count */ public long getCount() { return sync.getCount(); } /** * Returns a string identifying this latch, as well as its state. * The state, in brackets, includes the String {@code "Count ="} * followed by the current count. * * @return a string identifying this latch, as well as its state */ public String toString() { return super.toString() + "[Count = " + sync.getCount() + "]"; } }
CLH(Craig,Landin,and Hagersten)队列是一个虚拟的双向队列,虚拟的双向队列即不存在队列实例,仅存在节点之间的关联关系。
AQS是将每一条请求共享资源的线程封装成一个CLH锁队列的一个结点(Node),来实现锁的分配。
用大白话来说,AQS就是基于CLH队列,用volatile修饰共享变量state,线程通过CAS去改变状态符,成功则获取锁成功,失败则进入等待队列,等待被唤醒。
CLH结构如下:
* +------+ prev +-----+ +-----+
* head | | <---- | | <---- | | tail
* +------+ +-----+ +-----+
AbstractQueuedSynchronizer类方法
public final boolean tryAcquireSharedNanos(int arg, long nanosTimeout) throws InterruptedException { if (Thread.interrupted()) throw new InterruptedException(); //子类方法 return tryAcquireShared(arg) >= 0 || doAcquireSharedNanos(arg, nanosTimeout); }
/**
* Acquires in shared interruptible mode.
* @param arg the acquire argument
*/
private void doAcquireSharedInterruptibly(int arg)
throws InterruptedException {
final Node node = addWaiter(Node.SHARED);
boolean failed = true;
try {
for (;;) {
final Node p = node.predecessor();
if (p == head) {
int r = tryAcquireShared(arg);
if (r >= 0) {
setHeadAndPropagate(node, r);
p.next = null; // help GC
failed = false;
return;
}
}
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
throw new InterruptedException();
}
} finally {
if (failed)
cancelAcquire(node);
}
}
private Node addWaiter(Node mode) { Node node = new Node(Thread.currentThread(), mode); // Try the fast path of enq; backup to full enq on failure Node pred = tail; if (pred != null) { node.prev = pred; if (compareAndSetTail(pred, node)) { pred.next = node; return node; } } enq(node); return node; }