python协程中我们介绍了从yield生成器到协程,通过send()函数可以给yeild发消息,让其知道什么时候执行结束,可以切换任务了。
yield可以简单传一个value,如果抛出异常如何处理呢,yield from 结构会在内部自动捕获 StopIteration 异常。对 yield from 结构来说,解释器不仅会捕获 StopIteration 异常,还会把value 属性的值变成 yield from 表达式的值。
一、yield from
yield from用于重构生成器,简单的,可以这么使用:
def copy_fib(n):
print('I am copy from fib')
yield from fib(n)
print('Copy end')
print('-'*10 + 'test yield from' + '-'*10)
for fib_res in copy_fib(20):
print(fib_res)
这种使用方式很简单,但远远不是yield from的全部。yield from的作用还体现可以像一个管道一样将send信息传递给内层协程,并且处理好了各种异常情况,因此,对于stupid_fib也可以这样包装和使用:
def copy_stupid_fib(n):
print('I am copy from stupid fib')
yield from stupid_fib(n)
print('Copy end')
print('-'*10 + 'test yield from and send' + '-'*10)
N = 20
csfib = copy_stupid_fib(N)
fib_res = next(csfib)
while True:
print(fib_res)
try:
fib_res = csfib.send(random.uniform(0, 0.5))
except StopIteration:
break
如果没有yield from,这里的copy_yield_from将会特别复杂(因为要自己处理各种异常)。
目前python中实现协程的greenlet,gevent模块其实都是基于I/O多路复用的,真正的异步I/O在python3.x中asyncio模块可以实现
二、asyncio模块
asyncio
是Python 3.4版本引入的标准库,直接内置了对异步IO的支持。用于异步网络操作、并发、协程。
asyncio
的编程模型就是一个消息循环。我们从asyncio
模块中直接获取一个EventLoop
的引用,然后把需要执行的协程扔到EventLoop
中执行,就实现了异步IO。
关于asyncio的一些关键字的说明:
-
event_loop 事件循环:程序开启一个无限循环,把一些函数注册到事件循环上,当满足事件发生的时候,调用相应的协程函数
-
coroutine 协程:协程对象,指一个使用async关键字定义的函数,它的调用不会立即执行函数,而是会返回一个协程对象。协程对象需要注册到事件循环,由事件循环调用。
-
task 任务:一个协程对象就是一个原生可以挂起的函数,任务则是对协程进一步封装,其中包含了任务的各种状态
-
future: 代表将来执行或没有执行的任务的结果。它和task上没有本质上的区别
-
async/await 关键字:python3.5用于定义协程的关键字,async定义一个协程,await用于挂起阻塞的异步调用接口。
用asyncio
实现Hello world
代码如下:
import asyncio
@asyncio.coroutine
def hello():
print("Hello world!")
# 异步调用asyncio.sleep(1):
r = yield from asyncio.sleep(1)
print("Hello again!")
# 获取EventLoop:
loop = asyncio.get_event_loop()
# 执行coroutine
loop.run_until_complete(hello())
loop.close()
@asyncio.coroutine
把一个generator标记为coroutine类型,然后,我们就把这个coroutine
扔到EventLoop
中执行。
hello()
会首先打印出Hello world!
,然后,yield from
语法可以让我们方便地调用另一个generator
。由于asyncio.sleep()
也是一个coroutine
,所以线程不会等待asyncio.sleep()
,而是直接中断并执行下一个消息循环。当asyncio.sleep()
返回时,线程就可以从yield from
拿到返回值(此处是None
),然后接着执行下一行语句。
把asyncio.sleep(1)
看成是一个耗时1秒的IO操作,在此期间,主线程并未等待,而是去执行EventLoop
中其他可以执行的coroutine
了,因此可以实现并发执行。
我们用Task封装两个coroutine
试试:
import threading
import asyncio
@asyncio.coroutine
def hello():
print('Hello world! (%s)' % threading.currentThread())
yield from asyncio.sleep(1)
print('Hello again! (%s)' % threading.currentThread())
loop = asyncio.get_event_loop()
tasks = [hello(), hello()]
loop.run_until_complete(asyncio.wait(tasks))
loop.close()
观察执行过程:
Hello world! (<_MainThread(MainThread, started 140735195337472)>)
Hello world! (<_MainThread(MainThread, started 140735195337472)>)
(暂停约1秒)
Hello again! (<_MainThread(MainThread, started 140735195337472)>)
Hello again! (<_MainThread(MainThread, started 140735195337472)>)
由打印的当前线程名称可以看出,两个coroutine
是由同一个线程并发执行的。
如果把asyncio.sleep()
换成真正的IO操作,则多个coroutine
就可以由一个线程并发执行。
我们用asyncio
的异步网络连接来获取sina、sohu和163的网站首页:
import asyncio
@asyncio.coroutine
def wget(host):
print('wget %s...' % host)
connect = asyncio.open_connection(host, 80)
reader, writer = yield from connect
header = 'GET / HTTP/1.0\r\nHost: %s\r\n\r\n' % host
writer.write(header.encode('utf-8'))
yield from writer.drain()
while True:
line = yield from reader.readline()
if line == b'\r\n':
break
print('%s header > %s' % (host, line.decode('utf-8').rstrip()))
# Ignore the body, close the socket
writer.close()
loop = asyncio.get_event_loop()
tasks = [wget(host) for host in ['www.sina.com.cn', 'www.sohu.com', 'www.163.com']]
loop.run_until_complete(asyncio.wait(tasks))
loop.close()
执行结果如下:
wget www.sohu.com...
wget www.sina.com.cn...
wget www.163.com...
(等待一段时间)
(打印出sohu的header)
www.sohu.com header > HTTP/1.1 200 OK
www.sohu.com header > Content-Type: text/html
...
(打印出sina的header)
www.sina.com.cn header > HTTP/1.1 200 OK
www.sina.com.cn header > Date: Wed, 20 May 2015 04:56:33 GMT
...
(打印出163的header)
www.163.com header > HTTP/1.0 302 Moved Temporarily
www.163.com header > Server: Cdn Cache Server V2.0
...
可见3个连接由一个线程通过coroutine
并发完成。
asyncio.coroutine和yield from
yield from在asyncio模块中得以发扬光大。先看示例代码:
@asyncio.coroutine
def smart_fib(n):
index = 0
a = 0
b = 1
while index < n:
sleep_secs = random.uniform(0, 0.2)
yield from asyncio.sleep(sleep_secs)
print('Smart one think {} secs to get {}'.format(sleep_secs, b))
a, b = b, a + b
index += 1
@asyncio.coroutine
def stupid_fib(n):
index = 0
a = 0
b = 1
while index < n:
sleep_secs = random.uniform(0, 0.4)
yield from asyncio.sleep(sleep_secs)
print('Stupid one think {} secs to get {}'.format(sleep_secs, b))
a, b = b, a + b
index += 1
if __name__ == '__main__':
loop = asyncio.get_event_loop()
tasks = [
asyncio.async(smart_fib(10)),
asyncio.async(stupid_fib(10)),
]
loop.run_until_complete(asyncio.wait(tasks))
print('All fib finished.')
loop.close()
asyncio是一个基于事件循环的实现异步I/O的模块。通过yield from,我们可以将协程asyncio.sleep的控制权交给事件循环,然后挂起当前协程;之后,由事件循环决定何时(数据从内核空间已经拷贝到用户空间)唤醒asyncio.sleep,接着向后执行代码。
这样说可能比较抽象,好在asyncio是一个由python实现的模块,那么我们来看看asyncio.sleep中都做了些什么:
@coroutine
def sleep(delay, result=None, *, loop=None):
"""Coroutine that completes after a given time (in seconds)."""
future = futures.Future(loop=loop)
h = future._loop.call_later(delay,
future._set_result_unless_cancelled, result)
try:
return (yield from future)
finally:
h.cancel()
首先,sleep创建了一个Future对象,作为更内层的协程对象,通过yield from交给了事件循环;其次,它通过调用事件循环的call_later函数,注册了一个回调函数。
通过查看Future类的源码,可以看到,Future是一个实现了__iter__对象的生成器:
class Future:
#blabla...
def __iter__(self):
if not self.done():
self._blocking = True
yield self # This tells Task to wait for completion.
assert self.done(), "yield from wasn't used with future"
return self.result() # May raise too.
那么当我们的协程yield from asyncio.sleep时,事件循环其实是与Future对象建立了练习。每次事件循环调用send(None)时,其实都会传递到Future对象的__iter__函数调用;而当Future尚未执行完毕的时候,就会yield self,也就意味着暂时挂起,等待下一次send(None)的唤醒。
当我们包装一个Future对象产生一个Task对象时,在Task对象初始化中,就会调用Future的send(None),并且为Future设置好回调函数。
class Task(futures.Future):
#blabla...
def _step(self, value=None, exc=None):
#blabla...
try:
if exc is not None:
result = coro.throw(exc)
elif value is not None:
result = coro.send(value)
else:
result = next(coro)
#exception handle
else:
if isinstance(result, futures.Future):
# Yielded Future must come from Future.__iter__().
if result._blocking:
result._blocking = False
result.add_done_callback(self._wakeup)
#blabla...
def _wakeup(self, future):
try:
value = future.result()
except Exception as exc:
# This may also be a cancellation.
self._step(None, exc)
else:
self._step(value, None)
self = None # Needed to break cycles when an exception occurs.
预设的时间过后,事件循环将调用Future._set_result_unless_cancelled:
class Future:
#blabla...
def _set_result_unless_cancelled(self, result):
"""Helper setting the result only if the future was not cancelled."""
if self.cancelled():
return
self.set_result(result)
def set_result(self, result):
"""Mark the future done and set its result.
If the future is already done when this method is called, raises
InvalidStateError.
"""
if self._state != _PENDING:
raise InvalidStateError('{}: {!r}'.format(self._state, self))
self._result = result
self._state = _FINISHED
self._schedule_callbacks()
这将改变Future的状态,同时回调之前设定好的Tasks._wakeup;在_wakeup中,将会再次调用Tasks._step,这时,Future的状态已经标记为完成,因此,将不再yield self,而return语句将会触发一个StopIteration异常,此异常将会被Task._step捕获用于设置Task的结果。同时,整个yield from链条也将被唤醒,协程将继续往下执行。
async和await
弄清楚了asyncio.coroutine和yield from之后,在Python3.5中引入的async和await就不难理解了:可以将他们理解成asyncio.coroutine/yield from的完美替身。当然,从Python设计的角度来说,async/await让协程表面上独立于生成器而存在,将细节都隐藏于asyncio模块之下,语法更清晰明了。
async def smart_fib(n):
index = 0
a = 0
b = 1
while index < n:
sleep_secs = random.uniform(0, 0.2)
await asyncio.sleep(sleep_secs)
print('Smart one think {} secs to get {}'.format(sleep_secs, b))
a, b = b, a + b
index += 1
async def stupid_fib(n):
index = 0
a = 0
b = 1
while index < n:
sleep_secs = random.uniform(0, 0.4)
await asyncio.sleep(sleep_secs)
print('Stupid one think {} secs to get {}'.format(sleep_secs, b))
a, b = b, a + b
index += 1
if __name__ == '__main__':
loop = asyncio.get_event_loop()
tasks = [
asyncio.ensure_future(smart_fib(10)),
asyncio.ensure_future(stupid_fib(10)),
]
loop.run_until_complete(asyncio.wait(tasks))
print('All fib finished.')
loop.close()
总结
至此,Python中的协程就介绍完毕了。示例程序中都是以sleep为异步I/O的代表,在实际项目中,可以使用协程异步的读写网络、读写文件、渲染界面等,而在等待协程完成的同时,CPU还可以进行其他的计算。协程的作用正在于此。
相关代码可以在GitHub上找到https://github.com/yubo1911/saber/tree/master/coroutine。
http://python.jobbole.com/87310/
三、Greenlet
一句话来说明greenlet的实现原理:通过栈的复制切换来实现不同协程之间的切换
greenlet可以实现手动切换,
我们首先看一下greenlet这个module里面的属性
>>> dir(greenlet)
['GREENLET_USE_GC', 'GREENLET_USE_TRACING', 'GreenletExit', '_C_API', '__doc__', '__file__', '__name__', '__package__', '__version__', 'error', 'getcurrent', 'gettrace', 'greenlet', 'settrace']
>>>
其中,比较重要的是getcurrent(), 类greenlet、异常类GreenletExit。
getcurrent()返回当前的greenlet实例;
GreenletExit:是一个特殊的异常,当触发了这个异常的时候,即使不处理,也不会抛到其parent(后面会提到协程中对返回值或者异常的处理)
然后我们再来看看greenlet.greenlet这个类:
>>> dir(greenlet.greenlet)
['GreenletExit', '__class__', '__delattr__', '__dict__', '__doc__', '__format__', '__getattribute__', '__getstate__', '__hash__', '__init__', '__new__', '__nonzero__', '__reduce__', '__reduce_ex__', '__repr__', '__setattr__', '__sizeof__', '__str__', '__subclasshook__', '_stack_saved', 'dead', 'error', 'getcurrent', 'gettrace', 'gr_frame', 'parent', 'run', 'settrace','switch', 'throw']
>>>
比较重要的几个属性:
run:当greenlet启动的时候会调用到这个callable,如果我们需要继承greenlet.greenlet时,需要重写该方法
switch:前面已经介绍过了,在greenlet之间切换
parent:可读写属性,后面介绍
dead:如果greenlet执行结束,那么该属性为true
throw:切换到指定greenlet后立即跑出异常
文章后面提到的greenlet大多都是指greenlet.greenlet这个class,请注意区别
Switch not call
对于greenlet,最常用的写法是 x = gr.switch(y)。 这句话的意思是切换到gr,传入参数y。当从其他协程(不一定是这个gr)切换回来的时候,将值付给x。
import greenlet
def test1(x, y):
z = gr2.switch(x+y)
print('test1 ', z)
def test2(u):
print('test2 ', u)
gr1.switch(10)
gr1 = greenlet.greenlet(test1)
gr2 = greenlet.greenlet(test2)
print gr1.switch("hello", " world")
输出:
('test2 ', 'hello world')
('test1 ', 10)
None
上面的例子,第12行从main greenlet切换到了gr1,test1第3行切换到了gs2,然后gr1挂起,第8行从gr2切回gr1时,将值(10)返回值给了 z。
每一个Greenlet都有一个parent,一个新的greenlet在哪里创生,当前环境的greenlet就是这个新greenlet的parent。所有的greenlet构成一棵树,其根节点就是还没有手动创建greenlet时候的”main” greenlet(事实上,在首次import greenlet的时候实例化)。当一个协程 正常结束,执行流程回到其对应的parent;或者在一个协程中抛出未被捕获的异常,该异常也是传递到其parent。学习python的时候,有一句话会被无数次重复”everything is oblect”, 在学习greenlet的调用中,同样有一句话应该深刻理解, “switch not call”。
import greenlet
def test1(x, y):
print id(greenlet.getcurrent()), id(greenlet.getcurrent().parent) # 40240272 40239952
z = gr2.switch(x+y)
print 'back z', z
def test2(u):
print id(greenlet.getcurrent()), id(greenlet.getcurrent().parent) # 40240352 40239952
return 'hehe'
gr1 = greenlet.greenlet(test1)
gr2 = greenlet.greenlet(test2)
print id(greenlet.getcurrent()), id(gr1), id(gr2) # 40239952, 40240272, 40240352
print gr1.switch("hello", " world"), 'back to main' # hehe back to main
上述例子可以看到,尽量是从test1所在的协程gr1 切换到了gr2,但gr2的parent还是’main’ greenlet,因为默认的parent取决于greenlet的创生环境。另外 在test2中return之后整个返回值返回到了其parent,而不是switch到该协程的地方(即不是test1),这个跟我们平时的函数调用不一样,记住“switch not call”。对于异常 也是展开至parent
mport greenlet
def test1(x, y):
try:
z = gr2.switch(x+y)
except Exception:
print 'catch Exception in test1'
def test2(u):
assert False
gr1 = greenlet.greenlet(test1)
gr2 = greenlet.greenlet(test2)
try:
gr1.switch("hello", " world")
except:
print 'catch Exception in main'
输出为:
catch Exception in main
Greenlet生命周期
文章开始的地方提到第一个例子中的gr2其实并没有正常结束,我们可以借用greenlet.dead这个属性来查看
from greenlet import greenlet
def test1():
gr2.switch(1)
print 'test1 finished'
def test2(x):
print 'test2 first', x
z = gr1.switch()
print 'test2 back', z
gr1 = greenlet(test1)
gr2 = greenlet(test2)
gr1.switch()
print 'gr1 is dead?: %s, gr2 is dead?: %s' % (gr1.dead, gr2.dead)
gr2.switch()
print 'gr1 is dead?: %s, gr2 is dead?: %s' % (gr1.dead, gr2.dead)
print gr2.switch(10)
输出:
test2 first 1
test1 finished
gr1 is dead?: True, gr2 is dead?: False
test2 back ()
gr1 is dead?: True, gr2 is dead?: True
10
从这个例子可以看出
- 只有当协程对应的函数执行完毕,协程才会die,所以第一次Check的时候gr2并没有die,因为第9行切换出去了就没切回来。在main中再switch到gr2的时候, 执行后面的逻辑,gr2 die
- 如果试图再次switch到一个已经是dead状态的greenlet会怎么样呢,事实上会切换到其parent greenlet。
Greenlet Traceing
Greenlet也提供了接口使得程序员可以监控greenlet的整个调度流程。主要是gettrace 和 settrace(callback)函数。下面看一个例子:
def test_greenlet_tracing():
def callback(event, args):
print event, 'from', id(args[0]), 'to', id(args[1])
def dummy():
g2.switch()
def dummyexception():
raise Exception('excep in coroutine')
main = greenlet.getcurrent()
g1 = greenlet.greenlet(dummy)
g2 = greenlet.greenlet(dummyexception)
print 'main id %s, gr1 id %s, gr2 id %s' % (id(main), id(g1), id(g2))
oldtrace = greenlet.settrace(callback)
try:
g1.switch()
except:
print 'Exception'
finally:
greenlet.settrace(oldtrace)
test_greenlet_tracing()
输出:
main id 40604416, gr1 id 40604736, gr2 id 40604816
switch from 40604416 to 40604736
switch from 40604736 to 40604816
throw from 40604816 to 40604416
Exception
其中callback函数event是switch或者throw之一,表明是正常调度还是异常跑出;args是二元组,表示是从协程args[0]切换到了协程args[1]。上面的输出展示了切换流程:从main到gr1,然后到gr2,最后回到main。
greenlet使用建议:
使用greenlet需要注意一下三点:
第一:greenlet创生之后,一定要结束,不能switch出去就不回来了,否则容易造成内存泄露
第二:python中每个线程都有自己的main greenlet及其对应的sub-greenlet ,不能线程之间的greenlet是不能相互切换的
第三:不能存在循环引用,这个是官方文档明确说明
”Greenlets do not participate in garbage collection; cycles involving data that is present in a greenlet’s frames will not be detected. “
对于第一点,我们来看一个例子:
from greenlet import greenlet, GreenletExit
huge = []
def show_leak():
def test1():
gr2.switch()
def test2():
huge.extend([x* x for x in range(100)])
gr1.switch()
print 'finish switch del huge'
del huge[:]
gr1 = greenlet(test1)
gr2 = greenlet(test2)
gr1.switch()
gr1 = gr2 = None
print 'length of huge is zero ? %s' % len(huge)
if __name__ == '__main__':
show_leak()
# output: length of huge is zero ? 100
在test2函数中 第11行,我们将huge清空,然后再第16行将gr1、gr2的引用计数降到了0。但运行结果告诉我们,第11行并没有执行,所以如果一个协程没有正常结束是很危险的,往往不符合程序员的预期。greenlet提供了解决这个问题的办法,官网文档提到:如果一个greenlet实例的引用计数变成0,那么会在上次挂起的地方抛出GreenletExit异常,这就使得我们可以通过try ... finally 处理资源泄露的情况。如下面的代码:
from greenlet import greenlet, GreenletExit
huge = []
def show_leak():
def test1():
gr2.switch()
def test2():
huge.extend([x* x for x in range(100)])
try:
gr1.switch()
finally:
print 'finish switch del huge'
del huge[:]
gr1 = greenlet(test1)
gr2 = greenlet(test2)
gr1.switch()
gr1 = gr2 = None
print 'length of huge is zero ? %s' % len(huge)
if __name__ == '__main__':
show_leak()
# output :
# finish switch del huge
# length of huge is zero ? 0
上述代码的switch流程:main greenlet --> gr1 --> gr2 --> gr1 --> main greenlet, 很明显gr2没有正常结束(在第10行刮起了)。第18行之后gr1,gr2的引用计数都变成0,那么会在第10行抛出GreenletExit异常,因此finally语句有机会执行。同时,在文章开始介绍Greenlet module的时候也提到了,GreenletExit这个异常并不会抛出到parent,所以main greenlet也不会出异常。
看上去貌似解决了问题,但这对程序员要求太高了,百密一疏。所以最好的办法还是保证协程的正常结束。
之前的文章其实已经提到提到了coroutine协程的强大之处,对于异步非阻塞,而且还需要保留上下文的场景非常适用。greenlet跟强大,可以从一个协程切换到任意其他协程,这是generator做不到的,但这种能力其实也是双刃剑,前面的注意事项也提到了,必须保证greenlet的正常结束,在协程之间任意的切换很容易出问题。
比如对于服务之间异步请求的例子,简化为服务A的一个函数foo需要异步访问服务B,可以这样封装greenlet:用decorator装饰函数foo,当调用这个foo的时候建立一个greenlet实例,并为这个greenley对应一个唯一的gid,在foo方法发出异步请求(写到gid)之后,switch到parent,这个时候这个新的协程处于挂起状态。当请求返回之后,通过gid找到之前被挂起的协程,恢复该协程即可。More simple More safety,保证旨在main和一级子协程之间切换。需要注意的是处理各种异常 以及请求超时的情况,避免内存泄露,gvent对greenlet的使用大致也是这样的。
四、gevent
gevent可以实现自动切换,而且有很好用的monkey
gevent,它是一个并发网络库。它的协程是基于greenlet的,并基于libev实现快速事件循环(Linux上是epoll,FreeBSD上是kqueue,Mac OS X上是select)。有了gevent,协程的使用将无比简单,你根本无须像greenlet一样显式的切换,每当一个协程阻塞时,程序将自动调度,gevent处理了所有的底层细节。让我们看个例子来感受下吧。
import gevent
def test1():
print 12
gevent.sleep(0)
print 34
def test2():
print 56
gevent.sleep(0)
print 78
gevent.joinall([
gevent.spawn(test1),
gevent.spawn(test2),
])
解释下,”gevent.spawn()”方法会创建一个新的greenlet协程对象,并运行它。”gevent.joinall()”方法会等待所有传入的greenlet协程运行结束后再退出,这个方法可以接受一个”timeout”参数来设置超时时间,单位是秒。运行上面的程序,执行顺序如下:
- 先进入协程test1,打印12
- 遇到”gevent.sleep(0)”时,test1被阻塞,自动切换到协程test2,打印56
- 之后test2被阻塞,这时test1阻塞已结束,自动切换回test1,打印34
- 当test1运行完毕返回后,此时test2阻塞已结束,再自动切换回test2,打印78
- 所有协程执行完毕,程序退出
所以,程序运行下来的输出就是:
12 56 34 78
注意,这里与上一篇greenlet中第一个例子运行的结果不一样,greenlet一个协程运行完后,必须显式切换,不然会返回其父协程。而在gevent中,一个协程运行完后,它会自动调度那些未完成的协程。
我们换一个更有意义的例子:
1 2 3 4 5 6 7 8 |
import gevent import socket urls = ['www.baidu.com', 'www.gevent.org', 'www.python.org'] jobs = [gevent.spawn(socket.gethostbyname, url) for url in urls] gevent.joinall(jobs, timeout=5) print [job.value for job in jobs] |
import gevent,socket
urls = ['www.baidu.com', 'www.gevent.org', 'www.python.org']
jobs = [gevent.spawn(socket.gethostbyname, url) for url in urls]
gevent.joinall(jobs, timeout=5)
print [job.value for job in jobs]
我们通过协程分别获取三个网站的IP地址,由于打开远程地址会引起IO阻塞,所以gevent会自动调度不同的协程。另外,我们可以通过协程对象的”value”属性,来获取协程函数的返回值。
猴子补丁 Monkey patching
在运行上面例子时会发现,其实程序运行的时间同不用协程是一样的,是三个网站打开时间的总和。可是理论上协程是非阻塞的,那运行时间应该等于最长的那个网站打开时间呀?其实这是因为Python标准库里的socket是阻塞式的,DNS解析无法并发,包括像urllib库也一样,所以这种情况下用协程完全没意义。那怎么办?
一种方法是使用gevent下的socket模块,我们可以通过”from gevent import socket”来导入。不过更常用的方法是使用猴子布丁(Monkey patching):
from gevent import monkey; monkey.patch_socket()
import gevent
import socket
urls = ['www.baidu.com', 'www.gevent.org', 'www.python.org']
jobs = [gevent.spawn(socket.gethostbyname, url) for url in urls]
gevent.joinall(jobs, timeout=5)
print [job.value for job in jobs]
上述代码的第一行就是对socket标准库打上猴子补丁,此后socket标准库中的类和方法都会被替换成非阻塞式的,所有其他的代码都不用修改,这样协程的效率就真正体现出来了。Python中其它标准库也存在阻塞的情况,gevent提供了”monkey.patch_all()”方法将所有标准库都替换。
from gevent import monkey; monkey.patch_all()
官网建议使用”patch_all()”,而且在程序的第一行就执行。
获取协程状态
协程状态有已启动和已停止,分别可以用协程对象的”started”属性和”ready()”方法来判断。对于已停止的协程,可以用”successful()”方法来判断其是否成功运行且没抛异常。如果协程执行完有返回值,可以通过”value”属性来获取。另外,greenlet协程运行过程中发生的异常是不会被抛出到协程外的,因此需要用协程对象的”exception”属性来获取协程中的异常。下面的例子很好的演示了各种方法和属性的使用。
#coding:utf8
import gevent
def win():
return 'You win!'
def fail():
raise Exception('You failed!')
winner = gevent.spawn(win)
loser = gevent.spawn(fail)
print winner.started # True
print loser.started # True
# 在Greenlet中发生的异常,不会被抛到Greenlet外面。
# 控制台会打出Stacktrace,但程序不会停止
try:
gevent.joinall([winner, loser])
except Exception as e:
# 这段永远不会被执行
print 'This will never be reached'
print winner.ready() # True
print loser.ready() # True
print winner.value # 'You win!'
print loser.value # None
print winner.successful() # True
print loser.successful() # False
# 这里可以通过raise loser.exception 或 loser.get()
# 来将协程中的异常抛出
print loser.exception
协程运行超时
之前我们讲过在”gevent.joinall()”方法中可以传入timeout参数来设置超时,我们也可以在全局范围内设置超时时间:
import gevent
from gevent import Timeout
timeout = Timeout(2) # 2 seconds
timeout.start()
def wait():
gevent.sleep(10)
try:
gevent.spawn(wait).join()
except Timeout:
print('Could not complete')
上例中,我们将超时设为2秒,此后所有协程的运行,如果超过两秒就会抛出”Timeout”异常。我们也可以将超时设置在with语句内,这样该设置只在with语句块中有效:
class TooLong(Exception):
pass
with Timeout(1, TooLong):
gevent.sleep(10)
此外,我们可以指定超时所抛出的异常,来替换默认的”Timeout”异常。比如下例中超时就会抛出我们自定义的”TooLong”异常。
此外这里也可以了解http://www.maiziedu.com/course/747/