AI Agent 生产事故复盘:工具调用超时与任务卡死的定位与修复

AI Agent 生产事故复盘:工具调用超时与任务卡死的定位与修复

技术主题:AI Agent(人工智能代理)
内容方向:生产环境事故的解决过程(故障现象、根因分析、修复方案、预防措施)

引言

某工作日午高峰,我们的 AI Agent 在“多工具并发执行”场景下出现大量任务超时与卡死:用户端收到“思考中”却迟迟无结果,平台侧 QPS 下滑、排队暴涨。本文复盘该事故的定位与修复过程,并沉淀一套可复制的治理方案。

一、故障现象

  • 成功率从 98% 跌至 72%,P95 延迟由 1.2s 飙至 9s;
  • 执行轨迹中出现“工具调用超过 30s 未返回”“任务未按时终止”;
  • 单任务并发子步骤数激增,平台队列长度 10 倍放大;
  • 个别外部 API 出现间歇性 5xx,但并非全面故障。

二、排查步骤

  1. 复现场景:重放近 2 小时内的失败任务,附带工具输入/输出快照;
  2. 切分维度:按工具类型、供应商域名、地区机房、任务并发度分桶;
  3. 栈与事件:采集编排器事件(Plan/Act/Observe/Finish)与 asyncio Task 状态;
  4. 指标交叉:对齐外部依赖 RT/错误率、线程/协程活跃数、队列长度、超时分布。

三、根因分析

  • 超时未统一:个别工具调用缺乏硬性 deadline,wait_forever 导致卡死;
  • 重试无上限:指数退避缺失,抖动不足,加剧尖峰期堆积;
  • 幂等缺位:外部 API 的重复调用产生“回滚-再试-重复副作用”;
  • 编排缺少取消传播:父任务超时后,子 Task 未被取消,资源长期占用。

四、修复方案与关键代码(Python)

1) 统一超时与取消传播

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
import asyncio
from contextlib import asynccontextmanager

DEFAULT_DEADLINE = 6.0 # 单次工具调用硬超时

async def tool_call(func, *args, deadline: float = DEFAULT_DEADLINE, **kwargs):
return await asyncio.wait_for(func(*args, **kwargs), timeout=deadline)

@asynccontextmanager
async def cancellation_scope():
tasks = []
try:
yield tasks
finally:
for t in tasks:
if not t.done():
t.cancel()
await asyncio.gather(*tasks, return_exceptions=True)

要点:所有工具调用走统一入口;父级阶段退出时,通过上下文管理器批量取消子任务,确保资源释放。

2) 可控重试与抖动退避

1
2
3
4
5
6
7
8
9
10
11
12
13
14
import random

async def retry_with_backoff(coro_factory, *, attempts=3, base=0.2, factor=1.7, jitter=0.3):
exc = None
for i in range(attempts):
try:
return await coro_factory()
except Exception as e:
exc = e
if i == attempts - 1:
break
delay = min(base * (factor ** i) + random.uniform(0, jitter), 3.0)
await asyncio.sleep(delay)
raise exc

要点:限制最大尝试次数与延迟上限;引入随机抖动,降低瞬时对下游的同步冲击。

3) 轻量 Circuit Breaker(熔断)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
import time

class CircuitBreaker:
def __init__(self, failure_threshold=5, cool_down=10):
self.fail = 0
self.open_until = 0
self.threshold = failure_threshold
self.cool_down = cool_down
def allow(self):
now = time.time()
if now < self.open_until:
raise RuntimeError("circuit open")
def report(self, ok: bool):
if ok:
self.fail = 0
else:
self.fail += 1
if self.fail >= self.threshold:
self.open_until = time.time() + self.cool_down

cb = CircuitBreaker()

async def guarded_call(fn, *a, **kw):
cb.allow()
try:
r = await tool_call(fn, *a, **kw)
cb.report(True)
return r
except Exception:
cb.report(False)
raise

要点:在不可用窗口直接拒绝调用,保护下游与自身线程/协程池。

4) 幂等与去重(Idempotency Key)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
import hashlib
from typing import Dict

_seen: Dict[str, float] = {}

def idem_key(name: str, payload: dict) -> str:
raw = name + ":" + str(sorted(payload.items()))
return hashlib.sha256(raw.encode()).hexdigest()

def guard_idempotent(name: str, payload: dict, ttl=300):
k = idem_key(name, payload)
now = time.time()
if k in _seen and now - _seen[k] < ttl:
raise RuntimeError("duplicated operation")
_seen[k] = now
return k

要点:对“可重入但有副作用”的外部操作,强制使用幂等键;重复触发直接短路或走校验流程。

5) 编排层集成(Plan-Act-Observe)

1
2
3
4
5
6
7
8
9
10
async def run_plan(steps):
async with cancellation_scope() as tasks:
results = []
for s in steps:
guard_idempotent(s["tool"], s["args"]) # 幂等
async def once():
return await guarded_call(s["func"], **s["args"]) # 熔断+超时
res = await retry_with_backoff(lambda: once()) # 有界重试
results.append(res)
return results

要点:每步强制“幂等 → 熔断/超时 → 有界重试”,父任务结束必然触发取消,杜绝“孤儿子任务”。

五、验证与监控

  • A/B 发布 3 天:成功率 72%→97.8%,P95 延迟 9s→1.6s;
  • 超时任务占比 <1.5%,卡死(>60s 无事件)清零;
  • 监控项:
    • 工具层:请求数、成功率、超时率、平均重试次数、熔断打开时长;
    • 编排层:并发任务数、队列长度、取消次数、孤儿子任务计数;
    • 告警:连续 5 分钟超时率 >2%,或熔断时间占比 >10%。

预防清单

  • 所有工具调用强制 deadline,父任务必须传播取消;
  • 重试有上限并带抖动;
  • 高风险操作启用幂等键与“只增不减”的副作用设计;
  • 预设熔断与降级路径(返回缓存/默认值、异步补偿);
  • 可观测性前置:把调用参数/返回、耗时、异常类型纳入结构化日志,并支持回放。

总结

AI Agent 的可靠性来自“工程化的失败治理”:把不可控等待变成可控失败,把一次性操作变成可重试/可校验的流程。以上骨架无需侵入模型推理逻辑,即可显著提升稳定性,适合逐步在生产中落地。