生产者消费者问题的几种实现

      生产者消费者问题是一个经典的问题,一般情况下都会使用synchronized关键字来对生产和消费逻辑进行加锁 ,最近学习了下并发编程相关的基础知识,尝试使用其它的几种方法来实现生产者和消费者模型。

1. synchronized实现生产者消费者模型synchronized、wait和notify 使用synchronized来实现生产者消费者模式中规中矩

public class SynchronizedDemo {

    public static void main(String[] args) {
        Queue<Goods> queue = new LinkedList<Goods>();
        int count = 10;    // 最多能生产的商品数量  队列的大小
        int consumeNum = 2;
        int produceNum = 5;
        Storage storage = new Storage(queue, count, 0);
        Consumer c1 = new Consumer(consumeNum, storage);
        Consumer c2 = new Consumer(consumeNum, storage);
        Producer p1 = new Producer(produceNum, storage);
        Producer p2 = new Producer(produceNum, storage);
        Producer p3 = new Producer(produceNum, storage);
        Producer p4 = new Producer(produceNum, storage);
        new Thread(c1).start();
        new Thread(c2).start();
        new Thread(p1).start();
        new Thread(p2).start();
        new Thread(p3).start();
        new Thread(p4).start();
    }
}

class Goods {
    private int index;
    public int getIndex() {
        return index;
    }
    public void setIndex(int index) {
        this.index = index;
    }
    @Override
    public String toString() {
        return "Goods{" +
                "index=" + index +
                '}';
    }
}

class Storage {
    private Queue<Goods> queue;
    private int count;
    private Integer index;
    public Storage(Queue<Goods> queue, int count,  int index) {
        this.queue = queue;
        this.count = count;
        this.index = index;
    }
    public void consume(int num) {
        synchronized (this) {
            while (queue.size() < num) {
                try {
                    this.wait();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
            for (int i = 0; i < num; i++) {
                Goods goods = queue.remove();
                System.out.println("consume thread=" + Thread.currentThread() + "goods=" + goods + ",list.size=" + queue.size());
            }
            this.notifyAll();
        }
    }
    public void produce(int num) {
        synchronized (this) {
            while (queue.size() + num > count) {
                try {
                    this.wait();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
            for (int i = 0; i < num; i++) {
                Goods goods = new Goods();
                synchronized (index) {
                    goods.setIndex(++index);
                }
                queue.add(goods);
                System.out.println("produce thread=" + Thread.currentThread() + "newGoods="
                        + goods + ",list.size=" + queue.size());
            }
            this.notifyAll();
        }
    }
}

class Producer implements Runnable {
    private Storage storage;
    private int num;

    public Producer(int num, Storage storage) {
        this.num = num;
        this.storage = storage;
    }
    public void run() {
        while (true) {
            storage.produce(num);
        }
    }
}

class Consumer implements Runnable {
    private Storage storage;
    private int num;

    public Consumer(int num, Storage storage) {
        this.num = num;
        this.storage = storage;
    }
    public void run() {
        while (true) {
            storage.consume(num);
        }
    }
}

2. ReentrantLock实现生产者消费者模型(lock和condition的await、signalAll )

      1、ReentrantLock 拥有Synchronized相同的并发性和内存语义,此外还多了锁投票,定时锁等候和中断锁等候线程A和B都要获取对象O的锁定,假设A获取了对象O锁,B将等待A释放对O的锁定, 如果使用 synchronized ,如果A不释放,B将一直等下去,不能被中断如果使用ReentrantLock,如果A不释放,可以使B在等待了足够长的时间以后,中断等待,而干别的事情

ReentrantLock获取锁定与种方式:

  1. lock(),  等待获取锁的过程中休眠,并禁止一切调度
  2. tryLock(),获取到锁返回true获取不到返回flase
  3.  tryLock(long timeout,TimeUnit unit), 在指定时间内获取锁(同tryLock())过程可被中断
  4. lockInterruptibly:等待获取锁的过程可被中断
  1. synchronized是在JVM层面上实现的,不但可以通过一些监控工具监控synchronized的锁定,而且在代码执行时出现异常,JVM会自动释放锁,但是使用Lock则不行,lock是通过代码实现的,要保证锁定一定会被释放,就必须将unLock()放到finally{}中
  2. 在资源竞争不是很激烈的情况下,Synchronized的性能要优于ReetrantLock,但是在资源竞争很激烈的情况下,Synchronized的性能会下降几十倍,但是ReetrantLock的性能能维持常态;

      注意:使用ReentrantLock必须在 finally 块中释放否则,如果受保护的代码将抛出异常,锁就有可能永远得不到释放!这一点区别看起来可能没什么,但是实际上,它极为重要。忘记在 finally 块中释放锁,可能会在程序中留下一个定时炸弹,当有一天炸弹爆炸时,您要花费很大力气才有找到源头在哪。而使用sychronized,JVM 将确保锁会获得自动释放。

public class ReentrantLockDemo {

    public static void main(String[] args) {
        Queue<Goods> queue = new LinkedList<Goods>();
        int count = 100;  // 最多能生产的商品数量
        int consumeNum = 7;
        int produceNum = 3;

        Storage storage = new Storage(queue, count,  0);
        Consumer c1 = new Consumer(consumeNum, storage);
        Consumer c2 = new Consumer(consumeNum, storage);
        Consumer c3 = new Consumer(consumeNum, storage);
        Consumer c4 = new Consumer(consumeNum, storage);
        Producer p2 = new Producer(produceNum, storage);
        Producer p1 = new Producer(produceNum, storage);
        new Thread(c1).start();
        new Thread(c2).start();
        new Thread(c3).start();
        new Thread(c4).start();
        new Thread(p1).start();
        new Thread(p2).start();
    }

    static class Goods {
        private int index;

        public int getIndex() {
            return index;
        }
        public void setIndex(int index) {
            this.index = index;
        }
        @Override
        public String toString() {
            return "Goods{" +
                    "index=" + index +
                    '}';
        }
    }

    static class Storage{
        ReentrantLock lock = new ReentrantLock();
        /*在 ReentrantLock 对象上 newCondition()可以得到一个 Condition 对象,
        可以通过在 Condition 上调用 await()方法来挂起一个任务(线程),
        通过在 Condition 上调用 signal()来通知任务,从而唤醒一个任务,
        或者调用 signalAll()来唤醒所有在这个 Condition 上被其自身挂起的任务。
        另外,如果使用了公平锁,signalAll()的与 Condition 关联的所有任务将以 FIFO 队列的形式获取锁,
        如果没有使用公平锁,则获取锁的任务是随机的,这样我们便可以更好地控制处在 await 状态的任务获取锁的顺序。
        与 notifyAll()相比,signalAll()是更安全的方式。另外,它可以指定唤醒与自身 Condition 对象绑定在一起的任务。*/
        Condition consumeCondition = lock.newCondition();
        Condition produceCondition = lock.newCondition();
        Queue<Goods> queue;
        private int count;
        private Integer index;
        public Storage(Queue<Goods> queue, int count,  int index) {
            this.queue = queue;
            this.count = count;
            this.index = index;
        }

        public void consume(int num) {
            lock.lock();
            try {
                while (queue.size() < num) {
                    consumeCondition.await();
                }
                for (int i = 0; i < num; i++) {
                    Goods goods = queue.remove();
                    System.out.println("consume thread=" + Thread.currentThread()
                            + "goods=" + goods + ",list.size=" + queue.size());
                }
                //调用 produceCondition.signalAll()
                // 来唤醒所有在produceCondition上被其自身挂起的任务。
                produceCondition.signalAll();
            } catch (Exception ex) {
                ex.printStackTrace();
            } finally {
                lock.unlock();
            }
        }
        public void produce(int num) {
            lock.lock();
            try {
                while (queue.size() + num > count) {
                    produceCondition.await();
                }
                for (int i = 0; i < num; i++) {
                    Goods goods = new Goods();
                    synchronized (index) {
                        goods.setIndex(++index);
                    }
                    queue.add(goods);
                    System.out.println("produce thread=" + Thread.currentThread() + "newGoods="
                            + goods + ",list.size=" + queue.size());
                }
                //调用 consumeCondition.signalAll()
                // 来唤醒所有在consumeCondition上被其自身挂起的任务。
                consumeCondition.signalAll();
            } catch (Exception ex) {
                ex.printStackTrace();
            } finally {
                lock.unlock();
            }
        }
    }

    static  class Producer implements Runnable {
        private Storage storage;
        private int num;

        public Producer(int num, Storage storage) {
            this.num = num;
            this.storage = storage;
        }

        public void run() {
            while (true) {
                storage.produce(num);
            }
        }
    }
    static class Consumer implements Runnable {
        private Storage storage;
        private int num;

        public Consumer(int num, Storage storage) {
            this.num = num;
            this.storage = storage;
        }

        public void run() {
            while (true) {
                storage.consume(num);
            }
        }
    }
}

3. BlockingQueue实现生产者消费者模型

       实现生产者消费者模式的最简单的方式,不用提供额外的同步,因为BlockingQueue在底层就是线程安全的,它使用ReentrantLock来保证线程安全

public class BlockingQueueDemo {
    public static void main(String[] args) {
        int count = 100;  //最多能生产的商品的数量
        int consumeNum = 3;
        int produceNum = 7;
        BlockingQueue<Goods> blockingQueue = new LinkedBlockingDeque<Goods>(count);
        Storage storage = new Storage(blockingQueue, count,  0);
        Consumer c1 = new Consumer(consumeNum, storage);
        Consumer c2 = new Consumer(consumeNum, storage);
        Producer p2 = new Producer(produceNum, storage);
        Producer p1 = new Producer(produceNum, storage);
        Producer p3 = new Producer(produceNum, storage);
        Producer p4 = new Producer(produceNum, storage);
        new Thread(p1).start();
        new Thread(p2).start();
        new Thread(p3).start();
        new Thread(p4).start();
        new Thread(c1).start();
        new Thread(c2).start();
    }
}

class Goods {
    private int index;

    public int getIndex() {
        return index;
    }

    public void setIndex(int index) {
        this.index = index;
    }

    @Override
    public String toString() {
        return "Goods{" +
                "index=" + index +
                '}';
    }
}

class Storage {

    BlockingQueue<Goods> blockingQueue;
    private int count;
    private Integer index;

    public Storage(BlockingQueue<Goods> blockingQueue, int count,  int index) {
        this.blockingQueue = blockingQueue;
        this.count = count;
        this.index = index;
    }

    public void consume(int num) {
        for (int i = 0; i < num; i++) {
            Goods goods = null;
            try {
                goods = blockingQueue.take();  //从队列中取出goods
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            System.out.println("consume thread=" + Thread.currentThread() + "goods=" + goods + ",list.size=" + blockingQueue.size());
        }
    }

    public void produce(int num) {
        for (int i = 0; i < num; i++) {
            Goods goods = new Goods();
            synchronized (index) { //调用生产者时要同步
                goods.setIndex(++index);
            }
            try {
                blockingQueue.put(goods);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            System.out.println("produce thread=" + Thread.currentThread() + "newGoods=" + goods + ",list.size=" + blockingQueue.size());
        }
    }
}
class Producer implements Runnable {
    private Storage storage;
    private int num;

    public Producer(int num, Storage storage) {
        this.num = num;
        this.storage = storage;
    }

    public void run() {
        while (true) {
            storage.produce(num);
        }
    }
}
class Consumer implements Runnable {
    private Storage storage;
    private int num;

    public Consumer(int num, Storage storage) {
        this.num = num;
        this.storage = storage;
    }

    public void run() {
        while (true) {
            storage.consume(num);
        }
    }
}

4. Semaphore实现生产者消费者模型

   使用信号量来实现生产者和消费者模式并不是一个很好的注意,代码复杂,线程间同步比较麻烦

public class SemaphoreDemo {
    public static void main(String[] args) {
        Queue<Goods> queue = new LinkedList<Goods>();
        int count = 100;
        int consumeNum = 3;
        int produceNum = 7;
        Storage storage = new Storage(queue, count, 0);
        Consumer c1 = new Consumer(consumeNum, storage);
        Consumer c2 = new Consumer(consumeNum, storage);
//      Consumer c3 = new Consumer(consumeNum, storage);
//      Consumer c4 = new Consumer(consumeNum, storage);
        Producer p2 = new Producer(produceNum, storage);
        Producer p1 = new Producer(produceNum, storage);
        Producer p3 = new Producer(produceNum, storage);
        Producer p4 = new Producer(produceNum, storage);
        new Thread(c1).start();
        new Thread(c2).start();
//      new Thread(c3).start();
//      new Thread(c4).start();
        new Thread(p1).start();
        new Thread(p2).start();
        new Thread(p3).start();
        new Thread(p4).start();
    }

}

class Goods {
    private int index;

    public int getIndex() {
        return index;
    }

    public void setIndex(int index) {
        this.index = index;
    }

    @Override
    public String toString() {
        return "Goods{" +
                "index=" + index +
                '}';
    }
}

class Storage {
    Semaphore mutaxSemaphore;
    Semaphore consumerSemaphore;
    Semaphore producerSemaphore;
    Queue<Goods> queue;
    private int count;
    private Integer index;

    public Storage(Queue<Goods> queue, int count, int index) {
        this.queue = queue;
        this.count = count;
        this.index = index;
        mutaxSemaphore = new Semaphore(1);  // 用来控制同时生产或消费的只有一个线程
        consumerSemaphore = new Semaphore(0);  //在开始时能消费的商品数量为0个
        producerSemaphore = new Semaphore(count);  //刚开始最多能生产count个商品
    }

    public void consume(int num) {
        try {
            consumerSemaphore.acquire(num);
            mutaxSemaphore.acquire();
            for (int i = 0; i < num; i++) {
                Goods goods = queue.poll();
                if (null == goods) {
                    consumerSemaphore.release(num - i); //消费失败,把还没有消费的占用的信号量release掉
                    synchronized (index) {
                        ++index;
                    }
                } else {
                    System.out.println("consume thread=" + Thread.currentThread() +
                            "goods=" + goods + ",list.size=" + queue.size());
                    producerSemaphore.release(); // 每消费一个商品释放一个信号量
                }
            }
            mutaxSemaphore.release();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }

    public void produce(int num) {
        try {
            producerSemaphore.acquire(num); //每次先阻塞获取num个信号量
            mutaxSemaphore.acquire();
            for (int i = 0; i < num; i++) {
                Goods goods = new Goods();
                synchronized (index) {
                    goods.setIndex(++index);
                }
                boolean isSuccess = queue.add(goods);
                if (!isSuccess) {
                    producerSemaphore.release(num - i);  //生产失败,把还没有生产的所占用的信号量release掉
                    synchronized (index) {
                        --index;
                    }
                } else {
                    System.out.println("produce thread=" + Thread.currentThread() + "newGoods=" + goods + ",list.size=" + queue.size());
                    consumerSemaphore.release(); //没生产一个商品增加一个可消费的信号量
                }
            }
            mutaxSemaphore.release();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}
class Producer implements Runnable {
    private Storage storage;
    private int num;

    public Producer(int num, Storage storage) {
        this.num = num;
        this.storage = storage;
    }

    public void run() {
        while (true) {
            storage.produce(num);
        }
    }
}

class Consumer implements Runnable {
    private Storage storage;
    private int num;

    public Consumer(int num, Storage storage) {
        this.num = num;
        this.storage = storage;
    }

    public void run() {
        while (true) {
            storage.consume(num);
        }
    }
}

猜你喜欢

转载自blog.csdn.net/laomumu1992/article/details/85264488