| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394 |
- # app/core/security.py
- import hashlib
- import secrets
- import base64
- import hmac
- import time
- import json
- from typing import Optional, Dict, Any, Tuple
- from datetime import datetime, timezone, timedelta
- import re
-
- # 密码哈希工具
- class PasswordHasher:
- """密码哈希和验证工具(使用 PBKDF2)"""
-
- @staticmethod
- def hash_password(password: str) -> str:
- """哈希密码"""
- # 生成随机盐(16字节)
- salt = secrets.token_bytes(16)
-
- # 使用 PBKDF2-HMAC-SHA256
- dk = hashlib.pbkdf2_hmac(
- 'sha256',
- password.encode('utf-8'),
- salt,
- 100000 # 迭代次数
- )
-
- # 组合 salt + hash,然后 base64 编码
- combined = salt + dk
- return base64.b64encode(combined).decode('utf-8')
-
- @staticmethod
- def verify_password(password: str, hashed_password: str) -> bool:
- """验证密码"""
- try:
- # 解码 base64
- decoded = base64.b64decode(hashed_password.encode('utf-8'))
-
- # 提取 salt (前16字节) 和存储的 hash
- salt = decoded[:16]
- stored_hash = decoded[16:]
-
- # 用相同的盐计算输入密码的 hash
- dk = hashlib.pbkdf2_hmac(
- 'sha256',
- password.encode('utf-8'),
- salt,
- 100000
- )
-
- # 使用常量时间比较防止时序攻击
- return secrets.compare_digest(dk, stored_hash)
-
- except Exception:
- return False
-
- # JWT 管理器
- class JWTManager:
- """JWT 令牌管理工具"""
-
- def __init__(self, secret_key: str, algorithm: str = "HS256"):
- self.secret_key = secret_key
- self.algorithm = algorithm
-
- def create_access_token(
- self,
- data: Dict[str, Any],
- expires_delta: Optional[timedelta] = None,
- expires_minutes: Optional[int] = None
- ) -> str:
- """创建访问令牌
-
- Args:
- data: 要编码的数据
- expires_delta: 过期时间差
- expires_minutes: 过期分钟数
-
- Returns:
- str: JWT 令牌
- """
- return self.create_token(data, expires_delta, expires_minutes, token_type="access")
-
- def create_refresh_token(
- self,
- data: Dict[str, Any],
- expires_delta: Optional[timedelta] = None,
- expires_days: int = 7
- ) -> str:
- """创建刷新令牌
-
- Args:
- data: 要编码的数据
- expires_delta: 过期时间差
- expires_days: 过期天数
-
- Returns:
- str: JWT 刷新令牌
- """
- if expires_delta is None:
- expires_delta = timedelta(days=expires_days)
- return self.create_token(data, expires_delta, token_type="refresh")
-
- def create_verification_token(
- self,
- data: Dict[str, Any],
- expires_delta: Optional[timedelta] = None,
- expires_hours: int = 24
- ) -> str:
- """创建验证令牌(用于邮箱验证等)
-
- Args:
- data: 要编码的数据
- expires_delta: 过期时间差
- expires_hours: 过期小时数
-
- Returns:
- str: JWT 验证令牌
- """
- if expires_delta is None:
- expires_delta = timedelta(hours=expires_hours)
- return self.create_token(data, expires_delta, token_type="verify")
-
- def create_reset_token(
- self,
- data: Dict[str, Any],
- expires_delta: Optional[timedelta] = None,
- expires_minutes: int = 30
- ) -> str:
- """创建密码重置令牌
-
- Args:
- data: 要编码的数据
- expires_delta: 过期时间差
- expires_minutes: 过期分钟数
-
- Returns:
- str: JWT 重置令牌
- """
- if expires_delta is None:
- expires_delta = timedelta(minutes=expires_minutes)
- return self.create_token(data, expires_delta, token_type="reset")
-
- def create_token(
- self,
- data: Dict[str, Any],
- expires_delta: Optional[timedelta] = None,
- expires_minutes: Optional[int] = None,
- token_type: str = "access"
- ) -> str:
- """创建 JWT 令牌(通用方法)
-
- Args:
- data: 要编码的数据
- expires_delta: 过期时间差
- expires_minutes: 过期分钟数
- token_type: 令牌类型
-
- Returns:
- str: JWT 令牌
- """
- # 复制数据以避免修改原始数据
- payload = data.copy()
-
- # 设置过期时间
- if expires_delta:
- expire = datetime.now(timezone.utc) + expires_delta
- elif expires_minutes:
- expire = datetime.now(timezone.utc) + timedelta(minutes=expires_minutes)
- else:
- expire = datetime.now(timezone.utc) + timedelta(minutes=30) # 默认30分钟
-
- # 添加标准声明
- payload.update({
- "exp": int(expire.timestamp()),
- "iat": int(datetime.now(timezone.utc).timestamp()),
- "iss": "caiyouhui-api",
- "type": token_type
- })
-
- # 编码 header 和 payload
- header = json.dumps({"alg": self.algorithm, "typ": "JWT"})
- payload_str = json.dumps(payload)
-
- header_b64 = base64.urlsafe_b64encode(header.encode()).decode().rstrip('=')
- payload_b64 = base64.urlsafe_b64encode(payload_str.encode()).decode().rstrip('=')
-
- # 创建签名
- message = f"{header_b64}.{payload_b64}"
- signature = hmac.new(
- self.secret_key.encode(),
- message.encode(),
- hashlib.sha256
- ).digest()
- signature_b64 = base64.urlsafe_b64encode(signature).decode().rstrip('=')
-
- return f"{header_b64}.{payload_b64}.{signature_b64}"
-
- def verify_token(self, token: str) -> Optional[Dict[str, Any]]:
- """验证 JWT 令牌
-
- Args:
- token: JWT 令牌
-
- Returns:
- Optional[Dict]: 解码后的数据,如果令牌无效则返回 None
- """
- try:
- parts = token.split('.')
- if len(parts) != 3:
- return None
-
- header_b64, payload_b64, signature_b64 = parts
-
- # 验证签名
- message = f"{header_b64}.{payload_b64}"
- expected_signature = hmac.new(
- self.secret_key.encode(),
- message.encode(),
- hashlib.sha256
- ).digest()
- expected_signature_b64 = base64.urlsafe_b64encode(expected_signature).decode().rstrip('=')
-
- # 使用常量时间比较
- if not secrets.compare_digest(signature_b64, expected_signature_b64):
- return None
-
- # 解码 payload
- payload_json = base64.urlsafe_b64decode(payload_b64 + '=' * (4 - len(payload_b64) % 4)).decode()
- payload = json.loads(payload_json)
-
- # 检查过期时间
- if 'exp' in payload and payload['exp'] < int(time.time()):
- return None
-
- return payload
-
- except Exception:
- return None
-
- def decode_token(self, token: str) -> Optional[Dict[str, Any]]:
- """解码令牌(不验证签名,用于调试)
-
- Args:
- token: JWT 令牌
-
- Returns:
- Optional[Dict]: 解码后的数据
- """
- try:
- parts = token.split('.')
- if len(parts) != 3:
- return None
-
- _, payload_b64, _ = parts
-
- # 解码 payload
- payload_json = base64.urlsafe_b64decode(payload_b64 + '=' * (4 - len(payload_b64) % 4)).decode()
- return json.loads(payload_json)
-
- except Exception:
- return None
-
- # 密码验证工具
- class PasswordValidator:
- """密码强度验证工具"""
-
- @staticmethod
- def validate_password_strength(password: str) -> Tuple[bool, str]:
- """验证密码强度"""
- # 检查最小长度
- if len(password) < 8:
- return False, "密码必须至少8个字符"
-
- # 检查最大长度
- if len(password) > 100:
- return False, "密码不能超过100个字符"
-
- # 检查是否包含大写字母
- if not re.search(r'[A-Z]', password):
- return False, "密码必须包含至少一个大写字母"
-
- # 检查是否包含小写字母
- if not re.search(r'[a-z]', password):
- return False, "密码必须包含至少一个小写字母"
-
- # 检查是否包含数字
- if not re.search(r'\d', password):
- return False, "密码必须包含至少一个数字"
-
- # 检查是否包含特殊字符
- if not re.search(r'[!@#$%^&*(),.?":{}|<>]', password):
- return False, "密码必须包含至少一个特殊字符"
-
- return True, "密码强度足够"
-
- # 令牌工具
- class TokenUtils:
- """令牌相关工具"""
-
- @staticmethod
- def generate_verification_code(length: int = 6) -> str:
- """生成数字验证码"""
- digits = '0123456789'
- return ''.join(secrets.choice(digits) for _ in range(length))
-
- @staticmethod
- def generate_reset_token(length: int = 32) -> str:
- """生成密码重置令牌"""
- return secrets.token_urlsafe(length)
-
- @staticmethod
- def generate_api_key(length: int = 32) -> str:
- """生成 API 密钥"""
- import string
- alphabet = string.ascii_letters + string.digits
- return ''.join(secrets.choice(alphabet) for _ in range(length))
-
- # 导出的便捷函数
- def create_access_token(
- data: Dict[str, Any],
- secret_key: str,
- expires_delta: Optional[timedelta] = None,
- expires_minutes: int = 30,
- algorithm: str = "HS256"
- ) -> str:
- """创建访问令牌(便捷函数)
-
- Args:
- data: 要编码的数据
- secret_key: 密钥
- expires_delta: 过期时间差
- expires_minutes: 过期分钟数
- algorithm: 算法
-
- Returns:
- str: JWT 令牌
- """
- jwt_manager = JWTManager(secret_key, algorithm)
- return jwt_manager.create_access_token(data, expires_delta, expires_minutes)
-
- def verify_access_token(token: str, secret_key: str, algorithm: str = "HS256") -> Optional[Dict[str, Any]]:
- """验证访问令牌(便捷函数)
-
- Args:
- token: JWT 令牌
- secret_key: 密钥
- algorithm: 算法
-
- Returns:
- Optional[Dict]: 解码后的数据
- """
- jwt_manager = JWTManager(secret_key, algorithm)
- return jwt_manager.verify_token(token)
-
- def get_password_hash(password: str) -> str:
- """哈希密码(便捷函数)
-
- Args:
- password: 明文密码
-
- Returns:
- str: 哈希后的密码
- """
- return PasswordHasher.hash_password(password)
-
- # 创建全局实例
- password_hasher = PasswordHasher()
- password_validator = PasswordValidator()
- token_utils = TokenUtils()
-
- # 导出函数(保持向后兼容)
- verify_password = password_hasher.verify_password
-
- # 导出的函数和类
- __all__ = [
- # 便捷函数
- "create_access_token",
- "verify_access_token",
- "verify_password",
- "get_password_hash",
-
- # 类
- "PasswordHasher",
- "JWTManager",
- "PasswordValidator",
- "TokenUtils",
-
- # 实例
- "password_hasher",
- "password_validator",
- "token_utils",
- ]
|