python==3.12.4

内置模块 select / selectors

背景与引入

asyncio.run(main()) 等价于下面:

with asyncio.Runner() as runner:
    runner.run(main())

而这个 with 语法对应于 asyncio.Runner.__enter__ 方法的主要执行逻辑为:

# self: asyncio.Runner
self._loop = events.new_event_loop()
events.set_event_loop(self._loop)

首先关注 events.new_event_loop(),

events.new_event_loop() <=> get_event_loop_policy().new_event_loop()

# 在 Unix 上, DefaultEventLoopPolicy 指向 _UnixDefaultEventLoopPolicy, 因此
get_event_loop_policy() <=> _init_event_loop_policy() <=> DefaultEventLoopPolicy() <=> _UnixDefaultEventLoopPolicy()
# 继承关系如下:
_UnixDefaultEventLoopPolicy -> BaseDefaultEventLoopPolicy -> AbstractEventLoopPolicy
# 而 _UnixDefaultEventLoopPolicy 包含一个类属性: _loop_factory=_UnixSelectorEventLoop

new_event_loop() <=> _loop_factory() <=> _UnixSelectorEventLoop()
# 继承关系如下, 其中 AbstractEventLoop 定义了一堆抽象方法, 而 BaseEventLoop 里具体实现了很大部分的抽象方法
_UnixSelectorEventLoop -> base_events.BaseEventLoop -> events.AbstractEventLoop
class _UnixSelectorEventLoop(selector_events.BaseSelectorEventLoop):
    def __init__(self, selector=None):
        super().__init__(selector)
        self._signal_handlers = {}

class BaseSelectorEventLoop(base_events.BaseEventLoop):
    def __init__(self, selector=None):
        super().__init__()  # 这里面也初始化了很多属性

        if selector is None:
            selector = selectors.DefaultSelector()   # 在 unix 上, 也就是 selectors.EpollSelector
        logger.debug('Using selector: %s', selector.__class__.__name__)
        self._selector = selector
        self._make_self_pipe()
        self._transports = weakref.WeakValueDictionary()

TODO: 下面这两段话可能要重写一下

selectors 是一个官方内置模块, 基于更底层的 select 模块(这个更底层的模块基本上就是对相应的 select 系统调用的直接封装). 由于一个程序可能会涉及到多个 IO 流 (例如连接多个 socket, 打开或写入文件/管道等), 在这种情况下, select 系统调用会告诉调用者, 这些 IO 流哪些已经准备好了(哪些流已经可读了,哪些流可写了,哪些流异常了)

要理解 select/selector 模块, 关键在于理解 select 和 epoll 这两个系统调用 (也可以去理解 C 语言中这两个函数的原型). 注意: select 是跨平台的, 但效率较低, 而 epoll 只适用于 Unix, 但效率高. 下面回到 python 环境下进行介绍:

select

select 模块主要就是如下方法/类, 本质上都是 I/O 多路复用 的系统调用:

  • select 方法: 跨平台: Linux, Windows, macOS (,BSD 等)
  • poll 类: 跨平台: Linux, macOS (,BSD, Solaris 等)
  • kqueue 类: macOS (, BSD 等)
  • epoll 类: 仅支持 Linux
  • devpool 类: 仅支持 Solaris

select 方法是跨平台的, 判断 rlist 中的文件描述符是否可读, wlist 中的文件描述符是否可写, exceptional 中的文件是否异常.

readable, writable, exceptional = select.select(rlist, wlist, xlist[, timeout])

一个使用 select 的例子

以下这个例子是服务端代码, 做的事情是: 服务端接收请求, 并打印客户端传来的数据, 然后关闭这个客户端连接

import select
import socket

# 创建一个服务器套接字
server = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
server.bind(('localhost', 12345))
server.listen()

inputs = [server]  # 初始的可读集合,只包含服务器套接字

while True:
    readable, writable, exceptional = select.select(inputs, [], [])
    
    for s in readable:
        if s is server:
            # 服务器套接字可读,表示有新的客户端连接
            client_socket, client_address = server.accept()
            print(f"New connection from {client_address}")
            inputs.append(client_socket)  # 将新连接添加到可读集合
        else:
            # 客户端套接字可读,处理数据
            data = s.recv(1024)
            if data:
                print(f"Received: {data.decode()}")
            else:
                # 客户端关闭连接
                print(f"Client disconnected: {s.getpeername()}")
                inputs.remove(s)
                s.close()

TODO: 使用 epoll 的例子

TODO: 可以参考 https://chatgpt.com/share/dd157619-95e1-4b23-959c-6710c33e6307

在上层模块 selectors 中, poll, epoll, devpoll 对应的包装类 PollSelector, EpollSelector, DevpollSelector 都继承自 _PollLikeSelector, 因此以 epoll 为例

selectors

继承关系如下:

[PollSelector, EpollSelector, DevpollSelector] -> _PollLikeSelector -> _BaseSelectorImpl -> BaseSelector
SelectSelector -> _BaseSelectorImpl -> BaseSelector
KqueueSelector -> _BaseSelectorImpl -> BaseSelector

其中 BaseSelector 是纯抽象类, _BaseSelectorImpl 包含了一些具体实现供其子类使用 super().xxx 来使用. 而在 selectors.py 的末尾, 按照如下次序根据自身平台设置 DefaultSelector

# Choose the best implementation, roughly:
#    epoll|kqueue|devpoll > poll > select.
# select() also can't accept a FD > FD_SETSIZE (usually around 1024)
if 'KqueueSelector' in globals():
    DefaultSelector = KqueueSelector
elif 'EpollSelector' in globals():
    DefaultSelector = EpollSelector
elif 'DevpollSelector' in globals():
    DefaultSelector = DevpollSelector
elif 'PollSelector' in globals():
    DefaultSelector = PollSelector
else:
    DefaultSelector = SelectSelector

TODO: BaseSelector 的接口定义是什么含义?

因此 selectors 模块一般只需要使用到 selectors.DefaultSelector, 以及它的 register, unregisterselect 方法.

一个使用 selectors 模块的例子:

selector.EVENT_READ=(1 << 0), selector.EVENT_WRITE=(1 << 1). 在使用 register 时, regsiter(fileobj, events, data=None), 其中 events 只能取值为 1, 2, 3, 分别代表需要监听 fileobj 这个文件对象什么时候可读,可写,可读或可写. 而 select() 方法会返回一个列表(这个列表代表的是被满足被监听条件的文件列表), 列表的每一项是个 tuple: (key, mask), 其中 keyselector.SelectorKey 类型, SelectorKey = namedtuple('SelectorKey', ['fileobj', 'fd', 'events', 'data']). 而 mask 是指此时文件对象的状态, 注意 mask=3 代表的是相应的文件对象可读且可写.

import os
import selectors

sel = selectors.DefaultSelector()

# 创建一个管道,返回 (read_fd, write_fd)
read_fd, write_fd = os.pipe()

# 将读端和写端分别转换为文件对象
read_pipe = os.fdopen(read_fd, 'r')
write_pipe = os.fdopen(write_fd, 'w')

# 注册读端,监听可读或可写事件
# selectors.EVENT_READ | selectors.EVENT_WRITE = 3
sel.register(read_pipe, selectors.EVENT_READ | selectors.EVENT_WRITE)

# 写入一些数据,使得读端可读
write_pipe.write("Test data\n")
write_pipe.flush()  # 确保数据写入管道

# 检测事件
events = sel.select(timeout=1)
for key, mask in events:
    print(f"Registered events: {key.events}, Returned mask: {mask}")
    if mask == (selectors.EVENT_READ | selectors.EVENT_WRITE):
        print("The pipe is both readable and writable!")
    elif mask & selectors.EVENT_READ:
        print("The pipe is readable.")   # 实际运行时, 打印的是这一行!
    elif mask & selectors.EVENT_WRITE:
        print("The pipe is writable.")

# 清理资源
sel.unregister(read_pipe)
read_pipe.close()
write_pipe.close()