生产者-消费者模式是一个常用的结构,也是多线程的一个典型应用场景。在这个模式下,生产者生产产品放入仓库,消费者从仓库中取出产品消费,为了确保正确,仓库必须进行同步操作。因此,要用到一些阻塞方法。消费者和生产者在进行各自对仓库的操作时,都需要获取仓库的锁,否则会被阻塞;反过来,对仓库的操作完成之后,要释放仓库的锁,通知被阻塞的对象。
另外值得注意的地方是,这里使用while(condition)+ wait的方式实现了自旋锁。
第一种方式:
使用Object类的wait和notifyAll方法。在这个方法中,消费者获知库存为0,则调用wait方法阻塞等待;生产者获知库存达到最大值,也调用wait方法阻塞等待。在对仓库的操作结束之后,库存必定适合被对方进行下一步操作,因此调用notifyAll方法,通知被阻塞的对象。但这里有一个问题是,原本事情的逻辑是i消费者操作完成应通知生产者,而生产者完成后应通知消费者,但wait-notifyAll做不到这个级别的粒度控制,只能通知所有被阻塞的对象,不管是i消费者还是生产者,这样就有可能造成不必要的阻塞队列的进出。当然这就可以用Lock-Condition的组合解决。
public class Producer extends Thread { private Queue<Integer> queue; private Random random = new Random(); private int max; public Producer(Queue<Integer> queue, int max){ this.queue = queue; this.max = max; } public void produce(){ while (true){ synchronized (queue){ /*** * while (condition){ * Object.wait(); * } * --- self-rotation lock */ while (queue.size() >= max){ try { System.out.println("The queue is full now!"); queue.wait(); }catch (InterruptedException e){ e.printStackTrace(); } } queue.add(queue.size()); System.out.println(Thread.currentThread() + " successfully add one integer!"); queue.notifyAll(); try { Thread.sleep(random.nextInt(10)); }catch (InterruptedException e){ e.printStackTrace(); } } } } @Override public void run() { super.run(); produce(); } }
public class Consumer extends Thread{
private Queue<Integer> queue;
private Random random = new Random();
public Consumer(Queue<Integer> queue){
this.queue = queue;
}
public void consume(){
while (true){
synchronized (queue){
while (queue.isEmpty()){
try {
System.out.println("The queue is empty now!");
queue.wait();
}catch (InterruptedException e){
e.printStackTrace();
}
}
queue.poll();
System.out.println(Thread.currentThread() + " remove one from the inventory.");
queue.notifyAll();
try {
Thread.sleep(random.nextInt(10));
}catch (InterruptedException e){
e.printStackTrace();
}
}
}
}
@Override
public void run() {
super.run();
consume();
}
}
public class Demo {
public static void main(String[] args) {
Queue<Integer> queue = new LinkedList<>();
Consumer consumer = new Consumer(queue), consumer1 = new Consumer(queue),
consumer2 = new Consumer(queue);
Producer producer = new Producer(queue, 10), producer1 = new Producer(queue, 10),
producer2 = new Producer(queue, 10), producer3 = new Producer(queue, 10);
consumer.start();
consumer1.start();
consumer2.start();
producer.start();
producer1.start();
producer2.start();
producer3.start();
try {
Thread.sleep(1000);
}catch (InterruptedException e){
e.printStackTrace();
}
System.exit(0);
}
}
第二种方式:
使用Lock-Condition 的组合,这种方式可以将通知解阻塞的条件分成两种——full/empty。这样可以实现更小粒度的阻塞控制,避免不必要的操作。
public class ConsumerAndProducerLock { private static final ReentrantLock Lock = new ReentrantLock(); private static final Condition fullCondition = Lock.newCondition(); private static final Condition emptyCondition = Lock.newCondition(); public static void main(String[] args) { Queue<Integer> queue = new LinkedList<>(); queue.add(0); new Consumer(queue, "C1").start(); new Producer(queue, "P1", 10).start(); new Consumer(queue, "C2").start(); new Producer(queue, "p2", 10).start(); new Consumer(queue, "C3").start(); new Producer(queue, "P3", 10).start(); new Consumer(queue, "C4").start(); new Producer(queue, "p4", 10).start(); try { Thread.sleep(1000); }catch (InterruptedException e){ e.printStackTrace(); } System.exit(0); } private static class Consumer extends Thread{ private Queue<Integer> queue; private String name; public Consumer(Queue<Integer> queue, String name){ this.queue = queue; this.name = name; } public void consume(){ while (true){ Lock.lock(); while (queue.isEmpty()){ try { System.out.println(name + " found the queue is empty"); emptyCondition.await(); }catch (InterruptedException e){ e.printStackTrace(); } } System.out.println("Consumer " + name + " consumes " + queue.poll()); fullCondition.signalAll(); Lock.unlock(); } } @Override public void run() { super.run(); consume(); } } private static class Producer extends Thread{ private Queue<Integer> queue; private String name; private int max; public Producer(Queue<Integer> queue, String name, int max){ this.queue = queue; this.name = name; this.max =max; } public void produce(){ while (true){ Lock.lock(); while (queue.size() >= max){ try { System.out.println(name + " found the queue is full"); fullCondition.await(); }catch (InterruptedException e){ e.printStackTrace(); } } System.out.println("Producer " + name + " produces " + queue.size()); queue.add(queue.size() + 1); emptyCondition.signalAll(); Lock.unlock(); } } @Override public void run() { super.run(); produce(); } } }