一,进程间通信(IPC机制)
1.1进程之间的内存空间是相互独立的,想要通讯必须通过管道(数据放在里面没有安全性)或者是队列(管道+锁)
from multiprocessing import Queue q = Queue(队列中放数据的数量) #下面是把括号里面的内容放到队列里面 q.put(['first',]) q.put({'x':2}) q.put('三') q.put(4) #下面是把队列里面的内容取出来 print(q.get()) print(q.get()) print(q.get()) print(q.get())
1.2了解知识点
q=Queue(3) # q.put(内容,锁的状态,等待的时间) q.put(['first',],block=True,timeout=3) # 如果确定队列里面有存放数据的空间,就不需要block=True,timeout=3(这两个是搭配使用的) q.put({'x':2},block=True,timeout=3) q.put(3,block=True,timeout=3) # 如果block=False,队列里面没有存放数据的空间会直接报错 # q.put(4,block=True,timeout=3) q.put_nowait(1) #q.put(1,block=False)表示直接把数据放入队列,放不进去就报错,不需要等待 q.put_nowait(2) q.put_nowait(3) # q.put_nowait(4) # 如果确定队列里面有要取的数据,就不需要block=True,timeout=3(这两个是搭配使用的) print(q.get(block=True,timeout=3)) print(q.get()) print(q.get()) # 如果要取的数据超过了队列的范围,才能充分体现block=True,timeout=3的价值 # print(q.get(block=True,timeout=3)) print(q.get_nowait()) #q.get(block=false)表示直接到队列里面取数据,数据不存在就报错,不需要等待 print(q.get_nowait()) #q.get(block=false) print(q.get_nowait()) #q.get(block=false) print(q.get_nowait()) #q.get(block=false)
二,生产者与消费者模型
2.1什么是生产者消费者模型
生产者:指的是生产数据的任务
消费者:指的是处理数据的任务
该模型的工作方式:生产者产生数据后交给消费者进行数据处理
实现方式:生产者----->队列<------消费者
2.2为何要用
当程序中出现明确的两个任务,一个负责产生数据,一个负责处理数据.可以引入生产者与消费者模型来解耦合,充分发挥生产者产生数据的能力和消费者处理数据的能力,从而提高效率
2.3怎么用
这里产生了一个问题,生产者一直生产数据而消费者一直向队列里面取数据,到了生产者不再产生数据的时候只有生产者知道,而消费者还在等待接收数据,消费者并没有接受到生产者不再产生数据的信息,而主进程在等待非守护进程全部结束它本身才会结束,这样会因为消费者阻塞在接收消息这里导致整个程序阻塞.下面有两种方法进行改良此bug:
(一)用join方法在主进程等待生产者全部结束以后,给队列里面添加None数据信息(存在几个消费者需要添加几个None),当消费者收到None时同时结束消费者的子进程
import time,random from multiprocessing import Process,Queue def producer(name,food,q): for i in range(3): res = '%s%s' %(food,i) time.sleep(random.randint(1,3)) q.put(res) print('厨师%s生产了%s' %(name,res)) def consumer(name,q): while True: res = q.get() if not res:break time.sleep(random.randint(1,3)) print('吃货%s吃了%s' %(name,res)) if __name__ == '__main__': q = Queue() # 生产者们 p = Process(target=producer, args=('jack', '包子', q)) p1 = Process(target=producer, args=('lili', '饺子', q)) p2 = Process(target=producer, args=('egon', '馒头', q)) # 消费者们 c = Process(target=consumer,args=('json', q)) c1 = Process(target=consumer, args=('tom', q)) p.start() p1.start() p2.start() c.start() c1.start() p.join() p1.join() p2.join() q.put(None) q.put(None) c1.join() c.join() print('主进程')
(二),调用multiprocessing里面的JoinableQueue方法,在定义队列的时候q = JoinableQueue()
消费者每次取完值得时候,都需要向队列发送一个取出这个值得信号,在主进程等待生产者全部结束之后,q.join()
再判断队列中是否存在值,直到不存在值之后,结束掉所有进程.
import time,random from multiprocessing import Process,JoinableQueue def producer(name,food,q): # 生产者 for i in range(3): res = '%s%s' %(food,i) time.sleep(random.randint(1,3)) q.put(res) print('厨师%s生产了%s' %(name,res)) def consumer(name,q): while True: res = q.get() time.sleep(random.randint(1,3)) print('吃货%s吃了%s' %(name,res)) q.task_done() #向队列发送一个取出值得信号 if __name__ == '__main__': q = JoinableQueue() #定义一个新的队列 # 生产者 p1 = Process(target=producer, args=('jack', '包子', q)) p2 = Process(target=producer, args=('lily', '饺子', q)) p3 = Process(target=producer, args=('Tom', '馒头', q)) # 消费者 c1 = Process(target=consumer, args=('旺财', q)) c2 = Process(target=consumer, args=('小花', q)) c1.daemon = True c2.daemon = True p1.start() p2.start() p3.start() c1.start() c1.start() p1.join() p2.join() p3.join() q.join() # 主进程等Q结束,即q内数据被取干净了 print('主')