python==3.12.4

内置模块 select / selectors

背景与引入

asyncio.run(main()) 等价于下面:

with asyncio.Runner() as runner:
    runner.run(main())

而这个 with 语法对应于 asyncio.Runner.__enter__ 方法的主要执行逻辑为:

# self: asyncio.Runner
self._loop = events.new_event_loop()
events.set_event_loop(self._loop)

首先关注 events.new_event_loop(),

events.new_event_loop() <=> get_event_loop_policy().new_event_loop()

# 在 Unix 上, DefaultEventLoopPolicy 指向 _UnixDefaultEventLoopPolicy, 因此
get_event_loop_policy() <=> _init_event_loop_policy() <=> DefaultEventLoopPolicy() <=> _UnixDefaultEventLoopPolicy()
# 继承关系如下:
_UnixDefaultEventLoopPolicy -> BaseDefaultEventLoopPolicy -> AbstractEventLoopPolicy
# 而 _UnixDefaultEventLoopPolicy 包含一个类属性: _loop_factory=_UnixSelectorEventLoop

new_event_loop() <=> _loop_factory() <=> _UnixSelectorEventLoop()
# 继承关系如下, 其中 AbstractEventLoop 定义了一堆抽象方法, 而 BaseEventLoop 里具体实现了很大部分的抽象方法
_UnixSelectorEventLoop -> base_events.BaseEventLoop -> events.AbstractEventLoop
class _UnixSelectorEventLoop(selector_events.BaseSelectorEventLoop):
    def __init__(self, selector=None):
        super().__init__(selector)
        self._signal_handlers = {}

class BaseSelectorEventLoop(base_events.BaseEventLoop):
    def __init__(self, selector=None):
        super().__init__()  # 这里面也初始化了很多属性

        if selector is None:
            selector = selectors.DefaultSelector()   # 在 unix 上, 也就是 selectors.EpollSelector
        logger.debug('Using selector: %s', selector.__class__.__name__)
        self._selector = selector
        self._make_self_pipe()
        self._transports = weakref.WeakValueDictionary()

TODO: 下面这两段话可能要重写一下

selectors 是一个官方内置模块, 基于更底层的 select 模块(这个更底层的模块基本上就是对相应的 select 系统调用的直接封装). 由于一个程序可能会涉及到多个 IO 流 (例如连接多个 socket, 打开或写入文件/管道等), 在这种情况下, select 系统调用会告诉调用者, 这些 IO 流哪些已经准备好了(哪些流已经可读了,哪些流可写了,哪些流异常了)

要理解 select/selector 模块, 关键在于理解 select 和 epoll 这两个系统调用 (也可以去理解 C 语言中这两个函数的原型). 注意: select 是跨平台的, 但效率较低, 而 epoll 只适用于 Unix, 但效率高. 下面回到 python 环境下进行介绍:

select

select 模块主要就是如下方法/类, 本质上都是 I/O 多路复用 的系统调用:

  • select 方法: 跨平台: Linux, Windows, macOS (,BSD 等)
  • poll 类: 跨平台: Linux, macOS (,BSD, Solaris 等)
  • kqueue 类: macOS (, BSD 等)
  • epoll 类: 仅支持 Linux
  • devpool 类: 仅支持 Solaris

select 方法是跨平台的, 判断 rlist 中的文件描述符是否可读, wlist 中的文件描述符是否可写, exceptional 中的文件是否异常.

readable, writable, exceptional = select.select(rlist, wlist, xlist[, timeout])

一个使用 select 的例子

以下这个例子是服务端代码, 做的事情是: 服务端接收请求, 并打印客户端传来的数据, 然后关闭这个客户端连接

import select
import socket

# 创建一个服务器套接字
server = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
server.bind(('localhost', 12345))
server.listen()

inputs = [server]  # 初始的可读集合,只包含服务器套接字

while True:
    readable, writable, exceptional = select.select(inputs, [], [])
    
    for s in readable:
        if s is server:
            # 服务器套接字可读,表示有新的客户端连接
            client_socket, client_address = server.accept()
            print(f"New connection from {client_address}")
            inputs.append(client_socket)  # 将新连接添加到可读集合
        else:
            # 客户端套接字可读,处理数据
            data = s.recv(1024)
            if data:
                print(f"Received: {data.decode()}")
            else:
                # 客户端关闭连接
                print(f"Client disconnected: {s.getpeername()}")
                inputs.remove(s)
                s.close()

TODO: 使用 epoll 的例子

TODO: 可以参考 https://chatgpt.com/share/dd157619-95e1-4b23-959c-6710c33e6307

在上层模块 selectors 中, poll, epoll, devpoll 对应的包装类 PollSelector, EpollSelector, DevpollSelector 都继承自 _PollLikeSelector, 因此以 epoll 为例

selectors

继承关系如下:

[PollSelector, EpollSelector, DevpollSelector] -> _PollLikeSelector -> _BaseSelectorImpl -> BaseSelector
SelectSelector -> _BaseSelectorImpl -> BaseSelector
KqueueSelector -> _BaseSelectorImpl -> BaseSelector

其中 BaseSelector 是纯抽象类, _BaseSelectorImpl 包含了一些具体实现供其子类使用 super().xxx 来使用. 而在 selectors.py 的末尾, 按照如下次序根据自身平台设置 DefaultSelector

# Choose the best implementation, roughly:
#    epoll|kqueue|devpoll > poll > select.
# select() also can't accept a FD > FD_SETSIZE (usually around 1024)
if 'KqueueSelector' in globals():
    DefaultSelector = KqueueSelector
elif 'EpollSelector' in globals():
    DefaultSelector = EpollSelector
elif 'DevpollSelector' in globals():
    DefaultSelector = DevpollSelector
elif 'PollSelector' in globals():
    DefaultSelector = PollSelector
else:
    DefaultSelector = SelectSelector

TODO: BaseSelector 的接口定义是什么含义?

因此 selectors 模块一般只需要使用到 selectors.DefaultSelector, 以及它的 register, unregisterselect 方法.

一个使用 selectors 模块的例子:

selector.EVENT_READ=(1 << 0), selector.EVENT_WRITE=(1 << 1). 在使用 register 时, regsiter(fileobj, events, data=None), 其中 events 只能取值为 1, 2, 3, 分别代表需要监听 fileobj 这个文件对象什么时候可读,可写,可读或可写. 而 select() 方法会返回一个列表(这个列表代表的是被满足被监听条件的文件列表), 列表的每一项是个 tuple: (key, mask), 其中 keyselector.SelectorKey 类型, SelectorKey = namedtuple('SelectorKey', ['fileobj', 'fd', 'events', 'data']). 而 mask 是指此时文件对象的状态, 注意 mask=3 代表的是相应的文件对象可读且可写.

import os
import selectors

sel = selectors.DefaultSelector()

# 创建一个管道,返回 (read_fd, write_fd)
read_fd, write_fd = os.pipe()

# 将读端和写端分别转换为文件对象
read_pipe = os.fdopen(read_fd, 'r')
write_pipe = os.fdopen(write_fd, 'w')

# 注册读端,监听可读或可写事件
# selectors.EVENT_READ | selectors.EVENT_WRITE = 3
sel.register(read_pipe, selectors.EVENT_READ | selectors.EVENT_WRITE)

# 写入一些数据,使得读端可读
write_pipe.write("Test data\n")
write_pipe.flush()  # 确保数据写入管道

# 检测事件
events = sel.select(timeout=1)
for key, mask in events:
    print(f"Registered events: {key.events}, Returned mask: {mask}")
    if mask == (selectors.EVENT_READ | selectors.EVENT_WRITE):
        print("The pipe is both readable and writable!")
    elif mask & selectors.EVENT_READ:
        print("The pipe is readable.")   # 实际运行时, 打印的是这一行!
    elif mask & selectors.EVENT_WRITE:
        print("The pipe is writable.")

# 清理资源
sel.unregister(read_pipe)
read_pipe.close()
write_pipe.close()

Task 代码阅读

引入:

import asyncio
async def amain():
    loop = asyncio.get_running_loop()
    print(id(loop), loop)
    print("1234")

if __name__ == "__main__":
    loop = asyncio.new_event_loop()
    future = loop.create_task(amain())
    loop.run_until_complete(future)

这里的 create_task 发生了什么?

python 实现在 asyncio.tasks.Task, 但代码里包含这段:

# asyncio/tasks.py
# class Task(...)
#     ...

_PyTask = Task
try:
    import _asyncio
except ImportError:
    pass
else:
    # _CTask is needed for tests.
    Task = _CTask = _asyncio.Task

因此默认会用 C 的实现

注册如下:

static PyType_Spec Task_spec = {
    .name = "_asyncio.Task",
    .basicsize = sizeof(TaskObj),
    .flags = (Py_TPFLAGS_DEFAULT | Py_TPFLAGS_HAVE_GC | Py_TPFLAGS_BASETYPE |
              Py_TPFLAGS_IMMUTABLETYPE),
    .slots = Task_slots,
};

loop

关键点在于 loop.run_forever -> loop._run_one

使用指南

获取事件循环 loop

每个线程同时只能有一个事件循环, 但可以切换. 例如一个 python 程序开了一个子进程, 每个子进程由开了两个子线程, 那么同时可以有 6 个事件循环: 主进程的主线程, 主进程的2个子线程, 子进程的主线程, 子进程的2个子线程.

与事件循环创建相关的 API 有如下:

  • asyncio.events._get_running_loop: 这个是内部 API, 如果当前有事件循环在运行, 就返回它, 否则返回 None
  • asyncio.get_running_loop: 对外 API, 如果当前有事件循环在运行, 就返回它, 否则报错: RuntimeError
  • asyncio.new_event_loop: 对外 API, 创建一个事件循环
  • asyncio.set_event_loop: 对外 API, 设置事件循环
  • asyncio.get_event_loop: 对外 API, 现已不建议使用. 如果当前有事件循环在运行, 就返回它, 否则创建一个(但创建只适用于主线程, 子线程需要手动用 asyncio.new_event_loopasyncio.set_event_loop)

例子

import asyncio
async def amain():
    loop = asyncio.get_running_loop()
    print(id(loop), loop)
    print("1234")

if __name__ == "__main__":
    loop = asyncio.new_event_loop()
    print(id(loop), loop)
    asyncio.set_event_loop(loop)
    print(id(loop), loop)
    future = loop.create_task(amain())
    # loop.run_forever()
    loop.run_until_complete(future)

# 输出
# 140455527134752 <_UnixSelectorEventLoop running=False closed=False debug=False>
# 140455527134752 <_UnixSelectorEventLoop running=False closed=False debug=False>
# 140455527134752 <_UnixSelectorEventLoop running=True closed=False debug=False>
# 1234

注意到只有通过 loop.run_until_completeloop.run_forever 后, loop 才会变成运行状态

loop.run_in_executor

例子1

import asyncio
import time

def foo(x):
    time.sleep(x)
    return x

async def main():
    ns = [1, 2, 3, 4]
    t1 = time.time()
    
    # 方式 A:
    # 默认的 executor 是 concurrent.futures.ThreadPoolExecutor
    # loop = asyncio.get_running_loop()
    # tasks = [loop.run_in_executor(None, foo, n) for n in ns]
    # results = await asyncio.gather(*tasks)

    # 方式 B:
    # 使用默认的线程池时, 建议使用下面的新用法: asyncio.to_thread
    tasks = [asyncio.to_thread(foo, n) for n in ns]
    results = await asyncio.gather(*tasks)

    t2 = time.time()
    print(f"time: {t2-t1}, results: {results}")

if __name__ == "__main__":
    asyncio.run(main())

# time: 4.002139329910278, results: [1, 2, 3, 4]

注意, 如果 executor 想使用进程池: concurrent.futures.ProcessPoolExecutor, 则只能用方式A

例子2

import asyncio
import time
from concurrent.futures import ThreadPoolExecutor

def foo(x):
    time.sleep(x)
    return x

async def afoo(x):
    await asyncio.sleep(x)
    return x

async def main():
    ns = [1, 2, 3, 4]
    atasks = [asyncio.create_task(afoo(n)) for n in ns]

    # 方式 A: 需要 4s
    # 同时等待普通的协程任务以及普通的IO堵塞任务
    tasks = [asyncio.to_thread(foo, n) for n in ns]
    results = await asyncio.gather(*tasks, *atasks)

    # 方式 B: 需要 8s
    # 直接使用线程池, 只能先卡住事件循环
    # with ThreadPoolExecutor(max_workers=4) as executor:
    #     futures = [executor.submit(foo, n) for n in ns]
    #     results = [future.result() for future in futures]
    # aresults = await asyncio.gather(*atasks)
    # results = results + aresults

    print(f"results: {results}")

if __name__ == "__main__":
    t1 = time.time()
    asyncio.run(main())
    t2 = time.time()
    print(f"time: {t2 - t1}")

MISC (TODO)

TODO: (1) 线程安全的准确定义: Linearizability (2) event loop 是怎么被执行的, task 是怎么被注册的 (3) loop.call_soon 与 loop.call_soon_threadsafe 的区别 (4) 以 _UnixSelectorEventLoop 为例, 似乎关键在于 run_forever 或者说 _run_once (在 BaseEventLoop 中实现)

TODO: 下面这段已经被重复了

在 Linux 下: asyncio.new_event_loop 实际上是 get_event_loop_policy().new_event_loop(), 而 get_event_loop_policy() 实际上是 DefaultEventLoopPolicy(), 而 DefaultEventLoopPolicy = _UnixDefaultEventLoopPolicy, _UnixDefaultEventLoopPolicy 的类属性 _loop_factory=_UnixSelectorEventLoop, 最终:

asyncio.new_event_loop()
# 等价于
_UnixSelectorEventLoop()

_UnixSelectorEventLoop 的继承关系是:

aynscio.events.AbstractEventLoop  # 纯粹是抽象类, 全部都是 raise NotImplementedError 的抽象方法

aynscio.base_events.BaseEventLoop

aynscio.selector_events.BaseSelectorEventLoop

aynscio.unix_events._UnixSelectorEventLoop