from multiprocessing.dummy import Pool # 1、线程池导入
import time
class Engine(object):
def __init__(self):
# 2、创建线程池和程序停止的条件
self.pool = Pool()
self.is_running = False # False的时候停止循环
self.async_nums = 0
# 3、创建异步回调函数
def _callback(self, temp): # 这是异步线程池的callback参数指向的函数,temp参数为固定写法
if self.is_running:
self.pool.apply_async(self.func, callback=self._callback, error_callback=self._error_callback)
# 4、创建要循环调用的函数
def func(self):
print('--------{}-------'.format(self.async_nums))
self.async_nums += 1
# 补充:创建异常回调函数
def _error_callback(self, exception):
try:
raise exception
except Exception as e:
print(e)
# 5、主程序
def run(self):
self.is_running = True
for i in range(3): # 设置最大并发数为3
self.pool.apply_async(self.func, callback=self._callback, error_callback=self._error_callback)
# 用循环实现不断调用
while True:
time.sleep(0.001) # 避免cpu空转,避免性能消耗
if self.async_nums > 100:
self.is_running = False
print("程序结束")
break
if __name__ == '__main__':
engine = Engine()
engine.run()
运行结果:
·····
--------117-------
--------118-------
--------120-------
--------121-------
程序结束
超过一百是正常的,因为在等待主程序结束的时候线程也在不断的调用,其次线程池中的是无法停止的
添加协程池的异步
from multiprocessing.dummy import Pool as mlPool # 1、线程池导入
import time
# 协程池异步1、导入模块
from gevent.pool import Pool as BasePool
import gevent.monkey
gevent.monkey.patch_all()
# 协程池异步2、重写协程池
class gtPool(BasePool):
"""
重写协程池类
由于gevent的Pool的没有close方法,也没有异常回调参数(error_callback)
需要对gevent的Pool进行进一步的封装,实现与线程池一样接口,实现线程和协程的无缝转换
"""
# 协程池异步3、重写apply_async方法和添加close()方法
def apply_async(self, func, args=None, kwds=None, callback=None, error_callback=None):
return super().apply_async(func, args=args, kwds=kwds, callback=callback) # super()调用父类的apply_async方法
def close(self):
'''什么都不需要执行'''
pass
# 协程池异步4:、设定异步方式,字符串为thread(线程)或者coroutine(协程)
ASYNC_TYPE = ''
if ASYNC_TYPE == 'thread':
Pool = mlPool # 导入线程池对象
elif ASYNC_TYPE == 'coroutine':
Pool = gtPool # 导入协程池对象
else:
raise Exception("不支持的异步类型")
class Engine(object):
def __init__(self):
# 2、创建线程池和程序停止的条件
self.pool = Pool()
self.is_running = False # False的时候停止循环
self.async_nums = 0
# 3、创建异步回调函数
def _callback(self, temp): # 这是异步线程池的callback参数指向的函数,temp参数为固定写法
if self.is_running:
self.pool.apply_async(self.func, callback=self._callback, error_callback=self._error_callback)
# 4、创建要调用的循环函数
def func(self):
print('--------{}-------'.format(self.async_nums))
self.async_nums += 1
# 补充:创建异常回调函数
def _error_callback(self, exception):
try:
raise exception
except Exception as e:
print(e)
# 5、主程序
def run(self):
self.is_running = True
for i in range(3): # 设置最大并发数为3
self.pool.apply_async(self.func, callback=self._callback, error_callback=self._error_callback)
# 用循环实现不断调用
while True:
time.sleep(0.001) # 避免cpu空转,避免性能消耗
if self.async_nums > 100:
self.is_running = False
print("程序结束")
break
if __name__ == '__main__':
engine = Engine()
engine.run()