这一篇我们通过两种实现自旋锁的方式来看一下不同的编程方式带来的程序性能的变化。
先理解一下什么是自旋,所谓自旋就是线程在不满足某种条件的情况下,一直循环做某个动作。所以对于自旋锁来锁,当线程在没有获取锁的情况下,一直循环尝试获取锁,直到真正获取锁。
在聊聊高并发(三)锁的一些基本概念 我们提到锁的本质就是等待,那么如何等待呢,有两种方式
1. 线程阻塞
2. 线程自旋
阻塞的缺点显而易见,线程一旦进入阻塞(Block),再被唤醒的代价比较高,性能较差。自旋的优点是线程还是Runnable的,只是在执行空代码。当然一直自旋也会白白消耗计算资源,所以常见的做法是先自旋一段时间,还没拿到锁就进入阻塞。JVM在处理synchrized实现时就是采用了这种折中的方案,并提供了调节自旋的参数。
这篇说一下两种最基本的自旋锁实现,并提供了一种优化的锁,后续会有更多的自旋锁的实现。
首先是TASLock (Test And Set Lock),测试-设置锁,它的特点是自旋时,每次尝试获取锁时,采用了CAS操作,不断的设置锁标志位,当锁标志位可用时,一个线程拿到锁,其他线程继续自旋。
缺点是CAS操作一直在修改共享变量的值,会引发缓存一致性流量风暴
-
package com.test.lock;
-
// 锁接口
-
public interface Lock {
-
public void lock();
-
-
public void unlock();
-
}
-
package com.test.lock;
-
import java.util.concurrent.atomic.AtomicBoolean;
-
/**
-
* 测试-设置自旋锁,使用AtomicBoolean原子变量保存状态
-
* 每次都使用getAndSet原子操作来判断锁状态并尝试获取锁
-
* 缺点是getAndSet底层使用CAS来实现,一直在修改共享变量的值,会引发缓存一致性流量风暴
-
* **/
-
public class TASLock implements Lock{
-
private AtomicBoolean mutex = new AtomicBoolean(false);
-
@Override
-
public void lock() {
-
// getAndSet方法会设置mutex变量为true,并返回mutex之前的值
-
// 当mutex之前是false时才返回,表示获取锁
-
// getAndSet方法是原子操作,mutex原子变量的改动对所有线程可见
-
while(mutex.getAndSet(true)){
-
}
-
}
-
@Override
-
public void unlock() {
-
mutex.set(false);
-
}
-
public String toString(){
-
return "TASLock";
-
}
-
}
一种改进的算法是TTASLock(Test Test And Set Lock)测试-测试-设置锁,特点是在自旋尝试获取锁时,分为两步,第一步通过读操作来获取锁状态,当锁可获取时,第二步再通过CAS操作来尝试获取锁,减少了CAS的操作次数。并且第一步的读操作是处理器直接读取自身高速缓存,不会产生缓存一致性流量,不占用总线资源。
缺点是在锁高争用的情况下,线程很难一次就获取锁,CAS的操作会大大增加。
-
package com.test.lock;
-
import java.util.concurrent.atomic.AtomicBoolean;
-
/**
-
* 测试-测试-设置自旋锁,使用AtomicBoolean原子变量保存状态
-
* 分为两步来获取锁
-
* 1. 先采用读变量自旋的方式尝试获取锁
-
* 2. 当有可能获取锁时,再使用getAndSet原子操作来尝试获取锁
-
* 优点是第一步使用读变量的方式来获取锁,在处理器内部高速缓存操作,不会产生缓存一致性流量
-
* 缺点是当锁争用激烈的时候,第一步一直获取不到锁,getAndSet底层使用CAS来实现,一直在修改共享变量的值,会引发缓存一致性流量风暴
-
* **/
-
public class TTASLock implements Lock{
-
private AtomicBoolean mutex = new AtomicBoolean(false);
-
@Override
-
public void lock() {
-
while(true){
-
// 第一步使用读操作,尝试获取锁,当mutex为false时退出循环,表示可以获取锁
-
while(mutex.get()){}
-
// 第二部使用getAndSet方法来尝试获取锁
-
if(!mutex.getAndSet(true)){
-
return;
-
}
-
}
-
}
-
@Override
-
public void unlock() {
-
mutex.set(false);
-
}
-
public String toString(){
-
return "TTASLock";
-
}
-
}
针对锁高争用的问题,可以采取回退算法,即当线程没有拿到锁时,就等待一段时间再去尝试获取锁,这样可以减少锁的争用,提高程序的性能。
-
package com.test.lock;
-
import java.util.Random;
-
/**
-
* 回退算法,降低锁争用的几率
-
* **/
-
public class Backoff {
-
private final int minDelay, maxDelay;
-
private int limit;
-
final Random random;
-
public Backoff(int min, int max){
-
this.minDelay = min;
-
this.maxDelay = max;
-
limit = minDelay;
-
random = new Random();
-
}
-
// 回退,线程等待一段时间
-
public void backoff() throws InterruptedException{
-
int delay = random.nextInt(limit);
-
limit = Math.min(maxDelay, 2 * limit);
-
Thread.sleep(delay);
-
}
-
}
-
package com.test.lock;
-
import java.util.concurrent.atomic.AtomicBoolean;
-
/**
-
* 回退自旋锁,在测试-测试-设置自旋锁的基础上增加了线程回退,降低锁的争用
-
* 优点是在锁高争用的情况下减少了锁的争用,提高了执行的性能
-
* 缺点是回退的时间难以控制,需要不断测试才能找到合适的值,而且依赖底层硬件的性能,扩展性差
-
* **/
-
public class BackoffLock implements Lock{
-
private final int MIN_DELAY, MAX_DELAY;
-
-
public BackoffLock(int min, int max){
-
MIN_DELAY = min;
-
MAX_DELAY = max;
-
}
-
-
private AtomicBoolean mutex = new AtomicBoolean(false);
-
-
@Override
-
public void lock() {
-
// 增加回退对象
-
Backoff backoff = new Backoff(MIN_DELAY, MAX_DELAY);
-
while(true){
-
// 第一步使用读操作,尝试获取锁,当mutex为false时退出循环,表示可以获取锁
-
while(mutex.get()){}
-
// 第二部使用getAndSet方法来尝试获取锁
-
if(!mutex.getAndSet(true)){
-
return;
-
}else{
-
//回退
-
try {
-
backoff.backoff();
-
} catch (InterruptedException e) {
-
}
-
}
-
-
}
-
}
-
@Override
-
public void unlock() {
-
mutex.set(false);
-
}
-
public String toString(){
-
return "TTASLock";
-
}
-
}
-
回退自旋锁的问题是回退的时间难以控制,需要不断测试才能找到合适的值,而且依赖底层硬件的性能,扩展性差。后面会有更好的自旋锁实现算法。
下面我们测试一下TASLock和TTASLock的性能。
首先写一个计时的类
-
package com.test.lock;
-
public class TimeCost implements Lock{
-
private final Lock lock;
-
public TimeCost(Lock lock){
-
this.lock = lock;
-
}
-
@Override
-
public void lock() {
-
long start = System.nanoTime();
-
lock.lock();
-
long duration = System.nanoTime() - start;
-
System.out.println(lock.toString() + " time cost is " + duration + " ns");
-
}
-
@Override
-
public void unlock() {
-
lock.unlock();
-
}
-
}
然后采用多个线程来模拟对同一把锁的争用
-
package com.test.lock;
-
public class Main {
-
private static TimeCost timeCost = new TimeCost(new TASLock());
-
//private static TimeCost timeCost = new TimeCost(new TTASLock());
-
public static void method(){
-
timeCost.lock();
-
//int a = 10;
-
timeCost.unlock();
-
}
-
public static void main(String[] args) {
-
for(int i = 0; i < 100; i ++){
-
Thread t = new Thread(new Runnable(){
-
@Override
-
public void run() {
-
method();
-
}
-
});
-
t.start();
-
}
-
}
-
}
测试机器的性能如下:
CPU: 4 Intel(R) Core(TM) i3-2120 CPU @ 3.30GHz
内存: 8G
测试结果:
50个线程情况下:
TASLock平均获取锁的时间: 339715 ns
TTASLock平均获取锁的时间: 67106.2 ns
100个线程情况下:
TASLock平均获取锁的时间: 1198413 ns
TTASLock平均获取锁的时间: 1273588 ns
可以看到TTASLock的性能比TASLock的性能更好
对TTASLock的一种改进是BackoffLock,它会在锁高争用的情况下对线程进行回退,减少竞争,减少缓存一致性流量。但是BackoffLock有三个主要的问题:
1. 还是有大量的缓存一致性流量,因为所有线程在同一个共享变量上旋转,每一次成功的获取锁都会产生缓存一致性流量
2. 因为回退的存在,不能及时获取锁释放的信息,存在一个时间差,导致获取锁的时间变长
3. 不能保证无饥饿,有的线程可能一直无法获取锁
这篇会实现2种基于队列的锁,来解决上面提到的三个问题。主要的思路是将线程组织成一个队列,有4个优点:
1. 每个线程只需要检查它的前驱线程的状态,将自旋的变量从一个分散到多个,减少缓存一致性流量
2. 可以即使获取锁释放的通知
3. 队列提供了先来先服务的公平性
4. 无饥饿,队列中的每个线程都能保证被执行到
队列锁分为两类,一类是基于有界队列,一类是基于无界队列。
先看一下基于有界队列的队列锁。 ArrayLock有3个特点:
1. 基于一个volatile数组来组织线程
2. 通过一个原子变量tail来表示对尾线程
3. 通过一个ThreadLocal变量给每个线程一个索引号,表示它位于队列的哪个位置。
-
package com.test.lock;
-
import java.util.concurrent.atomic.AtomicInteger;
-
/**
-
* 有界队列锁,使用一个volatile数组来组织线程
-
* 缺点是得预先知道线程的规模n,所有线程获取同一个锁的次数不能超过n
-
* 假设L把锁,那么锁的空间复杂度为O(Ln)
-
* **/
-
public class ArrayLock implements Lock{
-
// 使用volatile数组来存放锁标志, flags[i] = true表示可以获得锁
-
private volatile boolean[] flags;
-
// 指向新加入的节点的后一个位置
-
private AtomicInteger tail;
-
// 总容量
-
private final int capacity;
-
private ThreadLocal<Integer> mySlotIndex = new ThreadLocal<Integer>(){
-
protected Integer initialValue() {
-
return 0;
-
}
-
};
-
public ArrayLock(int capacity){
-
this.capacity = capacity;
-
flags = new boolean[capacity];
-
tail = new AtomicInteger(0);
-
// 默认第一个位置可获得锁
-
flags[0] = true;
-
}
-
@Override
-
public void lock() {
-
int slot = tail.getAndIncrement() % capacity;
-
mySlotIndex.set(slot);
-
// flags[slot] == true 表示获得了锁, volatile变量保证锁释放及时通知
-
while(!flags[slot]){
-
}
-
}
-
@Override
-
public void unlock() {
-
int slot = mySlotIndex.get();
-
flags[slot] = false;
-
flags[(slot + 1) % capacity] = true;
-
}
-
<pre name="code" class="java">
-
public String toString(){
-
return "ArrayLock";
-
}
}
我们可以看到有界队列锁的缺点是:
1. 它必须知道线程的规模数,对于同一把锁如果线程获取的次数超过了n会出现线程状态被覆盖的问题
2. 空间复杂度是O(Ln)
3. 对于共享的volatile数组来保存线程获取锁的状态,仍然可能存在缓存一致性。我们知道CPU读取一次内存时,会读满数据总线的位长,比如64位总线,一次读取64位长度的数据。那么对于boolean类型的数组,boolean长度是1个字节,那么一次读取能读到8个boolean变量,而高速缓存的一个缓存块的长度也是64位,也就是说一个缓存块上可以保存8个boolean变量,所以如果一次CAS操作修改了一个变量导致一个缓存块无效,它实际上可能导致8个变量失效。
解决办法是把变量以8个长度为单位分散,比如flag[0] = thread1 flag[8] = thread2。这样的问题是消耗的空间更大。
无界队列锁可以克服有界队列锁的几个问题。
1. 它使用链表来代替数组,实现无界队列
2. 使用两个ThreadLocal变量表示指针,一个指向自己的节点,一个指向前一个节点
3. 使用一个原子引用变量指向队尾
4. 空间复杂度降低,如果有L把锁,n个线程,每个线程只获取一把锁,那么空间复杂度为O(L + n)
5. 对同一个锁,一个线程可以多次获取而不增加空间复杂度
6. 当线程结束后,GC会自动回收内存
-
package com.test.lock;
-
import java.util.concurrent.atomic.AtomicReference;
-
/**
-
* 无界队列锁,使用一个链表来组织线程
-
* 假设L把锁,n个线程,那么锁的空间复杂度为O(L+n)
-
* **/
-
public class CLHLock implements Lock{
-
// 原子变量指向队尾
-
private AtomicReference<QNode> tail;
-
// 两个指针,一个指向自己的Node,一个指向前一个Node
-
ThreadLocal<QNode> myNode;
-
ThreadLocal<QNode> myPreNode;
-
public CLHLock(){
-
tail = new AtomicReference<QNode>(new QNode());
-
myNode = new ThreadLocal<QNode>(){
-
protected QNode initialValue(){
-
return new QNode();
-
}
-
};
-
myPreNode = new ThreadLocal<QNode>(){
-
protected QNode initialValue(){
-
return null;
-
}
-
};
-
}
-
@Override
-
public void lock() {
-
QNode node = myNode.get();
-
node.lock = true;
-
// CAS原子操作,保证原子性
-
QNode preNode = tail.getAndSet(node);
-
myPreNode.set(preNode);
-
// volatile变量,能保证锁释放及时通知
-
// 只对前一个节点的状态自旋,减少缓存一致性流量
-
while(preNode.lock){
-
}
-
}
-
@Override
-
public void unlock() {
-
QNode node = myNode.get();
-
node.lock = false;
-
// 把myNode指向preNode,目的是保证同一个线程下次还能使用这个锁,因为myNode原来指向的节点有它的后一个节点的preNode引用
-
// 防止这个线程下次lock时myNode.get获得原来的节点
-
myNode.set(myPreNode.get());
-
}
-
public static class QNode {
-
volatile boolean lock;
-
}
-
public String toString(){
-
return "CLHLock";
-
}
-
}
下面我们从正确性和平均获取锁的时间上来测试这两种锁。
我们设计一个测试用例来验证正确性: 使用50个线程对一个volatile变量++操作,由于volatile变量++操作不是原子的,在不加锁的情况下,可能同时有多个线程同时对voaltile变量++, 最终的结果是无法预测的。然后使用这两种锁,先获取锁再volatile变量++,由于volatile变量会防止重排序,并能保证可见性,我们可以确定如果锁是正确获取的,也就是说同一时刻只有一个线程对volatile变量++,那么结果肯定是顺序的1到50。
先看不加锁的情况
-
package com.test.lock;
-
public class Main {
-
//private static Lock lock = new ArrayLock(150);
-
private static Lock lock = new CLHLock();
-
//private static TimeCost timeCost = new TimeCost(new TTASLock());
-
private static volatile int value = 0;
-
public static void method(){
-
//lock.lock();
-
System.out.println("Value: " + ++value);
-
//lock.unlock();
-
}
-
public static void main(String[] args) {
-
for(int i = 0; i < 50; i ++){
-
Thread t = new Thread(new Runnable(){
-
@Override
-
public void run() {
-
method();
-
}
-
});
-
t.start();
-
}
-
}
-
}
运行结果: 我们可以看到确实是发生的线程同时对volatile变量++的操作,结果是无法预料的
-
Value: 1
-
Value: 1
-
Value: 2
-
Value: 3
-
Value: 4
-
Value: 5
-
Value: 6
-
Value: 7
-
Value: 8
-
Value: 9
-
Value: 10
-
Value: 11
-
Value: 13
-
Value: 12
-
Value: 14
-
Value: 15
-
Value: 16
-
Value: 17
-
Value: 18
-
Value: 19
-
Value: 20
-
Value: 21
-
Value: 22
-
Value: 23
-
Value: 24
-
Value: 25
-
Value: 26
-
Value: 27
-
Value: 28
-
Value: 29
-
Value: 30
-
Value: 31
-
Value: 32
-
Value: 33
-
Value: 34
-
Value: 35
-
Value: 36
-
Value: 37
-
Value: 38
-
Value: 37
-
Value: 39
-
Value: 40
-
Value: 41
-
Value: 42
-
Value: 43
-
Value: 44
-
Value: 45
-
Value: 46
-
Value: 47
-
Value: 48
-
Value: 50
使用有界队列锁:
-
package com.test.lock;
-
public class Main {
-
private static Lock lock = new ArrayLock(100);
-
//private static Lock lock = new CLHLock();
-
//private static TimeCost timeCost = new TimeCost(new TTASLock());
-
private static volatile int value = 0;
-
public static void method(){
-
lock.lock();
-
System.out.println("Value: " + ++value);
-
lock.unlock();
-
}
-
public static void main(String[] args) {
-
for(int i = 0; i < 50; i ++){
-
Thread t = new Thread(new Runnable(){
-
@Override
-
public void run() {
-
method();
-
}
-
});
-
t.start();
-
}
-
}
-
}
运行结果是1到50的顺序自增,说明锁保证了同一时刻只有一个线程在对volatile变量++,是正确的
-
Value: 1
-
Value: 2
-
Value: 3
-
Value: 4
-
Value: 5
-
Value: 6
-
Value: 7
-
Value: 8
-
Value: 9
-
Value: 10
-
Value: 11
-
Value: 12
-
Value: 13
-
Value: 14
-
Value: 15
-
Value: 16
-
Value: 17
-
Value: 18
-
Value: 19
-
Value: 20
-
Value: 21
-
Value: 22
-
Value: 23
-
Value: 24
-
Value: 25
-
Value: 26
-
Value: 27
-
Value: 28
-
Value: 29
-
Value: 30
-
Value: 31
-
Value: 32
-
Value: 33
-
Value: 34
-
Value: 35
-
Value: 36
-
Value: 37
-
Value: 38
-
Value: 39
-
Value: 40
-
Value: 41
-
Value: 42
-
Value: 43
-
Value: 44
-
Value: 45
-
Value: 46
-
Value: 47
-
Value: 48
-
Value: 49
-
Value: 50
使用无界队列锁的情况也是正确的,由于篇幅原因这里就不帖代码了。
再看平均获取锁的时间。
-
package com.test.lock;
-
public class Main {
-
private static Lock lock = new TimeCost(new CLHLock());
-
//private static Lock lock = new CLHLock();
-
//private static TimeCost timeCost = new TimeCost(new TTASLock());
-
private static volatile int value = 0;
-
public static void method(){
-
lock.lock();
-
//System.out.println("Value: " + ++value);
-
lock.unlock();
-
}
-
public static void main(String[] args) {
-
for(int i = 0; i < 100; i ++){
-
Thread t = new Thread(new Runnable(){
-
@Override
-
public void run() {
-
method();
-
}
-
});
-
t.start();
-
}
-
}
-
}
在100个线程并发的情况下,
ArrayLock获取锁的平均时间是: 719550 ns
CLHLock获取锁的平均时间是: 488577 ns
可以看到,队列锁在使用多个共享变量自旋的情况下,减少了一致性流量,比TASLock和TTASLock 提高了程序的性能。而CLHLock比ArrayLock有更好的扩展性和性能,是一种很好的自旋锁实现。
CLHLock是无饥饿的,保证先来先服务公平性,只有少量的缓存一致性流量,在SMP系统结构中,是一种比较完善的锁。但是在没有cache的NUMA系统架构中,由于在前一个节点的lock状态上自旋,NUMA架构中处理器访问本地内存的速度高于通过网络访问其他节点的内存,所以CLHLock在NUMA架构上不是最优的自旋锁。
这篇介绍一种适合在无cache的NUMA系统架构中比较完善的队列锁MCSLock。它的特点是:
1. 使用1个ThreadLocal指针来做链表,由QNode自身维护下一个节点的指针
2. 线程在自身节点自旋,而不是CLHLock那样在前一个节点自旋
3. 在释放锁时需要判断是否是唯一节点,需要做一次CAS操作,如果不是唯一节点,要稍微等待链表关系的建立
-
package com.zc.lock;
-
import java.util.concurrent.atomic.AtomicReference;
-
/**
-
* 无界队列锁,使用一个链表来组织线程
-
* 假设L把锁,n个线程,那么锁的空间复杂度为O(L+n)
-
* **/
-
public class MCSLock implements Lock{
-
// 原子变量指向队尾
-
private AtomicReference<QNode> tail;
-
// 两个指针,一个指向自己的Node,一个指向前一个Node
-
ThreadLocal<QNode> myNode;
-
public MCSLock(){
-
tail = new AtomicReference<QNode>(null);
-
myNode = new ThreadLocal<QNode>(){
-
protected QNode initialValue(){
-
return new QNode();
-
}
-
};
-
}
-
@Override
-
public void lock() {
-
QNode node = myNode.get();
-
// CAS原子操作,保证原子性
-
QNode preNode = tail.getAndSet(node);
-
// 如果preNode等于空,证明是第一个获取锁的
-
if(preNode != null){
-
node.lock = true;
-
preNode.next = node;
-
// 对线程自己的node进行自旋,对无cache的NUMA系统架构来说,访问本地内存速度优于其他节点的内存
-
while(node.lock){
-
}
-
}
-
}
-
@Override
-
public void unlock() {
-
QNode node = myNode.get();
-
if(node.next == null){
-
// CAS操作,判断是否没有新加入的节点
-
if(tail.compareAndSet(node, null)){
-
// 没有新加入的节点,直接返回
-
return;
-
}
-
// 有新加入的节点,等待设置链关系
-
while(node.next == null){
-
}
-
}
-
// 通知下一个节点获取锁
-
node.next.lock = false;
-
// 设置next节点为空,为下次获取锁清理状态
-
node.next = null;
-
}
-
public static class QNode {
-
volatile boolean lock;
-
volatile QNode next;
-
}
-
public String toString(){
-
return "MCSLock";
-
}
-
}
下面采用和上一篇同样的测试用例来测试MCSLock的正确性
-
package com.zc.lock;
-
public class Main {
-
//private static Lock lock = new TimeCost(new ArrayLock(150));
-
private static Lock lock = new MCSLock();
-
//private static TimeCost timeCost = new TimeCost(new TTASLock());
-
private static volatile int value = 0;
-
public static void method(){
-
lock.lock();
-
System.out.println("Value: " + ++value);
-
lock.unlock();
-
}
-
public static void main(String[] args) {
-
for(int i = 0; i < 50; i ++){
-
Thread t = new Thread(new Runnable(){
-
@Override
-
public void run() {
-
method();
-
}
-
});
-
t.start();
-
}
-
}
-
}
测试结果:顺序地打印出volatile变量++的结果,证明同一时刻只有一个线程在做volatile++操作,证明加锁成功。
-
Value: 1
-
Value: 2
-
Value: 3
-
Value: 4
-
Value: 5
-
Value: 6
-
Value: 7
-
Value: 8
-
Value: 9
-
Value: 10
-
Value: 11
-
Value: 12
-
Value: 13
-
Value: 14
-
Value: 15
-
Value: 16
-
Value: 17
-
Value: 18
-
Value: 19
-
Value: 20
-
Value: 21
-
Value: 22
-
Value: 23
-
Value: 24
-
Value: 25
-
Value: 26
-
Value: 27
-
Value: 28
-
Value: 29
-
Value: 30
-
Value: 31
-
Value: 32
-
Value: 33
-
Value: 34
-
Value: 35
-
Value: 36
-
Value: 37
-
Value: 38
-
Value: 39
-
Value: 40
-
Value: 41
-
Value: 42
-
Value: 43
-
Value: 44
-
Value: 45
-
Value: 46
-
Value: 47
-
Value: 48
-
Value: 49
-
Value: 50
java并发包中的Lock定义包含了时限锁的接口:
-
public interface Lock {
-
void lock();
-
void lockInterruptibly() throws InterruptedException;
-
boolean tryLock();
-
boolean tryLock(long time, TimeUnit unit) throws InterruptedException;
-
void unlock();
-
Condition newCondition();
-
}
tryLock就是实现锁的接口,它支持限时操作,支持中断操作。这两个特性很重要,可以防止死锁,也可以在死锁的情况下取消锁。
因为这两个特性的需要,队列锁的节点需要支持“退出队列”的机制,也就是说当发生超时或者线程中断的情况下,线程能从队列中出队列,不影响其他节点继续等待。之前实现的几种队列锁都不支持退出机制,一旦发生队列中的线程长时间阻塞,那么后续所有的线程都会被动阻塞。
我们看一种限时队列锁的实现,它有几个要点:
1. 定义一个共享的AVAILABLE节点,当一个节点的preNode指向AVAILABLE时,表示这个节点获得锁
2. QNode节点维护一个preNode引用,这个引用只有当获得锁时,会指向AVAILABLE,或者超时了会指向它的前一个节点,其他等待锁的时候都是Null,因为一旦一个节点超时了,需要让它的后续节点指向它的前驱节点,所以只有超时的时候会给preNode设置值(指向AVAILABLE节点除外)。
3. 使用一个AtomicReference原子变量tail来形成一个虚拟的单向链表结构。tail的getAndSet操作会返回之前的节点的引用,相当于获得了前驱节点。当获得锁后,前驱节点引用就释放了,前驱节点就可以被GC回收
4. 支持中断操作,Thread.isInterrupted()可以获得线程中断的信息,一旦获取中断信息,就抛出中断异常。需要注意的时,线程中断信息发出时,并不是要求线程马上中断,而是告知了线程要中断的信息,程序自己控制中断的地点。
5. 由于线程只有一个ThreadLocal的myNode变量指向自己的节点,所以获取锁时,使用了每次new一个新的Node,并设置给线程的方式,避免unlock时对node的操作影响后续节点的状态,也可以使线程多次获得锁。这里可以考虑像CLHLock那样,维护两个ThreadLocal的引用,释放锁时把myNode的引用指向已经不使用的前驱节点,这样避免无谓的new操作。
-
package com.zc.lock;
-
import java.util.concurrent.TimeUnit;
-
import java.util.concurrent.atomic.AtomicReference;
-
/**
-
* 时限队列锁,支持tryLock超时操作
-
* QNode维护一个指针preNode指向前一个节点。当preNode == AVAILABLE表示已经释放锁。当preNode == null表示等待锁
-
* tail维护一个虚拟链表,通过tail.getAndSet方法获得前一个节点,并在前一个节点自旋,当释放锁时前一个节点的preNode == AVAIABLE,自动通知后一个节点获取锁
-
* 当一个节点超时或者被中断,那么它的前驱节点不为空。后续节点看到它的前驱节点不为空,并且不是AVAILABLE时,知道这个节点退出了,就会跳过它
-
* 当节点获得锁,进入临界区后,它的前驱节点可以被回收
-
* **/
-
public class TimeoutLock implements TryLock{
-
// 声明为静态变量,防止被临时回收
-
private static final QNode AVAILABLE = new QNode();
-
// 原子变量指向队尾
-
private AtomicReference<QNode> tail;
-
ThreadLocal<QNode> myNode;
-
public TimeoutLock(){
-
tail = new AtomicReference<QNode>(null);
-
myNode = new ThreadLocal<QNode>(){
-
protected QNode initialValue(){
-
return new QNode();
-
}
-
};
-
}
-
@Override
-
public void lock() {
-
// 和CLHLock不同,每次新建一个Node,并设置给线程,目的是支持同一个线程可以多次获得锁,而不影响链中其他节点的状态
-
// CLHLock不需要每次新建Node是因为它使用了两个指针,一个指向前驱节点。而前驱节点释放后就可以回收了。
-
// CLHLock每次释放锁时设置myNode为失效的前驱节点,也是为了支持同一个线程可以多次获取锁而不影响其他节点
-
QNode node = new QNode();
-
myNode.set(node);
-
QNode pre = tail.getAndSet(node);
-
if(pre != null){
-
// 在前一个节点自旋,当前一个节点是AVAILABLE时,表示它获得锁
-
while(pre.preNode != AVAILABLE){
-
}
-
}
-
}
-
@Override
-
public void unlock() {
-
QNode node = myNode.get();
-
// CAS操作,如果为true,表示是唯一节点,直接释放就行;否则把preNode指向AVAILABLE
-
if(!tail.compareAndSet(node, null)){
-
node.preNode = AVAILABLE;
-
}
-
}
-
@Override
-
//TimeUnit只支持毫秒
-
public boolean trylock(long time, TimeUnit unit) throws InterruptedException {
-
if(Thread.interrupted()){
-
throw new InterruptedException();
-
}
-
boolean isInterrupted = false;
-
long startTime = System.currentTimeMillis();
-
long duration = TimeUnit.MILLISECONDS.convert(time, unit);
-
// 注意:每次tryLock都要new新的Node,为了同一个线程可以多次获得锁。如果每个线程都使用同一个节点,会影响链中其他的节点
-
QNode node = new QNode();
-
myNode.set(node);
-
// 尝试一次获取锁
-
QNode pre = tail.getAndSet(node);
-
// 第一个节点或者之前的节点都是已经释放了锁的节点, pre==AVAILABLE表示获得了锁
-
if(pre == null || pre == AVAILABLE){
-
return true;
-
}
-
// 在给定时间内对preNode自旋
-
while((System.currentTimeMillis() - startTime < duration) && !isInterrupted){
-
QNode predPreNode = pre.preNode;
-
// 表示前一个节点已经释放了锁,设置了preNode域,否则preNode域为空
-
if(predPreNode == AVAILABLE){
-
return true;
-
}
-
// 当prePreNode != null时,只有两种情况,就是它超时了,或者被中断了。
-
// 跳过prePreNode不为空的节点,继续自旋它的下一个节点
-
else if(predPreNode != null){
-
pre = predPreNode;
-
}
-
if(Thread.interrupted()){
-
isInterrupted = true;
-
}
-
}
-
// 超时或者interrupted,都要设置node的前驱节点不为空
-
node.preNode = pre;
-
if(isInterrupted){
-
throw new InterruptedException();
-
}
-
return false;
-
}
-
public static class QNode {
-
volatile QNode preNode;
-
}
-
public String toString(){
-
return "TimeoutLock";
-
}
-
}
TimeoutLock具备所有CLHLock的特性,比如无饥饿,先来先服务的公平性,在多个共享变量上自旋,从而控制合理的缓存一致性流量等等,并且支持了限时操作和中断操作。
使用限时锁时有固定的模板,防止锁被错误使用。
-
Lock lock = ...;
-
if (lock.tryLock()) {
-
try {
-
// manipulate protected state
-
} finally {
-
lock.unlock();
-
}
-
} else {
-
// perform alternative actions
-
}
这篇说说限时有界队列锁,它采用了有界队列,并且和ArrayLock不同,它不限制线程的个数。它的特点主要有
1. 采用有界队列,减小了空间复杂度,L把锁的空间复杂度在最坏的情况下(有界队列长度为1)是O(L)
2. 非公平,不保证先来先服务,这也是一个很常见的需求
3. 因为是有界队列,所以在高并发下存在高争用,需要结合回退锁来降低争用
它的实现思路是:
1. 采用了一个有界的等待队列,等待队列的每个节点都有多种状态,每个节点是可复用的
2. 采用了一个工作队列,Tail指针指向工作队列的队尾节点。获取和是否锁的操作是在工作队列中的节点之间进行
3. 由于是限时队列,并支持中断,所以队列中的节点都是可以退出队列的
4. 算法分为三步,第一步是线程从有界的等待队列中获得一个节点,并设置为WAITING,如果没有获得,就自旋
第二步是把这个节点加入工作队列,并获得前一个节点的指针
第三步是在前一个节点的状态上自旋,直到获得锁,并把前一个节点RELEASED状态改为FREE
节点有4种状态:
1. FREE: 表示节点可以被获得。当前一个节点释放锁,并设置状态为RELEASED的时候,后一个节点需要把前一个节点设置为FREE。当节点在没有进入工作队列时超时,也被设置为FREE.
2. RELEASED:节点释放锁时设置为RELEASED,需要后续节点把它设置为FREE。如果是工作队列的最后一个节点,那么RELEASED状态的节点在第一步时可被获得
3. WAITING:表示获得了锁或在工作队列中等待锁。是在第一步中被设置的,第一步的结果就是获得一个状态为WAITING的节点
4. ABORTED:工作队列中的节点超时或者中断的节点被设置为ABORTED。 队尾的ABORTED节点可以被第一步获得,队中的ABORTED节点不能被第一步获取,只能把它的preNode指针指向它的前一个节点,表示它自己不能被获取了
理解节点这4种状态的转变是理解这个设计的关键。这个设计比较复杂,从篇幅考虑,这篇只介绍Lock和UnLock操作,下一篇说tryLock限时操作
1. 创建枚举类型State来表示状态
2. 创建QNode表示节点,使用一个AtomicReference原子变量指向它的State,以便于支持CAS操作。节点维护一个PreNode引用,只有节点被Aborted的时候才设置这个引用的值,表示跳过这个节点
3. 一个有界的QNode队列,使用数组表示
4. MIN_BACKOFF和MAX_BACKOFF支持回退操作,单位是毫秒。这两个值依赖于硬件性能,需要通过不断测试来获取最优值
5. 一个Random随机数,来产生随即的数组下标,非公平性需要
6. 一个AtomicStampedReference类型的原子变量作为队尾指针tail。AtomicStampedReference采用了版本号来避免CAS操作的ABA问题。这很重要,因为有界等待队列的节点会多次进出工作队列,所以可能发生同一个节点被前一个线程准备CAS操作时,已经被后几个线程进出了工作队列,导致第一个线程拿到的QNode的状态不正确。
7. lock实现分为三步,上文已经说过了
8. unlock操作就是两步,第一修改状态通知其他线程获取锁。第二是设置自己的节点引用,以便下次可再次获得锁而不影响其他线程的状态。这里是把线程指向的节点状态设置为RELEASED,同时设置线程的节点引用为空,这样其他线程可以继续使用这个节点。
-
package com.zc.lock;
-
import java.util.Random;
-
import java.util.concurrent.TimeUnit;
-
import java.util.concurrent.atomic.AtomicReference;
-
import java.util.concurrent.atomic.AtomicStampedReference;
-
/**
-
* 限时有界队列锁,并且直接不限数量的线程
-
* 由于是有界的队列,所以争用激烈,可以复合回退锁的概念,减少高争用
-
* 分为三步:
-
* 第一步是取得一个State为FREE的节点,设置为WAITING
-
* 第二步是把这个节点加入队列,获取前一个节点
-
* 第三步是在前一个节点上自旋
-
*
-
* 优点是L个锁的空间复杂度是O(L),而限时无界队列锁的空间复杂度为O(Ln)
-
* **/
-
public class CompositeLock implements TryLock{
-
enum State {FREE, WAITING, RELEASED, ABORTED}
-
class QNode{
-
AtomicReference<State> state = new AtomicReference<CompositeLock.State>(State.FREE);
-
volatile QNode preNode;
-
}
-
private final int SIZE = 10;
-
private final int MIN_BACKOFF = 1;
-
private final int MAX_BACKOFF = 10;
-
private Random random = new Random();
-
// 有界的QNode数组,表示队列总共可以使用的节点数
-
private QNode[] waitings = new QNode[10];
-
// 指向队尾节点,使用AtomicStampedReference带版本号的原子引用变量,可以防止ABA问题,因为这个算法实现需要对同一个Node多次进出队列
-
private AtomicStampedReference<QNode> tail = new AtomicStampedReference<CompositeLock.QNode>(null, 0);
-
// 每个线程维护一个QNode引用
-
private ThreadLocal<QNode> myNode = new ThreadLocal<CompositeLock.QNode>(){
-
public QNode initialValue(){
-
return null;
-
}
-
};
-
public CompositeLock(){
-
for(int i = 0; i < SIZE; i ++){
-
waitings[i] = new QNode();
-
}
-
}
-
@Override
-
public void lock() {
-
Backoff backoff = new Backoff(MIN_BACKOFF, MAX_BACKOFF);
-
QNode node = waitings[random.nextInt(SIZE)];
-
// 第一步: 先获得数组里的一个Node,并把它的状态设置为WAITING,否则就自旋
-
GETNODE:
-
while(true){
-
while(node.state.get() != State.FREE){
-
// 因为释放锁时只是设置了State为RELEASED,由后继的线程来设置RELEASED为FREE
-
// 如果该节点已经是队尾节点了并且是RELEASED,那么可以直接可以被使用
-
// 获取当前原子引用变量的版本号
-
int[] currentStamp = new int[1];
-
QNode tailNode = tail.get(currentStamp);
-
if(tailNode == node && tailNode.state.get() == State.RELEASED){
-
if(tail.compareAndSet(tailNode, null, currentStamp[0], currentStamp[0] + 1)){
-
node.state.set(State.WAITING);
-
break GETNODE;
-
}
-
}
-
}
-
if(node.state.compareAndSet(State.FREE, State.WAITING)){
-
break;
-
}
-
try {
-
backoff.backoff();
-
} catch (InterruptedException e) {
-
throw new RuntimeException("Thread interrupted, stop to get the lock");
-
}
-
}
-
// 第二步加入队列
-
int[] currentStamp = new int[1];
-
QNode preTailNode = null;
-
do{
-
preTailNode = tail.get(currentStamp);
-
}
-
// 如果没加入队列,就一直自旋
-
while(!tail.compareAndSet(preTailNode, node, currentStamp[0], currentStamp[0] + 1));
-
// 第三步在前一个节点自旋,如果前一个节点为null,证明是第一个加入队列的节点
-
if(preTailNode != null){
-
// 在前一个节点的状态自旋
-
while(preTailNode.state.get() != State.RELEASED){}
-
// 设置前一个节点的状态为FREE,可以被其他线程使用
-
preTailNode.state.set(State.FREE);
-
}
-
// 将线程的myNode指向获得锁的node
-
myNode.set(node);
-
return;
-
}
-
@Override
-
public void unlock() {
-
QNode node = myNode.get();
-
node.state.set(State.RELEASED);
-
myNode.set(null);
-
}
-
@Override
-
public boolean trylock(long time, TimeUnit unit)
-
throws InterruptedException {
-
// TODO Auto-generated method stub
-
return false;
-
}
-
}
采用我们之前的验证锁正确性的测试用例来测试lock, unlock操作。
-
package com.zc.lock;
-
public class Main {
-
//private static Lock lock = new TimeCost(new ArrayLock(150));
-
private static Lock lock = new CompositeLock();
-
//private static TimeCost timeCost = new TimeCost(new TTASLock());
-
private static volatile int value = 0;
-
public static void method(){
-
lock.lock();
-
System.out.println("Value: " + ++value);
-
lock.unlock();
-
}
-
public static void main(String[] args) {
-
for(int i = 0; i < 50; i ++){
-
Thread t = new Thread(new Runnable(){
-
@Override
-
public void run() {
-
method();
-
}
-
});
-
t.start();
-
}
-
}
-
}
结果是顺序打印的,证明锁是正确的,每次只有一个线程获得了锁
-
Value: 1
-
Value: 2
-
Value: 3
-
Value: 4
-
Value: 5
-
Value: 6
-
Value: 7
-
Value: 8
-
Value: 9
-
Value: 10
-
Value: 11
-
Value: 12
-
Value: 13
-
Value: 14
-
Value: 15
-
Value: 16
-
Value: 17
-
Value: 18
-
Value: 19
-
Value: 20
-
Value: 21
-
Value: 22
-
Value: 23
-
Value: 24
-
Value: 25
-
Value: 26
-
Value: 27
-
Value: 28
-
Value: 29
-
Value: 30
-
Value: 31
-
Value: 32
-
Value: 33
-
Value: 34
-
Value: 35
-
Value: 36
-
Value: 37
-
Value: 38
-
Value: 39
-
Value: 40
-
Value: 41
-
Value: 42
-
Value: 43
-
Value: 44
-
Value: 45
-
Value: 46
-
Value: 47
-
Value: 48
-
Value: 49