(Refactor) FastAPI tutorial
动机、参考资料、涉及内容
将 fastapi 独立抽出来做记录, TODO: 重构
FastAPI
- path parameter: 可以使用
fastapi.Path
配合typing.Annotated
来做 type hint 以及数据校验 - query parameter: 可以使用
fastapi.Query
配合typing.Annotated
来做 type hint 以及数据校验 - body: 可以使用
pydantic.BaseModel
来定义, 而BaseModel
内可以用pydantic.Field
配合typing.Annotated
来做 type hint 以及数据校验, 除了可以使用baseModel
外, 也可以使用fastapi.Body
pydantic.Field
实际上是函数, 返回的是 pydantic.FieldInfo
类型, 而 fastapi 中也类似: fastapi.{Query,Path,Body}
都是函数, 返回的是 pydantic.FieldInfo
的子类型. 继承关系如下:
[fastapi.Query(), fastapi.Path(), fastapi.Body()] -> fastapi.Param -> pydantic.FieldInfo
示例 1: FastAPI 各种 type hint 的写法
以下是一个例子, 展示了 FastAPI 的各种用法, 以及相应的 requests 调用方式:
fastapi 服务端代码
# server.py
from fastapi import FastAPI
from fastapi import UploadFile # 用于 type hint, 用法与 typing.List, typing.Dict 类似
from fastapi import File, Form # 用法与 Path, Query, Body 类似
from fastapi import Path, Query, Body # 分别对应 HTTP 协议的路径,请求参数,请求体
from pydantic import Field, BaseModel
from typing import List, Annotated, Union
import hashlib
app = FastAPI()
class InputModel(BaseModel):
a: str
b: str = Field(default="abc", title="bbb")
# 1. path parameter and query parameter
# http://<host>:<port>/items/a/b/?q=asd
# p 是路径参数 (path parameter), q 是查询参数 (query parameter)
# {"p": 123, "q": "asd"}, 注意这里限制了 p 是整数, 因此会自动进行校验并转换
@app.get("/items/{p:path}")
async def read_items(
p: Annotated[int, Path(title="The ID of the item to get")],
desc: Union[str, None] = Query(max_length=50),
):
return {
"p": p,
"desc": desc,
}
# 2.1 单文件上传, 注意上传文件时, 额外参数只能使用 Form 或 Body, 但不能用 BaseModel
@app.post("/upload/")
async def create_upload_file(
file: UploadFile = File(...),
a: str = Form(...),
b: str = Body(...)
):
name = file.filename
file_content = await file.read()
sha256_hash = hashlib.sha256(file_content).hexdigest()
return {"hash": sha256_hash, "name": name}
# 2.2 多文件上传, 注意上传文件时, 额外参数只能使用 Form 或 Body, 但不能用 BaseModel
@app.post("/uploads/")
async def create_upload_file(
file: List[UploadFile],
a: str = Body(default=None), # Body 和 Form 均可使用, 但推荐用 Form. 但注意不能用 BaseModel
b: str = Form(default=None)
):
print(a, b)
if isinstance(file, list):
print(len(file))
file = file[0]
name = file.filename
file_content = await file.read()
sha256_hash = hashlib.sha256(file_content).hexdigest()
return {"hash": sha256_hash, "name": name}
@app.post("/form/")
def post_form(
a: str = Form(...),
b: str = Form(...)
):
print(a, b)
return {"aa": a, "bb": b}
if __name__ == "__main__":
import uvicorn
uvicorn.run(app, host="127.0.0.1", port=8000)
requests 客户端代码
# client.py
import requests
import json
files = [('file', open("a.txt", 'rb'))]
data = {"a": "1", "b": 2}
response = requests.get(
'http://127.0.0.1:8000/items/123',
params={"desc": "abc"}
)
response = requests.post(
'http://127.0.0.1:8000/upload/',
files=files,
# 注意此处使用 json=data 请求数据的话会被忽略, 导致服务端接收的 a, b 是默认值, 因此不能使用
# 注意此处不能使用 data=json.dumps(data), 会直接报错
data=data,
)
response = requests.post(
'http://127.0.0.1:8000/uploads/',
files=files,
# 注意此处使用 json=data 请求数据的话会被忽略, 导致服务端接收的 a, b 是默认值, 因此不能使用
# 注意此处不能使用 data=json.dumps(data), 会直接报错
data=data,
)
response = requests.post(
"http://127.0.0.1:8000/form/",
# 注意此处不能用 data=json.dumps(data) 和 json=data
data=data,
)
示例 2: 客户端中断连接, 服务端继续执行
在以下代码中, 如果客户端使用 CTRL+C 中断调用, 服务端代码会继续执行. 也就是说假设第一次调用使用 CTRL+C 中止 client 代码, 然后再正常运行 client 代码, 这时全局变量 a
将变成 201 返回.
服务端
from fastapi import FastAPI
import uvicorn
import time
a = 1
app = FastAPI()
@app.get("/update")
def update():
global a
for i in range(100):
time.sleep(0.1)
a += 1
return {"a": a}
if __name__ == "__main__":
uvicorn.run(app, host="0.0.0.0", port=8000)
客户端
import requests
resp = requests.get("http://127.0.0.1:8000/update")
print(resp.json())
示例 3: 返回文件
服务端
from fastapi import FastAPI
from fastapi.responses import FileResponse
import uvicorn
app = FastAPI()
@app.get("/download")
async def download_file():
file_path = "simple_fx.py" # 替换为你要下载的文件路径
return FileResponse(file_path, filename="save.py")
if __name__ == "__main__":
uvicorn.run(app, host="0.0.0.0", port=8000)
客户端: requests
import requests
response = requests.get("http://127.0.0.1:8000/download")
content_disposition = response.headers.get('Content-Disposition')
filename = content_disposition.split('filename=')[1].replace('"', '')
with open(filename, "wb") as fw:
fw.write(response.content)
客户端: streamlit
import streamlit as st
import requests
response = requests.get("http://127.0.0.1:8000/download")
content_disposition = response.headers.get('Content-Disposition')
filename = content_disposition.split('filename=')[1].replace('"', '')
# with open(filename, "wb") as fw:
# fw.write(response.content)
st.download_button("下载", response.content, filename, use_container_width=True)
示例 4: CORS
客户端: html+js 网页
<!DOCTYPE html>
<html>
<head>
<meta charset="utf-8">
<title>This is Title</title>
</head>
<body>
<h1>My JavaScript Code</h1>
<p id="demo">
JavaScript can change element
</p>
<button type="button" onclick="myFunction()">click here</button>
</body>
<script>
function myFunction()
{
fetch('http://localhost:8000/api/data')
.then(response => {
if (!response.ok) {
throw new Error(`HTTP error! status: ${response.status}`);
}
return response.json();
})
.then(data => {
document.getElementById("demo").innerHTML = data.text;
})
.catch(error => {
console.error('Error:', error);
document.getElementById("demo").innerHTML = "发生错误,请查看控制台";
});
}
</script>
</html>
服务端: FastAPI
# serve.py
from fastapi import FastAPI
from fastapi.middleware.cors import CORSMiddleware
import uvicorn
app = FastAPI()
# 配置 CORS
app.add_middleware(
CORSMiddleware,
allow_origins=["*"], # 允许所有来源,出于安全考虑,应该指定具体来源
allow_credentials=True,
allow_methods=["*"], # 允许所有方法
allow_headers=["*"], # 允许所有头部
)
@app.get("/api/data")
def api_data():
return {"text": "hello"}
if __name__ == "__main__":
uvicorn.run(app, host="0.0.0.0", port=8000)
首先使用 python serve.py
启动服务, 然后直接用浏览器打开网页, 然后点击按钮即可看到元素被改变.
实例 5: WebSocket
可以参考官网示例, 那里的客户端是一个网页, 这里是另一个示例
在下面这个例子中, 在客户端与服务端之间有个奇怪的约定是: 每次客户端发送数据后, 服务端会发送两次数据.
服务端
# server.py
from fastapi import FastAPI
from fastapi import WebSocket
app = FastAPI()
@app.websocket("/ws")
async def websocket_endpoint(websocket: WebSocket):
await websocket.accept()
while True:
data = await websocket.receive_text()
await websocket.send_text(f"Message from server 1: {data}")
await websocket.send_text(f"Message from server 2: {data}")
if __name__ == "__main__":
import uvicorn
uvicorn.run(app, host="127.0.0.1", port=8000)
客户端
import asyncio
import websockets
import os
os.environ["HTTP_PROXY"]=""
os.environ["HTTPS_PROXY"]=""
async def hello():
uri = "ws://127.0.0.1:8000/ws"
async with websockets.connect(uri) as websocket:
print("send data")
await websocket.send("1+1") # 注意这个 websocket 对象只能发送字符串
response = await websocket.recv()
print(response)
response = await websocket.recv()
print(response)
asyncio.get_event_loop().run_until_complete(hello())
受这个启发, 我们可以想象更多地较为实际的约定: 例如, 客户端可以多次发送, 直至约定的特殊标记: {"status": "end"}
, 服务端也可以同样多次发送, 直至特殊标记: {"status": "end"}
. 这样一来客户端可以先为用户展示一些数据, 然后再从服务端继续获取, 再展示下一部分, 直至结束.
实例 6: Streaming Response (TODO)
fastapi.responses.StreamingResponse
vs sse_starlette.sse.EventSourceResponse