前言
Phaser
是JDK1.7提供的一种同步工具,是可重用的同步屏障,其功能类似于CyclicBarrier和CountDownLatch,但支持更灵活的用法。本篇主要讲解Phaser
、Exchanger
两种工具的用法。
其更加灵活的地方就在于:可以阶段性的控制各个线程的行为。
举个例子
总共六位同学要参加同学聚会,聚会流程有三项:吃饭、K歌、看电影。六位同学都要吃饭,但是吃完饭后只有四位同学想K歌,K完歌后只有两位同学想看电影。
实现这样的场景,用CyclicBarrier和CountDownLatch显然不太方便。这时就可以使用Phaser
。
Phaser
定义流程
首先定义聚会流程,定义子类PartyPhaser
来实现Phaser
,并且重写onAdvance
方法
import java.util.concurrent.Phaser;
/**
* @author sicimike
*/
public class PartyPhaser extends Phaser {
@Override
protected boolean onAdvance(int phase, int registeredParties) {
switch (phase) {
case 0:
// 所有注册的线程都到达第一个屏障后,执行这个case
System.out.println("所有人均已吃完,共:" + registeredParties + " 人");
return false;
case 1:
// 所有注册的线程都到达第二个屏障后,执行这个case
System.out.println("所有人均已达到KTV,共:" + registeredParties + " 人");
return false;
case 2:
// 所有注册的线程都到达第三个屏障后,执行这个case
System.out.println("所有人均已到达电影院,共:" + registeredParties + " 人");
return true;
default:
return true;
}
}
}
onAdvance
有两个参数,int phase
表示流程编号(从0开始),int registeredParties
表示这个流程需要多少人参与。return false
表示还有后续流程。
实现流程
由于流程的各个步骤都是由六位同学(线程)来实现,所以现在实现这三个阶段的流程
import java.util.Random;
import java.util.concurrent.Phaser;
/**
* @author sicimike
*/
public class Person implements Runnable {
private final Random random = new Random();
private String name;
private Phaser phaser;
@Override
public void run() {
eat();
sing();
film();
}
// 吃饭
public void eat() {
sleep(random.nextInt(1000));
System.out.println(name + " 已经吃完");
// 到达某个阶段,等待其他线程到达
phaser.arriveAndAwaitAdvance();
}
// 唱歌
public void sing() {
if ("p0p1".contains(name)) {
// p0、p1不去唱歌
// 线程到达后,注销该线程(不参与接下来的流程)
phaser.arriveAndDeregister();
} else {
// 其余的人去唱歌
sleep(random.nextInt(1000));
System.out.println(name + " 已经到达KTV");
// 到达某个阶段,等待其他线程到达
phaser.arriveAndAwaitAdvance();
}
}
// 看电影
public void film() {
if ("p2p3".contains(name)) {
// p2、p3不去看电影
// 线程到达后,注销该线程(不参与接下来的流程)
phaser.arriveAndDeregister();
} else if (!"p0p1".contains(name)) {
// p4、p5去看电影
sleep(random.nextInt(1000));
System.out.println(name + " 已经到达电影院");
// 到达某个阶段,等待其他线程到达
phaser.arriveAndAwaitAdvance();
}
}
public static void sleep(int time) {
try {
Thread.sleep(time);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
public Person(String name, Phaser phaser) {
this.name = name;
this.phaser = phaser;
}
public String getName() {
return name;
}
public void setName(String name) {
this.name = name;
}
public Phaser getPhaser() {
return phaser;
}
public void setPhaser(Phaser phaser) {
this.phaser = phaser;
}
}
main方法
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
/**
* Phaser类的使用
*
* @author sicimike
* @see java.util.concurrent.Phaser
*/
public class PhaserDemo {
private final static int THREAD_COUNT = 6;
private final static ExecutorService service = Executors.newFixedThreadPool(THREAD_COUNT);
static PartyPhaser partyPhaser = new PartyPhaser();
public static void main(String[] args) {
// 批量注册
partyPhaser.bulkRegister(THREAD_COUNT);
for (int i = 0; i < THREAD_COUNT; i++) {
service.submit(new Person("p" + i, partyPhaser));
}
// 关闭线程池
service.shutdown();
}
}
执行结果
p1 已经吃完
p5 已经吃完
p3 已经吃完
p4 已经吃完
p0 已经吃完
p2 已经吃完
所有人均已吃完,共:6 人
p5 已经到达KTV
p4 已经到达KTV
p3 已经到达KTV
p2 已经到达KTV
所有人均已达到KTV,共:4 人
p4 已经到达电影院
p5 已经到达电影院
所有人均已到达电影院,共:2 人
执行结果可以看出,Phaser
很方便的实现了前文描述的场景。
Exchanger
相比之下Exchanger
的使用就相对简单了。
Exchanger
的作用是:交换两个线程的数据。
示例如下:
import java.util.concurrent.Exchanger;
/**
* @author sicimike
*/
public class ExchangeDemo {
public static void main(String[] args) {
final Exchanger<String> exchanger = new Exchanger<>();
new Thread(() -> {
String exchangerData = "thread-1 data";
try {
exchangerData = exchanger.exchange(exchangerData);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(Thread.currentThread().getName() + " : " + exchangerData);
}, "thread-1").start();
new Thread(() -> {
String exchangerData = "thread-2 data";
try {
exchangerData = exchanger.exchange(exchangerData);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(Thread.currentThread().getName() + " : " + exchangerData);
}, "thread-2").start();
}
}
执行结果:
thread-1 : thread-2 data
thread-2 : thread-1 data
可以看出,thread-1得到了thread-2的数据,而thread-2也得到了thread-1的数据。