# app/main.py from contextlib import asynccontextmanager from fastapi import FastAPI, Request, status from fastapi.responses import JSONResponse from fastapi.middleware.cors import CORSMiddleware from fastapi.exceptions import RequestValidationError from fastapi.middleware.gzip import GZipMiddleware import logging import time import traceback import sys # 配置 from app.config import settings # 设置日志 logger = logging.getLogger(__name__) logger.info("✅ 日志配置完成") # ==================== 异步生命周期管理 ==================== @asynccontextmanager async def lifespan(app: FastAPI): """ 应用生命周期管理器 - 完全异步 启动时初始化资源,关闭时清理资源 """ # 启动阶段 logger.info("🚀 应用启动中...") try: # 1. 初始化数据库 from app.database import init_async_db, check_async_connection if await check_async_connection(): await init_async_db() logger.info("✅ 数据库初始化完成") else: logger.error("❌ 数据库连接失败") # 可以根据需要决定是否终止启动 # raise RuntimeError("数据库连接失败") # 2. 初始化Redis等缓存(可选) # try: # from app.core.redis import init_redis # await init_redis() # logger.info("✅ Redis连接完成") # except ImportError: # logger.info("ℹ️ Redis未配置,跳过初始化") # 3. 初始化其他服务 # try: # from app.core.security import init_security # init_security() # logger.info("✅ 安全模块初始化完成") # except Exception as e: # logger.warning(f"⚠️ 安全模块初始化异常: {e}") logger.info(f"🎉 {settings.PROJECT_NAME} v{settings.VERSION} 启动完成") yield except Exception as e: logger.error(f"🔥 应用启动失败: {e}") logger.error(traceback.format_exc()) sys.exit(1) finally: # 关闭阶段 logger.info("🛑 应用关闭中...") # 清理资源 try: from app.database import async_engine await async_engine.dispose() logger.info("✅ 数据库连接池已清理") except Exception as e: logger.warning(f"⚠️ 数据库清理异常: {e}") logger.info("👋 应用已安全关闭") # ==================== 创建FastAPI应用 ==================== app = FastAPI( title=settings.PROJECT_NAME, version=settings.VERSION, description=f""" {settings.PROJECT_NAME} API 服务 ## 功能特性 - ✅ 全异步架构(SQLAlchemy 2.0 + async/await) - 🔒 JWT认证与授权 - 📊 性能监控中间件 - 🚀 自动API文档 ## 环境 - **调试模式**: {'开启' if settings.DEBUG else '关闭'} - **数据库**: {settings.DATABASE_URL.split('://')[0] if '://' in settings.DATABASE_URL else '未知'} """, docs_url="/docs" if settings.DEBUG else None, redoc_url="/redoc" if settings.DEBUG else None, openapi_url="/openapi.json" if settings.DEBUG else None, lifespan=lifespan, # 异步生命周期管理 swagger_ui_parameters={ "persistAuthorization": True, "displayRequestDuration": True, "filter": True, } ) # ==================== 全局异常处理器 ==================== @app.exception_handler(Exception) async def global_exception_handler(request: Request, exc: Exception): """全局异常处理器""" logger.error(f"🔥 未捕获的异常: {exc}", exc_info=True) # 在调试模式下返回详细错误 if settings.DEBUG: return JSONResponse( status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, content={ "detail": str(exc), "type": exc.__class__.__name__, "traceback": traceback.format_exc().split('\n'), "path": request.url.path, "method": request.method, } ) # 生产环境返回简化错误 return JSONResponse( status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, content={ "detail": "服务器内部错误", "request_id": request.state.get("request_id", "unknown") if hasattr(request.state, "request_id") else "unknown" } ) @app.exception_handler(RequestValidationError) async def validation_exception_handler(request: Request, exc: RequestValidationError): """请求验证异常处理器""" logger.warning(f"⚠️ 请求验证失败: {exc}") return JSONResponse( status_code=status.HTTP_422_UNPROCESSABLE_ENTITY, content={ "detail": exc.errors(), "body": exc.body, } ) # ==================== 中间件配置 ==================== # 1. CORS中间件 if settings.BACKEND_CORS_ORIGINS: app.add_middleware( CORSMiddleware, allow_origins=settings.BACKEND_CORS_ORIGINS, allow_credentials=True, allow_methods=["*"], allow_headers=["*"], expose_headers=["*"], max_age=600, # 预检请求缓存时间(秒) ) logger.info(f"✅ CORS已启用,允许的源: {settings.BACKEND_CORS_ORIGINS}") # 2. 请求ID中间件(用于日志追踪) @app.middleware("http") async def add_request_id(request: Request, call_next): """为每个请求添加唯一ID""" import uuid request_id = str(uuid.uuid4())[:8] request.state.request_id = request_id response = await call_next(request) response.headers["X-Request-ID"] = request_id return response # 3. 性能监控中间件 @app.middleware("http") async def performance_middleware(request: Request, call_next): """记录请求处理时间""" start_time = time.time() try: response = await call_next(request) process_time = time.time() - start_time # 记录慢请求 if process_time > 1.0: # 超过1秒 logger.warning( f"🐌 慢请求: {request.method} {request.url.path} " f"耗时: {process_time:.3f}s " f"状态: {response.status_code} " f"请求ID: {getattr(request.state, 'request_id', 'unknown')}" ) elif settings.DEBUG: logger.debug( f"⚡ 请求: {request.method} {request.url.path} " f"耗时: {process_time:.3f}s" ) # 添加处理时间到响应头 response.headers["X-Process-Time"] = str(process_time) return response except Exception as e: process_time = time.time() - start_time logger.error( f"🔥 请求异常: {request.method} {request.url.path} " f"耗时: {process_time:.3f}s " f"错误: {e}" ) raise # 4. GZIP压缩中间件(提升性能) app.add_middleware( GZipMiddleware, minimum_size=1000, # 只压缩大于1KB的响应 ) # ==================== 路由注册 ==================== def register_routers(): """注册所有API路由""" routers = [] # 尝试导入并注册路由模块 try: from app.api.v1.auth import router as auth_router routers.append(("认证模块", auth_router)) except ImportError as e: logger.warning(f"⚠️ 认证路由导入失败: {e}") try: from app.api.v1.users import router as users_router routers.append(("用户管理", users_router)) except ImportError as e: logger.warning(f"⚠️ 用户路由导入失败: {e}") # 更多路由模块... # try: # from app.api.v1.admin import router as admin_router # routers.append(("管理后台", admin_router)) # except ImportError: # pass # 注册所有路由 for name, router in routers: app.include_router(router, prefix=settings.API_V1_PREFIX) logger.info(f"✅ 路由注册: {name}") logger.info(f"🎯 共注册 {len(routers)} 个路由模块") # 调用路由注册 register_routers() # ==================== 系统级路由 ==================== @app.get("/health", tags=["系统监控"]) async def health_check(): """ 健康检查端点 返回服务健康状态,可用于K8s探针、负载均衡器健康检查等 """ from app.database import check_async_connection health_status = { "status": "healthy", "service": settings.PROJECT_NAME, "version": settings.VERSION, "timestamp": time.time(), "async": True, "debug": settings.DEBUG, } # 检查数据库连接 try: db_healthy = await check_async_connection() health_status["database"] = "healthy" if db_healthy else "unhealthy" except Exception as e: health_status["database"] = f"error: {str(e)}" # 检查Redis连接(可选) # try: # from app.core.redis import check_redis_connection # redis_healthy = await check_redis_connection() # health_status["redis"] = "healthy" if redis_healthy else "unhealthy" # except ImportError: # health_status["redis"] = "not_configured" # except Exception as e: # health_status["redis"] = f"error: {str(e)}" # 如果有不健康的服务,返回503 if "unhealthy" in health_status.values() or "error" in str(health_status.values()): return JSONResponse( status_code=status.HTTP_503_SERVICE_UNAVAILABLE, content=health_status ) return health_status @app.get("/", tags=["系统"]) async def root(): """ 根端点 返回API基本信息和可用端点 """ # 获取已注册的路由信息 routes_info = [] for route in app.routes: if hasattr(route, "methods"): routes_info.append({ "path": route.path, "methods": list(route.methods) if route.methods else [], "name": route.name or "", }) return { "message": f"欢迎使用 {settings.PROJECT_NAME} API", "version": settings.VERSION, "description": "基于FastAPI的现代化异步API服务", "docs": "/docs" if settings.DEBUG else None, "redoc": "/redoc" if settings.DEBUG else None, "health_check": "/health", "async": True, "database": "SQLAlchemy 2.0 Async", "routes_count": len(routes_info), "quick_links": { "认证": f"{settings.API_V1_PREFIX}/auth/login", "用户信息": f"{settings.API_V1_PREFIX}/users/me", "API文档": "/docs" if settings.DEBUG else "未启用", } } @app.get("/metrics", tags=["系统监控"]) async def metrics(): """ 监控指标端点(可用于Prometheus) 返回应用性能指标 """ import psutil import gc # 内存使用 process = psutil.Process() mem_info = process.memory_info() # GC统计 gc_counts = gc.get_count() return { "process": { "pid": process.pid, "cpu_percent": process.cpu_percent(), "memory_mb": mem_info.rss / 1024 / 1024, "threads": process.num_threads(), }, "gc": { "generation0": gc_counts[0], "generation1": gc_counts[1], "generation2": gc_counts[2], }, "python": { "version": sys.version, "implementation": sys.implementation.name, }, "timestamp": time.time(), } @app.get("/config", tags=["系统"]) async def show_config(): """ 显示当前配置(调试用) 注意:生产环境建议禁用此端点或限制访问 """ if not settings.DEBUG: raise HTTPException( status_code=status.HTTP_403_FORBIDDEN, detail="仅调试模式下可用" ) # 安全地显示配置(隐藏敏感信息) config_dict = {} for key in dir(settings): if not key.startswith("_"): value = getattr(settings, key) # 隐藏敏感信息 if any(sensitive in key.lower() for sensitive in ["secret", "key", "password", "token"]): config_dict[key] = "***HIDDEN***" elif "url" in key.lower() and "database" in key.lower(): # 显示数据库类型但不显示完整URL db_type = value.split("://")[0] if "://" in value else "unknown" config_dict[key] = f"{db_type}://***HIDDEN***" else: config_dict[key] = value return { "config": config_dict, "environment": settings.ENVIRONMENT, "debug": settings.DEBUG, } # ==================== 启动入口 ==================== def run_server(): """启动服务器(开发模式)""" import uvicorn logger.info("=" * 50) logger.info(f"🚀 启动 {settings.PROJECT_NAME} v{settings.VERSION}") logger.info(f"🐛 调试模式: {settings.DEBUG}") logger.info(f"🌐 主机: 0.0.0.0") logger.info(f"🔌 端口: {settings.APP_RUN_PORT}") logger.info(f"🗄️ 数据库: {settings.DATABASE_URL.split('://')[0] if '://' in settings.DATABASE_URL else '未知'}") logger.info("=" * 50) uvicorn.run( app, host="0.0.0.0", port=settings.APP_RUN_PORT, reload=settings.DEBUG, log_level="info" if settings.DEBUG else "warning", access_log=True, use_colors=True, # 优化性能的配置 limit_concurrency=1000, limit_max_requests=10000, timeout_keep_alive=5, ) if __name__ == "__main__": run_server()