| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443 |
- # app/main.py
- from contextlib import asynccontextmanager
- from fastapi import FastAPI, Request, status
- from fastapi.responses import JSONResponse
- from fastapi.middleware.cors import CORSMiddleware
- from fastapi.exceptions import RequestValidationError
- from fastapi.middleware.gzip import GZipMiddleware
- import logging
- import time
- import traceback
- import sys
-
- # 配置
- from app.config import settings
-
- # 设置日志
- logger = logging.getLogger(__name__)
- logger.info("✅ 日志配置完成")
-
- # ==================== 异步生命周期管理 ====================
- @asynccontextmanager
- async def lifespan(app: FastAPI):
- """
- 应用生命周期管理器 - 完全异步
- 启动时初始化资源,关闭时清理资源
- """
- # 启动阶段
- logger.info("🚀 应用启动中...")
-
- try:
- # 1. 初始化数据库
- from app.database import init_async_db, check_async_connection
- if await check_async_connection():
- await init_async_db()
- logger.info("✅ 数据库初始化完成")
- else:
- logger.error("❌ 数据库连接失败")
- # 可以根据需要决定是否终止启动
- # raise RuntimeError("数据库连接失败")
-
- # 2. 初始化Redis等缓存(可选)
- # try:
- # from app.core.redis import init_redis
- # await init_redis()
- # logger.info("✅ Redis连接完成")
- # except ImportError:
- # logger.info("ℹ️ Redis未配置,跳过初始化")
-
- # 3. 初始化其他服务
- # try:
- # from app.core.security import init_security
- # init_security()
- # logger.info("✅ 安全模块初始化完成")
- # except Exception as e:
- # logger.warning(f"⚠️ 安全模块初始化异常: {e}")
-
- logger.info(f"🎉 {settings.PROJECT_NAME} v{settings.VERSION} 启动完成")
- yield
-
- except Exception as e:
- logger.error(f"🔥 应用启动失败: {e}")
- logger.error(traceback.format_exc())
- sys.exit(1)
-
- finally:
- # 关闭阶段
- logger.info("🛑 应用关闭中...")
-
- # 清理资源
- try:
- from app.database import async_engine
- await async_engine.dispose()
- logger.info("✅ 数据库连接池已清理")
- except Exception as e:
- logger.warning(f"⚠️ 数据库清理异常: {e}")
-
- logger.info("👋 应用已安全关闭")
-
-
- # ==================== 创建FastAPI应用 ====================
- app = FastAPI(
- title=settings.PROJECT_NAME,
- version=settings.VERSION,
- description=f"""
- {settings.PROJECT_NAME} API 服务
-
- ## 功能特性
- - ✅ 全异步架构(SQLAlchemy 2.0 + async/await)
- - 🔒 JWT认证与授权
- - 📊 性能监控中间件
- - 🚀 自动API文档
-
- ## 环境
- - **调试模式**: {'开启' if settings.DEBUG else '关闭'}
- - **数据库**: {settings.DATABASE_URL.split('://')[0] if '://' in settings.DATABASE_URL else '未知'}
- """,
- docs_url="/docs" if settings.DEBUG else None,
- redoc_url="/redoc" if settings.DEBUG else None,
- openapi_url="/openapi.json" if settings.DEBUG else None,
- lifespan=lifespan, # 异步生命周期管理
- swagger_ui_parameters={
- "persistAuthorization": True,
- "displayRequestDuration": True,
- "filter": True,
- }
- )
-
- # ==================== 全局异常处理器 ====================
- @app.exception_handler(Exception)
- async def global_exception_handler(request: Request, exc: Exception):
- """全局异常处理器"""
- logger.error(f"🔥 未捕获的异常: {exc}", exc_info=True)
-
- # 在调试模式下返回详细错误
- if settings.DEBUG:
- return JSONResponse(
- status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
- content={
- "detail": str(exc),
- "type": exc.__class__.__name__,
- "traceback": traceback.format_exc().split('\n'),
- "path": request.url.path,
- "method": request.method,
- }
- )
-
- # 生产环境返回简化错误
- return JSONResponse(
- status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
- content={
- "detail": "服务器内部错误",
- "request_id": request.state.get("request_id", "unknown") if hasattr(request.state, "request_id") else "unknown"
- }
- )
-
-
- @app.exception_handler(RequestValidationError)
- async def validation_exception_handler(request: Request, exc: RequestValidationError):
- """请求验证异常处理器"""
- logger.warning(f"⚠️ 请求验证失败: {exc}")
-
- return JSONResponse(
- status_code=status.HTTP_422_UNPROCESSABLE_ENTITY,
- content={
- "detail": exc.errors(),
- "body": exc.body,
- }
- )
-
-
- # ==================== 中间件配置 ====================
- # 1. CORS中间件
- if settings.BACKEND_CORS_ORIGINS:
- app.add_middleware(
- CORSMiddleware,
- allow_origins=settings.BACKEND_CORS_ORIGINS,
- allow_credentials=True,
- allow_methods=["*"],
- allow_headers=["*"],
- expose_headers=["*"],
- max_age=600, # 预检请求缓存时间(秒)
- )
- logger.info(f"✅ CORS已启用,允许的源: {settings.BACKEND_CORS_ORIGINS}")
-
- # 2. 请求ID中间件(用于日志追踪)
- @app.middleware("http")
- async def add_request_id(request: Request, call_next):
- """为每个请求添加唯一ID"""
- import uuid
- request_id = str(uuid.uuid4())[:8]
- request.state.request_id = request_id
-
- response = await call_next(request)
- response.headers["X-Request-ID"] = request_id
- return response
-
-
- # 3. 性能监控中间件
- @app.middleware("http")
- async def performance_middleware(request: Request, call_next):
- """记录请求处理时间"""
- start_time = time.time()
-
- try:
- response = await call_next(request)
- process_time = time.time() - start_time
-
- # 记录慢请求
- if process_time > 1.0: # 超过1秒
- logger.warning(
- f"🐌 慢请求: {request.method} {request.url.path} "
- f"耗时: {process_time:.3f}s "
- f"状态: {response.status_code} "
- f"请求ID: {getattr(request.state, 'request_id', 'unknown')}"
- )
- elif settings.DEBUG:
- logger.debug(
- f"⚡ 请求: {request.method} {request.url.path} "
- f"耗时: {process_time:.3f}s"
- )
-
- # 添加处理时间到响应头
- response.headers["X-Process-Time"] = str(process_time)
- return response
-
- except Exception as e:
- process_time = time.time() - start_time
- logger.error(
- f"🔥 请求异常: {request.method} {request.url.path} "
- f"耗时: {process_time:.3f}s "
- f"错误: {e}"
- )
- raise
-
-
- # 4. GZIP压缩中间件(提升性能)
- app.add_middleware(
- GZipMiddleware,
- minimum_size=1000, # 只压缩大于1KB的响应
- )
-
-
- # ==================== 路由注册 ====================
- def register_routers():
- """注册所有API路由"""
- routers = []
-
- # 尝试导入并注册路由模块
- try:
- from app.api.v1.auth import router as auth_router
- routers.append(("认证模块", auth_router))
- except ImportError as e:
- logger.warning(f"⚠️ 认证路由导入失败: {e}")
-
- try:
- from app.api.v1.users import router as users_router
- routers.append(("用户管理", users_router))
- except ImportError as e:
- logger.warning(f"⚠️ 用户路由导入失败: {e}")
-
- # 更多路由模块...
- # try:
- # from app.api.v1.admin import router as admin_router
- # routers.append(("管理后台", admin_router))
- # except ImportError:
- # pass
-
- # 注册所有路由
- for name, router in routers:
- app.include_router(router, prefix=settings.API_V1_PREFIX)
- logger.info(f"✅ 路由注册: {name}")
-
- logger.info(f"🎯 共注册 {len(routers)} 个路由模块")
-
-
- # 调用路由注册
- register_routers()
-
-
- # ==================== 系统级路由 ====================
- @app.get("/health", tags=["系统监控"])
- async def health_check():
- """
- 健康检查端点
-
- 返回服务健康状态,可用于K8s探针、负载均衡器健康检查等
- """
- from app.database import check_async_connection
-
- health_status = {
- "status": "healthy",
- "service": settings.PROJECT_NAME,
- "version": settings.VERSION,
- "timestamp": time.time(),
- "async": True,
- "debug": settings.DEBUG,
- }
-
- # 检查数据库连接
- try:
- db_healthy = await check_async_connection()
- health_status["database"] = "healthy" if db_healthy else "unhealthy"
- except Exception as e:
- health_status["database"] = f"error: {str(e)}"
-
- # 检查Redis连接(可选)
- # try:
- # from app.core.redis import check_redis_connection
- # redis_healthy = await check_redis_connection()
- # health_status["redis"] = "healthy" if redis_healthy else "unhealthy"
- # except ImportError:
- # health_status["redis"] = "not_configured"
- # except Exception as e:
- # health_status["redis"] = f"error: {str(e)}"
-
- # 如果有不健康的服务,返回503
- if "unhealthy" in health_status.values() or "error" in str(health_status.values()):
- return JSONResponse(
- status_code=status.HTTP_503_SERVICE_UNAVAILABLE,
- content=health_status
- )
-
- return health_status
-
-
- @app.get("/", tags=["系统"])
- async def root():
- """
- 根端点
-
- 返回API基本信息和可用端点
- """
- # 获取已注册的路由信息
- routes_info = []
- for route in app.routes:
- if hasattr(route, "methods"):
- routes_info.append({
- "path": route.path,
- "methods": list(route.methods) if route.methods else [],
- "name": route.name or "",
- })
-
- return {
- "message": f"欢迎使用 {settings.PROJECT_NAME} API",
- "version": settings.VERSION,
- "description": "基于FastAPI的现代化异步API服务",
- "docs": "/docs" if settings.DEBUG else None,
- "redoc": "/redoc" if settings.DEBUG else None,
- "health_check": "/health",
- "async": True,
- "database": "SQLAlchemy 2.0 Async",
- "routes_count": len(routes_info),
- "quick_links": {
- "认证": f"{settings.API_V1_PREFIX}/auth/login",
- "用户信息": f"{settings.API_V1_PREFIX}/users/me",
- "API文档": "/docs" if settings.DEBUG else "未启用",
- }
- }
-
-
- @app.get("/metrics", tags=["系统监控"])
- async def metrics():
- """
- 监控指标端点(可用于Prometheus)
-
- 返回应用性能指标
- """
- import psutil
- import gc
-
- # 内存使用
- process = psutil.Process()
- mem_info = process.memory_info()
-
- # GC统计
- gc_counts = gc.get_count()
-
- return {
- "process": {
- "pid": process.pid,
- "cpu_percent": process.cpu_percent(),
- "memory_mb": mem_info.rss / 1024 / 1024,
- "threads": process.num_threads(),
- },
- "gc": {
- "generation0": gc_counts[0],
- "generation1": gc_counts[1],
- "generation2": gc_counts[2],
- },
- "python": {
- "version": sys.version,
- "implementation": sys.implementation.name,
- },
- "timestamp": time.time(),
- }
-
-
- @app.get("/config", tags=["系统"])
- async def show_config():
- """
- 显示当前配置(调试用)
-
- 注意:生产环境建议禁用此端点或限制访问
- """
- if not settings.DEBUG:
- raise HTTPException(
- status_code=status.HTTP_403_FORBIDDEN,
- detail="仅调试模式下可用"
- )
-
- # 安全地显示配置(隐藏敏感信息)
- config_dict = {}
- for key in dir(settings):
- if not key.startswith("_"):
- value = getattr(settings, key)
-
- # 隐藏敏感信息
- if any(sensitive in key.lower() for sensitive in ["secret", "key", "password", "token"]):
- config_dict[key] = "***HIDDEN***"
- elif "url" in key.lower() and "database" in key.lower():
- # 显示数据库类型但不显示完整URL
- db_type = value.split("://")[0] if "://" in value else "unknown"
- config_dict[key] = f"{db_type}://***HIDDEN***"
- else:
- config_dict[key] = value
-
- return {
- "config": config_dict,
- "environment": settings.ENVIRONMENT,
- "debug": settings.DEBUG,
- }
-
-
- # ==================== 启动入口 ====================
- def run_server():
- """启动服务器(开发模式)"""
- import uvicorn
-
- logger.info("=" * 50)
- logger.info(f"🚀 启动 {settings.PROJECT_NAME} v{settings.VERSION}")
- logger.info(f"🐛 调试模式: {settings.DEBUG}")
- logger.info(f"🌐 主机: 0.0.0.0")
- logger.info(f"🔌 端口: {settings.APP_RUN_PORT}")
- logger.info(f"🗄️ 数据库: {settings.DATABASE_URL.split('://')[0] if '://' in settings.DATABASE_URL else '未知'}")
- logger.info("=" * 50)
-
- uvicorn.run(
- app,
- host="0.0.0.0",
- port=settings.APP_RUN_PORT,
- reload=settings.DEBUG,
- log_level="info" if settings.DEBUG else "warning",
- access_log=True,
- use_colors=True,
- # 优化性能的配置
- limit_concurrency=1000,
- limit_max_requests=10000,
- timeout_keep_alive=5,
- )
-
-
- if __name__ == "__main__":
- run_server()
|