摘要:
线程与线程之间不是相互独立的个体,它们彼此之间需要相互通信和协作,最典型的例子就是生产者-消费者问题。本文首先介绍 wait/notify 机制,并对实现该机制的两种方式——synchronized+wait-notify模式和Lock+Condition模式进行详细剖析,以作为线程间通信与协作的基础。进一步地,以经典的生产者-消费者问题为背景,熟练对 wait/notify 机制的使用。最后,对 Thread 类中的 join() 方法进行源码分析,并以宿主线程与寄生线程的协作为例进行说明。
一:先从一个例子来说明:
//创建条件对象 Condition conditionObj=ticketLock.newCondition();
方法 | 函数方法对应的描述 |
void await() | 将该线程放到条件等待池中(对应wait()方法) |
void signalAll() | 解除该条件等待池中所有线程的阻塞状态(对应notifyAll()方法) |
void signal() | 从该条件的等待池中随机地选择一个线程,解除其阻塞状态(对应notify()方法) |
二.线程间通信案例
package com.huanghe.chapter21; /** * @Author: River * @Date:Created in 10:10 2018/6/3 * @Description: 线程间通讯:多个线程在处理同一个资源,但是任务却不同 */ //资源类 class Resource { String name; String sex; } //输入 class Input implements Runnable { //需要使用这个对象,但是不能new这个对象,因为需要使用的是同一个对象 //此时可以使用的方式是传参数,用构造方法 Resource r; Input(Resource r) { this.r = r; } @Override public void run() { int x=0; while (true) { if (x == 0) { r.name = "mike"; r.sex = "nan"; } else { r.name = "丽丽"; r.sex = "女女女女"; } //模拟不断的进行生产 x=(x+1)%2; } } } //输出 class Output implements Runnable { Resource r; Output(Resource r) { this.r = r; } @Override public void run() { while (true) { System.out.println(r.name+"...."+r.sex); } } } public class ResourceDemo { public static void main(String[] args) { //创建资源 Resource r = new Resource(); //创建任务 Input in = new Input(r); Output out = new Output(r); //创建线程 Thread t1 = new Thread(in); Thread t2 = new Thread(out); //开启线程 t1.start(); t2.start(); } }
输出的结果是:
这样出现了线程的安全问题,现在分析一下出现线程安全问题的原因:
分析线程安全问题出现的原因首先考虑两点:1、线程运行的代码中是否有共享数据,2、是否有多条语句在操作共享数据;在上面的代码中resource是共享数据,name和sex是资源中的属性,并且有多条语句在进行操作共享数据;
想到的解决方法是加一个锁:代码如下
package com.huanghe.chapter21; /** * @Author: River * @Date:Created in 10:10 2018/6/3 * @Description: 线程间通讯:多个线程在处理同一个资源,但是任务却不同 */ //资源类 class Resource { String name; String sex; } //输入 class Input implements Runnable { //需要使用这个对象,但是不能new这个对象,因为需要使用的是同一个对象 //此时可以使用的方式是传参数,用构造方法 Resource r; Object obj =new Object(); Input(Resource r) { this.r = r; } @Override public void run() { int x=0; while (true) { synchronized (obj) {//加上同步锁 if (x == 0) { r.name = "mike"; r.sex = "nan"; } else { r.name = "丽丽"; r.sex = "女女女女"; } //模拟不断的进行生产 x = (x + 1) % 2; } } } } //输出 class Output implements Runnable { Resource r; Output(Resource r) { this.r = r; } @Override public void run() { while (true) { System.out.println(r.name+"...."+r.sex); } } } public class ResourceDemo { public static void main(String[] args) { //创建资源 Resource r = new Resource(); //创建任务 Input in = new Input(r); Output out = new Output(r); //创建线程 Thread t1 = new Thread(in); Thread t2 = new Thread(out); //开启线程 t1.start(); t2.start(); } }
会发现加了锁之后,依然会出现上面的线程不安全的问题,这时就该考虑的是同步的前提:在一个锁里面有多个线程;然后就可以看出上面的锁里面只有一个线程就是输入线程,所以应该保证输出线程也应该在同步里面。而且输入线程和输出线程需要使用的是同一个锁,此时不能使用this因为this代表的是本类,这里是有两个类的。所以可以使用的是resource.class或者是r,此时的代码是:
package com.huanghe.chapter21; /** * @Author: River * @Date:Created in 10:10 2018/6/3 * @Description: 线程间通讯:多个线程在处理同一个资源,但是任务却不同 */ //资源类 class Resource { String name; String sex; } //输入 class Input implements Runnable { //需要使用这个对象,但是不能new这个对象,因为需要使用的是同一个对象 //此时可以使用的方式是传参数,用构造方法 Resource r; Object obj =new Object(); Input(Resource r) { this.r = r; } @Override public void run() { int x=0; while (true) { synchronized (r) { if (x == 0) { r.name = "mike"; r.sex = "nan"; } else { r.name = "丽丽"; r.sex = "女女女女"; } //模拟不断的进行生产 x = (x + 1) % 2; } } } } //输出 class Output implements Runnable { Resource r; Output(Resource r) { this.r = r; } @Override public void run() { while (true) { synchronized (r) { System.out.println(r.name+"...."+r.sex); } } } } public class ResourceDemo { public static void main(String[] args) { //创建资源 Resource r = new Resource(); //创建任务 Input in = new Input(r); Output out = new Output(r); //创建线程 Thread t1 = new Thread(in); Thread t2 = new Thread(out); //开启线程 t1.start(); t2.start(); } }
结果如下:
此时输出的结果是一片一片的,不符合我们的预期的效果,我们希望的是输入一个输出一个。上面的结果是因为当输入线程拿到了CPU的执行权之后,第一次赋值为mike nan 然后CPU的执行权仍然在输入线程那边,所以会出现第二次输入的丽丽 女覆盖掉mike nan,直到最后一个赋值会覆盖前面的赋值;CPU到了输出线程就会不断的输出同一个结果直到CPU的执行权切换到了输入线程;下面解决输入一个输出一个,在资源里面定义一个标记 flag如果有资源就不往里面存放数据,如果没有就往里面放;
修改后的代码如下所示:
package com.huanghe.chapter21; import jdk.nashorn.internal.ir.Flags; /** * @Author: River * @Date:Created in 10:10 2018/6/3 * @Description: 线程间通讯:多个线程在处理同一个资源,但是任务却不同 * * 等待唤醒机制: * 1:wait():让线程处于冻结状态,被wait的线程会被存储到线程池中 * 2:notify():唤醒线程池中的一个线程(任意) * 3:notifyAll():唤醒线程池中的所有线程 * 这些方法必须是定义在同步中。因为这些方法是用于操作线程状态的方法,必须要明确到底是操作的是哪个锁上的线程 * 在一个类里面能写的同步是有很多的,有A,B,C三个同步,也就是有A锁,B锁,C锁三个锁,在A锁上wait,拿B锁的notify是不起作用的 */ //资源类 class Resource { String name; String sex; boolean flag = false; } //输入 class Input implements Runnable { //需要使用这个对象,但是不能new这个对象,因为需要使用的是同一个对象 //此时可以使用的方式是传参数,用构造方法 Resource r; Object obj =new Object(); Input(Resource r) { this.r = r; } @Override public void run() { int x=0; while (true) { synchronized (r) { if (r.flag) { try { r.wait(); } catch (InterruptedException e) { e.printStackTrace(); } } if (x == 0) { r.name = "mike"; r.sex = "nan"; } else { r.name = "丽丽"; r.sex = "女女女女"; } r.flag = true; r.notify(); } //模拟不断的进行生产 x = (x + 1) % 2; } } } //输出 class Output implements Runnable { Resource r; Output(Resource r) { this.r = r; } @Override public void run() { while (true) { synchronized (r) { if (!r.flag) { try { r.wait(); } catch (InterruptedException e) { e.printStackTrace(); } } System.out.println(r.name+"...."+r.sex); r.flag = false; r.notify(); } } } } public class ResourceDemo { public static void main(String[] args) { //创建资源 Resource r = new Resource(); //创建任务 Input in = new Input(r); Output out = new Output(r); //创建线程 Thread t1 = new Thread(in); Thread t2 = new Thread(out); //开启线程 t1.start(); t2.start(); } }
为什么操作线程的方法被定义到了Object中?
因为这些方法是监视器的方法,监视器就是锁,锁可以是任意的对象,任意对象的方法一定定义在Object中
结果如下:
上面的程序中的资源中的属性应该是私有的,所以可以改进代码如下:
package com.huanghe.chapter21; import jdk.nashorn.internal.ir.Flags; /** * @Author: River * @Date:Created in 10:10 2018/6/3 * @Description: 线程间通讯:多个线程在处理同一个资源,但是任务却不同 * * 等待唤醒机制: * 1:wait():让线程处于冻结状态,被wait的线程会被存储到线程池中 * 2:notify():唤醒线程池中的一个线程(任意) * 3:notifyAll():唤醒线程池中的所有线程 * */ //资源类 class Resource1 { private String name; private String sex; private boolean flag = false; public synchronized void set(String name, String sex) { if (flag) { try { this.wait(); } catch (InterruptedException e) { e.printStackTrace(); } } this.name=name; this.sex=sex; flag = true; this.notify(); } public synchronized void out() { if (!flag) { try { this.wait(); } catch (InterruptedException e) { e.printStackTrace(); } } System.out.println(name+"..."+sex); flag = false; this.notify(); } } //输入 class Input1 implements Runnable { //需要使用这个对象,但是不能new这个对象,因为需要使用的是同一个对象 //此时可以使用的方式是传参数,用构造方法 Resource1 r; Object obj =new Object(); Input1(Resource1 r) { this.r = r; } @Override public void run() { int x=0; while (true) { if (x == 0) { r.set("mike","nan"); } else { r.set("丽丽","女女女"); } //模拟不断的进行生产 x = (x + 1) % 2; } } } //输出 class Output1 implements Runnable { Resource1 r; Output1(Resource1 r) { this.r = r; } @Override public void run() { while (true) { r.out(); } } } public class ResourceDemo2 { public static void main(String[] args) { //创建资源 Resource1 r = new Resource1(); //创建任务 Input1 in = new Input1(r); Output1 out = new Output1(r); //创建线程 Thread t1 = new Thread(in); Thread t2 = new Thread(out); //开启线程 t1.start(); t2.start(); } }
三、生产者消费者
顾名思义,就是一个线程消费,一个线程生产。我们先来看看等待/通知机制下的生产者消费者模式:我们假设这样一个场景,我们是卖北京烤鸭店铺,我们现在只有一条生产线也只有一条消费线,也就是说只能生产线程生产完了,再通知消费线程才能去卖,如果消费线程没烤鸭了,就必须通知生产线程去生产,此时消费线程进入等待状态。在这样的场景下,我们不仅要保证共享数据(烤鸭数量)的线程安全,而且还要保证烤鸭数量在消费之前必须有烤鸭。下面我们通过java代码来实现:
现在模拟一个生产者一个消费者的简单的场景:
package com.huanghe.chapter21; /** * @Author: River * @Date:Created in 11:16 2018/6/4 * @Description: */ public class KaoYaResource { private String name; private int count = 1;//烤鸭的初始数量 private boolean flag = false;//判断是否有需要线程等待的标志 /** * 生产烤鸭 */ public synchronized void product(String name){ if(flag){ //此时有烤鸭,等待 try { this.wait(); } catch (InterruptedException e) { e.printStackTrace() ; } } this.name=name+count;//设置烤鸭的名称 count++; System.out.println(Thread.currentThread().getName()+"...生产者..."+this.name); flag=true;//有烤鸭后改变标志 notifyAll();//通知消费线程可以消费了 } /** * 消费烤鸭 */ public synchronized void consume(){ if(flag){//如果没有烤鸭就等待 try{this.wait();}catch(InterruptedException e){} } System.out.println(Thread.currentThread().getName()+"...消费者........"+this.name);//消费烤鸭1 flag = false; notifyAll();//通知生产者生产烤鸭 } }
package com.huanghe.chapter21; /** * @Author: River * @Date:Created in 11:30 2018/6/4 * @Description: */ public class Single_Producer_Consumer { public static void main(String[] args) { KaoYaResource r = new KaoYaResource(); Producer pro = new Producer(r); Consumer con = new Consumer(r); //生产者线程 Thread t0 = new Thread(pro); //消费者线程 Thread t2 = new Thread(con); //启动线程 t0.start(); t2.start(); } } /** * @decrition 生产者线程 */ class Producer implements Runnable { private KaoYaResource r; Producer(KaoYaResource r) { this.r = r; } public void run() { while(true) { r.product("北京烤鸭"); } } } /** * @decrition 消费者线程 */ class Consumer implements Runnable { private KaoYaResource r; Consumer(KaoYaResource r) { this.r = r; } public void run() { while(true) { r.consume(); } } }
在这个类中我们创建两个线程,一个是消费者线程,一个是生产者线程,我们分别开启这两个线程用于不断的生产消费,运行结果如下
很显然的情况就是生产一只烤鸭然后就消费一只烤鸭。运行情况完全正常,嗯,这就是单生产者单消费者模式。上面使用的是synchronized关键字的方式实现的,那么接下来我们使用对象锁的方式实现:KaoYaResourceByLock.java
package com.huanghe.chapter21; import java.util.concurrent.locks.Condition; import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReentrantLock; /** * @Author: River * @Date:Created in 11:36 2018/6/4 * @Description: */ public class KaoyaResourceByLock { private String name; private int count = 1;//烤鸭的初始数量 private boolean flag = false;//判断是否有需要线程等待的标志 //创建一个锁对象 private Lock resourceLock=new ReentrantLock(); //创建条件对象 private Condition condition= resourceLock.newCondition(); /** * 生产烤鸭 */ public void product(String name){ resourceLock.lock();//先获取锁 try{ if(flag){ try { condition.await(); } catch (InterruptedException e) { e.printStackTrace(); } } this.name=name+count;//设置烤鸭的名称 count++; System.out.println(Thread.currentThread().getName()+"...生产者..."+this.name); flag=true;//有烤鸭后改变标志 condition.signalAll();//通知消费线程可以消费了 }finally{ resourceLock.unlock(); } } /** * 消费烤鸭 */ public void consume(){ resourceLock.lock(); try{ if(!flag){//如果没有烤鸭就等待 try{condition.await();}catch(InterruptedException e){} } System.out.println(Thread.currentThread().getName()+"...消费者........"+this.name);//消费烤鸭1 flag = false; condition.signalAll();//通知生产者生产烤鸭 }finally{ resourceLock.unlock(); } } }
package com.huanghe.chapter21; /** * @Author: River * @Date:Created in 11:38 2018/6/4 * @Description: */ public class Mutil_Producer_Consumer { public static void main(String[] args) { KaoYaResource r = new KaoYaResource(); Mutil_Producer pro = new Mutil_Producer(r); Mutil_Consumer con = new Mutil_Consumer(r); //生产者线程 Thread t0 = new Thread(pro); Thread t1 = new Thread(pro); //消费者线程 Thread t2 = new Thread(con); Thread t3 = new Thread(con); //启动线程 t0.start(); t1.start(); t2.start(); t3.start(); } } /** * @decrition 生产者线程 */ class Mutil_Producer implements Runnable { private KaoYaResource r; Mutil_Producer(KaoYaResource r) { this.r = r; } public void run() { while(true) { r.product("北京烤鸭"); } } } /** * @decrition 消费者线程 */ class Mutil_Consumer implements Runnable { private KaoYaResource r; Mutil_Consumer(KaoYaResource r) { this.r = r; } public void run() { while(true) { r.consume(); } } }
结果如下:
不对呀,我们才生产一只烤鸭,怎么就被消费了2次啊,有的烤鸭生产了也没有被消费啊?难道共享数据源没有进行线程同步?我们再看看之前的KaoYaResource.java
public class KaoYaResource { private String name; private int count = 1;//烤鸭的初始数量 private boolean flag = false;//判断是否有需要线程等待的标志 /** * 生产烤鸭 */ public synchronized void product(String name){ if(flag){ //此时有烤鸭,等待 try { this.wait(); } catch (InterruptedException e) { e.printStackTrace(); } } this.name=name+count;//设置烤鸭的名称 count++; System.out.println(Thread.currentThread().getName()+"...生产者..."+this.name); flag=true;//有烤鸭后改变标志 notifyAll();//通知消费线程可以消费了 } /** * 消费烤鸭 */ public synchronized void consume(){ if(!flag){//如果没有烤鸭就等待 try{this.wait();}catch(InterruptedException e){} } System.out.println(Thread.currentThread().getName()+"...消费者........"+this.name);//消费烤鸭1 flag = false; notifyAll();//通知生产者生产烤鸭 } }
修改后的代码如下
package com.huanghe.chapter21; /** * @Author: River * @Date:Created in 11:16 2018/6/4 * @Description: */ public class KaoYaResource { private String name; private int count = 1;//烤鸭的初始数量 private boolean flag = false;//判断是否有需要线程等待的标志 /** * 生产烤鸭 */ public synchronized void product(String name){ while(flag){ //此时有烤鸭,等待 try { this.wait(); } catch (InterruptedException e) { e.printStackTrace(); } } this.name=name+count;//设置烤鸭的名称 count++; System.out.println(Thread.currentThread().getName()+"...生产者..."+this.name); flag=true;//有烤鸭后改变标志 notifyAll();//通知消费线程可以消费了 } /** * 消费烤鸭 */ public synchronized void consume(){ while(!flag){//如果没有烤鸭就等待 try{this.wait();}catch(InterruptedException e){} } System.out.println(Thread.currentThread().getName()+"...消费者........"+this.name);//消费烤鸭1 flag = false; notifyAll();//通知生产者生产烤鸭 } }
在多个同类型线程(多个生产者线程或者消费者线程)的场景中,为防止wait的条件发生变化而导致线程异常终止,我们在阻塞线程被唤醒的同时还必须对wait的条件进行额外的检查,即 使用 while 循环代替 if 条件;
在多个同类型线程(多个生产者线程或者消费者线程)的场景中,为防止生产者(消费者)唤醒生产者(消费者),保证生产者和消费者互相唤醒,需要 使用 notifyAll 替代 notify.
到此,多消费者多生产者模式也完成,不过上面用的是synchronied关键字实现的,而锁对象的解决方法也一样将之前单消费者单生产者的资源类中的if判断改为while判断即可代码就不贴了哈。不过下面我们将介绍一种更有效的锁对象解决方法,我们准备使用两组条件对象(Condition也称为监视器)来实现等待/通知机制,也就是说通过已有的锁获取两组监视器,一组监视生产者,一组监视消费者。有了前面的分析这里我们直接上代码:
public class ResourceBy2Condition { private String name; private int count = 1; private boolean flag = false; //创建一个锁对象。 Lock lock = new ReentrantLock(); //通过已有的锁获取两组监视器,一组监视生产者,一组监视消费者。 Condition producer_con = lock.newCondition(); Condition consumer_con = lock.newCondition(); /** * 生产 * @param name */ public void product(String name) { lock.lock(); try { while(flag){ try{producer_con.await();}catch(InterruptedException e){} } this.name = name + count; count++; System.out.println(Thread.currentThread().getName()+"...生产者5.0..."+this.name); flag = true; // notifyAll(); // con.signalAll(); consumer_con.signal();//直接唤醒消费线程 } finally { lock.unlock(); } } /** * 消费 */ public void consume() { lock.lock(); try { while(!flag){ try{consumer_con.await();}catch(InterruptedException e){} } System.out.println(Thread.currentThread().getName()+"...消费者.5.0......."+this.name);//消费烤鸭1 flag = false; // notifyAll(); // con.signalAll(); producer_con.signal();//直接唤醒生产线程 } finally { lock.unlock(); } } }
从代码中可以看到,我们创建了producer_con 和consumer_con两个条件对象,分别用于监听生产者线程和消费者线程,在product()方法中,我们获取到锁后,如果此时flag为true的话,也就是此时还有烤鸭未被消费,因此生产线程需要等待,所以我们调用生产线程的监控器producer_con的await()的方法进入阻塞等待池;但如果此时的flag为false的话,就说明烤鸭已经消费完,需要生产线程去生产烤鸭,那么生产线程将进行烤鸭生产并通过消费线程的监控器consumer_con的signal()方法去通知消费线程对烤鸭进行消费。
consume()方法也是同样的道理,这里就不过多分析了。我们可以发现这种方法比我们之前的synchronized同步方法或者是单监视器的锁对象都来得高效和方便些,之前都是使用notifyAll()和signalAll()方法去唤醒池中的线程,然后让池中的线程又进入 竞争队列去抢占CPU资源,这样不仅唤醒了无关的线程而且又让全部线程进入了竞争队列中,而我们最后使用两种监听器分别监听生产者线程和消费者线程,这样的方式恰好解决前面两种方式的问题所在,我们每次唤醒都只是生产者线程或者是消费者线程而不会让两者同时唤醒,这样不就能更高效得去执行程序了吗?好了,到此多生产者多消费者模式也分析完毕。