先谈几个大家熟悉的,java.util.concurrent包中的ReentrantLock、CountDownLatch、Semaphore、CyclicBarrier,这几个类都是通过AQS来实现的,先学习了AQS再回头看看这几个类。
AQS是什么?它的数据结构是怎样的?
AQS全名AbstractQueuedSynchronizer,翻译过来是抽象队列式同步器,队列、同步,在我们学习锁的时候是必然接触到的两个字眼,那么它的数据结构是怎样的呢?先看看它的内部类Node,里面的注释非常清晰了,只不过是我们不太擅长的英文,我在里面简单的注释一下。
static final class Node {
/** Marker to indicate a node is waiting in shared mode */
static final Node SHARED = new Node();//AQS有两个模式,共享模式和独占模式
/** Marker to indicate a node is waiting in exclusive mode */
static final Node EXCLUSIVE = null;
/** waitStatus value to indicate thread has cancelled */
static final int CANCELLED = 1;//这三个值用来标识线程的等待状态
/** waitStatus value to indicate successor's thread needs unparking */
static final int SIGNAL = -1;
/** waitStatus value to indicate thread is waiting on condition */
static final int CONDITION = -2;
/**
* waitStatus value to indicate the next acquireShared should
* unconditionally propagate
*/
static final int PROPAGATE = -3;
/**
* Status field, taking on only the values:
* SIGNAL: The successor of this node is (or will soon be)
* blocked (via park), so the current node must
* unpark its successor when it releases or
* cancels. To avoid races, acquire methods must
* first indicate they need a signal,
* then retry the atomic acquire, and then,
* on failure, block.
* CANCELLED: This node is cancelled due to timeout or interrupt.
* Nodes never leave this state. In particular,
* a thread with cancelled node never again blocks.
* CONDITION: This node is currently on a condition queue.
* It will not be used as a sync queue node
* until transferred, at which time the status
* will be set to 0. (Use of this value here has
* nothing to do with the other uses of the
* field, but simplifies mechanics.)
* PROPAGATE: A releaseShared should be propagated to other
* nodes. This is set (for head node only) in
* doReleaseShared to ensure propagation
* continues, even if other operations have
* since intervened.
* 0: None of the above
*
* The values are arranged numerically to simplify use.
* Non-negative values mean that a node doesn't need to
* signal. So, most code doesn't need to check for particular
* values, just for sign.
*
* The field is initialized to 0 for normal sync nodes, and
* CONDITION for condition nodes. It is modified using CAS
* (or when possible, unconditional volatile writes).
*/
volatile int waitStatus;
/**
* Link to predecessor node that current node/thread relies on
* for checking waitStatus. Assigned during enqueuing, and nulled
* out (for sake of GC) only upon dequeuing. Also, upon
* cancellation of a predecessor, we short-circuit while
* finding a non-cancelled one, which will always exist
* because the head node is never cancelled: A node becomes
* head only as a result of successful acquire. A
* cancelled thread never succeeds in acquiring, and a thread only
* cancels itself, not any other node.
*/
volatile Node prev;
/**
* Link to the successor node that the current node/thread
* unparks upon release. Assigned during enqueuing, adjusted
* when bypassing cancelled predecessors, and nulled out (for
* sake of GC) when dequeued. The enq operation does not
* assign next field of a predecessor until after attachment,
* so seeing a null next field does not necessarily mean that
* node is at end of queue. However, if a next field appears
* to be null, we can scan prev's from the tail to
* double-check. The next field of cancelled nodes is set to
* point to the node itself instead of null, to make life
* easier for isOnSyncQueue.
*/
volatile Node next;
/**
* The thread that enqueued this node. Initialized on
* construction and nulled out after use.
*/
volatile Thread thread;//每个节点维护一个内部的线程,把外部的线程传入Node并对这个线程进行操作:阻塞、启动
/**
* Link to next node waiting on condition, or the special
* value SHARED. Because condition queues are accessed only
* when holding in exclusive mode, we just need a simple
* linked queue to hold nodes while they are waiting on
* conditions. They are then transferred to the queue to
* re-acquire. And because conditions can only be exclusive,
* we save a field by using special value to indicate shared
* mode.
*/
Node nextWaiter;
/**
* Returns true if node is waiting in shared mode.
*/
final boolean isShared() {
return nextWaiter == SHARED;
}
/**
* Returns previous node, or throws NullPointerException if null.
* Use when predecessor cannot be null. The null check could
* be elided, but is present to help the VM.
*
* @return the predecessor of this node
*/
final Node predecessor() throws NullPointerException {
Node p = prev;
if (p == null)
throw new NullPointerException();
else
return p;
}
Node() { // Used to establish initial head or SHARED marker
}
Node(Thread thread, Node mode) { // Used by addWaiter
this.nextWaiter = mode;
this.thread = thread;
}
Node(Thread thread, int waitStatus) { // Used by Condition
this.waitStatus = waitStatus;
this.thread = thread;
}
}
看完这个内部类,我们再看看AQS的运作模型是什么样子的,为什么每个节点需要维护这些内容
这所谓的需要重写的重要方法有:
protected boolean tryAcquire(int arg) {
throw new UnsupportedOperationException();
}
protected boolean tryRelease(int arg) {
throw new UnsupportedOperationException();
}
protected int tryAcquireShared(int arg) {
throw new UnsupportedOperationException();
}
protected boolean tryReleaseShared(int arg) {
throw new UnsupportedOperationException();
}
protected boolean isHeldExclusively() {//该线程是否正在独占资源。只有用到condition才需要去实现它。
throw new UnsupportedOperationException();
}
可以看到方法体里面的内容是直接抛出异常,因为我们没办法直接使用AQS,而是需要实现自定义同步器,重写这些方法。
AQS的重要方法
获取资源的操作最外层的方法acquire
public final void acquire(int arg) {//这里是独占模式的acquire方法
if (!tryAcquire(arg) &&//尝试获取资源失败
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))//将节点加入到等待队列并在条件允许时获取资源或阻塞等待
selfInterrupt();
}
按执行顺序,继续分解里面的方法tryAcquire、addWaiter和acquireQueued
protected boolean tryAcquire(int arg) {//需要自定义的方法,尝试获取资源的方法
throw new UnsupportedOperationException();
}
private Node addWaiter(Node mode) {
Node node = new Node(Thread.currentThread(), mode);
// Try the fast path of enq; backup to full enq on failure
Node pred = tail;
if (pred != null) {
node.prev = pred;
if (compareAndSetTail(pred, node)) {//cas加入队列尾
pred.next = node;
return node;
}
}
enq(node);//如果队列为空,初始化队列并加入node
return node;
}
private Node enq(final Node node) {
for (;;) {
Node t = tail;
if (t == null) { // Must initialize
if (compareAndSetHead(new Node()))
tail = head;
} else {//如果已经初始化,cas加入队列尾
node.prev = t;
if (compareAndSetTail(t, node)) {
t.next = node;
return t;
}
}
}
}
final boolean acquireQueued(final Node node, int arg) {
boolean failed = true;
try {
boolean interrupted = false;//用来返回给acquire,如果是中断状态,令队列线程中断
for (;;) {
final Node p = node.predecessor();
if (p == head && tryAcquire(arg)) {//如果该节点的前置节点是头节点(因为终于排到我了啊!!),尝试获取资源
setHead(node);
p.next = null; // help GC
failed = false;
return interrupted;
}
if (shouldParkAfterFailedAcquire(p, node) &&//获取失败后是否应该park掉该线程
parkAndCheckInterrupt())//park掉该线程,并检查队列线程的interrupt标志位,若interrupt为true,设置interrupted为true
interrupted = true;//这里的操作我的理解是队列线程本身就处于阻塞状态,没有执行,所以暂时关闭中断在获取资源成功后再开启中断
}
} finally {
if (failed)//如果尝试获取资源失败,取消该线程的获取操作
cancelAcquire(node);
}
}
再看看release方法
public final boolean release(int arg) {
if (tryRelease(arg)) {//尝试释放成功
Node h = head;
if (h != null && h.waitStatus != 0)
unparkSuccessor(h);//取消park后续节点,即让后一个节点开始申请资源
return true;
}
return false;
}
上面都是独占模式下的acquire和release,共享模式下的acquireShared和releaseShared也有很多共同点,就不细说了。
public final void acquireShared(int arg) {
if (tryAcquireShared(arg) < 0)//多个线程尝试获取成功
doAcquireShared(arg);//多个线程获取资源
}
public final boolean releaseShared(int arg) {
if (tryReleaseShared(arg)) {//多个线程尝试释放成功
doReleaseShared();//多个线程释放资源
return true;
}
return false;
}
ReentrantLock的实现原理及源码阅读
直接看lock()
public void lock() {
sync.lock();
}
使用的是sync内部类的lock()
abstract void lock();
而sync.lock()是一个抽象方法,他是如何使用的呢,继续看NonfairSync和FairSync
static final class NonfairSync extends Sync {
private static final long serialVersionUID = 7316153563782823691L;
/**
* Performs lock. Try immediate barge, backing up to normal
* acquire on failure.
*/
final void lock() {
if (compareAndSetState(0, 1))
setExclusiveOwnerThread(Thread.currentThread());
else
acquire(1);
}
...
}
static final class FairSync extends Sync {
private static final long serialVersionUID = -3000897897090466540L;
final void lock() {
acquire(1);
}
...
}
这两个内部类实现了lock(),在ReentrantLock中sync是要看它是哪个实例来进行调用
public final boolean isFair() {
return sync instanceof FairSync;
}
先介绍一下FairSync和NonfairSync,公平锁和非公平锁,区别在于,当有新的线程到达时是直接去尝试获取资源还是先排队,公平锁,由于是公平的,谁先来谁就先请求,所以线程到达后就进入等待队列,轮到自己之后就进行资源的获取,非公平锁,由于是不公平的,线程到达时先尝试获取,如果获取不到再进入等待队列等待。
以其中FairSync作为例子,lock()操作实际上是直接调用了acquire(1),是不是很熟悉,调用了AQS中的acquire方法,使用独占模式进行acquire。并重写了tryAcquire方法:
protected final boolean tryAcquire(int acquires) {
final Thread current = Thread.currentThread();
int c = getState();
if (c == 0) {//资源区值为0,尝试获取
if (!hasQueuedPredecessors() &&
compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
}
else if (current == getExclusiveOwnerThread()) {//当前线程是不是已经获取了该锁,由于是可重入的,继续获取锁
int nextc = c + acquires;
if (nextc < 0)
throw new Error("Maximum lock count exceeded");//获取太多了!
setState(nextc);
return true;
}
return false;
}
CountDownLatch的实现原理及源码阅读
CountDownLatch使用场景是能控制同时运行的线程数不超过n条
在CountDownLatch中的tryAcquireShare和tryReleaseShared代码:
protected int tryAcquireShared(int acquires) {
return (getState() == 0) ? 1 : -1;//资源区count不为0,就尝试获取成功
}
protected boolean tryReleaseShared(int releases) {
// Decrement count; signal when transition to zero
for (;;) {
int c = getState();
if (c == 0)//没有可释放的资源
return false;
int nextc = c-1;//用cas令资源区值 - 1
if (compareAndSetState(c, nextc))
return nextc == 0;//直到资源释放完毕
}
}
await和countDown:
public void await() throws InterruptedException {
sync.acquireSharedInterruptibly(1);
}
public void countDown() {
sync.releaseShared(1);
}
直接调用AQS已经实现了的入队出队阻塞唤醒的操作
CyclicBarrier的实现原理及源码阅读
CyclicBarrier能实现一个类似屏障,等达到指定个数的线程数后,线程同时启动。
实现原理是使用了ReentranLock锁,使用await()时,给该CyclicBarrier对象加锁,由于是可重入锁,同意CyclicBarrier对象反复调用await()知道达到指定count,令generation.broken=true,释放锁,如果还有线程发起await()请求,将会重新加锁等待到达count值后释放锁,如果不能到达count值,将会一直等下去。
public int await() throws InterruptedException, BrokenBarrierException {
try {
return dowait(false, 0L);
} catch (TimeoutException toe) {
throw new Error(toe); // cannot happen
}
}
private int dowait(boolean timed, long nanos)
throws InterruptedException, BrokenBarrierException,
TimeoutException {
final ReentrantLock lock = this.lock;
lock.lock();
try {
final Generation g = generation;
if (g.broken)
throw new BrokenBarrierException();
if (Thread.interrupted()) {
breakBarrier();
throw new InterruptedException();
}
int index = --count;
if (index == 0) { // tripped
boolean ranAction = false;
try {
final Runnable command = barrierCommand;
if (command != null)
command.run();
ranAction = true;
nextGeneration();
return 0;
} finally {
if (!ranAction)
breakBarrier();
}
}
// loop until tripped, broken, interrupted, or timed out
for (;;) {
try {
if (!timed)
trip.await();
else if (nanos > 0L)
nanos = trip.awaitNanos(nanos);
} catch (InterruptedException ie) {
if (g == generation && ! g.broken) {
breakBarrier();
throw ie;
} else {
// We're about to finish waiting even if we had not
// been interrupted, so this interrupt is deemed to
// "belong" to subsequent execution.
Thread.currentThread().interrupt();
}
}
if (g.broken)
throw new BrokenBarrierException();
if (g != generation)
return index;
if (timed && nanos <= 0L) {
breakBarrier();
throw new TimeoutException();
}
}
} finally {
lock.unlock();
}
}
Semaphore的实现原理及源码阅读
在信号量上我们定义两种操作: acquire(获取) 和 release(释放)。当一个线程调用acquire操作时,它要么通过成功获取信号量(信号量减1),要么一直等下去,直到有线程释放信号量,或超时。release(释放)实际上会将信号量的值加1,然后唤醒等待的线程。
信号量主要用于两个目的,一个是用于多个共享资源的互斥使用,另一个用于并发线程数的控制。
public void acquire() throws InterruptedException {
sync.acquireSharedInterruptibly(1);
}
public void release() {
sync.releaseShared(1);
}
public boolean tryAcquire() {
return sync.nonfairTryAcquireShared(1) >= 0;
}
final int nonfairTryAcquireShared(int acquires) {
for (;;) {
int available = getState();
int remaining = available - acquires;
if (remaining < 0 ||
compareAndSetState(available, remaining))
return remaining;
}
}
protected final boolean tryReleaseShared(int releases) {
for (;;) {
int current = getState();
int next = current + releases;
if (next < current) // overflow
throw new Error("Maximum permit count exceeded");
if (compareAndSetState(current, next))
return true;
}
}
有问题请指出,欢迎相互学习交流