Concurrency(二十二: 非阻塞算法上)

非阻塞算法在并发上下文下是指一个算法允许线程访问共享状态(亦或是协作和沟通)时不会阻塞到其他相关线程.更通俗的讲,一个非阻塞算法是指在该算法中一个线程的停顿并不会引起其他相关线程的停顿.

为了更好的理解阻塞和非阻塞并发算法之间的区别,我们会先讲解阻塞算法再讲解非阻塞算法.

阻塞并发算法

一个阻塞的并发算法需要包含以下两个行为:

  • 执行来自线程的请求操作
  • 阻塞线程直到线程的请求操作能够被安全的执行

许多算法和并发数据结构都是阻塞的.例如,所有java.util.concurrent.BlockingQueue接口的实现类都是阻塞的数据结构.如果一个线程尝试插入元素到一个阻塞队列中并且发现队列已经没有剩余空间了,那么插入线程会被阻塞直到阻塞队列中有剩余空间可以插入元素为止.

以下示例图描述了一个阻塞算法保证共享数据结构安全访问的行为:

非阻塞并发算法

一个非阻塞并发算法需要包含以下两个行为:

  • 执行来自线程的请求操作
  • 通知请求线程它的请求操作不能被执行

Java中同时包含了一些非阻塞数据结构.像AtomicBoolean, AtomicInteger, AtomicLong 和 AtomicReference都是非阻塞数据结构活生生的例子.

以下示例图描述了一个非阻塞算法保证共享数据结构安全访问的行为:

非阻塞 vs 阻塞算法

非阻塞和阻塞算法之间的不同主要体现在上文提及算法需要包含两个行为中的第二个.换句话说,它们两的不同之处主要体现在当请求操作不能被执行时做出的响应.

阻塞算法会阻塞请求线程直到请求操作能够被执行为止.而非阻塞算法则是通知请求线程它的请求操作不能被执行.

在阻塞算法中,一个线程可能会被阻塞到它的请求操作能够被安全执行为止.通常其他线程请求操作的阻塞成就了第一个线程请求操作的安全执行.出于某些原因,如果应用中某些地方的其他线程发生停顿或阻塞,可能导致第一个线程的请求操作无法顺利的执行,那么第一个线程会陷入阻塞甚者是永久阻塞,直到有其他线程执行了必要的操作唤醒它为止.

例如,一个线程在尝试插入元素到一个已经满了的阻塞队列中时,会被阻塞到其他线程取走队列中的元素为止.如果出于某些原因,在应用中的某些地方负责取走队列元素的线程发生了停顿或阻塞,那么尝试插入元素到阻塞队列中的线程将会发生阻塞甚至是永久阻塞,直到最终有线程取走阻塞队列中的一个元素为止.

非阻塞并发数据结构

在多线程系统中,线程通常需要通过一些不同类型的数据结构来进行通讯.这些数据结构可以是简单的变量,也可以是像队列,map,栈等这样复杂的数据结构.为了确保正确性,多个线程并发访问数据结构时,需要通过一些并发算法来保障.由于这些并发算法的保障才让数据结构成为了并发数据结构.

如果一个算法是通过阻塞的方式来保障并发数据结构的,我们称为阻塞算法.那么这种数据结构我们称为阻塞的并发数据结构.

如果一个算法是通过非阻塞的方式来保障并发数据结构的,我们称为非阻塞算法.那么这种数据结构我们称为非阻塞的并发数据结构.

每种并发数据结构都是为特定的通讯场景设计的.至于需要使用哪种并发数据结构取决于你的通讯场景.我们在接下来的章节中会讲解几种非阻塞的并发数据结构.并且说明哪些情况下会用到这些数据结构.这些非阻塞并发数据结构工作原理的讲解能够给你一些思路怎么设计和实现一个非阻塞数据结构.

Volatile 变量

Java中的volatile变量能够让变量始终是从主存中加载的.只要volatile变量被赋予新值就会立即被写回到主存中去.这可以保证volatile变量最新的修改始终可以对运行在其他CPU上的线程可见.其他线程每次都会从主存中加载volatile变量而不是从它们运行CPU上的CPU缓存中.

volatile变量是非阻塞的.对volatile变量值的写入是一个原子操作.它不会被打断.然而,对一个volatile变量的读取更新写入一系列操作并不是原子的.也就是说,下面这段代码在多线程环境下仍然会出现竞态条件.

volatile myVar = 0;
...
int temp = myVar;
temp++;
myVar = temp;
复制代码

首先我们从主存中加载myVar变量然后赋予temp变量.然后对temp变量累加1.然后将变量temp重新赋予myVar,这意味着myVar变量会被立即写回到主存中去.

如果两个线程同时执行这段代码,同时加载变量myVar增加1并将变量值写回到主存中.那么存在一定的风险,本来对myVar变量的加法操作,现在只剩下一个了.(例如两个线程都会读取到变量值19,累加为20,再把20写回).

或许你觉得你不会写出像上面这样的代码,但在实操中上面的代码等同于:

myVar++;
复制代码

当你执行这段代码时,myVar变量值会被加载到CPU寄存器或CPU缓存中,进行一次加法操作,然后会将CPU寄存器或缓存中的值写回主存.

单个写线程的场景

某些场景下,你只有一个线程写入共享变量而有多个线程来读取变量.当只有一个线程更新变量时,无论有多少个线程同时读取变量都不会有竞态条件出现.所以只要只有一个写线程的情况下,你都可以使用volatile变量.

竟态条件只会在多个线程同时对一个共享变量做读取更新和写入一系列操作时才会发生.当你只有一个线程执行读取更新写入系列操作而有多个线程执行读取操作时,竟态条件不会发生.

这是一个只有一个写线程场景下的Counter实例,即使没有使用同步装置也不会有并发问题:

public class SingleWriterCounter {
    private volatile long count = 0;

    /**
     * 只能让一个相同的线程来调用该方法,
     * 否则将会有竟态条件出现
     */
    public void inc() {
        this.count++;
    }

    /**
     * 这个方法可以被多个读取线程调用
     * @return
     */
    public long count() {
        return this.count;
    }
}
复制代码

当只有一个线程调用inc()的情况下,多个线程可以安全的访问相同的Counter实例.当然相同的线程可以多次调用inc()方法,而不是只调用一次.多个线程可以同时调用count()方法而不会产生竟态条件.

下图描述了多个线程是如何访问volatile修饰的count变量的:

基于Volatile变量构建更加高级的数据结构

我们可以联合使用多个volatile变量来构建数据结构,每一个volatile变量都可以被一个线程写入和多个线程读取.每一个volatile变量可以被不同的线程写入(但只能是相同的线程).利用这种数据结构中的volatile变量可以让多个线程互相发送信息而不会发生阻塞.

这是一个可以让两个写线程操作的counter对象示例:

public class DoubleWriterCounter {
    private volatile long countA = 0;
    private volatile long countB = 0;

    /**
     * 只能让一个相同的写线程来调用该方法,
     * 否则会发生竟态条件
     */
    public void incA() { this.countA++;  }

    /**
     * 只能让一个相同的写线程来调用该方法,
     * 否则会发生竟态条件
     */
    public void incB() { this.countB++;  }

    /**
     * 多个读线程可以调用该方法
     */
    public long countA() { return this.countA; }

    /**
     * 多个读线程可以调用该方法 
     */
    public long countB() { return this.countB; }
}
复制代码

如你所见,DoubleWriterCounter有两个volatile变量和两对累加和读取方法.只能有一个相同的线程调用incA()和一个相同的线程调用incB().但可以由不同的线程分别调用incA()和inB()方法.多个线程可以同时调用countA()和countB()方法,而不会出现竟态条件.

DoubleWriterCounter可以用作两个线程之间互相通讯.两个count计数器可以用于执行生产任务和消费任务.下图描述了两个线程通过上述数据结构进行通讯的场景:

聪明的读者可以发现可以使用两个SingleWriterCounter实例来达到DoubleWriterCounter一样的效果.你甚至可以增加更多的SingleWriterCounter实例来实现更多线程之间互相通讯.

CAS乐观锁

如果你确实需要满足多个线程同时写入共享变量,那么仅仅是使用volatile已经不够用了.你需要特定类型的互斥访问.下面利用了Java中的synchronized同步块来使用互斥访问.

public class SynchronizedCounter {
    long count = 0;

    public void inc() {
        synchronized(this) {
            count++;
        }
    }

    public long count() {
        synchronized(this) {
            return this.count;
        }
    }
}
复制代码

我们可以注意到inc()和count()方法都被包裹在synchronized同步块中了.这就是我们需要解决的问题,即不调用synchronized同步块和wait()/notify()方法等也能使上文提及代码变成线程安全.

我们可以使用Java中的原子变量AtomicLong来替换两个synchronized同步块.下面给出的是AtomicLong版本的Counter对象:

public class AtomicCounter {
    private AtomicLong count = new AtomicLong(0);

    public void inc() {
        boolean updated = false;
        while(!updated){
            long prevCount = this.count.get();
            updated = this.count.compareAndSet(prevCount, prevCount + 1);
        }
    }

    public long count() {
        return this.count.get();
    }
}
复制代码

这个版本与之前的synchronized同步块版本一样也是线程安全的.这个版本有趣的地方是对inc()方法的更改.inc()方法中的代码不再包含在synchronized同步块中.而是更改为:

boolean updated = false;
while(!updated){
    long prevCount = this.count.get();
    updated = this.count.compareAndSet(prevCount, prevCount + 1);
}
复制代码

这些代码并不全是原子操作.这意味仍然有可能被两个不同的线程调用,它们会同时执行long = prevCount = this.count.get();语句,同时会取得更改前Counter中的count变量值.即使这样这些代码仍然不会出现竟态条件.有趣吧!(笔者此刻的感受:当你对代码的底层知根知底时,即使是while(!updated)这样看似枯燥无味的代码也会变得十分有趣.)

秘密就在于while循环中的第二行代码.compareAndSet()调用是原子的.这段调用会比较AtomicLong中的值是不是预期值,如果符合预期则设置AtomicLong为新的值.这里的compareAndSet()方法直接使用CPU指令级的CAS.因此这里不需要任何同步限制也不需要阻塞线程.省去了阻塞线程所需要的性能开销.

想象一下AtomicLong此时内部值为20.现在同时有两个线程读取该值,并尝试调用compareAndSet(20, 20 + 1).由于compareAndSet()是原子操作的,同一时间只能有一个线程执行这个方法.

第一个执行的线程会先比较AtomicLong的内部值是否为20(执行更改前的值),当符合预期时,线程会将AtomicLong的内部值更改为21(20 + 1).如果更改变量成功,会将updated置换为true并停止while循环.

现在第二个线程可以调用compareAndSet(20, 20 + 1)了.当然现在AtomicLong的内部值已经不再是20了,此次调用将会失败.AtomicLong的值不会被设置为21.updated变量此时会被置换为false,线程会在while循环上自旋一次,重新进入循环内部.这一次,如果没有其他线程在调用compareAndSet()的话,它会读取到AtomicLong内部值为21,并重新调用compareAndSet(21, 21 + 1)将AtomicLong更新为22.

为什么称它为乐观锁?

前文中提到的代码实现我们称之为乐观锁.乐观锁跟传统的锁不太一样,我们通常称传统的锁为悲观锁.传统的方式是通过synchronized同步块和和各种类型的锁来锁住共享内存的访问.一个synchronized同步块或是锁会导致线程发生阻塞.

乐观锁允许所有线程创建共享内存的副本而不会发生阻塞.线程会对它们自己所持有的副本进行更改,并尝试将更改写回到共享内存.如果没有其他线程正在更改共享内存,那么cas允许线程将它的更改写回到共享内存中.如果已经有线程在更改共享内存,那么会读取一个新的拷贝,在新的拷贝上进行修改并重新尝试将修改写回到共享内存中.

我们称之为乐观锁的原因是线程会获取一份数据拷贝,并基于这份拷贝进行修改,基于乐观的假定,此时没有任何线程在修改共享内存.如果假定成真,那么线程只需要继续更改共享内存即可而不需要锁定任何东西.如果假定不成真,那么此次修改会被作废,但也不会锁定任何东西.

乐观锁,在对共享内存竞争率较低的情况下性能表现较好.如果对共享内存的竞争率比较高的话,线程会浪费大部分CPU运行时钟来做无效的数据拷贝修改和失败的共享内存写入.但是如果你的共享资源比较庞大的话,你需要考虑将你的代码重新设计为对共享内存竟争率较低的情况.

乐观锁是非阻塞的

上文示例的乐观锁是非阻塞的.如果一个线程出于未知的原因对共享内存数据进行拷贝和修改的过程中发生了阻塞将不会影响其他线程继续访问共享内存.

一个传统锁lock/unlock的情况.当一个线程取得锁实例时会阻塞其他线程直到它释放锁为止.如果一个线程在取得锁后执行临界区代码的过程中发生阻塞,那么会持有锁一段时间甚至是永远也不会释放.这样其他等待持有该锁的线程也会永远等待下去.

不可替换数据结构

一个简单的cas乐观锁能够在一次cas操作后将共享数据结构整个替换为新的.将整个数据结构替换为一个已经修改过的拷贝并不总是可行的.

想象一下如果共享数据结构是一个队列.每个线程都会拷贝一份它自己的副本,并尝试在副本上插入和取出元素以达到更改副本的效果.这里可以通过AtomicReference来达到目的.拷贝引用对象即拷贝和修改队列,并尝试将AtomicReference的引用指向新创建的队列.

然而,一个较大的数据结构需要花费更多的CPU运行时间和内存来进行拷贝.这会让你的应用消耗大量的内存和运行时间来进行拷贝.这可能会影响应用的执行,特别是对数据结构竞争比较激烈的情况.更多的,如果线程花费拷贝和修改数据结构的时间越多,那么就越有可能其他线程会在此期间已经对共享内存中的数据结构进行修改.如果线程拷贝的共享数据结构已经被修改过了,所有的线程都需要重新进行它们的拷贝和修改操作.这会对程序的执行性能和内存消耗造成更大的负面影响.

下一节中,将会介绍一个实现能够被并行修改的非阻塞数据结构的方法.

该系列博文为笔者复习基础所著译文或理解后的产物,复习原文来自Jakob Jenkov所著Java Concurrency and Multithreading Tutorial

猜你喜欢

转载自juejin.im/post/5cb2db5ce51d456e7c0cdadc