什么是生产者和消费者?
就是你去吃包子,包子铺就是生产者,你就是消费者。简单的说就是生产者负责造,消费者负责耗。
需要解决的问题
- 生产者过度生产问题
- 通知机制
- 消费者过度消费
- 避免产生死锁
进化图
传统写法
synchronized
/*
* 生产者消费者【传统版】
* synchronized、wait、notify
* 一个初始值0,两个线程交替操作五轮,一个加一,一个减一
* 思想:
* 1.线程操作资源类
* 2.判断->干活->通知
*/
class Data{
private int num = 0;
public synchronized void increment(){
//判断
while(num!=0) {
try {
this.wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
//干活
num++;
System.out.println(Thread.currentThread().getName()+" invoked increment 操作,num="+num);
//通知
this.notifyAll();
}
public synchronized void decrement(){
//判断
while(num==0) {
try {
this.wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
//干活
num--;
System.out.println(Thread.currentThread().getName()+" invoked decrement 操作,num="+num);
//通知
this.notifyAll();
}
}
public class ProductConsumer_1 {
public static void main(String[] args) {
Data data = new Data();
new Thread(()->{
for (int i = 1; i <= 5; i++) {
data.increment();
}
},"线程1").start();
new Thread(()->{
for (int i = 1; i <= 5; i++) {
data.decrement();
}
},"线程2").start();
}
}
运行结果
Lock
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
/*
* 生产者消费者【传统版】
* lock、await、signal
* 一个初始值0,两个线程交替操作五轮,一个加一,一个减一
* 思想:
* 1.线程操作资源类
* 2.判断->干活->通知
*/
class Data{
private int num = 0;
private Lock lock = new ReentrantLock();
private Condition condition = lock.newCondition();
public void increment(){
lock.lock();
try {
//判断
while(num!=0) {
try {
condition.await();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
//干活
num++;
System.out.println(Thread.currentThread().getName()+" invoked increment 操作,num="+num);
//通知
condition.signal();
} catch (Exception e) {
e.printStackTrace();
}finally {
}
}
public void decrement(){
lock.lock();
try {
//判断
while(num==0) {
try {
condition.await();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
//干活
num--;
System.out.println(Thread.currentThread().getName()+" invoked decrement 操作,num="+num);
//通知
condition.signal();
} catch (Exception e) {
e.printStackTrace();
}finally {
}
}
}
public class ProductConsumer_1 {
public static void main(String[] args) {
Data data = new Data();
new Thread(()->{
for (int i = 1; i <= 5; i++) {
data.increment();
}
},"线程1").start();
new Thread(()->{
for (int i = 1; i <= 5; i++) {
data.decrement();
}
},"线程2").start();
}
}
运行结果
synchronized 和 Lock 有什么区别?
1. 原始构成
synchronized 是关键字,属于 JVM 层面。
monitorenter / monitorexit
底层是通过monitor对象来完成,其实wait/notify等方法也依赖monitor对象,只有在同步块或方法中才能调用wait/notify等方法
Lock 是具体的类(java.util.concurrent.locks.Lock),是 api 层面的锁
2. 使用方法
synchronized 不需要用户去手动释放锁,当synchronized代码执行完后熊会自动让线程释放对锁的占用
ReentrantLock则需要用户去手动释放锁,若没有主动释放锁,就有可能导致出现死锁现象。
需要lock()和unlock()方法配合try/finally语句块来完成
3. 等待是否可中断
synchronized 不可中断,除非抛异常或者正常运行完成
ReentrantLock 可中断
- 设置超时方法 tryLock(long timeout, TimeUnit unit)
- lockInterruptibly()放代码块中,调用interrupt()方法可中断
4. 加锁是否公平
synchronized 非公平锁
ReentrantLock两者都可以,默认是非公平锁,构造方法可以传入boolean值,true为公平锁,false为非公平锁
5. 锁要绑定多个条件 Condition
synchronized 没有
ReentrantLock 用来分组唤醒的线程,可以精确唤醒,而不像 synchronized 要么随机唤醒一个,要么唤醒全部线程。
Lock 的优点
多线程之间按顺序调用,实现A->B->C三个线程启动,要求如下:
AA打印 5 次,BB打印 10 次,CC打印 15 次
AA打印 5 次,BB打印 10 次,CC打印 15 次
.......
交替进行10轮
如果使用 synchronized,不是不可以做,做起来比较麻烦。使用 Lock 就比较简单了。
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
class LockData{
private int num = 1; //A:1,B:2,C:3
private Lock lock = new ReentrantLock();
private Condition c1 = lock.newCondition();
private Condition c2 = lock.newCondition();
private Condition c3 = lock.newCondition();
public void print5() {
lock.lock();
try {
//判断
while(num!=1) {
c1.await();
}
//干活
for(int i=0;i<5;i++) {
System.out.println(Thread.currentThread().getName()+" "+(i+1));
}
//通知
num = 2;
c2.signal();
} catch (Exception e) {
e.printStackTrace();
}finally {
lock.unlock();
}
}
public void print10() {
lock.lock();
try {
//判断
while(num!=2) {
c2.await();
}
//干活
for(int i=0;i<10;i++) {
System.out.println(Thread.currentThread().getName()+" "+(i+1));
}
//通知
num = 3;
c3.signal();
} catch (Exception e) {
e.printStackTrace();
}finally {
lock.unlock();
}
}
public void print15() {
lock.lock();
try {
//判断
while(num!=3) {
c3.await();
}
//干活
for(int i=0;i<15;i++) {
System.out.println(Thread.currentThread().getName()+" "+(i+1));
}
//通知
num = 1;
c1.signal();
} catch (Exception e) {
e.printStackTrace();
}finally {
lock.unlock();
}
}
}
public class LockDemo {
public static void main(String[] args) {
LockData lockData = new LockData();
new Thread(()->{
for (int i = 1; i <= 10; i++) {
lockData.print5();
}
},"AA").start();
new Thread(()->{
for (int i = 1; i <= 10; i++) {
lockData.print10();
}
},"BB").start();
new Thread(()->{
for (int i = 1; i <= 10; i++) {
lockData.print15();
}
},"CC").start();
}
}
运行结果
进阶写法(高并发、消息中间件)[重点]
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
/*
* 生产者消费者【高并发版】
* 使用阻塞队列实现
* 生产和消费过程自动化进行,不需要进行干预
* 消息中间件底层原理
*/
class SourceQueue{
private volatile boolean flag = true; //默认开启,进行生产+消费
private AtomicInteger atomicInteger = new AtomicInteger();
BlockingQueue<String> blockingQueue = null;
// 构造注入,传入接口实现类,可以适配7种阻塞队列
public SourceQueue(BlockingQueue<String> blockingQueue) {
this.blockingQueue = blockingQueue;
// 利用反射获取传入类
System.out.println("传入阻塞队列\n"+blockingQueue.getClass().getName()+"\n");
}
// 生产线程
public void pord()throws Exception{
String data = null;// 数据
boolean offer;
while( flag ) {
data = atomicInteger.incrementAndGet()+"";
offer = blockingQueue.offer(data,2L,TimeUnit.SECONDS);
if(offer) {
System.out.println(Thread.currentThread().getName()+" 插入队列,data "+data+" 成功");
}else {
System.out.println(Thread.currentThread().getName()+" 插入队列,data "+data+" 失败");
}
TimeUnit.SECONDS.sleep(1);
}
System.out.println(Thread.currentThread().getName()+" 停止生产 ,flag="+flag+"生产动作结束");
}
// 消费者
public void consumer()throws Exception{
String result = null;
while( flag ) {
result = blockingQueue.poll(2L, TimeUnit.SECONDS);
if(result == null || result.equalsIgnoreCase("")){
flag = false;
System.out.println(Thread.currentThread().getName()+" 超过2s没有取到,消费退出");
System.out.println();
return;
}
System.out.println(Thread.currentThread().getName()+" 消费队列 ,result="+result);
}
}
public void stop() throws Exception{
this.flag = false;
}
}
public class ProductConsumer_2 {
public static void main(String[] args) throws Exception {
SourceQueue sourceQueue = new SourceQueue(new ArrayBlockingQueue<>(10));
new Thread(()->{
System.out.println(Thread.currentThread().getName()+" 生产者线程启动");
try {
sourceQueue.pord();
} catch (Exception e) {
e.printStackTrace();
}
},"Prod").start();
new Thread(()->{
System.out.println(Thread.currentThread().getName()+" 消费者线程启动");
try {
sourceQueue.consumer();
} catch (Exception e) {
e.printStackTrace();
}
},"Consumer").start();
TimeUnit.SECONDS.sleep(5);
System.out.println();
System.out.println(Thread.currentThread().getName()+" BOSS 停止");
System.out.println();
sourceQueue.stop();
}
}
运行结果