(LTS) Python Snippets
Pytorch
计时
参考: https://pytorch-lightning.readthedocs.io/en/latest/notebooks/course_UvA-DL/01-introduction-to-pytorch.html
import torch
import time
device = "cuda"
x = torch.randn(5000, 5000)
# CPU version
start_time = time.time()
_ = torch.matmul(x, x)
end_time = time.time()
print(f"CPU time: {(end_time - start_time):6.5f}s")
# GPU version
if torch.cuda.is_available():
x = x.to(device)
# CUDA is asynchronous, so we need to use different timing functions
start = torch.cuda.Event(enable_timing=True)
end = torch.cuda.Event(enable_timing=True)
start.record()
_ = torch.matmul(x, x)
end.record()
torch.cuda.synchronize() # Waits for everything to finish running on the GPU
print(f"GPU time: {0.001 * start.elapsed_time(end):6.5f}s") # Milliseconds to seconds
随机种子
参考: https://pytorch-lightning.readthedocs.io/en/latest/notebooks/course_UvA-DL/01-introduction-to-pytorch.html
# GPU operations have a separate seed we also want to set
if torch.cuda.is_available():
torch.cuda.manual_seed(42)
torch.cuda.manual_seed_all(42)
# Additionally, some operations on a GPU are implemented stochastic for efficiency
# We want to ensure that all operations are deterministic on GPU (if used) for reproducibility
torch.backends.cudnn.determinstic = True
torch.backends.cudnn.benchmark = False
可视化
决策边界可视化
参考: https://pytorch-lightning.readthedocs.io/en/latest/notebooks/course_UvA-DL/01-introduction-to-pytorch.html
import matplotlib.pyplot as plt
import numpy as np
from matplotlib.colors import to_rgba
def pred_func(x):
# x: (H, W, 2)
# y: (H, W)
y = np.sum(x, axis=-1)
y = 1 / (np.exp(y - 1) + 1)
return y
x1 = np.arange(-0.5, 1.5, step=0.01)
x2 = np.arange(-0.5, 1.5, step=0.01)
xx1, xx2 = np.meshgrid(x1, x2)
model_inputs = np.stack([xx1, xx2], axis=-1)
preds = pred_func(model_inputs)[..., None]
c0 = to_rgba("C0")
c1 = to_rgba("C1")
output_image = (1 - preds) * c0 + preds * c1
plt.imshow(output_image, origin="lower", extent=(-0.5, 1.5, -0.5, 1.5))
plt.grid(False)
杂项
怎么判断 namedtuple
# torch==2.0.0: torch/utils/_pytree.py
# h/t https://stackoverflow.com/questions/2166818/how-to-check-if-an-object-is-an-instance-of-a-namedtuple
def _is_namedtuple_instance(pytree: Any) -> bool:
typ = type(pytree)
bases = typ.__bases__
if len(bases) != 1 or bases[0] != tuple:
return False
fields = getattr(typ, '_fields', None)
if not isinstance(fields, tuple):
return False
return all(type(entry) == str for entry in fields)
# 绕过检查的 bug !
class A(tuple):
_fields = ("a", "b")
def __init__(self):
super().__init__()
_is_namedtuple_instance(A()) # True
怎么找到一个可用的本机端口
来源: https://github.com/OpenDevin/OpenDevin/blob/main/opendevin/runtime/utils/system.py
import socket
def find_available_tcp_port() -> int:
"""Find an available TCP port, return -1 if none available."""
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
try:
sock.bind(('localhost', 0))
port = sock.getsockname()[1]
return port
except Exception:
return -1
finally:
sock.close()
实现单例模式
来源: https://github.com/OpenDevin/OpenDevin/blob/main/opendevin/core/config.py
from dataclasses import dataclass
class Singleton(type):
_instances: dict = {}
def __call__(cls, *args, **kwargs):
if cls not in cls._instances:
cls._instances[cls] = super(Singleton, cls).__call__(*args, **kwargs)
else:
instance = cls._instances[cls]
for key, value in kwargs.items():
setattr(instance, key, value)
return cls._instances[cls]
@dataclass
class AppConfig(metaclass=Singleton):
db_uri: str
db_max_connections: int
debug: bool = False
# 单例模式: app_config is app_config2: True
app_config = AppConfig(db_uri="mydatabaseuri", db_max_connections=10)
app_config2 = AppConfig(db_uri="shouldbeignored", db_max_connections=20)
使用 python 操作 docker
TODO: 也许可以移到 note/docker 里
相关的用法抽取自: https://github.com/OpenDevin/OpenDevin/blob/main/opendevin/runtime/docker/ssh_box.py
官方文档: https://docker-py.readthedocs.io/en/stable/
import docker
container_name = "milvus-standalone"
docker_client = docker.DockerClient()
container = docker_client.containers.get(container_name) # 获取一个已有的 docker-container, 如果没有, 则报错
# 停止并删除容器
container.stop()
container.remove()
try:
while container.status != 'exited':
time.sleep(1)
container = docker_client.containers.get(container_name)
except docker.errors.NotFound:
pass
# 启动一个已经存在的容器
if container.status != 'running':
container.start()
while container.status != 'running':
time.sleep(1)
container = docker_client.containers.get(container_name)
container_image = "ghcr.io/opendevin/sandbox:main"
container_name = "opendevin-sandbox-test"
ssh_port = 63710
# docker run --name {container_name} --network host -d -p 63710:63710 -v --workdir /workspace -v /home/hostusername/workspace:/workspace -v /temp/cache:/home/opendevin/.cache {container_image} /usr/sbin/sshd -D -p 63710 -o 'PermitRootLogin=yes'
container = docker_client.containers.run(
container_image,
# allow root login
command=f"/usr/sbin/sshd -D -p {ssh_port} -o 'PermitRootLogin=yes'",
network_mode="host",
ports={f"{ssh_port}/tcp": ssh_port}, # {'2222/tcp': 3333} 2222 是内部端口, 3333 是主机端口
working_dir="/workspace", # 容器内的工作目录
name=container_name,
detach=True,
volumes={
"/home/hostusername/workspace": {
'bind': "/workspace",
'mode': 'rw'
},
"/temp/cache": {
'bind': '/home/opendevin/.cache',
'mode': 'rw',
},
}, # key 代表主机目录, bind 后面的代表容器内目录
)
exit_code, logs = container.exec_run(
['/bin/bash', '-c', "echo '$MYVARNAME'"],
workdir="/workdir",
environment={"MYVARNAME": "abc"}, # 设置此条命令运行时的环境变量
)
# 相当于 docker container ls --all
for container in docker_client.containers.list(all=True):
print(container.name)
# 专门提供了一个方法传输 tar 文件, 等价于以下的后面两条命令
# tar -cvf archive.tar /path/to/files
# docker cp archive.tar container_name:/path/to/destination
# docker exec container_name tar -xvf /path/to/destination/archive.tar -C /path/to/destination
import glob
files = glob(host_src + '/**/*', recursive=True)
srcname = os.path.basename(host_src)
tar_filename = os.path.join(tmp_dir, srcname + '.tar')
with tarfile.open(tar_filename, mode='w') as tar:
for file in files:
tar.add(
file,
arcname=os.path.relpath(file, os.path.dirname(host_src)),
)
with open(tar_filename, 'rb') as f:
data = f.read()
self.container.put_archive(os.path.dirname(sandbox_dest), data)
重试
from tenacity import retry, stop_after_attempt, wait_fixed, RetryError
@retry(
stop=stop_after_attempt(3), # 停止重试的策略:尝试3次后停止
wait=wait_fixed(2), # 等待策略:固定等待2秒
)
def test_func():
print("enter function")
raise ValueError("Something went wrong!")
import time
t1 = time.time()
try:
test_func()
except RetryError as e:
print(f"Function failed after retries: {e}")
except:
print("Other error occurs")
t2 = time.time()
print(t2 - t1)
重试: requests 库
import requests
from requests.adapters import HTTPAdapter
MAX_CONNECTION_NUM = 10000
session = requests.Session()
session.mount(
"http://",
HTTPAdapter(
max_retries=3,
pool_connections=MAX_CONNECTION_NUM,
pool_maxsize=MAX_CONNECTION_NUM,
)
)
session.mount(
"https://",
HTTPAdapter(
max_retries=3,
pool_connections=MAX_CONNECTION_NUM,
pool_maxsize=MAX_CONNECTION_NUM,
)
)
session.post("http://localhost:1234", json={"content": "xx"}, timeout=2)
python 脚本退出时调用的函数
import atexit
class TextProcess:
def __init__(self, filename):
self.fr = open(filename)
def reset(self):
self.fr.seek(0)
def echo(self):
print(self.fr.read())
self.reset()
def close(self):
print("exit")
self.fr.close()
processor = TextProcess("a.txt")
atexit.register(processor.close)
processor.echo()
atexit
是标准库, 一般只会用到 register
和 unregister
函数
ssh 的 python 接口
官方文档: https://pexpect.readthedocs.io/en/stable/api/pxssh.html#
一个实际的例子参考: https://github.com/OpenDevin/OpenDevin/blob/main/opendevin/runtime/docker/ssh_box.py
from pexpect import pxssh
hostname = "localhost"
username = 'root'
port = 21345
password = "abcdefgh"
try:
ssh = pxssh.pxssh(
echo=False,
timeout=2,
encoding='utf-8',
codec_errors='replace',
)
# ssh_cmd = f'`ssh -v -p {port} {username}@{hostname}`'
ssh.login(hostname, username, password, port=port)
# Fix: https://github.com/pexpect/pexpect/issues/669
ssh.sendline("bind 'set enable-bracketed-paste off'")
ssh.prompt()
ssh.sendline('uptime') # run a command
ssh.prompt() # match the prompt
print(ssh.before) # print everything before the prompt.
ssh.sendline('ls -l')
ssh.prompt()
print(ssh.before)
ssh.sendline('df')
ssh.prompt()
print(ssh.before)
ssh.logout()
except pxssh.ExceptionPxssh as e:
print("pxssh failed on login.")
print(e)
其实基本上就是反复进行 ssh.sendline(cmd)
, ssh.prompt()
, ssh.before
的循环, 以执行一条命令并拿到结果.
一个不太能理解的例子
当 ssh 设置超时的时候, 可能会让上述 ``ssh.sendline(cmd),
ssh.prompt(),
ssh.before` 循环出于不稳定状态.
import time
from pexpect import pxssh
ssh = pxssh.pxssh(
echo=False,
timeout=2,
encoding='utf-8',
codec_errors='replace',
)
hostname = "localhost"
username = 'root'
port = 21345
password = "abcdefgh"
# ssh_cmd = f'`ssh -v -p {port} {username}@{hostname}`'
ssh.login(hostname, username, password, port=port)
ssh.sendline("bind 'set enable-bracketed-paste off'")
ssh.prompt()
print("="*40)
num_chars = ssh.sendline("ls -al /root")
succeed = ssh.prompt()
s = ssh.before
print(num_chars, s, succeed, type(num_chars), type(s), type(succeed))
print("="*40)
num_chars = ssh.sendline("sleep 6")
succeed = ssh.prompt()
s = ssh.before
print(num_chars, s, succeed, type(num_chars), type(s), type(succeed))
print("="*40)
num_chars = ssh.sendline("ls -al /root")
succeed = ssh.prompt()
s = ssh.before
print(num_chars, s, succeed, type(num_chars), type(s), type(succeed))
time.sleep(12)
print("="*40)
num_chars = ssh.sendline("ls -al /root")
succeed = ssh.prompt()
s = ssh.before # 这个 s 为什么没有输出
print(num_chars, s, succeed, type(num_chars), type(s), type(succeed))
输出
========================================
13 total 24
drwx------ 1 root root 4096 Jun 14 06:51 .
drwxr-xr-x 1 root root 4096 Jun 14 06:29 ..
-rw------- 1 root root 1386 Jun 14 07:31 .bash_history
-rw-r--r-- 1 root root 3106 Oct 15 2021 .bashrc
drwx------ 2 root root 4096 Jun 14 06:29 .cache
-rw-r--r-- 1 root root 161 Jul 9 2019 .profile
True <class 'int'> <class 'str'> <class 'bool'>
========================================
8 False <class 'int'> <class 'str'> <class 'bool'>
========================================
13 False <class 'int'> <class 'str'> <class 'bool'>
========================================
13 True <class 'int'> <class 'str'> <class 'bool'>
一个可能的解法, 但有些长, 可以参考: https://github.com/OpenDevin/OpenDevin/blob/main/opendevin/runtime/sand_box.py 中的 DockerSSHBox.execute
方法
Mixin 类怎么通过 mypy 的静态检查: typing.Protocol
完全参考自: https://stackoverflow.com/questions/51930339/how-do-i-correctly-add-type-hints-to-mixin-classes
一个实例: https://github.com/OpenDevin/OpenDevin/blob/main/opendevin/runtime/sand_box.py + https://github.com/OpenDevin/OpenDevin/blob/main/opendevin/runtime/plugins/mixin.py
不能通过 mypy 静态检查的代码
class MultiplicatorMixin:
def multiply(self, m: int) -> int:
return self.value * m
class AdditionMixin:
def add(self, b: int) -> int:
return self.value + b
class MyClass(MultiplicatorMixin, AdditionMixin):
def __init__(self, value: int) -> None:
self.value = value
instance = MyClass(10)
print(instance.add(2))
print(instance.multiply(2))
如果对上面的代码使用
mypy test_mypy.py
进行代码检查, 会出现报错信息:
test_mypy.py:3: error: "MultiplicatorMixin" has no attribute "value" [attr-defined]
test_mypy.py:8: error: "AdditionMixin" has no attribute "value" [attr-defined]
Found 2 errors in 1 file (checked 1 source file)
推荐使用下面的方式:
from typing import Protocol
class HasValueProtocol(Protocol):
@property
def value(self) -> int: ...
class MultiplicatorMixin:
# 注意: 这里要对 self 加上 type hint
def multiply(self: HasValueProtocol, m: int) -> int:
return self.value * m
class AdditionMixin:
# 注意: 这里要对 self 加上 type hint
def add(self: HasValueProtocol, b: int) -> int:
return self.value + b
class MyClass(MultiplicatorMixin, AdditionMixin):
def __init__(self, value: int) -> None:
self.value = value
instance = MyClass(10)
print(instance.add(2))
print(instance.multiply(2))