python-异步执行库asyncio


title: python-异步执行库asyncio
categories: Python
tags: [python, asyncio, 异步, 并行]
date: 2020-09-28 14:45:34
comments: false
mathjax: true
toc: true

写工具时遇到需要请求几十次数据的场景, 同步顺序执行速度有点慢, 改成异步并发执行就快很多了. 同理, 其他涉及 io、会阻塞等待的操作都可以用异步并发的方式解决, 文件 io 也一样可以异步处理. 注意: asyncio 是在单个线程内交替执行多个协程 (并发), 而不是多核并行 —— 它节省的是等待 io 的时间, 对 CPU 密集型计算没有加速效果.

使用到的是 asyncio (内置) + aiohttp/aiofiles (需要 pip 安装)


参考

  • 官方
    • 协程与任务 - https://docs.python.org/zh-cn/3/library/asyncio-task.html
  • 使用asyncio+aiohttp爬取数据并拼装返回的数据 - https://blog.csdn.net/cui_yonghua/article/details/106840662
  • Python黑魔法 — 异步IO( asyncio) 协程 - https://www.jianshu.com/p/b5e347b3a17c

代码

  • 工具类 async_util.py (简单封装了一下)

    # -*- coding: utf-8 -*-
    
    import aiofiles
    import aiohttp
    import asyncio
    import json
    import sys
    import traceback
    import threading
    from typing import List
    
    from tool import utils
    
    
    class CReqInfo:
        """Description of a single HTTP request to be sent by CAsyncHttp."""

        def __init__(self):
            # No URL or body yet; requests default to the POST method.
            self.url, self.method, self.data = None, "POST", None
            # Opaque pass-through value for the caller's own bookkeeping; it is not sent.
            self.extA = None
    
    
    class CRspInfo:
        """Outcome of one request made by CAsyncHttp."""

        def __init__(self):
            self.code: int = 0  # status code; CAsyncHttp.request stores -999 on failure
            # Response body (or the exception on failure) and the caller's pass-through value.
            self.text = self.extA = None
    
    
    class CFileInfo:
        """
        One file for the async read/write helpers.

        ``encoding`` defaults to utf-8, so reads produce a str; pass ``encoding=None``
        to work with raw bytes instead.
        """

        def __init__(self, path, encoding="utf-8"):
            self.path, self.encoding = path, encoding
            # Filled in by the read/write helpers: the data, and any exception raised.
            self.content = self.error = None
            self.extA = None  # opaque pass-through value for the caller
    
    
    class CCmdInfo:
        """A shell command for CAsyncCmd plus the slots for its outcome."""

        def __init__(self, cmd):
            # Exit status and captured output, filled in after the command runs.
            self.code, self.msg = 0, None
            self.cmd = cmd
            self.extA = None  # opaque pass-through value for the caller
    
    
    class CThreadInfo:
        """A callable plus positional args to run on a thread; ``result`` receives the outcome."""

        def __init__(self, target, args=()):
            self.target, self.args, self.result = target, args, None
    
    
    class CInnerThread(threading.Thread):
        """
        Worker thread that runs ``ti.target(*ti.args)`` and stores the outcome on ``ti``.
        """

        def __init__(self, autoid, ti: CThreadInfo):
            """
            :param autoid: sequence number used to pair this thread with its submission
            :param ti: task description; its ``result`` is filled in by run()
            """
            super().__init__()
            self.autoid = autoid
            self.target = ti.target
            self.args = ti.args
            self.ti: CThreadInfo = ti

        def run(self):
            """Call the target; store its return value, or the exception it raised, in ti.result."""
            try:
                self.ti.result = self.target(*self.args)
            except Exception as e:
                self.ti.result = e
                # print_exc() reports the caught exception and where it was raised;
                # print_stack() would only print this frame's call stack, not the error.
                traceback.print_exc()

        def get_result(self):
            """Return ``(autoid, ti)`` so callers can match results to submissions."""
            return self.autoid, self.ti
    
    
    class CAsyncHttp:
        """
        Async HTTP helper built on aiohttp.
        """

        async def request(self, reqInfo: CReqInfo):
            """
            Send the request described by ``reqInfo`` and capture the outcome.

            A dict body is serialized with json.dumps before sending (the caller's
            ``reqInfo.data`` is left untouched). Ordinary failures are not raised:
            the returned object then has ``code == -999`` and the exception in ``text``.
            Cancellation (asyncio.CancelledError) is allowed to propagate.

            :param reqInfo: url, method, body and pass-through value
            :return: CRspInfo with status code, body text and ``reqInfo.extA``
            """
            data = reqInfo.data
            if isinstance(data, dict):
                data = json.dumps(data)

            rspInfo = CRspInfo()
            # Copied up front so a failed response can still be matched to its request.
            rspInfo.extA = reqInfo.extA
            try:
                async with aiohttp.request(method=reqInfo.method, url=reqInfo.url, data=data) as response:
                    rspInfo.code = int(response.status)
                    rspInfo.text = await response.text()
            except Exception as e:
                rspInfo.code = -999
                rspInfo.text = e
            # Returned outside `finally`, so BaseExceptions are no longer swallowed.
            return rspInfo

        def doReq(self, *reqArr) -> List[CRspInfo]:
            """Send all requests concurrently; return their CRspInfo objects in input order."""
            return CAsyncTask().doTask(*[self.request(reqInfo) for reqInfo in reqArr])
    
    
    class CAsyncFileRead:
        """
        Async file reader (aiofiles).
        """

        async def read(self, fi: CFileInfo):
            """
            Read ``fi.path`` into ``fi.content``.

            The file is read as bytes. With ``fi.encoding`` None the bytes are kept as-is;
            otherwise they are decoded with that encoding, dropping undecodable bytes.
            Any ordinary failure is stored in ``fi.error`` instead of being raised.

            :return: the same fi object
            """
            try:
                async with aiofiles.open(fi.path, mode="rb") as fd:
                    content = await fd.read()
                # A real conditional (not `and/or`): an empty file read with encoding=None
                # must yield b"" rather than fall through to str(b"", encoding=None).
                fi.content = content if fi.encoding is None else str(content, encoding=fi.encoding, errors="ignore")
            except Exception as e:
                fi.error = e
            return fi

        def doRead(self, *fileArr) -> List[CFileInfo]:
            """Read all files concurrently; return the CFileInfo objects in input order."""
            return CAsyncTask().doTask(*[self.read(fi) for fi in fileArr])
    
    
    class CAsyncFileWrite:
        """
        Async file writer (aiofiles).
        """

        async def write(self, fi: CFileInfo):
            """
            Write ``fi.content`` to ``fi.path``.

            With ``fi.encoding`` None the content is written as-is (expected to be bytes);
            otherwise it is treated as a str and encoded with that encoding. Any ordinary
            failure, including preparing the directory, is stored in ``fi.error`` instead of
            being raised.

            :return: the same fi object
            """
            try:
                # NOTE(review): presumably creates the parent directory of fi.path.
                utils.createDirForFile(fi.path)
                # Encode before opening, so an encoding error does not truncate an existing
                # file. A real conditional (not `and/or`) keeps b"" with encoding=None working.
                bts = fi.content if fi.encoding is None else fi.content.encode(encoding=fi.encoding)
                async with aiofiles.open(fi.path, mode="wb") as fd:
                    await fd.write(bts)
            except Exception as e:
                fi.error = e
            return fi

        def doWrite(self, *fileArr) -> List[CFileInfo]:
            """Write all files concurrently; return the CFileInfo objects in input order."""
            return CAsyncTask().doTask(*[self.write(fi) for fi in fileArr])
    
    
    class CAsyncCmd:
        """
        Runs shell commands concurrently through asyncio subprocesses.
        """

        async def run(self, ci: CCmdInfo):
            """
            Execute ``ci.cmd`` in a shell and record the outcome on ``ci``.

            ``ci.code`` receives the exit status. ``ci.msg`` receives stdout decoded as
            UTF-8 with undecodable bytes dropped, or stderr when stdout is empty.

            :return: the same ci object
            """
            pipe = asyncio.subprocess.PIPE
            process = await asyncio.create_subprocess_shell(ci.cmd, stdout=pipe, stderr=pipe)
            outBytes, errBytes = await process.communicate()

            chosen = outBytes if outBytes else errBytes
            ci.code = process.returncode
            # Both streams are piped, so communicate() gives bytes; the None branch is defensive.
            ci.msg = False if chosen is None else str(chosen, encoding="utf-8", errors="ignore")
            return ci

        def doCmd(self, *cmdArr) -> List[CCmdInfo]:
            """Run every command concurrently; return the CCmdInfo objects in input order."""
            return CAsyncTask().doTask(*map(self.run, cmdArr))
    
    
    class CAsyncTask:
        """
        Runs awaitables concurrently on an event loop while drawing a console spinner.
        """

        def __init__(self):
            # Flag polled by progress(); set to True to end the spinner.
            self.isStopProgress = False

        async def progress(self):
            """Redraw a spinner on stdout every 0.1 s until isStopProgress becomes True."""
            symbol = ["/", "ᅳ", "\\", "|"]
            total = len(symbol)
            cnt = 0
            while not self.isStopProgress:
                sys.stdout.write(f"------ processing {symbol[cnt % total]}\r")
                sys.stdout.flush()
                await asyncio.sleep(0.1)
                cnt += 1
            print("------ processing 100%")

        async def start(self, *taskArr):
            """
            Await all of ``taskArr`` concurrently with the spinner running.

            :return: list of results in the same order as ``taskArr``
            :raises: the first exception raised by any awaitable (asyncio.gather semantics)
            """
            first = asyncio.gather(*taskArr)
            second = asyncio.create_task(self.progress())
            try:
                return await first
            finally:
                # Always stop and await the spinner, even when a task failed; otherwise
                # it keeps running and is left pending on the loop.
                self.isStopProgress = True
                await second

        @staticmethod
        def _getLoop():
            """
            Return this thread's current event loop, creating and installing a new one
            when there is none (e.g. after asyncio.run() cleared it, or in a non-main
            thread) or when the current one has been closed.
            """
            try:
                loop = asyncio.get_event_loop()
            except RuntimeError:
                loop = None
            if loop is None or loop.is_closed():
                loop = asyncio.new_event_loop()
                asyncio.set_event_loop(loop)
            return loop

        def doTask(self, *taskArr):
            """
            Synchronously run ``taskArr`` to completion and return their results in order.
            """
            loop = self._getLoop()
            # The loop is deliberately not closed: closing it was observed to make
            # aiohttp https requests fail with "RuntimeError: Event loop is closed".
            return loop.run_until_complete(self.start(*taskArr))
    
    
    class CAsyncThread:
        """
        Thread-based runner: executes each CThreadInfo's target on its own thread.
        """

        def doRun(self, *threadArr) -> List[CThreadInfo]:
            """
            Start one thread per CThreadInfo, wait for all of them, and collect the results.

            Each ``ti.result`` receives the target's return value, or the exception it raised.

            :param threadArr: CThreadInfo objects to run
            :return: the same CThreadInfo objects, in the order they were passed in
            """
            thdInsArr = []
            for autoid, ti in enumerate(threadArr, start=1):
                thd = CInnerThread(autoid=autoid, ti=ti)
                thdInsArr.append(thd)
                thd.start()

            for thd in thdInsArr:
                thd.join()
            # thdInsArr is already in submission (autoid) order, so no sorting is needed.
            return [thd.get_result()[1] for thd in thdInsArr]
    
        # ------------- 对外接口 -------------
    
    
    def doTask(*taskArr):
        """Run coroutines concurrently (with a spinner); return their results in input order."""
        runner = CAsyncTask()
        return runner.doTask(*taskArr)
    
    
    def doReq(*reqArr):
        """Send CReqInfo requests concurrently; return CRspInfo objects in input order."""
        client = CAsyncHttp()
        return client.doReq(*reqArr)
    
    
    def doRead(*fileArr):
        """Read CFileInfo files concurrently; return them (filled in) in input order."""
        reader = CAsyncFileRead()
        return reader.doRead(*fileArr)
    
    
    def doWrite(*fileArr):
        """Write CFileInfo files concurrently; return them in input order."""
        writer = CAsyncFileWrite()
        return writer.doWrite(*fileArr)
    
    
    def doCmd(*cmdArr):
        """Run CCmdInfo shell commands concurrently; return them in input order."""
        executor = CAsyncCmd()
        return executor.doCmd(*cmdArr)
    
    
    def doRun(*threadArr):
        """Run CThreadInfo targets on separate threads; return them in input order."""
        pool = CAsyncThread()
        return pool.doRun(*threadArr)
    
  • 测试用例

    #!/usr/bin/python
    # -*- coding: UTF-8 -*-
    import sys
    import os
    import asyncio, aiohttp, aiofiles
    import json
    from datetime import datetime, timedelta
    
    from time import ctime, sleep
    import time
    import unittest
    
    from tool import utils, async_util
    
    SelfPath: str = os.path.dirname(os.path.abspath(__file__))  # absolute directory containing this test file
    
    
    # 相关参考:
    # https://blog.csdn.net/cui_yonghua/article/details/106840662
    # https://www.jianshu.com/p/b5e347b3a17c
    
    class Test_Async(unittest.TestCase):
        """
        Hand-run demos of asyncio and the async_util helpers.

        NOTE(review): most cases need network access to a fixed test server, files on the
        desktop (utils.getDesktop) or a Windows-only ``aaa.exe``; they print results rather
        than assert on them, so they are not suitable for CI as-is.
        """

        def setUp(self):
            print("\n\n------------------ test result ------------------")

        def test_gather(self):
            """asyncio.gather: run several coroutines concurrently."""

            async def count(num):
                print(f"One - {num}")
                await asyncio.sleep(1)
                print(f"Two - {num}")

            async def main():
                # gather runs the coroutines concurrently; its results come back in argument order
                await asyncio.gather(count(1), count(2), count(3))

            asyncio.run(main())
            print("--- finished")

        def test_createTask(self):
            """asyncio.create_task: tasks are scheduled as soon as they are created."""

            async def count(num):
                print("One")
                await asyncio.sleep(num)
                print("Two")

            async def main():
                # Both tasks are scheduled immediately and start at the next await,
                # so the sleeps overlap (~2 s total instead of 3 s).
                first = asyncio.create_task(count(2))
                second = asyncio.create_task(count(1))

                await first
                print("finished first")
                await second
                print("finished second")

            asyncio.run(main())
            print("--- finished")

        def test_progress(self):
            """CAsyncTask.doTask: three sleeps plus 30 HTTP requests, with the spinner shown."""
            from tool.async_util import CAsyncTask, CRspInfo

            # Tasks to run
            async def reqFn(num):
                url = "http://149.129.147.44:8305/hotupdate"
                reqInfo = {
                    "Plat": 8,
                    "Os": 2,
                    "Appid": 3,
                    "Uid": '123123',
                    "Version": '0.0.0.1',
                    "Deviceid": 'wolegequ',
                }
                rspInfo = CRspInfo()
                try:
                    async with aiohttp.request(method="POST", url=url, data=json.dumps(reqInfo)) as rsp:
                        print(f"--- idx: {num} code: {rsp.status}")
                        rspInfo.code = num  # stores the request index, not the HTTP status
                        rspInfo.text = await rsp.text()
                except Exception:
                    # Narrower than a bare except, so cancellation and Ctrl-C still propagate.
                    rspInfo.code = -999
                return rspInfo

            async def reqFn01():
                print("--- start reqFn01")
                await asyncio.sleep(20)
                return "hello01"

            async def reqFn02():
                print("--- start reqFn02")
                await asyncio.sleep(10)
                return "hello02"

            async def reqFn03():
                print("--- start reqFn03")
                await asyncio.sleep(30)
                return "hello03"

            taskArr = [reqFn(idx) for idx in range(30)]

            res = CAsyncTask().doTask(reqFn01(), reqFn02(), reqFn03(), *taskArr)
            print(f"--- finished, res: {utils.beautyJson(res)}")

        # Async io: http
        def test_concurrencyReq(self):
            """Fire 5 POST requests concurrently, via gather (main02) or create_task (main01)."""
            url = "http://149.129.147.44:8305/hotupdate"  # test server
            # url = "https://www.baidu.com"

            reqInfo = {
                "Plat": 8,
                "Os": 2,
                "Appid": 3,
                "Uid": '123123',
                "Version": '0.0.0.1',
                "Deviceid": 'wolegequ',
            }

            # Synchronous equivalent, for comparison:
            # code, rspDct = utils.httpPost(url, utils.objToJson(reqInfo))
            # print(f"--- code: {code}, rsp: {utils.beautyJson(rspDct)}")
            # return

            async def reqFn(idx):
                try:
                    # async with aiohttp.request(method="GET", url=url) as rsp:
                    async with aiohttp.request(method="POST", url=url, data=json.dumps(reqInfo)) as rsp:
                        print(f"--- idx: {idx} code: {rsp.status}")
                        # rsp.request_info  # details of the request that was sent
                        res = await rsp.text()
                        # print(f"--- res: {res}")
                        return res
                except Exception:
                    # Narrower than a bare except, so cancellation and Ctrl-C still propagate.
                    return "--- error"

            # create_task style
            async def main01():
                taskArr = []
                for idx in range(5):
                    task = asyncio.create_task(reqFn(idx))  # scheduled as soon as it is created
                    taskArr.append(task)

                resArr = []
                for task in taskArr:  # wait for every request to finish
                    res = await task
                    resArr.append(res)
                return resArr

            # gather style
            async def main02():
                taskArr = []
                for idx in range(5):
                    task = reqFn(idx)
                    taskArr.append(task)
                return await asyncio.gather(*taskArr)

            # asyncio.get_event_loop() raises RuntimeError once an earlier asyncio.run() in
            # this thread has cleared the current loop, so fall back to a fresh loop.
            # The loop is left open on purpose (see the note below).
            try:
                loop = asyncio.get_event_loop()
            except RuntimeError:
                loop = asyncio.new_event_loop()
                asyncio.set_event_loop(loop)
            resArr = loop.run_until_complete(main02())  # run the loop until main02 finishes

            # # Running it this way may fail with: RuntimeError: Event loop is closed
            # resArr = asyncio.run(main02())

            print("--- finished")
            print(f"--- resArr: {utils.beautyJson(resArr)}")

        def test_compare_http(self):
            """Time reqCnt sequential utils.httpPost calls against the same requests via async_util.doReq."""
            url = "http://149.129.147.44:8305/hotupdate"
            # url = "https://www.baidu.com"
            reqCnt = 1

            dct = {
                "Plat": 8,
                "Os": 2,
                "Appid": 3,
                "Uid": '123123',
                "Version": '0.0.0.1',
                "Deviceid": 'wolegequ',
            }

            @utils.call_cost
            def syncFn():
                print("--- syncFn start")
                for idx in range(reqCnt):
                    code, rspDct = utils.httpPost(url, dct)
                print("--- syncFn end")

            @utils.call_cost
            def asyncFn():
                print("--- asyncFn start")

                reqArr = []
                for idx in range(reqCnt):
                    ri = async_util.CReqInfo()
                    ri.url = url
                    ri.data = dct  # may be a dict or a JSON string
                    ri.method = "POST"
                    ri.extA = f"extra data {idx}"
                    reqArr.append(ri)

                resArr = async_util.doReq(*reqArr)
                print("--- type: {}, len: {}".format(type(resArr), len(resArr)))
                # print(f"--- finished, resArr: {utils.beautyJson(resArr)}")
                print("--- asyncFn end")

            sync_cc = syncFn()
            print("sync: {}".format(sync_cc))

            print()
            async_cc = asyncFn()
            print("async: {}".format(async_cc))

        # Async io: files
        def test_asyncFile(self):
            """Read a file with aiofiles, then overwrite it."""

            async def dealFile(filePath):
                print("--- dealFile:", filePath)
                async with aiofiles.open(filePath, mode="r") as fd:  # read
                    txt = await fd.read()
                    print("--- read:", txt)

                async with aiofiles.open(filePath, mode="w") as fd:  # write
                    await fd.write("wolegequ")

                return "done!!"

            path = utils.getDesktop("test_io2/aaa.txt")
            res = async_util.doTask(dealFile(path))
            print("--- res:", res)

        # Async io: files, read line by line
        def test_asyncLine(self):
            """Iterate a file's lines asynchronously and print each one decoded as UTF-8."""

            async def dealFile(filePath):
                print("--- dealFile:", filePath)
                async with aiofiles.open(filePath, mode="rb") as fd:  # read
                    async for line in fd:
                        # print("--- line:", line.decode(encoding="utf-8", errors="ignore"))
                        print("--- line:", str(line, encoding="utf-8", errors="ignore"))

            path = utils.getDesktop("a_temp.lua")
            res = async_util.doTask(dealFile(path))
            print("--- res:", res)

        # Compare the time taken to read files synchronously vs asynchronously
        def test_compare_readFile(self):
            """Time reading every file under a directory sequentially vs via async_util.doRead."""
            dstDir = utils.getDesktop("test_io")
            fileArr = utils.getFiles(dstDir, ["*.*"])
            print("--- fileArr len: {}".format(len(fileArr)))

            @utils.call_cost
            def syncFn():
                print("--- syncFn start")
                for file in fileArr:
                    # time.sleep(0.5)
                    utils.readFileBytes(file)
                print("--- syncFn end")

            @utils.call_cost
            def asyncFn():
                print("--- asyncFn start")

                fiArr = [async_util.CFileInfo(file) for file in fileArr]

                res = async_util.doRead(*fiArr)
                # print("--- res:", utils.beautyJson(res))

                # Write them back into a different directory
                # for fi in fiArr:
                #     fi.path = fi.path.replace("test_io2", "test_io3")
                # async_util.doWrite(*fiArr)

                print("--- asyncFn end")

            sync_cc = syncFn()
            print("sync: {}".format(sync_cc))

            print()
            async_cc = asyncFn()
            print("async: {}".format(async_cc))

        # Run system commands concurrently
        def test_subprocess(self):
            """Run one shell command through asyncio and print its exit code and output."""
            # Official docs: https://docs.python.org/3/library/asyncio-subprocess.html

            async def run(cmd):
                proc = await asyncio.create_subprocess_shell(
                    cmd,
                    stdout=asyncio.subprocess.PIPE,
                    stderr=asyncio.subprocess.PIPE)

                stdout, stderr = await proc.communicate()

                print(f'[{cmd!r} exited with {proc.returncode}]')
                if stdout:
                    print(f'[stdout]\n{stdout.decode(errors="ignore")}')
                if stderr:
                    print(f'[stderr]\n{stderr.decode(errors="ignore")}')

            cmd = "git status"
            asyncio.run(run(cmd))

        def test_compare_subprocess(self):
            """Time cnt runs of a slow command: blocking calls vs async_util.doCmd."""
            cnt = 5

            # cmd = "git status"
            cmd = "call {}".format(utils.getDesktop("aaa.exe"))  # aaa.exe runs for about 9 s

            @utils.call_cost
            def asyncFn():
                cmdArr = []
                for i in range(cnt):
                    ci = async_util.CCmdInfo(cmd)
                    ci.extA = i
                    cmdArr.append(ci)

                res = async_util.doCmd(*cmdArr)
                # print("--- res:", utils.beautyJson(res))

            @utils.call_cost
            def syncFn():
                # Wrapped in coroutines, but utils.cmdToString is called without await, so
                # each one blocks the event loop and the commands run one after another.
                async def run(command):
                    return utils.cmdToString(command)

                res = async_util.doTask(*[run(cmd) for i in range(cnt)])
                # print("--- res:", utils.beautyJson(res))

            dt1 = syncFn()
            print("--- syncFn cost time:", dt1)  # --- syncFn cost time: 00:00:45, sequential

            dt2 = asyncFn()
            print("--- asyncFn cost time:", dt2)  # --- asyncFn cost time: 00:00:09, concurrent

        # Genuine multi-threaded execution
        def test_multi_thread(self):
            """Run a slow command on 3 threads via async_util.doRun and report any errors."""

            def fn001(name):
                # print("--- hello 111, name: {}".format(name))
                # time.sleep(5)
                # print("--- hello 222, name: {}".format(name))

                # Simulate an exception
                # assert False, "--- wolegequ"
                # arr = []
                # b = arr[1]

                utils.execute("call {}".format(utils.getDesktop("aaa.exe")))
                return "world-{}".format(name)

            # res holds the CThreadInfo objects in submission order
            res = async_util.doRun(*[async_util.CThreadInfo(target=fn001, args=(i,)) for i in range(3)])
            # print("--- end, res: {}".format(utils.beautyJson([str(ti.result) for ti in res])))
            for ti in res:
                print("--- result is error:", utils.isError(ti.result))
                # print("--- exmsg:", utils.exmsg(ti.result))
    
    
    if __name__ == "__main__":
        # Run one demo directly, without going through the unittest runner.
        Test_Async().test_multi_thread()
    

猜你喜欢

转载自blog.csdn.net/yangxuan0261/article/details/108920658