python:并发编程(十)

前言

本文将和大家一起探讨python的多协程并发编程(上篇),使用内置基本库asyncio来实现并发,先通过官方来简单使用这个模块。先打好基础,能够有个基本的用法与认知,后续文章,我们再进行详细使用。

本文为python并发编程的第十篇,上一篇文章地址如下:

python:并发编程(九)_Lion King的博客-CSDN博客

下一篇文章地址如下:

python:并发编程(十一)_Lion King的博客-CSDN博客

一、快速开始

官方文档:asyncio --- 异步 I/O — Python 3.11.4 文档

1、事件循环

asyncio 提供了事件循环(Event Loop)作为协程的调度器和执行者。事件循环负责处理协程的调度和执行,并处理事件、回调函数等。以下是关于 asyncio 事件循环的一些重要概念和用法:

(1)获取事件循环对象:可以使用 asyncio.get_event_loop() 获取默认的事件循环对象,或者使用 asyncio.new_event_loop() 创建一个新的事件循环对象。

(2)设置默认事件循环:可以使用 asyncio.set_event_loop() 设置默认的事件循环对象。

(3)运行事件循环:可以使用 loop.run_forever() 方法以无限循环的方式运行事件循环,直到调用 loop.stop()

(4)执行协程:可以使用 loop.run_until_complete() 方法执行一个协程或任务,并等待其完成。

(5)停止事件循环:可以使用 loop.stop() 停止事件循环的运行。

(6)调度协程:可以使用 loop.create_task() 方法创建一个任务,并将其添加到事件循环中运行。

(7)定时调度:可以使用 loop.call_later()loop.call_at() 方法在指定的时间点调度回调函数的执行。

(8)异常处理:可以使用 try-except 块来捕获和处理在协程执行过程中抛出的异常。

以下是一个简单的示例代码,演示了如何使用 asyncio 的事件循环:

import asyncio

# 定义一个协程
async def my_coroutine():
    print("Coroutine is running")
    await asyncio.sleep(1)  # 模拟耗时操作
    print("Coroutine is done")

# 创建事件循环
loop = asyncio.get_event_loop()

# 将协程添加到事件循环中
task = loop.create_task(my_coroutine())

# 运行事件循环直到任务完成
loop.run_until_complete(task)

# 关闭事件循环
loop.close()

以上代码创建了一个简单的协程 my_coroutine(),并将其添加到事件循环中运行。loop.run_until_complete(task) 运行事件循环直到任务完成,然后关闭事件循环。

通过使用 asyncio 的事件循环,可以方便地管理和调度协程的执行,实现异步操作和并发编程。

2、Futures

asyncio 中,Futures 是一种用于表示异步操作结果的对象。它充当了协程和异步函数之间的桥梁,可以用于等待和获取异步操作的结果。

Futures 提供了以下主要功能:

(1)表示异步操作的结果:Future 对象表示一个尚未完成的异步操作,可以在协程中使用它来等待操作的完成,并获取最终的结果。

(2)设置结果值:通过调用 Future 对象的 set_result() 方法,可以设置异步操作的结果值。

(3)获取结果值:可以使用 await 关键字或 yield from 表达式等待 Future 对象的完成,并获取操作的结果值。

(4)异常处理:Future 对象可以捕获和传播异步操作中抛出的异常。可以使用 try-except 块来捕获异常,并使用 set_exception() 方法将异常传播给等待的协程。

以下是一个简单的示例代码,演示了如何使用 Futures

import asyncio

# 定义一个异步函数
async def my_async_function():
    await asyncio.sleep(1)
    return "Async operation completed"

# 创建事件循环
loop = asyncio.get_event_loop()

# 创建一个 Future 对象
future = asyncio.ensure_future(my_async_function())

# 执行事件循环,等待 Future 完成
loop.run_until_complete(future)

# 获取 Future 的结果值
result = future.result()
print(result)

# 关闭事件循环
loop.close()

在上述示例中,通过调用 asyncio.ensure_future() 方法,将异步函数 my_async_function() 转换为 Future 对象。然后,使用 loop.run_until_complete() 方法等待 Future 对象的完成,并获取结果值。

通过使用 Futures,可以方便地进行异步操作的等待和结果处理,实现并发编程和异步任务的管理。

3、传输和协议

服务端

运行以下代码将启动一个简单的服务端,监听本地的8888端口。当客户端连接并发送数据时,服务端将接收数据并发送回一条响应。请确保在运行服务器之前,没有其他应用程序在使用相同的IP地址和端口号,并且防火墙设置不会阻止客户端与服务器建立连接。

import asyncio

# 自定义协议类
class MyProtocol(asyncio.Protocol):
    def connection_made(self, transport):
        print('Client connected')
        self.transport = transport

    def data_received(self, data):
        print('Received:', data.decode())

        # 响应客户端请求
        response = b'Hello, client!'
        self.transport.write(response)

    def connection_lost(self, exc):
        print('Client disconnected')

# 创建事件循环
loop = asyncio.get_event_loop()

# 启动服务端
async def start_server():
    # 创建服务器
    server = await loop.create_server(lambda: MyProtocol(), 'localhost', 8888)

    # 获取绑定的地址和端口
    address = server.sockets[0].getsockname()
    print('Server started at', address)

    # 等待服务器关闭
    await server.wait_closed()

# 运行事件循环,启动服务端
loop.run_until_complete(start_server())

客户端

asyncio 中,传输(Transport)和协议(Protocol)是用于实现网络通信的重要组件。

**传输(Transport)**是网络连接的抽象表示,负责发送和接收数据。它提供了发送和关闭连接的方法,以及处理连接状态的相关属性。

**协议(Protocol)**是定义网络通信规则和行为的抽象接口。它定义了在网络连接上进行数据交换的方式,包括数据的编码、解码和处理等。协议通常是从 asyncio.Protocol 类派生而来的子类。

asyncio 中,可以通过以下步骤来使用传输和协议进行网络通信:

(1)创建传输和协议对象:使用 loop.create_connection() 方法创建传输和协议对象。该方法接受一个协议类和主机地址、端口号等参数,并返回一个 (transport, protocol) 对象。

(2)连接到远程主机:调用传输对象的 transport.connect() 方法,将传输对象与远程主机建立连接。连接成功后,将调用协议对象的 protocol.connection_made() 方法。

(3)数据收发:使用传输对象的 transport.write() 方法发送数据到远程主机,以及通过协议对象的回调方法接收和处理远程主机发送的数据。

(4)关闭连接:调用传输对象的 transport.close() 方法关闭连接。关闭连接后,将调用协议对象的 protocol.connection_lost() 方法。

以下是一个简单的示例代码,演示了如何使用传输和协议进行网络通信:

import asyncio

# 自定义协议类
class MyProtocol(asyncio.Protocol):
    def connection_made(self, transport):
        print('Connected')
        self.transport = transport

    def data_received(self, data):
        print('Received:', data.decode())

    def connection_lost(self, exc):
        print('Connection closed')

# 创建事件循环
loop = asyncio.get_event_loop()

# 创建传输和协议对象
coro = loop.create_connection(MyProtocol, 'localhost', 8888)

# 运行事件循环,建立连接
transport, protocol = loop.run_until_complete(coro)

# 发送数据
transport.write(b'Hello, server!')

# 关闭连接
transport.close()

# 停止事件循环
loop.stop()

在上述示例中,我们自定义了一个协议类 MyProtocol,继承自 asyncio.Protocol。然后,使用 loop.create_connection() 方法创建传输和协议对象,并通过 loop.run_until_complete() 方法建立连接。最后,通过传输对象的 transport.write() 方法发送数据,并调用传输对象的 transport.close() 方法关闭连接。

通过使用传输和协议,我们可以方便地实现网络通信,并处理收发数据的逻辑。

4、策略

asyncio中,策略对象是用于控制事件循环行为的一种机制。asyncio提供了默认的策略对象,但也允许用户自定义和替换策略对象来满足特定的需求。

默认的策略对象是asyncio.DefaultEventLoopPolicy,它基于具体的事件循环实现,如asyncio.SelectorEventLoop(基于selectors模块)或asyncio.SelectorEventLoop(基于select模块)。默认策略对象通过asyncio.get_event_loop_policy()方法获取。

用户可以通过自定义策略对象来改变事件循环的行为,例如使用特定的事件循环实现或者自定义调度器。自定义策略对象必须实现asyncio.AbstractEventLoopPolicy接口的方法,例如get_event_loop()set_event_loop()new_event_loop()等。

下面是一个使用自定义策略对象的示例:

import asyncio

# 自定义策略对象
class MyEventLoopPolicy(asyncio.DefaultEventLoopPolicy):
    def get_event_loop(self):
        loop = super().get_event_loop()
        print('Using custom event loop:', type(loop).__name__)
        return loop

    # 其他自定义方法...

# 设置自定义策略对象
asyncio.set_event_loop_policy(MyEventLoopPolicy())

# 创建事件循环并运行
loop = asyncio.get_event_loop()
loop.run_forever()

在这个示例中,我们创建了一个自定义的策略对象MyEventLoopPolicy,并重写了get_event_loop()方法来打印使用的事件循环类型。然后通过asyncio.set_event_loop_policy()方法将自定义策略对象设置为当前的策略。最后,使用asyncio.get_event_loop()获取事件循环并运行。

请注意,自定义策略对象的设置应该在创建事件循环之前进行,以确保新创建的事件循环使用自定义策略。

5、Extending

asyncio提供了扩展其功能的机制,使开发者能够根据自己的需求进行定制和扩展。以下是一些扩展asyncio功能的常见方法:

(1)自定义协议(Protocol):可以通过继承asyncio.Protocol类来创建自定义的协议。通过实现connection_madedata_receivedconnection_lost等方法,可以处理连接的建立、数据的接收和连接的关闭等事件。

(2)自定义传输(Transport):可以通过继承asyncio.BaseTransport类来创建自定义的传输。传输是协议和网络层之间的接口,负责处理数据的发送和接收。

(3)自定义事件循环(Event Loop):可以通过继承asyncio.AbstractEventLoop类来创建自定义的事件循环。事件循环是asyncio的核心组件,负责调度和执行异步任务。

(4)自定义异步任务(Coroutines):可以使用asyncio.coroutine装饰器和async def关键字来定义自己的异步任务。通过使用协程,可以编写异步的、非阻塞的代码逻辑。

(5)自定义异步函数(Async Functions):可以使用@asyncio.coroutine装饰器和async def关键字来定义自己的异步函数。异步函数可以与协程配合使用,用于编写更高级的异步逻辑。

(6)自定义事件处理器(Event Handlers):可以使用asyncio提供的事件处理器机制来定制和处理特定的事件。例如,可以注册和处理定时器事件、I/O事件、信号事件等。

通过以上扩展方法,可以根据具体需求对asyncio进行定制和扩展,以满足特定的异步编程场景和功能要求。这使得asyncio具有很高的灵活性和可扩展性,适用于各种异步编程任务和应用领域。

下面是一个简单的示例代码,展示了如何扩展asyncio的功能,通过自定义协议和传输来实现一个简单的Echo服务器:

import asyncio

# 自定义协议类
class EchoProtocol(asyncio.Protocol):
    def connection_made(self, transport):
        self.transport = transport

    def data_received(self, data):
        message = data.decode()
        print("Received:", message)
        self.transport.write(data)

    def connection_lost(self, exc):
        print("Connection closed")

# 自定义传输类
class EchoTransport(asyncio.Transport):
    def __init__(self, loop):
        super().__init__(extra=None)
        self.loop = loop
        self.buffer = b''

    def write(self, data):
        print("Sending:", data.decode())
        self.buffer += data

    def close(self):
        print("Closing connection")

    def can_write_eof(self):
        return False

    def get_write_buffer_size(self):
        return len(self.buffer)

    def write_eof(self):
        pass

    def abort(self):
        pass

    def pause_reading(self):
        pass

    def resume_reading(self):
        pass

# 创建事件循环
loop = asyncio.get_event_loop()

# 创建协议和传输对象
protocol = EchoProtocol()
transport = EchoTransport(loop)

# 将协议和传输对象关联起来
protocol.connection_made(transport)

# 模拟收到数据并处理
data = b"Hello, server!"
protocol.data_received(data)

# 关闭连接
protocol.connection_lost(None)

# 停止事件循环
loop.stop()

这个示例中,EchoProtocol类继承了asyncio.Protocol,并实现了connection_madedata_receivedconnection_lost等方法,用于处理连接的建立、数据的接收和连接的关闭。EchoTransport类继承了asyncio.Transport,并实现了一系列传输相关的方法,用于处理数据的发送和关闭连接。

通过自定义协议和传输,我们可以根据需要来扩展和定制asyncio的功能,以适应不同的应用场景和需求。

猜你喜欢

转载自blog.csdn.net/weixin_43431593/article/details/131241935