【python内功修炼011】:深入淺出了解python协程

一、引子

一般来说,线程下的程序执行路径都是层级调用, 比如A(子程序/函数)调用B,B在执行过程中又调用了C,C执行完毕返回,B执行完毕返回,最后是A执行完毕。

如下实例:


def A():
    print('A1')
    print('A2')
    print('A3')

def B():
    print('B1')
    print('B2')
    print('B3')
    
    
if __name__ == '__main__':
    A()
    B()

结果:

A1
A2
A3
B1
B2
B3

结论:

子程序被调用都是顺序执行的,一个线程按照顺序调用一个或多个子程序,总是一个入口,一次返回。

协程和线程区别(概念层)

协程看上去也是线程,但在执行过程中,在子程序内部可中断,然后转而执行别的子程序,在适当的时候再返回来接着执行。

如下实例:

注意:在一个子程序中中断,去执行其他子程序,不是函数调用,有点类似CPU的中断。比如子程序A、B:

def eat():
    print('吃第一口饭')
    print('吃第二口饭')
    print('吃第三口饭')

def play():
    print('玩一下手机')
    print('玩二下手机')
    print('玩三下手机')
    
    
if __name__ == '__main__':
    A()
    B()

假设由协程执行,我们来模拟吃一口饭玩一下手机的场景,结果可能是:

吃第一口饭
玩一下手机
吃第二口饭
玩二下手机
吃第三口饭
玩三下手机

提示:

但是在eat中是没有调用play的,所以协程的调用比函数调用理解起来要难一些。

从某种意义上 看起来eat和play的执行有点像多线程,但协程的特点在于是一个线程执行

二、协程介绍

2.1 协程诞生的背景

对于单线程下,我们不可避免程序中出现IO操作,但如果我们能在自己的程序中(即用户程序级别,而非操作系统级别)控制单线程下的多个任务能在一个任务遇到IO阻塞时就切换到另外一个任务去计算,这样就保证了该线程能够最大限度地处于就绪态,即随时都可以被CPU执行的状态,相当于我们在用户程序级别将自己的IO操作最大限度地隐藏起来,从而可以迷惑操作系统,让其看到:

该线程好像是一直在计算,IO比较少,从而更多的将CPU的执行权限分配给我们的线程。

协程的本质就是在单线程下,由用户自己控制一个任务遇到IO阻塞了就切换另外一个任务去执行,以此来提升效率。为了实现它,我们需要找寻一种可以同时满足以下条件的解决方案:

#1. 可以控制多个任务之间的切换,切换之前将任务的状态保存下来,以便重新运行时,可以基于暂停的位置继续执行。
#2. 作为1的补充:可以检测io操作,在遇到io操作的情况下才发生切换

2.2 官方解释

协程,又称微线程,纤程。英文名Coroutine ,是一种用户态的轻量级线程。

协程的调度完全由用户控制。协程拥有自己的寄存器上下文和栈。协程调度切换时,将寄存器上下文和栈保存到其他地方,在切回来的时候,恢复先前保存的寄存器上下文和栈,直接操作栈则基本没有内核切换的开销,可以不加锁的访问全局变量,所以上下文的切换非常快。

2.3 自己的理解

协程本质上就是一个线程,以前线程任务的切换是由操作系统控制的,遇到I/O自动切换,现在我们用协程的目的就是较少操作系统切换的开销(开关线程,创建寄存器、堆栈等,在他们之间进行切换等),在我们自己的程序里面来控制任务的切换。

2.4 协程的优缺点

优点:

执行效率极高,因为子程序切换(函数)不是线程切换,由程序自身控制,没有切换线程的开销。所以与多线程相比,线程的数量越多,协程性能的优势越明显。

不需要多线程的锁机制,因为只有一个线程,也不存在同时写变量冲突,在控制共享资源时也不需要加锁,因此执行效率高很多。

缺点:

多个任务一旦有一个阻塞没有切换,整个线程都阻塞在原地
该线程内的其他的任务都不能执行了

一旦引入协程,就需要检测单线程下所有的IO行为,
实现遇到IO就切换,少一个都不行,以为一旦一个任务阻塞了,整个线程就阻塞了,
其他的任务即便是可以计算,但是也无法运行了

提示:

  1. python的线程属于内核级别的,即由操作系统控制调度(如单线程遇到IO或执行时间过长就会被迫交出CPU执行权限,切换其他线程运行)
  2. 单线程内开启协程,一旦遇到IO,就会从应用程序级别(而非操作系统)控制切换,以此来提升效率(!!!非IO操作的切换与效率无关)

2.5 基于yield来进行协程的模擬


      本节的主题是基于单线程来实现并发,即只用一个主线程(很明显可利用的cpu只有一个)情况下实现并发,
为此我们需要先回顾下并发的本质:切换+保存状态
     cpu正在运行一个任务,会在两种情况下切走去执行其他的任务(切换由操作系统强制控制),一种情况是
     该任务发生了阻塞,``另外一种情况是该任务计算的时间过长或有一个优先级更高的程序替代了它

在这里插入图片描述

PS:在介绍进程理论时,提及进程的三种执行状态,而线程才是执行单位,所以也可以将上图理解为线程的三种状态

yield本身就是一种在单线程下可以保存任务运行状态的方法

1、 yield可以保存当前函数状态,形成一个阻塞,yield的状态保存与操作系统的保存线程状态很像,但是yield是代码级别控制的,更轻量级

2、 send 可以把一个函数的结果传给另外一个函数,以此实现单线程内程序之间的切换 。

其中第二种情况并不能提升效率,只是为了让CPU能够雨露均沾,实现看起来所有任务都被“同时”执行的效果,如果多个任务都是纯计算的,这种切换反而会降低效率。为此我们可以基于yield来验证。yield本身就是一种在单线程下可以保存任务运行状态的方法,我们来简单复习一下:

串行执行的代码

# 串行执行
class Test_serial:

    def f1(self):
        ' 生成数据,得到f1和 '
        return sum([i for i in range(10000000)])

    def f2(self):
        '生成数据得到f2和'
        return sum([i for i in range(10000000)])

    def main(self):
        self.sum = self.f1() + self.f2()
        print(self.sum)

if __name__ == '__main__':
    start = time.time()
    g = Test_serial()
    g.main()
    stop = time.time()
    print('总计用时:', stop - start)

    '''
   
    结果:
    99999990000000
	总计用时: 2.5535383224487305

    '''

通过yield实现任务切换+保存线程

#  将串行执行代码,通过yield 改为并发执行
# ===============================================================
class Test_yield:


    def f1(self):
        '''任务1:接收数据,做加法 '''
        self.sum = 0
        while True:
            x = yield # 通过yield 拿到f2传的值
            self.sum = self.sum + 2*x
            

    def f2(self):
        '''任务2:生产数据 '''
        g = self.f1()  # 得到f1生成器对象
        next(g)  #找到了f1函数的yield位置
        for i in range(10000000):
            g.send(i)  # #给yield传值,然后再循环给下一个yield传值,并且多了切换的程序,比直接串行执行还多了一些步骤,导致执行效率反而更低了。

    def main(self):
        self.f1()
        self.f2()
        print(self.sum)

if __name__ == '__main__':
    start1 = time.time()
    g = Test_yield() 
    g.main()
    stop1 = time.time()
    print('总计用时:', stop1 - start1)
    
    
    '''
   
    结果:
    99999990000000
	总计用时: 3.0752406120300293

    '''
    
#基于yield保存状态,实现两个任务直接来回切换,即并发的效果
#PS:如果每个任务中都加上打印,那么明显地看到两个任务的打印是你一次我一次,即并发执行的.

三、greenlet模块

如果单个线程内有多个任务需要切换,使用yield比较麻烦(需要先初始化,然后调用send),在python中提供的greenlet模块能够简单快速的实现多个任务的直接切换。

3.1 常用方法


# Greenlet对象
from gevent import Greenlet
 
# Greenlet对象创建
job = Greenlet(target0, 3)
Greenlet.spawn() # 创建一个协程并启动
Greenlet.spawn_later(seconds=3) # 延时启动
 
# 协程启动
job.start() # 将协程加入循环并启动协程
job.start_later(3) # 延时启动
 
# 等待任务完成
job.join() # 等待任务完成
job.get() # 获取协程返回的值
 
# 任务中断和判断任务状态
job.dead() # 判断协程是否死亡
job.kill() # 杀死正在运行的协程并唤醒其他的协程,这个协程将不会再执行,可以
job.ready() # 任务完成返回一个真值
job.successful() # 任务成功完成返回真值,否则抛出错误
 
# 获取属性
job.loop # 时间循环对象
job.value # 获取返回的值
 
# 捕捉异常
job.exception # 如果运行有错误,获取它
job.exc_info # 错误的详细信息
 
# 设置回调函数
job.rawlink(back) # 普通回调,将job对象作为回调函数的参数
job.unlink() # 删除回调函数
# 执行成功的回调函数
job.link_value(back)
# 执行失败的回调函数
job.link_exception(back)

3.2 代码示例

'''

利用greenlet实现吃一口饭玩一下手机功能
'''

from greenlet import greenlet

def eat(name):
    print('%s:吃第一口饭' % name)
    g2.switch('金鞍少年')  # 切换到play代码块内 第一行
    print('%s:吃第二口饭' % name)
    g2.switch()  # 切换到play代码块内 第三行


def play(name):
    print('%s: 玩一下手机' % name)
    g1.switch()  # 切换到eat代码块内第三行
    print('%s: 玩二下手机' % name)


if __name__ == '__main__':
    g1 = greenlet(eat)
    g2 = greenlet(play)

    g1.switch('金鞍少年')  # 在第一次switch时传入参数,以后都不需要

打印结果:

金鞍少年:吃第一口饭
金鞍少年: 玩一下手机
金鞍少年:吃第二口饭
金鞍少年: 玩二下手机

模拟greenlet遇到IO:

'''

利用greenlet实现吃一口饭玩一下手机功能
'''

from greenlet import greenlet
import time


def eat(name):
    
    print('%s:吃第一口饭' % name)
    time.sleep(10000)  # 模拟遇到 IO
    g2.switch('金鞍少年')  # 切换到play代码块内 第一行
    print('%s:吃第二口饭' % name)
    g2.switch()  # 切换到play代码块内 第三行


def play(name):
    print('%s: 玩一下手机' % name)
    g1.switch()  # 切换到eat代码块内第三行
    print('%s: 玩二下手机' % name)


if __name__ == '__main__':
    g1 = greenlet(eat)
    g2 = greenlet(play)

    g1.switch('金鞍少年')  # 在第一次switch时传入参数,以后都不需要
    

结果

greenlet只是提供了一种比yield更加便捷的切换方式,当切到一个任务执行时如果遇到IO,那就原地阻塞,仍然是没有解决遇到IO自动切换来提升效率的问题。

四、Gevent模块

4.1 Gevent介绍

Gevent 是一个第三方库,可以轻松通过gevent实现并发同步或异步编程,在gevent中用到的主要模式是Greenlet, 它是以C扩展模块形式接入Python的轻量级协程。 Greenlet全部运行在主程序操作系统进程的内部,但它们被协作式地调度。

4.2 Gevent用法

语法 作用
gevent.spawn(cls, *args, **kwargs) 创建一个普通的Greenlet对象并切换
gevent.spawn_later(seconds=3) 延时创建一个普通的Greenlet对象并切换
gevent.spawn_raw() 创建的协程对象属于一个组
gevent.getcurrent() 返回当前正在执行的greenlet
gevent.joinall(jobs) 将协程任务添加到事件循环,接收一个任务列表
gevent.wait() 可以替代join函数等待循环结束,也可以传入协程对象列表
gevent.kill() 杀死一个协程
gevent.killall() 杀死一个协程列表里的所有协程
monkey.patch_all() 非常重要,会自动将python的一些标准模块替换成gevent**框架*

4.3 简单示例

import gevent 
import time

def eat(name):
    print('%s:吃第一口饭' % name)
    gevent.sleep(2)  # 模拟阻塞
    print('%s:吃第二口饭' % name)

def play(name):
    print('%s: 玩一下手机' % name)
    gevent.sleep(2)  # 模拟阻塞
    print('%s: 玩二下手机' % name)


if __name__ == '__main__':
    g1 = gevent.spawn(eat, '金鞍少年')
    g2 = gevent.spawn(play, '金鞍少年')
    gevent.joinall([g1, g2])  # 等同于g1.join(),g2.join()
    
   '''
   	金鞍少年:吃第一口饭
	金鞍少年: 玩一下手机
	金鞍少年:吃第二口饭
	金鞍少年: 玩二下手机
   ''' 

gevent模块不能识别它本身以外的所有的IO行为,但是它内部封装了一个模块,能够帮助我们识别所有的IO行为

4.4 猴子补丁 Monkey的用法

这个补丁是Gevent模块最需要注意的问题,有了它,才会让Gevent模块发挥它的作用。我们往往使用Gevent是为了实现网络通信的高并发,但是,Gevent直接修改标准库里面大部分的阻塞式系统调用,包括socket、ssl、threading和 select等模块,而变为协作式运行。但是我们无法保证你在复杂的生产环境中有哪些地方使用这些标准库会由于打了补丁而出现奇怪的问题。

一种方法是使用gevent下的socket模块,我们可以通过”from gevent import socket”来导入。不过更常用的方法是使用猴子布丁(Monkey patching)。使用猴子补丁褒贬不一,但是官网上还是建议使用”patch_all()”,而且在程序的第一行就执行。

from gevent import monkey; monkey.patch_socket()

实例

#    使用猴子补丁
from gevent import monkey; monkey.patch_socket()
import gevent
import time

def eat(name):
    print('%s:吃第一口饭' % name)
    time.sleep(2)  # 模拟阻塞
    print('%s:吃第二口饭' % name)



def play(name):
    print('%s: 玩一下手机' % name)
    time.sleep(2)  # 模拟阻塞
    print('%s: 玩二下手机' % name)


if __name__ == '__main__':
    g1 = gevent.spawn(eat, '金鞍少年')
    g2 = gevent.spawn(play, '金鞍少年')
    gevent.joinall([g1, g2])  # 等同于g1.join(),g2.join()


'''
 可以用threading.current_thread().getName()来查看每个g1和g2,查看的结果为DummyThread-n,即假线程 
'''

模拟网络请求IO

# 模拟网络请求IO
from gevent import monkey; monkey.patch_socket()
import gevent
import socket

urls = ['www.baidu.com', 'www.gevent.org', 'www.python.org']
jobs = [gevent.spawn(socket.gethostbyname, url) for url in urls]
gevent.joinall(jobs, timeout=5)

print([job.value for job in jobs])

4.5 Gevent之同步与异步

from gevent import spawn,joinall,monkey;monkey.patch_all()

import time
def task(pid):
    """
    Some non-deterministic task
    """
    time.sleep(0.5)
    print('Task %s done' % pid)


def synchronous():
    for i in range(10):
        task(i)

def asynchronous():
    g_l=[spawn(task,i) for i in range(10)]
    joinall(g_l)

if __name__ == '__main__':
    print('Synchronous:')
    synchronous()

    print('Asynchronous:')
    asynchronous()

    

总结:

上面程序的重要部分是将task函数封装到Greenlet内部线程的gevent.spawn。 初始化的greenlet列表存放在数组threads中,此数组被传给gevent.joinall 函数,后者阻塞当前流程,并执行所有给定的greenlet。执行流程只会在 所有greenlet执行完后才会继续向下走。

五、 用gevent实现单线程下的socket并发

服务端:

from gevent import monkey;monkey.patch_all()
import socket
import gevent
class server:

    def __init__(self, address, backlog=5):
        self.server = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        self.server.bind(address)
        self.server.listen(backlog)

    def connects(self):
        print('服务端已就绪!')
        while True:  # 链接循环
            conn, client_addr = self.server.accept()
            gevent.spawn(self.talk, conn, client_addr)

    # 通信循环
    def talk(self, conn,client_addr):
        try:
            while True:
                res = conn.recv(1024)
                print('client %s:%s msg: %s' %(client_addr[0],client_addr[1],res))
                conn.send(res.upper())
        except Exception as e:
            print(e)
        finally:
            conn.close()


if __name__ == '__main__':
    obj = server(('127.0.0.1', 8833), 5)
    obj.connects()

服务端:

import socket
from threading import Thread,current_thread

def clients(server_ip,port):
    client = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    client.connect((server_ip, port))

    count = 0
    while True:
        client.send(('%s say hello %s' % (current_thread().getName(), count)).encode('utf-8'))
        msg = client.recv(1024)
        print(msg.decode('utf-8'))
        count += 1


if __name__ == '__main__':
    for i in range(100):  # 开启100个线程,模拟并发效果
        T = Thread(target=clients, args=('127.0.0.1', 8833,))
        T.start()
发布了81 篇原创文章 · 获赞 105 · 访问量 1万+

猜你喜欢

转载自blog.csdn.net/weixin_42444693/article/details/105462462
今日推荐