(LTS) asyncio 探幽
python==3.12.4
内置模块 select / selectors
背景与引入
asyncio.run(main())
等价于下面:
with asyncio.Runner() as runner:
runner.run(main())
而这个 with 语法对应于 asyncio.Runner.__enter__
方法的主要执行逻辑为:
# self: asyncio.Runner
self._loop = events.new_event_loop()
events.set_event_loop(self._loop)
首先关注 events.new_event_loop()
,
events.new_event_loop() <=> get_event_loop_policy().new_event_loop()
# 在 Unix 上, DefaultEventLoopPolicy 指向 _UnixDefaultEventLoopPolicy, 因此
get_event_loop_policy() <=> _init_event_loop_policy() <=> DefaultEventLoopPolicy() <=> _UnixDefaultEventLoopPolicy()
# 继承关系如下:
_UnixDefaultEventLoopPolicy -> BaseDefaultEventLoopPolicy -> AbstractEventLoopPolicy
# 而 _UnixDefaultEventLoopPolicy 包含一个类属性: _loop_factory=_UnixSelectorEventLoop
new_event_loop() <=> _loop_factory() <=> _UnixSelectorEventLoop()
# 继承关系如下, 其中 AbstractEventLoop 定义了一堆抽象方法, 而 BaseEventLoop 里具体实现了很大部分的抽象方法
_UnixSelectorEventLoop -> base_events.BaseEventLoop -> events.AbstractEventLoop
class _UnixSelectorEventLoop(selector_events.BaseSelectorEventLoop):
def __init__(self, selector=None):
super().__init__(selector)
self._signal_handlers = {}
class BaseSelectorEventLoop(base_events.BaseEventLoop):
def __init__(self, selector=None):
super().__init__() # 这里面也初始化了很多属性
if selector is None:
selector = selectors.DefaultSelector() # 在 unix 上, 也就是 selectors.EpollSelector
logger.debug('Using selector: %s', selector.__class__.__name__)
self._selector = selector
self._make_self_pipe()
self._transports = weakref.WeakValueDictionary()
TODO: 下面这两段话可能要重写一下
selectors 是一个官方内置模块, 基于更底层的 select 模块(这个更底层的模块基本上就是对相应的 select 系统调用的直接封装). 由于一个程序可能会涉及到多个 IO 流 (例如连接多个 socket, 打开或写入文件/管道等), 在这种情况下, select 系统调用会告诉调用者, 这些 IO 流哪些已经准备好了(哪些流已经可读了,哪些流可写了,哪些流异常了)
要理解 select/selector 模块, 关键在于理解 select 和 epoll 这两个系统调用 (也可以去理解 C 语言中这两个函数的原型). 注意: select 是跨平台的, 但效率较低, 而 epoll 只适用于 Unix, 但效率高. 下面回到 python 环境下进行介绍:
select
select 模块主要就是如下方法/类, 本质上都是 I/O 多路复用 的系统调用:
select
方法: 跨平台: Linux, Windows, macOS (,BSD 等)poll
类: 跨平台: Linux, macOS (,BSD, Solaris 等)kqueue
类: macOS (, BSD 等)epoll
类: 仅支持 Linuxdevpool
类: 仅支持 Solaris
select 方法是跨平台的, 判断 rlist
中的文件描述符是否可读, wlist
中的文件描述符是否可写, exceptional
中的文件是否异常.
readable, writable, exceptional = select.select(rlist, wlist, xlist[, timeout])
一个使用 select 的例子
以下这个例子是服务端代码, 做的事情是: 服务端接收请求, 并打印客户端传来的数据, 然后关闭这个客户端连接
import select
import socket
# 创建一个服务器套接字
server = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
server.bind(('localhost', 12345))
server.listen()
inputs = [server] # 初始的可读集合,只包含服务器套接字
while True:
readable, writable, exceptional = select.select(inputs, [], [])
for s in readable:
if s is server:
# 服务器套接字可读,表示有新的客户端连接
client_socket, client_address = server.accept()
print(f"New connection from {client_address}")
inputs.append(client_socket) # 将新连接添加到可读集合
else:
# 客户端套接字可读,处理数据
data = s.recv(1024)
if data:
print(f"Received: {data.decode()}")
else:
# 客户端关闭连接
print(f"Client disconnected: {s.getpeername()}")
inputs.remove(s)
s.close()
TODO: 使用 epoll 的例子
TODO: 可以参考 https://chatgpt.com/share/dd157619-95e1-4b23-959c-6710c33e6307
在上层模块 selectors 中, poll, epoll, devpoll 对应的包装类 PollSelector
, EpollSelector
, DevpollSelector
都继承自 _PollLikeSelector
, 因此以 epoll 为例
selectors
继承关系如下:
[PollSelector, EpollSelector, DevpollSelector] -> _PollLikeSelector -> _BaseSelectorImpl -> BaseSelector
SelectSelector -> _BaseSelectorImpl -> BaseSelector
KqueueSelector -> _BaseSelectorImpl -> BaseSelector
其中 BaseSelector 是纯抽象类, _BaseSelectorImpl 包含了一些具体实现供其子类使用 super().xxx
来使用. 而在 selectors.py
的末尾, 按照如下次序根据自身平台设置 DefaultSelector
# Choose the best implementation, roughly:
# epoll|kqueue|devpoll > poll > select.
# select() also can't accept a FD > FD_SETSIZE (usually around 1024)
if 'KqueueSelector' in globals():
DefaultSelector = KqueueSelector
elif 'EpollSelector' in globals():
DefaultSelector = EpollSelector
elif 'DevpollSelector' in globals():
DefaultSelector = DevpollSelector
elif 'PollSelector' in globals():
DefaultSelector = PollSelector
else:
DefaultSelector = SelectSelector
TODO: BaseSelector 的接口定义是什么含义?
因此 selectors 模块一般只需要使用到 selectors.DefaultSelector
, 以及它的 register
, unregister
和 select
方法.
一个使用 selectors 模块的例子:
selector.EVENT_READ=(1 << 0)
, selector.EVENT_WRITE=(1 << 1)
. 在使用 register
时, regsiter(fileobj, events, data=None)
, 其中 events
只能取值为 1, 2, 3, 分别代表需要监听 fileobj
这个文件对象什么时候可读,可写,可读或可写. 而 select()
方法会返回一个列表(这个列表代表的是被满足被监听条件的文件列表), 列表的每一项是个 tuple: (key, mask)
, 其中 key
是 selector.SelectorKey
类型, SelectorKey = namedtuple('SelectorKey', ['fileobj', 'fd', 'events', 'data'])
. 而 mask
是指此时文件对象的状态, 注意 mask=3
代表的是相应的文件对象可读且可写.
import os
import selectors
sel = selectors.DefaultSelector()
# 创建一个管道,返回 (read_fd, write_fd)
read_fd, write_fd = os.pipe()
# 将读端和写端分别转换为文件对象
read_pipe = os.fdopen(read_fd, 'r')
write_pipe = os.fdopen(write_fd, 'w')
# 注册读端,监听可读或可写事件
# selectors.EVENT_READ | selectors.EVENT_WRITE = 3
sel.register(read_pipe, selectors.EVENT_READ | selectors.EVENT_WRITE)
# 写入一些数据,使得读端可读
write_pipe.write("Test data\n")
write_pipe.flush() # 确保数据写入管道
# 检测事件
events = sel.select(timeout=1)
for key, mask in events:
print(f"Registered events: {key.events}, Returned mask: {mask}")
if mask == (selectors.EVENT_READ | selectors.EVENT_WRITE):
print("The pipe is both readable and writable!")
elif mask & selectors.EVENT_READ:
print("The pipe is readable.") # 实际运行时, 打印的是这一行!
elif mask & selectors.EVENT_WRITE:
print("The pipe is writable.")
# 清理资源
sel.unregister(read_pipe)
read_pipe.close()
write_pipe.close()
Task 代码阅读
引入:
import asyncio
async def amain():
loop = asyncio.get_running_loop()
print(id(loop), loop)
print("1234")
if __name__ == "__main__":
loop = asyncio.new_event_loop()
future = loop.create_task(amain())
loop.run_until_complete(future)
这里的 create_task 发生了什么?
python 实现在 asyncio.tasks.Task
, 但代码里包含这段:
# asyncio/tasks.py
# class Task(...)
# ...
_PyTask = Task
try:
import _asyncio
except ImportError:
pass
else:
# _CTask is needed for tests.
Task = _CTask = _asyncio.Task
因此默认会用 C 的实现
注册如下:
static PyType_Spec Task_spec = {
.name = "_asyncio.Task",
.basicsize = sizeof(TaskObj),
.flags = (Py_TPFLAGS_DEFAULT | Py_TPFLAGS_HAVE_GC | Py_TPFLAGS_BASETYPE |
Py_TPFLAGS_IMMUTABLETYPE),
.slots = Task_slots,
};
loop
关键点在于 loop.run_forever -> loop._run_one
使用指南
获取事件循环 loop
每个线程同时只能有一个事件循环, 但可以切换. 例如一个 python 程序开了一个子进程, 每个子进程由开了两个子线程, 那么同时可以有 6 个事件循环: 主进程的主线程, 主进程的2个子线程, 子进程的主线程, 子进程的2个子线程.
与事件循环创建相关的 API 有如下:
asyncio.events._get_running_loop
: 这个是内部 API, 如果当前有事件循环在运行, 就返回它, 否则返回 Noneasyncio.get_running_loop
: 对外 API, 如果当前有事件循环在运行, 就返回它, 否则报错: RuntimeErrorasyncio.new_event_loop
: 对外 API, 创建一个事件循环asyncio.set_event_loop
: 对外 API, 设置事件循环asyncio.get_event_loop
: 对外 API, 现已不建议使用. 如果当前有事件循环在运行, 就返回它, 否则创建一个(但创建只适用于主线程, 子线程需要手动用asyncio.new_event_loop
和asyncio.set_event_loop
)
例子
import asyncio
async def amain():
loop = asyncio.get_running_loop()
print(id(loop), loop)
print("1234")
if __name__ == "__main__":
loop = asyncio.new_event_loop()
print(id(loop), loop)
asyncio.set_event_loop(loop)
print(id(loop), loop)
future = loop.create_task(amain())
# loop.run_forever()
loop.run_until_complete(future)
# 输出
# 140455527134752 <_UnixSelectorEventLoop running=False closed=False debug=False>
# 140455527134752 <_UnixSelectorEventLoop running=False closed=False debug=False>
# 140455527134752 <_UnixSelectorEventLoop running=True closed=False debug=False>
# 1234
注意到只有通过 loop.run_until_complete
或 loop.run_forever
后, loop 才会变成运行状态
loop.run_in_executor
例子1
import asyncio
import time
def foo(x):
time.sleep(x)
return x
async def main():
ns = [1, 2, 3, 4]
t1 = time.time()
# 方式 A:
# 默认的 executor 是 concurrent.futures.ThreadPoolExecutor
# loop = asyncio.get_running_loop()
# tasks = [loop.run_in_executor(None, foo, n) for n in ns]
# results = await asyncio.gather(*tasks)
# 方式 B:
# 使用默认的线程池时, 建议使用下面的新用法: asyncio.to_thread
tasks = [asyncio.to_thread(foo, n) for n in ns]
results = await asyncio.gather(*tasks)
t2 = time.time()
print(f"time: {t2-t1}, results: {results}")
if __name__ == "__main__":
asyncio.run(main())
# time: 4.002139329910278, results: [1, 2, 3, 4]
注意, 如果 executor 想使用进程池: concurrent.futures.ProcessPoolExecutor
, 则只能用方式A
例子2
import asyncio
import time
from concurrent.futures import ThreadPoolExecutor
def foo(x):
time.sleep(x)
return x
async def afoo(x):
await asyncio.sleep(x)
return x
async def main():
ns = [1, 2, 3, 4]
atasks = [asyncio.create_task(afoo(n)) for n in ns]
# 方式 A: 需要 4s
# 同时等待普通的协程任务以及普通的IO堵塞任务
tasks = [asyncio.to_thread(foo, n) for n in ns]
results = await asyncio.gather(*tasks, *atasks)
# 方式 B: 需要 8s
# 直接使用线程池, 只能先卡住事件循环
# with ThreadPoolExecutor(max_workers=4) as executor:
# futures = [executor.submit(foo, n) for n in ns]
# results = [future.result() for future in futures]
# aresults = await asyncio.gather(*atasks)
# results = results + aresults
print(f"results: {results}")
if __name__ == "__main__":
t1 = time.time()
asyncio.run(main())
t2 = time.time()
print(f"time: {t2 - t1}")
MISC (TODO)
TODO:
(1) 线程安全的准确定义: Linearizability
(2) event loop 是怎么被执行的, task 是怎么被注册的
(3) loop.call_soon 与 loop.call_soon_threadsafe 的区别
(4) 以 _UnixSelectorEventLoop
为例, 似乎关键在于 run_forever
或者说 _run_once
(在 BaseEventLoop
中实现)
TODO: 下面这段已经被重复了
在 Linux 下: asyncio.new_event_loop
实际上是 get_event_loop_policy().new_event_loop()
, 而 get_event_loop_policy()
实际上是 DefaultEventLoopPolicy()
, 而 DefaultEventLoopPolicy = _UnixDefaultEventLoopPolicy
, _UnixDefaultEventLoopPolicy
的类属性 _loop_factory=_UnixSelectorEventLoop
, 最终:
asyncio.new_event_loop()
# 等价于
_UnixSelectorEventLoop()
而 _UnixSelectorEventLoop
的继承关系是:
aynscio.events.AbstractEventLoop # 纯粹是抽象类, 全部都是 raise NotImplementedError 的抽象方法
aynscio.base_events.BaseEventLoop
aynscio.selector_events.BaseSelectorEventLoop
aynscio.unix_events._UnixSelectorEventLoop