1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160
| import asyncio import uuid from typing import Any, Dict from dataclasses import dataclass
@dataclass class ToolMessage: """工具间消息""" id: str sender: str message_type: str data: Any timestamp: float
class ToolCommunicationBus: """工具通信总线""" def __init__(self): self.message_queues: Dict[str, asyncio.Queue] = {} self.subscribers: Dict[str, List[str]] = {} def register_tool(self, tool_name: str): """注册工具""" if tool_name not in self.message_queues: self.message_queues[tool_name] = asyncio.Queue(maxsize=100) def subscribe(self, tool_name: str, message_type: str): """订阅消息类型""" if message_type not in self.subscribers: self.subscribers[message_type] = [] if tool_name not in self.subscribers[message_type]: self.subscribers[message_type].append(tool_name) async def publish(self, sender: str, message_type: str, data: Any): """发布消息""" message = ToolMessage( id=str(uuid.uuid4()), sender=sender, message_type=message_type, data=data, timestamp=time.time() ) subscribers = self.subscribers.get(message_type, []) for subscriber in subscribers: if subscriber != sender and subscriber in self.message_queues: try: await asyncio.wait_for( self.message_queues[subscriber].put(message), timeout=1.0 ) except asyncio.TimeoutError: logging.warning(f"消息发送超时: {message_type} -> {subscriber}") async def receive(self, tool_name: str, timeout: float = 10.0): """接收消息""" if tool_name not in self.message_queues: return None try: message = await asyncio.wait_for( self.message_queues[tool_name].get(), timeout=timeout ) return message except asyncio.TimeoutError: return None
class OptimizedDatabaseQueryTool(BaseTool): """优化后的数据库查询工具""" name = "database_query" description = "查询数据库获取数据" def __init__(self, communication_bus: ToolCommunicationBus): self.communication_bus = communication_bus self.communication_bus.register_tool(self.name) self.connection_semaphore = asyncio.Semaphore(3) async def _arun(self, query: str) -> str: """异步执行数据库查询""" task_id = str(uuid.uuid4()) try: async with self.connection_semaphore: result = await self.execute_query_with_timeout(query, timeout=20.0) await self.communication_bus.publish( sender=self.name, message_type="query_result", data={ 'task_id': task_id, 'query': query, 'result': result } ) return f"查询完成,任务ID: {task_id}" except asyncio.TimeoutError: return f"查询超时,任务ID: {task_id}" except Exception as e: logging.error(f"数据库查询异常: {e}") return f"查询失败: {str(e)}" async def execute_query_with_timeout(self, query: str, timeout: float): """带超时的查询执行""" await asyncio.sleep(1) return [{"id": 1, "value": 100}, {"id": 2, "value": 200}]
class OptimizedChartGeneratorTool(BaseTool): """优化后的图表生成工具""" name = "chart_generator" description = "基于数据生成图表" def __init__(self, communication_bus: ToolCommunicationBus): self.communication_bus = communication_bus self.communication_bus.register_tool(self.name) self.communication_bus.subscribe(self.name, "query_result") async def _arun(self, chart_type: str) -> str: """异步生成图表""" try: logging.info("等待数据查询结果...") message = await self.communication_bus.receive( tool_name=self.name, timeout=30.0 ) if not message or message.message_type != "query_result": return "未收到数据查询结果,图表生成失败" data = message.data['result'] chart_path = await self.generate_chart_with_timeout( data, chart_type, timeout=15.0 ) return f"图表生成完成: {chart_path}" except asyncio.TimeoutError: return "等待数据超时,图表生成失败" except Exception as e: return f"图表生成异常: {str(e)}" async def generate_chart_with_timeout(self, data, chart_type: str, timeout: float): """带超时的图表生成""" chart_path = f"/tmp/chart_{uuid.uuid4()}.png" await asyncio.sleep(2) return chart_path
|