Python FastAPI 时延飙升与连接积压生产事故复盘:从阻塞 I/O 到优雅熔断

Python FastAPI 时延飙升与连接积压生产事故复盘:从阻塞 I/O 到优雅熔断

技术主题:Python 编程语言
内容方向:生产环境事故的解决过程(故障现象、根因分析、解决方案、预防措施)

引言

一个平平无奇的促销晚高峰,我们的 API 网关监控突然报警:P95 时延从 120ms 飙到 3.2s,活跃连接上升,错误率波动。现场表现是“接口卡住不动”,一查容器里 Uvicorn 进程占用不高,但下游 HTTP 调用响应堆积。本文复盘这次事故,从现象到根因,再到系统性修复,最后给出可直接复用的代码骨架与配置清单。

一、故障现象与影响

  • P95 从 ~120ms → 3.2s,长尾 P99 超 10s;
  • Gunicorn backlog 上升(等待接受连接),部分 502/504;
  • 下游服务 QPS 波动,重试风暴短时出现;
  • 应用容器 CPU 占用不高,但线程池耗尽告警出现。

二、排查与复现场景

  1. 观测与采样:抓取慢请求日志(含 trace_id)、连接数、线程池指标、事件循环滞后;
  2. 火焰图/采样:确认部分 handler 存在阻塞调用(CPU 计算/阻塞 I/O);
  3. httpx 侧:发现每次请求都新建客户端,连接复用率极低,TIME_WAIT 增多;
  4. 部署参数:Gunicorn workers 偏少,keepalive/timeout 不合理,导致等待队列放大;
  5. 超时语义:上游/下游超时配置不一致,取消传播缺失,导致“僵尸请求”。

三、根因分析

  • 事件循环被阻塞:在 async 视图里执行阻塞 I/O/CPU,挤占事件循环;
  • 连接池耗尽:httpx 客户端每次 new,池内连接无法复用,外加 keep-alive 失配引发积压;
  • 线程池饱和:默认 ThreadPoolExecutor 规模偏小,阻塞任务排队,扩大尾延迟;
  • 超时与取消策略缺失:没有 per-request 超时与取消传播,重试与超时不成套;
  • 进程/线程/连接参数不匹配:Gunicorn worker 数、Uvicorn worker-class、keepalive、backlog、timeout 设置不当。

四、修复方案与关键代码

4.1 全局异步客户端与连接池配置

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
# app/deps.py
from fastapi import FastAPI
import httpx
import asyncio

client: httpx.AsyncClient | None = None
upstream_sema: asyncio.Semaphore | None = None

def init_clients(app: FastAPI):
global client, upstream_sema
# 适度的连接池与超时(按下游 SLO 调整)
limits = httpx.Limits(max_connections=200, max_keepalive_connections=100)
timeouts = httpx.Timeout(connect=2.0, read=3.0, write=3.0, pool=2.0)
client = httpx.AsyncClient(limits=limits, timeout=timeouts, http2=True)
upstream_sema = asyncio.Semaphore(100) # 下游并发上限

async def close_clients():
global client
if client:
await client.aclose()
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
# app/main.py
from fastapi import FastAPI, HTTPException
from app.deps import init_clients, close_clients, client, upstream_sema
import asyncio

app = FastAPI()

@app.on_event("startup")
async def _startup():
init_clients(app)

@app.on_event("shutdown")
async def _shutdown():
await close_clients()

# 阻塞任务包装(CPU/I-O 需丢到线程池)
async def run_blocking(func, *args, loop=None):
loop = loop or asyncio.get_running_loop()
return await loop.run_in_executor(None, lambda: func(*args))

async def call_upstream(path: str, timeout_sec: float = 2.5):
# 并发限流 + 超时 + 取消传播
async with upstream_sema: # 背压
try:
return await asyncio.wait_for(
client.get(f"https://api.example.com/{path}", headers={"X-Trace": "..."}),
timeout=timeout_sec,
)
except asyncio.TimeoutError:
raise HTTPException(status_code=504, detail="upstream timeout")

@app.get("/search")
async def search(q: str):
# 示例:混合阻塞逻辑(比如本地 PDF 解析)+ 下游调用
def cpu_task(x: str) -> str:
# 假设较重的 CPU 处理
return x.upper()

text = await run_blocking(cpu_task, q)
resp = await call_upstream(f"search?q={text}")
return {"q": q, "data": resp.json()}

要点:

  • 全局 AsyncClient 复用连接池;
  • 设置合理 limits/timeout,开启 HTTP/2 提升复用;
  • 使用 Semaphore 限制对下游的并发度,形成背压;
  • 阻塞代码用 run_in_executor 隔离,避免阻塞事件循环。

4.2 熔断与降级(简化版)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
# app/circuit.py
import time
import asyncio

class Circuit:
def __init__(self, fail_threshold: int = 20, cool_ms: int = 2000):
self.fail = 0
self.open_until = 0
self.threshold = fail_threshold
self.cool_ms = cool_ms

def allow(self) -> bool:
return time.time() * 1000 >= self.open_until

def record(self, ok: bool):
if ok:
self.fail = 0
return
self.fail += 1
if self.fail >= self.threshold:
self.open_until = time.time() * 1000 + self.cool_ms
self.fail = 0

circuit = Circuit()

async def guarded_call(coro, *, fallback):
if not circuit.allow():
return fallback()
try:
res = await coro
circuit.record(True)
return res
except Exception:
circuit.record(False)
return fallback()

4.3 部署与运行参数(Gunicorn + UvicornWorker)

1
2
3
4
5
6
7
8
9
10
11
12
# gunicorn.conf.py(示例,按 CPU/内存与 SLO 调整)
import multiprocessing
workers = max(2, multiprocessing.cpu_count()) # 进程数
worker_class = "uvicorn.workers.UvicornWorker"
threads = 1 # 线程仅用于阻塞少的场景
worker_connections = 1000 # 每 worker 的并发连接上限(异步)
backlog = 2048 # 未接受连接队列
keepalive = 5 # keep-alive 秒数
timeout = 30 # worker 处理超时(不要太大)
gracetimeout = 30
loglevel = "info"
bind = "0.0.0.0:8000"

运行:

1
gunicorn -c gunicorn.conf.py app.main:app

调参建议:

  • workers ≈ CPU 核心数(异步 worker 不要盲目加 threads);
  • keepalive 过大可能放大连接占用;
  • timeout 不宜无限制,配合业务超时/取消传播;
  • 结合容器内核参数(fd 上限)与 httpx 连接池上限。

五、验证与观测

  • 压测:维持原始 QPS × 1.5,P95 降回 <150ms,错误率 <0.1%;
  • 连接:TIME_WAIT 总数明显下降,httpx 连接复用率 >90%;
  • 线程池:executor 队列长度接近 0,事件循环滞后指标恢复;
  • 熔断:下游故障时,开启后失败率断崖式下降,平均延迟稳定;
  • 可观测:暴露指标(并发度、队列长度、超时次数、熔断状态)。

六、防复发清单

  • 代码层:
    • 禁止在 async handler 内直接做阻塞 I/O/CPU;
    • 统一全局 httpx AsyncClient 与连接池;
    • 为所有外部调用设置超时与取消传播;
    • 对下游加 Semaphore 并发上限与退避重试(避免风暴);
  • 部署层:
    • Gunicorn/Uvicorn 参数配套(workers、backlog、keepalive、timeout);
    • 容器 fd 与内核参数按峰值 QPS 校核;
  • 观测层:
    • 打点:事件循环滞后、线程池队列、httpx 连接池、熔断状态;
    • 日志:慢请求采样、trace_id 贯通、异常分类;
  • 灰度与压测:
    • 变更前压测 + 金丝雀发布,观察并回滚预案。

总结

这次事故看似“下游慢”,实则是我们在异步栈上的工程约束缺位:阻塞 I/O 入侵事件循环、连接池与超时配置不当、部署参数与负载不匹配。补齐“全局连接池 + 并发限流 + 超时取消 + 熔断降级”的四件套后,系统在高峰时也能优雅退让,把不可控的延迟与错误约束在可控范围内。