文章目录
一:什么是生产者和消费者?
在线程世界里,生产者就是生产数据的线程,消费者就是消费数据的线程。在多线程开发当中,如果生产者处理速度很快,而消费者处理速度很慢,那么生产者就必须等待消费者处理完,才能继续生产数据。同样的道理,如果消费者的处理能力大于生产者,那么消费者就必须等待生产者。为了解决这个问题于是引入了生产者和消费者模式。
二:生产者消费者模式的工作机制
1、通过容器来解决生产者和消费者的强耦合问题。
生产者和消费者彼此之间不直接通讯,而是通过阻塞队列来进行通讯,所以生产者生产完数据之后不用等待消费者处理,直接扔给阻塞队列,消费者不直接找生产者要数据,而是从阻塞队列里取,阻塞队列就相当于一个缓冲区,平衡了生产者和消费者的处理能力,解耦了生产者和消费者。
2、体现了面向对象的设计理念:低耦合
这就相当于去包子店吃包子,你要5个包子,老板把5个人包子放在一个盘子中再给你,这个盘子就是一个缓冲区。
3、生产者消费者模式的核心是“阻塞队列”也称消息队列。
三:基于队列实现生产者与消费者模式
模拟一个简单的厨师做包子,顾客吃包子的例子(该例子是典型的生产者与消费者模式)
3.1 代码示例
from multiprocessing import Process,Queue
import time, random
# 生产者
def producer(name, food, q):
for i in range(1, 5):
time.sleep(2) # 厨师每2秒做一个包子
print('厨师【%s】:制作了 %s 个%s' % (name, i, food))
res = '厨师【%s】制作的第【%s】%s' % (name, i, food)
q.put(res) # 将制作的包子装在队列里面
def consumer(name,q):
while True:
res = q.get()
time.sleep(random.randint(1, 2)) # 模拟吃包子时间
print('消费者【%s】:吃 %s\n' % (name, res))
if __name__ == '__main__':
q = Queue()
# 生产者们:即厨师们
p = Process(target=producer, args=('金鞍少年', '包子', q))
#消费者们:即吃货们
c = Process(target=consumer, args=('岳云鹏', q))
# 开始
p.start()
c.start()
print('主') # 主进程
提出问题:
此时的问题是主进程永远不会结束,原因是:生产者p在生产完后就结束了,但是消费者c在取空了q之后,则一直处于死循环中且卡在q.get()这一步。
解决问题:
办法一:解决方式无非是让生产者在生产完毕后,往队列中再发一个结束信号,这样消费者在接收到结束信号后就可以break出死循环
3.2 比较LOW的模式实例
from multiprocessing import Process,Queue
import time, random
# 生产者
def producer(name, food, q):
for i in range(1, 5):
time.sleep(2) # 厨师每2秒做一个包子
print('厨师【%s】:制作了 %s 个%s' % (name, i, food))
res = '厨师【%s】制作的第【%s】%s' % (name, i, food)
q.put(res) # 将制作的包子装在队列里面
def consumer(name,q):
while True:
res = q.get()
if res is None: break # 收到结束信号则结束
time.sleep(random.randint(1, 2)) # 模拟吃包子时间
print('消费者【%s】:吃 %s\n' % (name, res))
if __name__ == '__main__':
q = Queue()
# 生产者们:即厨师们
p1 = Process(target=producer, args=('金鞍少年', '包子', q))
#消费者们:即吃货们
c1 = Process(target=consumer, args=('岳云鹏', q))
# 开始
p1.start()
c1.start()
p1.join()
q.put(None) # 发送结束信号 #有几个消费者就应该发送几次结束信号None
print('主') # 主进程
出现新的问题:
如果有多个消费者的时候,就需要发送多次结束信号,这样容易导致代码臃肿,不美观。
解决问题办法:
其实我们的思路无非是发送结束信号而已,有另外一种队列提供了这种机制,
JoinableQueue([maxsize]):这就像是一个Queue对象,但队列允许项目的使用者通知生成者项目已经被成功处理。通知进程是使用共享的信号和条件变量来实现的。
3.3 JoinableQueue与守护进程的示例
'''
生产者和消费者模式
'''
import time ,random
from multiprocessing import Process, JoinableQueue
# 生产者
def producer(name,food,q):
for i in range(1, 5):
time.sleep(2) # 厨师每2秒做一个包子
print('厨师【%s】:制作了 %s 个%s' % (name, i, food))
res = '厨师【%s】制作的第【%s】%s' % (name, i, food)
q.put(res) # 将制作的包子装在队列里面
# 消费者
def consumer(name, q):
while True:
res = q.get()
time.sleep(random.randint(1, 2)) # 模拟吃包子时间
print('消费者【%s】:吃 %s\n' % (name, res))
q.task_done()
if __name__ == "__main__":
q = JoinableQueue() # 创建一个队列
# 生产者们:即厨师们
p1 = Process(target=producer, args=('金鞍少年','包子', q))
p2 = Process(target=producer, args=('nancy','牛肉面', q))
# 消费者们:即吃货们
c1 = Process(target=consumer, args=('岳云鹏', q,))
c2 = Process(target=consumer, args=('郭德纲', q,))
# 设置守护进程
c1.daemon = True
c2.daemon = True
# 开始
p_l = [p1, p2, c1, c2]
for p in p_l:
p.start()
p1.join()
p2.join() # 保障生产者已经结束生产
print('主')
3.4 总结:
使用joinablequeue实现队列
- 消费者不需要判断从队列里拿到None,再退出执行消费者函数了
- 消费者每次从队列里面q.get()一个数据,处理过后就使用队列.task_done()
- 生产者for循环生产完所有产品,需要q.join()阻塞一下,对这个队列进行阻塞。
- 启动一个生产者,启动一个消费者,并且这个消费者做成守护进程,然后生产者需要p.join()阻塞一下。
- 我启动了生产者之后,生产者函数一直在生成数据,直到生产完所有数据将队列q.join()一下,意思是当我生产的数据都被消费者消费完之后,队列的阻塞才结束。
- 结束过程:消费者这边是每消费完一个数据给队列返回一个q.task_done(),直到所有的数据都被消费完之后,生产者函数这边的队列.阻塞结束了,队列阻塞结束了生产者函数执行结束了。生产者函数结束了,那么p.join()生产者进程对象就结束了。生产者进程对象结束了整个主进程的代码就执行结束了。主进程代码结束了守护进程及消费者进程也结束了