线程间通信
线程间通信,就是对同进程类共享资源的安全访问,Java中通过AQS(java.util.concurrent.locks.AbstractQueuedSynchronizer)同步器来实现资源安全访问,常见基础工具类型,java.util.concurrent.CountDownLatch(java1.5)、java.util.concurrent.Semaphore(java1.5)、java.util.concurrent.CyclicBarrier(java1.5)、java.util.concurrent.Phaser(java7),关于AQS的讲解,后续在源码解读的篇章中会讲到。
java.util.concurrent.CountDownLatch
需要注意的是,如果执行次数小于了count,会一直阻塞
/**
* countDownLatch控制
* @throws InterruptedException
*/
public static void countDownLatch() throws InterruptedException {
ExecutorService executorService = Executors.newCachedThreadPool();
int size = 10;
CountDownLatch countDownLatch = new CountDownLatch(size);
for(int i = 0;i<size;i++){
executorService.execute(new CountDownLatchTask(countDownLatch));
}
countDownLatch.await();
System.out.println("执行完毕");
executorService.shutdown();
}
private static class CountDownLatchTask implements Runnable {
private CountDownLatch latch;
public CountDownLatchTask(CountDownLatch latch) {
this.latch = latch;
}
@Override
public void run() {
System.out.println(Thread.currentThread().getName() + "执行任务。");
try {
TimeUnit.SECONDS.sleep(1);
} catch (InterruptedException e) {
e.printStackTrace();
}
latch.countDown();
}
}
java.util.concurrent.Semaphore
同时permits个线程执行,实力模拟同时两个任务执行。
/**
* 信号量限流
*/
private static void semaphore() {
ExecutorService executorService = Executors.newFixedThreadPool(4);
int permits = 2;
Semaphore semaphore = new Semaphore(permits);
for (int i = 0; i < 10; i++) {
executorService.execute(new SemaphoreTask(semaphore));
}
executorService.shutdown();
}
private static class SemaphoreTask implements Runnable {
Semaphore semaphore;
public SemaphoreTask(Semaphore semaphore) {
this.semaphore = semaphore;
}
@Override
public void run() {
//获取一个信号
try {
semaphore.acquire();
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(Thread.currentThread().getName() + "执行任务.");
try {
TimeUnit.SECONDS.sleep(3);
} catch (InterruptedException e) {
e.printStackTrace();
}
//执行完后释放
semaphore.release();
}
}
java.util.concurrent.CyclicBarrier
可循环使用的屏障,可以理解为可以循环使用的计数器,达到parties后继续往下执行。
/**
* 需要注意,在线程池中每个线程执行完自己的任务后就await()
* 在一种情况下会出现阻塞,当parties 大于线程池最大线程数时,所有的线程都被阻塞
* 比如这里的parties = 5,FixedThreadPool.nThreads=5,如果parties>FixedThreadPool.nThreads会一直阻塞
* 其实还是容易想到,可执行线程都在等待剩余任务。
*/
private static void cyclicBarrier() {
CyclicBarrier cyclicBarrier = new CyclicBarrier(5,()->{
System.out.println("任务执行完毕");
});
ExecutorService executorService = Executors.newFixedThreadPool(5);
for (int i = 0; i < 10; i++) {
executorService.execute(new CyclicBarrierTask(cyclicBarrier));
}
executorService.shutdown();
}
private static class CyclicBarrierTask implements Runnable{
private CyclicBarrier barrier;
CyclicBarrierTask(CyclicBarrier barrier){
this.barrier = barrier;
}
@Override
public void run() {
try {
System.out.println(Thread.currentThread().getName()+"执行任务。");
TimeUnit.SECONDS.sleep(3);
barrier.await();
} catch (InterruptedException | BrokenBarrierException e) {
e.printStackTrace();
}
}
}
java.util.concurrent.Phaser(java7)
A reusable synchronization barrier, similar in functionality to CyclicBarrier and CountDownLatch but supporting more flexible usage.
可重用的同步屏障,和CyclicBarrier and CountDownLatch功能相似,但支持更多的操作。
模拟,所有人玩一个游戏,所有人都要完成n阶段的任务,每个阶段完成后,都有奖励。
/**
* 玩一个游戏,所有人都要完成n阶段的任务,每个阶段完成后,都有奖励
*/
private static void phaser(final int n) {
Phaser phaser = new Phaser(){
@Override
protected boolean onAdvance(int phase, int registeredParties) {
System.out.println(Thread.currentThread().getName()+"===============完成"+phase+"阶段的事务,派发礼物。");
return phase >= n || registeredParties == 0;
}
};
ExecutorService executorService = Executors.newFixedThreadPool(10);
for (int i = 0; i < 4; i++) {
executorService.execute(new PhaserTask(phaser));
}
executorService.shutdown();
}
private static class PhaserTask implements Runnable{
Phaser phaser;
public PhaserTask(Phaser phaser){
this.phaser = phaser;
}
@Override
public void run() {
phaser.register();
Random random = new Random();
do {
System.out.println(Thread.currentThread().getName() + "进入"+phaser.getPhase()+"阶段。");
try {
int elapsed = random.nextInt(10);
TimeUnit.SECONDS.sleep(elapsed);
System.out.println(Thread.currentThread().getName() + "耗时"+elapsed+"s.");
} catch (InterruptedException e) {
e.printStackTrace();
}
//进入下一个阶段
phaser.arriveAndAwaitAdvance();
}while(!phaser.isTerminated());
//执行完毕
phaser.arriveAndDeregister();
}
}
总结
这章节讲了4中同步器,java.util.concurrent.CountDownLatch、java.util.concurrent.Semaphore、java.util.concurrent.CyclicBarrier、java.util.concurrent.Phaser,需要自己写实例思考才能灵活运用,下一章节,线程池使用。