版权声明:qq:1263351411 https://blog.csdn.net/u013008795/article/details/91481598
Python中的concurrent并发包(构建线程池和进程池)
文章目录
concurrent.futures
3.2版本引入的模块。
异步并行任务编辑模块,提供一个高级的异步可执行的便利接口。
提供了2个池执行器
- concurrent.functures库统一了线程池和进程池,简化了编程。体现了Python简单的思想哲学。
- ThreadPoolExecutor异步调用的线程池的Executor
- ProcessPoolExecutor异步调用的进程池的Executor
ThreadPoolExecutor线程池与ProcessPoolExecutor进程池对象
首先需要定义一个池的执行器对象,Executor类子类对象。
方法 | 含义 |
---|---|
submit(fn,*args,**kwargs)->Future | 提交执行的函数及其参数,返回Future类的实例 |
shutdown(wait=True) | 清理池,默认等待计算完成后再清理所有池中的对象。 |
map(func, *iterables, timeout=None, chunksize=1) | 类似于map函数,提供映射计算。 func映射函数。 iterables可迭代对象,用来一个个传递给映射函数。 chunksize=1,每次计算,块的大小,默认值为1,当数据特别大时,可以修改chunksize的值来提高运算速度。 |
- 注意:
- ThreadPoolExecutor与ProcessPoolExecutor都是懒惰模式,在使用时才初始化创建对应的线程和进程。
构造方法
- class concurrent.futures.ProcessPoolExecutor(max_workers=None, mp_context=None, initializer=None, initargs=()) #进程池构造方法
- max_workers #最大进程个数
- mp_context #在版本3.7中进行了更改:添加了mp_context参数,允许用户控制池创建的worker进程的start_method。
- initializer #每个工作进程创建开始时调用的函数,即进程创建后的第一个初始化函数。
- initargs #初始化函数initializer中的位置参数。
- class concurrent.futures.ThreadPoolExecutor(max_workers=None, thread_name_prefix=’’, initializer=None, initargs=())->Future #线程池初构造方法
- max_workers #最大线程个数
- thread_name_prefix #线程池创建工作线程时线程名的前缀。
- initializer #每个工作线程创建开始时调用的函数,即线程创建后的第一个初始化函数。
- initargs #初始化函数initializer中的位置参数。
concurrent.futures.Future类,未来结果类
方法 | 含义 |
---|---|
done() | 如果调用被成功的取消或者执行完成,返回True |
cancelled() | 如果调用被成功的取消,返回True |
running() | 如果正在运行且不能被取消,返回True |
cancel() | 尝试取消调用。如果已经执行且不能取消返回False,否则返回True |
result(timeout=None) | 获取返回的结果,timeout默认为None,表示永久阻塞直到返回结果。 如果设置了timeout并且超时还未返回结果,则抛出concurrent.futures.TimeoutError异常 |
exception(timeout=None) | 获取返回的异常,timeout默认为None表示一直阻塞直到返回。 timeout设置到期,抛出concurrent.futures.Timeouterror异常 |
add_done_callback(fn) | 添加回调函数,回调函数必须是一参的回调函数,会将返回结果Future对象传入。 可以添加多个回调函数,回调函数执行的顺序是添加的顺序。 |
concurrent.futures.wait方法
concurrent.futures.wait(fs, timeout=None, return_when=ALL_COMPLETED)->(done,not_done) #等待任务执行结果。会阻塞,直到等待fs中所有任务都执行完成返回结果。
-
fs #是个列表,由多个Future组成。
-
timeout=None #超时时长。默认为None。永久阻塞,直到所有fs都返回结果才返回。
-
return_when=ALL_COMPLETED #模式,决定函数何时返回,默认模式为:ALL_COMPLETED。可选模式如下:
- FIRST_COMPLETED #只要有一个Future计算完成或被取消就立即返回结果,结束阻塞
- FIRST_EXCEPTION #只有有一个Future出现异常就立即返回结果,结束阻塞。如果一直没有异常,就相当于ALL_COMPLETED模式。
- ALL_COMPLETED #当所有Future计算完成,或被取消才返回结果,结束阻塞。
-
done #返回值二元组中的第一个参数,一个集合,记录所有已经执行完成返回结果的Future
-
not_done #返回值二元组中的第二个参数,一个集合,记录所有未执行完的Future
-
综合示例
from concurrent.futures import ThreadPoolExecutor,ProcessPoolExecutor
from concurrent import futures
import threading,multiprocessing
import logging
import time,sys
FORMAT = "%(asctime)-15s [%(processName)s:%(threadName)s, %(process)d:%(thread)8d] %(message)s"
logging.basicConfig(stream=sys.stdout,level=logging.INFO,format=FORMAT)
log = logging.getLogger(__name__)
def init():
log.info("我被初始化了--------")
def worker(aaa):
log.info("开始工作了~~~~~~~~~~~~~")
time.sleep(2)
log.info("工作完成")
return aaa
def callback(fs):
log.info("回调函数执行{},{}".format(fs,fs.result()))
def callback2(fs):
log.info("2回调函数执行{},{}".format(fs,fs.result()))
if __name__ == "__main__":
# executerThr = ThreadPoolExecutor(max_workers=3,initializer=init)
executerPro = ProcessPoolExecutor(max_workers=3,initializer=init)
time.sleep(0.1)
# 注意ThreadPoolExecutor线程池是懒惰模式,使用时才会创建
print(threading.enumerate(),multiprocessing.active_children())
fs = []
for i in range(5):
f = executerPro.submit(fn=worker,aaa=i)
f.add_done_callback(callback)
f.add_done_callback(callback2)
fs.append(f)
print(threading.enumerate())
print(multiprocessing.active_children())
# executerPro.shutdown()
# for i in fs:
# log.info(i.result())
done,not_done = futures.wait(fs,return_when=futures.ALL_COMPLETED)
for i in done:
print(i.result())
print("开始打印not_down....")
for i in not_done:
print(i.result())
上下文管理
concurrent.futures.ProcessPoolExecutor继承自concurrent.futures._base.Executor,而父类有__enter__
、 __exit__
方法,支持上下文管理。可以使用with语句。
__exit__
方法本质还是调用的shutdown(wait=True),就是一直阻塞到所有运行的任务完成
- 综合示例
import threading
from concurrent import futures
import logging,sys
import time
FORMAT = "%(asctime)-15s [%(processName)s:%(threadName)s, %(process)d:%(thread)8d] %(message)s"
logging.basicConfig(stream=sys.stdout,level=logging.INFO,format=FORMAT)
log = logging.getLogger(__name__)
def worker(n):
log.info("开始工作:{}".format(n))
time.sleep(2)
logging.info("结束工作 {}".format(n))
if __name__ == "__main__":
with futures.ThreadPoolExecutor(max_workers=3) as executer:
fs = []
for i in range(5):
fs.append(executer.submit(worker,i))
while True:
time.sleep(2)
log.info(threading.enumerate())
flag = True
for f in fs:#判断是否还有未完成的任务
log.info(f.done())
flag = flag and f.done()
# if not flag:
# break
print("- "*30)
if flag:
break
# executer.shutdown() #上下文已经清理了资源
log.info("========end=========")
log.info(threading.enumerate())