| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110 |
- from fastapi import Depends, HTTPException, status
- from fastapi.security import OAuth2PasswordBearer
- from sqlalchemy.ext.asyncio import AsyncSession
- from jose import JWTError, jwt
- from typing import Optional
-
- from ..database import get_async_db
- from ..models.user import User
- from ..schemas.token import TokenData
- from ..config import settings
-
# Bearer-token extractor shared by the dependencies below.
# auto_error=False makes a missing Authorization header resolve to None
# instead of an automatic error response, so each dependency decides for
# itself whether anonymous access is acceptable.
oauth2_scheme = OAuth2PasswordBearer(
    tokenUrl=f"{settings.API_V1_PREFIX}/auth/login",
    auto_error=False,
)
-
async def get_current_user(
    token: Optional[str] = Depends(oauth2_scheme),
    db: AsyncSession = Depends(get_async_db),
) -> Optional[User]:
    """Resolve the request's bearer token to the active user it identifies.

    Args:
        token: Raw JWT supplied by ``oauth2_scheme``, or None when the request
            carries no bearer token.
        db: Async database session used to look the user up.

    Returns:
        None when no token was supplied; otherwise the active ``User`` whose
        username matches the token's ``sub`` claim.

    Raises:
        HTTPException: 401 when decoding the token raises ``JWTError``, when
            the ``sub`` claim is missing, when the ``type`` claim is not
            ``"access"``, or when no active user has that username.
    """
    # Local import keeps this function self-contained. AsyncSession does not
    # provide the legacy ``query()`` API, so the lookup must be a ``select()``
    # executed (and awaited) through the session.
    from sqlalchemy import select

    if not token:
        return None

    credentials_exception = HTTPException(
        status_code=status.HTTP_401_UNAUTHORIZED,
        detail="Could not validate credentials",
        headers={"WWW-Authenticate": "Bearer"},
    )

    # Only the decode call can raise JWTError, so keep the try body minimal.
    try:
        payload = jwt.decode(
            token, settings.SECRET_KEY, algorithms=[settings.ALGORITHM]
        )
    except JWTError as exc:
        raise credentials_exception from exc

    username: Optional[str] = payload.get("sub")
    token_type: Optional[str] = payload.get("type")
    if username is None or token_type != "access":
        raise credentials_exception

    token_data = TokenData(username=username)

    result = await db.execute(
        select(User).where(
            User.username == token_data.username,
            # SQL expression, not a Python truth test.
            User.is_active == True,  # noqa: E712
        )
    )
    user = result.scalars().first()
    if user is None:
        raise credentials_exception

    return user
-
async def get_current_active_user(
    current_user: User = Depends(get_current_user)
) -> User:
    """Return the resolved user, rejecting anonymous and inactive callers.

    Args:
        current_user: Result of ``get_current_user``; falsy when the request
            was not authenticated.

    Returns:
        The same user object when it is present and its ``is_active`` is truthy.

    Raises:
        HTTPException: 401 "Not authenticated" when no user was resolved;
            400 "Inactive user" when the user's ``is_active`` is falsy.
    """
    # Happy path first: an authenticated, active user passes straight through.
    if current_user and current_user.is_active:
        return current_user

    if not current_user:
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="Not authenticated",
        )

    # A user exists but is flagged inactive.
    raise HTTPException(
        status_code=status.HTTP_400_BAD_REQUEST,
        detail="Inactive user",
    )
-
async def get_current_superuser(
    current_user: User = Depends(get_current_user)
) -> User:
    """Return the resolved user only if it is a superuser.

    Args:
        current_user: Result of ``get_current_user``; falsy when the request
            was not authenticated.

    Returns:
        The same user object when its ``is_superuser`` is truthy.

    Raises:
        HTTPException: 403 "Not enough permissions" when no user was resolved
            or the user is not a superuser.
    """
    if current_user and current_user.is_superuser:
        return current_user

    # Anonymous callers and ordinary users receive the same 403 response.
    raise HTTPException(
        status_code=status.HTTP_403_FORBIDDEN,
        detail="Not enough permissions",
    )
-
def require_auth(current_user: Optional[User] = Depends(get_current_user)) -> User:
    """Dependency that insists on an authenticated user.

    Args:
        current_user: Result of ``get_current_user``; None when the request
            carried no bearer token.

    Returns:
        The resolved user.

    Raises:
        HTTPException: 401 "Not authenticated" when no user was resolved.
    """
    if current_user:
        return current_user

    raise HTTPException(
        status_code=status.HTTP_401_UNAUTHORIZED,
        detail="Not authenticated",
    )
-
# Permission-check dependency factory
def require_permission(permission: str):
    """Build a dependency that admits only users holding ``permission``.

    Args:
        permission: Name of the permission the caller must hold.

    Returns:
        A dependency callable that returns the current active user when the
        check passes and raises ``HTTPException`` (403, "Insufficient
        permissions") otherwise.
    """
    def permission_dependency(
        current_user: User = Depends(get_current_active_user)
    ) -> User:
        # Superusers bypass the per-permission check entirely.
        if current_user.is_superuser:
            return current_user

        # TODO: load the user's permissions from the database or a cache.
        # Until then the list is empty, so every non-superuser is rejected.
        granted_permissions = []
        if permission in granted_permissions:
            return current_user

        raise HTTPException(
            status_code=status.HTTP_403_FORBIDDEN,
            detail="Insufficient permissions",
        )

    return permission_dependency
|