# app/api/v1/users.py from fastapi import APIRouter, Depends, HTTPException, status, Query from sqlalchemy.ext.asyncio import AsyncSession from typing import List, Optional from datetime import datetime, timezone, timedelta from app.schemas.user import UserProfile, UserUpdate, PasswordChange from app.database import get_async_db # 使用异步依赖 from app.services.user_service import UserService from app.core.security import password_validator # 依赖:获取当前用户 from fastapi.security import HTTPBearer, HTTPAuthorizationCredentials from app.core.security import verify_access_token from app.config import settings router = APIRouter(prefix="/users", tags=["用户管理"]) security = HTTPBearer() # ==================== 异步认证依赖 ==================== async def get_current_user( credentials: HTTPAuthorizationCredentials = Depends(security), db: AsyncSession = Depends(get_async_db) # 异步Session ): """获取当前用户依赖(异步版本)""" token = credentials.credentials # 验证令牌 payload = verify_access_token(token, settings.SECRET_KEY) if not payload: raise HTTPException( status_code=status.HTTP_401_UNAUTHORIZED, detail="无效的令牌" ) username = payload.get("sub") if not username: raise HTTPException( status_code=status.HTTP_401_UNAUTHORIZED, detail="令牌无效" ) user_service = UserService(db) user = await user_service.get_user_by_username(username) # 使用await if not user: raise HTTPException( status_code=status.HTTP_404_NOT_FOUND, detail="用户不存在" ) if not user.is_active: raise HTTPException( status_code=status.HTTP_403_FORBIDDEN, detail="用户账户已被禁用" ) return user # ==================== 用户个人资料相关 ==================== @router.get("/me", response_model=UserProfile) async def get_my_profile( current_user = Depends(get_current_user), db: AsyncSession = Depends(get_async_db) # 异步Session ): """获取我的资料""" # 直接从current_user获取,无需再查询数据库 return UserProfile( id=current_user.id, username=current_user.username, email=current_user.email, full_name=current_user.full_name, is_active=current_user.is_active, is_verified=current_user.is_verified, created_at=current_user.created_at, last_login=current_user.last_login, avatar=current_user.avatar ) @router.put("/me", response_model=UserProfile) async def update_my_profile( user_data: UserUpdate, current_user = Depends(get_current_user), db: AsyncSession = Depends(get_async_db) ): """更新我的资料""" user_service = UserService(db) update_dict = user_data.dict(exclude_unset=True) # 如果更新邮箱,需要验证新邮箱是否已被使用 if user_data.email and user_data.email != current_user.email: existing_user = await user_service.get_user_by_email(user_data.email) if existing_user and existing_user.id != current_user.id: raise HTTPException( status_code=status.HTTP_400_BAD_REQUEST, detail="邮箱已被使用" ) # 更新用户信息 updated_user = await user_service.update_user(current_user.id, update_dict) if not updated_user: raise HTTPException( status_code=status.HTTP_404_NOT_FOUND, detail="用户不存在" ) return UserProfile( id=updated_user.id, username=updated_user.username, email=updated_user.email, full_name=updated_user.full_name, is_active=updated_user.is_active, is_verified=updated_user.is_verified, created_at=updated_user.created_at, last_login=updated_user.last_login, avatar=updated_user.avatar ) @router.post("/me/change-password") async def change_password( password_data: PasswordChange, current_user = Depends(get_current_user), db: AsyncSession = Depends(get_async_db) ): """修改密码""" user_service = UserService(db) try: success = await user_service.change_password( current_user.id, password_data.current_password, password_data.new_password ) if not success: raise HTTPException( status_code=status.HTTP_400_BAD_REQUEST, detail="当前密码错误" ) return {"message": "密码修改成功"} except ValueError as e: raise HTTPException( status_code=status.HTTP_400_BAD_REQUEST, detail=str(e) ) # ==================== 管理员操作 ==================== @router.get("/", response_model=List[UserProfile]) async def list_users( skip: int = Query(0, ge=0), limit: int = Query(100, ge=1, le=100), active_only: bool = Query(True), current_user = Depends(get_current_user), db: AsyncSession = Depends(get_async_db) ): """获取用户列表(需要管理员权限)""" if not current_user.is_superuser: raise HTTPException( status_code=status.HTTP_403_FORBIDDEN, detail="需要管理员权限" ) user_service = UserService(db) users = await user_service.list_users(skip, limit, active_only) return [ UserProfile( id=user.id, username=user.username, email=user.email, full_name=user.full_name, is_active=user.is_active, is_verified=user.is_verified, created_at=user.created_at, last_login=user.last_login, avatar=user.avatar ) for user in users ] @router.get("/{user_id}", response_model=UserProfile) async def get_user( user_id: int, current_user = Depends(get_current_user), db: AsyncSession = Depends(get_async_db) ): """获取单个用户信息""" user_service = UserService(db) # 普通用户只能查看自己的信息 if not current_user.is_superuser and user_id != current_user.id: raise HTTPException( status_code=status.HTTP_403_FORBIDDEN, detail="只能查看自己的用户信息" ) # 管理员可以查看任何人,普通用户只能查看自己(前面已校验) user = await user_service.get_user_by_id(user_id) if not user: raise HTTPException( status_code=status.HTTP_404_NOT_FOUND, detail="用户不存在" ) return UserProfile( id=user.id, username=user.username, email=user.email, full_name=user.full_name, is_active=user.is_active, is_verified=user.is_verified, created_at=user.created_at, last_login=user.last_login, avatar=user.avatar ) @router.delete("/{user_id}") async def delete_user( user_id: int, current_user = Depends(get_current_user), db: AsyncSession = Depends(get_async_db) ): """删除用户(需要管理员权限)""" if not current_user.is_superuser: raise HTTPException( status_code=status.HTTP_403_FORBIDDEN, detail="需要管理员权限" ) # 不能删除自己 if user_id == current_user.id: raise HTTPException( status_code=status.HTTP_400_BAD_REQUEST, detail="不能删除自己的账户" ) user_service = UserService(db) success = await user_service.delete_user(user_id) if not success: raise HTTPException( status_code=status.HTTP_404_NOT_FOUND, detail="用户不存在" ) return {"message": "用户已禁用"} @router.get("/test") async def test_users(): """测试用户管理 API""" return { "message": "用户管理 API 正常运行(使用 SQLite 数据库异步版本)", "endpoints": { "GET /me": "获取我的资料", "PUT /me": "更新我的资料", "POST /me/change-password": "修改密码", "GET /": "获取用户列表(管理员)", "GET /{user_id}": "获取单个用户", "DELETE /{user_id}": "删除用户(管理员)" }, "async": True, "database": "异步SQLAlchemy 2.0" }