问题描述:
一个简单的生产者消费者模型,假设每0.5秒生产一个数据,而花费1秒消费一个数据。
0x01 多线程之一
一个生产者线程、一个消费者线程
import threading
import time
import queue
# 全局线程锁
lock =threading.Lock()
# 生产对象
def produce(q,n):
for x in range(n):
print('producing {}/{}'.format(x,n))
time.sleep(0.5)
item = str(x)
q.put(item)
# quit表示生产结束
q.put('quit')
# 消费对象
def consume(q):
while True:
# 从队列中获取数据要加锁
lock.acquire()
item = q.get()
lock.release()
print(item)
# 结束标志
if item =='quit':
break
# do something work
print('consuming {}...'.format(item))
time.sleep(1)
if __name__ == '__main__':
t = time.time()
q = queue.Queue()
# 开启生产者线程,生产10个对象
t1 = threading.Thread(target=produce,args=(q,10))
t1.start()
# 开启消费者线程
t2 = threading.Thread(target=consume,args=(q,))
t2.start()
t1.join()
t2.join()
print(time.time()-t)
此时虽然不会阻塞,消费10个对象最低还是要用10+s,其中消费时间还是很长,还有再优化的空间。
0x02 多线程之二
一个生产者,两个消费者
import threading
import time
import queue
# 全局线程锁
lock = threading.Lock()
# 生产对象
def produce(q, n):
for x in range(n):
print('producing {}/{}'.format(x, n))
time.sleep(0.5)
item = str(x)
q.put(item)
# quit表示生产结束
q.put('quit')
q.put('quit')
# 消费对象
def consume(q):
while True:
# 从队列中获取数据要加锁
lock.acquire()
item = q.get()
lock.release()
print(item)
# 结束标志
if item == 'quit':
break
# do something work
print('consuming {}...'.format(item))
time.sleep(1)
if __name__ == '__main__':
t = time.time()
q = queue.Queue()
# 开启生产者线程,生产10个对象
t1 = threading.Thread(target=produce, args=(q, 10))
t1.start()
# 开启消费者线程
t2 = threading.Thread(target=consume, args=(q,))
t2.start()
t3 = threading.Thread(target=consume, args=(q,))
t3.start()
t1.join()
t2.join()
t3.join()
print(time.time() - t)
因为在第一个线程消费的过程中,第二个线程也会进行消费,此时运行时间已经缩短到6+s
0x03 协程
尽管使用多线程确实降低了时间,但是由于Python中的全局锁,始终都是以一个线程在工作,只是在阻塞时中断程序继而去执行其他线程而已。而且在实际应用中,如果存在很多共享操作,就必须要上锁,这是一个很麻烦的事情。Python3中引入协程的概念,其实协程也是线程的一种,我们可以使用asyncio包,由系统自行进行中断处理,大大的提高了开发效率。
import asyncio
import time
async def produce(queue,n):
for x in range(1,n+1):
print('producing{}/{}'.format(x,n))
await asyncio.sleep(0.5)
item = str(x)
await queue.put(item)
await queue.put(None)
await queue.put(None)
async def consume(queue):
while True:
item = await queue.get()
if item is None:
break
print('consuming item {} ...'.format(item))
await asyncio.sleep(1)
if __name__ == '__main__':
t= time.time()
loop = asyncio.get_event_loop()
queue = asyncio.Queue(loop=loop)
produce_coro = produce(queue,10)
consumer_coro1 = consume(queue)
consumer_coro2 = consume(queue)
tasks =[produce_coro,consumer_coro1,consumer_coro2]
loop.run_until_complete(asyncio.wait(tasks))
loop.close()
print(time.time()-t)
此时时间已经降低到6秒左右。
0x04 多进程
既然Python中由于全局GIL锁,那看看多进程的表现如何:
import multiprocessing
import time
lock = multiprocessing.Lock()
def produce(q,n):
for x in range(n):
print('producing {}/{}'.format(x,n))
time.sleep(0.5)
item = str(x)
q.put(item)
q.put('quit')
q.put('quit')
def consume(q):
while True:
lock.acquire()
item = q.get()
lock.release()
if item == 'quit':
break
print('consuming {}...'.format(item))
time.sleep(1)
if __name__ == '__main__':
t = time.time()
q = multiprocessing.Queue()
t1 = multiprocessing.Process(target=produce,args=(q,10))
t2 = multiprocessing.Process(target=consume,args=(q,))
t3 = multiprocessing.Process(target=consume,args=(q,))
t1.start()
t2.start()
t3.start()
t1.join()
t2.join()
t3.join()
print(time.time()-t)
运行时间与上述几乎一致,两个消费者,时间控制在6秒左右,并没有发挥出太大的优势。
0x05 多进程+协程
无论增加多少线程或者进程,由于生产效率的问题,时间至少也要5秒之上,除非多个生产者同时开始,这时我们使用最终的加速方案,多进程与协程,将生产者消费者绑定到一起,并且共享一个消费队列。
这里还是以生产10个对象为例,不过我们将其分给给两个进程去运行,每个进程分配一个生产者,两个消费者,代码如下:
import asyncio
import time
import multiprocessing as mp
# 共享队列
queue = asyncio.Queue()
async def produce(start,end):
for x in range(start,end):
print('producing{}/{}'.format(x,end))
await asyncio.sleep(0.5)
item = str(x)
await queue.put(item)
await queue.put('quit')
await queue.put('quit')
async def consume():
while True:
item = await queue.get()
if item is 'quit':
break
print('consuming item {} ...'.format(item))
await asyncio.sleep(1)
def process(start,end):
tasks = []
loop = asyncio.get_event_loop()
# 一个生产者
tasks.append(asyncio.ensure_future(produce(start,end)))
# 两个消费者
for i in range(2):
tasks.append(asyncio.ensure_future(consume()))
loop.run_until_complete(asyncio.wait(tasks))
loop.close()
if __name__ == '__main__':
t= time.time()
# 创建进程池
pool = mp.Pool(2)
for i in range(2):
pool.apply_async(process,(i*5,(i+1)*5,))
pool.close()
pool.join()
print(time.time()-t)
此时时间已经缩短到3.7秒左右,当然我们也可以使用多线程生产,然后多线程消费,就本质上并没有真正的提升运行效率。
后记
其实这是我一个项目中的架构雏形,我只是把他抽象出来。运用多进程与协程的结合使得程序的运行时间从原来的一个小时缩短至三分钟左右,其效果特别明显。但是弊端也开始暴露出来,这种方式极其不稳定,经常会有莫名的bug使得程序不会执行多进程,甚至改一点小小的东西都会触发异常。
综上,真正的大型处理框架中,Python并不能很好的胜任这项工作,建议使用其他更加稳定的语言。