参考原文:https://www.cnblogs.com/Eva-J/p/5106564.html
一个调用三方接口的获取数据,总共三千五百多个请求。原本我是用50个进程获取信息,都需要十多分钟。如果放到低配服务器就会直接崩溃。在得知这类请求是io密集型,鸡肋的线程用于这种情况下也还是有一些帮助的。因为没看到官方的线程池,所以就尝试自己写。当看到前面那篇文字之后,本着不重复造轮子的原则,就直接在源码上做了一些小改动,修复了一个小bug。
from multiprocessing import Queue
import contextlib
import threading
from collections import Iterable
WorkerStop = object()
# 线程池
class ThreadPool:
workers = 0
# 线程对象
threadFactory = threading.Thread
currentThread = staticmethod(threading.currentThread)
def __init__(self, maxthreads=20, name=None):
self.q = Queue(0)
self.max = maxthreads
self.name = name
self.waiters = []
self.working = []
# 启动方法
def start(self):
needSize = self.q.qsize()
# 启动 20 条线程
while self.workers < min(self.max, needSize):
self.startAWorker()
# 启动线程
def startAWorker(self):
# 控制启动方法中启动线程数量
self.workers += 1
name = "PoolThread-%s-%s" % (self.name or id(self), self.workers)
# 启动 self._woker 核心方法
newThread = self.threadFactory(target=self._worker, name=name)
newThread.start()
# 将任务放入队列
def callInThread(self, func, *args, **kw):
self.callInThreadWithCallback(None, func, *args, **kw)
def callInThreadWithCallback(self, onResult, func, *args, **kw):
o = (func, args, kw, onResult)
self.q.put(o)
# 上下文管理器
@contextlib.contextmanager
def _workerState(self, stateList, workerThread):
# self.working, currentThread
stateList.append(workerThread)
try:
# 生成器(协程)标志
yield
finally:
stateList.remove(workerThread)
#
def _worker(self):
# 自定义的 staticmethod(threading.currentThread)
ct = self.currentThread()
# 从队列中获取一组数据
o = self.q.get()
# 如果可迭代(数据不为空)
while isinstance(o, Iterable):
with self._workerState(self.working, ct):
# 元素解包
function, args, kwargs, onResult = o
# 删除 o 对象
del o
# 尝试运行程序
try:
result = function(*args, **kwargs)
success = True
except:
success = False
del function, args, kwargs
# 暂时看不懂这一步想干嘛
if onResult is not None:
try:
onResult(success, result)
except:
# context.call(ctx, log.err)
pass
del onResult, result
# 再取出一个 o 对象,实现队列不空,线程不结束
with self._workerState(self.waiters, ct):
o = self.q.get()
def stop(self):
while self.workers:
self.q.put(WorkerStop)
self.workers -= 1
def show(arg):
import time
time.sleep(1)
print(arg)
if __name__ == "__main__":
# 声明一个线程池对象 线程池设置数量设置为20
pool = ThreadPool(20)
# 先将所有对象放入线程池队列
for i in range(50):
pool.callInThread(show, i)
# 启动线程池
pool.start()
# 关闭线程池
pool.stop()
这个程序的逻辑是声明一个线程池类,调用start
方法中会启动startAWorker
再启动20条线程,然后便会进入_worker
中。每一个_worker
就是一条线程,队列不空,线程不结束。