1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144
class CoordinatorAgent(BaseAgent):
    """Coordinator agent: decomposes a task into subtasks and schedules them.

    The coordinator splits an incoming task into dependent subtasks, runs
    them wave by wave (every subtask whose dependencies have completed is
    dispatched concurrently), and aggregates the collected results.
    """

    def __init__(self, agent_id: str):
        super().__init__(agent_id, AgentType.COORDINATOR)
        self.capabilities = {"task_decomposition", "scheduling", "monitoring"}
        self.specialist_agents = {}
        self.active_tasks = {}

    async def process_task(self, task: Task) -> Dict[str, Any]:
        """Decompose *task*, run its subtasks and return the aggregated result.

        Args:
            task: The task to process. ``task.input_data`` may carry a
                ``"start_time"`` value; when present it is used as the
                reference point for ``execution_time``.

        Returns:
            A dict with the keys ``"status"``, ``"result"`` and
            ``"execution_time"``.
        """
        loop = asyncio.get_running_loop()
        # Fallback reference point so that ``execution_time`` is a real
        # duration even when the caller did not supply ``start_time``.
        entered_at = loop.time()

        print(f"协调器接收任务: {task.task_id}")

        subtasks = await self.decompose_task(task)
        task_graph = self.build_task_graph(subtasks)
        results = await self.schedule_and_execute(task_graph)
        # NOTE(review): aggregate_results is not defined in this class;
        # presumably it is provided by BaseAgent -- confirm.
        final_result = await self.aggregate_results(results)

        # NOTE(review): a caller-supplied start_time is assumed to come from
        # the event loop's clock (loop.time()) -- confirm against callers.
        start_time = task.input_data.get("start_time", entered_at)
        return {
            # NOTE(review): the status is reported as "success" even when
            # some subtasks failed or were never scheduled.
            "status": "success",
            "result": final_result,
            "execution_time": loop.time() - start_time,
        }

    async def decompose_task(self, task: Task) -> List[Task]:
        """Split *task* into an ordered chain of dependent subtasks.

        Only the ``"customer_service_inquiry"`` task type is decomposed;
        any other type yields an empty list.

        Args:
            task: The task to decompose.

        Returns:
            The list of subtasks (possibly empty).
        """
        subtasks = []

        if task.task_type == "customer_service_inquiry":
            auth_id = f"{task.task_id}_user_auth"
            query_id = f"{task.task_id}_order_query"
            response_id = f"{task.task_id}_response_gen"
            # Authentication -> order lookup -> response generation; each
            # step depends on the previous one.
            subtasks = [
                Task(
                    task_id=auth_id,
                    task_type="user_authentication",
                    input_data={"user_id": task.input_data.get("user_id")},
                    dependencies=[],
                    priority=1,
                    timeout=30,
                    callback_agent=self.agent_id,
                ),
                Task(
                    task_id=query_id,
                    task_type="database_query",
                    input_data={"query_type": "order_history"},
                    dependencies=[auth_id],
                    priority=2,
                    timeout=60,
                    callback_agent=self.agent_id,
                ),
                Task(
                    task_id=response_id,
                    task_type="response_generation",
                    input_data={"template": "customer_service"},
                    dependencies=[query_id],
                    priority=3,
                    timeout=45,
                    callback_agent=self.agent_id,
                ),
            ]

        print(f"任务分解完成: {len(subtasks)} 个子任务")
        return subtasks

    def build_task_graph(self, subtasks: List[Task]) -> Dict[str, List[str]]:
        """Build the dependency graph for *subtasks*.

        Returns:
            A mapping from each subtask id to the list of task ids it
            depends on.
        """
        return {subtask.task_id: subtask.dependencies for subtask in subtasks}

    async def schedule_and_execute(self, task_graph: Dict[str, List[str]]) -> Dict[str, Any]:
        """Run the tasks in *task_graph* in dependency order.

        In each round every task whose dependencies have all completed is
        delegated concurrently. A task that fails (its delegation raises, is
        cancelled, or no specialist is found) is recorded as failed and is
        not retried; tasks depending on it are never scheduled.

        Args:
            task_graph: Mapping of task id to the ids it depends on.

        Returns:
            A mapping of task id to result, containing only the tasks that
            completed successfully.
        """
        results: Dict[str, Any] = {}
        completed_tasks = set()
        # Failed tasks are tracked separately so they are never picked up as
        # "ready" again; otherwise a persistently failing task would be
        # re-dispatched forever.
        failed_tasks = set()

        while len(completed_tasks) + len(failed_tasks) < len(task_graph):
            ready_tasks = [
                task_id
                for task_id, deps in task_graph.items()
                if task_id not in completed_tasks
                and task_id not in failed_tasks
                and all(dep in completed_tasks for dep in deps)
            ]

            if not ready_tasks:
                # Nothing can run. Without any failure this is a genuine
                # dependency deadlock (a cycle or an unknown dependency);
                # otherwise the remaining tasks are blocked by a failed
                # dependency that has already been reported.
                if not failed_tasks:
                    print("检测到任务依赖死锁")
                break

            # Keep ids and coroutines in lockstep so every result is
            # attributed to the task that actually produced it.
            dispatched_ids = []
            pending = []
            for task_id in ready_tasks:
                specialist = self.find_suitable_specialist(task_id)
                if not specialist:
                    print(f"子任务执行失败: {task_id}: 未找到合适的专业Agent")
                    failed_tasks.add(task_id)
                    continue
                dispatched_ids.append(task_id)
                pending.append(self.delegate_task(specialist, task_id))

            task_results = await asyncio.gather(*pending, return_exceptions=True)

            for task_id, result in zip(dispatched_ids, task_results):
                # BaseException also covers asyncio.CancelledError, which is
                # not an Exception subclass on Python 3.8+.
                if isinstance(result, BaseException):
                    print(f"子任务执行失败: {task_id}: {result!r}")
                    failed_tasks.add(task_id)
                else:
                    results[task_id] = result
                    completed_tasks.add(task_id)

        return results

    def find_suitable_specialist(self, task_id: str) -> str:
        """Return the id of the specialist agent that should handle *task_id*.

        Routing is by substring match on the whole task id, checked in the
        order ``"auth"``, ``"query"``, ``"response"``; the first match wins,
        so a keyword anywhere in the id (including a parent-task prefix)
        affects routing. Ids matching none of them go to ``"general_agent"``.
        """
        routing = (
            ("auth", "auth_agent"),
            ("query", "database_agent"),
            ("response", "nlp_agent"),
        )
        for keyword, specialist_id in routing:
            if keyword in task_id:
                return specialist_id
        return "general_agent"

    async def delegate_task(self, specialist_id: str, task_id: str, timeout: int = 60) -> Any:
        """Send a task assignment to a specialist and return its result.

        Args:
            specialist_id: Id of the agent that should receive the assignment.
            task_id: Id of the task being assigned.
            timeout: Value placed in the ``"timeout"`` field of the
                assignment message. It is only forwarded; this method does
                not enforce it.

        Returns:
            ``response["content"]["result"]`` of the received message.
        """
        message = {
            "type": "task_assignment",
            "task_id": task_id,
            "timeout": timeout,
        }
        await self.send_message(specialist_id, message)
        # NOTE(review): when several delegations run concurrently this
        # assumes receive_message() yields the reply belonging to this
        # task -- confirm against BaseAgent's messaging implementation.
        response = await self.receive_message()
        return response["content"]["result"]
|