耦合&解耦
在谈生产者消费者模型之前,我们先来谈谈耦合和解耦的概念,要是这两个概念掌握了,那么生产者消费者模型也就掌握一半啦~
耦合
耦合是指两个或者两个以上体系或两种运动形式间通过相互左右而彼此影响以至联合起来的现象。举个例子,有一对热恋中的情侣,水深火热的,谁离开谁都不行了,离开就得死,要是对方有一点风吹草动,这一方就得地动山摇。可以按照琼瑶阿姨的路子继续想象,想成什么样都不过分,他们之间的这种状态就应该叫做“耦合”。
解耦
解耦,字面意思就是解除耦合关系。一般的设计思想就是设计一个缓冲区,一方将自己的数据放在这个缓冲区,另一方可以从这个缓冲区获取对方的数据,就是两方不互相接触,甚至可以一个都不知道一个的存在。举个例子,还是上面的一对情侣,由于太过亲密被他们父母知道了,所以她两不能在一起了,但是他俩又不想断了联系,所以他俩就想了个办法,建了一个秘密基地,把想给对方说的话全放在这个秘密基地里 ,所以看似他俩还在一起但是其实已经分开了,而且假如说有一天她两的秘密基地被别人发现了,那个人也在里面写了话,她两也不会发现。
wait()与notify()方法
wait()方法
- wait()的作用是使当前执行代码的线程进行等待,wait()方法是Object类的方法,该方法是用来将当前线程置入“预执行队列”中,并且在wait()所在的代码处停止执行,直到接到通知或被中断为止。
- wait()方法只能在同步方法中或同步块中调用。如果调用wait()时,没有持有适当的锁,会抛出异常。
- wait()方法执行后,当前线程释放锁,线程与其它线程竞争重新获取锁。
范例:wait()方法的使用
public static void main(String[] args) {
Object obj=new Object();
synchronized (obj){
System.out.println("执行同步块...");
try {
obj.wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("执行同步块结束...");
}
}
这样在执行到obj.wait()之后就一直等待下去,那么程序肯定不能一直这么等待下去了。这个时候就需要使用到了另外一个方法唤醒的方法notify()。
notify()方法
notify方法就是使停止的线程继续运行。
- 方法notify()也要在同步方法或同步块中调用,该方法是用来通知那些可能等待该对象的对象锁的其它线程,对其发出通知notify,并使它们重新获取该对象的对象锁。如果有多个线程等待,则有线程规划器随机挑选出一个呈wait状态的线程。
- 在notify()方法后,当前线程不会马上释放该对象锁,要等到执行notify()方法的线程将程序执行完,也就是退出同步代码块之后才会释放对象锁。
范例:notify()
public class MyThread1 {
public static void main(String[] args) {
Object obj=new Object();
Thread threadA=new Thread(()->{
System.out.println(Thread.currentThread().getName()+"执行同步块..");
synchronized (obj){
try {
obj.wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
System.out.println(Thread.currentThread().getName()+"执行同步块结束..");
});
threadA.setName("Thread-A");
threadA.start();
Thread threadB=new Thread(()->{
System.out.println(Thread.currentThread().getName()+"执行同步块..");
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
synchronized (obj) {
obj.notify();
}
System.out.println(Thread.currentThread().getName()+"执行同步块结束..");
});
threadB.setName("Thread-B");
threadB.start();
}
}
notifyAll()
public class MyThread1 {
public static void main(String[] args) {
Object object=new Object();
Runnable runnable=new Runnable() {
@Override
public void run() {
System.out.println(Thread.currentThread().getName()+"代码段开始..");
synchronized (object){
try {
object.wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
System.out.println(Thread.currentThread().getName()+"代码段结束..");
}
};
for(int i=0;i<3;i++){
Thread thread=new Thread(runnable);
thread.setName("Thread-Wait"+i);
thread.start();
}
Thread thread=new Thread(new Runnable() {
@Override
public void run() {
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
synchronized (object){
System.out.println("Thread-notify已经运行中..");
object.notifyAll();
}
}
});
thread.setName("Thread-notify");
thread.start();
}
}
出现阻塞的情况
- 线程调用 sleep方法,主动放弃占用的处理器资源。
- 线程调用了阻塞式IO方法,在该方法返回前,该线程被塞。
- 线程试图获得一个同步监视器,但该同步监视器正被其他线程所持有。
- 线程等待某个通知。
- 程序调用了 suspend方法将该线程挂起。此方法容易导致死锁,尽量避免使用该方法。
run()方法运行结束后进入销毁阶段,整个线程执行完毕。每个锁对象都有两个队列,一个是就绪队列,一个是阻塞队列。就绪队列存储了将要获得锁的线程,阻塞队列存储了被阻塞的线程。一个线程被唤醒后,才会进入就绪队列,等待CPU的调度;反之,一个线程被wait后,就会进入阻塞队列,等待下一次被唤醒。
生产者消费者模型
生产者消费者模式是通过一个容器来解决生产者和消费者的强耦合问题。生产者和消费者彼此之间不直接通讯,而通过阻塞队列来进行通讯,所以生产者生产完数据之后不用等待消费者处理,直接扔给阻塞队列,消费者不找生产者要数据,而是直接从阻塞队列里取,阻塞队列就相当于一个缓冲区,平衡了生产者和消费者的处理能力。
这个阻塞队列就是用来给生产者和消费者解耦的。纵观大多数设计模式,都会找一个第三者出来进行解耦,如工厂模式的第三者是工厂类,模板模式的第三者是模板类。在学习一些设计模式的过程中,如果先找到这个模式的第三者,能帮助我们快速熟悉一个设计模式。
生产者消费者模型要解决的问题:
1.生产和消费速度不匹配
2.软件开发组件的解耦
范例:
package hhh.Test.MyThread;
public class Goods {
private final String id;
private final String name;
public Goods(String id, String name) {
this.id = id;
this.name = name;
}
@Override
public String toString() {
return "Goods{" +
"id='" + id + '\'' +
", name='" + name + '\'' +
'}';
}
}
package hhh.Test.MyThread;
public class PCConfig {
//可以通过这些值来调整队列大小和速率
public static final int MAX_CAPACITY=10;
public static final int MAX_PRODUCER=5;
public static final int MAX_CUSTOMER=4;
}
package hhh.Test.MyThread;
import java.util.Queue;
import java.util.concurrent.ArrayBlockingQueue;
//静态导入
import static hhh.Test.MyThread.PCConfig.MAX_CAPACITY;
import static hhh.Test.MyThread.PCConfig.MAX_CUSTOMER;
import static hhh.Test.MyThread.PCConfig.MAX_PRODUCER;
public class TestPC {
public static void main(String[] args) {
final Queue queue=new ArrayBlockingQueue(MAX_CAPACITY);//设置队列最大容量为10
//创建生产者线程
final Runnable producer=new Producer(queue);
for(int i=0;i<MAX_PRODUCER;i++) {
Thread pThread = new Thread(producer, "Thread-Producer");
pThread.start();
}
//创建消费者xianc
final Runnable custorm=new Customer(queue);
for(int i=0;i<MAX_CUSTOMER;i++) {
Thread cThread = new Thread(custorm, "Thread-Custorm");
cThread.start();
}
}
}
package hhh.Test.MyThread;
import java.util.Queue;
public class Customer implements Runnable {
private final Queue<Goods> queue;
public Customer(Queue<Goods> queue) {
this.queue = queue;
}
public void run(){
while(true) {
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
}
synchronized (queue) {
if (queue.isEmpty()) {//如果为空,就唤醒所有生产者线程生产商品
queue.notifyAll();
} else {
Goods goods = queue.poll();
System.out.println(Thread.currentThread().getName() + "消费 " + goods);
}
}
}
}
}
package hhh.Test.MyThread;
import java.util.Queue;
import java.util.concurrent.ArrayBlockingQueue;
//静态导入
import static hhh.Test.MyThread.PCConfig.MAX_CAPACITY;
import static hhh.Test.MyThread.PCConfig.MAX_CUSTOMER;
import static hhh.Test.MyThread.PCConfig.MAX_PRODUCER;
public class TestPC {
public static void main(String[] args) {
final Queue queue=new ArrayBlockingQueue(MAX_CAPACITY);//设置队列最大容量为10
//创建生产者线程
final Runnable producer=new Producer(queue);
for(int i=0;i<MAX_PRODUCER;i++) {
Thread pThread = new Thread(producer, "Thread-Producer");
pThread.start();
}
//创建消费者xianc
final Runnable custorm=new Customer(queue);
for(int i=0;i<MAX_CUSTOMER;i++) {
Thread cThread = new Thread(custorm, "Thread-Custorm");
cThread.start();
}
}
}