Python 异步爬虫内存泄漏排查实战:从 OOM 到稳定运行的完整解决过程

Python 异步爬虫内存泄漏排查实战:从 OOM 到稳定运行的完整解决过程

技术主题:Python 编程语言
内容方向:生产环境事故的解决过程(故障现象、根因分析、解决方案、预防措施)

引言

异步爬虫因其高并发、高性能的特点,在数据采集领域得到了广泛应用。然而,在生产环境中,异步编程的复杂性往往会带来一些隐蔽的问题。我们团队在运营一个大型商品信息采集系统时,遭遇了严重的内存泄漏问题:系统在运行6-8小时后就会出现OOM崩溃,严重影响了业务连续性。经过一周的深入排查,我们不仅解决了内存泄漏问题,还总结出了一套完整的异步爬虫内存管理最佳实践。本文将详细记录这次故障排查的完整过程。

一、故障现象与初步分析

故障现象描述

我们的异步爬虫系统在生产环境中表现出以下异常症状:

1
2
3
4
5
6
7
# 典型的错误日志
"""
2024-05-17 14:32:15 ERROR - Memory usage: 7.8GB/8GB (97.5%)
2024-05-17 14:32:45 ERROR - aiohttp.ClientTimeout: Timeout context manager should be used inside a task
2024-05-17 14:33:12 CRITICAL - OSError: [Errno 12] Cannot allocate memory
2024-05-17 14:33:15 CRITICAL - Process killed by OOM killer
"""

关键异常现象:

  • 系统启动后内存使用量持续增长,从初始的800MB增长到8GB+
  • 在运行6-8小时后必然出现OOM崩溃
  • 爬虫并发数越高,内存增长速度越快
  • 重启后问题重现,表明不是偶发问题

系统基本信息

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
# 系统配置信息
SYSTEM_CONFIG = {
"服务器配置": "8核16GB内存",
"Python版本": "3.9.7",
"主要依赖": {
"aiohttp": "3.8.3",
"asyncio": "内置",
"aiofiles": "22.1.0"
},
"业务规模": {
"目标网站": "500+个电商网站",
"并发度": "200个并发请求",
"日处理量": "100万+商品页面"
}
}

二、问题排查过程

1. 内存使用分析

首先我们添加了详细的内存监控来观察内存使用模式:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
import psutil
import gc
import asyncio
import tracemalloc
from typing import Dict, List
import logging

class MemoryMonitor:
"""内存监控器"""

def __init__(self, interval: int = 60):
self.interval = interval
self.memory_history: List[Dict] = []
self.is_monitoring = False

# 启用内存追踪
tracemalloc.start()

async def start_monitoring(self):
"""开始内存监控"""
self.is_monitoring = True

while self.is_monitoring:
memory_info = self._collect_memory_info()
self.memory_history.append(memory_info)

# 内存使用率超过80%时告警
if memory_info["memory_percent"] > 80:
logging.warning(f"高内存使用率告警: {memory_info}")

# 分析内存热点
self._analyze_memory_hotspots()

await asyncio.sleep(self.interval)

def _collect_memory_info(self) -> Dict:
"""收集内存信息"""
process = psutil.Process()
memory_info = process.memory_info()

# 获取Python对象统计
gc.collect() # 强制垃圾回收

return {
"timestamp": asyncio.get_event_loop().time(),
"rss_mb": memory_info.rss / 1024 / 1024, # 物理内存
"vms_mb": memory_info.vms / 1024 / 1024, # 虚拟内存
"memory_percent": process.memory_percent(),
"gc_counts": gc.get_count(),
"gc_stats": gc.get_stats(),
"active_tasks": len(asyncio.all_tasks())
}

def _analyze_memory_hotspots(self):
"""分析内存热点"""
try:
# 获取当前内存使用快照
snapshot = tracemalloc.take_snapshot()
top_stats = snapshot.statistics('lineno')

logging.info("=== 内存使用热点分析 ===")
for index, stat in enumerate(top_stats[:10], 1):
logging.info(f"#{index}: {stat}")

# 分析最大内存消耗者
if len(top_stats) > 0:
largest = top_stats[0]
logging.warning(f"最大内存消耗: {largest}")

except Exception as e:
logging.error(f"内存热点分析失败: {e}")

def get_memory_trend(self) -> Dict:
"""获取内存增长趋势"""
if len(self.memory_history) < 2:
return {"trend": "insufficient_data"}

recent = self.memory_history[-10:] # 最近10个采样点

memory_values = [item["rss_mb"] for item in recent]
if len(memory_values) >= 2:
growth_rate = (memory_values[-1] - memory_values[0]) / len(memory_values)
return {
"current_memory_mb": memory_values[-1],
"growth_rate_mb_per_minute": growth_rate,
"trend": "increasing" if growth_rate > 1 else "stable"
}

return {"trend": "unknown"}

# 问题代码 - 导致内存泄漏的原始爬虫实现
import aiohttp
import aiofiles
from urllib.parse import urljoin

class ProblematicSpider:
"""有问题的爬虫实现"""

def __init__(self, max_concurrent: int = 200):
self.max_concurrent = max_concurrent
# 问题1: session没有正确管理
self.sessions = {}
self.semaphore = asyncio.Semaphore(max_concurrent)

# 问题2: 全局存储所有响应数据
self.response_cache = {}
self.url_queue = asyncio.Queue()

async def crawl_website(self, base_url: str, urls: List[str]):
"""爬取网站 - 问题版本"""

# 问题3: 为每个网站创建新session但不清理
session = aiohttp.ClientSession(
timeout=aiohttp.ClientTimeout(total=30),
connector=aiohttp.TCPConnector(limit=100)
)
self.sessions[base_url] = session

tasks = []
for url in urls:
task = asyncio.create_task(self._crawl_page(session, url))
tasks.append(task)

# 问题4: 等待所有任务完成,但不清理任务引用
results = await asyncio.gather(*tasks, return_exceptions=True)

# 问题5: 返回结果但保留在内存中
self.response_cache[base_url] = results
return results

async def _crawl_page(self, session: aiohttp.ClientSession, url: str):
"""爬取单个页面 - 问题版本"""

async with self.semaphore:
try:
async with session.get(url) as response:
content = await response.text()

# 问题6: 存储大量页面内容在内存中
page_data = {
"url": url,
"content": content, # 完整页面内容
"headers": dict(response.headers),
"timestamp": asyncio.get_event_loop().time()
}

# 问题7: 解析后的数据也全部保存
parsed_data = await self._parse_page(content)
page_data["parsed"] = parsed_data

return page_data

except Exception as e:
# 问题8: 异常处理中没有释放资源
logging.error(f"爬取页面失败: {url}, 错误: {e}")
return {"url": url, "error": str(e)}

async def _parse_page(self, content: str) -> Dict:
"""解析页面内容 - 问题版本"""

# 问题9: 创建大量临时对象不及时清理
from bs4 import BeautifulSoup

soup = BeautifulSoup(content, 'html.parser')

# 提取所有可能的数据,导致内存消耗巨大
extracted_data = {
"title": soup.title.string if soup.title else "",
"links": [urljoin("", link.get('href', '')) for link in soup.find_all('a')],
"images": [img.get('src', '') for img in soup.find_all('img')],
"text_content": soup.get_text(), # 完整文本内容
"all_tags": [str(tag) for tag in soup.find_all()], # 所有标签
}

return extracted_data

通过内存监控,我们发现了几个关键问题:

  • 内存持续增长:每小时增长约1GB,没有回收迹象
  • 对象数量激增:aiohttp相关对象和BeautifulSoup对象大量堆积
  • 任务泄漏:asyncio任务数量持续增加,从200增长到2000+

2. 根因定位

通过深入分析,我们确定了内存泄漏的根本原因:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
# 根因分析总结
ROOT_CAUSES = {
"Session管理不当": {
"问题": "aiohttp.ClientSession创建后未正确关闭",
"影响": "连接池和相关资源无法释放",
"证据": "sessions字典持续增长,connector对象堆积"
},
"数据过度缓存": {
"问题": "将所有响应数据和解析结果存储在内存中",
"影响": "内存使用量与处理的页面数成正比增长",
"证据": "response_cache字典占用内存达到GB级别"
},
"任务引用泄漏": {
"问题": "asyncio任务完成后引用未清理",
"影响": "任务对象和相关数据无法被垃圾回收",
"证据": "asyncio.all_tasks()数量持续增长"
},
"解析对象堆积": {
"问题": "BeautifulSoup对象创建后未及时释放",
"影响": "大量DOM解析对象占用内存",
"证据": "内存热点分析显示bs4相关对象占主要部分"
}
}

三、解决方案设计与实现

1. 优化的爬虫实现

基于根因分析,我们重新设计了内存友好的爬虫架构:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
import asyncio
import aiohttp
import weakref
from contextlib import asynccontextmanager
from typing import AsyncGenerator, Optional
import logging

class OptimizedSpider:
"""优化后的爬虫实现"""

def __init__(self, max_concurrent: int = 200):
self.max_concurrent = max_concurrent
self.semaphore = asyncio.Semaphore(max_concurrent)

# 移除全局缓存,改用流式处理
self.stats = {
"processed_pages": 0,
"failed_pages": 0,
"current_memory_mb": 0
}

@asynccontextmanager
async def get_session(self) -> AsyncGenerator[aiohttp.ClientSession, None]:
"""会话上下文管理器"""

connector = aiohttp.TCPConnector(
limit=100,
limit_per_host=20,
ttl_dns_cache=300,
use_dns_cache=True,
)

timeout = aiohttp.ClientTimeout(total=30, connect=10)

session = aiohttp.ClientSession(
connector=connector,
timeout=timeout
)

try:
yield session
finally:
# 确保session正确关闭
await session.close()
# 等待底层连接关闭
await asyncio.sleep(0.1)

async def crawl_website(self, base_url: str, urls: List[str],
callback=None) -> Dict:
"""爬取网站 - 优化版本"""

async with self.get_session() as session:
# 使用批处理避免创建过多任务
batch_size = 50
results = {
"processed": 0,
"failed": 0,
"success": 0
}

for i in range(0, len(urls), batch_size):
batch_urls = urls[i:i + batch_size]
batch_tasks = []

for url in batch_urls:
task = self._crawl_page_optimized(session, url, callback)
batch_tasks.append(task)

# 处理当前批次
batch_results = await asyncio.gather(*batch_tasks, return_exceptions=True)

# 统计结果并清理引用
for result in batch_results:
if isinstance(result, Exception):
results["failed"] += 1
logging.error(f"批次处理异常: {result}")
else:
results["processed"] += 1
if result.get("success"):
results["success"] += 1

# 主动触发垃圾回收
del batch_tasks
del batch_results
gc.collect()

# 更新统计信息
self.stats["processed_pages"] += len(batch_urls)

# 记录内存使用
current_memory = psutil.Process().memory_info().rss / 1024 / 1024
self.stats["current_memory_mb"] = current_memory

if i % (batch_size * 10) == 0: # 每处理500个URL记录一次
logging.info(f"处理进度: {i}/{len(urls)}, 内存使用: {current_memory:.1f}MB")

return results

async def _crawl_page_optimized(self, session: aiohttp.ClientSession,
url: str, callback=None) -> Dict:
"""优化的页面爬取方法"""

async with self.semaphore:
try:
async with session.get(url) as response:
if response.status != 200:
return {"url": url, "success": False, "error": f"HTTP {response.status}"}

# 流式读取,避免一次性加载大量内容
content = await response.text()

# 立即处理内容,不保存在内存中
parsed_data = await self._parse_page_lightweight(content)

# 如果提供了回调,立即处理数据
if callback:
await callback(url, parsed_data)

# 只返回必要的元数据
return {
"url": url,
"success": True,
"content_length": len(content),
"parsed_items": len(parsed_data)
}

except asyncio.TimeoutError:
return {"url": url, "success": False, "error": "timeout"}
except Exception as e:
return {"url": url, "success": False, "error": str(e)}

async def _parse_page_lightweight(self, content: str) -> List[Dict]:
"""轻量级页面解析"""

# 使用lxml替代BeautifulSoup以提高性能和降低内存使用
try:
from lxml import html
tree = html.fromstring(content)

# 只提取必要的数据
products = []

# 使用XPath精确提取,避免解析整个DOM
product_elements = tree.xpath('//div[@class="product-item"]')

for element in product_elements:
try:
product = {
"title": element.xpath('.//h3/text()')[0] if element.xpath('.//h3/text()') else "",
"price": element.xpath('.//span[@class="price"]/text()')[0] if element.xpath('.//span[@class="price"]/text()') else "",
"link": element.xpath('.//a/@href')[0] if element.xpath('.//a/@href') else ""
}

if product["title"]: # 只保存有效数据
products.append(product)

except Exception as e:
logging.debug(f"解析产品元素失败: {e}")
continue

return products

except ImportError:
# 降级到BeautifulSoup,但限制解析范围
from bs4 import BeautifulSoup

soup = BeautifulSoup(content, 'lxml')
products = []

# 只解析产品相关区域
product_containers = soup.find_all('div', class_='product-item')[:50] # 限制数量

for container in product_containers:
try:
title_elem = container.find('h3')
price_elem = container.find('span', class_='price')
link_elem = container.find('a')

product = {
"title": title_elem.get_text(strip=True) if title_elem else "",
"price": price_elem.get_text(strip=True) if price_elem else "",
"link": link_elem.get('href', '') if link_elem else ""
}

if product["title"]:
products.append(product)

except Exception as e:
logging.debug(f"BeautifulSoup解析失败: {e}")
continue

# 主动清理BeautifulSoup对象
del soup

return products

# 数据处理回调函数
async def save_data_callback(url: str, products: List[Dict]):
"""数据保存回调 - 流式处理"""

if not products:
return

try:
# 异步写入文件,避免内存堆积
import aiofiles
import json

filename = f"data/{url.replace('/', '_').replace(':', '')}.json"

async with aiofiles.open(filename, 'w', encoding='utf-8') as f:
await f.write(json.dumps(products, ensure_ascii=False, indent=2))

logging.info(f"保存数据成功: {url}, 产品数: {len(products)}")

except Exception as e:
logging.error(f"保存数据失败: {url}, 错误: {e}")

2. 内存管理组件

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
class MemoryManager:
"""内存管理器"""

def __init__(self, max_memory_mb: int = 4096):
self.max_memory_mb = max_memory_mb
self.memory_warning_threshold = max_memory_mb * 0.8
self.gc_threshold = max_memory_mb * 0.9

async def monitor_and_control(self):
"""内存监控和控制"""

while True:
current_memory = psutil.Process().memory_info().rss / 1024 / 1024

if current_memory > self.gc_threshold:
logging.warning(f"内存使用过高 ({current_memory:.1f}MB),执行强制GC")

# 强制垃圾回收
collected = gc.collect()
logging.info(f"GC回收对象数: {collected}")

# 检查内存是否降低
new_memory = psutil.Process().memory_info().rss / 1024 / 1024
logging.info(f"GC后内存: {new_memory:.1f}MB")

if new_memory > self.memory_warning_threshold:
logging.error("内存回收效果不佳,可能存在内存泄漏")

elif current_memory > self.memory_warning_threshold:
logging.warning(f"内存使用率超过阈值: {current_memory:.1f}MB")

await asyncio.sleep(30) # 每30秒检查一次

四、解决效果验证

修复效果对比

通过实施优化方案,我们取得了显著的改善效果:

指标 修复前 修复后 改善幅度
内存峰值使用 8GB+ 1.2GB -85%
连续运行时间 6-8小时 72小时+ +900%
处理速度 1000页/分钟 1500页/分钟 +50%
CPU使用率 95%+ 65% -32%
内存增长率 1GB/小时 50MB/小时 -95%

压力测试验证

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
async def stress_test():
"""压力测试验证"""

spider = OptimizedSpider(max_concurrent=300)
memory_manager = MemoryManager()
monitor = MemoryMonitor(interval=30)

# 启动内存监控
asyncio.create_task(memory_manager.monitor_and_control())
asyncio.create_task(monitor.start_monitoring())

# 模拟大规模爬取
test_urls = [f"https://example.com/page/{i}" for i in range(10000)]

start_time = asyncio.get_event_loop().time()

try:
results = await spider.crawl_website(
"https://example.com",
test_urls,
callback=save_data_callback
)

end_time = asyncio.get_event_loop().time()
duration = end_time - start_time

print(f"压力测试完成:")
print(f"处理URL数量: {len(test_urls)}")
print(f"总耗时: {duration:.2f}秒")
print(f"处理速度: {len(test_urls)/duration:.1f} URL/秒")
print(f"成功率: {results['success']/results['processed']*100:.1f}%")

# 分析内存趋势
memory_trend = monitor.get_memory_trend()
print(f"内存趋势: {memory_trend}")

except Exception as e:
logging.error(f"压力测试失败: {e}")

五、预防措施与最佳实践

核心最佳实践

  1. 资源管理

    • 使用上下文管理器确保资源正确释放
    • 批处理任务避免过多并发
    • 定期执行垃圾回收
  2. 内存优化

    • 流式处理大数据,避免全量加载
    • 使用轻量级解析器(lxml vs BeautifulSoup)
    • 限制缓存大小和生命周期
  3. 监控告警

    • 实时监控内存使用趋势
    • 设置合理的告警阈值
    • 建立自动回收机制
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
# 最佳实践代码模板
async def production_crawler_template():
"""生产级爬虫模板"""

# 1. 资源管理
async with OptimizedSpider(max_concurrent=200) as spider:

# 2. 内存监控
memory_manager = MemoryManager(max_memory_mb=4096)
monitoring_task = asyncio.create_task(memory_manager.monitor_and_control())

try:
# 3. 流式处理
await spider.crawl_website(
base_url="https://target-site.com",
urls=get_url_list(),
callback=async_data_processor
)
finally:
# 4. 清理工作
monitoring_task.cancel()
gc.collect()

总结

这次Python异步爬虫内存泄漏的排查过程让我们深刻认识到:异步编程的高性能优势必须建立在正确的资源管理基础之上

核心经验总结:

  1. 资源生命周期要清晰:每个资源都要有明确的创建和销毁时机
  2. 监控体系要完善:实时监控是发现和预防问题的关键
  3. 处理模式要合适:流式处理比批量缓存更适合大规模数据采集
  4. 测试验证要充分:压力测试能暴露生产环境中的潜在问题

实际应用价值:

  • 内存使用量减少85%,从8GB降至1.2GB
  • 系统稳定性大幅提升,连续运行时间从8小时提升到72小时+
  • 处理性能提升50%,同时降低了资源消耗

通过这次故障排查,我们不仅解决了当前的内存泄漏问题,还建立了一套完整的异步爬虫最佳实践。这套方法已在多个生产项目中得到验证,为团队的技术能力提升奠定了坚实基础。