tornado 实现 将阻塞函数 改变为非阻塞 助力并发开发

版权声明:本文为博主原创文章,未经博主允许不得转载。 https://blog.csdn.net/yangxiaodong88/article/details/81777629

背景

python 并发最强大的地方 就是使用协程。 tornado 基于事件循环, 轮询的方式来实现并发, 在阅读本篇文章的时候 默认为你有了一定的 基础知识, 使用过yield yield from async await 并充分理解了。 django 每创建一个连接就会创建一个线程, 这样对服务器的要求比较高, 服务器的压力比较大。 tornado 充分利用协程实现并发, 使用各种异步库来实现并发。 但是还有很多没有优秀的异步库, 或者说是不稳定。 例如mysql 目前没有敢用的 异步操作mysql的库。 现在最好的办法就是 将这样的阻塞函数 通过一定的方法给转化为非阻塞的。

一、在for 循环中使用

比如下载的时候网络IO流 是阻塞的 一种方法是变为非阻塞的, 一种方法是使用异步访问库。

直接将方法变为非阻塞的函数

from concurrent import futures
from flags import save_flag, get_flag, show, main

MAX_WORKERS = 20
def download_one(cc):
    image = get_flag(cc)
    show(cc)
    save_flag(image, cc.lower() + '.gif')
    return cc
def download_many(cc_list):
    workers = min(MAX_WORKERS, len(cc_list))
    with futures.ThreadPoolExecutor(workers) as executor:
        res = executor.map(download_one, sorted(cc_list))
    return len(list(res))
if __name__ == '__main__':
    main(download_many) 

以上是伪代码,内部使用的是多线程。使用map过后 能直接得到结果, 是一个生成器。 要获取到具体的值要调用迭代器, 进行循环获取。 map的优点是 能够保证顺序, 按照进去的顺序出来 res 的值。 确定是, 等到所有的任务排定完再执行 任务。 这样不能达到最快, 拆分的话 不能保证顺序但是 是当任务出来之后就会执行。 用比较官网的说法。

map 方法的作用与内置的map函数类型类似, 不过download_one 函数会在多个线程中并发调用; map 方法返回一个生成器, 因此可以迭代,, 获取各个函数返回的值。
注意: 编写并发代码时候经常这样重构: 把依次for 循环体改成函数, 以便这样并发调用。

为了了解future (期物) 现在来慢慢介绍下
我们目前用的库是concurrent.futures

标准库中有两个名为Future 的类: concurrent.futures.Future 和asyncio.Future. 两个类的作用相同:两个类都表示可能已经完成或者尚未完成的延迟计算。 这和Twisted引擎中的Deferred类, Tornado 框架中的Future类以及多个javascript 库中的Promise 对象类似。

future (期物) 封装待完成的操作, 可以放入队列, 完成的状态可以查询, 得到结果(或者抛出异常)后可以获得结果或者异常。

我们要记住一点:一般自己不常见期物, 而只能有并发框架(concurrent.futures 或 asyncio) 实例化。 原因很简单,
期物表示 终将发生的事情, 而确定某件事情会发生的唯一方式是执行的时间已经排定。 因此, 只有排定把某件事交给concurrent.futures.Executor 子类处理时, 才会创建concurrent.futures.Future 实例。 例如
,Executor.submit() 方法的参数是一个可以调用的对象,调用这个方法之后会为可调用的参数排期, 并返回一个期物。

客户端代码不应该改变期物的状态, 并发框架在期物表示的延迟计算结束后会改变期物的状态, 而我们无法控制计算何时结束。

这两种期物都有 .done() 方法, 这个方法不阻塞, 返回值是布尔值, 指明期物连接的可调用对象是否已经执行。 客户端代码不会询问期物是否结束, 而是会等待通知。
因此 两个Future类都有 .add_done_callback()方法, 这个方法只有一个参数, 类型是可调用的对象, 期物运行结束后会调用指定的可调用对象。

此外还有 .result() 方法。 在期物运行结束之后调用的话,这个方法在两个Future 类中的作用是相同的, :返回可调用对象的结果,或者重新抛出执行可调用的对象时候抛出的异常。可是如果期物没有结束, result在两个Future类中的行为相差很大。对于concurrent.futures.Future 实例来说, 调用f.result() 方法会阻塞调用方所在的线程直到有结果可以返回。此时 result方法可以接收可选的timeout参数, 如果在指定的时间内期物没有运行完毕, 会抛出TimeoutError异常。asyncio.Future.result 方法不支持设定超时时间, 在那个库中获取期物的结果最好使用的是yield from 结构。但是切记 对concurrent.futures.Future 实例不能这么做。

这两个函数中有几个函数会返回期物, 其他函数则使用期物。上面例子中Executor.map 方法是属于使用期物的:返回值是一个迭代器,迭代器的next 方法调用各个期物的result方法, 因此我们可以得到的是各个期物的结果, 而非期物本身。

为了从实用的角度理解期物, 我们可以使用concurrent.futures.as_completed函数, 这个函数的参数是一个期物列表, 返回值的是一个迭代器, 在期物运行结束之后产出期物。

为了使用futures.as_completed函数, 把要抽象的executor.map调用换成两个for循环:一个用于创建并排定期物, 另一个用于获取期物的结果。

# Author : July  Yang 
def download_many(cc_list):
    cc_list = cc_list[:5]
    with futures.ThreadPoolExecutor(max_workers=3) as executor:
        to_do = []
        for cc in sorted(cc_list):
            future = executor.submit(download_one, cc)
            to_do.append(future)
            msg = 'Scheduled for {}: {}'
            print(msg.format(cc, future))
         results = []
        for future in futures.as_completed(to_do):
            res = future.result()
            msg = '{} result: {!r}'
            print(msg.format(future, res))
            results.append(res)
    return len(results)

将Executor.map 换成executor.submit方法和 futures.as_completed函数
executor.submit 方法排定可调用对象的执行时间, 然后返回一个期物,表示这个待执行的操作。

as_completed 在期物运行结束后产出期物。 在这个例子中future.result() 方法绝对不会阻塞, 因为future是有as_completed函数产出。

阻塞IO和GIL

要实现并发编程实现并发编程 在python 必须要理解阻塞IO 和GIL

cpython 本身就不是线程安全的, 因此有全局解释器锁GIL , 一次只允许使用一个线程执行 python 字节码。因此python进程通常不能同时使用多个cpu 核心。

线程安全就是多线程访问时,采用了加锁机制,当一个线程访问该类的某个数据时,进行保护,其他线程不能进行访问直到该线程读取完,其他线程才可使用。不会出现数据不一致或者数据污染。
线程不安全就是不提供数据访问保护,有可能出现多个线程先后更改数据造成所得到的数据是脏数据

标准库中 所有阻塞型IO函数都, 在等待操作系统返回结果时候都会释放GIL。 这意味着在python的语言层次上面可以使用多线程,而IO 密集型python 可以从中受益: 一个python线程等待网络响应时候, 阻塞型IO 函数会释放GIL, 再运行一个线程。

python 标准库中所有的阻塞型IO 函数都会释放GIL, 允许其他线程运行。time.sleep() 函数会释放GIL, 因此尽管有GIL, python线程还能够在IO 密集型应用中发挥作用。

实验Executor.map()方法

要想并发运行多个可调用的对象, 最简单的方式是实现Executor.map()方法

from time import sleep, strftime
from concurrent import futures
def display(*args): 
    print(strftime('[%H:%M:%S]'), end=' ')
    print(*args)
def loiter(n): 
    msg = '{}loiter({}): doing nothing for {}s...'
    display(msg.format('\t'*n, n, n))
    sleep(n)
    msg = '{}loiter({}): done.'
    display(msg.format('\t'*n, n))
    return n * 10 
def main():
    display('Script starting.')
    executor = futures.ThreadPoolExecutor(max_workers=3) 
    results = executor.map(loiter, range(5)) 
    display('results:', results) 
    display('Waiting for individual results:')
    for i, result in enumerate(results): 
    display('result {}: {}'.format(i, result))
main()

重点
executor.map函数易于使用, 不过有个特性可以使用,也可能没有用,具体情况取决于需求:这个返回结果的顺序与调用开始的顺序一致。如果第一个调用生成结果用时10秒, 而其他的调用需要1秒, 代码会阻塞10秒, 获取map 方法返回的生成器产生的第一个结果。在此之后获取后续结果时候不会阻塞, 因为后续的调用已经结束。如果必须要等到所有的结果后再处理, 使用map没有错, 这就是获取所有的结果之后再进行处理的。通常情况是不管提交的顺序, 只要有结果就获取,这样就需要把Execute.submit方法和futures.as_completed 函数结合起来使用。这两个结合起来比较灵活, submit 方法能够处理不同的可调用对象和参数,而executor.map 只能处理参数不同的同一个可调用对象。此外传给futures.as_completed函数的期物集合可以来之多个executor实例。例如一些有ThreadPoolExecutor实例创建, 另一些有ProcessPoolExecutor实例创建。

二、在tornado 中使用

事件循环机制

使用注解的方式实现 将阻塞型函数变为非阻塞



from tornado.concurrent import run_on_executor
import time
import tornado.web
import tornado.gen

from concurrent.futures import ThreadPoolExecutor
from concurrent import futures

class LogicAsyTest():
    executor = ThreadPoolExecutor(20)
    def __init__(self):
       pass
    @tornado.gen.coroutine
    def bb(self):
        res = yield self.my_sleep()
        return res

    async def execute(self, base_url):
        """ 这个方法是非常好的的"""
        result = Result.result_success()

        res = await self.bb()
        result['content'] = res
        # time.sleep(100)
        OHHOLog.print_log("test result")
        OHHOLog.print_log(result)
        return result

    async def async_execute(self, base_url):
        result = await self.execute(base_url)
        return result

    @run_on_executor
    def my_sleep(self):        
        time.sleep(5)
        return 5

@run_on_executor
注解就是将阻塞函数变为非阻塞。

利用tornado 中事件循环机制 进行并发操作。 测试使用siege 工具进行测试。

猜你喜欢

转载自blog.csdn.net/yangxiaodong88/article/details/81777629
今日推荐