1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130
import asyncio
import logging
import time
from typing import AsyncGenerator

import aiohttp
class BackpressureController:
    """Backpressure controller: a bounded queue plus a concurrency limiter.

    Items are enqueued with :meth:`put_with_backpressure` and dequeued with
    :meth:`get_with_limit`, which also takes one semaphore permit. Every
    successful :meth:`get_with_limit` must be paired with exactly one
    :meth:`release_limit` call.

    Attributes:
        semaphore: ``asyncio.Semaphore`` initialised with ``max_concurrent``.
        queue: ``asyncio.Queue`` bounded by ``max_queue_size``.
        metrics: Counters ``processed``, ``errors``, ``queue_full_blocks``
            and ``semaphore_waits``. ``processed`` and ``errors`` are not
            modified by this class itself.
        start_time: Wall-clock timestamp (``time.time()``) taken at creation.
    """

    def __init__(self, max_concurrent: int = 100, max_queue_size: int = 1000):
        """Create the controller.

        Args:
            max_concurrent: Initial number of semaphore permits.
            max_queue_size: ``maxsize`` passed to the internal queue.
        """
        self.semaphore = asyncio.Semaphore(max_concurrent)
        self.queue = asyncio.Queue(maxsize=max_queue_size)
        self.metrics = {
            'processed': 0,
            'errors': 0,
            'queue_full_blocks': 0,
            'semaphore_waits': 0,
        }
        # Wall-clock creation time, kept as a public attribute.
        self.start_time = time.time()
        # Intervals are measured on the monotonic clock so that system clock
        # adjustments cannot produce negative or inflated durations.
        self._started_monotonic = time.monotonic()

    async def put_with_backpressure(self, item):
        """Enqueue ``item``, waiting for a free slot when the queue is full.

        A non-blocking put is tried first; if the queue is full the
        ``queue_full_blocks`` counter is incremented and the call awaits
        until the item can be stored.
        """
        try:
            self.queue.put_nowait(item)
        except asyncio.QueueFull:
            self.metrics['queue_full_blocks'] += 1
            await self.queue.put(item)

    async def get_with_limit(self):
        """Dequeue one item and acquire one concurrency permit.

        Returns:
            The dequeued item. The permit is held until
            :meth:`release_limit` is called.

        ``semaphore_waits`` is incremented when acquiring the permit took
        longer than 0.1 seconds.
        """
        item = await self.queue.get()
        wait_started = time.monotonic()
        await self.semaphore.acquire()
        waited = time.monotonic() - wait_started
        if waited > 0.1:
            self.metrics['semaphore_waits'] += 1
        return item

    def release_limit(self):
        """Release one permit and mark one queue item as done."""
        self.semaphore.release()
        self.queue.task_done()

    def get_stats(self) -> dict:
        """Return a snapshot of the counters plus derived figures.

        Returns:
            A dict containing every entry of ``metrics`` and additionally
            ``qps`` (``processed`` divided by elapsed seconds, ``0`` when no
            time has elapsed), ``queue_size``, ``available_permits`` and
            ``elapsed_seconds``.
        """
        elapsed = time.monotonic() - self._started_monotonic
        qps = self.metrics['processed'] / elapsed if elapsed > 0 else 0
        return {
            **self.metrics,
            'qps': qps,
            'queue_size': self.queue.qsize(),
            # NOTE: ``_value`` is a private attribute of asyncio.Semaphore;
            # there is no public accessor for the remaining permit count.
            'available_permits': self.semaphore._value,
            'elapsed_seconds': elapsed,
        }
async def optimized_processor(session: aiohttp.ClientSession, url: str) -> dict:
    """Fetch ``url`` through ``session`` and summarise the outcome as a dict.

    Args:
        session: Client session used to issue the GET request.
        url: Address to request.

    Returns:
        ``{"status": <HTTP status>, "url": url}`` on success,
        ``{"error": "timeout", "url": url}`` when ``asyncio.TimeoutError`` is
        raised, or ``{"error": <message>, "url": url}`` for any other
        ``Exception``.
    """
    try:
        # Total time budget of 5 seconds for the request.
        request_timeout = aiohttp.ClientTimeout(total=5)
        async with session.get(url, timeout=request_timeout) as resp:
            # Fixed 0.2 s pause while the response is still open.
            await asyncio.sleep(0.2)
            return dict(status=resp.status, url=url)
    except asyncio.TimeoutError:
        return dict(error="timeout", url=url)
    except Exception as exc:
        return dict(error=str(exc), url=url)
async def optimized_consumer(controller: BackpressureController):
    """Consume URLs from ``controller`` and process each in its own task.

    The loop runs until this coroutine is cancelled. Each iteration takes one
    URL via ``controller.get_with_limit()`` and starts
    ``process_with_cleanup`` for it as a task, all sharing one
    ``aiohttp.ClientSession``.

    Args:
        controller: Source of URLs and of the concurrency limit.
    """
    # The event loop only keeps weak references to tasks, so hold strong
    # references here until each task finishes; otherwise a task may be
    # garbage-collected in the middle of its execution.
    in_flight = set()
    async with aiohttp.ClientSession() as session:
        try:
            while True:
                url = await controller.get_with_limit()
                task = asyncio.create_task(
                    process_with_cleanup(session, url, controller)
                )
                in_flight.add(task)
                task.add_done_callback(in_flight.discard)
        finally:
            # The session is closed as soon as this block exits, so stop the
            # tasks that still use it and wait for them to unwind first.
            pending = list(in_flight)
            for task in pending:
                task.cancel()
            if pending:
                await asyncio.gather(*pending, return_exceptions=True)
async def process_with_cleanup(session: aiohttp.ClientSession, url: str, controller: BackpressureController):
    """Process one URL, record the outcome, and always release the limit.

    Args:
        session: Client session forwarded to ``optimized_processor``.
        url: Address to process.
        controller: Receives the metric update and the final
            ``release_limit()`` call.
    """
    try:
        outcome = await optimized_processor(session, url)
        # A result dict containing an "error" key counts as a failure.
        bucket = 'errors' if "error" in outcome else 'processed'
        controller.metrics[bucket] += 1
    except Exception as exc:
        controller.metrics['errors'] += 1
        logging.error(f"处理失败: {url}, 错误: {exc}")
    finally:
        # Runs on success, failure and cancellation alike.
        controller.release_limit()
async def smart_producer(controller: BackpressureController):
    """Produce URLs forever, adapting the batch size to the queue depth.

    Each round reads the current queue size from ``controller.get_stats()``:

    * above 800: batch of 10, preceded by a 0.5 s pause;
    * below 100: batch of 100, preceded by a 0.05 s pause;
    * otherwise: batch of 50 with no extra pause.

    Every URL is enqueued with ``controller.put_with_backpressure`` and each
    round ends with a 0.1 s pause.

    Args:
        controller: Destination queue and source of the statistics.
    """
    next_id = 0
    while True:
        depth = controller.get_stats()['queue_size']
        if depth > 800:
            burst, pause = 10, 0.5
        elif depth < 100:
            burst, pause = 100, 0.05
        else:
            burst, pause = 50, None

        if pause is not None:
            await asyncio.sleep(pause)

        batch_end = next_id + burst
        while next_id < batch_end:
            await controller.put_with_backpressure(
                f"http://httpbin.org/delay/1?id={next_id}"
            )
            next_id += 1

        await asyncio.sleep(0.1)
async def stats_reporter(controller: BackpressureController):
    """Print a one-line statistics summary every 10 seconds, forever.

    Args:
        controller: Object whose ``get_stats()`` snapshot is reported.
    """
    while True:
        await asyncio.sleep(10)
        snapshot = controller.get_stats()
        fields = (
            f"[统计] QPS: {snapshot['qps']:.1f}",
            f"处理: {snapshot['processed']}",
            f"错误: {snapshot['errors']}",
            f"队列: {snapshot['queue_size']}",
            f"可用协程: {snapshot['available_permits']}",
        )
        print(", ".join(fields))
|