1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163
import asyncio
import logging
import threading
import time
from concurrent.futures import ThreadPoolExecutor, as_completed
from typing import Dict, List, Optional
class OptimizedRPAController:
    """Run RPA browser tasks concurrently on a bounded thread pool.

    Each task gets its own debugging port (from ``PortManager``) and its own
    Chrome instance, reads its input and writes its result through
    ``SafeFileManager``, and contributes to the shared ``stats`` counters.
    All mutation of ``active_tasks`` and ``stats`` happens under
    ``task_lock``.

    The controller owns a ``ThreadPoolExecutor``; call :meth:`shutdown` (or
    use the controller as a context manager) to release its worker threads.

    NOTE(review): ``_execute_business_logic`` is called by
    :meth:`execute_task_safely` but is not defined in this class as shown;
    presumably a subclass or mixin supplies it - confirm.
    """

    _logger = logging.getLogger(__name__)

    def __init__(self, max_concurrent: int = 50):
        """Create the controller and its worker pool.

        Args:
            max_concurrent: Maximum number of tasks executed in parallel;
                used as the thread pool's ``max_workers``.
        """
        self.max_concurrent = max_concurrent
        self.port_manager = PortManager()
        self.file_manager = SafeFileManager("/rpa/data")
        self.executor = ThreadPoolExecutor(max_workers=max_concurrent)
        self.active_tasks = {}
        self.task_lock = threading.Lock()
        self.stats = {
            "total_executed": 0,
            "current_running": 0,
            "failed_tasks": 0,
            "avg_execution_time": 0,
        }

    def __enter__(self):
        """Return the controller itself for use in a ``with`` statement."""
        return self

    def __exit__(self, exc_type, exc_value, traceback) -> None:
        """Shut the worker pool down when the ``with`` block ends."""
        self.shutdown(wait=True)

    def shutdown(self, wait: bool = True) -> None:
        """Release the thread pool's worker threads.

        Args:
            wait: Passed to ``ThreadPoolExecutor.shutdown``; when true, block
                until already-submitted tasks have finished.
        """
        self.executor.shutdown(wait=wait)

    def execute_task_safely(self, task_id: str) -> Dict:
        """Execute one task and always return a result dictionary.

        Args:
            task_id: Identifier used to read the task data and store the
                result.

        Returns:
            ``{"status": "success", "task_id": ..., "result": ...}`` on
            success, or ``{"status": "failed", "task_id": ..., "error": ...}``
            when any ``Exception`` is raised while running the task.
        """
        start_time = time.time()  # wall-clock value kept in the task record
        started = time.monotonic()  # monotonic clock for the duration

        # Register before entering the try block so that the decrement in
        # ``finally`` is always paired with this increment.
        with self.task_lock:
            self.active_tasks[task_id] = {
                "start_time": start_time,
                "status": "running",
            }
            self.stats["current_running"] += 1

        try:
            with self.port_manager.allocate_port() as debug_port:
                task_data = self.file_manager.safe_read_task_data(task_id)
                browser = self._launch_chrome_with_port(debug_port)
                try:
                    result = self._execute_business_logic(task_data, browser)
                    self.file_manager.safe_write_result(task_id, result)
                    return {
                        "status": "success",
                        "task_id": task_id,
                        "result": result,
                    }
                finally:
                    self._cleanup_chrome_process(browser)
        except Exception as e:
            # The counter is shared between worker threads, so it must be
            # updated under the same lock as the other statistics.
            with self.task_lock:
                self.stats["failed_tasks"] += 1
            return {"status": "failed", "task_id": task_id, "error": str(e)}
        finally:
            execution_time = time.monotonic() - started
            with self.task_lock:
                self.active_tasks.pop(task_id, None)
                self.stats["current_running"] -= 1
                self.stats["total_executed"] += 1
                current_avg = self.stats["avg_execution_time"]
                total_count = self.stats["total_executed"]
                # Incremental mean over every finished task.
                self.stats["avg_execution_time"] = (
                    (current_avg * (total_count - 1) + execution_time)
                    / total_count
                )

    def execute_batch_tasks(self, task_list: List[str]) -> List[Dict]:
        """Run every task in ``task_list`` on the pool and collect results.

        Args:
            task_list: Task identifiers to execute.

        Returns:
            One result dictionary per task, in completion order (not in
            submission order). A future that raises is reported as
            ``{"status": "error", ...}``.
        """
        total = len(task_list)
        print(f"🚀 开始批量执行 {len(task_list)} 个任务,最大并发: {self.max_concurrent}")

        future_to_task = {
            self.executor.submit(self.execute_task_safely, task_id): task_id
            for task_id in task_list
        }

        results = []
        completed_count = 0
        for future in as_completed(future_to_task):
            task_id = future_to_task[future]
            try:
                results.append(future.result())
            except Exception as e:
                print(f"❌ 任务 {task_id} 执行异常: {e}")
                results.append({
                    "status": "error",
                    "task_id": task_id,
                    "error": str(e),
                })
            # Count every finished future, including the ones that raised,
            # so the progress output reflects real progress.
            completed_count += 1
            if completed_count % 10 == 0:
                print(f"📊 已完成: {completed_count}/{total}")

        return results

    def _launch_chrome_with_port(self, debug_port: int):
        """Start Chrome through Selenium on the given remote-debugging port.

        Args:
            debug_port: Port passed as ``--remote-debugging-port``; it is
                also used to derive a per-instance ``--user-data-dir``.

        Returns:
            The ``selenium.webdriver.Chrome`` instance.
        """
        from selenium import webdriver
        from selenium.webdriver.chrome.options import Options

        chrome_options = [
            f"--remote-debugging-port={debug_port}",
            "--no-sandbox",
            "--disable-dev-shm-usage",
            "--disable-gpu",
            f"--user-data-dir=/tmp/chrome_profile_{debug_port}",
        ]

        options = Options()
        for opt in chrome_options:
            options.add_argument(opt)

        return webdriver.Chrome(options=options)

    def _cleanup_chrome_process(self, browser):
        """Best-effort shutdown of a browser and its driver process.

        Never raises for ordinary errors: a failure while quitting or
        terminating is logged at debug level and otherwise ignored.

        Args:
            browser: The WebDriver to close; ``None`` is accepted.
        """
        # Take the process handle before quit(), while it is still attached.
        driver_process = getattr(
            getattr(browser, "service", None), "process", None
        )

        if browser:
            try:
                browser.quit()
            except Exception:
                self._logger.debug("browser.quit() failed", exc_info=True)

        if driver_process is None:
            return

        # If the handle can report its own exit (presumably a
        # subprocess.Popen - confirm against the Selenium version in use),
        # skip the signal so an already-exited PID is never targeted.
        poll = getattr(driver_process, "poll", None)
        try:
            if callable(poll) and poll() is not None:
                return
            # Imported here because the module's import block does not
            # bring psutil into scope.
            import psutil

            psutil.Process(driver_process.pid).terminate()
        except Exception:
            self._logger.debug(
                "could not terminate driver process", exc_info=True
            )

    def get_system_status(self) -> Dict:
        """Return a snapshot of counters, running task ids and system load.

        Returns:
            The ``stats`` counters plus ``active_tasks`` (list of task ids),
            ``available_ports`` and ``system_load`` (``psutil.cpu_percent()``).
        """
        # Imported here because the module's import block does not bring
        # psutil into scope.
        import psutil

        # Copy under the lock so the counters and the task list belong to
        # the same moment.
        with self.task_lock:
            stats_snapshot = dict(self.stats)
            active_ids = list(self.active_tasks)

        return {
            **stats_snapshot,
            "active_tasks": active_ids,
            # NOTE(review): this subtracts the used-port count from the END
            # of the range, not from the range size - verify against
            # PortManager before relying on this number.
            "available_ports": (
                self.port_manager.port_range_end
                - len(self.port_manager.used_ports)
            ),
            "system_load": psutil.cpu_percent(),
        }
|