# app/core/security.py import hashlib import secrets import base64 import hmac import time import json from typing import Optional, Dict, Any, Tuple from datetime import datetime, timezone, timedelta import re # 密码哈希工具 class PasswordHasher: """密码哈希和验证工具(使用 PBKDF2)""" @staticmethod def hash_password(password: str) -> str: """哈希密码""" # 生成随机盐(16字节) salt = secrets.token_bytes(16) # 使用 PBKDF2-HMAC-SHA256 dk = hashlib.pbkdf2_hmac( 'sha256', password.encode('utf-8'), salt, 100000 # 迭代次数 ) # 组合 salt + hash,然后 base64 编码 combined = salt + dk return base64.b64encode(combined).decode('utf-8') @staticmethod def verify_password(password: str, hashed_password: str) -> bool: """验证密码""" try: # 解码 base64 decoded = base64.b64decode(hashed_password.encode('utf-8')) # 提取 salt (前16字节) 和存储的 hash salt = decoded[:16] stored_hash = decoded[16:] # 用相同的盐计算输入密码的 hash dk = hashlib.pbkdf2_hmac( 'sha256', password.encode('utf-8'), salt, 100000 ) # 使用常量时间比较防止时序攻击 return secrets.compare_digest(dk, stored_hash) except Exception: return False # JWT 管理器 class JWTManager: """JWT 令牌管理工具""" def __init__(self, secret_key: str, algorithm: str = "HS256"): self.secret_key = secret_key self.algorithm = algorithm def create_access_token( self, data: Dict[str, Any], expires_delta: Optional[timedelta] = None, expires_minutes: Optional[int] = None ) -> str: """创建访问令牌 Args: data: 要编码的数据 expires_delta: 过期时间差 expires_minutes: 过期分钟数 Returns: str: JWT 令牌 """ return self.create_token(data, expires_delta, expires_minutes, token_type="access") def create_refresh_token( self, data: Dict[str, Any], expires_delta: Optional[timedelta] = None, expires_days: int = 7 ) -> str: """创建刷新令牌 Args: data: 要编码的数据 expires_delta: 过期时间差 expires_days: 过期天数 Returns: str: JWT 刷新令牌 """ if expires_delta is None: expires_delta = timedelta(days=expires_days) return self.create_token(data, expires_delta, token_type="refresh") def create_verification_token( self, data: Dict[str, Any], expires_delta: Optional[timedelta] = None, expires_hours: int = 24 ) -> str: """创建验证令牌(用于邮箱验证等) Args: data: 要编码的数据 expires_delta: 过期时间差 expires_hours: 过期小时数 Returns: str: JWT 验证令牌 """ if expires_delta is None: expires_delta = timedelta(hours=expires_hours) return self.create_token(data, expires_delta, token_type="verify") def create_reset_token( self, data: Dict[str, Any], expires_delta: Optional[timedelta] = None, expires_minutes: int = 30 ) -> str: """创建密码重置令牌 Args: data: 要编码的数据 expires_delta: 过期时间差 expires_minutes: 过期分钟数 Returns: str: JWT 重置令牌 """ if expires_delta is None: expires_delta = timedelta(minutes=expires_minutes) return self.create_token(data, expires_delta, token_type="reset") def create_token( self, data: Dict[str, Any], expires_delta: Optional[timedelta] = None, expires_minutes: Optional[int] = None, token_type: str = "access" ) -> str: """创建 JWT 令牌(通用方法) Args: data: 要编码的数据 expires_delta: 过期时间差 expires_minutes: 过期分钟数 token_type: 令牌类型 Returns: str: JWT 令牌 """ # 复制数据以避免修改原始数据 payload = data.copy() # 设置过期时间 if expires_delta: expire = datetime.now(timezone.utc) + expires_delta elif expires_minutes: expire = datetime.now(timezone.utc) + timedelta(minutes=expires_minutes) else: expire = datetime.now(timezone.utc) + timedelta(minutes=30) # 默认30分钟 # 添加标准声明 payload.update({ "exp": int(expire.timestamp()), "iat": int(datetime.now(timezone.utc).timestamp()), "iss": "caiyouhui-api", "type": token_type }) # 编码 header 和 payload header = json.dumps({"alg": self.algorithm, "typ": "JWT"}) payload_str = json.dumps(payload) header_b64 = base64.urlsafe_b64encode(header.encode()).decode().rstrip('=') payload_b64 = base64.urlsafe_b64encode(payload_str.encode()).decode().rstrip('=') # 创建签名 message = f"{header_b64}.{payload_b64}" signature = hmac.new( self.secret_key.encode(), message.encode(), hashlib.sha256 ).digest() signature_b64 = base64.urlsafe_b64encode(signature).decode().rstrip('=') return f"{header_b64}.{payload_b64}.{signature_b64}" def verify_token(self, token: str) -> Optional[Dict[str, Any]]: """验证 JWT 令牌 Args: token: JWT 令牌 Returns: Optional[Dict]: 解码后的数据,如果令牌无效则返回 None """ try: parts = token.split('.') if len(parts) != 3: return None header_b64, payload_b64, signature_b64 = parts # 验证签名 message = f"{header_b64}.{payload_b64}" expected_signature = hmac.new( self.secret_key.encode(), message.encode(), hashlib.sha256 ).digest() expected_signature_b64 = base64.urlsafe_b64encode(expected_signature).decode().rstrip('=') # 使用常量时间比较 if not secrets.compare_digest(signature_b64, expected_signature_b64): return None # 解码 payload payload_json = base64.urlsafe_b64decode(payload_b64 + '=' * (4 - len(payload_b64) % 4)).decode() payload = json.loads(payload_json) # 检查过期时间 if 'exp' in payload and payload['exp'] < int(time.time()): return None return payload except Exception: return None def decode_token(self, token: str) -> Optional[Dict[str, Any]]: """解码令牌(不验证签名,用于调试) Args: token: JWT 令牌 Returns: Optional[Dict]: 解码后的数据 """ try: parts = token.split('.') if len(parts) != 3: return None _, payload_b64, _ = parts # 解码 payload payload_json = base64.urlsafe_b64decode(payload_b64 + '=' * (4 - len(payload_b64) % 4)).decode() return json.loads(payload_json) except Exception: return None # 密码验证工具 class PasswordValidator: """密码强度验证工具""" @staticmethod def validate_password_strength(password: str) -> Tuple[bool, str]: """验证密码强度""" # 检查最小长度 if len(password) < 8: return False, "密码必须至少8个字符" # 检查最大长度 if len(password) > 100: return False, "密码不能超过100个字符" # 检查是否包含大写字母 if not re.search(r'[A-Z]', password): return False, "密码必须包含至少一个大写字母" # 检查是否包含小写字母 if not re.search(r'[a-z]', password): return False, "密码必须包含至少一个小写字母" # 检查是否包含数字 if not re.search(r'\d', password): return False, "密码必须包含至少一个数字" # 检查是否包含特殊字符 if not re.search(r'[!@#$%^&*(),.?":{}|<>]', password): return False, "密码必须包含至少一个特殊字符" return True, "密码强度足够" # 令牌工具 class TokenUtils: """令牌相关工具""" @staticmethod def generate_verification_code(length: int = 6) -> str: """生成数字验证码""" digits = '0123456789' return ''.join(secrets.choice(digits) for _ in range(length)) @staticmethod def generate_reset_token(length: int = 32) -> str: """生成密码重置令牌""" return secrets.token_urlsafe(length) @staticmethod def generate_api_key(length: int = 32) -> str: """生成 API 密钥""" import string alphabet = string.ascii_letters + string.digits return ''.join(secrets.choice(alphabet) for _ in range(length)) # 导出的便捷函数 def create_access_token( data: Dict[str, Any], secret_key: str, expires_delta: Optional[timedelta] = None, expires_minutes: int = 30, algorithm: str = "HS256" ) -> str: """创建访问令牌(便捷函数) Args: data: 要编码的数据 secret_key: 密钥 expires_delta: 过期时间差 expires_minutes: 过期分钟数 algorithm: 算法 Returns: str: JWT 令牌 """ jwt_manager = JWTManager(secret_key, algorithm) return jwt_manager.create_access_token(data, expires_delta, expires_minutes) def verify_access_token(token: str, secret_key: str, algorithm: str = "HS256") -> Optional[Dict[str, Any]]: """验证访问令牌(便捷函数) Args: token: JWT 令牌 secret_key: 密钥 algorithm: 算法 Returns: Optional[Dict]: 解码后的数据 """ jwt_manager = JWTManager(secret_key, algorithm) return jwt_manager.verify_token(token) def get_password_hash(password: str) -> str: """哈希密码(便捷函数) Args: password: 明文密码 Returns: str: 哈希后的密码 """ return PasswordHasher.hash_password(password) # 创建全局实例 password_hasher = PasswordHasher() password_validator = PasswordValidator() token_utils = TokenUtils() # 导出函数(保持向后兼容) verify_password = password_hasher.verify_password # 导出的函数和类 __all__ = [ # 便捷函数 "create_access_token", "verify_access_token", "verify_password", "get_password_hash", # 类 "PasswordHasher", "JWTManager", "PasswordValidator", "TokenUtils", # 实例 "password_hasher", "password_validator", "token_utils", ]