PyTorch

计时

参考: https://pytorch-lightning.readthedocs.io/en/latest/notebooks/course_UvA-DL/01-introduction-to-pytorch.html

import torch
import time
device = "cuda"
x = torch.randn(5000, 5000)

# CPU version
# perf_counter() is a monotonic, high-resolution clock meant for measuring
# elapsed intervals; time.time() can jump if the system clock is adjusted.
start_time = time.perf_counter()
_ = torch.matmul(x, x)
end_time = time.perf_counter()
print(f"CPU time: {(end_time - start_time):6.5f}s")

# GPU version
if torch.cuda.is_available():
    x = x.to(device)
    # CUDA is asynchronous, so we need to use different timing functions
    start = torch.cuda.Event(enable_timing=True)
    end = torch.cuda.Event(enable_timing=True)
    start.record()
    _ = torch.matmul(x, x)
    end.record()
    torch.cuda.synchronize()  # Waits for everything to finish running on the GPU
    print(f"GPU time: {0.001 * start.elapsed_time(end):6.5f}s")  # Milliseconds to seconds

随机种子

参考: https://pytorch-lightning.readthedocs.io/en/latest/notebooks/course_UvA-DL/01-introduction-to-pytorch.html

# GPU operations have a separate seed we also want to set
if torch.cuda.is_available():
    torch.cuda.manual_seed(42)
    torch.cuda.manual_seed_all(42)

# Additionally, some operations on a GPU are implemented stochastic for efficiency
# We want to ensure that all operations are deterministic on GPU (if used) for reproducibility
# (the flag is spelled "deterministic"; a misspelled name would only create an unused attribute)
torch.backends.cudnn.deterministic = True
torch.backends.cudnn.benchmark = False

可视化

决策边界可视化

参考: https://pytorch-lightning.readthedocs.io/en/latest/notebooks/course_UvA-DL/01-introduction-to-pytorch.html

import matplotlib.pyplot as plt
import numpy as np
from matplotlib.colors import to_rgba

def pred_func(x):
    """Toy stand-in for a model: map every grid point to a score in (0, 1).

    x has shape (H, W, 2); the returned array has shape (H, W).
    The last axis is summed and the result is passed through
    1 / (exp(s - 1) + 1).
    """
    summed = np.sum(x, axis=-1)
    return 1 / (1 + np.exp(summed - 1))

# Sample both axes on [-0.5, 1.5) with a spacing of 0.01 and build the grid.
x1 = np.arange(-0.5, 1.5, step=0.01)
x2 = np.arange(-0.5, 1.5, step=0.01)
xx1, xx2 = np.meshgrid(x1, x2)
model_inputs = np.stack((xx1, xx2), axis=-1)

# Add a trailing axis so the scores broadcast against the RGBA colour tuples.
preds = np.expand_dims(pred_func(model_inputs), axis=-1)

# Blend the two colours per pixel: a score of 0 gives c0, a score of 1 gives c1.
c0, c1 = to_rgba("C0"), to_rgba("C1")
output_image = (1 - preds) * c0 + preds * c1

plt.imshow(output_image, origin="lower", extent=(-0.5, 1.5, -0.5, 1.5))
plt.grid(False)

杂项

怎么判断 namedtuple

# torch==2.0.0: torch/utils/_pytree.py
# h/t https://stackoverflow.com/questions/2166818/how-to-check-if-an-object-is-an-instance-of-a-namedtuple
def _is_namedtuple_instance(pytree: Any) -> bool:
    typ = type(pytree)
    bases = typ.__bases__
    if len(bases) != 1 or bases[0] != tuple:
        return False
    fields = getattr(typ, '_fields', None)
    if not isinstance(fields, tuple):
        return False
    return all(type(entry) == str for entry in fields)

# A bug that slips past the check!
# A is an ordinary hand-written tuple subclass (not created by namedtuple()),
# yet it has tuple as its only base and a tuple-of-str `_fields`, which is
# everything _is_namedtuple_instance looks at.
class A(tuple):
    _fields = ("a", "b")
    def __init__(self):
        super().__init__()
_is_namedtuple_instance(A())  # True

怎么找到一个可用的本机端口

来源: https://github.com/OpenDevin/OpenDevin/blob/main/opendevin/runtime/utils/system.py

import socket

def find_available_tcp_port() -> int:
    """Find an available TCP port, return -1 if none available.

    A temporary socket is bound to ('localhost', 0) and the port number it
    ended up with is read back via getsockname(). The socket is closed
    before returning, so the port is only reported, not reserved.
    """
    with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as probe:
        try:
            probe.bind(('localhost', 0))
            chosen = probe.getsockname()[1]
        except Exception:
            return -1
        return chosen

实现单例模式

来源: https://github.com/OpenDevin/OpenDevin/blob/main/opendevin/core/config.py

from dataclasses import dataclass


class Singleton(type):
    """Metaclass that keeps at most one instance per class.

    The first call builds the instance normally. Every later call returns
    that same object, after copying the call's keyword arguments onto it
    as attributes; positional arguments of later calls are not used.
    """

    # Shared registry: class object -> its single instance.
    _instances: dict = {}

    def __call__(cls, *args, **kwargs):
        registry = cls._instances
        if cls in registry:
            existing = registry[cls]
            # Treat keyword arguments as in-place updates of the instance.
            for attr_name, attr_value in kwargs.items():
                setattr(existing, attr_name, attr_value)
            return existing
        registry[cls] = super().__call__(*args, **kwargs)
        return registry[cls]


# Example configuration object. Because its metaclass is Singleton, every
# AppConfig(...) call after the first returns the instance created first.
@dataclass
class AppConfig(metaclass=Singleton):
    db_uri: str
    db_max_connections: int
    debug: bool = False

# Singleton pattern: `app_config is app_config2` is True
app_config = AppConfig(db_uri="mydatabaseuri", db_max_connections=10)
# NOTE: despite the literal's wording, these values are NOT ignored.
# Singleton.__call__ copies keyword arguments onto the existing instance, so
# afterwards app_config.db_uri == "shouldbeignored" and
# app_config.db_max_connections == 20.
app_config2 = AppConfig(db_uri="shouldbeignored", db_max_connections=20)

使用 python 操作 docker

TODO: 也许可以移到 note/docker 里

相关的用法抽取自: https://github.com/OpenDevin/OpenDevin/blob/main/opendevin/runtime/docker/ssh_box.py

官方文档: https://docker-py.readthedocs.io/en/stable/

import docker
container_name = "milvus-standalone"
docker_client = docker.DockerClient()
container = docker_client.containers.get(container_name)  # fetch an existing container; raises an error if it does not exist

# Stop and remove the container
container.stop()
container.remove()
try:
    # Poll once per second, re-fetching the container each time
    # (presumably because `status` on an already fetched object is a snapshot — TODO confirm).
    while container.status != 'exited':
        time.sleep(1)
        container = docker_client.containers.get(container_name)
except docker.errors.NotFound:
    # The lookup fails once the container has been removed; that ends the wait.
    pass

# Start an already existing container
# NOTE(review): this looks like an independent usage example; run directly after
# the removal above, `container` would refer to a container that no longer exists.
if container.status != 'running':
    container.start()
# Poll once per second until the re-fetched container reports 'running'.
while container.status != 'running':
    time.sleep(1)
    container = docker_client.containers.get(container_name)


container_image = "ghcr.io/opendevin/sandbox:main"
container_name = "opendevin-sandbox-test"
ssh_port = 63710

# Roughly equivalent CLI invocation:
# docker run --name {container_name} --network host -d -p 63710:63710 --workdir /workspace -v /home/hostusername/workspace:/workspace -v /temp/cache:/home/opendevin/.cache {container_image} /usr/sbin/sshd -D -p 63710 -o 'PermitRootLogin=yes'
# NOTE(review): the docker-py documentation describes network_mode="host" as
# incompatible with `ports`; presumably only one of the two is needed — confirm.
container = docker_client.containers.run(
    container_image,
    # allow root login
    command=f"/usr/sbin/sshd -D -p {ssh_port} -o 'PermitRootLogin=yes'",
    network_mode="host",
    ports={f"{ssh_port}/tcp": ssh_port},  # e.g. {'2222/tcp': 3333}: 2222 is the port inside the container, 3333 is the host port
    working_dir="/workspace",  # working directory inside the container
    name=container_name,
    detach=True,
    volumes={
        "/home/hostusername/workspace": {
            'bind': "/workspace",
            'mode': 'rw'
        },
        "/temp/cache": {
            'bind': '/home/opendevin/.cache',
            'mode': 'rw',
        },
    },  # each key is a host directory; the 'bind' value is the directory inside the container
)

# Run a single command inside the container; the call yields the exit code
# together with the command's output.
# The variable is wrapped in double quotes so that bash expands it; inside
# single quotes bash would print the literal text $MYVARNAME instead of "abc".
exit_code, logs = container.exec_run(
    ['/bin/bash', '-c', 'echo "$MYVARNAME"'],
    workdir="/workdir",  # NOTE(review): the container above was started with /workspace — confirm this path exists
    environment={"MYVARNAME": "abc"},  # environment variables set for this one command
)

# Equivalent to: docker container ls --all
for container in docker_client.containers.list(all=True):
    print(container.name)


# docker-py offers a dedicated method for transferring a tar archive into a
# container; it is equivalent to the last two of the following commands:
# tar -cvf archive.tar /path/to/files
# docker cp archive.tar container_name:/path/to/destination
# docker exec container_name tar -xvf /path/to/destination/archive.tar -C /path/to/destination
import glob
import os
import tarfile

# NOTE(review): host_src, tmp_dir, sandbox_dest and self.container are not
# defined in this snippet; presumably it was lifted from a method body and the
# surrounding code must supply them.

# `glob` is imported as a module, so the function has to be called as glob.glob().
files = glob.glob(host_src + '/**/*', recursive=True)
srcname = os.path.basename(host_src)
tar_filename = os.path.join(tmp_dir, srcname + '.tar')
with tarfile.open(tar_filename, mode='w') as tar:
    for file in files:
        # Store each entry relative to the parent of host_src, so paths inside
        # the archive begin with the source directory's own name.
        tar.add(
            file,
            arcname=os.path.relpath(file, os.path.dirname(host_src)),
        )
with open(tar_filename, 'rb') as f:
    data = f.read()
# Upload the raw archive bytes into the parent directory of sandbox_dest.
self.container.put_archive(os.path.dirname(sandbox_dest), data)

重试

from tenacity import retry, stop_after_attempt, wait_fixed, RetryError

@retry(
    stop=stop_after_attempt(3),  # stop policy: give up after 3 attempts
    wait=wait_fixed(2),          # wait policy: pause a fixed 2 seconds between attempts
)
def test_func():
    """Always fail, so that the retry policy configured above is exercised."""
    print("enter function")
    raise ValueError("Something went wrong!")

import time
t1 = time.time()
try:
    test_func()
except RetryError as e:
    # Handles the case where tenacity reports that every attempt failed.
    print(f"Function failed after retries: {e}")
except Exception:
    # Narrowed from a bare `except:` so that KeyboardInterrupt and SystemExit
    # are no longer swallowed here.
    print("Other error occurs")
t2 = time.time()
# Total elapsed time, including the waits between attempts.
print(t2 - t1)

重试: requests 库

import requests
from requests.adapters import HTTPAdapter

MAX_CONNECTION_NUM = 10000
session = requests.Session()

# Give each URL scheme its own adapter with identical retry and pool settings.
for url_prefix in ("http://", "https://"):
    adapter = HTTPAdapter(
        max_retries=3,
        pool_connections=MAX_CONNECTION_NUM,
        pool_maxsize=MAX_CONNECTION_NUM,
    )
    session.mount(url_prefix, adapter)

# Requests sent through the session use the adapter mounted for their prefix.
session.post("http://localhost:1234", json={"content": "xx"}, timeout=2)

python 脚本退出时调用的函数

import atexit

class TextProcess:
    """Small helper that keeps a text file open and can print it repeatedly."""

    def __init__(self, filename):
        # The handle stays open until close() is called.
        self.fr = open(filename)

    def close(self):
        """Print "exit" and close the underlying file."""
        print("exit")
        self.fr.close()

    def reset(self):
        """Move the read position back to the start of the file."""
        self.fr.seek(0)

    def echo(self):
        """Print everything from the current position, then rewind."""
        content = self.fr.read()
        print(content)
        self.reset()

processor = TextProcess("a.txt")
# Register close() with atexit so it runs when the script exits: "exit" is
# printed and the file is closed without an explicit call at the end.
atexit.register(processor.close)
processor.echo()

atexit 是标准库, 一般只会用到 `register` 和 `unregister` 两个函数

ssh 的 python 接口

官方文档: https://pexpect.readthedocs.io/en/stable/api/pxssh.html#

一个实际的例子参考: https://github.com/OpenDevin/OpenDevin/blob/main/opendevin/runtime/docker/ssh_box.py

from pexpect import pxssh

hostname = "localhost"
username = 'root'
port = 21345
password = "abcdefgh"


try:
    ssh = pxssh.pxssh(echo=False, timeout=2, encoding='utf-8', codec_errors='replace')
    # ssh_cmd = f'`ssh -v -p {port} {username}@{hostname}`'
    ssh.login(hostname, username, password, port=port)

    # Workaround for https://github.com/pexpect/pexpect/issues/669
    ssh.sendline("bind 'set enable-bracketed-paste off'")
    ssh.prompt()

    # For each command: send it, wait for the prompt, then print
    # everything that was received before the prompt.
    for command in ('uptime', 'ls -l', 'df'):
        ssh.sendline(command)
        ssh.prompt()
        print(ssh.before)
    ssh.logout()
except pxssh.ExceptionPxssh as e:
    print("pxssh failed on login.")
    print(e)

其实基本上就是反复进行 ssh.sendline(cmd), ssh.prompt(), ssh.before 的循环, 以执行一条命令并拿到结果.

一个不太能理解的例子

当 ssh 设置超时的时候, 可能会让上述 `ssh.sendline(cmd), ssh.prompt(), ssh.before` 循环处于不稳定状态.

import time
from pexpect import pxssh

hostname = "localhost"
username = 'root'
port = 21345
password = "abcdefgh"

ssh = pxssh.pxssh(echo=False, timeout=2, encoding='utf-8', codec_errors='replace')

# ssh_cmd = f'`ssh -v -p {port} {username}@{hostname}`'

ssh.login(hostname, username, password, port=port)

ssh.sendline("bind 'set enable-bracketed-paste off'")
ssh.prompt()

# (seconds to sleep locally before sending, command to send)
# "sleep 6" runs longer than the 2-second pxssh timeout configured above.
steps = (
    (0, "ls -al /root"),
    (0, "sleep 6"),
    (0, "ls -al /root"),
    (12, "ls -al /root"),
)

for pause, command in steps:
    if pause:
        time.sleep(pause)
    print("=" * 40)
    num_chars = ssh.sendline(command)
    succeed = ssh.prompt()
    # Open question from the recorded output: on the last step `succeed` is
    # True but `s` is empty. Presumably prompt() matches the stale prompt left
    # over from the timed-out "sleep 6" rather than this command's — TODO confirm.
    s = ssh.before
    print(num_chars, s, succeed, type(num_chars), type(s), type(succeed))

输出

========================================
13 total 24
drwx------ 1 root root 4096 Jun 14 06:51 .
drwxr-xr-x 1 root root 4096 Jun 14 06:29 ..
-rw------- 1 root root 1386 Jun 14 07:31 .bash_history
-rw-r--r-- 1 root root 3106 Oct 15  2021 .bashrc
drwx------ 2 root root 4096 Jun 14 06:29 .cache
-rw-r--r-- 1 root root  161 Jul  9  2019 .profile
 True <class 'int'> <class 'str'> <class 'bool'>
========================================
8  False <class 'int'> <class 'str'> <class 'bool'>
========================================
13  False <class 'int'> <class 'str'> <class 'bool'>
========================================
13  True <class 'int'> <class 'str'> <class 'bool'>

一个可能的解法, 但有些长, 可以参考: https://github.com/OpenDevin/OpenDevin/blob/main/opendevin/runtime/sand_box.py 中的 DockerSSHBox.execute 方法

Mixin 类怎么通过 mypy 的静态检查: typing.Protocol

完全参考自: https://stackoverflow.com/questions/51930339/how-do-i-correctly-add-type-hints-to-mixin-classes

一个实例: https://github.com/OpenDevin/OpenDevin/blob/main/opendevin/runtime/sand_box.py + https://github.com/OpenDevin/OpenDevin/blob/main/opendevin/runtime/plugins/mixin.py

不能通过 mypy 静态检查的代码

class MultiplicatorMixin:
    def multiply(self, m: int) -> int:
        return self.value * m  # mypy [attr-defined]: this mixin never declares `value`

# Same problem here: `value` is only assigned in MyClass.__init__, not in the mixin.
class AdditionMixin:
    def add(self, b: int) -> int:
        return self.value + b  # mypy [attr-defined]: this mixin never declares `value`

class MyClass(MultiplicatorMixin, AdditionMixin):
    def __init__(self, value: int) -> None:
        self.value = value  # the attribute both mixins read


instance = MyClass(10)
print(instance.add(2))
print(instance.multiply(2))

如果对上面的代码使用

mypy test_mypy.py

进行代码检查, 会出现报错信息:

test_mypy.py:3: error: "MultiplicatorMixin" has no attribute "value"  [attr-defined]
test_mypy.py:8: error: "AdditionMixin" has no attribute "value"  [attr-defined]
Found 2 errors in 1 file (checked 1 source file)

推荐使用下面的方式:

from typing import Protocol

class HasValueProtocol(Protocol):
    """Structural type for any object exposing a readable integer ``value``."""

    @property
    def value(self) -> int:
        ...


class MultiplicatorMixin:
    """Mixin adding ``multiply`` to classes that provide ``value``."""

    # NOTE: `self` carries an explicit type hint, so the type checker knows
    # that `self.value` exists even though this mixin never defines it.
    def multiply(self: HasValueProtocol, m: int) -> int:
        product = self.value * m
        return product


class AdditionMixin:
    """Mixin adding ``add`` to classes that provide ``value``."""

    # NOTE: `self` carries an explicit type hint here as well.
    def add(self: HasValueProtocol, b: int) -> int:
        total = self.value + b
        return total


class MyClass(MultiplicatorMixin, AdditionMixin):
    """Concrete class that supplies the ``value`` both mixins rely on."""

    def __init__(self, value: int) -> None:
        self.value = value


instance = MyClass(10)
for outcome in (instance.add(2), instance.multiply(2)):
    print(outcome)