无锁缓存框架Disruptor如何解决伪共享问题?

无锁的生产者–消费者

在上一篇使用BlockingQueue队列实现的生产者–消费者模式中,从BlockingQueue队列的源码可以看到两点值得注意,第一是为了保证线程安全,BlockingQueue使用了重入锁。第二,为了实现缓冲区满时,生产者等待,缓冲区空时,消费者等待,以及何时唤醒生产者/消费者的问题,使用了Condition来完成线程阻塞与唤醒。既然选择了使用锁来完成线程同步,那么在多个生产者和消费者情况下就有可能出现各种锁带来的问题,如线程等待锁而被阻塞,死锁等情况,都降低了程序整体的效率。

为了解决锁带来的性能损失,相信你会想到用无锁方式来替换BlockingQueue这个缓冲区队列,Disruptor框架就是一个使用CAS操作的内存队列,与普通的队列不同,Disruptor框架使用的是一个基于数组实现的环形队列,无论是生产者向缓冲区里提交任务,还是消费者从缓冲区里获取任务执行,都使用CAS操作。

为什么使用环形队列?

第一,简化了多线程同步的复杂度。学数据结构的时候,实现队列都要两个指针head和tail来分别指向队列的头和尾,对于一般的队列是这样,想象下,如果有多个生产者同时往缓冲区队列中提交任务,某一生产者提交新任务后,tail指针都要做修改的,那么多个生产者提交任务,头指针不会做修改,但会对tail指针产生冲突,例如某一生产者P1要做写入操作,在获得tail指针指向的对象值V后,执行compareAndSet()方法前,tail指针被另一生产者P2修改了,这时生产者P1执行compareAndSet()方法,发现tail指针指向的值V和期望值E不同,导致冲突。同样,如果多个消费者不断从缓冲区中获取任务,不会修改尾指针,但会造成队列头指针head的冲突问题(因为队列的FIFO特点,出列会从头指针出开始)。

环形队列的一个特点就是只有一个指针,只通过一个指针来实现出列和入列操作。如果使用两个指针head和tail来管理这个队列,有可能会出现“伪共享”问题(伪共享问题在下面我会详细说),因为创建队列时,head和tail指针变量常常在同一个缓存行中,多线程修改同一缓存行中的变量就容易出现伪共享问题。

第二,由于使用的是环形队列,那么队列创建时大小就被固定了,Disruptor框架中的环形队列本来也就是基于数组实现的,使用数组的话,减少了系统对内存空间管理的压力,因为它不像链表,Java会定期回收链表中一些不再引用的对象,而数组不会出现空间的新分配和回收问题。

数据在高速缓存中的存储方式

我们知道,CPU去访问外存数据是很慢的,所以为了提高CPU的处理速度,在CPU内都会有一个内存(高速缓存Cache)用来存放数据,由于CPU访问内存的速度非常快,对于一些例如循环自增自减的操作,把数据从外存读入到内存中来,每次循环都对内存中的数据做操作,而不用跑去访问速度较慢的外存,这样可以提高处理的速度。

在内存中数据读写的单位是“缓存行”,一行的大小为32到256个字节。假设一个缓存行32个字节,那么CPU一次就可以读到8个int型变量的数据,也就是说假设你只想访问某一个数组的变量如arr[ 4 ],CPU也会把相邻内存的数组变量例如arr[ 5 ]到arr[ 11 ]加入进高速缓存中来,这样做有一个好处就是,你可以非常快地访问arr[ 5 ]到arr[ 11 ]的变量数据。这样看起来非常好,我们访问自己需要的数据,同时也可以快速访问相邻的可能用得上的变量,不过这种方式在多线程中是会存在问题,例如上面的生产者–消费者问题。

在生产者–消费者模式中,假设我们使用的是普通队列做为内存缓冲区,队列中有head和tail两个指针,它们都在相邻地址内存中,也就是说,它们处在同一个缓存行(假设CPU的缓存行大小为64个字节)。假设出现下列情况:

  1. 生产者线程P1往缓冲区中做提交任务操作,先获得指针tail,此时在CPU中,内核1的缓存行内不仅读到了tail指针,同时也读到了head指针,虽然生产者不对head指针做修改。
  2. 消费者线程C1往缓冲区中做获取任务操作,内核2先获得指针head,同时也就获得了指针tail。
  3. 生产者完成提交任务操作,内核1中的缓存行内tail指针变量被修改,并且写入外存中,这会使得其他内核的带有变量tail的存储行失效,也就是消费者的存储行失效了。
  4. 虽然生产者只修改了自己存储行中的tail指针,但因为该存储行中带有head指针变量,使得消费者存储行失效,消费者不得不重新去外存读取head指针回内存。

反过来也是,假设消费者线程C1先修改了head指针,那么生产者线程中的缓存行同样就失效了(因为生产者线程的缓存行中也有head指针,即便生产者不是对它做修改),生产者必须重新去外存中读取tail指针,你可能会想,为什么不能只把生产者中的head指针标记为无效?而是生辰这线程中整个缓存行都无效?上面说了,缓存行是CPU读取数据的单位。可以看到,在多线程下,如果一个线程修改数据并写回外存中后,其余由“相关变量”的线程都不得不重新去外存读取需要的变量进来,而CPU访问外存速度是很慢的,这就无形中降低了整个程序的运行效率。

Disruptor如何解决伪共享问题?

伪共享问题,指在多线程(多核)环境下,如果一个变量共享了多个缓存行,或者说一个变量存在于多个CPU内核的缓存行中,那么在一个线程修改了这一变量后,其余共享这个变量的线程的缓存行都会失效。拿上面的例子来说,如果生产者访问tail指针,那么它同时也会得到head指针,它会把head指针也加载到自己的缓存行中,消费者也是,消费者要访问head指针,也常常会把tail指针加载到自己的缓存行中,假设生产者修改了tail指针,那么消费者的缓存行因为还有tail指针(即便消费者不会对它做修改),所以缓存行失效,消费者需要重新从外存中读取head指针进自己的高速缓存(Cache)中,反过来一样,这就是伪共享。

Disruptor框架是如何解决伪共享问题的?在Disruptor中有一个重要的类Sequence,该类包装了一个volatile修饰的long类型数据value,无论是Disruptor中的基于数组实现的缓冲区RingBuffer,还是生产者,消费者,都有各自独立的Sequence,RingBuffer缓冲区中,Sequence标示着写入进度,例如每次生产者要写入数据进缓冲区时,都要调用RingBuffer.next()来获得下一个可使用的相对位置。对于生产者和消费者来说,Sequence标示着它们的事件序号,来看看Sequence类的源码:

class LhsPadding {
	protected long p1, p2, p3, p4, p5, p6, p7;
}

class Value extends LhsPadding {
	protected volatile long value;
}

class RhsPadding extends Value {
	protected long p9, p10, p11, p12, p13, p14, p15;
}

public class Sequence extends RhsPadding {
	static final long INITIAL_VALUE = -1L;
	private static final Unsafe UNSAFE;
	private static final long VALUE_OFFSET;
	static {
		UNSAFE = Util.getUnsafe();
		try {
			VALUE_OFFSET = UNSAFE.objectFieldOffset(Value.class.getDeclaredField("value"));
		} catch(final Exception e) {
			 throw new RuntimeException(e);
		}
	}
	
	public Sequence() {
		this(INITIAL_VALUE);
	}
	
	public Sequence(final long initialValue) {
		UNSAFE.putOrderedLong(this, VALUE_OFFSET, initialValue);
	}
}

从第1到11行可以看到,真正使用到的变量value,它的前后空间都由8个long型的变量填补了,对于一个大小为64字节的缓存行,它刚好被填补满(一个long型变量value,8个字节加上前/后个7long型变量填补,7*8=56,56+8=64字节)。这样做每次把变量value读进高速缓存中时,都能把缓存行填充满(对于大小为64个字节的缓存行来说,如果缓存行大小大于64个字节,那么还是会出现伪共享问题),保证每次处理数据时都不会与其他变量发生冲突。

      字节填充真的可以解决多线程(多核)伪共享问题吗?不妨写个最简单的例子试验以下:

package com.justinzeng.falsesharing;

public class FalseSharingDemo implements Runnable {
	//带包装的,有字节/无字节填充的Long数据类型数组
	private static LongDataPadding[] arr = new LongDataPadding[2];
	int index; 
	
	static {
		for(int i=0; i<2; i++) {
			arr[i] = new LongDataPadding();
		}
	}
	
	//构造函数初始化
	public FalseSharingDemo(int index) {
		this.index = index;
	}
	
	//有字节填充long数据类型
	public final static class LongDataPadding {
		public volatile long vaule = 0L;
		public long P1, P2, P3, P4, P5, P6, P7; //填充缓冲行
	}
	//无字节填充long数据类型
	public final static class LongDataNoPadding {
		public volatile long vaule = 0L;
	}
	
	@Override
	public void run() {
		for(long i=0; i<40000000L; i++) {
			arr[index].vaule = i;
		}
	}
	public static void main(String[] args) throws InterruptedException {
		Thread[] threads = new Thread[2];
		
		final long startTime = System.currentTimeMillis(); //记录开始时间
		for(int i=0; i<2; i++) {
			threads[i] = new Thread(new FalseSharingDemo(i));
		}
		for(Thread t : threads) {
			t.start();
		}
		for(Thread t : threads) {
			t.join();
		}
		System.out.println("consumeTime: "+(System.currentTimeMillis()-startTime)+"ms");
	}

}

首先在第20-27行包装这个long类型的数据value,一种是填充7个long类型变量的有字节填充方式LongDataPadding类,一种是无字节填充方式LongDataNoPadding类。我们要做的事就是定义两个线程,分别对一个大小为2的经过包装的long数据类型的数组做数据修改操作,如run()方法内,两个线程分别对同一数组内的不同位置做修改,按理说,虽然它们修改的数组位置不同(index不同),但由于它们修改的是同一个数组,有可能数组中两个位置的变量都被同时加载进缓存行中,从而出现多线程冲突情况,首先我们看看不使用字节填充的程序完成时间要多长:

不使用字节填充的话,程序消耗1028毫秒。如果使用有字节填充的方式,来看看完成时间:

只需要363毫秒,效率提高还是很大的。

      在代码中我们可能很容易就能发现多线程访问同一个变量,但我们很难发现某些多线程修改不同变量时,无意间影响了其他缓存行,而且这种情况会影响程序的整体性能。不过,在JDK8里给出了官方的解决伪共享的方法,就是使用注解@Contented,来告诉JVM这段数据应该放在不同的缓存行中。

完整实现代码已上传GitHub:

https://github.com/justinzengtm/Java-Multithreading/blob/master/ParallelMode/StimulateFalseSharing.java

发布了97 篇原创文章 · 获赞 71 · 访问量 7万+

猜你喜欢

转载自blog.csdn.net/justinzengTM/article/details/90606740