简介
Python 脚本执行的时候不是很快,特别是 Python 下面的多线程机制,长久以来一直被大家所诟病。通常来说要让 Python 执行效率变高一般使用的方法包括:
- 将复杂的代码转由 C 等执行效率更高的语言完成
- 多进程并发执行
- 用多线程完成 IO 操作
- 使用 gevent 协程机制
本篇博客将简单介绍一下协程。
协程的基本原理
gevent 的基本原理来自于 libevent&libev。本质上 libevent 或者说 libev 都是一种事件驱动模型。这种模型对于提高 CPU 的运行效率,增强用户的并发访问非常有效。但是因为它本身是一种事件机制,所以写起来有点绕,不是很直观。所以,为了修正这个问题,有心人引入了用户侧上下文切换的机制。这就是说,如果代码中引入了带 IO 阻塞的代码时,lib 本身会自动完成上下文的切换,全程用户都是没有觉察的。这就是 gevent 的由来。
gevent 是一个第三方库,可以轻松通过 gevent 实现并发同步或异步编程。
使用
g = gevent.spawn(func,1,……,x=3,……)
创建一个协程对象 g ,spawn 括号内第一个参数是函数名,后面可以有多个参数(位置参数,关键字参数)。
g.join() # 等待结束
g.value # 拿到func的返回值。
实例1:
import gevent
def func1():
print('func1 begin')
gevent.sleep(3)
print('func1 end')
def func2():
print('func2 begin')
gevent.sleep(2)
print('func2 end')
def func3():
print('func3 begin')
gevent.sleep(2)
print('func3 end')
g1 = gevent.spawn(func1)
g2 = gevent.spawn(func2)
g3 = gevent.spawn(func3)
gevent.joinall([g1, g2, g3])
运行:
略
上面的例子中 gevent.sleep(2) 是模拟 gevent 中可以识别的 IO 阻塞,遇到IO阻塞时会自动切换任务。
延时操作
而 time.sleep() 或其它的阻塞,gevent 是不能直接识别的,需要下面一行代码,打补丁识别。
from gevent import monkey;mokey.patch_all()
这一行代码必需放在打补丁者的最前面。
实例2:
from gevent import monkey, spawn;monkey.patch_all()
import time
def func1():
print('func1 begin')
time.sleep(3)
print('func1 end')
def func2():
print('func2 begin')
time.sleep(2)
print('func2 end')
def func3():
print('func3 begin')
time.sleep(2)
print('func3 end')
g1 = spawn(func1)
g2 = spawn(func2)
g3 = spawn(func3)
g1.join()
g2.join()
g3.join()
锁操作
虽然是协程,但是在里面添加锁增加对共享资源的互斥访问也是非常重要的,此外锁本身的添加也是很简单的。
import gevent
from gevent.lock import Semaphore
import time
sem = Semaphore(1)
def f1():
for i in range(5):
sem.acquire()
print('this is ' + str(i))
time.sleep(2)
sem.release()
def f2():
for i in range(5):
sem.acquire()
print('that is ' + str(i))
sem.release()
if __name__ == '__main__':
t1 = gevent.spawn(f1)
t2 = gevent.spawn(f2)
gevent.joinall([t1, t2])
运行:
略
使用锁可以防止其中之间的互斥访问。
案例
如何通过 gevent 来实现一个单线程下的socket并发?
服务端:
from gevent import monkey; monkey.patch_all()
from socket import *
import gevent
def server(server_ip, port):
s = socket(AF_INET, SOCK_STREAM)
s.setsockopt(SOL_SOCKET, SO_REUSEADDR, 1)
s.bind((server_ip, port))
s.listen(5)
while True:
conn, addr = s.accept()
gevent.spawn(talk, conn, addr)
def talk(conn, addr):
try:
while True:
res = conn.recv(1024)
print('Client %s: %s msg: %s' % (addr[0], addr[1], res))
conn.send(res.upper())
except Exception as e:
print(e)
finally:
conn.close()
if __name__ == '__main__':
server('127.0.0.1', 8080)
客户端:
from threading import Thread
from socket import *
import threading
def client(server_ip, port):
c = socket(AF_INET, SOCK_STREAM)
c.connect((server_ip, port))
count = 0
while True:
c.send(('%s say hello %s' % (threading.current_thread().getName(), count)).encode('utf-8'))
msg = c.recv(1024)
print(msg.decode('utf-8'))
count += 1
if __name__ == '__main__':
for i in range(100):
t = Thread(target=client, args=('127.0.0.1', 8080))
t.start()
运行:先执行客户端,再执行服务端
略
查看源码:https://github.com/vadonical/python/tree/master/%E5%B9%B6%E5%8F%91%E7%BC%96%E7%A8%8B/gevent_demo