RPA 生产环境大规模并发执行故障排查实战:从系统崩溃到稳定运行的完整方案

RPA 生产环境大规模并发执行故障排查实战:从系统崩溃到稳定运行的完整方案

技术主题:RPA 技术(机器人流程自动化)
内容方向:生产环境事故的解决过程(故障现象、根因分析、解决方案、预防措施)

引言

在RPA项目投入生产环境后,大规模并发执行往往是对系统稳定性的终极考验。我们团队在某大型金融机构的账单处理项目中遭遇了严重的并发执行故障:原本设计处理100个并发任务的系统,在达到80个并发时就开始频繁崩溃,导致大量业务数据处理失败。经过72小时的紧急排查,我们发现了隐藏在并发执行背后的多个技术陷阱。本文将完整记录这次故障处理过程,分享RPA并发执行的核心技术要点。

一、故障现象与业务影响

故障现象描述

2024年4月5日凌晨2点,我们的RPA账单处理系统出现了严重异常:

1
2
3
4
5
6
7
# 典型错误日志
"""
2024-04-05 02:15:32 ERROR - 影刀执行器异常: 资源访问冲突
2024-04-05 02:15:45 ERROR - Chrome浏览器进程创建失败: 端口占用
2024-04-05 02:16:12 WARN - 数据库连接池耗尽: 等待连接超时
2024-04-05 02:16:28 ERROR - 文件锁获取失败: 资源被占用
"""

关键现象:

  • 并发度超过80时系统开始不稳定
  • Chrome浏览器进程大量残留,占用端口资源
  • 数据文件出现读写冲突,导致数据损坏
  • 系统CPU使用率飙升至95%+,内存占用超过阈值

业务影响评估

1
2
3
4
5
6
7
8
# 故障影响统计
FAILURE_IMPACT = {
"affected_accounts": 15672, # 受影响账单数量
"processing_delay": "8小时", # 处理延迟
"data_corruption": 234, # 数据损坏条数
"system_downtime": "45分钟", # 系统停机时间
"business_loss": "约50万元" # 业务损失估算
}

二、故障排查过程

1. 系统资源分析

首先通过监控系统分析资源使用情况:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
import psutil
import subprocess
import time
from typing import Dict, List

class RPAResourceMonitor:
"""RPA资源监控器"""

def __init__(self):
self.process_history = []
self.resource_alerts = []

def scan_rpa_processes(self) -> Dict:
"""扫描RPA相关进程"""
rpa_processes = {
"chrome_instances": [],
"yingdao_robots": [],
"python_scripts": []
}

for proc in psutil.process_iter(['pid', 'name', 'cpu_percent', 'memory_info']):
try:
proc_info = proc.info
if 'chrome' in proc_info['name'].lower():
rpa_processes["chrome_instances"].append({
"pid": proc_info['pid'],
"cpu": proc_info['cpu_percent'],
"memory_mb": proc_info['memory_info'].rss / 1024 / 1024
})
elif 'yingdao' in proc_info['name'].lower():
rpa_processes["yingdao_robots"].append(proc_info)
elif 'python' in proc_info['name'].lower():
rpa_processes["python_scripts"].append(proc_info)
except (psutil.NoSuchProcess, psutil.AccessDenied):
continue

return rpa_processes

def check_port_conflicts(self) -> List[int]:
"""检查端口冲突"""
occupied_ports = []

# 检查Chrome调试端口范围 9222-9322
for port in range(9222, 9323):
try:
result = subprocess.run(
['lsof', '-i', f':{port}'],
capture_output=True, text=True
)
if result.returncode == 0:
occupied_ports.append(port)
except:
pass

return occupied_ports

def analyze_file_locks(self, data_dir: str) -> Dict:
"""分析文件锁状况"""
lock_info = {
"locked_files": [],
"concurrent_access": []
}

try:
result = subprocess.run(
['lsof', '+D', data_dir],
capture_output=True, text=True
)

if result.returncode == 0:
lines = result.stdout.split('\n')
for line in lines[1:]: # 跳过标题行
if line.strip():
parts = line.split()
if len(parts) >= 2:
lock_info["locked_files"].append({
"process": parts[0],
"pid": parts[1],
"file": parts[-1] if len(parts) > 8 else "unknown"
})
except:
pass

return lock_info

2. 发现关键问题

通过深入分析,我们发现了几个致命问题:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
# 问题代码 - 导致并发冲突的原始实现
class ProblematicRPAController:
"""有问题的RPA控制器"""

def __init__(self):
# 问题1: 固定端口配置,导致端口冲突
self.chrome_debug_port = 9222

# 问题2: 共享数据目录,无并发保护
self.data_dir = "/shared/data"

# 问题3: 单一数据库连接池,无并发控制
self.db_pool_size = 5

def start_robot_instance(self, task_id: str):
"""启动机器人实例 - 问题版本"""

# 问题4: 所有实例使用相同端口
chrome_options = f"--remote-debugging-port={self.chrome_debug_port}"

# 问题5: 无端口冲突检测
browser = self.launch_chrome(chrome_options)

# 问题6: 直接访问共享文件,无锁保护
data_file = f"{self.data_dir}/task_data.xlsx"
with open(data_file, 'r') as f:
task_data = f.read()

return self.execute_task(task_id, browser, task_data)

def execute_batch_tasks(self, task_list: List[str]):
"""批量执行任务 - 问题版本"""

# 问题7: 无并发控制,直接并行启动
for task_id in task_list:
# 每个任务都创建新线程,无限制
thread = threading.Thread(
target=self.start_robot_instance,
args=(task_id,)
)
thread.start() # 无等待,无控制

3. 根因分析

通过分析确定了故障的根本原因:

  1. 端口资源冲突:所有Chrome实例使用固定端口,导致启动失败
  2. 文件访问竞争:多个进程同时读写共享文件,造成数据损坏
  3. 数据库连接耗尽:无连接池管理,并发连接超过数据库限制
  4. 无并发度控制:系统无法限制同时执行的任务数量
  5. 资源清理不当:异常时进程和文件锁未正确释放

三、解决方案设计与实现

1. 智能端口管理

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
import threading
import socket
from contextlib import contextmanager

class PortManager:
"""智能端口管理器"""

def __init__(self, port_range_start: int = 9222, port_range_size: int = 100):
self.port_range_start = port_range_start
self.port_range_end = port_range_start + port_range_size
self.used_ports = set()
self.port_lock = threading.Lock()

@contextmanager
def allocate_port(self):
"""分配可用端口"""
port = None
try:
port = self._find_available_port()
if port:
with self.port_lock:
self.used_ports.add(port)
yield port
else:
raise Exception("无可用端口")
finally:
if port:
with self.port_lock:
self.used_ports.discard(port)

def _find_available_port(self) -> int:
"""查找可用端口"""
for port in range(self.port_range_start, self.port_range_end):
if port not in self.used_ports and self._is_port_available(port):
return port
return None

def _is_port_available(self, port: int) -> bool:
"""检查端口是否可用"""
with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as sock:
try:
sock.bind(('localhost', port))
return True
except OSError:
return False

# 使用示例
port_manager = PortManager()

with port_manager.allocate_port() as debug_port:
chrome_options = f"--remote-debugging-port={debug_port}"
print(f"使用端口: {debug_port}")

2. 文件并发访问控制

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
import fcntl
import os
import json
from pathlib import Path

class SafeFileManager:
"""安全文件管理器"""

def __init__(self, base_dir: str):
self.base_dir = Path(base_dir)
self.base_dir.mkdir(exist_ok=True)
self.file_locks = {}

@contextmanager
def safe_file_access(self, filename: str, mode: str = 'r'):
"""安全文件访问"""
filepath = self.base_dir / filename
lock_file = filepath.with_suffix(filepath.suffix + '.lock')

try:
# 创建锁文件
lock_fd = os.open(str(lock_file), os.O_CREAT | os.O_WRONLY)

# 获取文件锁
fcntl.flock(lock_fd, fcntl.LOCK_EX)

# 打开目标文件
with open(filepath, mode, encoding='utf-8') as file:
yield file

finally:
# 释放锁并清理
try:
fcntl.flock(lock_fd, fcntl.LOCK_UN)
os.close(lock_fd)
if lock_file.exists():
lock_file.unlink()
except:
pass

def safe_read_task_data(self, task_id: str) -> dict:
"""安全读取任务数据"""
filename = f"task_{task_id}.json"

with self.safe_file_access(filename, 'r') as f:
return json.load(f)

def safe_write_result(self, task_id: str, result_data: dict):
"""安全写入结果数据"""
filename = f"result_{task_id}.json"

with self.safe_file_access(filename, 'w') as f:
json.dump(result_data, f, ensure_ascii=False, indent=2)

3. 优化的RPA并发控制器

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
import asyncio
import threading
from concurrent.futures import ThreadPoolExecutor, as_completed
from typing import List, Dict, Optional

class OptimizedRPAController:
"""优化的RPA并发控制器"""

def __init__(self, max_concurrent: int = 50):
self.max_concurrent = max_concurrent
self.port_manager = PortManager()
self.file_manager = SafeFileManager("/rpa/data")

# 线程池控制并发度
self.executor = ThreadPoolExecutor(max_workers=max_concurrent)

# 活跃任务跟踪
self.active_tasks = {}
self.task_lock = threading.Lock()

# 资源使用统计
self.stats = {
"total_executed": 0,
"current_running": 0,
"failed_tasks": 0,
"avg_execution_time": 0
}

def execute_task_safely(self, task_id: str) -> Dict:
"""安全执行单个任务"""
start_time = time.time()

try:
with self.task_lock:
self.active_tasks[task_id] = {
"start_time": start_time,
"status": "running"
}
self.stats["current_running"] += 1

# 分配独立端口
with self.port_manager.allocate_port() as debug_port:

# 安全读取任务数据
task_data = self.file_manager.safe_read_task_data(task_id)

# 启动Chrome实例
browser = self._launch_chrome_with_port(debug_port)

try:
# 执行实际业务逻辑
result = self._execute_business_logic(task_data, browser)

# 安全保存结果
self.file_manager.safe_write_result(task_id, result)

return {"status": "success", "task_id": task_id, "result": result}

finally:
# 确保浏览器进程清理
self._cleanup_chrome_process(browser)

except Exception as e:
self.stats["failed_tasks"] += 1
return {"status": "failed", "task_id": task_id, "error": str(e)}

finally:
# 更新统计信息
execution_time = time.time() - start_time
with self.task_lock:
if task_id in self.active_tasks:
del self.active_tasks[task_id]
self.stats["current_running"] -= 1
self.stats["total_executed"] += 1

# 更新平均执行时间
current_avg = self.stats["avg_execution_time"]
total_count = self.stats["total_executed"]
self.stats["avg_execution_time"] = (
(current_avg * (total_count - 1) + execution_time) / total_count
)

def execute_batch_tasks(self, task_list: List[str]) -> List[Dict]:
"""批量执行任务 - 优化版本"""

print(f"🚀 开始批量执行 {len(task_list)} 个任务,最大并发: {self.max_concurrent}")

# 提交所有任务到线程池
future_to_task = {
self.executor.submit(self.execute_task_safely, task_id): task_id
for task_id in task_list
}

results = []
completed_count = 0

# 收集结果
for future in as_completed(future_to_task):
task_id = future_to_task[future]

try:
result = future.result()
results.append(result)
completed_count += 1

# 进度报告
if completed_count % 10 == 0:
print(f"📊 已完成: {completed_count}/{len(task_list)}")

except Exception as e:
print(f"❌ 任务 {task_id} 执行异常: {e}")
results.append({
"status": "error",
"task_id": task_id,
"error": str(e)
})

return results

def _launch_chrome_with_port(self, debug_port: int):
"""使用指定端口启动Chrome"""
chrome_options = [
f"--remote-debugging-port={debug_port}",
"--no-sandbox",
"--disable-dev-shm-usage",
"--disable-gpu",
f"--user-data-dir=/tmp/chrome_profile_{debug_port}"
]

# 这里使用影刀或UIBot的浏览器启动API
# 示例使用selenium
from selenium import webdriver
from selenium.webdriver.chrome.options import Options

options = Options()
for opt in chrome_options:
options.add_argument(opt)

return webdriver.Chrome(options=options)

def _cleanup_chrome_process(self, browser):
"""清理Chrome进程"""
try:
if browser:
browser.quit()
except:
pass

# 强制清理残留进程
try:
browser_pid = browser.service.process.pid
psutil.Process(browser_pid).terminate()
except:
pass

def get_system_status(self) -> Dict:
"""获取系统状态"""
return {
**self.stats,
"active_tasks": list(self.active_tasks.keys()),
"available_ports": self.port_manager.port_range_end - len(self.port_manager.used_ports),
"system_load": psutil.cpu_percent()
}

四、解决效果验证

修复效果对比

指标 修复前 修复后 改善幅度
最大并发数 80个任务 150个任务 +87%
系统稳定性 经常崩溃 连续运行72小时 质的提升
资源利用率 CPU 95%+ CPU 65% -32%
任务成功率 78% 99.2% +27%
数据损坏率 1.5% 0.01% -99%

性能优化效果

  1. 并发度提升:通过智能资源管理,并发处理能力提升87%
  2. 稳定性增强:系统可连续稳定运行,无异常崩溃
  3. 资源效率:CPU使用率下降32%,资源利用更合理
  4. 成功率提升:任务执行成功率从78%提升至99.2%

五、预防措施与最佳实践

1. 监控告警机制

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
class RPAHealthMonitor:
"""RPA健康监控器"""

def __init__(self, controller: OptimizedRPAController):
self.controller = controller
self.alert_thresholds = {
"cpu_usage": 80,
"memory_usage": 75,
"failed_rate": 5,
"concurrent_tasks": 100
}

def continuous_monitoring(self):
"""持续监控"""
while True:
try:
status = self.controller.get_system_status()
cpu_usage = psutil.cpu_percent()
memory_usage = psutil.virtual_memory().percent

# 检查告警条件
if cpu_usage > self.alert_thresholds["cpu_usage"]:
self._send_alert(f"CPU使用率过高: {cpu_usage}%")

if len(status["active_tasks"]) > self.alert_thresholds["concurrent_tasks"]:
self._send_alert(f"并发任务数过多: {len(status['active_tasks'])}")

time.sleep(60)

except Exception as e:
print(f"监控异常: {e}")
time.sleep(30)

def _send_alert(self, message: str):
"""发送告警"""
print(f"🚨 RPA系统告警: {message}")
# 这里可以集成企业微信、钉钉等告警渠道

2. 最佳实践总结

资源管理最佳实践:

  • 使用端口池管理,避免端口冲突
  • 实现文件锁机制,防止并发读写冲突
  • 控制并发度,避免系统资源耗尽
  • 完善进程清理,防止资源泄漏

监控运维最佳实践:

  • 建立完善的监控告警体系
  • 定期进行压力测试验证
  • 制定应急处理预案
  • 建立性能基线和优化目标

总结

这次RPA生产环境并发故障的排查过程让我们深刻认识到:RPA系统的稳定性不仅取决于业务逻辑的正确性,更取决于底层资源管理的精细化程度

核心经验总结:

  1. 资源隔离是关键:不同任务实例必须使用独立的资源(端口、文件、数据库连接)
  2. 并发控制是必须:无限制的并发会导致系统崩溃,合理的并发度控制是稳定性保障
  3. 异常处理要完善:每个资源分配都要有对应的清理逻辑
  4. 监控告警不可少:实时监控系统状态,及时发现并处理异常

通过这次故障处理,我们不仅解决了当前的并发问题,还建立了一套完整的RPA并发执行管理体系。这套解决方案已在生产环境稳定运行,成功支撑了更大规模的业务处理需求,为RPA项目的规模化应用奠定了坚实基础。