一、阻塞队列实现生产者消费者模式
package com.lock.producerandconsumer;
import java.util.Random;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.logging.Level;
import java.util.logging.Logger;
/**
 * Producer/consumer demo in which a {@link BlockingQueue} does all of the
 * hand-off and blocking between the two threads.
 */
public class BlockingQueuePattern {
    /**
     * Starts one producer thread and one consumer thread over a shared queue.
     *
     * <p>The consumer task keeps waiting for further items after the producer
     * has finished, so the program does not exit on its own.
     *
     * @param args ignored
     */
    public static void main(String[] args) {
        // Parameterized rather than raw so the compiler checks the element type.
        // The queue is created without an explicit capacity bound.
        BlockingQueue<Integer> sharedQueue = new LinkedBlockingQueue<Integer>();

        // One thread per role, both working on the same queue instance.
        Thread prodThread = new Thread(new Producer(sharedQueue));
        Thread consThread = new Thread(new Consumer(sharedQueue));

        prodThread.start();
        consThread.start();
    }
}
/**
 * Producer task: publishes ten random integers in {@code [0, 10)} to a shared
 * blocking queue, printing each value before it is enqueued.
 */
class Producer implements Runnable {
    private final BlockingQueue<? super Integer> sharedQueue;

    /**
     * @param shareQueue queue that receives the produced integers
     */
    public Producer(BlockingQueue<? super Integer> shareQueue) {
        this.sharedQueue = shareQueue;
    }

    /**
     * Produces up to ten values. If the thread is interrupted while enqueuing,
     * the interruption is logged, the interrupt flag is restored, and the task
     * stops instead of continuing with the remaining iterations.
     */
    @Override
    public void run() {
        // A single generator is enough; there is no need for one per iteration.
        Random random = new Random();
        for (int i = 0; i < 10; i++) {
            int value = random.nextInt(10);
            System.out.println("Produced: " + value);
            try {
                sharedQueue.put(value);
            } catch (InterruptedException e) {
                Logger.getLogger(Producer.class.getName()).log(Level.SEVERE, null, e);
                // Catching InterruptedException clears the flag; set it again so
                // whoever owns this thread can still observe the request.
                Thread.currentThread().interrupt();
                return;
            }
        }
    }
}
/**
 * Consumer task: repeatedly takes items from a shared blocking queue and
 * prints them until the running thread is interrupted.
 */
class Consumer implements Runnable {
    private final BlockingQueue<?> sharedQueue;

    /**
     * @param sharedQueue queue to take items from
     */
    public Consumer(BlockingQueue<?> sharedQueue) {
        this.sharedQueue = sharedQueue;
    }

    /**
     * Blocks on the queue and prints every item taken. Interrupting the thread
     * is the way to stop the task: the interruption is logged, the interrupt
     * flag is restored, and the loop ends.
     */
    @Override
    public void run() {
        while (!Thread.currentThread().isInterrupted()) {
            try {
                System.out.println("Consumed: " + sharedQueue.take());
            } catch (InterruptedException e) {
                Logger.getLogger(Consumer.class.getName()).log(Level.SEVERE, null, e);
                // Catching InterruptedException clears the flag; set it again
                // before leaving so the caller can still see the request.
                Thread.currentThread().interrupt();
                return;
            }
        }
    }
}
Produced: 7
Produced: 3
Consumed: 7
Produced: 9
Consumed: 3
Produced: 3
Consumed: 9
Produced: 2
Consumed: 3
Produced: 2
Consumed: 2
Produced: 1
Consumed: 2
Produced: 7
Consumed: 1
Produced: 8
Consumed: 7
Consumed: 8
Produced: 4
Consumed: 4
二、Lock实现生产者消费者模式
package com.lock.producerandconsumer;
import java.util.LinkedList;
import java.util.Random;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
/**
 * Entry point for the {@code Lock}/{@code Condition} based producer/consumer demo.
 *
 * <p>NOTE(review): the class name and the original comment describe this as an
 * "optimistic lock" implementation, but the {@code SelfQueue} it drives is
 * built on {@code ReentrantLock}; confirm whether the name is intentional.
 */
public class OptimisticLockPattern {
    /**
     * Creates one shared {@code SelfQueue} and starts a producer thread
     * followed by a consumer thread on it.
     *
     * @param args ignored
     */
    public static void main(String[] args) {
        SelfQueue buffer = new SelfQueue();
        // Producer first, consumer second, each on its own thread.
        Runnable[] tasks = { new Producer1(buffer), new Consumer1(buffer) };
        for (Runnable task : tasks) {
            new Thread(task).start();
        }
    }
}
/**
 * Hand-written bounded FIFO buffer guarded by a {@link ReentrantLock} with two
 * {@link Condition}s, one for blocked producers and one for blocked consumers.
 */
class SelfQueue {
    /** Maximum number of items the buffer holds before producers must wait. */
    int max = 5;
    /** Backing store; only touched while {@link #lock} is held. */
    final LinkedList<Integer> ProdLine = new LinkedList<Integer>();
    final Lock lock = new ReentrantLock();
    /** Producers wait here while the buffer is full; signalled after each removal. */
    final Condition full = lock.newCondition();
    /** Consumers wait here while the buffer is empty; signalled after each insertion. */
    final Condition empty = lock.newCondition();

    /**
     * Appends an item, waiting while the buffer is at capacity.
     *
     * @param ProdRandom value to enqueue
     * @throws IllegalStateException if the thread is interrupted while waiting;
     *         the item is not enqueued and the interrupt flag is left set
     */
    public void produce(int ProdRandom) {
        // Acquire before try so that finally only ever unlocks a lock we hold.
        lock.lock();
        try {
            while (ProdLine.size() == max) {
                System.out.println("存储量达到上限,请等待");
                full.await();
            }
            ProdLine.add(ProdRandom);
            empty.signal();
        } catch (InterruptedException e) {
            throw interrupted("produce", e);
        } finally {
            lock.unlock();
        }
    }

    /**
     * Removes and returns the oldest item, waiting while the buffer is empty.
     *
     * @return the item at the head of the buffer
     * @throws IllegalStateException if the thread is interrupted while waiting;
     *         nothing is removed and the interrupt flag is left set
     */
    public int consume() {
        lock.lock();
        try {
            while (ProdLine.isEmpty()) {
                System.out.println("队列为空,请等待");
                empty.await();
            }
            int item = ProdLine.removeFirst();
            full.signal();
            // Return from inside try: a return in finally would discard any
            // exception and hand back a made-up value.
            return item;
        } catch (InterruptedException e) {
            throw interrupted("consume", e);
        } finally {
            lock.unlock();
        }
    }

    /** Restores the interrupt flag and builds the unchecked exception that reports it. */
    private static IllegalStateException interrupted(String operation, InterruptedException cause) {
        Thread.currentThread().interrupt();
        return new IllegalStateException("Interrupted while waiting in " + operation + "()", cause);
    }
}
/**
 * Producer task for the {@code SelfQueue} demo: hands ten random integers in
 * {@code [0, 10)} to the queue, printing each one first.
 */
class Producer1 implements Runnable {
    private final SelfQueue selfQueue;

    /**
     * @param selfQueue queue that receives the produced values
     */
    public Producer1(SelfQueue selfQueue) {
        this.selfQueue = selfQueue;
    }

    /** Generates and enqueues ten values, one per iteration. */
    @Override
    public void run() {
        // One generator for the whole run instead of a new one per iteration.
        Random random = new Random();
        for (int i = 0; i < 10; i++) {
            int value = random.nextInt(10);
            System.out.println("ProdRandom: " + value);
            selfQueue.produce(value);
        }
    }
}
/**
 * Consumer task for the {@code SelfQueue} demo: endlessly pulls values from the
 * queue and prints each one.
 */
class Consumer1 implements Runnable {
    private final SelfQueue selfQueue;

    /**
     * @param selfQueue queue to pull values from
     */
    public Consumer1(SelfQueue selfQueue) {
        this.selfQueue = selfQueue;
    }

    /** Loops forever; every iteration consumes one value and prints it. */
    @Override
    public void run() {
        for (;;) {
            int item = selfQueue.consume();
            System.out.println("Consumed:" + item);
        }
    }
}
ProdRandom: 0
队列为空,请等待
ProdRandom: 1
Consumed:0
Consumed:1
队列为空,请等待
ProdRandom: 4
ProdRandom: 6
ProdRandom: 1
ProdRandom: 7
ProdRandom: 3
ProdRandom: 5
ProdRandom: 3
存储量达到上限,请等待
Consumed:4
Consumed:6
Consumed:1
Consumed:7
Consumed:3
Consumed:5
Consumed:3
队列为空,请等待
ProdRandom: 1
Consumed:1
队列为空,请等待
三、synchronized实现生产者消费者模式
利用 wait() 和 notify() 方法。
package com.lock.producerandconsumer;
import java.util.Random;
/**
 * Entry point for the {@code synchronized} (pessimistic locking) variant of the
 * producer/consumer demo.
 */
public class PessimisticLockPattern {
    /**
     * Creates one shared {@code SelfQueue2} and starts a producer thread
     * followed by a consumer thread on it.
     *
     * @param args ignored
     */
    public static void main(String[] args) {
        SelfQueue2 buffer = new SelfQueue2();
        // Producer first, consumer second, each on its own thread.
        Runnable[] tasks = { new Producer2(buffer), new Consumer2(buffer) };
        for (Runnable task : tasks) {
            new Thread(task).start();
        }
    }
}
/**
 * Fixed-capacity buffer synchronized on its own monitor with
 * {@code wait()}/{@code notifyAll()}.
 *
 * <p>Despite the name, items come out in LIFO order: {@link #consume()} returns
 * the most recently produced value that is still stored.
 */
class SelfQueue2 {
    /** Number of stored items; also the slot the next produced value goes into. */
    int index = 0;
    /** Backing array; its length is the capacity. */
    int[] ProdLine = new int[6];

    /**
     * Stores a value, waiting while the buffer is full.
     *
     * <p>An interrupt received while waiting does not abort the call; the wait
     * continues and the thread's interrupt flag is set again before returning.
     *
     * @param ProdRandom value to store
     */
    public synchronized void produce(int ProdRandom) {
        boolean interrupted = false;
        try {
            while (index == ProdLine.length) {
                try {
                    this.wait();
                } catch (InterruptedException e) {
                    // Remember it; re-setting the flag now would make the next
                    // wait() throw immediately and spin.
                    interrupted = true;
                }
            }
            ProdLine[index] = ProdRandom;
            index++;
            // Producers and consumers share this monitor, so wake every waiter:
            // a single notify() could pick a thread of the wrong kind.
            this.notifyAll();
        } finally {
            if (interrupted) {
                Thread.currentThread().interrupt();
            }
        }
    }

    /**
     * Removes and returns the most recently stored value, waiting while the
     * buffer is empty.
     *
     * <p>An interrupt received while waiting does not abort the call; the wait
     * continues and the thread's interrupt flag is set again before returning.
     *
     * @return the value on top of the buffer
     */
    public synchronized int consume() {
        boolean interrupted = false;
        try {
            while (index == 0) {
                try {
                    this.wait();
                } catch (InterruptedException e) {
                    interrupted = true;
                }
            }
            index--;
            this.notifyAll();
            return ProdLine[index];
        } finally {
            if (interrupted) {
                Thread.currentThread().interrupt();
            }
        }
    }
}
/**
 * Producer task for the {@code SelfQueue2} demo: hands ten random integers in
 * {@code [0, 10)} to the buffer, printing each one first.
 */
class Producer2 implements Runnable {
    private final SelfQueue2 selfqueue;

    /**
     * @param selfqueue buffer that receives the produced values
     */
    public Producer2(SelfQueue2 selfqueue) {
        this.selfqueue = selfqueue;
    }

    /** Generates and stores ten values, one per iteration. */
    @Override
    public void run() {
        // One generator for the whole run instead of a new one per iteration.
        Random random = new Random();
        for (int i = 0; i < 10; i++) {
            int value = random.nextInt(10);
            System.out.println("Produced: " + value);
            selfqueue.produce(value);
        }
    }
}
/**
 * Consumer task for the {@code SelfQueue2} demo: endlessly pulls values from
 * the buffer and prints each one.
 */
class Consumer2 implements Runnable {
    private final SelfQueue2 selfqueue;

    /**
     * @param selfqueue buffer to pull values from
     */
    public Consumer2(SelfQueue2 selfqueue) {
        this.selfqueue = selfqueue;
    }

    /** Loops forever; every iteration consumes one value and prints it. */
    @Override
    public void run() {
        for (;;) {
            int item = selfqueue.consume();
            System.out.println("Consumed: " + item);
        }
    }
}
Produced: 5
Produced: 8
Consumed: 5
Consumed: 8
Produced: 7
Consumed: 7
Produced: 3
Consumed: 3
Produced: 5
Produced: 1
Consumed: 5
Consumed: 1
Produced: 2
Produced: 4
Produced: 4
Produced: 8
Consumed: 8
Consumed: 4
Consumed: 4
Consumed: 2
转载:https://blog.csdn.net/antony9118/article/details/51500278