(LTS) asyncio 探幽
python==3.12.4
内置模块 select / selectors
背景与引入
asyncio.run(main())
等价于下面:
with asyncio.Runner() as runner:
runner.run(main())
而这个 with 语法对应于 asyncio.Runner.__enter__
方法的主要执行逻辑为:
# self: asyncio.Runner
self._loop = events.new_event_loop()
events.set_event_loop(self._loop)
首先关注 events.new_event_loop()
,
events.new_event_loop() <=> get_event_loop_policy().new_event_loop()
# 在 Unix 上, DefaultEventLoopPolicy 指向 _UnixDefaultEventLoopPolicy, 因此
get_event_loop_policy() <=> _init_event_loop_policy() <=> DefaultEventLoopPolicy() <=> _UnixDefaultEventLoopPolicy()
# 继承关系如下:
_UnixDefaultEventLoopPolicy -> BaseDefaultEventLoopPolicy -> AbstractEventLoopPolicy
# 而 _UnixDefaultEventLoopPolicy 包含一个类属性: _loop_factory=_UnixSelectorEventLoop
new_event_loop() <=> _loop_factory() <=> _UnixSelectorEventLoop()
# 继承关系如下, 其中 AbstractEventLoop 定义了一堆抽象方法, 而 BaseEventLoop 里具体实现了很大部分的抽象方法
_UnixSelectorEventLoop -> base_events.BaseEventLoop -> events.AbstractEventLoop
class _UnixSelectorEventLoop(selector_events.BaseSelectorEventLoop):
def __init__(self, selector=None):
super().__init__(selector)
self._signal_handlers = {}
class BaseSelectorEventLoop(base_events.BaseEventLoop):
def __init__(self, selector=None):
super().__init__() # 这里面也初始化了很多属性
if selector is None:
selector = selectors.DefaultSelector() # 在 unix 上, 也就是 selectors.EpollSelector
logger.debug('Using selector: %s', selector.__class__.__name__)
self._selector = selector
self._make_self_pipe()
self._transports = weakref.WeakValueDictionary()
TODO: 下面这两段话可能要重写一下
selectors 是一个官方内置模块, 基于更底层的 select 模块(这个更底层的模块基本上就是对相应的 select 系统调用的直接封装). 由于一个程序可能会涉及到多个 IO 流 (例如连接多个 socket, 打开或写入文件/管道等), 在这种情况下, select 系统调用会告诉调用者, 这些 IO 流哪些已经准备好了(哪些流已经可读了,哪些流可写了,哪些流异常了)
要理解 select/selector 模块, 关键在于理解 select 和 epoll 这两个系统调用 (也可以去理解 C 语言中这两个函数的原型). 注意: select 是跨平台的, 但效率较低, 而 epoll 只适用于 Unix, 但效率高. 下面回到 python 环境下进行介绍:
select
select 模块主要就是如下方法/类, 本质上都是 I/O 多路复用 的系统调用:
select
方法: 跨平台: Linux, Windows, macOS (,BSD 等)poll
类: 跨平台: Linux, macOS (,BSD, Solaris 等)kqueue
类: macOS (, BSD 等)epoll
类: 仅支持 Linuxdevpool
类: 仅支持 Solaris
select 方法是跨平台的, 判断 rlist
中的文件描述符是否可读, wlist
中的文件描述符是否可写, exceptional
中的文件是否异常.
readable, writable, exceptional = select.select(rlist, wlist, xlist[, timeout])
一个使用 select 的例子
以下这个例子是服务端代码, 做的事情是: 服务端接收请求, 并打印客户端传来的数据, 然后关闭这个客户端连接
import select
import socket
# 创建一个服务器套接字
server = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
server.bind(('localhost', 12345))
server.listen()
inputs = [server] # 初始的可读集合,只包含服务器套接字
while True:
readable, writable, exceptional = select.select(inputs, [], [])
for s in readable:
if s is server:
# 服务器套接字可读,表示有新的客户端连接
client_socket, client_address = server.accept()
print(f"New connection from {client_address}")
inputs.append(client_socket) # 将新连接添加到可读集合
else:
# 客户端套接字可读,处理数据
data = s.recv(1024)
if data:
print(f"Received: {data.decode()}")
else:
# 客户端关闭连接
print(f"Client disconnected: {s.getpeername()}")
inputs.remove(s)
s.close()
TODO: 使用 epoll 的例子
TODO: 可以参考 https://chatgpt.com/share/dd157619-95e1-4b23-959c-6710c33e6307
在上层模块 selectors 中, poll, epoll, devpoll 对应的包装类 PollSelector
, EpollSelector
, DevpollSelector
都继承自 _PollLikeSelector
, 因此以 epoll 为例
selectors
继承关系如下:
[PollSelector, EpollSelector, DevpollSelector] -> _PollLikeSelector -> _BaseSelectorImpl -> BaseSelector
SelectSelector -> _BaseSelectorImpl -> BaseSelector
KqueueSelector -> _BaseSelectorImpl -> BaseSelector
其中 BaseSelector 是纯抽象类, _BaseSelectorImpl 包含了一些具体实现供其子类使用 super().xxx
来使用. 而在 selectors.py
的末尾, 按照如下次序根据自身平台设置 DefaultSelector
# Choose the best implementation, roughly:
# epoll|kqueue|devpoll > poll > select.
# select() also can't accept a FD > FD_SETSIZE (usually around 1024)
if 'KqueueSelector' in globals():
DefaultSelector = KqueueSelector
elif 'EpollSelector' in globals():
DefaultSelector = EpollSelector
elif 'DevpollSelector' in globals():
DefaultSelector = DevpollSelector
elif 'PollSelector' in globals():
DefaultSelector = PollSelector
else:
DefaultSelector = SelectSelector
TODO: BaseSelector 的接口定义是什么含义?
因此 selectors 模块一般只需要使用到 selectors.DefaultSelector
, 以及它的 register
, unregister
和 select
方法.
一个使用 selectors 模块的例子:
selector.EVENT_READ=(1 << 0)
, selector.EVENT_WRITE=(1 << 1)
. 在使用 register
时, regsiter(fileobj, events, data=None)
, 其中 events
只能取值为 1, 2, 3, 分别代表需要监听 fileobj
这个文件对象什么时候可读,可写,可读或可写. 而 select()
方法会返回一个列表(这个列表代表的是被满足被监听条件的文件列表), 列表的每一项是个 tuple: (key, mask)
, 其中 key
是 selector.SelectorKey
类型, SelectorKey = namedtuple('SelectorKey', ['fileobj', 'fd', 'events', 'data'])
. 而 mask
是指此时文件对象的状态, 注意 mask=3
代表的是相应的文件对象可读且可写.
import os
import selectors
sel = selectors.DefaultSelector()
# 创建一个管道,返回 (read_fd, write_fd)
read_fd, write_fd = os.pipe()
# 将读端和写端分别转换为文件对象
read_pipe = os.fdopen(read_fd, 'r')
write_pipe = os.fdopen(write_fd, 'w')
# 注册读端,监听可读或可写事件
# selectors.EVENT_READ | selectors.EVENT_WRITE = 3
sel.register(read_pipe, selectors.EVENT_READ | selectors.EVENT_WRITE)
# 写入一些数据,使得读端可读
write_pipe.write("Test data\n")
write_pipe.flush() # 确保数据写入管道
# 检测事件
events = sel.select(timeout=1)
for key, mask in events:
print(f"Registered events: {key.events}, Returned mask: {mask}")
if mask == (selectors.EVENT_READ | selectors.EVENT_WRITE):
print("The pipe is both readable and writable!")
elif mask & selectors.EVENT_READ:
print("The pipe is readable.") # 实际运行时, 打印的是这一行!
elif mask & selectors.EVENT_WRITE:
print("The pipe is writable.")
# 清理资源
sel.unregister(read_pipe)
read_pipe.close()
write_pipe.close()