协程与 子例程一样,协程(coroutine)也是一种程序组件。相对子例程而言,协程更为一般和灵活,但在实践中使用没有子例程那样广泛。协程源自 Simula 和 Modula-2 语言,但也有其他语言支持。
看完之后,我的表情是这样的:
用专业词汇解释专业词汇,相当于没说,百度百科一惯如此。
再后来,因为工作圈子的问题,就没有再了解过协程相关的知识。在我之后写爬虫的这一年多时间里,我查看scrapy源码时试图查看twisted的源码,但感觉太难直接放弃。这可能是我最早的一次尝试查看异步代码。
到了2017年,听说了asyncio这个包。我认为我这个人有时候很不理性,总以为自己很屌能似的在对框架整体都没啥概念时就去查看别人写的源码,所以撞南墙也是我自己活该。asyncio貌似部分代码使用c实现,对于我这种半路出家的卢瑟遇到c相关的东西直接gg。所以再次放弃。
如果非要说我的协程启蒙教育是从什么时候开始的,我会说应该是在我读了<<流畅的python>>这本书之后。里面有一章关于协程的讲解,随后还讲解了asyncio,受益匪浅。
当时让我明白了什么叫委派生成器,为什么一堆生成器套在一起就算是一个协程应用了。但是我始终存在这几个盲点,关于sleep以及真正的io阻塞是怎么实现的。对于io这一块我知道底层肯定是使用了io多路复用。但是究竟是怎么借助一个事件循环让其跑起来的呢?我百撕不得骑姐。
之后,也就是前两天吧,我闲的蛋疼,不知道触碰到哪根神经了,突然想看tornado的源码。这个大名鼎鼎的web异步框架,据说是使用了协程实现。等等,为什么用“据说”二字?那是因为在此之前我的电脑从来没有运行过pip install tornado。作为一个爬虫专(cai)家(niao),我不否认我之前看过很多框架的源码,甚至包括一些web框架。什么django,bottle,flask,但是因为平时工作中我很少写web,所以看过就忘了。实践是检验真理的唯一标准,在web这方面,我最缺乏的就是实践。
没有使用tornado写过一行代码的我,一上手就直接开始看tonado源码。
下面先简单介绍一下tornado里面的协程实现。这一部分不想看的可以不看,直接跳到重点,tornado为了兼容多个python版本,实现起来可能比较复杂。
事件循环
tornado中的事件循环,本质上使用io多路复用实现。io多路复用的最主要作用就是为了唤醒事件循环。事件循环可以通过监听文件描述符来唤醒,所以tornado就自己创建一个管道。监听管道输出(READ),并设置了超时时间,此时epoll会在超时时间内挂起,如果有callback加入。则通过向管道写入任意字节唤醒,这样就相当于实现了一个可唤醒的阻塞。唤醒之后,会依次执行callback。然后执行timeouts中的callback,timeouts使用堆来保证时间最近的在最上面。
以一个简单的调用为例来说明
import tornado.ioloop
from tornado.gen import coroutine
from tornado.concurrent import Future
@coroutine
def asyn_sum(a, b):
print("begin calculate:sum %d+%d"%(a,b))
future = Future()
def callback(a, b):
print("calculating the sum of %d+%d:"%(a,b))
future.set_result(a+b)
tornado.ioloop.IOLoop.instance().add_callback(callback, a, b)
result = yield future
print("after yielded")
print("the %d+%d=%d"%(a, b, result))
def main():
future = asyn_sum(2,3)
tornado.ioloop.IOLoop.instance().start()
main()
程序开始,定义了一个asyn_sum协程,在main函数中第一行调用了该协程
同时tornado.gen.coroutine中调用asyn_sum生成器,获取第一个yield值也就是个future,并创建了一个Runner对象
try:
orig_stack_contexts = stack_context._state.contexts
yielded = next(result) # 这一行所示
if stack_context._state.contexts is not orig_stack_contexts:
yielded = TracebackFuture()
yielded.set_exception(
stack_context.StackContextInconsistentError(
'stack_context inconsistency (probably caused '
'by yield within a "with StackContext" block)'))
except (StopIteration, Return) as e:
future.set_result(_value_from_stopiteration(e))
except Exception:
future.set_exc_info(sys.exc_info())
else:
_futures_to_runners[future] = Runner(result, future, yielded)
Runner通过__init__调用了其方法handle_yield,并检查future的完成情况,很明显在第一个代码块中`tornado.ioloop.IOLoop.instance().add_callback(callback, a, b)`刚被加入事件循环,此时事件循环还未启动,所以callback中的`future.set_result(a+b)`并未被调用,因此future并未done。
# handle_yield
if not self.future.done() or self.future is moment:
def inner(f):
# Break a reference cycle to speed GC.
f = None # noqa
self.run()
self.io_loop.add_future(
self.future, inner)
return False
return True
让我们把注意力移到`self.io_loop.add_future(self.future, inner)`这么代码中,这行代码将inner在future完成之后加入到了事件循环的callbacks中,由下面的代码可以的看出
def add_future(self, future, callback):
"""Schedules a callback on the ``IOLoop`` when the given
`.Future` is finished.
The callback is invoked with one argument, the
`.Future`.
"""
assert is_future(future)
callback = stack_context.wrap(callback)
future.add_done_callback(
lambda future: self.add_callback(callback, future))
future在开启事件循环后马上就会完成,因此,随后就会调用`self.add_callback(callback, future)`这段代码将inner加入事件循环,继而调用了self.run()
run会获取future的结果,同时发送结果给协程里的生成器
orig_stack_contexts = stack_context._state.contexts
exc_info = None
try:
value = future.result()
except Exception:
self.had_exception = True
exc_info = sys.exc_info()
future = None
if exc_info is not None:
try:
yielded = self.gen.throw(*exc_info)
finally:
# Break up a reference to itself
# for faster GC on CPython.
exc_info = None
else:
yielded = self.gen.send(value)
相应的,结果就被赋值给了main中的`result = yield future`,继而结束了整个协程的调用。
future对象
future对象可以当成一次异步调用的代理人,异步调用在创建后,加入事件循环中。随着事件循环进行,异步调用被执行了,执行一结束,代理人马上获取结果,并置done=True,同时代理人会将异步调用后的全部回调函数执行,其中的一个回调函数就包括Runner.run,其作用是将异步调用结果赋值给yield左边的变量,同时继续执行接下来的代码,直到下一个yield出现。
对嵌套协程的调用
首先要清楚子协程返回的也一个future对象,因为这个future还没有完成,返回对于yield左边的变量会阻塞住。当子协程完成时,通过raise Return或者StopIteration的方式通过一个异常来将结果值传递出来,并为result_future也就是子协程的future进行set_result,子协程future完成后,父协程继续。
总结
对于`a = yield b`这种结构。b是一个Future,那么a的最终结果值为Future.result(),对于b,若是一个生成器,将其变成协程的方法只是实现了操控生成器在返回一个Future对象的同时,将生成器中每次yield都变成异步执行。
以上是tornado整个事件循环以及调用的大致流程,因为tornado被要求兼容python2,而python2没有实现yield from,所以tornado的机制和asyncio的实现机制会不相同。
所以我就想,我能否使用yield from 实现一个事件循环继而实现一个协程呢。
重点来了
我对tornado的事件循环以及Future对象进行了一些改造,基于python3.3之后的yield from 实现了一个简单的事件循环及相关方法。算是一个小的协程demo框架吧。
代码参见:
https://github.com/ShichaoMa/toolkit/blob/master/toolkit/event_loop.pygithub.com
设计思路
EventLoop和torando的IOLoop一样,使用IO多路复用来阻塞循环,使用一个Waker来唤醒EventLoop,这点我是照抄的torando,不同之处在于EventLoop中没有回调函数列表,而是拥有一个协程字典。
协程字典是由一堆协程对象作为key组成的。协程类继承于Torando::Future,本质上协程对象也是一个Future对象,不过更进一步的是,他同时还是一个生成器代理。key对应的value为每次yield返回的值。
协程类是通过一个装饰器创建的,代码如下:
def coroutine(func):
def __init__(self, *args, **kwargs):
super(self.__class__, self).__init__()
self.gen = func(*args, **kwargs)
def send(self, result):
try:
return self.gen.send(result)
except (GeneratorExit, StopIteration) as e:
self.set_result(e.args[0])
raise
def throw(self, typ, val=None, tb=None):
return self.gen.throw(typ, val=None, tb=None)
def __iter__(self):
return self.gen
def __await__(self):
yield from self
cls = type(func.__name__,
(Future, Coroutine, Iterable),
{"__init__": __init__,
"send": send,
"throw": throw,
"__iter__": __iter__,
"__await__": __await__})
return cls
和当IOLoop一样,EventLoop开始事件循环后,每当有协程对象加入到事件循环中,都会被唤醒,执行循环,循环会处理监听到的文件描述符,调用其handler,处理timout对象。接下来处理的协程字典中的协程是IOLoop所没有的操作。
处理协程的方法如下:
def process_coroutine(self, coroutine, yielded):
while True:
try:
# 如果当前yield值为Future对象,则判断Future是否完成,进而调用send方法发结果,否则中断本协程的进行。
# 如果当前yield值不是Future对象, 直接send当前值。
# 如果期间发生协程运行完毕,则删除该协程。
if isinstance(yielded, Future):
if yielded.done():
yielded = self.coroutines[coroutine] = \
coroutine.send(yielded.result())
else:
break
else:
yielded = self.coroutines[coroutine] = coroutine.send(yielded)
except (GeneratorExit, StopIteration):
del self.coroutines[coroutine]
break
为什么要这么实现呢?
假设我们现在拥有一个简单的协程
demo0
# 通过coroutine装饰器,该协程会被变成一个协程子类。
@coroutine
def normal():
a = yield 1
print("Get a: %s" % a)
b = yield 2
print("Get b: %s" % b)
c = yield 3
print("Get c: %s" % c)
return c
coroutine0 = normal() # 创建一个协程对象
loop = EventLoop() # 创建事件循环
loop.run_until_complete(coroutine0)# 开启事件循环直到完成
print(coroutine0.result()) # 打印输出
以上代码大致会经过如下的操作:
- 创建一个协程对象
- 创建事件循环
- 将协程对象添加到协程字典中
- 开启事件循环
- 事件循环运行到process_coroutine,此时yielded在process_coroutine中由倒数第4行代码依次变成1, 2, 3(因为协程中的yield返回的不是future类型),并通过send方法赋值给了a, b, c。直到最后抛出StopIteration。并删除了该协程
- 运行结束result方法可以调用了返回return值
通过这个简单的协程我们大致了解了整个程序的运行流程。但这种协程并没有什么卵用,而且把一件简单的事情弄的非常复杂。接下来再更进一步介绍一个相对复杂的协程。
我们先看一个阻塞函数:
def sleep(seconds):
future = Future()
def callback():
future.set_result(None)
EventLoop().call_at(time.time() + seconds, callback)
yield future
这个函数和tornado的sleep函数差不多,函数创建了一个future对象,并在EventLoop中添加了一个Timeout对象,Timeout对象会在指定时间后后被事件循环调用其callback。在这里,被callback之后,future被set_result,也就是说Future.done()变为了True(记住这一点,后面会用到,很重要)。
下面再写一个复杂一点的的协程
demo1
@coroutine
def sum(a, b):
print("Sum start. %s + %s" % (a, b))
yield from sleep(1)
result = yield a + b
print("Sum stop. %s + %s" % (a, b))
return result
coroutine1 = sum(1, 2) # 创建一个协程对象
loop = EventLoop() # 创建事件循环
loop.run_until_complete(coroutine1)# 开启事件循环直到完成
print(coroutine1.result()) # 打印输出
以上代码大致会经过如下的操作:
- 创建一个协程对象
- 创建事件循环
- 将协程对象添加到协程字典中
- 开启事件循环
- 事件循环运行到process_coroutine,sleep返回一个Future对象,并且调用其done方法发现没有完成。于是跳出循环,回到事件循环。
- 事件循环会获取timeouts中所有Timeout对象的最小延迟时长并做为超时时长挂起。这时候整个世界都停止了。
- 等超时之后,处理timeouts中Timeout对象,发现sleep结束了,调用其callback,future对象这时被置为完成。
- 事件循环继续前进到达process_coroutine,此时调用future的done方法发现完成了。send future.result(),继而执行a + b。随后的情况和第一个例子无异了。
在这个例子中我们定义了一个sleep子协程,并学习到了如何实现阻塞。
再来看第三个更复杂的例子
demo2
@coroutine
def sum(a, b):
print("Sum start. %s + %s" % (a, b))
yield from sleep(1)
result = yield a + b
print("Sum stop. %s + %s" % (a, b))
return result
@coroutine
def multi(a, b):
print("Multi start. %s x %s" % (a, b))
yield from sleep(2)
result = yield a * b
print("Multi stop. %s x %s" % (a, b))
return result
@coroutine
def aaddbthenmutilc(a, b, c):
sum_result = yield from sum(a, b)
multi_result = yield from multi(sum_result, c)
return multi_result
coroutine2 = aaddbthenmutilc() # 创建一个协程对象
loop = EventLoop() # 创建事件循环
loop.run_until_complete(coroutine2)# 开启事件循环直到完成
print(coroutine2.result()) # 打印输出
刚才说了协程也是一个future,但协程本身还是一个生成器。所以当我们返回协程时,使用yield from 代替yield。
当使用yield from 时生成器和子生器之间就建立起了一个长长的管道,子生器生成的值可以直接返回,无论链接多少个生成器,就好像直接调用一个生成器一样。。
在这里coroutine2中调用了sum子协程。一但其结束,sum的值会被赋值给sum_result,coroutine2继续执行multi,直到结束。
写到现在,可能有同学会问了,如果是真正的IO阻塞,怎么处理。比如socket。别急,下面马上给出第4个例子。
在此之前,我又实现了一个类似于sleep的阻塞函数
def get_buffer(socket):
future = Future()
def callback(fd_obj, events):
future.set_result(fd_obj.recv(1024))
EventLoop().add_handler(socket, callback, EventLoop.READ)
buffer = yield future
return buffer
这个函数是专门用来处理socket读产生的阻塞的,创建一个future,并创建一个回调函数,添加到事件循环中,EventLoop中有一组对象被称为handlers,是用来处理文件描述符的。这个和tornado的IOLoop保持一致。通过add_handler可以添加。
在这里,socket被监听了READ事件。
demo3
@coroutine
def socket_coroutine():
print("socket_coroutine start. ")
import socket
from toolkit.event_loop import get_buffer
client = socket.socket()
client.connect(("", 1234))
buffer = yield from get_buffer(client)
client.close()
print("socket_coroutine stop. ")
return buffer
coroutine3 = socket_coroutine() # 创建一个协程对象
loop = EventLoop() # 创建事件循环
loop.run_until_complete(coroutine3)# 开启事件循环直到完成
print(coroutine3.result()) # 打印输出
同时我开启一个server
In [3]: def serve():
...: server = socket.socket()
...: server.bind(("", 1234))
...: server.listen(0)
...: client, addr = server.accept()
...: import time
...: time.sleep(5)
...: client.send(b"hello world")
...: client.close()
...: server.close()
...:
In [4]: serve()
这个例子和sleep差不多,不同之处在于sleep让事件循环停止了一段时间,而通过IO多路复用则是在被监听的文件描述符发生读事件时激活了事件循环。之后的步骤和demo1一样。
下面还有最后一个例子,就是多个协程一起运行。
demo4
不贴代码了,太长,代码在此!https://github.com/ShichaoMa/toolkit/blob/master/test/test_event_loop.py
里面有6个协程。
总结
其实关键的思路就在于如何处理协程生成器上,我的想法是对于一个协程,无非就是存在两种情况,阻塞与不阻塞,阻塞的情况是返回的是future同时done为False,这种情况就像sleep ,get_buffer一样。刚开始返回的都是没有完成的future,导致该生成器无法被继续执行,这个时候process_coroutine方法是会直接停止该生成器的执行,继续执行下一个生成器,生成timeouts对象和添加事件监听保证了在规定时间内(timout.callback)或者在事件发生时future被置为done=True。对于没有阻塞的情况(返回的是非Future或者Future已经完成),生成器会一直执行下去直到结束。
终于写完了。
希望能给您带一些编程灵感!这个事件循环的所有代码都被放在我的开发工具包toolkit里面了。可在通过 pip install toolkity==1.6.0安装