every blog every motto: Light tomorrow with today.
0. 前言
因为最近在写有关多进程返回值的问题,涉及到这方面,索性进行简单的小结
说明: 其中有关函数计时用到了装饰器,可参考装饰器及文末的参考文章
说明:
-
下表为对同一段程序进行10次运行以后的平均时间(单位:秒)。
-
多进程方法均在multiprocessing中,
方法 | Pool | pool.Pool | pool.ThreadPool |
---|---|---|---|
apply | 3.722 | 3.774 | 1.748 |
apply_async | 1.868 | 1.926 | 0.863 |
注:
- 未使用多进程时间为0.329(非平均,仅一次),并不能说明使用多进程使程序运行更慢,这应该与我的测试代码有关,为使文章完整,附上未使用多进程的测试时间。
- 读者仅需比较不同进程方法下的apply和apply_async就可。
补充:
补充时间:2020.8.10.16:47
在实际运行用中发现有意思的现象,准确说和本文得出的结论恰恰相反,遂记之。
方法 | Pool | pool.Pool | pool.ThreadPool |
---|---|---|---|
apply | 98~ | 99~ | 98~ |
apply_async | 33~ | 34~ | 98~ |
说明:
- 实际中,反而是Pool和pool.Pool的apply_async方法更快
- 上述时间非完全准确时间,细微差距读者不必深究,“~”表明大约之意。
同样的是,又同段程序测试了如下多进程方法:
时间约为33秒。
process_li = []
# 1. 数据处理,及保存
for file in files:
# 多进程
t = Process(target=preprocessing_data, args=(file,))
t.start()
process_li.append(t)
for i in process_li:
i.join()
1. 正文
1.0 未使用多进程
import time
loop_number = 30000
def count_time(func):
"""装饰器:计算程序运行时间"""
def wrapper(*args, **kwargs):
t1 = time.time()
func(*args, **kwargs)
t2 = time.time()
# 计算时间
time_consumed = t2 - t1
print('{}函数一共花费了{}秒'.format(func.__name__, time_consumed))
return wrapper
def fun(k):
"""被测试函数"""
print('-----fun函数内部,参数为{}----'.format(k))
m = k + 10
return m
@count_time
def call_fun():
"""没有使用多线程的情况"""
number = 0
for i in range(loop_number):
print('number:{}'.format(number))
number = fun(number)
def main():
"""主程序"""
# 1. 没有使用多线程
call_fun()
if __name__ == '__main__':
main()
1.1 multiprocessing.Pool下的两种方法
导入模块
from multiprocessing import Pool
1.1.1 apply
描述: apply方法是阻塞的,即等待子进程执行完毕后,再执行下一个子进程。
def fun(k):
"""被测试函数"""
print('-----fun函数内部,参数为{}----'.format(k))
m = k + 10
return m
@count_time
def my_process():
"""多进程"""
# 方法一:apply/apply_async
pool = Pool(4) # 创建4个进程
k = 0
for i in range(loop_number):
pool.apply(fun, args=(k,))
def main():
"""主程序"""
# 3. 使用多进程
my_process()
if __name__ == '__main__':
main()
结果:
分别记录10次的时间: 3.717 ,3.689,3.716,3.725,3.653,3.696,3.841,3.698,3.672,3.808
平均时间为:3.722
1.1.2 apply_async
描述: apply_async是异步非阻塞的,即不用等待当前进程执行完毕,随时根据系统调度来进行进程切换。
import multiprocessing
def fun(k):
"""被测试函数"""
print('-----fun函数内部,参数为{}----'.format(k))
m = k + 10
return m
@count_time
def my_process():
"""多进程"""
# 方法一:apply/apply_async
pool = Pool(4) # 创建4个进程
k = 0
for i in range(loop_number):
pool.apply_async(fun, args=(k,))
# ----------------------------
# apply_async为非阻塞式,要加close和join
# 行号:007
pool.close()
pool.join()
# ------------------------------
print('---888---') # 查看主进程运行情况
def main():
"""主程序"""
# 3. 使用多进程
my_process()
if __name__ == '__main__':
main()
结果:
- 代码中 行号:007 注释时:
分别记录10次的时间: 0.518,0.515,0.498,0.549,0.490,0.510,0.493,0.511,0.493,0.495
平均时间: 0.507 - 代码中 行号:007 未注释时:
分别记录10次的时间: 1.857,1.798,1.791,1.887,2.004,1.793,1.867,1.982,1.910,1.790
平均时间: 1.868
说明:
- 第1种情况为错误情况,因为未加colse和join,子进程还没跑完就运行后面的,测试出来的时间也是不准确的(程序还没运行完,提前计算了时间),上面的”----888----会提前打印出来,可以佐证这一说法。
- 上面第2中情况应该为正确情况,要等子进程跑完,主进程才能运行后面的。具体参考文献3
小结:
- apply是阻塞式,即:一个子进程结束以后才能进行下一个,等到所有进程结束才切换到主进程,运行剩余部分。可以理解为串行。
- 在主进程和多个子进程之间切换,为了防止子进程还没运行完,主进程已经运行完的情况,需要将代码中“007”的注释部分取消注释,即上面的第2种情况。
1.2 multiprocessing.pool下的两种方法
1.2.1 Pool
导入模块
from multiprocessing.pool import Pool
1.2.1.1 apply
其余代码一样,为了查看主进程的运行效果,加了一行打印“----888----",位置位于:
@count_time
def my_process():
"""多进程"""
# 方法一:apply/apply_async
pool =Pool(4) # 创建4个进程
k = 0
for i in range(loop_number):
pool.apply(fun, args=(k,))
# pool.close()
# pool.join()
print('---888---')
说明:因为apply是阻塞式,不需要加以下代码
pool.close()
pool.join()
结果:
分别记录10次的时间: 4.024,3.772,3.617,3.673,3.906,3.890,3.712,3.719,3.668,3.838
平均时间: 3.774
1.2.1.2 apply_async
说明: apply_async为非阻塞式,需要加close和join
@count_time
def my_process():
"""多进程"""
# 方法一:apply/apply_async
pool =Pool(4) # 创建4个进程
k = 0
for i in range(loop_number):
pool.apply_async(fun, args=(k,))
pool.close()
pool.join()
print('---888---')
结果:
分别记录10次的时间: 1.866,2.082,1.801,1.858,2.115,1.846,1.857,2.088,1.880,1.868
平均时间: 1.926
1.2.2 ThreadPool
仅导入模块和创建进程部分不同,为了方便读者进行测试,此处贴完整代码
from multiprocessing.pool import ThreadPool
1.2.2.1 apply
def fun(k):
"""被测试函数"""
print('-----fun函数内部,参数为{}----'.format(k))
m = k + 10
return m
@count_time
def my_process():
"""多进程"""
# 方法一:apply/apply_async
pool =ThreadPool(4) # 创建4个进程
k = 0
for i in range(loop_number):
pool.apply(fun, args=(k,))
print('---888---')
def main():
"""主程序"""
# 3. 使用多进程
my_process()
if __name__ == '__main__':
main()
结果:
分别记录10次的时间: 1.715,1.716,1.715,1.714,1.839,1.815,1.714,1.715,1.826,1.713
平均时间: 1.748
1.2.2.2 apply_async
def fun(k):
"""被测试函数"""
print('-----fun函数内部,参数为{}----'.format(k))
m = k + 10
return m
@count_time
def my_process():
"""多进程"""
# 方法一:apply/apply_async
pool =ThreadPool(4) # 创建4个进程
k = 0
for i in range(loop_number):
pool.apply(fun, args=(k,))
pool.close()
pool.join()
print('---888---')
def main():
"""主程序"""
# 3. 使用多进程
my_process()
if __name__ == '__main__':
main()
结果:
分别记录10次的时间: 0.815,0.818,0.927,0.838,0.923,0.918,0.923,0.824,0.817,0.824
平均时间: 0.863
1.3 分析
多进程方法均在multiprocessing中,
方法 | Pool | pool.Pool | pool.ThreadPool |
---|---|---|---|
apply | 3.722 | 3.774 | 1.748 |
apply_async | 1.868 | 1.926 | 0.863 |
由以上表格发现,运行时间相差2倍
- 阻塞式apply: 只有等子进程结束后,才能切换到另外一个子进程,直到所有子进程运行完毕,然后切换到主进程。
- 异步非阻塞式apply_async: 在多个子进程之间来回切换,为防止子进程还没运行完,主进程就运行了后面的代码,需要加close和join
- multiprocessing下的Pool和pool.Pool方法二者并没有区别,细微差别可能和计算机本身原因有关。
- multiprocessing下的pool.ThreadPool时间最短,具体原因暂时未知。
建议:使用multiprocessing下的pool.ThreadPool的apply_async方法
知识无价,如果帮助到你,不妨请喝一杯奶茶~~
参考文献
[1] https://blog.csdn.net/weixin_39190382/article/details/107107980
[2] https://blog.csdn.net/htuhxf/article/details/101221768
[3] https://www.cnblogs.com/wr13640959765/p/9428245.html
[4] https://blog.csdn.net/weixin_43283397/article/details/104294890