文章目录
一、关于线程池\进程池介绍
1.1 池的概念
-
池是一组资源的集合,这组资源在程序启动时就完全被创建并初始化,这也称为为
静态资源分配
。 -
程序从系统(内存)调用和分配资源,包括之后释放资源都需要耗费大量时间。如果把相关资源放在“池“中,程序从池中获取和释放,无需动态分配,无疑速度要快很多。
-
池相当于服务器管理系统资源的应用设施,它避免了服务器对内核的频繁访问。
池的概念主要目的是为了重用:让线程或进程在生命周期内可以多次使用。它减少了创建创建线程和进程的开销,“以空间换时间”,来提高了程序性能。重用不是必须的规则,但它是程序员在应用中使用池的主要原因。
1.2 池的划分
池可以分为多种,常见的有
内存池
、进程池
、线程池
和连接池
。
1.3 线程池和进程池的区别
线程池和进程池相似,用法基本一致。主要是应用场景的不同,在判断对程序是否进行线程池或进程池操作时,得先看程序业务。
业务 | 用途 | 建议使用 |
---|---|---|
IO密集型 | 读取文件,读取网络套接字频繁等。 | 线程池 |
计算密集型 | 大量消耗CPU的数学与逻辑运算,也就是我们这里说的平行计算。 | 进程池,利用硬件多核优势 |
1.5 进程池的创建(流程)
- 创建进程池,在池内放入合适数量的进程
- 将事件加入进程池的等待队列
- 使用进程池内的进程不断的执行等待事件,直到所有事件执行完毕
- 所有事件处理完毕后,关闭进程池,回收进程池
二、创建线程池\进程池的两种方法
从Python3.2python标准库为我们提供了concurrent和multiprocessing模块编写相应的异步多线程/多进程代码。
2.1 concurrent和multiprocessing区别
两个模块本质区别并不大,有的也只是调用方式略有差异。先有的 multiprocessing,后有concurrent.futures,后者的出现就是为了降低编写代码的难度,后者的学习成本较低。
本博文主要以介绍 concurrent.futures模块为主。
三、concurrent.futures模块
3.1 模块的介绍
从 Python3.2开始,Python 标准库提供了 concurrent.futures 模块,为开发人员提供了启动异步任务的高级接口。
concurrent.futures模块的基础是Exectuor,Executor是一个抽象类,它不能被直接使用。但是它提供的两个子类ThreadPoolExecutor和ProcessPoolExecutor却是非常有用,顾名思义两者分别被用来创建线程池和进程池的代码。我们可以将相应的tasks直接放入线程池/进程池,不需要维护Queue来操心死锁的问题,线程池/进程池会自动帮我们调度。
3.2 Executor.submit 创建进程/线程池
进程/线程池,只有固定个数的线程/进程,通过 max_workers 指定。
-
任务通过 Executor.submit 提交到 executor 的任务队列,返回一个 future 对象。
-
Future 是常见的一种并发设计模式。一个Future对象代表了一些尚未就绪(完成)的结果,在「将来」的某个时间就绪了之后就可以获取到这个结果。
-
任务被调度到各个 workers 中执行。但是要注意:
- 一个任务一旦被执行,在执行完毕前,会一直占用该 worker!
- 如果 workers 不够用,其他的任务会一直等待! 因此 Executor不适合实时任务。
简易创建进程池示例:
'''
Executor.submit 创建进程实例
'''
from concurrent.futures import ProcessPoolExecutor
import time, os
# 打印信息
def print_info(n):
print("%s: 开启" % os.getpid())
time.sleep(1)
return n**2
if __name__ == '__main__':
pool = ProcessPoolExecutor(4) # 开启四个进程
for i in range(10): # 执行10个任务
pool.submit(print_info, i)
四、concurrent.futures 常用模块
concurrent.futures 包含三个部分的 API:
4.1 Executor模块
也就是两个执行器的 API
-
构造器:主要的参数是 max_workers,用于指定线程池大小(或者说 workers 个数)
-
submit(fn, *args, **kwargs) # 返回一个 future,用于获取结果
- 将任务函数 fn 提交到执行器,args 和 kwargs 就是 fn 需要的参数。
-
map(func, *iterables, timeout=None, chunksize=1) # 返回一个 futures 的迭代器
- 当任务是同一个,只有参数不同时,可以用这个方法代替 submit。iterables 的每个元素对应 func 的一组参数。
-
shutdown(wait=True)
:关闭执行器,一般都使用 with 管理器自动关闭。
4.2 Future模块
任务被提交给执行器后,会返回一个 future
函数 | 作用 |
---|---|
future.result(timout=None) | 最常用的方法返回任务的结果。如果任务尚未结束,这个方法会一直等待! |
-
timeout 指定超时时间,为 None 时没有超时限制。
-
exception(timeout=None)
:给出任务抛出的异常。和 result() 一样,也会等待任务结束。 -
cancel()
:取消此任务 -
add_done_callback(fn)
:future 完成后,会执行fn(future)
。 -
running()
:是否正在运行 -
done()
:future 是否已经结束了,boolean -
…详见官方文档
4.3 模块其他实用函数
函数 | 功能 |
---|---|
concurrent.futures.as_completed(fs, timeout=None) | 等待 fs (futures iterable)中的 future 完成 |
- 一旦 fs 中的某 future 完成了,这个函数就立即返回该 future。
- 这个方法,使每次返回的 future,总是最先完成的 future。而不是先等待任务 1,再等待任务 2…
- 常通过
for future in as_completed(fs):
使用此函数。
函数 | 功能 |
---|---|
concurrent.futures.wait(fs, timeout=None, return_when=ALL_COMPLETED) | 一直等待,直到 return_when 所指定的事发生,或者 timeout |
- return_when 有三个选项:ALL_COMPLETED(fs 中的 futures 全部完成),FIRST__COMPLETED(fs 中任意一个 future 完成)还有 FIRST_EXCEPTION(某任务抛出异常)
五、程序实例
5.1 进程池实例
from concurrent.futures import ProcessPoolExecutor
import os,time,random
def task(n):
print('%s is running' %os.getpid())
time.sleep(2)
return n**2
if __name__ == '__main__':
p = ProcessPoolExecutor() #不填则默认为cpu的个数
# print(os.cpu_count()) # 获取本地cpu个数
l=[]
start=time.time()
for i in range(10):
obj=p.submit(task,i) #submit()方法返回的是一个future实例,要得到结果需要用obj.result()
l.append(obj)
p.shutdown() #类似用from multiprocessing import Pool实现进程池中的close及join一起的作用
print('='*30)
# print([obj for obj in l])
print([obj.result() for obj in l])
print(time.time()-start)
#上面方法也可写成下面的方法
# start = time.time()
# with ProcessPoolExecutor() as p: #类似打开文件,可省去.shutdown()
# future_tasks = [p.submit(task, i) for i in range(10)]
# print('=' * 30)
# print([obj.result() for obj in future_tasks])
# print(time.time() - start)
补充:
1、执行完了shutdown()方法之后,再运行 submit 就会报错
2、concurrent.futures 基于 multiprocessing. Pool 实现,因此实际上它比直接使用 线程/进程 的 Pool 要慢一点。但是它提供了更方便简洁的 API。
5.2 线程池实例
from concurrent.futures import ThreadPoolExecutor
import threading
import os, time
def task(n):
print('%s:%s is running' % (threading.currentThread().getName(),os.getpid()))
time.sleep(2)
return n**2
if __name__ == '__main__':
p = ThreadPoolExecutor() # 不填则默认为cpu的个数*5
l = []
start = time.time()
for i in range(10): # 开启10个任务
obj = p.submit(task,i)
l.append(obj)
p.shutdown()
print('='*30)
print([obj.result() for obj in l])
print(time.time()-start)
5.3 同步和异步的实例
- 同步调用:提交完任务,就在原地等,等任务执行完,拿到任务的返回值,才能继续下一行代码,导致程序串行执行
- 异步调用:提交完任务,不在原地等,程序是并行执行, 任务完成后会返回一个实例,必须通过特殊方法才能得到
线程池和进程池中默认的都是异步执行的。这样能够最大程度提高程序的执行速度。
异步回调实例
# 异步调用
def counter(x):
time.sleep(1)
return x**x
if __name__ == '__main__':
start = time.time()
p = ProcessPoolExecutor(4)
l = []
for i in range(1, 10):
results = p.submit(counter, i)
l.append(results)
print([i.result() for i in l])
p.shutdown()
print('花费时间:', time.time() - start)
print('运算结束!')
'''
结果:
[1, 4, 27, 256, 3125, 46656, 823543, 16777216, 387420489]
花费时间: 3.282705783843994
运算结束!
'''
同步回调实例
# 同步调用
def counter(x):
time.sleep(1)
return x**x
if __name__ == '__main__':
start = time.time()
p = ProcessPoolExecutor(4)
l = []
for i in range(1, 10):
res = p.submit(counter, i).result()
l.append(res)
print([i for i in l])
p.shutdown(wait=True)
print('花费时间:', time.time() - start)
print('运算结束!')
'''
结果:
[1, 4, 27, 256, 3125, 46656, 823543, 16777216, 387420489]
花费时间: 9.329394340515137
运算结束!
'''
5.4 Executor.map用法示例
from concurrent import futures
import time
def test(x):
time.sleep(2)
return time.ctime(), x
if __name__ == '__main__':
with futures.ThreadPoolExecutor() as T:
for i in T.map(test,[1,2,3]):
print(i)
# 等同于下面注释代码
# with futures.ThreadPoolExecutor() as T:
# l = []
# for i in [1, 2, 3]:
# obj = T.submit(test, i)
# l.append(obj)
#
# print([i.result() for i in l])
'''
结果:
('Tue Apr 7 00:12:17 2020', 1)
('Tue Apr 7 00:12:17 2020', 2)
('Tue Apr 7 00:12:17 2020', 3)
'''