Java五种实现生产者消费者模型的方式

  • 使用阻塞队列

import java.util.concurrent.BlockingDeque;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingDeque;

public class ByBlockingQueue {
    public static void main(String[] args) {
        ByBlockingQueue byBlockingQueue = new ByBlockingQueue();
        BlockingDeque<Product> queue = new LinkedBlockingDeque<>();

        ExecutorService executorService = Executors.newCachedThreadPool();

        Producer p1 = new Producer("张三",queue);
        Producer p2 = new Producer("李四",queue);

        Consumer c1 = new Consumer("王五",queue);
        Consumer c2 = new Consumer("赵六",queue);
        Consumer c3 = new Consumer("田七",queue);

        executorService.submit(p1);
        executorService.submit(p2);
        executorService.submit(c1);
        executorService.submit(c2);
        executorService.submit(c3);
    }
}

/*
    产品类
 */
class Product{
    private int id;

    public Product(int id){
        this.id = id;
    }

    public String toString(){
        return "产品"+this.id;
    }
}

/*
    生产者
 */
class Producer implements Runnable{
    private String name;
    private final BlockingDeque<Product> storage;

    public Producer(String name,BlockingDeque<Product> storage) {
        this.name = name;
        this.storage = storage;
    }

    @Override
    public void run() {
        try {
            for (int i = 0; i < 100; i++) {
                Product product = new Product((int) (Math.random()*999));
                System.out.println(name+"准备生产("+product.toString()+")");
                storage.put(product);
                System.out.println(name+"已经生产("+product.toString()+")");
                System.out.println("=========================================");
                Thread.sleep(5000);
            }
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}

/*
    消费者
 */
class Consumer implements Runnable{
    private String name;
    private BlockingDeque<Product> storage;

    public Consumer(String name,BlockingDeque storage){
        this.name = name;
        this.storage = storage;
    }

    @Override
    public void run() {
        for (int i = 0; i < 100; i++) {
            try {
                System.out.println(name+"准备消费产品");
                Product product = storage.take();
                System.out.println(name+"已经消费("+product.toString()+")");
                System.out.println("=========================================");
                Thread.sleep(5000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }
}
  • 使用Object类的wait()方法和notify()方法

import java.io.Serializable;
import java.util.concurrent.BlockingDeque;
import java.util.concurrent.LinkedBlockingDeque;

public class ByObjectNotifyWait {
    public static void main(String[] args) {
        BlockingDeque<Order> queue = new LinkedBlockingDeque<>();
        ProducerThread pt = new ProducerThread(queue);
        ConsumerThread ct = new ConsumerThread(queue);

        Thread pth = new Thread(pt);
        pth.setName("生产者线程");

        Thread cth = new Thread(ct);
        pth.setName("消费者线程");

        pth.start();
        cth.start();
    }
}

/*
    生产者
 */
class ProducerThread implements Runnable{
    private BlockingDeque<Order> queue;
    private int number = 0;

    public ProducerThread(BlockingDeque<Order> queue) {
        this.queue = queue;
    }

    @Override
    public void run() {
        // 模拟像引擎中加入100条订单
        for (int i = 0; i < 10; i ++) {
            synchronized (queue){
                while (queue.size() == 10){
                    try {
                        queue.wait();
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                        queue.notify();
                    }
                }
                queue.offer(produceOrder());
                queue.notify();
            }
        }
    }

    private Order produceOrder() {
        Order order = new Order("id_" + number,"name_" + number,new Double(number),number);
        number ++;
        return order;
    }
}

/*
    消费者
 */
class ConsumerThread implements Runnable{
    private BlockingDeque<Order> queue;

    public ConsumerThread(BlockingDeque<Order> queue) {
        this.queue = queue;
    }

    @Override
    public void run() {
        // 循环处理订单
        while (true) {
            synchronized (queue){
                while (queue.size() == 0){
                    try {
                        queue.wait();
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                        queue.notify();
                    }
                }
                Order order = queue.poll();
                if (order != null)
                    disposeOrder(order);
                queue.notify();
            }
        }
    }

    private void disposeOrder(Order order) {
        System.out.println("处理订单信息[商品ID:" + order.getCommodityId() + ", 商品名称:" + order.getGetCommodityName() +
                ", 商品价格:" + order.getPrice() + ", 商品数量:" + order.getQuantity());
    }
}

/*
    订单类
 */
class Order implements Serializable{
    private String commodityId;     //商品ID
    private String getCommodityName;    //商品名称
    private Double price;       //商品价格
    private Integer quantity;   //商品数量
    private static final long serialVersionUID = 4826685511830052034L;

    public Order(String commodityId, String getCommodityName, Double price, Integer quantity) {
        this.commodityId = commodityId;
        this.getCommodityName = getCommodityName;
        this.price = price;
        this.quantity = quantity;
    }

    public String getCommodityId() {
        return commodityId;
    }

    public void setCommodityId(String commodityId) {
        this.commodityId = commodityId;
    }

    public String getGetCommodityName() {
        return getCommodityName;
    }

    public void setGetCommodityName(String getCommodityName) {
        this.getCommodityName = getCommodityName;
    }

    public Double getPrice() {
        return price;
    }

    public void setPrice(Double price) {
        this.price = price;
    }

    public Integer getQuantity() {
        return quantity;
    }

    public void setQuantity(Integer quantity) {
        this.quantity = quantity;
    }
}
  • 使用Condition中的await()方法和signal()方法

import java.util.PriorityQueue;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

public class ByCondition {
    public static void main(String[] args) {
        Lock lock = new ReentrantLock();
        Condition notFull = lock.newCondition();
        Condition noEmpty = lock.newCondition();
        PriorityQueue<Integer> queue = new PriorityQueue<>(10);

        ProducerCondition producerCondition = new ProducerCondition(lock,notFull,noEmpty,queue);
        ConsumerCondition consumerCondition = new ConsumerCondition(lock,notFull,noEmpty,queue);

        Thread producer_thread = new Thread(producerCondition);
        Thread consumer_thread = new Thread(consumerCondition);

        producer_thread.start();
        consumer_thread.start();
    }
}

class ProducerCondition implements Runnable{

    private Lock lock;
    private Condition notFull;
    private Condition notEmpty;
    private PriorityQueue<Integer> queue;

    public ProducerCondition(Lock lock, Condition notFull, Condition notEmpty, PriorityQueue<Integer> queue) {
        this.lock = lock;
        this.notFull = notFull;
        this.notEmpty = notEmpty;
        this.queue = queue;
    }

    @Override
    public void run() {
        lock.lock();
        try {
            for (int i = 0; i < 20; i++) {
                while (queue.size() == 10)
                    notFull.await();
                queue.offer(i);
                notEmpty.signal();
                System.out.println("生产者生产了数字"+i);
            }
        } catch (InterruptedException e) {
            e.printStackTrace();
        }finally {
            lock.unlock();
        }
    }
}

class ConsumerCondition implements Runnable{
    private Lock lock;
    private Condition notFull;
    private Condition notEmpty;
    private PriorityQueue<Integer> queue;

    public ConsumerCondition(Lock lock, Condition notFull, Condition notEmpty, PriorityQueue<Integer> queue) {
        this.lock = lock;
        this.notFull = notFull;
        this.notEmpty = notEmpty;
        this.queue = queue;
    }

    @Override
    public void run() {
        lock.lock();
        try {
            for (int i = 0; i < 20; i++) {
                while (queue.size() == 0)
                    notEmpty.await();
                int integer = queue.poll();
                notFull.signal();
                System.out.println("消费者消费了数字"+integer);
            }
        } catch (InterruptedException e) {
            e.printStackTrace();
        }finally {
            lock.unlock();
        }
    }
}
  • 使用控制并发线程数的Semaphore类(思想类似操作系统中的PV操作实现生产者与消费者模型)

import java.util.concurrent.Semaphore;

public class BySemaphore {
    int count = 0;
    final Semaphore put = new Semaphore(5);// 初始令牌个数
    final Semaphore get = new Semaphore(0);
    final Semaphore mutex = new Semaphore(1);   //该信号量相当于锁

    public static void main(String[] args) {
        BySemaphore bySemaphore = new BySemaphore();
        new Thread(bySemaphore.new Producer()).start();
        new Thread(bySemaphore.new Consumer()).start();
        new Thread(bySemaphore.new Consumer()).start();
        new Thread(bySemaphore.new Producer()).start();
    }

    class Producer implements Runnable {
        @Override
        public void run() {
            for (int i = 0; i < 5; i++) {
                try {
                    Thread.sleep(1000);
                } catch (Exception e) {
                    e.printStackTrace();
                }
                try {
                    put.acquire();// 注意顺序
                    mutex.acquire();
                    count++;
                    System.out.println("生产者" + Thread.currentThread().getName()
                            + "已生产完成,商品数量:" + count);
                } catch (Exception e) {
                    e.printStackTrace();
                } finally {
                    mutex.release();
                    get.release();
                }

            }
        }
    }

    class Consumer implements Runnable {

        @Override
        public void run() {
            for (int i = 0; i < 5; i++) {
                try {
                    Thread.sleep(1000);
                } catch (InterruptedException e1) {
                    e1.printStackTrace();
                }
                try {
                    get.acquire();// 注意顺序
                    mutex.acquire();
                    count--;
                    System.out.println("消费者" + Thread.currentThread().getName()
                            + "已消费,剩余商品数量:" + count);
                } catch (Exception e) {
                    e.printStackTrace();
                } finally {
                    mutex.release();
                    put.release();
                }
            }
        }
    }
}
  • 使用管道PipedInputStream与PipedOutputStream

import java.io.IOException;
import java.io.PipedInputStream;
import java.io.PipedOutputStream;

public class ByPiped {
    final PipedInputStream pis = new PipedInputStream();
    final PipedOutputStream pos = new PipedOutputStream();

    public static void main(String[] args) {
        ByPiped byPiped = new ByPiped();
        new Thread(byPiped.new Producer()).start();
        new Thread(byPiped.new Consumer()).start();
    }

    class Producer implements Runnable {

        @Override
        public void run() {
            try {
                pis.connect(pos);
            } catch (IOException e) {
                e.printStackTrace();
            }
            try {
                while (true) { // 不断的产生数据
                    int n = (int) (Math.random() * 255);
                    System.out.println("生产者" + Thread.currentThread().getName()
                            + "已生产完成,商品数量:" + n);
                    pos.write(n);
                    pos.flush();
                }
            } catch (IOException e) {
                e.printStackTrace();
            } finally {
                try {
                    pis.close();
                    pos.close();
                } catch (IOException e) {
                    e.printStackTrace();
                }
            }

        }
    }

    class Consumer implements Runnable {

        @Override
        public void run() {
            int n;
            try {
                while (true) {
                    n = pis.read();
                    System.out.println("消费者" + Thread.currentThread().getName()
                            + "已消费,剩余商品数量:" + n);
                }
            } catch (IOException e) {
                e.printStackTrace();
            } finally {
                try {
                    pis.close();
                    pos.close();
                } catch (IOException e) {
                    e.printStackTrace();
                }

            }
        }
    }
}

猜你喜欢

转载自blog.csdn.net/weixin_36378917/article/details/81705452