CaiYouHui后端fastapi实现

auth.py 3.3KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091
  import re
  import secrets
  import string
  from datetime import datetime, timedelta, timezone
  from typing import Any, Dict, Optional, Union

  from jose import JWTError, jwt
  from passlib.context import CryptContext

  from ..config import settings
  # Password-hashing context (bcrypt) shared by the password helpers in this
  # module.
  pwd_context = CryptContext(
      schemes=["bcrypt"],
      deprecated="auto",
  )
  def verify_password(plain_password: str, hashed_password: str) -> bool:
      """Check a plaintext password against a stored hash.

      Args:
          plain_password: The password as supplied by the user.
          hashed_password: The previously stored password hash.

      Returns:
          Whatever ``pwd_context.verify`` reports for this pair.
      """
      is_match = pwd_context.verify(plain_password, hashed_password)
      return is_match
  def get_password_hash(password: str) -> str:
      """Hash a plaintext password for storage.

      Args:
          password: The plaintext password to hash.

      Returns:
          The hash string produced by ``pwd_context.hash``.
      """
      hashed = pwd_context.hash(password)
      return hashed
  def create_access_token(data: dict, expires_delta: Optional[timedelta] = None) -> str:
      """Create a signed JWT access token.

      Args:
          data: Claims to embed in the token. The dict is copied, so the
              caller's object is not modified.
          expires_delta: Token lifetime. When ``None``, the lifetime is
              ``settings.ACCESS_TOKEN_EXPIRE_MINUTES`` minutes.

      Returns:
          The encoded JWT, carrying the given claims plus ``exp`` and
          ``type="access"``.
      """
      # Compare against None explicitly: timedelta(0) is falsy, and an
      # explicitly requested zero lifetime must not fall back to the default.
      if expires_delta is None:
          expires_delta = timedelta(minutes=settings.ACCESS_TOKEN_EXPIRE_MINUTES)

      # Timezone-aware "now"; datetime.utcnow() is deprecated and naive.
      expire = datetime.now(timezone.utc) + expires_delta

      claims = {**data, "exp": expire, "type": "access"}
      return jwt.encode(claims, settings.SECRET_KEY, algorithm=settings.ALGORITHM)
  def create_refresh_token(data: dict) -> str:
      """Create a signed JWT refresh token.

      Args:
          data: Claims to embed in the token. The dict is copied, so the
              caller's object is not modified.

      Returns:
          The encoded JWT, carrying the given claims plus ``type="refresh"``
          and an ``exp`` set ``settings.REFRESH_TOKEN_EXPIRE_DAYS`` days from
          now.
      """
      # Timezone-aware "now"; datetime.utcnow() is deprecated and naive.
      expire = datetime.now(timezone.utc) + timedelta(days=settings.REFRESH_TOKEN_EXPIRE_DAYS)
      claims = {**data, "exp": expire, "type": "refresh"}
      return jwt.encode(claims, settings.SECRET_KEY, algorithm=settings.ALGORITHM)
  def create_verification_token(email: str) -> str:
      """Create a signed JWT used to verify an email address.

      Args:
          email: The address stored in the token's ``email`` claim.

      Returns:
          The encoded JWT with ``email``, ``type="verify"`` and an ``exp`` set
          ``settings.VERIFICATION_TOKEN_EXPIRE_HOURS`` hours from now.
      """
      # Timezone-aware "now"; datetime.utcnow() is deprecated and naive.
      expire = datetime.now(timezone.utc) + timedelta(
          hours=settings.VERIFICATION_TOKEN_EXPIRE_HOURS
      )
      claims = {"email": email, "type": "verify", "exp": expire}
      return jwt.encode(claims, settings.SECRET_KEY, algorithm=settings.ALGORITHM)
  def create_reset_token(email: str) -> str:
      """Create a signed JWT used for a password reset.

      Args:
          email: The address stored in the token's ``email`` claim.

      Returns:
          The encoded JWT with ``email``, ``type="reset"`` and an ``exp`` set
          ``settings.RESET_TOKEN_EXPIRE_MINUTES`` minutes from now.
      """
      # Timezone-aware "now"; datetime.utcnow() is deprecated and naive.
      expire = datetime.now(timezone.utc) + timedelta(
          minutes=settings.RESET_TOKEN_EXPIRE_MINUTES
      )
      claims = {"email": email, "type": "reset", "exp": expire}
      return jwt.encode(claims, settings.SECRET_KEY, algorithm=settings.ALGORITHM)
  def decode_token(token: str) -> Optional[Dict[str, Any]]:
      """Decode a JWT with the application's secret key and algorithm.

      Args:
          token: The encoded JWT string.

      Returns:
          The decoded claims, or ``None`` when ``jwt.decode`` raises
          ``JWTError``.
      """
      try:
          claims = jwt.decode(token, settings.SECRET_KEY, algorithms=[settings.ALGORITHM])
      except JWTError:
          # Any failure the library reports as JWTError is collapsed to None.
          return None
      else:
          return claims
  def generate_verification_code(length: int = 6) -> str:
      """Generate a random numeric verification code.

      Digits are drawn with ``secrets.choice``, not ``random``.

      Args:
          length: Number of digits in the code; must be at least 1.

      Returns:
          A string of ``length`` decimal digits.

      Raises:
          ValueError: If ``length`` is less than 1. Returning an empty code
              here would hand callers a value that cannot verify anything.
      """
      if length < 1:
          raise ValueError("length must be at least 1")
      return "".join(secrets.choice(string.digits) for _ in range(length))
  def validate_password_strength(password: str) -> tuple[bool, str]:
      """Check a password against the strength policy.

      Rules are checked in order and the first failure is reported: a minimum
      length of 8, at least one ``A-Z`` letter, at least one ``a-z`` letter,
      at least one digit, and at least one special character.

      A special character is any ASCII punctuation character
      (``string.punctuation``), so characters such as ``_``, ``-``, ``+`` and
      ``=`` count as well as ``!``, ``@`` or ``#``.

      Args:
          password: Candidate plaintext password.

      Returns:
          ``(True, "Password is strong")`` when every rule passes, otherwise
          ``(False, message)`` where ``message`` describes the first failed
          rule.
      """
      if len(password) < 8:
          return False, "Password must be at least 8 characters long"
      if not re.search(r"[A-Z]", password):
          return False, "Password must contain at least one uppercase letter"
      if not re.search(r"[a-z]", password):
          return False, "Password must contain at least one lowercase letter"
      if not re.search(r"\d", password):
          return False, "Password must contain at least one digit"
      # Accept any ASCII punctuation rather than a short hand-picked list, so
      # the check agrees with what the error message promises.
      if not any(ch in string.punctuation for ch in password):
          return False, "Password must contain at least one special character"
      return True, "Password is strong"