import java.util.concurrent.BlockingDeque;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingDeque;
/**
 * Producer/consumer demo in which the hand-off between threads is done entirely by a
 * {@link BlockingDeque}: producers call {@code put} and consumers call {@code take}.
 */
public class ByBlockingQueue {
    /**
     * Starts two producers and three consumers that share one deque on a cached thread pool.
     *
     * @param args ignored
     */
    public static void main(String[] args) {
        // LinkedBlockingDeque without an explicit capacity: put() practically never blocks,
        // take() blocks while the deque is empty.
        BlockingDeque<Product> queue = new LinkedBlockingDeque<>();
        ExecutorService executorService = Executors.newCachedThreadPool();

        executorService.submit(new Producer("张三", queue));
        executorService.submit(new Producer("李四", queue));
        executorService.submit(new Consumer("王五", queue));
        executorService.submit(new Consumer("赵六", queue));
        executorService.submit(new Consumer("田七", queue));

        // No further tasks will be submitted. shutdown() does not cancel the tasks above;
        // it only lets the pool's worker threads end once those tasks have finished.
        // NOTE(review): the consumers in this file try to take more products in total than the
        // producers put, so some consumer will eventually stay blocked in take() - confirm intent.
        executorService.shutdown();
    }
}
/**
 * Product: the item handed from producers to consumers, identified by an integer id.
 */
class Product {
    /** Identifier supplied at construction time. */
    private final int id;

    /**
     * @param id identifier of this product
     */
    public Product(int id) {
        this.id = id;
    }

    /**
     * @return the fixed label followed by the decimal id
     */
    @Override
    public String toString() {
        return "产品" + Integer.toString(id);
    }
}
/**
 * Producer: creates products with a random id and puts them into the shared deque,
 * pausing five seconds after each one.
 */
class Producer implements Runnable {
    private final String name;
    private final BlockingDeque<Product> storage;

    /**
     * @param name    label printed in front of every message of this producer
     * @param storage deque the products are put into
     */
    public Producer(String name, BlockingDeque<Product> storage) {
        this.name = name;
        this.storage = storage;
    }

    /**
     * Produces 100 products. Stops early if the thread is interrupted while blocked in
     * {@code put} or while sleeping.
     */
    @Override
    public void run() {
        try {
            for (int i = 0; i < 100; i++) {
                // Math.random() is in [0, 1), so the id is in 0..998.
                Product product = new Product((int) (Math.random() * 999));
                System.out.println(name + "准备生产(" + product.toString() + ")");
                storage.put(product);
                System.out.println(name + "已经生产(" + product.toString() + ")");
                System.out.println("=========================================");
                Thread.sleep(5000);
            }
        } catch (InterruptedException e) {
            // Catching InterruptedException clears the flag; set it again so whoever runs this
            // task (e.g. an executor worker) can still see that an interrupt was requested.
            Thread.currentThread().interrupt();
            e.printStackTrace();
        }
    }
}
/**
 * Consumer: takes products from the shared deque, pausing five seconds after each one.
 */
class Consumer implements Runnable {
    private final String name;
    private final BlockingDeque<Product> storage;

    /**
     * @param name    label printed in front of every message of this consumer
     * @param storage deque the products are taken from
     */
    public Consumer(String name, BlockingDeque<Product> storage) {
        this.name = name;
        this.storage = storage;
    }

    /**
     * Consumes up to 100 products. {@code take} blocks while the deque is empty. If the thread
     * is interrupted while blocked or sleeping, the interrupt flag is restored and the method
     * returns instead of carrying on with the next iteration.
     */
    @Override
    public void run() {
        try {
            for (int i = 0; i < 100; i++) {
                System.out.println(name + "准备消费产品");
                Product product = storage.take();
                System.out.println(name + "已经消费(" + product.toString() + ")");
                System.out.println("=========================================");
                Thread.sleep(5000);
            }
        } catch (InterruptedException e) {
            // An interrupt is a request to stop: keep the flag set and leave the loop.
            Thread.currentThread().interrupt();
            e.printStackTrace();
        }
    }
}
-
使用Object类的wait()方法和notify()方法
import java.io.Serializable;
import java.util.concurrent.BlockingDeque;
import java.util.concurrent.LinkedBlockingDeque;
/**
 * Producer/consumer demo coordinated with {@code Object.wait()} / {@code Object.notify()},
 * using the shared queue object as the monitor.
 */
public class ByObjectNotifyWait {
    /**
     * Starts one producer thread and one consumer thread that share a single queue.
     *
     * @param args ignored
     */
    public static void main(String[] args) {
        BlockingDeque<Order> queue = new LinkedBlockingDeque<>();
        ProducerThread pt = new ProducerThread(queue);
        ConsumerThread ct = new ConsumerThread(queue);

        Thread pth = new Thread(pt);
        pth.setName("生产者线程");
        Thread cth = new Thread(ct);
        // Name the consumer thread itself; previously this call renamed the producer thread
        // a second time and left the consumer with its default name.
        cth.setName("消费者线程");

        pth.start();
        cth.start();
    }
}
/**
 * Producer: adds orders to the shared queue, waiting on the queue's monitor while the queue
 * already holds {@link #CAPACITY} orders.
 */
class ProducerThread implements Runnable {
    /** The producer waits while the queue holds this many orders. */
    private static final int CAPACITY = 10;
    /** Number of orders produced by one call to {@link #run()}. */
    private static final int ORDER_COUNT = 10;

    private final BlockingDeque<Order> queue;
    /** Sequence number used to build the next order; only touched by the running thread. */
    private int number = 0;

    /**
     * @param queue shared queue; it is also the monitor used for wait/notify
     */
    public ProducerThread(BlockingDeque<Order> queue) {
        this.queue = queue;
    }

    /**
     * Adds {@value #ORDER_COUNT} orders to the queue. Returns early, with the interrupt flag
     * restored, if the thread is interrupted while waiting for free space.
     */
    @Override
    public void run() {
        for (int i = 0; i < ORDER_COUNT; i++) {
            synchronized (queue) {
                // Loop rather than 'if': the condition must be re-checked after every wake-up.
                while (queue.size() >= CAPACITY) {
                    try {
                        queue.wait();
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                        // Wake a possible waiter before leaving, then honour the interrupt
                        // instead of going back to wait().
                        queue.notify();
                        Thread.currentThread().interrupt();
                        return;
                    }
                }
                queue.offer(produceOrder());
                // Signal a thread waiting on this monitor that an order is available.
                queue.notify();
            }
        }
    }

    /**
     * Builds the next order from the running sequence number and advances the number.
     *
     * @return order with id {@code id_N}, name {@code name_N}, price N and quantity N
     */
    private Order produceOrder() {
        // Double.valueOf replaces the deprecated Double(double) constructor.
        Order order = new Order("id_" + number, "name_" + number, Double.valueOf(number), number);
        number++;
        return order;
    }
}
/**
 * Consumer: removes orders from the shared queue and prints them, waiting on the queue's
 * monitor while the queue is empty.
 */
class ConsumerThread implements Runnable {
    private final BlockingDeque<Order> queue;

    /**
     * @param queue shared queue; it is also the monitor used for wait/notify
     */
    public ConsumerThread(BlockingDeque<Order> queue) {
        this.queue = queue;
    }

    /**
     * Processes orders until the thread is interrupted while waiting for one; in that case the
     * interrupt flag is restored and the method returns.
     */
    @Override
    public void run() {
        while (true) {
            Order order;
            synchronized (queue) {
                // Loop rather than 'if': the condition must be re-checked after every wake-up.
                while (queue.isEmpty()) {
                    try {
                        queue.wait();
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                        // Wake a possible waiter before leaving, then honour the interrupt
                        // instead of going back to wait().
                        queue.notify();
                        Thread.currentThread().interrupt();
                        return;
                    }
                }
                order = queue.poll();
                // Signal a thread waiting on this monitor that space was freed.
                queue.notify();
            }
            // Print outside the synchronized block so the monitor is not held during output.
            if (order != null) {
                disposeOrder(order);
            }
        }
    }

    /**
     * Prints the four fields of the order on one line.
     *
     * @param order order to print; must not be null
     */
    private void disposeOrder(Order order) {
        System.out.println("处理订单信息[商品ID:" + order.getCommodityId()
                + ", 商品名称:" + order.getGetCommodityName()
                + ", 商品价格:" + order.getPrice()
                + ", 商品数量:" + order.getQuantity() + "]");
    }
}
/**
 * Order: serializable holder for a commodity id, a commodity name, a price and a quantity.
 *
 * <p>The name field and its accessors carry a doubled "get" prefix. They are left unchanged
 * because the accessors are this class's interface and the field name is part of the
 * serialized form.
 */
class Order implements Serializable {
    private static final long serialVersionUID = 4826685511830052034L;

    /** Commodity id. */
    private String commodityId;
    /** Commodity name. */
    private String getCommodityName;
    /** Commodity price. */
    private Double price;
    /** Commodity quantity. */
    private Integer quantity;

    /**
     * @param commodityId      commodity id
     * @param getCommodityName commodity name
     * @param price            commodity price
     * @param quantity         commodity quantity
     */
    public Order(String commodityId, String getCommodityName, Double price, Integer quantity) {
        this.commodityId = commodityId;
        this.getCommodityName = getCommodityName;
        this.price = price;
        this.quantity = quantity;
    }

    /** @return the commodity id */
    public String getCommodityId() {
        return this.commodityId;
    }

    /** @param commodityId new commodity id */
    public void setCommodityId(String commodityId) {
        this.commodityId = commodityId;
    }

    /** @return the commodity name */
    public String getGetCommodityName() {
        return this.getCommodityName;
    }

    /** @param getCommodityName new commodity name */
    public void setGetCommodityName(String getCommodityName) {
        this.getCommodityName = getCommodityName;
    }

    /** @return the commodity price */
    public Double getPrice() {
        return this.price;
    }

    /** @param price new commodity price */
    public void setPrice(Double price) {
        this.price = price;
    }

    /** @return the commodity quantity */
    public Integer getQuantity() {
        return this.quantity;
    }

    /** @param quantity new commodity quantity */
    public void setQuantity(Integer quantity) {
        this.quantity = quantity;
    }
}
-
使用Condition中的await()方法和signal()方法
import java.util.PriorityQueue;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
/**
 * Producer/consumer demo coordinated with one {@link Lock} and two {@link Condition}s
 * created from it.
 */
public class ByCondition {
    /**
     * Wires one producer and one consumer to the same lock, conditions and queue, then starts
     * the producer thread followed by the consumer thread.
     *
     * @param args ignored
     */
    public static void main(String[] args) {
        final Lock sharedLock = new ReentrantLock();
        // First condition is handed over as "not full", the second as "not empty".
        final Condition spaceAvailable = sharedLock.newCondition();
        final Condition itemAvailable = sharedLock.newCondition();
        final PriorityQueue<Integer> buffer = new PriorityQueue<>(10);

        final Thread producerThread = new Thread(
                new ProducerCondition(sharedLock, spaceAvailable, itemAvailable, buffer));
        final Thread consumerThread = new Thread(
                new ConsumerCondition(sharedLock, spaceAvailable, itemAvailable, buffer));

        producerThread.start();
        consumerThread.start();
    }
}
/**
 * Producer for the lock/condition demo: offers the integers 0..19 to the shared queue,
 * waiting on {@code notFull} while the queue holds {@link #CAPACITY} elements and signalling
 * {@code notEmpty} after every insertion.
 */
class ProducerCondition implements Runnable {
    /** The producer waits while the queue holds this many elements. */
    private static final int CAPACITY = 10;
    /** Number of integers produced by one call to {@link #run()}. */
    private static final int ITEM_COUNT = 20;

    private final Lock lock;
    private final Condition notFull;
    private final Condition notEmpty;
    private final PriorityQueue<Integer> queue;

    /**
     * @param lock     lock guarding {@code queue}; both conditions must have been created from it
     * @param notFull  condition this producer waits on while the queue is full
     * @param notEmpty condition this producer signals after adding an element
     * @param queue    shared queue; only accessed while {@code lock} is held
     */
    public ProducerCondition(Lock lock, Condition notFull, Condition notEmpty, PriorityQueue<Integer> queue) {
        this.lock = lock;
        this.notFull = notFull;
        this.notEmpty = notEmpty;
        this.queue = queue;
    }

    /**
     * Produces {@value #ITEM_COUNT} integers. The lock is taken and released once per item so
     * that the consumer can run between insertions rather than only when the queue is full.
     * Returns early, with the interrupt flag restored, if interrupted while waiting.
     */
    @Override
    public void run() {
        for (int i = 0; i < ITEM_COUNT; i++) {
            lock.lock();
            try {
                // Loop rather than 'if': await() may return without the condition holding.
                while (queue.size() >= CAPACITY) {
                    notFull.await();
                }
                queue.offer(i);
                notEmpty.signal();
                System.out.println("生产者生产了数字" + i);
            } catch (InterruptedException e) {
                e.printStackTrace();
                // Keep the interrupt visible to the caller and stop producing.
                Thread.currentThread().interrupt();
                return;
            } finally {
                lock.unlock();
            }
        }
    }
}
/**
 * Consumer for the lock/condition demo: polls 20 integers from the shared queue, waiting on
 * {@code notEmpty} while the queue is empty and signalling {@code notFull} after every removal.
 */
class ConsumerCondition implements Runnable {
    /** Number of integers consumed by one call to {@link #run()}. */
    private static final int ITEM_COUNT = 20;

    private final Lock lock;
    private final Condition notFull;
    private final Condition notEmpty;
    private final PriorityQueue<Integer> queue;

    /**
     * @param lock     lock guarding {@code queue}; both conditions must have been created from it
     * @param notFull  condition this consumer signals after removing an element
     * @param notEmpty condition this consumer waits on while the queue is empty
     * @param queue    shared queue; only accessed while {@code lock} is held
     */
    public ConsumerCondition(Lock lock, Condition notFull, Condition notEmpty, PriorityQueue<Integer> queue) {
        this.lock = lock;
        this.notFull = notFull;
        this.notEmpty = notEmpty;
        this.queue = queue;
    }

    /**
     * Consumes {@value #ITEM_COUNT} integers. The lock is taken and released once per item so
     * that the producer can run between removals. Returns early, with the interrupt flag
     * restored, if interrupted while waiting.
     */
    @Override
    public void run() {
        for (int i = 0; i < ITEM_COUNT; i++) {
            lock.lock();
            try {
                // Loop rather than 'if': await() may return without the condition holding.
                while (queue.isEmpty()) {
                    notEmpty.await();
                }
                // Non-null here: the queue is non-empty and the lock is held.
                int integer = queue.poll();
                notFull.signal();
                System.out.println("消费者消费了数字" + integer);
            } catch (InterruptedException e) {
                e.printStackTrace();
                // Keep the interrupt visible to the caller and stop consuming.
                Thread.currentThread().interrupt();
                return;
            } finally {
                lock.unlock();
            }
        }
    }
}
-
使用控制并发线程数的Semaphore类(思想类似操作系统中的PV操作实现生产者与消费者模型)
import java.util.concurrent.Semaphore;
/**
 * Producer/consumer demo built on three semaphores, in the style of P/V operations:
 * {@code put} counts free slots, {@code get} counts available items and {@code mutex}
 * guards the shared counter.
 */
public class BySemaphore {
    /** Number of items currently available; only modified while {@code mutex} is held. */
    int count = 0;
    /** Free slots; starts with 5 permits. */
    final Semaphore put = new Semaphore(5);
    /** Available items; starts with no permits. */
    final Semaphore get = new Semaphore(0);
    /** Single-permit semaphore used as a lock around {@code count}. */
    final Semaphore mutex = new Semaphore(1);

    /**
     * Starts two producers and two consumers that share one {@code BySemaphore} instance.
     *
     * @param args ignored
     */
    public static void main(String[] args) {
        BySemaphore bySemaphore = new BySemaphore();
        new Thread(bySemaphore.new Producer()).start();
        new Thread(bySemaphore.new Consumer()).start();
        new Thread(bySemaphore.new Consumer()).start();
        new Thread(bySemaphore.new Producer()).start();
    }

    /**
     * Sleeps one second.
     *
     * @return {@code false} if the sleep was interrupted (the interrupt flag is restored)
     */
    private boolean pause() {
        try {
            Thread.sleep(1000);
            return true;
        } catch (InterruptedException e) {
            e.printStackTrace();
            Thread.currentThread().interrupt();
            return false;
        }
    }

    /**
     * Acquires {@code slot} first and {@code mutex} second. The order matters: taking the
     * mutex first and then blocking on the slot would keep the other side from ever
     * releasing a slot.
     *
     * @param slot the counting semaphore to acquire before the mutex
     * @return {@code true} if both permits are held; {@code false} if interrupted, in which
     *         case no permit is held and the interrupt flag is restored
     */
    private boolean enter(Semaphore slot) {
        try {
            slot.acquire();
        } catch (InterruptedException e) {
            e.printStackTrace();
            Thread.currentThread().interrupt();
            return false;
        }
        try {
            mutex.acquire();
        } catch (InterruptedException e) {
            // Give back the slot taken above so the permit counts stay balanced.
            slot.release();
            e.printStackTrace();
            Thread.currentThread().interrupt();
            return false;
        }
        return true;
    }

    /** Produces five items, one per second, stopping early when interrupted. */
    class Producer implements Runnable {
        @Override
        public void run() {
            for (int i = 0; i < 5; i++) {
                if (!pause() || !enter(put)) {
                    return;
                }
                // Both permits are held from here on, so the releases below are always matched.
                try {
                    count++;
                    System.out.println("生产者" + Thread.currentThread().getName()
                            + "已生产完成,商品数量:" + count);
                } finally {
                    mutex.release();
                    get.release();
                }
            }
        }
    }

    /** Consumes five items, one per second, stopping early when interrupted. */
    class Consumer implements Runnable {
        @Override
        public void run() {
            for (int i = 0; i < 5; i++) {
                if (!pause() || !enter(get)) {
                    return;
                }
                // Both permits are held from here on, so the releases below are always matched.
                try {
                    count--;
                    System.out.println("消费者" + Thread.currentThread().getName()
                            + "已消费,剩余商品数量:" + count);
                } finally {
                    mutex.release();
                    put.release();
                }
            }
        }
    }
}
-
使用管道PipedInputStream与PipedOutputStream
import java.io.IOException;
import java.io.PipedInputStream;
import java.io.PipedOutputStream;
/**
 * Producer/consumer demo in which the bytes travel through a
 * {@link PipedOutputStream} / {@link PipedInputStream} pair.
 */
public class ByPiped {
    /** Read end of the pipe, used by {@link Consumer}. */
    final PipedInputStream pis;
    /** Write end of the pipe, used by {@link Producer}. */
    final PipedOutputStream pos;

    /**
     * Creates both pipe ends and connects them immediately, so the pipe is usable no matter
     * which of the producer or consumer thread happens to run first.
     *
     * @throws IllegalStateException if the two ends cannot be connected
     */
    public ByPiped() {
        PipedInputStream in = new PipedInputStream();
        PipedOutputStream out;
        try {
            out = new PipedOutputStream(in);
        } catch (IOException e) {
            throw new IllegalStateException("could not connect the pipe ends", e);
        }
        this.pis = in;
        this.pos = out;
    }

    /**
     * Starts one producer thread and one consumer thread on the same pipe.
     *
     * @param args ignored
     */
    public static void main(String[] args) {
        ByPiped byPiped = new ByPiped();
        new Thread(byPiped.new Producer()).start();
        new Thread(byPiped.new Consumer()).start();
    }

    /** Writes random values in 0..254 to the pipe until a write fails. */
    class Producer implements Runnable {
        @Override
        public void run() {
            try {
                while (true) { // keeps producing until the pipe reports an error
                    int n = (int) (Math.random() * 255);
                    System.out.println("生产者" + Thread.currentThread().getName()
                            + "已生产完成,商品数量:" + n);
                    pos.write(n);
                    pos.flush();
                }
            } catch (IOException e) {
                e.printStackTrace();
            } finally {
                // Close only the write end; this is what lets the reader observe end of stream.
                try {
                    pos.close();
                } catch (IOException e) {
                    e.printStackTrace();
                }
            }
        }
    }

    /** Reads values from the pipe and prints them until end of stream or a read error. */
    class Consumer implements Runnable {
        @Override
        public void run() {
            try {
                int n;
                // read() returns -1 once the write end has been closed and the buffer is drained.
                while ((n = pis.read()) != -1) {
                    System.out.println("消费者" + Thread.currentThread().getName()
                            + "已消费,剩余商品数量:" + n);
                }
            } catch (IOException e) {
                e.printStackTrace();
            } finally {
                // Close only the read end; a producer that is still writing then gets an IOException.
                try {
                    pis.close();
                } catch (IOException e) {
                    e.printStackTrace();
                }
            }
        }
    }
}