1.使用阻塞队列实现
// Producer Class in java
class Producer implements Runnable {
private final BlockingQueue sharedQueue;
public Producer(BlockingQueue sharedQueue) {
this.sharedQueue = sharedQueue;
}
public void run() {
for (int i = 0; i < 10; i++) {
try {
System.out.println("Produced: " + i);
sharedQueue.put(i);//无需考虑阻塞问题
} catch (InterruptedException ex) {
System.out.println(ex);
}
}
}
// Consumer Class in Java
class Consumer implements Runnable {
private final BlockingQueue sharedQueue;
public Consumer(BlockingQueue sharedQueue) {
this.sharedQueue = sharedQueue;
}
public void run() {
while (true) {
try {
int i = (Integer) sharedQueue.take();//无需考虑阻塞问题
System.out.println("Consumed: " + i);
} catch (InterruptedException ex) {
System.out.println(ex);
}
}
}
}
public class ProducerConsumerPattern {
public static void main(String args[]) {
// Creating shared object
BlockingQueue sharedQueue = new LinkedBlockingQueue();
// Creating Producer and Consumer Thread
Thread prodThread = new Thread(new Producer(sharedQueue));
Thread consThread = new Thread(new Consumer(sharedQueue));
prodThread.start();
consThread.start();
}
}
2.使用 Object 的 wait()和 notify()实现
PriorityQueue queue = new PriorityQueue(10);
class Consumer extends Thread {
public void run() {
while (true) {
synchronized (queue) {//
while (queue.size() == 0) {//队列空的条件下阻塞
try {
queue.wait();
} catch (InterruptedException e) {
e.printStackTrace();
queue.notify();
}
}
queue.poll(); // 每次移走队首元素
queue.notify();//唤醒阻塞的生产者线程
}
}
}
}
class Producer extends Thread {
public void run() {
while (true) {
synchronized (queue) {
while (queue.size() == 10) {//队列满了的条件下阻塞
try {
queue.wait();//被阻塞将自动释放锁
} catch (InterruptedException e) {
e.printStackTrace();
queue.notify();
}
}
queue.offer(1); // 每次插入一个元素
queue.notify();////唤醒阻塞的消费者线程
}
}
}
}
3.使用Condition实现
private PriorityQueue queue = newPriorityQueue(10);
private Lock lock = new ReentrantLock();
private Condition notFull = lock.newCondition();
private Condition notEmpty = lock.newCondition();
class Consumer extends Thread {
public void run() {
while (true) {
lock.lock();
try {
while (queue.size() == 0) {
try {
notEmpty.await();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
queue.poll(); // 每次移走队首元素
notFull.signal();//唤醒生产者线程 与synchronized不同点在于这里通过内置condition 对象
} finally {
lock.unlock();
}
}
}
}
class Producer extends Thread {
public void run() {
while (true) {
lock.lock();
try {
while (queue.size() == 10) {
try {
notFull.await();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
queue.offer(1); // 每次插入一个元素
notEmpty.signal();//唤醒消费者者线程 与synchronized不同点在于这里通过内置condition 对象
} finally {
lock.unlock();
}
}
}
}