threadlocal(threading.local,线程本地存储)中的数据与线程绑定,每个线程都会各自持有一份独立的副本,参考 http://python.jobbole.com/86150/
ThreadPoolExecutor构造函数里面有max_workers参数,如果这个参数设置得不合适,在任务中使用threadlocal保存大对象时,就有可能造成内存泄漏(内存一直得不到释放)。
示例代码如下:
from concurrent.futures import ThreadPoolExecutor
import threading
import time
import traceback
# Per-thread storage: an attribute set on ``x`` in one thread is not visible
# from any other thread.
x = threading.local()


def show():
    """Print the calling thread's id and the size of its ``x.content``.

    If reading ``x.content`` fails (for example because the calling thread
    has never assigned it), the formatted traceback is printed instead of
    the exception being raised.
    """
    try:
        print(f'{threading.get_ident()} size {len(x.content)}')
    except Exception:
        # Previously a bare ``except:``, which also swallowed
        # KeyboardInterrupt and SystemExit; only ordinary errors are
        # reported here, interpreter-level signals propagate.
        print(traceback.format_exc())
def func(path=r'/path/to/bigfile', hold_seconds=15):
    """Read a whole file into the current thread's thread-local storage.

    The bytes are assigned to ``x.content`` and are not cleared before the
    function returns, so they stay attached to whichever thread ran it.

    Args:
        path: File to read in binary mode. Defaults to the placeholder path
            used by the original example.
        hold_seconds: Seconds to sleep after loading, keeping the thread
            busy so several calls can overlap. Defaults to 15.

    Returns:
        The string ``'1'``.
    """
    print(f'{threading.current_thread()}')
    with open(path, 'rb') as f:
        x.content = f.read()
    print(len(x.content))
    show()
    time.sleep(hold_seconds)
    return '1'
if __name__ == '__main__':
    # Pool variant: submit four tasks to a pool of up to four workers.
    # The ``with`` block guarantees the executor is shut down (and its
    # worker threads joined) once the observation window is over.
    with ThreadPoolExecutor(max_workers=4) as executor:
        for _ in range(4):
            executor.submit(func)
        # Stay inside the ``with`` block while memory usage is inspected
        # (e.g. with top): the pool, and therefore its worker threads, is
        # still alive during this sleep, exactly as in the original run.
        time.sleep(35)
        print(' in main')
输出
通过top查看使用内存(为49.8)
把max_workers设置为1后重新运行,
输出
通过top查看使用内存(为12.5)
如果改为直接用threading.Thread运行,也不会泄漏:
if __name__ == '__main__':
    # Plain-thread variant: run func in four dedicated threads instead of
    # a ThreadPoolExecutor.
    threads = [threading.Thread(target=func) for _ in range(4)]
    for th in threads:
        th.start()
    # Join only after every thread has been started, so the four calls
    # overlap (comparable to the 4-worker pool) and all of them have
    # finished before memory usage is inspected.
    for th in threads:
        th.join()
    time.sleep(35)
    print(' in main')
应该是ThreadPoolExecutor里面的工作线程在任务执行完之后并没有退出,线程上的threadlocal数据因此一直没有被释放造成的。
查看ThreadPoolExecutor代码
# Excerpt of ThreadPoolExecutor._adjust_thread_count: starts one more worker
# thread when the pool currently holds fewer than self._max_workers threads;
# otherwise it does nothing.
def _adjust_thread_count(self):
# When the executor gets lost, the weakref callback will wake up
# the worker threads.
def weakref_cb(_, q=self._work_queue):
# Putting None on the work queue is the wake-up signal mentioned above.
q.put(None)
# TODO(bquinlan): Should avoid creating new threads if there are more
# idle threads than items in the work queue.
# Only the number of threads created so far is compared with the limit;
# whether those threads are idle is not checked (see the TODO above).
num_threads = len(self._threads)
if num_threads < self._max_workers:
thread_name = '%s_%d' % (self._thread_name_prefix or self,
num_threads)
# The worker is given a weak reference to the executor (with weakref_cb as
# its callback) and the shared work queue.
t = threading.Thread(name=thread_name, target=_worker,
args=(weakref.ref(self, weakref_cb),
self._work_queue))
t.daemon = True
t.start()
# The executor keeps a reference to every thread it starts, and the thread
# is also registered in _threads_queues (defined outside this excerpt).
self._threads.add(t)
_threads_queues[t] = self._work_queue
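发现有self._threads.add(t):线程池会一直保留这些工作线程,任务执行完后线程并不退出,而是阻塞在任务队列上等待新任务,直到executor被shutdown(或被回收)才结束。线程不结束,它的threadlocal数据就不会被释放,应该就是这个造成的。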