动机、参考资料、涉及内容

阅读一些底层代码时 (huggingface_hb.hf_hub_download) 时涉及到一些读写文件等 io 操作, 因此做一些记录

  • os.pathpathlib
  • io 是什么
  • filelock 避免多进程同时写

FAQ

open

在 python3 中 io.openopen 是一样的, 而 os.open 是更底层的接口 (基本上就是系统调用)

import os
pid = os.getpid()
# os.O_RDONLY 是整数 0, fd 是文件描述符, 也是一个整数
fd = os.open("file.lock", os.O_RDONLY)
os.close(fd)  # 可以查看 /proc/{pid}/fd/{fd} 看到文件内容

f = open("file.lock", "r")
fd = f.fileno()

io

从 API 的角度, 已知的如下:

  • IOBase, RawIOBase, BufferedIOBase, TextIOBase: 这些都是 ABC Meta, 后三个继承自第一个
  • RawIOBase 的子类: FileIO
  • TextIOBase 的子类: TextIOWrapper, StringIO
  • BufferedIOBase 的子类: BufferdWriter, BufferedReader, BufferedRWPair, BufferedRandom, BytesIO
  • helper: IncrementalNewlineDecoder

其他:

  • io.DEFAULT_BUFFER_SIZE, io.open, io.open_code, io.text_encoding

io.opendocstring

This is an alias for the builtin open() function.

tempfile

tempfile 模块主要作用是建立临时文件或文件夹. 主要接口如下:

  • TemporaryFile, NamedTemporaryFile, TemporaryDirectory 是 high-level API, 一般用于 with 语句中, 在 with 语句块结束后, 会将创建的临时文件/文件夹删除
  • mkstempmkdtemp 是 low-level API, 只负责
import tempfile
import os

# 自动清理生成的临时文件
with tempfile.TemporaryFile() as f:
    print(type(f))   # _io.BufferedRandom
    print(f.name)    # 一个整数, 代表文件描述符
    f.write(b"123")
    f.seek(1)
    f.write(b"bcde")
    f.seek(0)
    print(f.read())   # b"1bcde"
    f.seek(1)
    f.write("23")
    f.seek(0)
    print(f.read())   # b"123de"

# 手动关闭(自动触发删除)
f = tempfile.TemporaryFile()  # _io.BufferedRandom
f.write(b'Hello world!')
# read data from file
f.seek(0)
f.read()
b'Hello world!'
# close the file, it will be removed
f.close()


with tempfile.NamedTemporaryFile() as f:
    print(type(f), f, f.name)
    time.sleep(20)

with tempfile.TemporaryDirectory() as dname:
    print("created temporary directory:", dname)  # dname 是字符串类型
    with open(os.path.join(dname, "a.txt"), "wt") as fw:
        fw.write("hello")

os

pathlib

shutil

filelock

使用两个终端同时跑下面的脚本时, 后启动的脚本会等先启动的脚本运行完毕后再执行 with lock 之后的语句, filelock 可以用来避免写文件的冲突.

import os
from filelock import FileLock
import time

lock = FileLock("file.lock")
n = 100
t = 1.0

with lock:
    with open("file.txt", "a") as f:
        print(f.fileno())
        for i in range(n):
            text = f"Line written by process { os.getpid()}"
            print(text)
            f.write(text+"\n")
            time.sleep(t)

psutil

文件描述符

一个进程可能会打开多个文件, 每个文件对应于一个整数, 这个整数被称为文件描述符, 在 Linux 的实现里, 这几个文件/目录的含义如下:

`/proc/{pid}/fd/{fd}` 文件, 可以使用 cat 命令查看, 即为被打开的文件内容

`/proc/{pid}/fdinfo/{fd}` 文件, 可以使用 cat 命令查看其内容, 文件内容样例如下, 其中 flags 代表打开方式(读/写), pos是文件指针位置
pos:    0
flags:  02500002
mnt_id: 38

源码实现细节

# psutil/_common.py
POSIX = os.name == "posix"
WINDOWS = os.name == "nt"
LINUX = sys.platform.startswith("linux")
MACOS = sys.platform.startswith("darwin")
OSX = MACOS  # deprecated alias
FREEBSD = sys.platform.startswith(("freebsd", "midnightbsd"))
OPENBSD = sys.platform.startswith("openbsd")
NETBSD = sys.platform.startswith("netbsd")
BSD = FREEBSD or OPENBSD or NETBSD
SUNOS = sys.platform.startswith(("sunos", "solaris"))
AIX = sys.platform.startswith("aix")


# psutil/__init__.py
from ._common import LINUX, WINDOWS
if LINUX:
    from . import _pslinux as _psplatform
elif WINDOWS:
    from . import _pswindows as _psplatform
...
class Process:
    def __init__(self, pid=None):
        ...
        self._proc = _psplatform.Process(pid)
        ...
    def open_files(self):
        return self._proc.open_files()

# psutil/_pslinux.py
# 可以粗略看到其实现实质上
class Process:
    ...

    @wrap_exceptions
    def open_files(self):
        retlist = []
        files = os.listdir("%s/%s/fd" % (self._procfs_path, self.pid))
        hit_enoent = False
        for fd in files:
            file = "%s/%s/fd/%s" % (self._procfs_path, self.pid, fd)
            try:
                path = readlink(file)
            except (FileNotFoundError, ProcessLookupError):
                # ENOENT == file which is gone in the meantime
                hit_enoent = True
                continue
            except OSError as err:
                if err.errno == errno.EINVAL:
                    # not a link
                    continue
                if err.errno == errno.ENAMETOOLONG:
                    # file name too long
                    debug(err)
                    continue
                raise
            else:
                # If path is not an absolute there's no way to tell
                # whether it's a regular file or not, so we skip it.
                # A regular file is always supposed to be have an
                # absolute path though.
                if path.startswith('/') and isfile_strict(path):
                    # Get file position and flags.
                    file = "%s/%s/fdinfo/%s" % (
                        self._procfs_path, self.pid, fd)
                    try:
                        with open_binary(file) as f:
                            pos = int(f.readline().split()[1])
                            flags = int(f.readline().split()[1], 8)
                    except (FileNotFoundError, ProcessLookupError):
                        # fd gone in the meantime; process may
                        # still be alive
                        hit_enoent = True
                    else:
                        mode = file_flags_to_mode(flags)
                        ntuple = popenfile(
                            path, int(fd), int(pos), mode, flags)
                        retlist.append(ntuple)
        if hit_enoent:
            self._assert_alive()
        return retlist
print(inspect.getsource(psutil._psplatform.Process.open_files))  # 查看实现
proc = psutil.Process(os.getpid())
proc.open_files()  # 查看文件描述符列表