文章目录
系列文章
JUC并发工具包之Phaser Part1 结构与状态
JUC并发工具包之Phaser Part2 源码分析
JUC并发工具包之Phaser Part3 场景使用
JUC并发工具包之Semaphore
JUC并发工具包之CyclicBarrier
JUC并发工具包之CountDownLatch
JUC并发工具包之CyclicBarrier & CountDownLatch的异同
概览
本文我们将看一看Java并发包中的Phaser,正如JDK中描述那样,它和CyclicBarrier、CountDownLatch提供的功能很类似,但是多了一些使用场景。
A reusable synchronization barrier, similar in functionality to CyclicBarrier and CountDownLatch but supporting more flexible usage.
它适用于这样一种场景,一个大任务可以分为多个阶段完成,且每个阶段的任务可以多个线程并发执行,但是必须上一个阶段的任务都完成了才可以执行下一个阶段的任务。
这种场景虽然使用CyclicBarrier或者CountDownLatch也可以实现,但是要复杂的多。首先,具体需要多少个阶段是可能会变的,其次,每个阶段的任务数也可能会变的。相比于CyclicBarrier和CountDownLatch,Phaser更加灵活更加方便。
模拟场景
想了好久想了这么一个场景,周末起床先打扫5个地方、再炒4个菜、再喊老婆孩子2个人吃饭,最后一个人洗碗。
当然这些事情我都不想做,哈哈哈,我想象出来有多重影分身来帮我做事,哪怕叫老婆孩子,也得有两个人帮我喊,我只负责编排这些事情,来,影分身们,别怕累,动起来!
我们创建一个实现了Runnable接口的HouseholdJob类:
- 传入name(家务活)和phaser(当前Phaser实例)
- 构造方法中注册当前线程到phaser中
- run方法里先调用
phaser.arriveAndAwaitAdvance()
阻塞,再睡眠一会模拟做家务,最后打印一段文字说明家务已经干完了,最后移除自己的注册。
private static class HouseholdJob implements Runnable {
private final Phaser phaser;
private final String name;
private HouseholdJob(Phaser phaser, String name) {
this.name = name;
this.phaser = phaser;
phaser.register();
}
@Override
public void run() {
phaser.arriveAndAwaitAdvance();
log.info("开始{}...", name);
try {
Thread.sleep(new Random().nextInt(1000));
} catch (InterruptedException e) {
e.printStackTrace();
}
log.info("完成{}.", name);
// 大家都完成后自己取消登记(影分身消失)
phaser.arriveAndDeregister();
}
}
当我们的类初始化之后,当前线程就算是注册到Phaser实例中去了,这将会增加使用当前Phaser的parties数量。
调用arriveAndAwaitAdvance()
方法会使当前线程在代码处阻塞,当arrived parties和当前注册的parties一样多的时候,阻塞就会被打开,程序会接着往下走。
当所有的处理都完成后,当前线程调用arriveAndDeregister()
方法撤销自己在Phaser上的注册。
定义不同的阶段(每个阶段运行的任务不同)
预先定义线程池,设置5个线程
private static final ExecutorService es = Executors.newFixedThreadPool(5)
Phaser1
- 开始提交5个打扫任务
- 提交完先看一下中间状态
- 调用
phaser.arriveAndAwaitAdvance()
方法阻塞当前线程执行到所有phaser1的任务都处理完。 - 最后看下phaser中的状态(parties值、unarrived值、phase值)
private static void executePhaser1(Phaser phaser) {
es.submit(new HouseholdJob(phaser, "打扫主卧"));
es.submit(new HouseholdJob(phaser, "打扫次卧"));
es.submit(new HouseholdJob(phaser, "打扫客厅"));
es.submit(new HouseholdJob(phaser, "打扫卫生间"));
es.submit(new HouseholdJob(phaser, "打扫阳台"));
peekPhaserState(phaser, "召唤5个影分身打扫5个房间,都给我动起来!查看中间状态");
phaser.arriveAndAwaitAdvance();
peekPhaserState(phaser, "打扫完成了,满意!换批影分身来弄个3菜1汤吧");
}
Phaser2
- 提交4个做菜任务
- 调用
phaser.arriveAndAwaitAdvance()
方法阻塞当前线程执行到所有phaser1的任务都处理完。 - 最后看下phaser中的状态(parties值、unarrived值、phase值)
private static void executePhaser2(Phaser phaser) {
es.submit(new HouseholdJob(phaser, "炒青菜"));
es.submit(new HouseholdJob(phaser, "红烧肉"));
es.submit(new HouseholdJob(phaser, "油焖大虾"));
es.submit(new HouseholdJob(phaser, "番茄鸡蛋汤"));
phaser.arriveAndAwaitAdvance();
peekPhaserState(phaser, "菜炒好了,看起来还不错,要喊老婆孩子出来吃饭了");
}
Phaser3
与上面Phaser类似
private static void executePhaser3(Phaser phaser) {
es.submit(new HouseholdJob(phaser, "喊老婆出来吃饭"));
es.submit(new HouseholdJob(phaser, "喊孩子出来吃饭"));
phaser.arriveAndAwaitAdvance();
peekPhaserState(phaser, "看看都吃完了没有,等会还得洗碗,我就是个工具人......");
}
Phaser4
与上面Phaser类似
private static void executePhaser4(Phaser phaser) {
es.submit(new HouseholdJob(phaser, "洗碗"));
phaser.arriveAndAwaitAdvance();
peekPhaserState(phaser, "来看看洗的干不干净。");
}
组织4个阶段
- 定义一个Phaser,默认传parties为1,表示当前线程的注册
- 重写
onAdvance()
方法,在phase值自增的时候打印日志信息。 - 先查看一下刚定义完的phaser状态
- 依次执行四个阶段任务(每个阶段的任务数不一样,动态的parties数不同)
- 执行完四个阶段任务后,移除当前线程的注册
- 终止Phaser实例并打印终止的状态
public static void main(String[] args) throws NoSuchFieldException, InterruptedException {
// 传入parties为1,表示注册当前线程,当前线程可作为协调线程参与工作。
Phaser phaser = new Phaser(1) {
@Override
protected boolean onAdvance(int phase, int registeredParties) {
long state = getStateValue(this, stateField);
log.info("\033[33m第{}件事就绪,current phase = {}, registeredParties = {}.\033[0m{}",
phase + 1, phase, registeredParties, getFormattedPhaserStateOutput(
state, "parties都arrived,任务就绪待执行,phase会在onAdvance方法调用完自增。"));
return super.onAdvance(phase, registeredParties);
}
};
log.info("周末起床了,开始家务了......");
peekPhaserState(phaser, "刚创建好一个Phaser,默认parties为1,表示我carry全场");
executePhaser1(phaser);
executePhaser2(phaser);
executePhaser3(phaser);
executePhaser4(phaser);
phaser.arriveAndDeregister();
// 完成所有工作了,老夫要休息了
phaser.forceTermination();
peekPhaserState(phaser, "哎呀,肚子叫了,居然饿醒了,看来最近火影看太多了啊~");
}
结果输出
17:18:33.400 [main] INFO PhaserStateAnalysis - 周末起床了,开始家务了......
17:18:33.460 [main] INFO PhaserStateAnalysis - >>>>>>>>>>>>>>> PhaserStateValue:
****************************************************** 刚创建好一个Phaser,默认parties为1,表示我carry全场
* >>> state_val_: 0,000 0000 0000 0000 0000 0000 0000 0000,0000 0000 0000 0001,0000 0000 0000 0001 *
* >>> terminated: 0 *
* >>> phase : 0000000000000000000000000000000 | phaseOf : 0 *
* >>> parties : 0000000000000001 | partiesOf : 1 *
* >>> unarrived : 0000000000000001 | unarrivedOf: 1 *
* >>> | arrivedOf : 0 *
****************************************************************************************************
17:18:33.464 [main] INFO PhaserStateAnalysis - >>>>>>>>>>>>>>> PhaserStateValue:
****************************************************** 召唤5个影分身打扫5个房间,都给我动起来!查看中间状态
* >>> state_val_: 0,000 0000 0000 0000 0000 0000 0000 0000,0000 0000 0000 0110,0000 0000 0000 0010 *
* >>> terminated: 0 *
* >>> phase : 0000000000000000000000000000000 | phaseOf : 0 *
* >>> parties : 0000000000000110 | partiesOf : 6 *
* >>> unarrived : 0000000000000010 | unarrivedOf: 2 *
* >>> | arrivedOf : 4 *
****************************************************************************************************
17:18:33.464 [main] INFO PhaserStateAnalysis - 第1件事就绪,current phase = 0, registeredParties = 6.
****************************************************** parties都arrived,任务就绪待执行,phase会在onAdvance方法调用完自增。
* >>> state_val_: 0,000 0000 0000 0000 0000 0000 0000 0000,0000 0000 0000 0110,0000 0000 0000 0000 *
* >>> terminated: 0 *
* >>> phase : 0000000000000000000000000000000 | phaseOf : 0 *
* >>> parties : 0000000000000110 | partiesOf : 6 *
* >>> unarrived : 0000000000000000 | unarrivedOf: 0 *
* >>> | arrivedOf : 6 *
****************************************************************************************************
17:18:33.465 [pool-1-thread-1] INFO PhaserStateAnalysis - 开始打扫主卧...
17:18:33.465 [pool-1-thread-2] INFO PhaserStateAnalysis - 开始打扫次卧...
17:18:33.465 [pool-1-thread-5] INFO PhaserStateAnalysis - 开始打扫阳台...
17:18:33.465 [pool-1-thread-4] INFO PhaserStateAnalysis - 开始打扫卫生间...
17:18:33.465 [pool-1-thread-3] INFO PhaserStateAnalysis - 开始打扫客厅...
17:18:33.466 [main] INFO PhaserStateAnalysis - >>>>>>>>>>>>>>> PhaserStateValue:
****************************************************** 打扫完成了,满意!换批影分身来弄个3菜1汤吧
* >>> state_val_: 0,000 0000 0000 0000 0000 0000 0000 0001,0000 0000 0000 0110,0000 0000 0000 0110 *
* >>> terminated: 0 *
* >>> phase : 0000000000000000000000000000001 | phaseOf : 1 *
* >>> parties : 0000000000000110 | partiesOf : 6 *
* >>> unarrived : 0000000000000110 | unarrivedOf: 6 *
* >>> | arrivedOf : 0 *
****************************************************************************************************
17:18:33.834 [pool-1-thread-4] INFO PhaserStateAnalysis - 完成打扫卫生间.
17:18:33.915 [pool-1-thread-3] INFO PhaserStateAnalysis - 完成打扫客厅.
17:18:33.932 [pool-1-thread-5] INFO PhaserStateAnalysis - 完成打扫阳台.
17:18:34.158 [pool-1-thread-1] INFO PhaserStateAnalysis - 完成打扫主卧.
17:18:34.269 [pool-1-thread-2] INFO PhaserStateAnalysis - 完成打扫次卧.
17:18:34.270 [pool-1-thread-2] INFO PhaserStateAnalysis - 第2件事就绪,current phase = 1, registeredParties = 5.
****************************************************** parties都arrived,任务就绪待执行,phase会在onAdvance方法调用完自增。
* >>> state_val_: 0,000 0000 0000 0000 0000 0000 0000 0001,0000 0000 0000 0101,0000 0000 0000 0000 *
* >>> terminated: 0 *
* >>> phase : 0000000000000000000000000000001 | phaseOf : 1 *
* >>> parties : 0000000000000101 | partiesOf : 5 *
* >>> unarrived : 0000000000000000 | unarrivedOf: 0 *
* >>> | arrivedOf : 5 *
****************************************************************************************************
17:18:34.270 [pool-1-thread-4] INFO PhaserStateAnalysis - 开始炒青菜...
17:18:34.270 [pool-1-thread-3] INFO PhaserStateAnalysis - 开始红烧肉...
17:18:34.270 [pool-1-thread-5] INFO PhaserStateAnalysis - 开始油焖大虾...
17:18:34.270 [pool-1-thread-1] INFO PhaserStateAnalysis - 开始番茄鸡蛋汤...
17:18:34.271 [main] INFO PhaserStateAnalysis - >>>>>>>>>>>>>>> PhaserStateValue:
****************************************************** 菜炒好了,看起来还不错,要喊老婆孩子出来吃饭了
* >>> state_val_: 0,000 0000 0000 0000 0000 0000 0000 0010,0000 0000 0000 0101,0000 0000 0000 0101 *
* >>> terminated: 0 *
* >>> phase : 0000000000000000000000000000010 | phaseOf : 2 *
* >>> parties : 0000000000000101 | partiesOf : 5 *
* >>> unarrived : 0000000000000101 | unarrivedOf: 5 *
* >>> | arrivedOf : 0 *
****************************************************************************************************
17:18:34.353 [pool-1-thread-1] INFO PhaserStateAnalysis - 完成番茄鸡蛋汤.
17:18:34.898 [pool-1-thread-3] INFO PhaserStateAnalysis - 完成红烧肉.
17:18:35.166 [pool-1-thread-4] INFO PhaserStateAnalysis - 完成炒青菜.
17:18:35.234 [pool-1-thread-5] INFO PhaserStateAnalysis - 完成油焖大虾.
17:18:35.235 [pool-1-thread-5] INFO PhaserStateAnalysis - 第3件事就绪,current phase = 2, registeredParties = 3.
****************************************************** parties都arrived,任务就绪待执行,phase会在onAdvance方法调用完自增。
* >>> state_val_: 0,000 0000 0000 0000 0000 0000 0000 0010,0000 0000 0000 0011,0000 0000 0000 0000 *
* >>> terminated: 0 *
* >>> phase : 0000000000000000000000000000010 | phaseOf : 2 *
* >>> parties : 0000000000000011 | partiesOf : 3 *
* >>> unarrived : 0000000000000000 | unarrivedOf: 0 *
* >>> | arrivedOf : 3 *
****************************************************************************************************
17:18:35.235 [pool-1-thread-1] INFO PhaserStateAnalysis - 开始喊孩子出来吃饭...
17:18:35.235 [pool-1-thread-2] INFO PhaserStateAnalysis - 开始喊老婆出来吃饭...
17:18:35.236 [main] INFO PhaserStateAnalysis - >>>>>>>>>>>>>>> PhaserStateValue:
****************************************************** 看看都吃完了没有,等会还得洗碗,我就是个工具人......
* >>> state_val_: 0,000 0000 0000 0000 0000 0000 0000 0011,0000 0000 0000 0011,0000 0000 0000 0011 *
* >>> terminated: 0 *
* >>> phase : 0000000000000000000000000000011 | phaseOf : 3 *
* >>> parties : 0000000000000011 | partiesOf : 3 *
* >>> unarrived : 0000000000000011 | unarrivedOf: 3 *
* >>> | arrivedOf : 0 *
****************************************************************************************************
17:18:35.413 [pool-1-thread-2] INFO PhaserStateAnalysis - 完成喊老婆出来吃饭.
17:18:36.037 [pool-1-thread-1] INFO PhaserStateAnalysis - 完成喊孩子出来吃饭.
17:18:36.038 [pool-1-thread-1] INFO PhaserStateAnalysis - 第4件事就绪,current phase = 3, registeredParties = 2.
****************************************************** parties都arrived,任务就绪待执行,phase会在onAdvance方法调用完自增。
* >>> state_val_: 0,000 0000 0000 0000 0000 0000 0000 0011,0000 0000 0000 0010,0000 0000 0000 0000 *
* >>> terminated: 0 *
* >>> phase : 0000000000000000000000000000011 | phaseOf : 3 *
* >>> parties : 0000000000000010 | partiesOf : 2 *
* >>> unarrived : 0000000000000000 | unarrivedOf: 0 *
* >>> | arrivedOf : 2 *
****************************************************************************************************
17:18:36.038 [pool-1-thread-3] INFO PhaserStateAnalysis - 开始洗碗...
17:18:36.039 [main] INFO PhaserStateAnalysis - >>>>>>>>>>>>>>> PhaserStateValue:
****************************************************** 来看看洗的干不干净。
* >>> state_val_: 0,000 0000 0000 0000 0000 0000 0000 0100,0000 0000 0000 0010,0000 0000 0000 0010 *
* >>> terminated: 0 *
* >>> phase : 0000000000000000000000000000100 | phaseOf : 4 *
* >>> parties : 0000000000000010 | partiesOf : 2 *
* >>> unarrived : 0000000000000010 | unarrivedOf: 2 *
* >>> | arrivedOf : 0 *
****************************************************************************************************
17:18:36.040 [main] INFO PhaserStateAnalysis - >>>>>>>>>>>>>>> PhaserStateValue:
****************************************************** 哎呀,肚子叫了,居然饿醒了,看来最近火影看太多了啊~
* >>> state_val_: 1,000 0000 0000 0000 0000 0000 0000 0100,0000 0000 0000 0001,0000 0000 0000 0001 *
* >>> terminated: 1 *
* >>> phase : 0000000000000000000000000000100 | phaseOf : -2147483644 *
* >>> parties : 0000000000000001 | partiesOf : 1 *
* >>> unarrived : 0000000000000001 | unarrivedOf: 1 *
* >>> | arrivedOf : 0 *
****************************************************************************************************
17:18:36.101 [pool-1-thread-3] INFO PhaserStateAnalysis - 完成洗碗.
输出说明
格式
因为个人想看的信息有点多,所以用*
把输出信息圈起来了,右上方表示输出信息代表的阶段,中间依次是:
- state值的二进制表示
- 是否终止的二进制表示
- phase二进制表示和值
- parties二进制表示和值
- unarrived的二进制表示和值。
****************************************************** 刚创建好一个Phaser,默认parties为1,表示我carry全场
* >>> state_val_: 0,000 0000 0000 0000 0000 0000 0000 0000,0000 0000 0000 0001,0000 0000 0000 0001 *
* >>> terminated: 0 *
* >>> phase : 0000000000000000000000000000000 | phaseOf : 0 *
* >>> parties : 0000000000000001 | partiesOf : 1 *
* >>> unarrived : 0000000000000001 | unarrivedOf: 1 *
* >>> | arrivedOf : 0 *
****************************************************************************************************
执行流程
- 在每个阶段执行逻辑里面,当调用完
phaser.arriveAndAwaitAdvance()
,当前线程挂起等待当前phase所有任务执行完毕再被唤醒 - 接着执行自定义的onAdvance方法,打印日志信息“第n件事就绪”
- 此时查看下phaser实例的状态,都是parties都arrived,unarrived为0,表示准备就绪了,接下来要执行任务了。
- 接下来Runnable任务中日志信息会被打印
- 然后当前phase所有的
arriveAndAwaitAdvance()
阻塞处代码后的逻辑都会被执行,直到又遇到phaser.arriveAndAwaitAdvance()
阻塞等待当前phase的任务都执行结束。 - 这时能看到会先打印没有sleep的各task中的peekPhaserState的内容,再打印HouseholdJob中的
log.info("完成{}.", name);
,最后当前phase的任务都执行结束,进入下一个phase。 - 循环到第一步
总结
本文我们讲解了Phaser的使用逻辑,使用一个简单示例来说明用Phaser的多个阶段处理多个任务的能力。
完整代码再github上:https://github.com/mrcharleshu/java-main/blob/master/src/main/java/com/charles/thread/state/PhaserStateAnalysis.java
最后若发现写的不对的地方,希望能在评论区留下意见,万分感激。