1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140
import asyncio
import json
from typing import Dict, Any

from fastapi import APIRouter, Depends, HTTPException
from pydantic import BaseModel
class OrderRequest(BaseModel):
    """Request payload describing an order to be created."""

    # Identifier of the user placing the order.
    user_id: int
    # Identifier of the product being ordered.
    product_id: int
    # Amount submitted by the client for the order.
    amount: float
    # Name of the payment method chosen for the order.
    payment_method: str
class AsyncOrderService:
    """Asynchronous order service.

    Coordinates the user lookup, the inventory and payment HTTP calls, and
    order persistence needed to place an order.

    NOTE(review): ``create_order`` calls ``get_product_price``, ``save_order``,
    ``update_inventory`` and ``send_notifications``, none of which are defined
    in the visible class body -- presumably they are provided elsewhere;
    confirm before relying on this class as-is.
    """

    # Strong references to fire-and-forget tasks. The event loop only keeps
    # weak references to tasks, so a task nobody else references can be
    # garbage collected before it finishes. The set lives on the class rather
    # than the instance so the references survive even when a service
    # instance is dropped before its background work completes.
    _background_tasks: set = set()

    def __init__(self, db_session: "AsyncSession", redis, http_session):
        """Store the collaborators used by the service.

        Args:
            db_session: Database session; ``execute`` is awaited on it.
            redis: Cache client; ``get`` and ``setex`` are awaited on it.
            http_session: HTTP client whose ``post(...)`` is used as an
                async context manager.
        """
        self.db = db_session
        self.redis = redis
        self.http = http_session

    def _spawn_background(self, coro) -> asyncio.Task:
        """Schedule ``coro`` as a background task and keep it referenced until done."""
        task = asyncio.create_task(coro)
        self._background_tasks.add(task)
        # Drop the reference once the task finishes so the set does not grow.
        task.add_done_callback(self._background_tasks.discard)
        return task

    async def create_order(self, order_data: "OrderRequest") -> Dict[str, Any]:
        """Create an order asynchronously.

        Looks up the user, the inventory status and the product price
        concurrently, checks the submitted amount against the product price,
        processes the payment, saves the order, and finally schedules the
        inventory update and the notifications as background tasks.

        Args:
            order_data: The order request to process.

        Returns:
            A dict with ``order_id``, ``status`` (``'success'``) and
            ``transaction_id``.

        Raises:
            HTTPException: 404 when the user lookup or the price lookup
                fails; 400 when inventory is unavailable, the amount differs
                from the product price by more than 0.01, or the payment
                result is not successful; 500 for any other error.
        """
        user_task = self.get_user(order_data.user_id)
        inventory_task = self.check_inventory(order_data.product_id)
        price_task = self.get_product_price(order_data.product_id)

        try:
            # return_exceptions=True: failures come back as values so each
            # lookup can be mapped to its own HTTP error below.
            user, inventory_status, product_price = await asyncio.gather(
                user_task, inventory_task, price_task,
                return_exceptions=True
            )

            if isinstance(user, Exception):
                raise HTTPException(status_code=404, detail="用户不存在")
            if isinstance(inventory_status, Exception) or not inventory_status:
                raise HTTPException(status_code=400, detail="库存不足")
            if isinstance(product_price, Exception):
                raise HTTPException(status_code=404, detail="商品信息获取失败")

            # Tolerate differences of up to 0.01 between amount and price.
            if abs(order_data.amount - product_price) > 0.01:
                raise HTTPException(status_code=400, detail="价格不匹配")

            payment_result = await self.process_payment(order_data)
            if not payment_result['success']:
                raise HTTPException(status_code=400, detail="支付失败")

            order = await self.save_order(order_data, payment_result['transaction_id'])

            # Fire-and-forget follow-up work; the response does not wait for
            # it, but the tasks stay referenced until they complete.
            self._spawn_background(self.update_inventory(order_data.product_id))
            self._spawn_background(self.send_notifications(order))

            return {
                'order_id': order.id,
                'status': 'success',
                'transaction_id': payment_result['transaction_id']
            }
        except HTTPException:
            # Already an HTTP error with the intended status code.
            raise
        except Exception as e:
            # Chain the original exception so its traceback is preserved.
            raise HTTPException(status_code=500, detail=f"订单创建失败: {str(e)}") from e

    async def get_user(self, user_id: int):
        """Fetch user information, checking the cache before the database.

        Args:
            user_id: Identifier of the user to load.

        Returns:
            The decoded cached value when the cache has an entry; otherwise
            a dict with ``id``, ``name`` and ``email`` built from the
            database row.

        Raises:
            ValueError: If no matching user is found in the database.
        """
        cache_key = f"user:{user_id}"
        cached_user = await self.redis.get(cache_key)
        if cached_user:
            return json.loads(cached_user)

        result = await self.db.execute(
            select(User).where(User.id == user_id)
        )
        user = result.scalar_one_or_none()
        if not user:
            raise ValueError("用户不存在")

        user_data = {'id': user.id, 'name': user.name, 'email': user.email}
        # Store the serialized user under the cache key with a 3600 expiry.
        await self.redis.setex(cache_key, 3600, json.dumps(user_data))
        return user_data

    async def check_inventory(self, product_id: int) -> bool:
        """Ask the inventory service whether the product is available.

        Args:
            product_id: Identifier of the product to check.

        Returns:
            The ``available`` field of the JSON response (``False`` when the
            field is missing) on HTTP 200; ``False`` for any other status.
        """
        async with self.http.post(
            'http://inventory-service/check',
            json={'product_id': product_id}
        ) as response:
            if response.status == 200:
                data = await response.json()
                return data.get('available', False)
            return False

    async def process_payment(self, order_data: "OrderRequest") -> Dict[str, Any]:
        """Submit the payment for an order to the payment service.

        Args:
            order_data: The order whose user, amount and payment method are
                sent to the payment service.

        Returns:
            The decoded JSON body of the payment service response.

        Raises:
            ValueError: If the payment service answers with a status other
                than 200; the message includes the response text.
        """
        payment_payload = {
            'user_id': order_data.user_id,
            'amount': order_data.amount,
            'method': order_data.payment_method
        }
        async with self.http.post(
            'http://payment-service/process',
            json=payment_payload
        ) as response:
            if response.status == 200:
                return await response.json()
            else:
                error_data = await response.text()
                raise ValueError(f"支付失败: {error_data}")
# Router for the order endpoints, served under the /api/orders prefix.
router = APIRouter(prefix="/api/orders", tags=["订单"])
@router.post("/create")
async def create_order(
    order_data: OrderRequest,
    db: AsyncSession = Depends(get_db_session),
    redis=Depends(get_redis),
):
    """Order-creation endpoint.

    Builds an ``AsyncOrderService`` from the injected database session and
    Redis client plus ``arch.http_session``, then returns the result of the
    service's ``create_order`` call for the submitted order.
    """
    order_service = AsyncOrderService(db, redis, arch.http_session)
    outcome = await order_service.create_order(order_data)
    return outcome
|