动机、参考资料、涉及内容

动机

web application? web server? web framework

WSGI 协议? WSGI 框架 (例如 Flask), WSGI 服务器 (例如 gunicorn)

ASGI 协议? ASGI 框架 (例如 FastAPI), ASGI 服务器 (例如 uvicorn)

而 Flask 又提到了 werkzeug, FastAPI 又提到了 starlette (fastapi 基于此?)

以上这些概念到底是什么关系? 跟 socket 又有什么关系? 平时使用 flask + gunicorn 或者 fastapi + uvicorn, 底层发生了什么?

参考资料

官方资料:

一些博客:

涉及内容

socket

不涉及内容

flask, fastapi 的详尽 API 介绍, 只涉及基本的 API (用于对标自定义实现)

socket (TODO: 待确认, 大部分代码来自 ChatGPT)

socket 应该说是所有的 server 和 client 的底层, 从网络协议的角度来说, 自底向上主要有这几层协议 (以下举的具体协议例子是配套的)

  • 数据链路层: 属于硬件层协议, 例如以太网 (Ethernet) 协议
  • 网络层协议: 例如: IP 协议
  • 传输层协议: 例如: TCP, UDP 协议
  • 应用层协议: 例如: HTTP, websocket 协议

python 的内置模块 socket 实现了网络层与传输层的许多协议, 特别是 TCP 与 IP 协议

而 python 中的 WSGI, ASGI 协议是这样的: 在 python 网络编程中, 通常会有 3 个实体, 客户端, web server, web application. 对应于使用 Flask 技术栈写的程序来说, 对应关系如下:

  • web application: 基于 Flask 框架写的 python 代码, 即:
    from flask import Flask
    app = Flask(__name__)
    @app.route("/", methods=["GET"])
    def foo():
      return {"hello": "world"}
    
  • web server: gunicorn
  • client: 浏览器, 或者使用 requests 包写的发请求的客户端程序
web application <- WSGI/ASGI 协议 -> web server <- HTTP 协议 -> client

注意: Flask 是 web application framework (或者简称 web framework), 而使用它写的服务端代码才是 web application

有了这些概念之后, 下面看一下例子:

socket server & socket client example

server 与 client 都使用 socket 直接实现, 并且 server 与 client 没有遵循常见的应用层 HTTP 协议.

server

# socket_server.py
import socket

# 创建一个套接字对象
server_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)

# 获取本地主机名
# host = socket.gethostname()
host = "127.0.0.1"
port = 12345

# 绑定地址与端口
server_socket.bind((host, port))

# 设置最大连接数
server_socket.listen(5)

print("等待客户端连接...")

while True:
    # 建立客户端连接
    client_socket, addr = server_socket.accept()
    
    print("连接地址: ", addr)

    # 发送消息给客户端
    response = 'Hello, client! This is the server response.'
    client_socket.send(response.encode('utf-8'))

    # 关闭连接
    client_socket.close()

client

# socket_client.py
import socket

# 创建一个套接字对象
client_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)

# 获取本地主机名
# host = socket.gethostname()
host = "127.0.0.1"
port = 12345

# 连接服务端
client_socket.connect((host, port))

# 获取本地端口号
local_port = client_socket.getsockname()[1]
print("客户端本地端口号:", local_port)

# 接收服务端的消息
message = client_socket.recv(1024)

print("来自服务器的消息: ", message.decode('utf-8'))

# 关闭连接
client_socket.close()

运行

# 第一个终端
python socket_server.py
# 第二个终端
python socket_client.py

注意: 在上面的实现里, 如果启动 socket_server.py 后, 使用浏览器访问 http://127.0.0.1:12345, 浏览器会无法返回结果, 但启动 socket_server.py 的终端将打印出 连接地址: xxx 的信息. 这是因为在上面的实现里, 浏览器发出 GET 请求后, 其内容符合 HTTP 请求格式, 而 socket_server.py 可以接受任意格式的数据, 但其返回的数据不遵循 HTTP 相应数据的格式, 因此浏览器无法解读相应内容, 所以会无法返回结果.

小结

服务端与客户端都需要建立一个 socket 对象 (socket.socket), 并且都会占据一个端口, 服务端的端口一般是固定的, 而客户端的端口一般是在建立连接 connect 时自动分配的, 流程如下:

socket   # server / client 建立 socket 对象
bind     # server
listen   # server
connect  # client 发起
accept   # server 端接受 connect
send     # server / client 都可以使用 send 方法, server 端一般是 send 响应内容, client 端一般是 send 请求内容
recv     # server / client 都可以使用 recv 方法, server 端一般是 recv 请求内容, client 端一般是 recv 响应内容
close    # server / client 都可以使用 close 方法用于关闭连接

HTTP Protocol: socket server & requests client

server 与 client 之间使用 HTTP 协议, server 的实现使用 socket, 客户端的实现使用 socket/requests (requests 实际上基于 socket), 本例实现 POST 请求的服务端和客户端代码

server

# socket_server.py
import socket

# 创建一个套接字对象
server_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)

# 获取本地主机名
# host = socket.gethostname()
host = "127.0.0.1"
port = 12345

# 绑定地址与端口
server_socket.bind((host, port))

# 设置最大连接数
server_socket.listen(5)

print("等待客户端连接...")

while True:
    # 建立客户端连接
    client_socket, addr = server_socket.accept()

+     # 接受请求
+     request = client_socket.recv(1204)
    print("连接地址: ", addr)

    # 发送消息给客户端
-    response = 'Hello, client! This is the server response.'
+    response = "HTTP/1.1 200 OK\nContent-Type: text/html\n\nHello, client! This is the server response."
    client_socket.send(response.encode('utf-8'))

    # 关闭连接
    client_socket.close()

client (socket)

# socket_client.py
import socket

# 创建一个套接字对象
client_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)

# 获取本地主机名
# host = socket.gethostname()
host = "127.0.0.1"
port = 12345

# 连接服务端
client_socket.connect((host, port))

# 获取本地端口号
local_port = client_socket.getsockname()[1]
print("客户端本地端口号:", local_port)

- # 接收服务端的消息
- message = client_socket.recv(1024)
- print("来自服务器的消息: ", message.decode('utf-8'))

+ # 构造HTTP请求
+ data = "key=value"
+ request = "POST / HTTP/1.1\nHost: {}:{}\nContent-Length: {}\nContent-Type: application/x-www-form-urlencoded\n\n{}".format(host, port, len(data), data)
+ # 发送请求
+ client_socket.send(request.encode('utf-8'))
+ # 接收服务器响应
+ response = client_socket.recv(1024)

# 关闭连接
client_socket.close()

client (requests)

import requests
data = {'key': 'value'}
response = requests.post('http://localhost:12345', data=data)
print(response.text)

总结

实际上要用 socket 实现 HTTP 请求, 只是 sendrecv 的数据满足格式要求, 即 HTTP 协议.

简要介绍一下 HTTP 协议的数据格式 (详情可参照: https://developer.mozilla.org/en-US/docs/Web/HTTP/Overview), 以 GET 请求为例, 客户端的请求数据格式为

GET <path> HTTP/1.1
<header-key-1>: <header-value-1>
<header-key-2>: <header-value-2>

<body>

例子:

GET / HTTP/1.1
Host: developer.mozilla.org
Accept-Language: fr

服务端的相应数据格式

HTTP/1.1 <status-code> <status-message>
<header-key-1>: <header-value-1>
<header-key-2>: <header-value-2>

<body>

例子

HTTP/1.1 200 OK
Date: Sat, 09 Oct 2010 14:28:02 GMT
Server: Apache
Last-Modified: Tue, 01 Dec 2009 20:18:22 GMT
ETag: "51142bc1-7449-479b075b2891b"
Accept-Ranges: bytes
Content-Length: 29769
Content-Type: text/html

<!DOCTYPE html>… (here come the 29769 bytes of the requested web page)

concurrent socket server

以下代码是这篇博客的最终版本 https://ruslanspivak.com/lsbaws-part3/, 配上个人的一些注解

# webserver3g.py
import errno
import os
import signal
import socket

SERVER_ADDRESS = (HOST, PORT) = '', 8888
REQUEST_QUEUE_SIZE = 1024


def grim_reaper(signum, frame):
    # 这里这个写法很费解, 目前还不能完全理解, 暂时只能当作固定写法!!!!
    # (1) 阻塞: 进程在阻塞时可能会忽略掉信号
    # (2) os.WNOHANG: 实际值是整数 1
    # (3) pid, status = os.waitpid(-1, os.WNOHANG) 的返回值这么理解: 如果有任何子进程结束, 则返回 子进程ID 和 退出状态码, 否则返回 (0, 0)
    # 
    # 博客中的一个不完美版是使用 os.wait(), os.wait() 是阻塞的, 如果子进程几乎同时停止, 发送 SIGCHLD 给主进程那么有可能在阻塞时错过信号
    # 改进为 While True 循环, 且使用 os.waitpid(-1, os.WNOHANG) 时, 后者是非阻塞的, 但不清楚有没有可能在一次 grim_reaper 时触发多次返回值不为 (0, 0) 的 os.waitpid 调用
    while True:
        try:
            pid, status = os.waitpid(
                -1,          # Wait for any child process
                 os.WNOHANG  # Do not block and return EWOULDBLOCK error
            )
        except OSError:  # 如果子进程已全部结束, 调用 os.waitpid(-1, os.WNOHANG) 会触发 ChildProcessError (OSError 的子类)
            return

        if pid == 0:  # no more zombies
            return


def handle_request(client_connection):
    request = client_connection.recv(1024)
    print(request.decode())
    http_response = b"""\
HTTP/1.1 200 OK

Hello, World!
"""
    client_connection.sendall(http_response)


def serve_forever():
    listen_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    listen_socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
    listen_socket.bind(SERVER_ADDRESS)
    listen_socket.listen(REQUEST_QUEUE_SIZE)
    print('Serving HTTP on port {port} ...'.format(port=PORT))

    signal.signal(signal.SIGCHLD, grim_reaper)

    while True:
        # Q: 为什么只对这一小部分做系统中断的处理?
        # A: 原因是作为主进程来说, 只会循环执行如下代码:
        # client_connection, client_address = listen_socket.accept()
        # pid = os.fork()
        # client_connection.close()
        # 而只有第一行是阻塞的, 会在接受到信号时产生系统中断错误, 其余两行都不会阻塞
        try:
            client_connection, client_address = listen_socket.accept()
        except IOError as e:
            code, msg = e.args
            # restart 'accept' if it was interrupted
            if code == errno.EINTR:
                continue
            else:
                raise

        pid = os.fork()
        if pid == 0:  # child
            listen_socket.close()  # close child copy
            handle_request(client_connection)
            client_connection.close()
            os._exit(0)
        else:  # parent
            client_connection.close()  # close parent copy and loop over

if __name__ == '__main__':
    serve_forever()

WSGI (TODO: 需要先参考材料再总结)

首先, 什么是 WSGI, 为什么需要 WSGI 呢? 从上一节原生使用 socket 实现 server 与 client 来看, 在 client 端, 普通用户使用浏览器进行访问时已经很好地封装了 client socket 的逻辑, 操作简单; 而对于程序员来说, 可以使用 requests 包来简单地发起 HTTP 请求, 总的来说 client 端已经非常方便. 我们再来看看 server 端的情况, server 端主要是由程序员来编写, 直接使用 socket 处理 sendrecv 的逻辑会比较烦人, 而且也容易出错, 并且也要求程序员对网络编程比较了解, 因此我们希望把这些部分剥离出来, 自然而然会希望有类似这种接口:

# web application 部分:
app = App()   # 业务逻辑: 用户访问某个地址时应该做什么
# web server 部分:
server = make_server(app)
server.run()

于是, server 端代码就被拆解为了两部分: web server 与 web application. 其中 web server 的部分专门处理网络收发相关的东西, 这些代码都是比较固定的, 无需程序员去修改. 而 web application 部分属于业务逻辑, 程序员只需关注业务逻辑, 不用管网络相关的东西, 这看上去很美妙. 然而, web server 端的那些看上去很固定的东西, 却同样有许多实现方式, 例如是否用进程池, 线程池, 是否保持连接等等, 导致 web server 端有诸多实现 (例如: gunicorn, uWSGI). 而 web application 端, 程序员同样希望有些框架可以帮忙将代码组织的比较整洁直观, 于是需要出现了一些 web application framework (例如: flask, django, tornado). 这样子 web server 与 web application framework 之间的桥梁最好是有统一的约定, 这就是 WSGI, 注意: WSGI 是一个约定说明书, 而不是一个包. 这样一来, 任意的 web application framework 可以与任意的 web server 相匹配. 注意不同的 web server 可能适用于不同的场景, 例如有些适合做高并发, 有些适合低延迟等等, 而程序员将业务代码 (使用某个 web application framework 写好的 web application) 写好后, 可以根据业务情况自由选择适合的 web server, 而无需修改太多代码.

本节主要实现一个简单的 WSGI web server (对标 gunicorn) 与 WSGI application framework (对标 Flask). 更具体地, 我们要写四个脚本

  • wsgi_app.py: 包含一个能使用 gunicorn wsgi_app:app 部署的 web application
  • wsgi_server.py: 包含一个 WSGI web server 的实现, 可以使用 python wsgi_server.py wsgi_app:app 把上一步写的 web application 进行部署
  • wsgi_framework.py: 包含一个 WSGI application framework 的实现, 它将类似于 Flask 的 API
  • wsgi_framework_app.py: 包含一个使用 wsgi_framework 模块来定义的 web application, 可以使用 gunicorn wsgi_framework_app:apppython wsgi_server.py wsgi_framework_app:app 进行部署

WSGI

注意: WSGI 协议主要是方便 server 与 framework 的开发人员, 使用 framework 开发 application 的开发人员只需要弄清 framework 即可.

感觉协议内容非常费解, 理不清 server 与 app 端分别提供什么协作.

参考:

  • PEP 3333
  • 实际实现: app 端: flask/app/Flask:__call__, server 端: werkzeug/serving/WSGIRequestHandler

以下看下怎么只给大致, 不要附源码 (直接给链接即可)

总结 (TODO!!!!!)

所谓的 WSGI 其实就是约定 WSGI application 需要是这种东西:

# type hint 待确认
# flask/app.py
from typing import Protocol
class StartResponse(Protocol):
    def __call__(
        self, __status: str, __headers: list[tuple[str, str]], __exc_info: _OptExcInfo | None = ...
    ) -> Callable[[bytes], object]: ...

WSGIEnvironment: TypeAlias = dict[str, Any]
WSGIApplication: TypeAlias = Callable[[WSGIEnvironment, StartResponse], Iterable[bytes]]

def application(
    environ: WSGIEnvironment,
    start_response: WSGIApplication
) -> Iterable[bytes]:
    content_iter, status, headers = self.get_response(environ)
    # status: str = "200 OK"
    # headers: list[tuple[str, str]] = [('Content-Type', 'text/plain')]
    start_response(status, headers)

    # content_iter: Iterable[bytes]
    return content_iter

而 WSGI server 大致如下:

class WSGIServer:
    def handle_request_end_to_end(self):
        client_socket, addr = self.socket.accept()
        request_raw_data = client_socket.recv(1024)
        environ = parse_request(request_raw_data)
        
        resp_status: str | None = None
        resp_headers: list[tuple[str, str]] | None = None
        def start_response(status, headers, exc_info=None):
            nonlocal resp_status, resp_headers
            resp_status = status
            resp_headers = headers
            return write
        def write(content_iter):
            nonlocal resp_status, resp_headers
            client_socket.sendall(to_response_data(resp_status, resp_headers, content_iter))

        content_iter = self.app(environ, start_response)
        write(content_iter)
        client_socket.close()

具体细节 PEP 3333 都由精确地描述 (包括 environ 必须包含哪些键值对, start_response 的输入与输出)

  • environ: 由 server 端提供, 供 app 端使用. server 端负责 socket.accept 并且将 socket.recv 的数据转化为 environ
  • start_response: 由 server 端提供, 供 app 端使用. app 端承诺一定会调用这个方法. 而 server 端需要保证: 只要此函数被调用, server 端就能记录好 response status 和 headers (注意 start_response 不会发送数据). start_response 会返回一个 write 方法, 但 PEP 3333 里地描述是尽量不依赖这个返回的 write 方法.
  • application: 由 app 提供, server 端负责调用, application 返回的结果是 response content, server 端进一步使用 socket.send 将相应结果返回

environ

web application 要求 server 端调用时必须保证入参 environ 必须是字典, 且包括以下字段才能正常工作:

environ = {
    # CGI required:
    "REQUEST_METHOD": "POST",
    "SCRIPT_NAME": "xx",  # 可缺失, 不确定含义?
    "PATH_INFO": "/a/b/",  # 可缺失, 表示请求的是 http://1.2.3.4:5678/a/b/
    "QUERY_STRING": "q=3",  # 可缺失, 表示请求是 http://1.2.3.4:5678/a/b/?q=3
    "CONTENT_TYPE": "",   # 可缺失
    "CONTENT_LENGTH": "12",  # 可缺失
    "SERVER_NAME": "1.2.3.4",
    "SERVER_HOST": "5678",
    "SERVER_PROTOCOL": "HTTP/1.1",
    # "HTTP_*", 客户端的 HTTP 请求头, 请求头中包含 "foo-bar": "xxx", 那么此处应该变为 "HTTP_foo_bar": "xxx"

    # WSGI defined variables:
    "wsgi.version": (1, 0),
    "wsgi.url_scheme": "http",
    "wsgi.input": io.StringIO('{"a": "xyz", "b": 123}'),  # PEP 3333 里规定它必须包含 readlines 等方法
    "wsgi.errors": sys.stderr,  # PEP 3333 里规定它必须包含 write 等方法
    "wsgi.multithread": False,  # 不确定含义
    "wsgi.multiprocess": False, # 不确定含义
    "wsgi.run_once": False,     # 不确定含义
}

Real World Example: Flask + gunicorn

app 端

# app 端: flask/app/Flask:__call__
class Flask(App):
    def __call__(self, environ: WSGIEnvironment, start_response: StartResponse) -> cabc.Iterable[bytes]:
        return self.wsgi_app(environ, start_response)

    def wsgi_app(self, environ: WSGIEnvironment, start_response: StartResponse) -> cabc.Iterable[bytes]:
        ctx = self.request_context(environ)
        error: BaseException | None = None
        try:
            ctx.push()
            response = self.full_dispatch_request()  # response: werkzeug.wrappers.Response
            return response(environ, start_response)
        finally:  # 总会执行 finally
            if "werkzeug.debug.preserve_context" in environ:
                environ["werkzeug.debug.preserve_context"](_cv_app.get())
                environ["werkzeug.debug.preserve_context"](_cv_request.get())
            if error is not None and self.should_ignore_error(error):
                error = None
            ctx.pop(error)

# werkzeug.wrappers.Response
class Response(_SansIOResponse):
    def __call__(self, environ: WSGIEnvironment, start_response: StartResponse) -> t.Iterable[bytes]:
        app_iter, status, headers = self.get_wsgi_response(environ)
        start_response(status, headers)  # 注意 app 端会去调用 start_response, 但是不关心其返回的结果 write
        return app_iter

server 端

# server 端: 忽略 sockect 部分, 调用 app 的逻辑在此
class WSGIRequestHandler(BaseHTTPRequestHandler):
    def make_environ(self) -> WSGIEnvironment:
        request_url = urlsplit(self.path)
        url_scheme = "http" if self.server.ssl_context is None else "https"

        if not self.client_address:
            self.client_address = ("<local>", 0)
        elif isinstance(self.client_address, str):
            self.client_address = (self.client_address, 0)

        if not request_url.scheme and request_url.netloc:
            path_info = f"/{request_url.netloc}{request_url.path}"
        else:
            path_info = request_url.path

        path_info = unquote(path_info)

        environ: WSGIEnvironment = {
            "wsgi.version": (1, 0),
            "wsgi.url_scheme": url_scheme,
            "wsgi.input": self.rfile,
            "wsgi.errors": sys.stderr,
            "wsgi.multithread": self.server.multithread,
            "wsgi.multiprocess": self.server.multiprocess,
            "wsgi.run_once": False,
            "werkzeug.socket": self.connection,
            "SERVER_SOFTWARE": self.server_version,
            "REQUEST_METHOD": self.command,
            "SCRIPT_NAME": "",
            "PATH_INFO": _wsgi_encoding_dance(path_info),
            "QUERY_STRING": _wsgi_encoding_dance(request_url.query),
            # Non-standard, added by mod_wsgi, uWSGI
            "REQUEST_URI": _wsgi_encoding_dance(self.path),
            # Non-standard, added by gunicorn
            "RAW_URI": _wsgi_encoding_dance(self.path),
            "REMOTE_ADDR": self.address_string(),
            "REMOTE_PORT": self.port_integer(),
            "SERVER_NAME": self.server.server_address[0],
            "SERVER_PORT": str(self.server.server_address[1]),
            "SERVER_PROTOCOL": self.request_version,
        }

        for key, value in self.headers.items():
            if "_" in key:
                continue

            key = key.upper().replace("-", "_")
            value = value.replace("\r\n", "")
            if key not in ("CONTENT_TYPE", "CONTENT_LENGTH"):
                key = f"HTTP_{key}"
                if key in environ:
                    value = f"{environ[key]},{value}"
            environ[key] = value

        if environ.get("HTTP_TRANSFER_ENCODING", "").strip().lower() == "chunked":
            environ["wsgi.input_terminated"] = True
            environ["wsgi.input"] = DechunkedInput(environ["wsgi.input"])

        if request_url.scheme and request_url.netloc:
            environ["HTTP_HOST"] = request_url.netloc

        try:
            peer_cert = self.connection.getpeercert(binary_form=True)
            if peer_cert is not None:
                environ["SSL_CLIENT_CERT"] = ssl.DER_cert_to_PEM_cert(peer_cert)
        except ValueError:
            self.server.log("error", "Cannot fetch SSL peer certificate info")
        except AttributeError:
            pass

        return environ

    def run_wsgi(self) -> None:
        if self.headers.get("Expect", "").lower().strip() == "100-continue":
            self.wfile.write(b"HTTP/1.1 100 Continue\r\n\r\n")

        self.environ = environ = self.make_environ()
        status_set: str | None = None
        headers_set: list[tuple[str, str]] | None = None
        status_sent: str | None = None
        headers_sent: list[tuple[str, str]] | None = None
        chunk_response: bool = False

        def write(data: bytes) -> None:
            nonlocal status_sent, headers_sent, chunk_response
            assert status_set is not None, "write() before start_response"
            assert headers_set is not None, "write() before start_response"
            if status_sent is None:
                status_sent = status_set
                headers_sent = headers_set
                try:
                    code_str, msg = status_sent.split(None, 1)
                except ValueError:
                    code_str, msg = status_sent, ""
                code = int(code_str)
                self.send_response(code, msg)
                header_keys = set()
                for key, value in headers_sent:
                    self.send_header(key, value)
                    header_keys.add(key.lower())

                if (
                    not (
                        "content-length" in header_keys
                        or environ["REQUEST_METHOD"] == "HEAD"
                        or (100 <= code < 200)
                        or code in {204, 304}
                    )
                    and self.protocol_version >= "HTTP/1.1"
                ):
                    chunk_response = True
                    self.send_header("Transfer-Encoding", "chunked")

                self.send_header("Connection", "close")
                self.end_headers()

            assert isinstance(data, bytes), "applications must write bytes"

            if data:
                if chunk_response:
                    self.wfile.write(hex(len(data))[2:].encode())
                    self.wfile.write(b"\r\n")

                self.wfile.write(data)

                if chunk_response:
                    self.wfile.write(b"\r\n")

            self.wfile.flush()

        def start_response(status, headers, exc_info=None):  # type: ignore
            nonlocal status_set, headers_set
            if exc_info:
                try:
                    if headers_sent:
                        raise exc_info[1].with_traceback(exc_info[2])
                finally:
                    exc_info = None
            elif headers_set:
                raise AssertionError("Headers already set")
            status_set = status
            headers_set = headers
            return write

        def execute(app: WSGIApplication) -> None:
            application_iter = app(environ, start_response)
            try:
                for data in application_iter:
                    write(data)
                if not headers_sent:
                    write(b"")
                if chunk_response:
                    self.wfile.write(b"0\r\n\r\n")
            finally:
                selector = selectors.DefaultSelector()
                selector.register(self.connection, selectors.EVENT_READ)
                total_size = 0
                total_reads = 0
                while selector.select(timeout=0.01):
                    data = self.rfile.read(10_000_000)
                    total_size += len(data)
                    total_reads += 1
                    if not data or total_size >= 10_000_000_000 or total_reads > 1000:
                        break
                selector.close()
                if hasattr(application_iter, "close"):
                    application_iter.close()

        try:
            execute(self.server.app)
        # except XX as x:
        #     ...

WSGI web application

参考自: https://testdriven.io/courses/python-web-framework/wsgi/

# wsgi_app.py
def app(environ, start_response):
    status = "200 OK"
    response_headers = [
        ('Content-type', 'text/plain'),
    ]
    # 直接用位置参数传递
    start_response(status, response_headers)  # start_response: gunicorn.http.wsgi.Response
    response_body = "\n".join([f"{k}: {v}" for k, v in environ.items()])  # 如果使用 POST 请求, POST 的请求数据位于 environ["wsgi.input"] 里
    # print(environ["wsgi.input"].readlines())
    return iter([response_body.encode('utf-8')])

使用一个现成的 WSGI web server 启动: gunicorn wsgi_app:app, 打开 http://localhost:8000 后会看到如下返回结果

wsgi.errors: <gunicorn.http.wsgi.WSGIErrorsWrapper object at 0x7fd47c1d64c0>
wsgi.version: (1, 0)
wsgi.multithread: False
wsgi.multiprocess: False
wsgi.run_once: False
wsgi.file_wrapper: <class 'gunicorn.http.wsgi.FileWrapper'>
wsgi.input_terminated: True
SERVER_SOFTWARE: gunicorn/20.1.0
wsgi.input: <gunicorn.http.body.Body object at 0x7fd47c1d6d30>
gunicorn.socket: <socket.socket fd=9, family=AddressFamily.AF_INET, type=SocketKind.SOCK_STREAM, proto=0, laddr=('127.0.0.1', 8000), raddr=('127.0.0.1', 59450)>
REQUEST_METHOD: GET
QUERY_STRING: 
RAW_URI: /
SERVER_PROTOCOL: HTTP/1.1
HTTP_HOST: 127.0.0.1:8000
HTTP_CONNECTION: keep-alive
HTTP_CACHE_CONTROL: max-age=0
HTTP_SEC_CH_UA: "Not A(Brand";v="99", "Microsoft Edge";v="121", "Chromium";v="121"
HTTP_SEC_CH_UA_MOBILE: ?0
HTTP_SEC_CH_UA_PLATFORM: "Windows"
HTTP_UPGRADE_INSECURE_REQUESTS: 1
HTTP_USER_AGENT: Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/121.0.0.0 Safari/537.36 Edg/121.0.0.0
HTTP_ACCEPT: text/html,application/xhtml+xml,application/xml;q=0.9,image/avif,image/webp,image/apng,*/*;q=0.8,application/signed-exchange;v=b3;q=0.7
HTTP_SEC_FETCH_SITE: none
HTTP_SEC_FETCH_MODE: navigate
HTTP_SEC_FETCH_USER: ?1
HTTP_SEC_FETCH_DEST: document
HTTP_ACCEPT_ENCODING: gzip, deflate, br
HTTP_ACCEPT_LANGUAGE: zh-CN,zh;q=0.9,en;q=0.8,en-GB;q=0.7,en-US;q=0.6
HTTP_COOKIE: _ga=GA1.1.1730874703.1686748962
wsgi.url_scheme: http
REMOTE_ADDR: 127.0.0.1
REMOTE_PORT: 59450
SERVER_NAME: 127.0.0.1
SERVER_PORT: 8000
PATH_INFO: /
SCRIPT_NAME: 

理解

这里的整体流程是 client 端发送了一个 HTTP GET 请求给到 WSGI web server (在这个例子里是 gunicorn), 然后 gunicorn 将请求信息包裹为 environ: Dict[str, Any], 并且构造 start_response: gunicorn.http.wsgi.Response, 然后进入到 WSGI web application (在这个例子里是 app 函数) 内执行, 然后 WSGI web server 将 WSGI web application 的返回结果转化为 HTTP 响应返回给 client.

WSGI web server

直接参考自 https://ruslanspivak.com/lsbaws-part2/

import socket
import io
import sys

class WSGIServer(object):

    address_family = socket.AF_INET
    socket_type = socket.SOCK_STREAM
    request_queue_size = 1

    def __init__(self, server_address):
        self.listen_socket = listen_socket = socket.socket(self.address_family, self.socket_type)
        listen_socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
        listen_socket.bind(server_address)
        listen_socket.listen(self.request_queue_size)
        host, port = self.listen_socket.getsockname()[:2]
        self.server_name = socket.getfqdn(host)
        self.server_port = port
        # Return headers set by Web framework/Web application
        self.headers_set = []

    def set_app(self, application):
        self.application = application

    def serve_forever(self):
        listen_socket = self.listen_socket
        while True:
            self.client_connection, client_address = listen_socket.accept()
            self.handle_one_request()

    def handle_one_request(self):
        self.client_connection.settimeout(0.01)
        request_data = b''
        while True:
            try:
                data = self.client_connection.recv(1024)
            except socket.timeout:
                data = b''
            request_data += data
            if not data:
                break
        self.request_data = request_data = request_data.decode('utf-8')
        self.parse_request(request_data)
        env = self.get_environ()
        result = self.application(env, self.start_response)
        self.finish_response(result)

    def parse_request(self, text):
        request_line = text.splitlines()[0]
        request_line = request_line.rstrip('\r\n')
        (self.request_method,  # GET
         self.path,            # /hello
         self.request_version  # HTTP/1.1
         ) = request_line.split()

    def get_environ(self):
        env = {}
        # Required WSGI variables
        env['wsgi.version']      = (1, 0)
        env['wsgi.url_scheme']   = 'http'
        env['wsgi.input']        = io.StringIO(self.request_data)
        env['wsgi.errors']       = sys.stderr
        env['wsgi.multithread']  = False
        env['wsgi.multiprocess'] = False
        env['wsgi.run_once']     = False
        # Required CGI variables
        env['REQUEST_METHOD']    = self.request_method    # GET
        env['PATH_INFO']         = self.path              # /hello
        env['SERVER_NAME']       = self.server_name       # localhost
        env['SERVER_PORT']       = str(self.server_port)  # 8888
        return env

    def start_response(self, status, response_headers, exc_info=None):
        # Add necessary server headers
        server_headers = [
            ('Date', 'Mon, 15 Jul 2019 5:54:48 GMT'),
            ('Server', 'WSGIServer 0.2'),
        ]
        self.headers_set = [status, response_headers + server_headers]
        # To adhere to WSGI specification the start_response must return
        # a 'write' callable. We simplicity's sake we'll ignore that detail
        # for now.
        # return self.finish_response

    def finish_response(self, result):
        try:
            status, response_headers = self.headers_set
            response = f'HTTP/1.1 {status}\r\n'
            for header in response_headers:
                response += '{0}: {1}\r\n'.format(*header)
            response += '\r\n'
            for data in result:
                response += data.decode('utf-8')
            response_bytes = response.encode()
            self.client_connection.sendall(response_bytes)
        finally:
            self.client_connection.close()

SERVER_ADDRESS = (HOST, PORT) = '127.0.0.1', 8000

def make_server(server_address, application):
    server = WSGIServer(server_address)
    server.set_app(application)
    return server

if __name__ == '__main__':
    if len(sys.argv) < 2:
        sys.exit('Provide a WSGI application object as module:callable')
    app_path = sys.argv[1]
    module, application = app_path.split(':')
    module = __import__(module)
    application = getattr(module, application)
    httpd = make_server(SERVER_ADDRESS, application)
    print(f'WSGIServer: Serving HTTP on port {PORT} ...\n')
    httpd.serve_forever()

WSGI application framework

wsgi_framework.py 内容: 构建一个类似于 Flask 的框架

# wsgi_framework.py
class App:
    def __init__(self):
        self.view_functions = {}
    def route(self, endpoint, method: str):
        def decorator(f):
            if method not in self.view_functions:
                self.view_functions[method] = {}
            self.view_functions[method][endpoint] = f
            return f
        return decorator
    def __call__(self, environ, start_response):
        status = "200 OK"
        response_headers = [
            ('Content-type', 'text/plain'),
        ]
        start_response(status, response_headers)
        method = environ['REQUEST_METHOD']
        endpoint = environ['PATH_INFO']
        query_string = environ.get("QUERY_STRING", None)
        body = environ["wsgi.input"].read()
        # print(environ)
        # print(query_string)  # str
        # print(body)          # bytes
        result = self.view_functions[method][endpoint](query_string, body)
        return iter([str(result).encode('utf-8')])

wsgi_framework_app.py 内容: 使用上述框架

# wsgi_framework_app.py
from wsgi_framework import App
app = App()

@app.route("/a", method="POST")
def foo(params, body):
    return {"result": "hello", "params": params, "body": body.decode("utf-8")}

运行 (!!! TODO: 使用 python wsgi_server.py wsgi_framework_app:app 还有问题, 主要是 QUERY_STRING 有问题):

gunicorn wsgi_framework_app:app
# python
# >>> import requests
# >>> resp = requests.post("http://127.0.0.1:8000/a?x=1", json={str(i): i for i in range(3)})
# >>> resp.content
# b'{\'result\': \'hello\', \'params\': \'x=1\', \'body\': \'{"0": 0, "1": 1, "2": 2}\'}

concurrent WSGI web server

按照 concurrent socket server 即可实现, 但存在几个问题:

  • 如何用多线程而不是基于 fork 的多进程实现
  • 如何用使用进程池实现, 而不是用底层的 fork: fork 过于底层, 且请求并发量过大时 fork 过多会耗尽文件描述符

ASGI (TODO)