(P1) Python 文件与 IO
动机、参考资料、涉及内容
阅读一些底层代码时 (huggingface_hb.hf_hub_download
) 时涉及到一些读写文件等 io 操作, 因此做一些记录
os.path
与pathlib
- io 是什么
- filelock 避免多进程同时写
FAQ
open
在 python3 中 io.open
与 open
是一样的, 而 os.open
是更底层的接口 (基本上就是系统调用)
import os
pid = os.getpid()
# os.O_RDONLY 是整数 0, fd 是文件描述符, 也是一个整数
fd = os.open("file.lock", os.O_RDONLY)
os.close(fd) # 可以查看 /proc/{pid}/fd/{fd} 看到文件内容
f = open("file.lock", "r")
fd = f.fileno()
io
从 API 的角度, 已知的如下:
IOBase
,RawIOBase
,BufferedIOBase
,TextIOBase
: 这些都是 ABC Meta, 后三个继承自第一个RawIOBase
的子类:FileIO
TextIOBase
的子类:TextIOWrapper
,StringIO
BufferedIOBase
的子类:BufferdWriter
,BufferedReader
,BufferedRWPair
,BufferedRandom
,BytesIO
- helper:
IncrementalNewlineDecoder
其他:
io.DEFAULT_BUFFER_SIZE
,io.open
,io.open_code
,io.text_encoding
io.open
的 docstring
This is an alias for the builtin open() function.
tempfile
tempfile 模块主要作用是建立临时文件或文件夹. 主要接口如下:
TemporaryFile
,NamedTemporaryFile
,TemporaryDirectory
是 high-level API, 一般用于 with 语句中, 在 with 语句块结束后, 会将创建的临时文件/文件夹删除mkstemp
与mkdtemp
是 low-level API, 只负责
import tempfile
import os
# 自动清理生成的临时文件
with tempfile.TemporaryFile() as f:
print(type(f)) # _io.BufferedRandom
print(f.name) # 一个整数, 代表文件描述符
f.write(b"123")
f.seek(1)
f.write(b"bcde")
f.seek(0)
print(f.read()) # b"1bcde"
f.seek(1)
f.write("23")
f.seek(0)
print(f.read()) # b"123de"
# 手动关闭(自动触发删除)
f = tempfile.TemporaryFile() # _io.BufferedRandom
f.write(b'Hello world!')
# read data from file
f.seek(0)
f.read()
b'Hello world!'
# close the file, it will be removed
f.close()
with tempfile.NamedTemporaryFile() as f:
print(type(f), f, f.name)
time.sleep(20)
with tempfile.TemporaryDirectory() as dname:
print("created temporary directory:", dname) # dname 是字符串类型
with open(os.path.join(dname, "a.txt"), "wt") as fw:
fw.write("hello")
os
pathlib
shutil
filelock
使用两个终端同时跑下面的脚本时, 后启动的脚本会等先启动的脚本运行完毕后再执行 with lock
之后的语句, filelock 可以用来避免写文件的冲突.
import os
from filelock import FileLock
import time
lock = FileLock("file.lock")
n = 100
t = 1.0
with lock:
with open("file.txt", "a") as f:
print(f.fileno())
for i in range(n):
text = f"Line written by process { os.getpid()}"
print(text)
f.write(text+"\n")
time.sleep(t)
psutil
文件描述符
一个进程可能会打开多个文件, 每个文件对应于一个整数, 这个整数被称为文件描述符, 在 Linux 的实现里, 这几个文件/目录的含义如下:
`/proc/{pid}/fd/{fd}` 文件, 可以使用 cat 命令查看, 即为被打开的文件内容
`/proc/{pid}/fdinfo/{fd}` 文件, 可以使用 cat 命令查看其内容, 文件内容样例如下, 其中 flags 代表打开方式(读/写), pos是文件指针位置
pos: 0
flags: 02500002
mnt_id: 38
源码实现细节
# psutil/_common.py
POSIX = os.name == "posix"
WINDOWS = os.name == "nt"
LINUX = sys.platform.startswith("linux")
MACOS = sys.platform.startswith("darwin")
OSX = MACOS # deprecated alias
FREEBSD = sys.platform.startswith(("freebsd", "midnightbsd"))
OPENBSD = sys.platform.startswith("openbsd")
NETBSD = sys.platform.startswith("netbsd")
BSD = FREEBSD or OPENBSD or NETBSD
SUNOS = sys.platform.startswith(("sunos", "solaris"))
AIX = sys.platform.startswith("aix")
# psutil/__init__.py
from ._common import LINUX, WINDOWS
if LINUX:
from . import _pslinux as _psplatform
elif WINDOWS:
from . import _pswindows as _psplatform
...
class Process:
def __init__(self, pid=None):
...
self._proc = _psplatform.Process(pid)
...
def open_files(self):
return self._proc.open_files()
# psutil/_pslinux.py
# 可以粗略看到其实现实质上
class Process:
...
@wrap_exceptions
def open_files(self):
retlist = []
files = os.listdir("%s/%s/fd" % (self._procfs_path, self.pid))
hit_enoent = False
for fd in files:
file = "%s/%s/fd/%s" % (self._procfs_path, self.pid, fd)
try:
path = readlink(file)
except (FileNotFoundError, ProcessLookupError):
# ENOENT == file which is gone in the meantime
hit_enoent = True
continue
except OSError as err:
if err.errno == errno.EINVAL:
# not a link
continue
if err.errno == errno.ENAMETOOLONG:
# file name too long
debug(err)
continue
raise
else:
# If path is not an absolute there's no way to tell
# whether it's a regular file or not, so we skip it.
# A regular file is always supposed to be have an
# absolute path though.
if path.startswith('/') and isfile_strict(path):
# Get file position and flags.
file = "%s/%s/fdinfo/%s" % (
self._procfs_path, self.pid, fd)
try:
with open_binary(file) as f:
pos = int(f.readline().split()[1])
flags = int(f.readline().split()[1], 8)
except (FileNotFoundError, ProcessLookupError):
# fd gone in the meantime; process may
# still be alive
hit_enoent = True
else:
mode = file_flags_to_mode(flags)
ntuple = popenfile(
path, int(fd), int(pos), mode, flags)
retlist.append(ntuple)
if hit_enoent:
self._assert_alive()
return retlist
print(inspect.getsource(psutil._psplatform.Process.open_files)) # 查看实现
proc = psutil.Process(os.getpid())
proc.open_files() # 查看文件描述符列表