一、生产者消费者模型
在线程世界里,生产者就是生产数据的线程,消费者就是消费数据的线程。在多线程开发当中,如果生产者处理速度很快,而消费者处理速度很慢,那么生产者就必须等待消费者处理完,才能继续生产数据。同样的道理,如果消费者的处理能力大于生产者,那么消费者就必须等待生产者。为了解决这个问题于是引入了生产者和消费者模式。
生产者消费者模式是通过一个容器来解决生产者和消费者的强耦合问题。生产者和消费者彼此之间不直接通讯,而通过阻塞队列来进行通讯,所以生产者生产完数据之后不用等待消费者处理,直接扔给阻塞队列,消费者不找生产者要数据,而是直接从阻塞队列里取,阻塞队列就相当于一个缓冲区,平衡了生产者和消费者的处理能力,并且我可以根据生产速度和消费速度来均衡一下多少个生产者可以为多少个消费者提供足够的服务,就可以开多进程等等,而这些进程都是到阻塞队列或者说是缓冲区中去获取或者添加数据。
二、生产者消费者模型实现
1、基于队列来实现一个生产者消费者模型
1 from multiprocessing import Process,Queue 2 import time,random,os 3 def consumer(q): 4 while True: 5 res=q.get() 6 time.sleep(random.randint(1,3)) 7 print('\033[45m%s 吃 %s\033[0m' %(os.getpid(),res)) 8 9 def producer(q): 10 for i in range(10): 11 time.sleep(random.randint(1,3)) 12 res='包子%s' %i 13 q.put(res) 14 print('\033[44m%s 生产了 %s\033[0m' %(os.getpid(),res)) 15 16 if __name__ == '__main__': 17 q=Queue() 18 #生产者们:即厨师们 19 p1=Process(target=producer,args=(q,)) 20 21 #消费者们:即吃货们 22 c1=Process(target=consumer,args=(q,)) 23 24 #开始 25 p1.start() 26 c1.start() 27 print('主')
总结:
1 #生产者消费者模型总结 2 #程序中有两类角色 3 一类负责生产数据(生产者) 4 一类负责处理数据(消费者) 5 6 #引入生产者消费者模型为了解决的问题是: 7 平衡生产者与消费者之间的工作能力,从而提高程序整体处理数据的速度 8 9 #如何实现: 10 生产者<-->队列<——>消费者 11 #生产者消费者模型实现类程序的解耦和
问题:
通过上面基于队列的生产者消费者代码示例,我们发现一个问题:主进程永远不会结束,原因是:生产者p在生产完后就结束了,但是消费者c在取空了q之后,则一直处于死循环中且卡在q.get()这一步。解决方式无非是让生产者在生产完毕后,往队列中再发一个结束信号,这样消费者在接收到结束信号后就可以break出死循环
2、主进程去发送一个结束信号
注意:结束信号None,不一定要由生产者发,主进程里同样可以发,但主进程需要等生产者结束后才应该发送该信号
1 from multiprocessing import Process, Queue 2 import time 3 def shenchan(q): 4 for i in range(10): # 每1秒生产一个包子 5 time.sleep(1) 6 print('生产%s号包子' % i) 7 q.put(i) # 生产一个放入队列(缓冲区) 8 9 10 def xiaofei(q): 11 while 1: 12 time.sleep(0.5) # 一直从队列中拿包子,每0.5秒那一次 13 if q.get() == None: # 当从队列中拿时,拿到了None,就不拿了退出,None信号由主进程发出 14 break 15 else: 16 print('消费者吃%s号包子' % q.get()) # 消费者拿的比生产者产包子快,则队列中每包子,还get,等待消费者向队列中放包子再取 17 18 if __name__ == '__main__': 19 20 q = Queue(10) 21 sc = Process(target=shenchan, args=(q,)) 22 sc.start() 23 xf = Process(target=xiaofei, args=(q,)) 24 xf.start() 25 sc.join() # 设join,阻塞一下,先让生产者,消费者进程执行完 26 q.put(None) # 再想队列中方None
3、通过JoinableQueue([maxsize]) 实现
JoinableQueue([maxsize]):这就像是一个Queue对象,但队列允许项目的使用者通知生成者项目已经被成功处理。通知进程是使用共享的信号和条件变量来实现的。
参数介绍:
maxsize是队列中允许最大项数,省略则无大小限制。
方法介绍:
JoinableQueue的实例p除了与Queue对象相同的方法之外还具有:
q.task_done():使用者使用此方法发出信号,表示q.get()的返回项目已经被处理。如果调用此方法的次数大于从队列中删除项目的数量,将引发ValueError异常
q.join():生产者调用此方法进行阻塞,直到队列中所有的项目均被处理。阻塞将持续到队列中的每个项目均调用q.task_done()方法为止,也就是队列中的数据全部被get拿走了。
1 #生产者消费者模型 2 import time 3 from multiprocessing import Process,Queue,JoinableQueue 4 5 def producer(q): 6 for i in range(1,11): 7 time.sleep(0.5) 8 print('生产了包子%s号' % i) 9 q.put(i) 10 q.join() 11 print('在这里等你') 12 def consumer(q): 13 while 1: 14 time.sleep(1) 15 s = q.get() 16 print('消费者吃了%s包子' % s) 17 q.task_done() #给q对象发送一个任务结束的信号 18 19 if __name__ == '__main__': 20 #通过队列来模拟缓冲区,大小设置为20 21 q = JoinableQueue(20) 22 #生产者进程 23 pro_p = Process(target=producer,args=(q,)) 24 pro_p.start() 25 #消费者进程 26 con_p = Process(target=consumer,args=(q,)) 27 con_p.daemon = True # 28 con_p.start() 29 pro_p.join() 30 print('主进程结束')
#生产者消费者模型
import time
from multiprocessing import Process,Queue,JoinableQueue
def producer(q):
for i in range(1,11):
time.sleep(0.5)
print('生产了包子%s号' % i)
q.put(i)
q.join()
print('在这里等你')
def consumer(q):
while 1:
time.sleep(1)
s = q.get()
print('消费者吃了%s包子' % s)
q.task_done() #给q对象发送一个任务结束的信号
if __name__ == '__main__':
#通过队列来模拟缓冲区,大小设置为20
q = JoinableQueue(20)
#生产者进程
pro_p = Process(target=producer,args=(q,))
pro_p.start()
#消费者进程
con_p = Process(target=consumer,args=(q,))
con_p.daemon = True #
con_p.start()
pro_p.join()
print('主进程结束')