python的多线程开发由于GIL的缘故,一般都会使用多进程来进行开发。
用的最多的就是multiprocessing这个库,而创建子进程的方式主要有,Process和Pool。
其实网上有好多使用Pool(进程池)来创建进程,但本文主要使用Process来创建。
好了,开始吧。
场景就是一个主进程,两个子进程,子进程间能通信,父子之间能通信。
具体需求是:
1.主进程和子进程共享变量,主进程将变量传入子进程,子进程修改后,待子进程退出后,返回给主进程,主进程将结果用于他用。
这里使用了multiprocessing.Manage去实现功能。
Manage主要是支持list,dict等一些结构。这个需要在主进程中创建,然后传给子进程。
#主进程
with multiprocessing.Manager() as MG:
result_list = MG.dict() #共享一个字典型变量
# 1.创建一个子进程,传入result_list
download_handle = multiprocessing.Process(target=downdload_process, args=(result_list ))
# 3. 子进程修改了主进程,主进程就可以使用了
name = result_list['name']
age = result_list['age']
# 2.子进程 修改 主进程传入的变量
def downdload_process(shared_data):
shared_data['name'] = '全麦'
shared_data['age'] = '8个月'
2. 子进程间互相通信,一个子进程失败,通知另一个子进程失败,然后主进程也退出,我使用queue来处理这个需求。
# 主进程 #
# 在主进程 声明两个队列,q:用于download子进程往q中put数据,deal子进程从q中取数据,q_error:用于deal子进程异常后,告诉download进程,我异常退出了,你也退出吧。
q = multiprocessing.JoinableQueue()
q_error = multiprocessing.Queue()
download_handle = multiprocessing.Process(target=downdload_process, args=(q, q_error))
deal_handle = multiprocessing.Process(target=deal_process, args=(q, q_error))
# download子进程 放数据
def downdload_process(queue, queue_error):
try:
for i in range(100):
# 判断 deal进程是否发来异常信息,有异常,直接返回,退出进程。
if not queue_error.empty():
return
queue.put_nowait(i)
except Exception as e:
# 进程异常了,告诉deal进程,退出吧
queue_error.put_nowwait(-1) #deal进程从queue_error队列中取到-1后就退出进程。
queue.join() # 阻塞,等待deal进程全部处理完后,通知,使用task_done
queue.put_nowait(None) # 往队列里发个None,告诉deal进程,我没数据放了。
# deal子进程 取数据
def deal_process(queue, queue_error):
try:
while True:
# 由于download 放数据可能需要些时间,例如从网上下载,所以循环判断queue是否有数据
if queue.empty():
continue
# 有异常 退出
if not queue_error.empty():
return
data = queue.get_nowait()
# 如果 取到None 说明没数据了,可以退出了
if q_data is None:
break
# 下面就是进程的业务处理了
。。。。
。。。。
。。。。
queue.task_done() # 每次执行完一个循环,调用一次task_done,这个适合deal进程的join()配合使用的,具体可以上网查。
except Except as e:
# 异常了,去告诉download进程,你也退出吧
queuue_error.put_nowait(-2) #这里设置-2,跟download区别一下
以后就是几个场景需求。
其实后来我才发现 Pool也比较好用,因为支持异常回调。
之后会尝试使用Pool重写一下功能。
想法可能有些不完善,以后继续进步。