1.特性分析
- 功能
一个用于线程间协作的工具类。用于线程间的数据交换。
- 实现机制
- 它提供了一个同步点,
在这个同步点,两个线程可以交换彼此的数据
。 - 两个线程通过exchange方法交换数据,如果第一个线程先执行exchange()方法,它会一直等待第二个线程也执行exchange()方法,直到两个线程都到达同步点时,这两个线程就可以交换数据。
- 它提供了一个同步点,
- 应用场景
- 遗传算法
- 校对工作
- 管道设计
- 内存一致性影响
对于每一对通过Exchanger成功交换对象的线程来说,每一个线程中在exchange()方法前的行为happen-before
另一个线程中在exchange()方法返回后的行为。 - 核心算法
- 一个交换槽位
- 一个带有item的partner线程
- 使用同一个Exchanger的线程不止一个时,如何消除竞争?
- 通过安排一些线程使用
不同的槽位
来分散竞争压力 - 这样做最终依旧能保证两个匹配的线程可以交换item。
- 只有在检测到竞争时,我们才会分配多个槽位(比如单CPU时永远只有一个槽位即可)
- 在竞争中,不仅仅槽位应该在不同的位置,而且没有slot在相同的缓存行上,因此不会出现内存竞争。
- 开始时,只有一个槽位,我们通过跟踪冲突(exchange时失败的CAS次数)来扩展arena的大小。
- 通过安排一些线程使用
- arena等待
- 通过放弃等待的一段时间,减少arena的有效规模(如果此时槽位个数>1)
- “一段时间”的值应该定为多少,这是一个经验值。 我们利用spin->yield->block来实现一段合理的等待时间–在一个繁忙的exchanger中,资源获取后很快就会释放,在这种情况下,多处理器的上下文切换会非常慢,而且也造成了资源浪费.
- arena等待只是省略阻塞部分,而不是取消。根据经验,
自旋数被设定为:在一系列测试机器的最大持续交换率下,避免了99%的阻塞时间.
- 此类的核心方法2个
- slotExchange
- arenaExchange
- 这些方法的
宏观架构是类似的,但在组成的细节上有很多不同
。 - slotExchange方法使用了单一的Exchanger类型字段slot,而arena使用了一个数组.
- 通过线程的release操作来读取字段Node.item,并未将其声明为volatile类型de原因?
- 因为读取操作只会在CAS操作完成之后才发生
- 其它持有此字段的线程对其的使用都已经由其它操作确定了顺序
2.源码分析
package sourcecode.analysis;
/**
* @Author: cxh
* @CreateTime: 18/5/3 08:15
* @ProjectName: JavaBaseTest
*/
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
/**
* 此类就是一个同步点,在此点处线程能够进行配对,并按配对交换元素.
* 每一个线程都代表了一些对象,这些对象要进入exchange方法,和partner线程匹配,获取partener的item并返回获取的新item.
* 一个Exchanger可以被看作是一个双向的同步队列形式.
* Exchangers在一些应用中很有用,比如遗传算法和管道设计中都很有用.
*
* 使用举例:
* 下面这个类的亮点是,它使用Exchanger在线程之间交换数据,以便填充缓冲区的线程在它需要缓冲区的时候能获得一个新的空缓冲区,
* 同时将填充好的缓冲区交付给清空缓存区的线程.
*
* class FillAndEmpty {
* Exchanger<DataBuffer> exchanger = new Exchanger<>();
* DataBuffer initialEmptyBuffer = ... a made-up type
* DataBuffer initialFullBuffer = ...
*
* class FillingLoop implements Runnable {
* public void run() {
* DataBuffer currentBuffer = initialEmptyBuffer;
* try {
* while (currentBuffer != null) {
* addToBuffer(currentBuffer);
* if (currentBuffer.isFull())
* currentBuffer = exchanger.exchange(currentBuffer);
* }
* } catch (InterruptedException ex) { ... handle ... }
* }
* }
*
* class EmptyingLoop implements Runnable {
* public void run() {
* DataBuffer currentBuffer = initialFullBuffer;
* try {
* while (currentBuffer != null) {
* takeFromBuffer(currentBuffer);
* if (currentBuffer.isEmpty())
* currentBuffer = exchanger.exchange(currentBuffer);
* }
* } catch (InterruptedException ex) { ... handle ...}
* }
* }
*
* void start() {
* new Thread(new FillingLoop()).start();
* new Thread(new EmptyingLoop()).start();
* }
* }}
*
* 内存一致性影响:
* 对于每一对通过Exchanger成功交换对象的线程来说,每一个线程中在exchange()方法前的行为
* happen-before
* 另一个线程中在exchange()方法返回后的行为.
*
* @since 1.5
* @author Doug Lea and Bill Scherer and Michael Scott
* @param <V> The type of objects that may be exchanged
*/
public class Exchanger<V> {
/*
* 概述:核心算法是,一个交换槽位和另一个带有item的partner线程.
* for (;;) {
* if (slot is empty) { //槽位为空,则放入item到节点 // offer
* place item in a Node;
* if (can CAS slot from empty to node) { //CAS操作将节点放入空槽位
* wait for release;//等待释放槽位
* return matching item in node;//返回匹配节点的item
* }
* }
* else if (can CAS slot from node to empty) { //槽位不为空,CAS操作将槽位节点移除 release
* get the item in node; //获取节点的item
* set matching item in node; //设定节点中匹配的内容
* release waiting thread; //释放等待线程
* }
* // CAS失败则继续循环操作
* }
*
* 这是“双重数据结构”的最简单形式之一
*
* 上述工作机制原则上可以工作的很好,但实际上,像许多在单个位置上进行原子更新的算法一样,当使用同一个Exchanger的线程不止一个时,
* 则存在严重的伸缩性问题.因此我们的实现采用了一种消除竞争的形式,它通过安排一些线程使用不同的槽位来分散竞争压力,这样做最终依旧能
* 保证两个匹配的线程可以交换item.
*
* 一个有效的竞争实现需要分配大量的空间(因为需要分配很多slot),因此只有在检测到竞争时,我们才会这么做(因为单cpu时,分配很多slot没有什么用,
* 所以也不会这么做).否则,exchanges就会使用单槽位的槽位交换方法.在竞争中,不仅仅槽位应该在不同的位置,而且没有slot在相同的缓存行上(更一般的讲,
* 就是相同的相干单元),因此不会出现内存竞争.因为在撰写本文时,无法确定缓存行大小,因此我们定义了一个对于普通平台来说都足够的值.
* 另外,在别处进行额外的保护以避免其他错误/非预期的共享,并增强局部性,包括对Node使用边距(通过sun.misc.Contended);嵌入“bound”作
* 为Exchange的字段;以及使用区别于LockSupport重排一些Park/unPark的机制。
*
* 开始时,只有一个槽位.我们通过跟踪冲突(exchange时失败的CAS)来扩展arena的大小;
* 根据上述算法的性质,仅有的几种类型的冲突已经明确暗示了:竞争是两个线程尝试释放Node的冲突--一个线程的offer发生CAS操作失败是合法的,
* 但是这不意味着2个及以上的线程同时发生CAS失败也是合理的(注意:在CAS操作失败后,通过读取槽位的值来检查冲突是可能的,但是这样做是不值得提倡
* 的).在当前arena限制内,如果一个线程在每一个槽位都发生了冲突,此时会扩展arena大小.通过使用bound字段的版本号,在一定范围内进行冲突的跟踪,
* 当线程发现界限值bound值已经被更改,则会保守的重置冲突个数.
*
* 通过放弃等待的一段时间,减少arena的有效规模(如果此时槽位个数>1).
* “一段时间”的值应该定为多少,这是一个经验问题。我们利用spin->yield->block来实现一段合理的等待时间--在一个繁忙的exchanger中,资源获取后
* 很快就会释放,在这种情况下,多处理器的上下文切换会非常慢,而且也造成了资源浪费.
* arena等待只是省略阻塞部分,而不是取消。根据经验,自旋数被设定为:在一系列测试机器的最大持续交换率下,避免了99%的阻塞时间.
* spin和yield都需要一些有限定的随机性(使用廉价的异或移位操作xorshift)以避免严格模式下会引起没必要的grow/shrink环。
* (使用伪随机还有助于通过使分支不可预知来调整旋转周期的持续时间。)当然,在offer的过程中,等待线程能够"知道"当槽位被改变时,其它线程将对此槽位
* 执行release操作,但是在匹配成功前,它依旧不能继续往下执行.同时,它也不能撤销offer操作,而只能是spin/yield操作.
* 注意:通过将线性化点更改为匹配字段的CAS(如在Scott&Scherer DISC论文中的一种情况中所做的),可以避免二次检查,这也会增加异步性,但代价是
* 冲突检测会比较差且无法总是重用每个线程的节点.因此此方式是一种折中方案.
*
* 发生冲突时,索引会按逆序循环遍历arena,当界限发生改变时,以最大索引(此位置Node最稀疏)重新开始.(过期后,索引减半直到为0为止)
* 通过使用随机数,素数步长或双重哈希式遍历,而不是简单的循环遍历来减少聚集是可能的(且已经做过尝试).
* 但是从经验来说,这些可能带来的好处无法克服其额外开销:除非存在持续的竞争,否则我们目前的管理操作运行都很快,所以更简单/更快的控制策略比
* 更准确但速度更慢的策略运作得更好。
*
* 因为我们使用过期来对arena的规模进行控制,因此在公有的exchange时间版本方法中不能抛出超时异常直到arena的规模大小缩为0(或者arena不能被
* 使用).这可能在超时上延长响应但是这种延迟是可以接受的.
*
* 基本上所有的实现都在方法slotExchange和arenaExchange中。
* 这些方法的宏观架构是类似的,但在组成的细节上有很多不同.slotExchange方法使用了单一的Exchanger类型字段slot,而arena使用了一个数组.
* 然而,它仍旧需要最少的冲突检测来触发arena的构建.(在这两个方法被调用时,最麻烦的部分就是确定中断状态以及转换期间正常出现的中断异常)
*
* 这种类型的代码中,这种方法太常见了,因为大多数逻辑都依赖于作为局部变量维护的字段来读取,所以不能很好的对方法进行分解--主要表现在这里:体积庞大
* 的spin-yield-block/cancel代码,以及严重依赖于内部函数(Unsafe)来使用内联嵌入式CAS和相关的内存访问操作(当它们被隐藏在命名友好且封装了
* 预期效果的方法后面时,动态编译器往往不会将其内联).
* 这包括使用putOrderedX来清除每个线程节点之间使用的字段.
* 请注意,即使通过线程的release操作来读取字段Node.item,也并未将其声明为volatile类型,因为读取操作只会在CAS操作完成之后才发生,并且
* 其它持有此字段的线程对其的使用都已经由其它操作确定了顺序.(因为实际的原子是对槽位的CAS操作,因此在release中对Node.match的写操作比完全
* volatile写要弱是合法的.然而,并没有这样做是因为它可以允许进一步推迟写,延迟进度。)
*/
//在arena中两个使用槽位之间的字节长度.ASHIFT*2至少应该是cache行的大小.
private static final int ASHIFT = 7;
/**
* 可以支持的arena的最大索引.最大可分配的arena规模是MMASK + 1.其值必须是2的整数次幂-1,且小于(1<<(31-ASHIFT)).
* 255(0xff)的上限足以满足主算法预期的缩放限制。
*/
private static final int MMASK = 0xff;
//绑定字段的序列/版本位的单位。每一次成功的绑定也会增加SEQ。
private static final int SEQ = MMASK + 1;
//cpu个数,用于控制规模扩展和自旋
private static final int NCPU = Runtime.getRuntime().availableProcessors();
//arena中最大的槽位索引:槽位的数量原则上可以使得所有的线程不存在竞争,or可以使得最大索引个数的线程不出现竞争.
static final int FULL = (NCPU >= (MMASK << 1)) ? MMASK : NCPU >>> 1;
//等待匹配的过程中自旋的界限值.由于随机性,迭代的实际值平均来说一般是此值的两倍.
//注意:当NCPU=1时,自旋功能被禁用.
private static final int SPINS = 1 << 10;
/**
* 当参数为null或者public方法的返回值为null时,用此值替代.
* 因为API最初并不允许它的参数为null,所以需要设定这么一个值。
*/
private static final Object NULL_ITEM = new Object();
//内部的exchange方法在超时后返回Sentinel值,以避免为这些方法定义不同时间版本的方法。
private static final Object TIMED_OUT = new Object();
/**
* 节点持有交换的部分数据,加上其它每个线程的bookkeeping.
* 通过添加注解 @sun.misc.Contended是为了减少内存竞争.
*/
@sun.misc.Contended static final class Node {
int index; // Arena中的索引
int bound; // Exchanger.bound上一次的记录值
int collides; // 当前arena规模下,CAS失败的次数
int hash; // 用于自旋伪随机数
Object item; // 线程内存储的当前item
volatile Object match; // 由释放线程提供的item
volatile Thread parked; // 当线程阻塞时,将当前线程设置为此值;否则此值为null
}
//对应线程的本地类
static final class Participant extends ThreadLocal<Node> {
public Node initialValue() { return new Node(); }
}
//每一个线程的状态,作用是为每个线程保留唯一的一个Node节点
private final Participant participant;
//消去数组;在可以使用(槽位交换)前一直为null.
//元素访问使用模拟的volatile读和CAS.
private volatile Node[] arena;
//用于检测争用的槽位
private volatile Node slot;
/**
* 最大有效arena位置的索引,和SEQ数字的高位进行或运算,每次更新此值都会增加.
* 从0到SEQ初始化更新被用于:确保arena数组仅被创建一次.
*/
private volatile int bound;
/**
* 当arena可用后的Exchange函数.
* @param item 用于交换的item(非null)
* @param timed 如果等待时间是有限制的,则此值为true
* @param ns 如果等待时间有限制,则ns表示最长等待时间;否则其值为0L
* @return 另一个线程的item; 如果中断则返回null; 如果超时则返回null.
*/
private final Object arenaExchange(Object item, boolean timed, long ns) {
Node[] a = arena;//获取arena的值
Node p = participant.get();//取得当前线程本地的私有Node
for (int i = p.index;;) { // 根据索引i值访问槽位
int b, m, c; long j; // j是一个成熟的数组位移量
Node q = (Node)U.getObjectVolatile(a, j = (i << ASHIFT) + ABASE);
//如果取得的q不为空并且CAS操作成功,则交换数据,唤醒线程并返回数据
if (q != null && U.compareAndSwapObject(a, j, q, null)) {
Object v = q.item; // release
q.match = item;
Thread w = q.parked;
if (w != null)
U.unpark(w);
return v;
}
//否则假如当前下标i在范围之内(bound & MMASK)并且q为空,则尝试在i上占领node
else if (i <= (m = (b = bound) & MMASK) && q == null) {
p.item = item; // offer
//若成功占领,则采取与slotExchange中类似的自旋+阻塞方式
if (U.compareAndSwapObject(a, j, null, p)) {
long end = (timed && m == 0) ? System.nanoTime() + ns : 0L;
Thread t = Thread.currentThread(); // wait
for (int h = p.hash, spins = SPINS;;) {
Object v = p.match;
if (v != null) {
U.putOrderedObject(p, MATCH, null);
p.item = null; // clear for next use
p.hash = h;
return v;
}
else if (spins > 0) {
h ^= h << 1; h ^= h >>> 3; h ^= h << 10; // xorshift
if (h == 0) // initialize hash
h = SPINS | (int)t.getId();
else if (h < 0 && // approx 50% true
(--spins & ((SPINS >>> 1) - 1)) == 0)
Thread.yield(); // two yields per wait
}
else if (U.getObjectVolatile(a, j) != p)
spins = SPINS; // releaser hasn't set match yet
else if (!t.isInterrupted() && m == 0 &&
(!timed ||
(ns = end - System.nanoTime()) > 0L)) {
U.putObject(t, BLOCKER, this); // emulate LockSupport
p.parked = t; // minimize window
if (U.getObjectVolatile(a, j) == p)
U.park(false, ns);
p.parked = null;
U.putObject(t, BLOCKER, null);
}
else if (U.getObjectVolatile(a, j) == p &&
U.compareAndSwapObject(a, j, p, null)) {
if (m != 0) // try to shrink
U.compareAndSwapInt(this, BOUND, b, b + SEQ - 1);
p.item = null;
p.hash = h;
i = p.index >>>= 1; // descend
if (Thread.interrupted())
return null;
if (timed && m == 0 && ns <= 0L)
return TIMED_OUT;
break; // expired; restart
}
}
}
else
p.item = null; // clear offer
}
//
else {
if (p.bound != b) { // stale; reset
p.bound = b;
p.collides = 0;
i = (i != m || m == 0) ? m : m - 1;
}
else if ((c = p.collides) < m || m == FULL ||
!U.compareAndSwapInt(this, BOUND, b, b + SEQ + 1)) {
p.collides = c + 1;
i = (i == 0) ? m : i - 1; // cyclically traverse
}
else
i = m + 1; // grow
p.index = i;
}
}
}
//当arena可用后的Exchange函数,参数意义同上
private final Object slotExchange(Object item, boolean timed, long ns) {
Node p = participant.get();//返回此线程本地变量的副本,即私有Node
Thread t = Thread.currentThread();
//如果当前线程被中断,则返回null
if (t.isInterrupted()) // preserve interrupt status so caller can recheck
return null;
for (Node q;;) {
//如果槽位不为null
if ((q = slot) != null) {
//cas操作获取槽位的内容并交换槽位内信息,成功则返回交换数据
if (U.compareAndSwapObject(this, SLOT, q, null)) {
Object v = q.item;
q.match = item;//槽位内值被改为参数item
Thread w = q.parked;
if (w != null)
U.unpark(w);
return v;
}
//cas失败,则创建arena用于竞争,在slot为null之前创建操作会一直进行
if (NCPU > 1 && bound == 0 &&
U.compareAndSwapInt(this, BOUND, 0, SEQ))
arena = new Node[(FULL + 2) << ASHIFT];
}
//如果arena不为null,进入arenaExchange的逻辑
else if (arena != null)
return null;
//否则,当前q(slot)为空,尝试占领,失败重试;成功之后跳出当前,进入spin+block模式
else {
p.item = item;
if (U.compareAndSwapObject(this, SLOT, null, p))
break;
p.item = null;
}
}
// await release
//(假若当前是限时版本)取得结束时间和自旋次数,进入自旋+阻塞逻辑:
int h = p.hash;
long end = timed ? System.nanoTime() + ns : 0L;//获取结束时间
int spins = (NCPU > 1) ? SPINS : 1; //获取自旋次数
Object v;
while ((v = p.match) == null) {
if (spins > 0) {
h ^= h << 1; h ^= h >>> 3; h ^= h << 10;
if (h == 0)
h = SPINS | (int)t.getId();
else if (h < 0 && (--spins & ((SPINS >>> 1) - 1)) == 0)
Thread.yield();
}
else if (slot != p)
spins = SPINS;
else if (!t.isInterrupted() && arena == null &&
(!timed || (ns = end - System.nanoTime()) > 0L)) {
U.putObject(t, BLOCKER, this);
p.parked = t;
if (slot == p)
U.park(false, ns);
p.parked = null;
U.putObject(t, BLOCKER, null);
}
else if (U.compareAndSwapObject(this, SLOT, p, null)) {
v = timed && ns <= 0L && !t.isInterrupted() ? TIMED_OUT : null;
break;
}
}
U.putOrderedObject(p, MATCH, null);
p.item = null;
p.hash = h;
return v;
}
//创建一个新的Exchanger
public Exchanger() {
participant = new Participant();
}
/**
* 等待另一个线程到达此交换点(除非当前线程被中断),然后将给定的对象移交给它, 同时获取另一个线程移交给它的对象.
*
* 如果另一个线程已经在交换点等待,则为了线程调度目它将被恢复,并接收当前线程传入的对象.
* 当前线程会立即返回,并获取由另一个线程传递到exchange的对象.
*
* <p>If no other thread is already waiting at the exchange then the
* current thread is disabled for thread scheduling purposes and lies
* dormant until one of two things happens:
* 如果当前没有线程在交换点等待,则当前线程被禁止用于线程调度且将一直处于休眠状态直到以下两个状况之一发生为止:
* 1. 其它线程进入了exchange
* 2.其它线程中断了当前线程
*
* 如果当前线程:
* 1.在进入此方法时设定了它的中断状态位;
* 2.在等待exchange时被中断
* @param x 交换的对象
*/
@SuppressWarnings("unchecked")
public V exchange(V x) throws InterruptedException {
Object v;
Object item = (x == null) ? NULL_ITEM : x; // translate null args
if ((arena != null ||
(v = slotExchange(item, false, 0L)) == null) &&
((Thread.interrupted() || // disambiguates null return
(v = arenaExchange(item, false, 0L)) == null)))
throw new InterruptedException();
return (v == NULL_ITEM) ? null : (V)v;
}
//和上述方法不一样地方:有等待exchange的超时限制
@SuppressWarnings("unchecked")
public V exchange(V x, long timeout, TimeUnit unit)
throws InterruptedException, TimeoutException {
Object v;
Object item = (x == null) ? NULL_ITEM : x;
long ns = unit.toNanos(timeout);
if ((arena != null ||
(v = slotExchange(item, true, ns)) == null) &&
((Thread.interrupted() ||
(v = arenaExchange(item, true, ns)) == null)))
throw new InterruptedException();
if (v == TIMED_OUT)
throw new TimeoutException();
return (v == NULL_ITEM) ? null : (V)v;
}
// Unsafe mechanics
private static final sun.misc.Unsafe U;
private static final long BOUND;
private static final long SLOT;
private static final long MATCH;
private static final long BLOCKER;
private static final int ABASE;
static {
int s;
try {
U = sun.misc.Unsafe.getUnsafe();
Class<?> ek = Exchanger.class;
Class<?> nk = Node.class;
Class<?> ak = Node[].class;
Class<?> tk = Thread.class;
BOUND = U.objectFieldOffset
(ek.getDeclaredField("bound"));
SLOT = U.objectFieldOffset
(ek.getDeclaredField("slot"));
MATCH = U.objectFieldOffset
(nk.getDeclaredField("match"));
BLOCKER = U.objectFieldOffset
(tk.getDeclaredField("parkBlocker"));
s = U.arrayIndexScale(ak);
// ABASE absorbs padding in front of element 0
ABASE = U.arrayBaseOffset(ak) + (1 << ASHIFT);
} catch (Exception e) {
throw new Error(e);
}
if ((s & (s-1)) != 0 || s > (1 << ASHIFT))
throw new Error("Unsupported array scale");
}
}
3.测试用例
import java.util.concurrent.Exchanger;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
public class Main {
private static final Exchanger<String> exch=new Exchanger<>();//定义交换器
private static final ExecutorService threadPool= Executors.newFixedThreadPool(2);//定义线程池中服务线程个数
public static void main(String[] args) {
//定义银行流水A
Runnable r1=()->{
String stra="银行流水A";
try{
exch.exchange(stra);
}catch (InterruptedException e){
e.printStackTrace();
}
};
//定义银行流水B
Runnable r2=()->{
String strb="银行流水B";
try{
String recesive= exch.exchange(strb);
//判定B的原信息和获取信息是否相等
System.out.println("B的原item和从A获取的item是否相等:"+strb.equals(recesive));
System.out.println("原item为:"+strb);
System.out.println("从A获取的item为:"+recesive);
}catch (InterruptedException e){
e.printStackTrace();
}
};
threadPool.execute(r1);
threadPool.execute(r2);
threadPool.shutdown();
}
}
输出结果:
B的原item和从A获取的item是否相等:false
原item为:银行流水B
从A获取的item为:银行流水A
Process finished with exit code 0