协程,又被称为微线程,一个线程可以拥有多个协程。协程在执行过程中,可以进行中断,然后转区执行别的子程序,在适当的时候在返回来接着执行,注意:这个过程是在一个子程序中中断,去执行其他的子程序,不是函数调用,有点类似于CPU的中断。
和多线程相比,协程最大的优势就是执行效率高,因为子程序切换不是线程的切换,是由程序自身进行控制的,因此没有,线程切换的开销,和多线程相比,线程的数量越多,协程的性能优势就越发明显。第二打油诗,就是不需要多线程的锁机制,因为只有一个线程,也不存在同时写变量的冲突,在协程中控制共享资源不加锁,只需要判断状态就好了,所以执行效率就比多线程高了好多。
我们利用多核cpu最好的方式就是多进程+协程。
在python中yeild关键字在函数中,可以使函数变为一个生成器,生成器让带吗在执行过程中可以暂停,也可以将值传递给暂停的生成器,这使得python中的协程概念成为了可能,再加上python3.3之后的yeild from,使得重构生成器与将它们串联起来都很简单。
一、快速创建协程
#monkey ,gevent能够修改标准库里面大部分的阻塞式系统调用,
#包括socket、ssl、threading和 select等模块,而变为协作式运行。
from gevent import monkey
import gevent
import threading
monkey.patch_socket()
def f(n):
#协程处理的任务
for i in range(n):
#gevent.getcurrent() 返回当前协程的对象
#threading.current_thread().name 查看当前线程的名称。threading.current_thread()返回以个当前线程的对象
print(gevent.getcurrent(),threading.current_thread().name,i)
#模拟io操作(io操作需要时间)
#通过gevnet.sleep交出协程控制权
gevent.sleep(1)
def main1():
#创佳协程
g1 = gevent.spawn(f,5)
g2 = gevent.spawn(f,5)
g3 = gevent.spawn(f,5)
g1.join()
g2.join()
g3.join()
def main2():
gevents = [gevent.spawn(f,5) for i in range(5)]
gevent.joinall(gevents)
main1()
main2()
二、协程与线程处理多任务的时间对比
from urllib import request
from concurrent.futures import ThreadPoolExecutor
from urllib.request import urlopen
import gevent
from time_it import timeit
from gevent import monkey
monkey.patch_socket()
URLS = ['http://httpbin.org', 'http://example.com/'] * 20
def load_url(url):
headers = {'User-agent': 'Chrome/23.0'}
req = request.Request(url, headers=headers)
with urlopen(req) as url_obj:
content = url_obj.read().decode('utf-8')
# print('%s网页有%s字节'%(url,len(content)))
@timeit
def threadMain():
with ThreadPoolExecutor(max_workers=5) as pool:
pool.map(load_url,URLS)
@timeit
def geventMain():
jobs = [gevent.spawn(load_url,url) for url in URLS]
gevent.joinall(jobs)
def main():
threadMain()
geventMain()
if __name__ == '__main__':
main()
输出:
threadMain函数的运行时间为2.652229070663452 s
geventMain函数的运行时间为0.7784552574157715 s
三、通过greenlet学习协程
from greenlet import greenlet
def task1():
print(12)
gr2.switch()
print(34)
def task2():
print(56)
gr1.switch()
print(78)
if __name__ == '__main__':
gr1 = greenlet(task1)
gr2 = greenlet(task2)
gr1.switch()
代码分析:
这里使用greenlet创建了两个协程,greenlet.switch()方法用于切换协程,即运行指定的协程。主程序中的gr1.switch()运行后,会开始task1任务的协程,因此上面函数从task1开始运行,执行print(12),在执行gr2.switch()函数后切换到了gr2协程,则task2任务开始执行,print(56),gr1.switch()执行后在进入task1任务,此时会接着上一个运行片段,执行task1函数,则print(34)
task2结束运行,程序退出运行。
输出:
12 56 34
协程的父子关系
创建协程对象方法有两个参数:greenlet(run=None,parent=None),参数run是其协程的任务对象,即函数名参数parent是定义了该协程的父协程是哪一个协程,由此可见协程是具有父子关系的,如果不设或者为空,其父协程就是程序默认的'main'这个主协程,‘main’这个协程不需要用户创建,它所对应的方法就是主程序,而用户创建的所有的协程都是其子协程。
from greenlet import greenlet
def task1():
print(12)
gr2.switch()
print(34)
def task2():
print(56)
if __name__ == '__main__':
gr1 = greenlet(task1)
gr2 = greenlet(task2,gr1)
gr1.switch()
print(78)
代码分析:
这里创建了两个协程,在创建gr2协程创建时指定并指定了gr1是其父协程,因此在task2里面虽然没有gr1.switch(),但是因为gr1协程是gr2协程的父协程,当gr2结束运行时,代码会回到父协程中将父协程执行完毕后在进行退出。因此输出为12,56.34,78。
输出:
12 56 34 78
协程的异常
我们通过上面的学习就会发现我们的协程运行时相互切换时是存储在栈的数据结构中,那么协程需要抛出一个异常时,就会抛出到父协程中,如果 所有的父协程都不捕获此异常,程序才会退出。
代码1:
from greenlet import greenlet
def task1():
print(12)
gr2.switch()
print(34)
def task2():
print(56)
raise NameError
if __name__ == '__main__':
gr1 = greenlet(task1)
gr2 = greenlet(task2,gr1)
gr1.switch()
print(78)
代码2:
form greenlet import greenlet
def task1():
print(12)
try:
gr2.switch()
except NameError:
print(82)
print(34)
def task2():
print(56)
raise NameError
if __name__ == '__main__':
gr1 = greenlet(task1)
gr2 = greenlet(task2, gr1)
gr1.switch()
print(78)
代码分析:
我们运行完test1和test2之后,如果子协程'gr2'抛出的异常被其父协程‘gr1’捕获,那么程序会打印82,‘gr1’没有捕获异常异常会被继续抛出到主程序,并且结束运行的程序。
协程之间的消息传递
在greenlet对象在调用switch()方法时,传入如参数即可。
代码:
form greenlet import greenlet
def task1():
print(12)
y = gr2.switch(56)
print(y)
def task2(x):
print(x)
gr1.switch(34)
print(78)
if __name__ == '__main__':
gr1 = greenlet(task1)
gr2 = greenlet(task2)
gr1.switch()
代码分析:
在task1中调用gr2.switch(),由于协程‘gr2’未被启动,所以传入的参数56会被父在task2函数的参数x上,在task2()中调用 gr1.swtich() ,由于协程gr1之前已经被执行了一半到‘y=gr2.swtich(56)’此时gr2.swtich(34)会将34作为参数传递给task1并赋值给y,这样俩个协程之间就完成了消息的传递。
练习
我们使用多线程,多进程,协程,对某个帖子中的所有的邮箱践行筛选
import time_it
import re
from concurrent.futures import ThreadPoolExecutor
from concurrent.futures import as_completed
from urllib import request
from gevent import monkey
monkey.patch_socket()
import gevent
def get_url_content(url):
with request.urlopen(url) as url_it:
url_content = url_it.read()
return url_content.decode('utf-8')
def get_url_page(pattern,str_infor):
#<a href="/p/2314539885?pn=31">尾页</a>
return re.findall(pattern,str_infor)[0]
def get_email(url):
content = get_url_content(url)
print(re.findall(r'[a-zA-Z0-9]+@\w+\.com',content))
def get_email_as_compeletd(url):
content = get_url_content(url)
return re.findall(r'[a-zA-Z0-9]+@\w+\.com',content)
#使用map多线程
@time_it.timeit
def uesmapthread(url_list):
with ThreadPoolExecutor(max_workers=5) as pool:
pool.map(get_email, url_list)
#使用submit
@time_it.timeit
def usesumbitthread(url_list):
with ThreadPoolExecutor(max_workers=5) as pool:
for urls in url_list:
pool.submit(get_email, urls)
#使用as_completed 函数
@time_it.timeit
def use_as_completed(url_list):
with ThreadPoolExecutor(max_workers=5) as pool:
future_url = [pool.submit(get_email_as_compeletd, urls) for urls in url_list]
for future in as_completed(future_url):
print(future.result())
#不使用多线程
@time_it.timeit
def no_usethread(url_list):
for url in url_list:
get_email(url)
#使用协程爬取
@time_it.timeit
def use_gevent(url_list):
jobs = [gevent.spawn(get_email,url) for url in url_list]
gevent.joinall(jobs)
def main():
urlmain = 'http://tieba.baidu.com/p/2314539885'
pattern = r'<a href="/p/.*pn=(\d+)">尾页</a>'
page = int(get_url_page(pattern,get_url_content(urlmain)))
url_list = ['http://tieba.baidu.com/p/2314539885'+'?pn=%s'%(i) for i in range(page)]
# uesmapthread(url_list)
#usesumbitthread(url_list)
# use_as_completed(url_list)
#no_usethread(url_list)
use_gevent(url_list)
if __name__ == '__main__':
main()