1.Process创建进程
from multiprocessing improt Process
import time
def test():
while True:
print('test')
time.sleep(1)
p = Process(target=test)
p.start() # 让这个进程开始执行test函数内的代码
while True:
print('main')
time.sleep(1)
结果为:
main
test
main
test
...
主进程会等待Process子进程结束
2.Process语法结构
(1) Process([group[, target[, name[, args [,kwargs]]]]])
target:表示这个进程实例所调用对象
args:表示调用对象的位置参数元组
kwargs:表示调用对象的关键字参数字典
name:当前进程实例的例名
group:大多时候用不到
(2)常用方法
join([timeout])函数:等到子进程结束后,代码才会往下走,或者等待多少秒
is_alive()函数:判断进程实例是否还在执行
start()函数:启动进程实例
run()函数:如果没有给定target参数,,对这个对象调用start()方法时,将执行对象中的run()方法
terminate()函数:不管任务是否完成,立即结束
(3)常用属性
name:当前进程实例的例名,默认为Process-N,N为从1开始递增的整数
pid:当前进程实例的pid值
3.Process子类创建子进程
from multiprocessing import Process
import time
class MyProcess(Process):
def run(self):
while True:
print('--1--')
time.sleep(1)
if __name__=='__main__':
p = MyProcess()
p.start()
while True:
print('--2--')
time.sleep(1)
结果为:
--2--
--1--
--2--
--1--
..
4.进程池Pool
from multiprocessing import Pool
import os
import random
import time
def work(num):
for i in range(5):
print('---pid=%d--num=%d' %(os.getpid(), num))
time.sleep(1)
if __name__=='__main__':
#表示进程池中有3个进程
pool = Pool(3)
for i in range(10):
print('--%d--' %i)
# 向进程池中添加任务
# 如果添加的任务数量超过了进程池中进程的个数,那么不会导致添加不进去的情况
# 添加到进程中的任务,如果没有被执行,那么此时他们会等待进程池中的进程完成一
# 个任务之后,会自动的去用刚刚的那个进程完成当前的新任务
# pool.apply() 堵塞方式运行
pool.apply_async(work, (i,)) # 非堵塞方式运行
# 关闭进程池,相当于不能再添加新的任务了
pool.close()
# 主进程创建/添加任务后,主进程默认不会等待进程池中的任务执行完后才结束
# 而是当主进程的任务做完之后立马结束
# 如果这个地方没有join,会导致进程池中的任务不会执行
pool.join()
5.进程间的通信(queue 队列)
(1)Queue的使用
from multiprocessing import Queue
q = Queue(3) # 初始化一个Queue对象,最多可接受3条消息
q.put() # 将消息放入队列
q.get() # 将消息取出
q.put_nowait() # 无需等待,直接写入.如果队列满,则报错
q.get_nowait() # 无需等待,直接读取.如果队列控,则报错
# 推荐的写入方式
if not q.full():
q.put_nowait()
# 推荐的读取方式
if not q.emply():
for i in range(q.qsize()):
print(q.get_nowait)
(2)Queue配合Process使用
from multiprocess import Process, Queue
import os, time, random
# 写数据进程执行的代码
def write(q):
for value in ['A', 'B', 'C']:
print('Put %s in Queue...' % value)
q.put(value)
time.sleep(random.random())
# 读数据进程执行的代码
def read(q):
while True:
if not q.empty:
value = q.get(True)
print('Get %s in Queue...' % value)
time.sleep(random.random())
else:
break
if __name__=='__main__':
# 父进程创建Queue
q = Queue()
pw = Process(target=write, args=(q,))
pr = Process(target=read, args=(q,))
# 启动pw进程
pw.start()
# 等待pw结束
pw.join()
# 启动pr进程
pr.start()
# pr进程里面是死循环,无需等待其结束,只能强行终止
pr.join()
print("所有数据都写入,并且读完")
(3)进程池中的Queue
from multiprocessing import Manager, pool
import os, time, random
def read(q):
print('reader启动(%s), 父进程为(%s)' % (os.getpid(), os.getppid()))
for i in range(q.qsize()):
print('reader从Queue获取到消息为%s' %q.get(True))
def write(q):
print('write启动(%s), 父进程为(%s)' % (os.getpid(), os.getppid()))
for i in 'laoli':
q.put(i)
if __name__=='__main__':
print('(%s) start' %os.getpid())
q = Manager.Queue() # 使用Manage中的Queue来初始化
po = Pool()
# 使用阻塞模式创建进程,这样就不需要在reader中使用死循环了,可以让write完全执行完成后,再用reader
po.apply(write, (q,))
po.apply(read, (q,))
po.close()
po.join()
print('(%s) End' %os.getpid())