1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169
| import heapq from dataclasses import dataclass from enum import Enum import time
class TaskPriority(Enum): LOW = 1 NORMAL = 2 HIGH = 3 URGENT = 4
@dataclass class RPATask: task_id: str robot_id: str priority: TaskPriority required_resources: list estimated_duration: int retry_count: int = 0 max_retries: int = 3 created_at: float = None def __post_init__(self): if self.created_at is None: self.created_at = time.time() def __lt__(self, other): return (self.priority.value, -self.created_at) > (other.priority.value, -other.created_at)
class IntelligentRPAScheduler: """智能RPA任务调度器""" def __init__(self, resource_manager): self.resource_manager = resource_manager self.task_queue = [] self.running_tasks = {} self.completed_tasks = {} self.robot_status = {} async def submit_task(self, task: RPATask): """提交任务""" if not await self._check_resource_availability(task): logging.warning(f"资源不足,任务延后: {task.task_id}") await asyncio.sleep(60) return await self.submit_task(task) heapq.heappush(self.task_queue, task) logging.info(f"任务已提交: {task.task_id}, 优先级: {task.priority.name}") return True async def schedule_tasks(self): """调度任务执行""" while True: try: if self.task_queue and self._has_available_robots(): task = heapq.heappop(self.task_queue) robot_id = await self._allocate_robot(task) if robot_id: await self._execute_task(task, robot_id) else: heapq.heappush(self.task_queue, task) await asyncio.sleep(5) except Exception as e: logging.error(f"调度器异常: {e}") await asyncio.sleep(10) async def _check_resource_availability(self, task: RPATask): """检查资源可用性""" if len(self.running_tasks) >= 30: return False return True def _has_available_robots(self): """检查是否有可用机器人""" return sum(1 for status in self.robot_status.values() if status == 'idle') > 0 async def _allocate_robot(self, task: RPATask): """分配机器人""" if task.robot_id and self.robot_status.get(task.robot_id) == 'idle': return task.robot_id for robot_id, status in self.robot_status.items(): if status == 'idle': return robot_id return None async def _execute_task(self, task: RPATask, robot_id: str): """执行任务""" self.robot_status[robot_id] = 'busy' self.running_tasks[task.task_id] = { 'task': task, 'robot_id': robot_id, 'start_time': time.time() } logging.info(f"开始执行任务: {task.task_id} on {robot_id}") asyncio.create_task(self._run_task(task, robot_id)) async def _run_task(self, task: RPATask, robot_id: str): """运行任务""" try: result = await self._call_rpa_engine(task, robot_id) if result['success']: self._handle_task_success(task, robot_id, result) else: self._handle_task_failure(task, robot_id, result['error']) except Exception as e: self._handle_task_failure(task, robot_id, str(e)) async def _call_rpa_engine(self, task: RPATask, robot_id: str): """调用RPA引擎执行任务""" await asyncio.sleep(task.estimated_duration) return {'success': True, 'result': 'Task completed successfully'} def _handle_task_success(self, task: RPATask, robot_id: str, result): """处理任务成功""" if task.task_id in self.running_tasks: del self.running_tasks[task.task_id] self.completed_tasks[task.task_id] = { 'task': task, 'robot_id': robot_id, 'result': result, 'completed_at': time.time() } self.robot_status[robot_id] = 'idle' logging.info(f"任务执行成功: {task.task_id}") def _handle_task_failure(self, task: RPATask, robot_id: str, error): """处理任务失败""" if task.task_id in self.running_tasks: del self.running_tasks[task.task_id] self.robot_status[robot_id] = 'idle' if task.retry_count < task.max_retries: task.retry_count += 1 heapq.heappush(self.task_queue, task) logging.warning(f"任务失败,将重试: {task.task_id}, 重试次数: {task.retry_count}") else: logging.error(f"任务最终失败: {task.task_id}, 错误: {error}")
|