进程的定义作用已经在其他文章中讲过,这里谈一谈进程之间的通信问题。
一: 队列(Queue)
可以使用multiprocessing模块的Queue实现多进程之间的数据传递
下面是一个具体的实现栗子,演示一下Queue的工作原理:
from multiprocessing import Queue
# 创建容量为3的队列
q = Queue(3)
q.put('message1')
q.put('message2')
print(q.full())
q.put('message3')
print(q.full())
try:
q.put('message4',True,2)
except:
print('message queue was full, the num of message: %s' % q.qsize())
try:
q.put_nowait('message4')
except:
print('message was full, the num of message: %s'%q.qsize())
if not q.full():
q.put_nowait('message4')
if not q.empty():
for i in range(q.qsize()):
print(q.get_nowait())
运行结果:
False
True
message was full, the num of message:3
message was full, the num of message:3
message1
message2
message3
说明
初始化Queue()对象时(例如:q=Queue()),若括号中没有指定最大可接收的消息数量,或数量为负值,那么就代表可接受的消息数量没有上限(直到内存的尽头);
-
Queue.qsize():返回当前队列包含的消息数量;
-
Queue.empty():如果队列为空,返回True,反之False ;
-
Queue.full():如果队列满了,返回True,反之False;
-
Queue.get([block[, timeout]]):获取队列中的一条消息,然后将其从列队中移除,block默认值为True;
1)如果block使用默认值,且没有设置timeout(单位秒),消息列队如果为空,此时程序将被阻塞(停在读取状态),直到从消息列队读到消息为止,如果设置了timeout,则会等待timeout秒,若还没读取到任何消息,则抛出"Queue.Empty"异常;
2)如果block值为False,消息列队如果为空,则会立刻抛出"Queue.Empty"异常;
-
Queue.get_nowait():相当Queue.get(False);
-
Queue.put(item,[block[, timeout]]):将item消息写入队列,block默认值为True;
1)如果block使用默认值,且没有设置timeout(单位秒),消息列队如果已经没有空间可写入,此时程序将被阻塞(停在写入状态),直到从消息列队腾出空间为止,如果设置了timeout,则会等待timeout秒,若还没空间,则抛出"Queue.Full"异常;
2)如果block值为False,消息列队如果没有空间可写入,则会立刻抛出"Queue.Full"异常;
- Queue.put_nowait(item):相当Queue.put(item, False);
二: 进程池(Pool)
当需要创建的子进程数量不多时,可以直接利用multiprocessing中的Process动态成生多个进程,但如果是上百甚至上千个目标,手动的去创建进程的工作量巨大,此时就可以用到multiprocessing模块提供的Pool方法。
初始化Pool时,可以指定一个最大进程数,当有新的请求提交到Pool中时,如果池还没有满,那么就会创建一个新的进程用来执行该请求;但如果池中的进程数已经达到指定的最大值,那么该请求就会等待,直到池中有进程结束,才会用之前的进程来执行新的任务,请看下面的实例:
# process pool
from multiprocessing import Pool
import os, time, random
def worker(msg):
t_start = time.time()
print("%s开始执行,进程号为%d" %(msg, os.getpid()))
time.sleep(random.random()*2)
t_stop = time.time()
print(msg, '执行完毕, 耗时%0.2f' %(t_stop-t_start))
# 创建进程池,并指定容量3
po = Pool(3)
for i in range(0,10):
po.apply_async(worker,(i,)) # 有空闲时调用worker任务
print('----start-----')
po.close()
po.join()
print('----end------')
结果:
----start-----
0开始执行,进程号为16454
1开始执行,进程号为16456
2开始执行,进程号为16455
1 执行完毕, 耗时1.44
3开始执行,进程号为16456
0 执行完毕, 耗时1.49
4开始执行,进程号为16454
2 执行完毕, 耗时1.63
5开始执行,进程号为16455
3 执行完毕, 耗时0.31
6开始执行,进程号为16456
4 执行完毕, 耗时1.23
7开始执行,进程号为16454
7 执行完毕, 耗时0.42
8开始执行,进程号为16454
6 执行完毕, 耗时1.39
9开始执行,进程号为16456
5 执行完毕, 耗时1.60
9 执行完毕, 耗时0.13
8 执行完毕, 耗时1.97
----end------
Process finished with exit code 0
说明:
multiprocessing.Pool常用函数解析:
- apply_async(func[, args[, kwds]]) :使用非阻塞方式调用func(并行执行,堵塞方式必须等待上一个进程退出才能执行下一个进程),args为传递给func的参数列表,kwds为传递给func的关键字参数列表;
- close():关闭Pool,使其不再接受新的任务;
- terminate():不管任务是否完成,立即终止;
- join():主进程阻塞,等待子进程的退出, 必须在close或terminate之后使用;
进程池中进程的通信----进程池中的Queue
# Queue in Pool
# the use of multiprocessing.Manager().Queue()
from multiprocessing import Manager, Pool
import os, time, random
def reader(q):
print('reader启动(%s), 父进程为(%s)' %(os.getpid(), os.getppid()))
for i in range(q.qsize()):
print('reader从Queue队列获取到的消息: %s' % q.get(True))
def writer(q):
print('writer启动(%s), 父进程为(%s)' % (os.getpid(), os.getppid()))
for i in 'scrat':
q.put(i)
if __name__ == '__main__':
print('(%s)start' % os.getpid())
q = Manager().Queue() # 使用Manager中的queue
po = Pool()
po.apply_async(writer, (q,))
time.sleep(1)
po.apply_async(reader, (q,))
po.close()
po.join()
print('(%s) end' % os.getpid())
结果:
(16926)start
writer启动(16932), 父进程为(16926)
reader启动(16933), 父进程为(16926)
reader从Queue队列获取到的消息: s
reader从Queue队列获取到的消息: c
reader从Queue队列获取到的消息: r
reader从Queue队列获取到的消息: a
reader从Queue队列获取到的消息: t
(16926) end
Process finished with exit code 0