同步模式之保护性暂停
1. 定义
即 Guarded Suspension,用在一个线程等待另一个线程的执行结果
要点
- 有一个结果需要从一个线程传递到另一个线程,让他们关联同一个 GuardedObject
- 如果有结果不断从一个线程到另一个线程那么可以使用消息队列(见生产者/消费者)
- JDK 中,join 的实现、Future 的实现,采用的就是此模式
- 因为要等待另一方的结果,因此归类到同步模式
2. 简单版 GuardedObject
/**
* @author lxy
* @version 1.0
* @Description 一个线程等待另一个线程的执行结果
* @date 2022/6/23 15:13
*/
@Slf4j(topic = "c.Test13")
public class Test13 {
//线程1 等待 线程2 的下载结果
public static void main(String[] args) {
GuardedObject guardedObject = new GuardedObject();
new Thread(()->{
log.debug("等待结果");
List <String> list = (List <String>) guardedObject.get();
log.debug("结果大小:{}",list.size());
},"t1").start();
new Thread(()->{
log.debug("执行下载");
try {
List <String> list = Downloader.download();
guardedObject.create(list);
} catch (IOException e) {
e.printStackTrace();
}
},"t2").start();
}
}
class GuardedObject{
//结果
private Object response;
//获取结果
public Object get(){
synchronized (this){
while (response == null){
try {
this.wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
return response;
}
}
//产生结果
public void create(Object response){
synchronized (this){
//给结果成员变量赋值
this.response = response;
this.notifyAll();
}
}
}
3. 带超时版 GuardedObject
/**
* @author lxy
* @version 1.0
* @Description 带超时版 GuardedObject
* @date 2022/6/24 11:37
*/
@Slf4j(topic = "c.Test14")
public class Test14 {
//线程 1 等待 线程2的下载结束
public static void main(String[] args) {
GuardedObject guardedObject = new GuardedObject();
new Thread(()->{
log.debug("begin");
Object response = guardedObject.get(2000);
log.debug("结果是{}",response);
},"t1").start();
new Thread(()->{
log.debug("begin");
// 情况一
Sleeper.sleep(1);
guardedObject.create(new Object());
//情况二
// Sleeper.sleep(3);
// guardedObject.create(new Object());
// 情况三
// Sleeper.sleep(1);
// guardedObject.create(null);
},"t2").start();
}
}
class GuardedObject{
//结果
private Object response;
//获取结果
public Object get(long timeout){
synchronized (this){
//开始时间
long begin = System.currentTimeMillis();
//经历的时间
long passedTime = 0;
while (response == null){
long waitTime = timeout - passedTime;
//经历的时间超过了最大等待时间,退出循环
if(waitTime <= 0){
break;
}
try {
this.wait(waitTime);//为了防止虚假唤醒,等待时间为waitTime
} catch (InterruptedException e) {
e.printStackTrace();
}
//求经历时间
passedTime = System.currentTimeMillis()-begin;
}
return response;
}
}
//产生结果
public void create(Object response){
synchronized (this){
//给结果成员变量赋值
this.response = response;
this.notifyAll();
}
}
}
*. 原理之 join
4. 多任务版 GuardedObject
图中 Futures 就好比居民楼一层的信箱(每个信箱有房间编号),左侧的 t0,t2,t4 就好比等待邮件的居民,右
侧的 t1,t3,t5 就好比邮递员
如果需要在多个类之间使用 GuardedObject 对象,作为参数传递不是很方便,因此设计一个用来解耦的中间类,
这样不仅能够解耦【结果等待者】和【结果生产者】,还能够同时支持多个任务的管理
新增 id 用来标识 Guarded Object
class GuardedObject{
private int id;
//...
}
中间解耦类
/**
*
* 注意:这里的MailBoxes选择了静态类而不是选择单例,为什么?
* 因为MailBoxes就相当于工具类呀!!!!这就是个处理读写共享资源的工具类,为了解藕而这么写的,和MVC架构差不多
*/
//邮件类
class Mailboxes{
//由于boxes是线程安全的,所以多线程对其操作也不再使用synchronized修饰
private static Map<Integer, GuardedObject> boxes = new Hashtable <>();
//表示信件id
private static int id = 1;
//产生唯一id
private static synchronized int generateId(){
return id++;
}
//根据id获取邮件
public static GuardedObject getGuardedObject(int id){
return boxes.remove(id);
}
//将一个邮件放在邮箱中
public static GuardedObject createGuardedObject(){
GuardedObject go = new GuardedObject(generateId());
boxes.put(go.getId(),go);
return go;
}
//获得所有待送邮件的id
public static Set<Integer> getIds(){
return boxes.keySet();
}
}
业务相关类
//邮差类
@Slf4j(topic = "c.Postman")
class Postman extends Thread{
private int id;
private String mail;//信件内容
public Postman(int id, String mail) {
this.id = id;
this.mail = mail;
}
@Override
public void run() {
GuardedObject guardedObject = Mailboxes.getGuardedObject(id);
log.debug("送信 id:{}, 内容:{}",id,mail);
guardedObject.create(mail);
}
}
//居民类
@Slf4j(topic = "c.People")
class People extends Thread{
@Override
public void run() {
//收信
GuardedObject guardedObject = Mailboxes.createGuardedObject();//创建信件
log.debug("开始收信 id:{}",guardedObject.getId());
Object mail = guardedObject.get(5000);//进行收件
log.debug("收到信 id:{},内容:{}",guardedObject.getId(),mail);
}
}
测试:
@Slf4j(topic = "c.Test15")
public class Test15 {
public static void main(String[] args) {
for (int i = 0; i < 3; i++) {
new People().start();//初始化信件和用户线程,有多少个用户线程就对应着有多少信件
}
Sleeper.sleep(1);
for (Integer id : Mailboxes.getIds()) {
new Postman(id,"内容"+id).start();//邮递员开始根据id将信送入信箱
}
}
}
运行结果:
同步模式之顺序控制
1. 固定运行顺序
比如,必须先t2线程打印 2 后 t1线程再 打印1。
1.1 wait notify 版
/**
* @author lxy
* @version 1.0
* @Description 固定线程运行顺序:先让线程2执行,再线程1执行
* @date 2022/7/5 15:46
*/
@Slf4j(topic = "c.Test21")
public class Test21 {
static final Object lock = new Object();
//表示t2是否运行过
static boolean t2runned = false;
public static void main(String[] args) {
Thread t1 = new Thread(() -> {
synchronized (lock) {
while (!t2runned) {
try {
lock.wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
log.debug("1");
}
}, "t1");
Thread t2 = new Thread(() -> {
synchronized (lock) {
log.debug("2");
t2runned = true;
lock.notify();
}
}, "t2");
t1.start();
t2.start();
}
}
输出:
15:39:39.232 c.Test21 [t2] - 2
15:39:39.234 c.Test21 [t1] - 1
1.2 await signal 版
/**
* @author lxy
* @version 1.0
* @Description 固定线程运行顺序:先让线程2执行,再线程1执行
* @date 2022/7/5 15:46
*/
@Slf4j(topic = "c.Test22")
public class Test22 {
static ReentrantLock ROOM = new ReentrantLock();
static Condition waitSet = ROOM.newCondition();
//表示t2是否运行过
static boolean t2runned = false;
public static void main(String[] args) {
Thread t1 = new Thread(() -> {
ROOM.lock();
try {
while (!t2runned) {
try {
waitSet.await();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
log.debug("1");
} finally {
ROOM.unlock();
}
}, "t1");
Thread t2 = new Thread(() -> {
ROOM.lock();
try {
log.debug("2");
t2runned = true;
waitSet.signal();
} finally {
ROOM.unlock();
}
}, "t2");
t1.start();
t2.start();
}
}
1.3 Park Unpark版本
可以看到,实现上很麻烦:
- 首先,需要保证先 wait 再 notify,否则 wait 线程永远得不到唤醒。因此使用了『运行标记』来判断该不该wait
- 第二,如果有些干扰线程错误地 notify 了 wait 线程,条件不满足时还要重新等待,使用了
while
循环来解决此问题 - 最后,唤醒对象上的 wait 线程需要使用 notifyAll,因为『同步对象』上的等待线程可能不止一个
可以使用 LockSupport 类的 park 和 unpark 来简化上面的题目:
@Slf4j(topic = "c.Test23")
public class Test23 {
public static void main(String[] args) {
Thread t1 = new Thread(() -> {
LockSupport.park();//阻塞t1线程
log.debug("1");
}, "t1");
t1.start();
new Thread(()->{
log.debug("2");
LockSupport.unpark(t1);//让t1开始运行
},"t2").start();
}
}
2. 交替输出
线程 1 输出 a 5 次,线程 2 输出 b 5 次,线程 3 输出 c 5 次。现在要求输出 abcabcabcabcabc 怎么实现
2.1 wait notify 版
/**
* @author lxy
* @version 1.0
* @Description
* @date 2022/7/5 17:19
*/
public class Test24 {
public static void main(String[] args) {
WaitNotify wn = new WaitNotify(1, 5);//定义公共锁对象
new Thread(()->{
wn.print("a",1,2);
},"t1").start();
new Thread(()->{
wn.print("b",2,3);
},"t2").start();
new Thread(()->{
wn.print("c",3,1);
},"t3").start();
}
}
/**
* 输出内容 等待标记 下一个标记
* a 1 2
* b 2 3
* c 3 1
*
* 关键点:使用waitFlag,来决定本线程是等待还是继续向运行。使用nextFlag来让哪个线程放弃等待,继续运行
*
*/
class WaitNotify{
private int flag;
private int loopNumber;
public WaitNotify(int flag, int loopNumber) {
this.flag = flag;
this.loopNumber = loopNumber;
}
//打印
public void print(String str,int waitFlag,int nextFlag){
for (int i = 0; i < 5; i++) {
//连续5轮
synchronized (this){
while (flag != waitFlag){
try {
this.wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
System.out.print(str);
flag = nextFlag;//更新标记
this.notifyAll();//唤醒阻塞的线程,让符合的进行执行
}
}
}
}
2.2 Lock 条件变量版
/**
* @author lxy
* @version 1.0
* @Description
* @date 2022/7/5 17:55
*/
public class Test25 {
public static void main(String[] args) {
AwaitSignal awaitSignal = new AwaitSignal(5);
Condition a = awaitSignal.newCondition();
Condition b = awaitSignal.newCondition();
Condition c = awaitSignal.newCondition();
new Thread(()->{
awaitSignal.print("a",a,b);
},"t1").start();
new Thread(()->{
awaitSignal.print("b",b,c);
},"t2").start();
new Thread(()->{
awaitSignal.print("c",c,a);
},"t3").start();
Sleeper.sleep(1);
awaitSignal.lock();
try {
System.out.println("开始打印...");
a.signal();
}finally {
awaitSignal.unlock();
}
}
}
class AwaitSignal extends ReentrantLock{
private int loopNumber;
public AwaitSignal(int loopNumber){
this.loopNumber = loopNumber;
}
//打印数据。 current:当前线程要进入哪一间休息室。next:下一间休息室
public void print(String str, Condition current,Condition next){
for (int i = 0; i < 5; i++) {
lock();
try {
current.await();
System.out.print(str);
next.signal();//敲醒另外一个线程,打印数据
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
unlock();
}
}
}
}
输出:
开始打印...
abcabcabcabcabc
2.3 Park Unpark 版
/**
* @author lxy
* @version 1.0
* @Description
* @date 2022/7/5 18:26
*/
@Slf4j(topic = "c.Test26")
public class Test26 {
static Thread t1,t2,t3;
public static void main(String[] args) {
ParkUnpark pu = new ParkUnpark(5);
t1 = new Thread(() -> {
pu.print("a", t2);
});
t2 = new Thread(() -> {
pu.print("b", t3);
});
t3 = new Thread(() -> {
pu.print("c", t1);
});
t1.start();
t2.start();
t3.start();
LockSupport.unpark(t1);//唤醒t1
}
}
class ParkUnpark{
private int loopNumber;
public ParkUnpark(int loopNumber){
this.loopNumber = loopNumber;
}
public void print(String str,Thread next){
for (int i = 0; i < loopNumber; i++) {
LockSupport.park();//当前线程等待
System.out.print(str);
LockSupport.unpark(next);//唤醒下一个线程
}
}
}
异步模式之生产者/消费者
1. 定义
- 与前面的保护性暂停中的 GuardObject 不同,不需要产生结果和消费结果的线程一一对应(保护暂停模式是一一对应的)
- 消费队列可以用来平衡生产和消费的线程资源
- 比如如果使用保护性暂停模式,一个用户需要对应一个邮差,比较浪费。而生产者消费者模式便可以做到多个用户对应一个邮差。
- 生产者仅负责产生结果数据,不关心数据该如何处理,而消费者专心处理结果数据
- 消息队列是有容量限制的,满时不会再加入数据,空时不会再消耗数据
- JDK 中各种阻塞队列,采用的就是这种模式
- 这是异步模式,生产者生产的内容并不会被立刻消费,而是先存在于队列中,通过队列被消费者消费。
2. 实现
/**
* @author lxy
* @version 1.0
* @Description 生产者消费者模式
* @date 2022/6/24 19:08
*/
@Slf4j(topic = "c.Test16")
public class Test16 {
public static void main(String[] args) {
MessageQueue queue = new MessageQueue(2);
for (int i = 0; i < 3; i++) {
int id = i;//effectively final
new Thread(()->{
//lambda 表达式中使用的变量应该是 final 或者有效的 final
queue.put(new Message(id,"值"+id));
},"生产者"+i).start();
}
new Thread(()->{
while (true){
Sleeper.sleep(1);
queue.take();
}
},"消费者").start();
}
}
//消息队列类, java线程之间通信
@Slf4j(topic = "c.MessageQueue")
class MessageQueue{
private LinkedList<Message> list = new LinkedList <Message>();//底层实现是一个队列
//队列容量
private int capcity;
public MessageQueue(int capcity) {
this.capcity = capcity;
}
//接收消息
public Message take(){
//检查队列是否为空
synchronized (list){
while (list.isEmpty()){
try {
log.debug("队列为空,消费者线程等待");
list.wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
//从队列头部获取消息并返回
// return list.removeFirst(); //不能这样写,要分开写。不然没法使用notifyAll唤醒生产者
Message message = list.removeFirst();
log.debug("已消费消息{}",message);
list.notifyAll();//消费完唤醒生产者继续生产
return message;
}
}
//存入消息
public void put(Message message){
synchronized (list){
while (list.size() == capcity){
try {
log.debug("队列已满,生产者线程等待");
list.wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
//将消息加入队列尾部
list.addLast(message);
log.debug("已生产消息{}",message);
list.notifyAll();
}
}
}
final class Message{
//使用final,表示没有子类继承。没有setter方法,表示在构造初始的时候就创建好,后序不支持修改
private int id;//消息的编号
private Object value;//消息的内容
public Message(int id, Object value) {
this.id = id;
this.value = value;
}
public int getId() {
return id;
}
public Object getValue() {
return value;
}
@Override
public String toString() {
return "Message{" +
"id=" + id +
", value=" + value +
'}';
}
}
输出:
注意:synchronized
锁的是多个线程操作的共享变量,本例中最好使用 list非this,我们使用的也是list.wait()
和list.notifyAll()
。
终止模式之两阶段终止模式
Two Phase Termination
在一个线程 T1 中如何“优雅”终止线程 T2?这里的【优雅】指的是给 T2 一个料理后事的机会。
1、错误思路
- 使用线程对象的 stop() 方法停止线程
- stop 方法会真正杀死线程,如果这时线程锁住了共享资源,那么当它被杀死后就再也没有机会释放锁,其它线程将永远无法获取锁
- 使用 System.exit(int) 方法停止线程
- 目的仅是停止一个线程,但这种做法会让整个程序都停止
2、两阶段终止模式
图解如下:
- 如果监控线程在阻塞阶段被interrupt,则捕获异常,手动设置打断标记(因为标记会被清除),下一轮料理后事后结束循环。
- 如果监控线程在正常工作阶段被interrupt,则会设置打断标记,下一轮料理后事后结束循环。
2.1 利用 isInterrupted
原理:interrupt 可以打断正在执行的线程,无论这个线程是在 sleep,wait,还是正常运行
代码实现:
@Slf4j(topic = "c.Test10")
public class Test10 {
public static void main(String[] args) throws InterruptedException {
TwoPhaseTermination tpt = new TwoPhaseTermination();
tpt.start();
Thread.sleep(4000);
tpt.stop();
}
}
@Slf4j(topic = "c.TwoPhaseTermination")
class TwoPhaseTermination{
private Thread monitor;
//启动监控线程
public void start(){
monitor = new Thread(() -> {
while (true) {
Thread currentThread = Thread.currentThread();
if (currentThread.isInterrupted()) {
log.debug("料理后事");
break;
}
try {
Thread.sleep(1000);//情况一:阻塞时interrupt
log.debug("执行监控记录");//情况二:正常运行时被interrupt
} catch (InterruptedException e) {
e.printStackTrace();
currentThread.interrupt();//阻塞被打断后,程序恢复正常运行。再次被打断,将会有一个打断标记(因为此时是处于正常状态的),下一轮会被停止线程。
}
}
});
monitor.start();
}
//停止监控线程
public void stop(){
monitor.interrupt();
}
}
运行结果:
2.2 利用停止标记
@Slf4j(topic = "c.Test10")
public class Test10 {
public static void main(String[] args) throws InterruptedException {
TwoPhaseTermination tpt = new TwoPhaseTermination();
tpt.start();
Thread.sleep(4000);
log.debug("stop...");
tpt.stop();
}
}
@Slf4j(topic = "c.TwoPhaseTermination")
class TwoPhaseTermination{
//监控线程
private Thread monitor;
// 停止标记用 volatile 是为了保证该变量在多个线程之间的可见性
// 我们的例子中,即主线程把它修改为 true 对 monitor 线程可见
private volatile boolean stop = false;
//启动监控线程
public void start(){
monitor = new Thread(() -> {
while (true) {
Thread currentThread = Thread.currentThread();
if (stop) {
log.debug("料理后事");
break;
}
try {
Thread.sleep(1000);
log.debug("执行监控记录");
} catch (InterruptedException e) {
}
}
},"monitor");
monitor.start();
}
//停止监控线程
public void stop(){
stop = true;
monitor.interrupt();//不加这个的话,线程会把这一轮执行完才终止
}
}
输出:
18:17:14.091 c.TwoPhaseTermination [monitor] - 执行监控记录
18:17:15.099 c.TwoPhaseTermination [monitor] - 执行监控记录
18:17:16.110 c.TwoPhaseTermination [monitor] - 执行监控记录
18:17:17.088 c.Test10 [main] - stop...
18:17:17.088 c.TwoPhaseTermination [monitor] - 料理后事
同步模式之 Balking
**引入:**监控线程主要用来监控CPU、内存的占用等,监控线程只需要一个就行。但是上面的例子我们并没有对线程的数量进行限制。就会出现 每 tpt.start();
就会创建一个监控线程,这样是毫无意义的。所以我们如何保证方法只执行一次,下次调用直接返回呢?— Balking模式
1. 定义
Balking (犹豫)模式用在一个线程发现另一个线程或本线程已经做了某一件相同的事,那么本线程就无需再做了,直接结束返回
2.实现
例一:对终止模式中的监控线程的改进
**注意:**尽量缩小synchronized块的大小,可以提高并发度和程序的性能
例二:在Tomcat多线程环境下演示
@Service
@Slf4j
public class MonitorService {
private volatile boolean stop;
private volatile boolean starting;
private Thread monitorThread;
public void start() {
// 缩小同步范围,提升性能
synchronized (this) {
log.info("该监控线程已启动?({})", starting);
if (starting) {
return;
}
starting = true;
}
// 由于之前的 balking 模式,以下代码只可能被一个线程执行,因此无需互斥
monitorThread = new Thread(() -> {
while (!stop) {
report();
sleep(2);
}
// 这里的监控线程只可能启动一个,因此只需要用 volatile 保证 starting 的可见性
//也就是让Tomcat中的线程看到starting的修改
log.info("监控线程已停止...");
starting = false;
});
stop = false;
log.info("监控线程已启动...");
monitorThread.start();
}
private void report() {
Info info = new Info();
info.setTotal(Runtime.getRuntime().totalMemory());
info.setFree(Runtime.getRuntime().freeMemory());
info.setMax(Runtime.getRuntime().maxMemory());
info.setTime(System.currentTimeMillis());
MonitorController.QUEUE.offer(info);
}
private void sleep(long seconds) {
try {
TimeUnit.SECONDS.sleep(seconds);
} catch (InterruptedException e) {
}
}
public synchronized void stop() {
stop = true;
// 不加打断需要等到下一次 sleep 结束才能退出循环,这里是为了更快结束
monitorThread.interrupt();
}
}
当前端页面多次点击按钮调用 start 时,最终只创建一个监控线程
输出:
[http-nio-8080-exec-10] cn.itcast.monitor.service.MonitorService - 该监控线程已启动?(false)
[http-nio-8080-exec-10] cn.itcast.monitor.service.MonitorService - 监控线程已启动...
[http-nio-8080-exec-8] cn.itcast.monitor.service.MonitorService - 该监控线程已启动?(true)
[http-nio-8080-exec-9] cn.itcast.monitor.service.MonitorService - 该监控线程已启动?(true)
[http-nio-8080-exec-7] cn.itcast.monitor.service.MonitorService - 该监控线程已启动?(true)
[Thread-16] cn.itcast.monitor.service.MonitorService - 监控线程已停止...
它还经常用来实现线程安全的单例
public final class Singleton {
private Singleton() {
}
private static Singleton INSTANCE = null;
public static synchronized Singleton getInstance() {
if (INSTANCE != null) {
return INSTANCE;
}
INSTANCE = new Singleton();
return INSTANCE;
}
}
对比一下保护性暂停模式:保护性暂停模式用在一个线程等待另一个线程的执行结果,当条件不满足时线程等待。