本模式以一个经典练习为案例:
使用2种锁机制实现生产者和消费者模式
要求
练习(生产者消费者模式):
自定义同步容器,容器容量上限为10。可以在多线程中应用,并保证数据线程安全。
使用synchronized同步及wait()和notifyAll() 实现生产者消费者模式
逻辑图描述:
/**
* 生产者消费者
* wait¬ify
* wait/notify都是和while配合应用的。可以避免多线程并发判断逻辑失效问题。
*/
package concurrent.t04;
import java.util.LinkedList;
import java.util.concurrent.TimeUnit;
public class TestContainer01<E> {
private final LinkedList<E> list = new LinkedList<>();
private final int MAX = 10;
private int count = 0;
public synchronized int getCount(){
return count;
}
public synchronized void put(E e){
while(list.size() == MAX){
try {
this.wait();
} catch (InterruptedException e1) {
e1.printStackTrace();
}
}
list.add(e);
count++;
this.notifyAll();
}
public synchronized E get(){
E e = null;
while(list.size() == 0){
try{
this.wait();
} catch (InterruptedException e1) {
e1.printStackTrace();
}
}
e = list.removeFirst();
count--;
this.notifyAll();
return e;
}
public static void main(String[] args) {
final TestContainer01<String> c = new TestContainer01<>();
for(int i = 0; i < 10; i++){
new Thread(new Runnable() {
@Override
public void run() {
for(int j = 0; j < 5; j++){
System.out.println(c.get());
}
}
}, "consumer"+i).start();
}
try {
TimeUnit.SECONDS.sleep(2);
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
for(int i = 0; i < 2; i++){
new Thread(new Runnable() {
@Override
public void run() {
for(int j = 0; j < 25; j++){
c.put("container value " + j);
}
}
}, "producer"+i).start();
}
}
}
使用可重入锁 ReentrantLock 实现生产者消费者模式
逻辑同上:
/**
* 生产者消费者
* 重入锁&条件
* 条件 - Condition, 为Lock增加条件。当条件满足时,做什么事情,如加锁或解锁。如等待或唤醒
*/
package concurrent.t04;
import java.io.IOException;
import java.util.LinkedList;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
public class TestContainer02<E> {
private final LinkedList<E> list = new LinkedList<>();
private final int MAX = 10;
private int count = 0;
private Lock lock = new ReentrantLock();
private Condition producer = lock.newCondition();
private Condition consumer = lock.newCondition();
public int getCount(){
return count;
}
public void put(E e){
lock.lock();
try {
while(list.size() == MAX){
System.out.println(Thread.currentThread().getName() + " 等待。。。");
// 进入等待队列。释放锁标记。
// 借助条件,进入的等待队列。
producer.await();
}
System.out.println(Thread.currentThread().getName() + " put 。。。");
list.add(e);
count++;
// 借助条件,唤醒所有的消费者。
consumer.signalAll();
} catch (InterruptedException e1) {
e1.printStackTrace();
} finally {
lock.unlock();
}
}
public E get(){
E e = null;
lock.lock();
try {
while(list.size() == 0){
System.out.println(Thread.currentThread().getName() + " 等待。。。");
// 借助条件,消费者进入等待队列
consumer.await();
}
System.out.println(Thread.currentThread().getName() + " get 。。。");
e = list.removeFirst();
count--;
// 借助条件,唤醒所有的生产者
producer.signalAll();
} catch (InterruptedException e1) {
e1.printStackTrace();
} finally {
lock.unlock();
}
return e;
}
public static void main(String[] args) {
final TestContainer02<String> c = new TestContainer02<>();
for(int i = 0; i < 10; i++){
new Thread(new Runnable() {
@Override
public void run() {
for(int j = 0; j < 5; j++){
System.out.println(c.get());
}
}
}, "consumer"+i).start();
}
try {
TimeUnit.SECONDS.sleep(2);
} catch (InterruptedException e1) {
e1.printStackTrace();
}
for(int i = 0; i < 2; i++){
new Thread(new Runnable() {
@Override
public void run() {
for(int j = 0; j < 25; j++){
c.put("container value " + j);
}
}
}, "producer"+i).start();
}
}
}
以上消费者生产者逻辑本人理解,有不对的地方,希望指出!