【python内功修炼003】:并发编程之生产者与消费者模式

一:什么是生产者和消费者?

在线程世界里,生产者就是生产数据的线程,消费者就是消费数据的线程。在多线程开发当中,如果生产者处理速度很快,而消费者处理速度很慢,那么生产者就必须等待消费者处理完,才能继续生产数据。同样的道理,如果消费者的处理能力大于生产者,那么消费者就必须等待生产者。为了解决这个问题于是引入了生产者和消费者模式。

二:生产者消费者模式的工作机制

1、通过容器来解决生产者和消费者的强耦合问题。

生产者和消费者彼此之间不直接通讯,而是通过阻塞队列来进行通讯,所以生产者生产完数据之后不用等待消费者处理,直接扔给阻塞队列,消费者不直接找生产者要数据,而是从阻塞队列里取,阻塞队列就相当于一个缓冲区,平衡了生产者和消费者的处理能力,解耦了生产者和消费者。
在这里插入图片描述

2、体现了面向对象的设计理念:低耦合

这就相当于去包子店吃包子,你要5个包子,老板把5个人包子放在一个盘子中再给你,这个盘子就是一个缓冲区。

3、生产者消费者模式的核心是“阻塞队列”也称消息队列。

在这里插入图片描述

三:基于队列实现生产者与消费者模式

模拟一个简单的厨师做包子,顾客吃包子的例子(该例子是典型的生产者与消费者模式)

3.1 代码示例

from multiprocessing import Process,Queue
import time, random


# 生产者
def producer(name, food, q):
    for i in range(1, 5):
        time.sleep(2)  # 厨师每2秒做一个包子
        print('厨师【%s】:制作了 %s 个%s' % (name, i, food))
        res = '厨师【%s】制作的第【%s】%s' % (name, i, food)
        q.put(res)  # 将制作的包子装在队列里面


def consumer(name,q):
    while True:
        res = q.get()
        time.sleep(random.randint(1, 2))  # 模拟吃包子时间
        print('消费者【%s】:吃 %s\n' % (name, res))

if __name__ == '__main__':
    q = Queue()
    # 生产者们:即厨师们
    p = Process(target=producer, args=('金鞍少年', '包子', q))
    #消费者们:即吃货们
    c = Process(target=consumer, args=('岳云鹏', q))
    # 开始
    p.start()
    c.start()

    print('主')  # 主进程

提出问题:

此时的问题是主进程永远不会结束,原因是:生产者p在生产完后就结束了,但是消费者c在取空了q之后,则一直处于死循环中且卡在q.get()这一步。

解决问题:

办法一:解决方式无非是让生产者在生产完毕后,往队列中再发一个结束信号,这样消费者在接收到结束信号后就可以break出死循环

3.2 比较LOW的模式实例

from multiprocessing import Process,Queue
import time, random


# 生产者
def producer(name, food, q):
    for i in range(1, 5):
        time.sleep(2)  # 厨师每2秒做一个包子
        print('厨师【%s】:制作了 %s 个%s' % (name, i, food))
        res = '厨师【%s】制作的第【%s】%s' % (name, i, food)
        q.put(res)  # 将制作的包子装在队列里面


def consumer(name,q):
    while True:
        res = q.get()
        if res is None: break  # 收到结束信号则结束
        time.sleep(random.randint(1, 2))  # 模拟吃包子时间
        print('消费者【%s】:吃 %s\n' % (name, res))


if __name__ == '__main__':
    q = Queue()
    # 生产者们:即厨师们
    p1 = Process(target=producer, args=('金鞍少年', '包子', q))

    #消费者们:即吃货们
    c1 = Process(target=consumer, args=('岳云鹏', q))

    # 开始
    p1.start()
    c1.start()

    p1.join()
    q.put(None)  # 发送结束信号 #有几个消费者就应该发送几次结束信号None
    print('主')  # 主进程

出现新的问题:

如果有多个消费者的时候,就需要发送多次结束信号,这样容易导致代码臃肿,不美观。

解决问题办法:
其实我们的思路无非是发送结束信号而已,有另外一种队列提供了这种机制,

JoinableQueue([maxsize]):这就像是一个Queue对象,但队列允许项目的使用者通知生成者项目已经被成功处理。通知进程是使用共享的信号和条件变量来实现的。

3.3 JoinableQueue与守护进程的示例

'''
生产者和消费者模式
'''
import time ,random
from multiprocessing import Process, JoinableQueue

# 生产者
def producer(name,food,q):
    for i in range(1, 5):
        time.sleep(2)  # 厨师每2秒做一个包子
        print('厨师【%s】:制作了 %s 个%s' % (name, i, food))
        res = '厨师【%s】制作的第【%s】%s' % (name, i, food)
        q.put(res)  # 将制作的包子装在队列里面


# 消费者
def consumer(name, q):
    while True:
        res = q.get()
        time.sleep(random.randint(1, 2))  # 模拟吃包子时间
        print('消费者【%s】:吃 %s\n' % (name, res))
        q.task_done() 



if __name__ == "__main__":
    q = JoinableQueue()  # 创建一个队列
    
    # 生产者们:即厨师们
    p1 = Process(target=producer, args=('金鞍少年','包子', q))
    p2 = Process(target=producer, args=('nancy','牛肉面', q))

    # 消费者们:即吃货们
    c1 = Process(target=consumer, args=('岳云鹏', q,))
    c2 = Process(target=consumer, args=('郭德纲', q,))
    # 设置守护进程
    c1.daemon = True
    c2.daemon = True

    # 开始
    p_l = [p1, p2, c1, c2]
    for p in p_l:
        p.start()

    p1.join() 
    p2.join() # 保障生产者已经结束生产
    print('主')

3.4 总结:

使用joinablequeue实现队列

  1. 消费者不需要判断从队列里拿到None,再退出执行消费者函数了
  2. 消费者每次从队列里面q.get()一个数据,处理过后就使用队列.task_done()
  3. 生产者for循环生产完所有产品,需要q.join()阻塞一下,对这个队列进行阻塞。
  4. 启动一个生产者,启动一个消费者,并且这个消费者做成守护进程,然后生产者需要p.join()阻塞一下。
  5. 我启动了生产者之后,生产者函数一直在生成数据,直到生产完所有数据将队列q.join()一下,意思是当我生产的数据都被消费者消费完之后,队列的阻塞才结束。
  6. 结束过程:消费者这边是每消费完一个数据给队列返回一个q.task_done(),直到所有的数据都被消费完之后,生产者函数这边的队列.阻塞结束了,队列阻塞结束了生产者函数执行结束了。生产者函数结束了,那么p.join()生产者进程对象就结束了。生产者进程对象结束了整个主进程的代码就执行结束了。主进程代码结束了守护进程及消费者进程也结束了
发布了72 篇原创文章 · 获赞 79 · 访问量 1万+

猜你喜欢

转载自blog.csdn.net/weixin_42444693/article/details/104966395