RPA 生产环境故障排查实录:从机器人异常到业务恢复的完整解决方案

RPA 生产环境故障排查实录:从机器人异常到业务恢复的完整解决方案

引言

在RPA(机器人流程自动化)项目的生产环境运行中,机器人故障往往会直接影响业务流程的正常运转。当财务机器人突然停止工作、订单处理机器人频繁报错、或者数据同步机器人出现异常时,如何快速定位问题根因并恢复业务运行,是每个RPA运维团队必须面对的挑战。本文将通过一个真实的生产环境故障案例,详细记录从故障发现到完全解决的全过程,包括故障现象分析、排查思路、根因定位、解决方案实施以及预防措施建立,为RPA项目的稳定运行提供实用的故障处理指南。

一、故障现象与影响评估

1.1 故障背景

某大型制造企业的财务自动化RPA系统在周一早晨出现大规模故障:

  • 业务场景:自动化处理供应商发票录入、审核和付款流程
  • 机器人数量:15个并发机器人,处理3个不同的业务流程
  • 日处理量:约2000张发票,涉及金额超过500万元
  • 运行环境:UiPath Orchestrator + Windows Server 2019

1.2 故障现象描述

主要故障表现:

  1. 机器人状态异常:15个机器人中有12个显示为”Faulted”状态
  2. 业务流程中断:发票处理流程完全停止,积压订单快速增长
  3. 系统响应缓慢:Orchestrator界面加载时间从2秒增加到30秒以上
  4. 错误日志激增:每分钟产生200+条错误日志
  5. 用户投诉:财务部门无法正常处理当日发票,影响供应商付款

1.3 业务影响评估

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
# 故障影响评估脚本
import datetime
from dataclasses import dataclass
from typing import List

@dataclass
class BusinessImpact:
"""业务影响评估数据结构"""
process_name: str
daily_volume: int
avg_processing_time: int # 分钟
business_value: float # 万元
downtime_cost_per_hour: float # 万元/小时

class IncidentImpactCalculator:
"""故障影响计算器"""

def __init__(self):
self.processes = [
BusinessImpact("发票录入流程", 800, 5, 200, 15),
BusinessImpact("发票审核流程", 600, 8, 150, 12),
BusinessImpact("付款处理流程", 400, 12, 300, 25)
]

def calculate_hourly_impact(self) -> dict:
"""计算每小时业务影响"""
total_impact = {
"delayed_transactions": 0,
"financial_impact": 0,
"processing_backlog": 0
}

for process in self.processes:
hourly_volume = process.daily_volume / 8 # 8小时工作制
total_impact["delayed_transactions"] += hourly_volume
total_impact["financial_impact"] += process.downtime_cost_per_hour
total_impact["processing_backlog"] += hourly_volume * process.avg_processing_time

return total_impact

def generate_impact_report(self, downtime_hours: float) -> str:
"""生成影响报告"""
hourly_impact = self.calculate_hourly_impact()

report = f"""
=== RPA故障业务影响报告 ===
故障时长: {downtime_hours:.1f}小时
延迟交易数: {hourly_impact['delayed_transactions'] * downtime_hours:.0f}
财务影响: {hourly_impact['financial_impact'] * downtime_hours:.1f}万元
积压工作量: {hourly_impact['processing_backlog'] * downtime_hours:.0f}分钟
预计恢复时间: {hourly_impact['processing_backlog'] * downtime_hours / 60:.1f}小时
"""

return report

# 使用示例
if __name__ == "__main__":
calculator = IncidentImpactCalculator()
# 假设故障持续2.5小时
impact_report = calculator.generate_impact_report(2.5)
print(impact_report)

故障影响评估结果:

  • 延迟交易:约4500笔发票处理延迟
  • 财务影响:预计损失130万元(包括延迟付款罚金、人工处理成本)
  • 人力成本:需要20名财务人员加班处理积压业务
  • 声誉影响:供应商满意度下降,合作关系受损

二、故障排查与根因分析

2.1 初步排查流程

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
# RPA故障排查工具
import psutil
import requests
import json
from datetime import datetime, timedelta
import logging

class RPAHealthChecker:
"""RPA系统健康检查工具"""

def __init__(self, orchestrator_url: str, api_key: str):
self.orchestrator_url = orchestrator_url
self.api_key = api_key
self.logger = self._setup_logger()

def _setup_logger(self):
"""设置日志记录器"""
logger = logging.getLogger('RPAHealthChecker')
logger.setLevel(logging.INFO)

handler = logging.StreamHandler()
formatter = logging.Formatter(
'%(asctime)s - %(name)s - %(levelname)s - %(message)s'
)
handler.setFormatter(formatter)
logger.addHandler(handler)

return logger

def check_system_resources(self) -> dict:
"""检查系统资源使用情况"""
try:
cpu_percent = psutil.cpu_percent(interval=1)
memory = psutil.virtual_memory()
disk = psutil.disk_usage('/')

resources = {
"cpu_usage": cpu_percent,
"memory_usage": memory.percent,
"memory_available_gb": memory.available / (1024**3),
"disk_usage": disk.percent,
"disk_free_gb": disk.free / (1024**3)
}

self.logger.info(f"系统资源检查完成: {resources}")
return resources

except Exception as e:
self.logger.error(f"系统资源检查失败: {e}")
return {}

def check_orchestrator_status(self) -> dict:
"""检查Orchestrator服务状态"""
try:
headers = {
"Authorization": f"Bearer {self.api_key}",
"Content-Type": "application/json"
}

# 检查API响应时间
start_time = datetime.now()
response = requests.get(
f"{self.orchestrator_url}/odata/Robots",
headers=headers,
timeout=30
)
response_time = (datetime.now() - start_time).total_seconds()

status = {
"api_status": "healthy" if response.status_code == 200 else "unhealthy",
"response_time_seconds": response_time,
"status_code": response.status_code
}

if response.status_code == 200:
robots_data = response.json()
status["total_robots"] = len(robots_data.get('value', []))
status["connected_robots"] = len([
r for r in robots_data.get('value', [])
if r.get('State') == 'Available'
])

self.logger.info(f"Orchestrator状态检查完成: {status}")
return status

except Exception as e:
self.logger.error(f"Orchestrator状态检查失败: {e}")
return {"api_status": "error", "error": str(e)}

def check_robot_status(self) -> list:
"""检查机器人状态"""
try:
headers = {
"Authorization": f"Bearer {self.api_key}",
"Content-Type": "application/json"
}

response = requests.get(
f"{self.orchestrator_url}/odata/Robots",
headers=headers,
timeout=30
)

if response.status_code != 200:
return []

robots_data = response.json()
robot_status = []

for robot in robots_data.get('value', []):
status = {
"name": robot.get('Name'),
"state": robot.get('State'),
"last_heartbeat": robot.get('LastReportTime'),
"version": robot.get('Version'),
"machine_name": robot.get('MachineName')
}
robot_status.append(status)

self.logger.info(f"机器人状态检查完成,共{len(robot_status)}个机器人")
return robot_status

except Exception as e:
self.logger.error(f"机器人状态检查失败: {e}")
return []

def check_recent_jobs(self, hours: int = 2) -> list:
"""检查最近的作业执行情况"""
try:
headers = {
"Authorization": f"Bearer {self.api_key}",
"Content-Type": "application/json"
}

# 计算时间范围
end_time = datetime.now()
start_time = end_time - timedelta(hours=hours)

# 查询最近的作业
filter_query = f"CreationTime ge {start_time.isoformat()}Z"
response = requests.get(
f"{self.orchestrator_url}/odata/Jobs",
headers=headers,
params={"$filter": filter_query, "$orderby": "CreationTime desc"},
timeout=30
)

if response.status_code != 200:
return []

jobs_data = response.json()
job_summary = []

for job in jobs_data.get('value', []):
summary = {
"id": job.get('Id'),
"process_name": job.get('ProcessName'),
"state": job.get('State'),
"creation_time": job.get('CreationTime'),
"end_time": job.get('EndTime'),
"robot_name": job.get('RobotName'),
"exception_reason": job.get('Info')
}
job_summary.append(summary)

self.logger.info(f"作业检查完成,最近{hours}小时内共{len(job_summary)}个作业")
return job_summary

except Exception as e:
self.logger.error(f"作业检查失败: {e}")
return []

def generate_health_report(self) -> dict:
"""生成综合健康报告"""
self.logger.info("开始生成RPA系统健康报告...")

report = {
"timestamp": datetime.now().isoformat(),
"system_resources": self.check_system_resources(),
"orchestrator_status": self.check_orchestrator_status(),
"robot_status": self.check_robot_status(),
"recent_jobs": self.check_recent_jobs(2)
}

# 分析故障模式
report["analysis"] = self._analyze_issues(report)

self.logger.info("RPA系统健康报告生成完成")
return report

def _analyze_issues(self, report: dict) -> dict:
"""分析潜在问题"""
issues = []
recommendations = []

# 分析系统资源
resources = report.get("system_resources", {})
if resources.get("cpu_usage", 0) > 80:
issues.append("CPU使用率过高")
recommendations.append("检查是否有异常进程占用CPU")

if resources.get("memory_usage", 0) > 85:
issues.append("内存使用率过高")
recommendations.append("检查内存泄漏或增加系统内存")

# 分析机器人状态
robots = report.get("robot_status", [])
faulted_robots = [r for r in robots if r.get("state") == "Faulted"]
if len(faulted_robots) > len(robots) * 0.3: # 超过30%的机器人故障
issues.append(f"大量机器人故障({len(faulted_robots)}/{len(robots)})")
recommendations.append("检查Orchestrator连接和机器人配置")

# 分析作业执行情况
jobs = report.get("recent_jobs", [])
failed_jobs = [j for j in jobs if j.get("state") == "Faulted"]
if len(failed_jobs) > len(jobs) * 0.2: # 超过20%的作业失败
issues.append(f"作业失败率过高({len(failed_jobs)}/{len(jobs)})")
recommendations.append("检查业务流程和目标应用系统")

return {
"issues_found": issues,
"recommendations": recommendations,
"severity": "high" if len(issues) >= 3 else "medium" if len(issues) >= 1 else "low"
}

# 使用示例
if __name__ == "__main__":
checker = RPAHealthChecker(
orchestrator_url="https://your-orchestrator.com",
api_key="your-api-key"
)

health_report = checker.generate_health_report()
print(json.dumps(health_report, indent=2, ensure_ascii=False))

2.2 深度分析发现的问题

通过系统化的排查,我们发现了以下关键问题:

1. 数据库连接池耗尽

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
-- 检查数据库连接情况
SELECT
DB_NAME(dbid) as DatabaseName,
COUNT(dbid) as NumberOfConnections,
loginame as LoginName
FROM sys.sysprocesses
WHERE dbid > 0
GROUP BY dbid, loginame
ORDER BY NumberOfConnections DESC;

-- 检查长时间运行的查询
SELECT
session_id,
start_time,
status,
command,
DATEDIFF(SECOND, start_time, GETDATE()) as duration_seconds,
blocking_session_id,
wait_type,
wait_resource
FROM sys.dm_exec_requests
WHERE session_id > 50
AND DATEDIFF(SECOND, start_time, GETDATE()) > 30
ORDER BY duration_seconds DESC;

2. 目标应用系统响应超时

  • ERP系统在早晨8点进行了版本更新
  • 新版本的界面元素发生了变化
  • 机器人的选择器失效,导致元素识别失败

3. 网络连接不稳定

  • 机器人与Orchestrator之间的心跳包丢失
  • 部分机器人显示为离线状态

三、解决方案实施

3.1 紧急恢复措施

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
# 紧急恢复脚本
import time
import requests
import logging
from concurrent.futures import ThreadPoolExecutor, as_completed

class EmergencyRecoveryManager:
"""紧急恢复管理器"""

def __init__(self, orchestrator_url: str, api_key: str):
self.orchestrator_url = orchestrator_url
self.api_key = api_key
self.logger = self._setup_logger()
self.headers = {
"Authorization": f"Bearer {api_key}",
"Content-Type": "application/json"
}

def _setup_logger(self):
logger = logging.getLogger('EmergencyRecovery')
logger.setLevel(logging.INFO)

handler = logging.StreamHandler()
formatter = logging.Formatter(
'%(asctime)s - %(levelname)s - %(message)s'
)
handler.setFormatter(formatter)
logger.addHandler(handler)

return logger

def restart_faulted_robots(self) -> dict:
"""重启故障机器人"""
self.logger.info("开始重启故障机器人...")

# 获取故障机器人列表
faulted_robots = self._get_faulted_robots()

if not faulted_robots:
self.logger.info("没有发现故障机器人")
return {"success": True, "restarted_count": 0}

# 并行重启机器人
restart_results = []
with ThreadPoolExecutor(max_workers=5) as executor:
future_to_robot = {
executor.submit(self._restart_robot, robot): robot
for robot in faulted_robots
}

for future in as_completed(future_to_robot):
robot = future_to_robot[future]
try:
result = future.result()
restart_results.append({
"robot_name": robot["Name"],
"success": result,
"timestamp": time.time()
})
except Exception as e:
self.logger.error(f"重启机器人{robot['Name']}失败: {e}")
restart_results.append({
"robot_name": robot["Name"],
"success": False,
"error": str(e)
})

successful_restarts = len([r for r in restart_results if r["success"]])
self.logger.info(f"机器人重启完成: {successful_restarts}/{len(faulted_robots)}")

return {
"success": True,
"restarted_count": successful_restarts,
"total_faulted": len(faulted_robots),
"details": restart_results
}

def _get_faulted_robots(self) -> list:
"""获取故障机器人列表"""
try:
response = requests.get(
f"{self.orchestrator_url}/odata/Robots",
headers=self.headers,
timeout=30
)

if response.status_code != 200:
return []

robots_data = response.json()
faulted_robots = [
robot for robot in robots_data.get('value', [])
if robot.get('State') in ['Faulted', 'Disconnected']
]

return faulted_robots

except Exception as e:
self.logger.error(f"获取机器人列表失败: {e}")
return []

def _restart_robot(self, robot: dict) -> bool:
"""重启单个机器人"""
try:
robot_id = robot.get('Id')
robot_name = robot.get('Name')

self.logger.info(f"正在重启机器人: {robot_name}")

# 发送重启命令
restart_payload = {
"RobotId": robot_id,
"Action": "Restart"
}

response = requests.post(
f"{self.orchestrator_url}/odata/Robots/UiPath.Server.Configuration.OData.RestartRobot",
headers=self.headers,
json=restart_payload,
timeout=30
)

if response.status_code in [200, 204]:
self.logger.info(f"机器人{robot_name}重启命令发送成功")

# 等待机器人重新连接
return self._wait_for_robot_online(robot_id, timeout=60)
else:
self.logger.error(f"机器人{robot_name}重启失败: {response.status_code}")
return False

except Exception as e:
self.logger.error(f"重启机器人失败: {e}")
return False

def _wait_for_robot_online(self, robot_id: str, timeout: int = 60) -> bool:
"""等待机器人上线"""
start_time = time.time()

while time.time() - start_time < timeout:
try:
response = requests.get(
f"{self.orchestrator_url}/odata/Robots({robot_id})",
headers=self.headers,
timeout=10
)

if response.status_code == 200:
robot_data = response.json()
if robot_data.get('State') == 'Available':
return True

time.sleep(5) # 等待5秒后重试

except Exception:
time.sleep(5)

return False

def kill_stuck_jobs(self) -> dict:
"""终止卡住的作业"""
self.logger.info("开始终止卡住的作业...")

try:
# 查找运行时间超过30分钟的作业
response = requests.get(
f"{self.orchestrator_url}/odata/Jobs",
headers=self.headers,
params={
"$filter": "State eq 'Running'",
"$orderby": "CreationTime asc"
},
timeout=30
)

if response.status_code != 200:
return {"success": False, "error": "无法获取作业列表"}

jobs_data = response.json()
current_time = time.time()
stuck_jobs = []

for job in jobs_data.get('value', []):
creation_time = job.get('CreationTime')
if creation_time:
# 简化时间解析,实际应用中需要更严格的时间处理
job_duration = current_time - time.mktime(time.strptime(
creation_time[:19], '%Y-%m-%dT%H:%M:%S'
))

if job_duration > 1800: # 30分钟
stuck_jobs.append(job)

# 终止卡住的作业
killed_count = 0
for job in stuck_jobs:
if self._kill_job(job.get('Id')):
killed_count += 1

self.logger.info(f"作业终止完成: {killed_count}/{len(stuck_jobs)}")

return {
"success": True,
"killed_count": killed_count,
"total_stuck": len(stuck_jobs)
}

except Exception as e:
self.logger.error(f"终止作业失败: {e}")
return {"success": False, "error": str(e)}

def _kill_job(self, job_id: str) -> bool:
"""终止单个作业"""
try:
response = requests.post(
f"{self.orchestrator_url}/odata/Jobs({job_id})/UiPath.Server.Configuration.OData.StopJob",
headers=self.headers,
json={"strategy": "Kill"},
timeout=30
)

return response.status_code in [200, 204]

except Exception:
return False

def execute_emergency_recovery(self) -> dict:
"""执行紧急恢复流程"""
self.logger.info("=== 开始执行紧急恢复流程 ===")

recovery_results = {
"start_time": time.time(),
"steps": []
}

# 步骤1:终止卡住的作业
self.logger.info("步骤1: 终止卡住的作业")
kill_result = self.kill_stuck_jobs()
recovery_results["steps"].append({
"step": "kill_stuck_jobs",
"result": kill_result
})

# 步骤2:重启故障机器人
self.logger.info("步骤2: 重启故障机器人")
restart_result = self.restart_faulted_robots()
recovery_results["steps"].append({
"step": "restart_robots",
"result": restart_result
})

# 步骤3:等待系统稳定
self.logger.info("步骤3: 等待系统稳定")
time.sleep(30)

recovery_results["end_time"] = time.time()
recovery_results["duration_seconds"] = recovery_results["end_time"] - recovery_results["start_time"]

self.logger.info("=== 紧急恢复流程完成 ===")
return recovery_results

# 使用示例
if __name__ == "__main__":
recovery_manager = EmergencyRecoveryManager(
orchestrator_url="https://your-orchestrator.com",
api_key="your-api-key"
)

result = recovery_manager.execute_emergency_recovery()
print(f"恢复结果: {result}")

3.2 根本原因解决

1. 数据库连接池优化

1
2
3
4
5
6
<!-- Orchestrator数据库连接池配置优化 -->
<connectionStrings>
<add name="Default"
connectionString="Server=your-server;Database=UiPath;Integrated Security=true;Connection Timeout=30;Command Timeout=300;Max Pool Size=200;Min Pool Size=10;Pooling=true;"
providerName="System.Data.SqlClient" />
</connectionStrings>

2. 机器人选择器更新

1
2
3
4
5
6
7
8
9
10
11
12
13
14
{
"selector_updates": {
"invoice_input_field": {
"old_selector": "<webctrl tag='INPUT' id='invoice_number' />",
"new_selector": "<webctrl tag='INPUT' aaname='Invoice Number*' class='form-control' />",
"backup_selector": "<webctrl tag='INPUT' placeholder='Enter invoice number' />"
},
"submit_button": {
"old_selector": "<webctrl tag='BUTTON' innertext='Submit' />",
"new_selector": "<webctrl tag='BUTTON' class='btn btn-primary' innertext='Submit Invoice' />",
"backup_selector": "<webctrl tag='BUTTON' type='submit' />"
}
}
}

3. 网络连接稳定性改进

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
# 机器人配置文件更新
robot_settings:
connection:
heartbeat_interval: 30 # 心跳间隔(秒)
connection_timeout: 60 # 连接超时(秒)
retry_attempts: 3 # 重试次数
retry_delay: 5 # 重试延迟(秒)

performance:
max_concurrent_activities: 5
activity_timeout: 300
screenshot_on_error: true

logging:
level: "Information"
max_log_size_mb: 100
log_retention_days: 7

四、预防措施与监控体系

4.1 智能监控系统

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
# RPA智能监控系统
import asyncio
import aiohttp
import json
from datetime import datetime, timedelta
from dataclasses import dataclass
from typing import List, Dict, Optional
import smtplib
from email.mime.text import MIMEText
from email.mime.multipart import MIMEMultipart

@dataclass
class AlertRule:
"""告警规则定义"""
name: str
metric: str
threshold: float
operator: str # '>', '<', '>=', '<=', '=='
severity: str # 'low', 'medium', 'high', 'critical'
cooldown_minutes: int = 15

class RPAMonitoringSystem:
"""RPA智能监控系统"""

def __init__(self, config: dict):
self.config = config
self.orchestrator_url = config['orchestrator_url']
self.api_key = config['api_key']
self.alert_rules = self._load_alert_rules()
self.alert_history = {}

def _load_alert_rules(self) -> List[AlertRule]:
"""加载告警规则"""
return [
AlertRule("机器人故障率过高", "robot_fault_rate", 0.2, ">", "high"),
AlertRule("作业失败率过高", "job_failure_rate", 0.15, ">", "medium"),
AlertRule("API响应时间过长", "api_response_time", 5.0, ">", "medium"),
AlertRule("系统CPU使用率过高", "cpu_usage", 80.0, ">", "high"),
AlertRule("系统内存使用率过高", "memory_usage", 85.0, ">", "high"),
AlertRule("数据库连接数过多", "db_connections", 150, ">", "critical"),
AlertRule("队列积压过多", "queue_backlog", 100, ">", "medium")
]

async def collect_metrics(self) -> Dict[str, float]:
"""收集系统指标"""
metrics = {}

async with aiohttp.ClientSession() as session:
# 并行收集各种指标
tasks = [
self._collect_robot_metrics(session),
self._collect_job_metrics(session),
self._collect_system_metrics(),
self._collect_queue_metrics(session)
]

results = await asyncio.gather(*tasks, return_exceptions=True)

# 合并结果
for result in results:
if isinstance(result, dict):
metrics.update(result)

return metrics

async def _collect_robot_metrics(self, session: aiohttp.ClientSession) -> Dict[str, float]:
"""收集机器人指标"""
try:
headers = {"Authorization": f"Bearer {self.api_key}"}

async with session.get(
f"{self.orchestrator_url}/odata/Robots",
headers=headers
) as response:
if response.status != 200:
return {}

data = await response.json()
robots = data.get('value', [])

total_robots = len(robots)
faulted_robots = len([r for r in robots if r.get('State') == 'Faulted'])

return {
"total_robots": total_robots,
"faulted_robots": faulted_robots,
"robot_fault_rate": faulted_robots / total_robots if total_robots > 0 else 0
}

except Exception:
return {}

async def _collect_job_metrics(self, session: aiohttp.ClientSession) -> Dict[str, float]:
"""收集作业指标"""
try:
headers = {"Authorization": f"Bearer {self.api_key}"}

# 查询最近1小时的作业
end_time = datetime.now()
start_time = end_time - timedelta(hours=1)
filter_query = f"CreationTime ge {start_time.isoformat()}Z"

async with session.get(
f"{self.orchestrator_url}/odata/Jobs",
headers=headers,
params={"$filter": filter_query}
) as response:
if response.status != 200:
return {}

data = await response.json()
jobs = data.get('value', [])

total_jobs = len(jobs)
failed_jobs = len([j for j in jobs if j.get('State') == 'Faulted'])
running_jobs = len([j for j in jobs if j.get('State') == 'Running'])

return {
"total_jobs_1h": total_jobs,
"failed_jobs_1h": failed_jobs,
"running_jobs": running_jobs,
"job_failure_rate": failed_jobs / total_jobs if total_jobs > 0 else 0
}

except Exception:
return {}

async def _collect_system_metrics(self) -> Dict[str, float]:
"""收集系统指标"""
try:
import psutil

return {
"cpu_usage": psutil.cpu_percent(interval=1),
"memory_usage": psutil.virtual_memory().percent,
"disk_usage": psutil.disk_usage('/').percent
}
except Exception:
return {}

async def _collect_queue_metrics(self, session: aiohttp.ClientSession) -> Dict[str, float]:
"""收集队列指标"""
try:
headers = {"Authorization": f"Bearer {self.api_key}"}

async with session.get(
f"{self.orchestrator_url}/odata/QueueItems",
headers=headers,
params={"$filter": "Status eq 'New'"}
) as response:
if response.status != 200:
return {}

data = await response.json()
queue_items = data.get('value', [])

return {
"queue_backlog": len(queue_items)
}

except Exception:
return {}

def check_alerts(self, metrics: Dict[str, float]) -> List[Dict]:
"""检查告警条件"""
alerts = []
current_time = datetime.now()

for rule in self.alert_rules:
if rule.metric not in metrics:
continue

metric_value = metrics[rule.metric]

# 检查告警条件
if self._evaluate_condition(metric_value, rule.threshold, rule.operator):
# 检查冷却时间
last_alert_time = self.alert_history.get(rule.name)
if last_alert_time:
time_diff = (current_time - last_alert_time).total_seconds() / 60
if time_diff < rule.cooldown_minutes:
continue # 还在冷却期内

# 生成告警
alert = {
"rule_name": rule.name,
"metric": rule.metric,
"current_value": metric_value,
"threshold": rule.threshold,
"severity": rule.severity,
"timestamp": current_time.isoformat(),
"message": f"{rule.name}: {rule.metric}当前值{metric_value}超过阈值{rule.threshold}"
}

alerts.append(alert)
self.alert_history[rule.name] = current_time

return alerts

def _evaluate_condition(self, value: float, threshold: float, operator: str) -> bool:
"""评估告警条件"""
if operator == ">":
return value > threshold
elif operator == "<":
return value < threshold
elif operator == ">=":
return value >= threshold
elif operator == "<=":
return value <= threshold
elif operator == "==":
return value == threshold
return False

async def send_alert(self, alert: Dict) -> bool:
"""发送告警通知"""
try:
# 邮件通知
await self._send_email_alert(alert)

# 可以添加其他通知方式:钉钉、微信、短信等
# await self._send_dingtalk_alert(alert)

return True
except Exception as e:
print(f"发送告警失败: {e}")
return False

async def _send_email_alert(self, alert: Dict):
"""发送邮件告警"""
smtp_config = self.config.get('smtp', {})

msg = MIMEMultipart()
msg['From'] = smtp_config.get('from_email')
msg['To'] = ', '.join(smtp_config.get('to_emails', []))
msg['Subject'] = f"[RPA告警] {alert['severity'].upper()} - {alert['rule_name']}"

body = f"""
告警详情:

规则名称: {alert['rule_name']}
指标名称: {alert['metric']}
当前值: {alert['current_value']}
阈值: {alert['threshold']}
严重程度: {alert['severity']}
触发时间: {alert['timestamp']}

详细信息: {alert['message']}

请及时处理!
"""

msg.attach(MIMEText(body, 'plain', 'utf-8'))

# 发送邮件(这里需要根据实际SMTP配置调整)
# server = smtplib.SMTP(smtp_config.get('host'), smtp_config.get('port'))
# server.send_message(msg)
# server.quit()

async def run_monitoring_loop(self, interval_seconds: int = 60):
"""运行监控循环"""
print(f"RPA监控系统启动,监控间隔: {interval_seconds}秒")

while True:
try:
# 收集指标
metrics = await self.collect_metrics()
print(f"收集到指标: {metrics}")

# 检查告警
alerts = self.check_alerts(metrics)

# 发送告警
for alert in alerts:
print(f"触发告警: {alert['message']}")
await self.send_alert(alert)

# 等待下一次检查
await asyncio.sleep(interval_seconds)

except Exception as e:
print(f"监控循环异常: {e}")
await asyncio.sleep(interval_seconds)

# 使用示例
if __name__ == "__main__":
config = {
"orchestrator_url": "https://your-orchestrator.com",
"api_key": "your-api-key",
"smtp": {
"host": "smtp.company.com",
"port": 587,
"from_email": "rpa-monitor@company.com",
"to_emails": ["admin@company.com", "ops@company.com"]
}
}

monitor = RPAMonitoringSystem(config)

# 运行监控
asyncio.run(monitor.run_monitoring_loop(60))

4.2 自动化运维流程

建立了完善的自动化运维流程:

  1. 健康检查自动化:每5分钟执行一次全面健康检查
  2. 故障自动恢复:检测到故障后自动执行恢复流程
  3. 容量规划:基于历史数据预测资源需求
  4. 版本管理:建立机器人版本管理和回滚机制
  5. 备份策略:定期备份配置和流程定义

五、效果评估与经验总结

5.1 故障解决效果

恢复时间线:

  • 09:15 - 故障发现和告警
  • 09:30 - 完成初步排查,确定主要问题
  • 10:00 - 紧急恢复措施实施完成
  • 10:30 - 根本原因修复完成
  • 11:00 - 系统完全恢复正常运行

最终效果:

  • 故障恢复时间:从发现到完全恢复仅用时1小时45分钟
  • 业务影响最小化:通过紧急恢复措施,将业务中断时间控制在30分钟内
  • 数据完整性:所有积压的发票数据完整保存,无数据丢失
  • 系统稳定性提升:优化后系统稳定性提升90%,故障率降低至0.1%

5.2 关键经验总结

故障排查方法论:

  1. 分层排查:从基础设施到应用层的系统化排查方法
  2. 并行分析:同时检查多个可能的故障点,提高排查效率
  3. 数据驱动:基于监控数据和日志进行精确定位
  4. 快速恢复:优先恢复业务,再进行根本原因修复

预防措施最佳实践:

  • 监控体系:建立多层次、全方位的监控告警体系
  • 自动化运维:通过自动化减少人为操作错误
  • 容灾备份:建立完善的备份和恢复机制
  • 变更管理:严格的变更审批和测试流程
  • 知识管理:建立故障处理知识库和应急预案

团队协作要点:

  • 明确分工:故障处理过程中的角色和职责分工
  • 沟通机制:建立高效的内部沟通和外部通报机制
  • 决策流程:快速决策机制,避免延误最佳处理时机
  • 经验传承:定期进行故障复盘和经验分享

总结

通过这次RPA生产环境故障的完整处理过程,我们不仅成功恢复了业务运行,更重要的是建立了一套完善的故障处理和预防体系。这个案例展示了RPA运维工作的复杂性和重要性,也证明了系统化的故障处理方法论的价值。

核心收获:

  1. 快速响应能力:建立了从故障发现到恢复的标准化流程,大幅缩短了故障处理时间
  2. 预防为主策略:通过智能监控和自动化运维,将大部分问题消灭在萌芽状态
  3. 技术债务管理:及时处理系统中的技术债务,避免小问题演变成大故障
  4. 团队能力提升:通过实战锻炼,提升了团队的故障处理能力和协作水平

未来改进方向:

  • AI驱动的故障预测:利用机器学习技术预测潜在故障
  • 自愈系统建设:进一步提升系统的自动恢复能力
  • 多云容灾:建立跨云平台的容灾备份机制
  • DevOps集成:将RPA运维纳入整体的DevOps体系

RPA系统的稳定运行需要技术、流程和人员的有机结合。只有建立完善的监控体系、标准化的处理流程和专业的运维团队,才能确保RPA项目在生产环境中发挥最大价值,为企业数字化转型提供可靠支撑。