
| import heapq from dataclasses import dataclass from enum import Enum import time
class TaskPriority(Enum): LOW = 1 NORMAL = 2 HIGH = 3 URGENT = 4
@dataclass class RPATask: task_id: str robot_id: str priority: TaskPriority required_resources: list estimated_duration: int retry_count: int = 0 max_retries: int = 3 created_at: float = None def __post_init__(self): if self.created_at is None: self.created_at = time.time() def __lt__(self, other): return (self.priority.value, -self.created_at) > (other.priority.value, -other.created_at)
class IntelligentRPAScheduler: """智能RPA任务调度器""" def __init__(self, resource_manager): self.resource_manager = resource_manager self.task_queue = [] self.running_tasks = {} self.completed_tasks = {} self.robot_status = {} async def submit_task(self, task: RPATask): """提交任务""" if not await self._check_resource_availability(task): logging.warning(f"资源不足,任务延后: {task.task_id}") await asyncio.sleep(60) return await self.submit_task(task) heapq.heappush(self.task_queue, task) logging.info(f"任务已提交: {task.task_id}, 优先级: {task.priority.name}") return True async def schedule_tasks(self): """调度任务执行""" while True: try: if self.task_queue and self._has_available_robots(): task = heapq.heappop(self.task_queue) robot_id = await self._allocate_robot(task) if robot_id: await self._execute_task(task, robot_id) else: heapq.heappush(self.task_queue, task) await asyncio.sleep(5) except Exception as e: logging.error(f"调度器异常: {e}") await asyncio.sleep(10) async def _check_resource_availability(self, task: RPATask): """检查资源可用性""" if len(self.running_tasks) >= 30: return False return True def _has_available_robots(self): """检查是否有可用机器人""" return sum(1 for status in self.robot_status.values() if status == 'idle') > 0 async def _allocate_robot(self, task: RPATask): """分配机器人""" if task.robot_id and self.robot_status.get(task.robot_id) == 'idle': return task.robot_id for robot_id, status in self.robot_status.items(): if status == 'idle': return robot_id return None async def _execute_task(self, task: RPATask, robot_id: str): """执行任务""" self.robot_status[robot_id] = 'busy' self.running_tasks[task.task_id] = { 'task': task, 'robot_id': robot_id, 'start_time': time.time() } logging.info(f"开始执行任务: {task.task_id} on {robot_id}") asyncio.create_task(self._run_task(task, robot_id)) async def _run_task(self, task: RPATask, robot_id: str): """运行任务""" try: result = await self._call_rpa_engine(task, robot_id) if result['success']: self._handle_task_success(task, robot_id, result) else: self._handle_task_failure(task, robot_id, result['error']) except Exception as e: self._handle_task_failure(task, robot_id, str(e)) async def _call_rpa_engine(self, task: RPATask, robot_id: str): """调用RPA引擎执行任务""" await asyncio.sleep(task.estimated_duration) return {'success': True, 'result': 'Task completed successfully'} def _handle_task_success(self, task: RPATask, robot_id: str, result): """处理任务成功""" if task.task_id in self.running_tasks: del self.running_tasks[task.task_id] self.completed_tasks[task.task_id] = { 'task': task, 'robot_id': robot_id, 'result': result, 'completed_at': time.time() } self.robot_status[robot_id] = 'idle' logging.info(f"任务执行成功: {task.task_id}") def _handle_task_failure(self, task: RPATask, robot_id: str, error): """处理任务失败""" if task.task_id in self.running_tasks: del self.running_tasks[task.task_id] self.robot_status[robot_id] = 'idle' if task.retry_count < task.max_retries: task.retry_count += 1 heapq.heappush(self.task_queue, task) logging.warning(f"任务失败,将重试: {task.task_id}, 重试次数: {task.retry_count}") else: logging.error(f"任务最终失败: {task.task_id}, 错误: {error}")
|