1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217
| import functools import asyncio from collections import defaultdict
class PerformanceProfiler: """性能分析器""" def __init__(self): self.metrics = defaultdict(list) self.call_counts = defaultdict(int) def profile_sync(self, func_name: str = None): """同步函数性能装饰器""" def decorator(func): name = func_name or f"{func.__module__}.{func.__name__}" @functools.wraps(func) def wrapper(*args, **kwargs): start_time = time.time() try: result = func(*args, **kwargs) return result finally: end_time = time.time() duration = end_time - start_time self.metrics[name].append(duration) self.call_counts[name] += 1 return wrapper return decorator def profile_async(self, func_name: str = None): """异步函数性能装饰器""" def decorator(func): name = func_name or f"{func.__module__}.{func.__name__}" @functools.wraps(func) async def wrapper(*args, **kwargs): start_time = time.time() try: result = await func(*args, **kwargs) return result finally: end_time = time.time() duration = end_time - start_time self.metrics[name].append(duration) self.call_counts[name] += 1 return wrapper return decorator def get_report(self) -> Dict[str, Dict]: """生成性能报告""" report = {} for func_name, durations in self.metrics.items(): if durations: report[func_name] = { "call_count": self.call_counts[func_name], "total_time": sum(durations), "avg_time": sum(durations) / len(durations), "min_time": min(durations), "max_time": max(durations) } return report def print_report(self): """打印性能报告""" report = self.get_report() print("\n=== 性能分析报告 ===") sorted_funcs = sorted(report.items(), key=lambda x: x[1]["avg_time"], reverse=True) for func_name, stats in sorted_funcs: print(f"\n{func_name}:") print(f" 调用次数: {stats['call_count']}") print(f" 总耗时: {stats['total_time']:.3f}s") print(f" 平均耗时: {stats['avg_time']:.3f}s") print(f" 最短耗时: {stats['min_time']:.3f}s") print(f" 最长耗时: {stats['max_time']:.3f}s")
profiler = PerformanceProfiler()
class DebuggingToolManager: """调试版工具管理器""" def __init__(self): self.tools_cache = None self.similarity_cache = {} @profiler.profile_sync("discover_tools") def discover_tools(self) -> Dict[str, Any]: """工具发现 - 调试版本""" if self.tools_cache is not None: print("使用缓存的工具列表") return self.tools_cache print("执行工具发现...") discovered_tools = {} for i in range(10): tool_name = f"tool_{i}" time.sleep(0.01) discovered_tools[tool_name] = { "name": tool_name, "description": f"Tool {i} for processing data", "parameters": {"query": "string"} } self.tools_cache = discovered_tools return discovered_tools @profiler.profile_async("select_best_tool") async def select_best_tool(self, user_query: str, available_tools: Dict) -> str: """工具选择 - 调试版本""" print(f"为查询选择最佳工具: {user_query}") tasks = [] tool_names = [] for tool_name, tool_info in available_tools.items(): task = asyncio.create_task( self.calculate_similarity(user_query, tool_info["description"]) ) tasks.append(task) tool_names.append(tool_name) scores = await asyncio.gather(*tasks) best_idx = scores.index(max(scores)) best_tool = tool_names[best_idx] print(f"选择工具: {best_tool}, 得分: {scores[best_idx]:.3f}") return best_tool @profiler.profile_async("calculate_similarity") async def calculate_similarity(self, query: str, description: str) -> float: """相似度计算 - 调试版本""" cache_key = f"{query}_{description}" if cache_key in self.similarity_cache: return self.similarity_cache[cache_key] await asyncio.sleep(0.01) common_words = len(set(query.split()) & set(description.split())) score = common_words / max(len(query.split()), len(description.split())) self.similarity_cache[cache_key] = score return score
class DebuggingAgent: """调试版Agent""" def __init__(self): self.tool_manager = DebuggingToolManager() @profiler.profile_async("process_user_input") async def process_user_input(self, user_input: str) -> Dict[str, Any]: """处理用户输入 - 调试版本""" start_time = time.time() try: available_tools = await asyncio.to_thread(self.tool_manager.discover_tools) selected_tool = await self.tool_manager.select_best_tool(user_input, available_tools) if not selected_tool: return {"error": "No suitable tool found"} try: result = await asyncio.wait_for( self.execute_tool(selected_tool, {"query": user_input}), timeout=5.0 ) except asyncio.TimeoutError: return {"error": "Tool execution timeout"} end_time = time.time() return { "tool_used": selected_tool, "result": result, "response_time": end_time - start_time } except Exception as e: return {"error": str(e)} @profiler.profile_async("execute_tool") async def execute_tool(self, tool_name: str, parameters: Dict) -> Any: """执行工具 - 调试版本""" await asyncio.sleep(0.5) return f"Result from {tool_name}: processed '{parameters.get('query', '')}'"
|