| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166 |
- import secrets
- from fastapi import BackgroundTasks
- from typing import Optional
- import smtplib
- from email.mime.text import MIMEText
- from email.mime.multipart import MIMEMultipart
- from jinja2 import Template
- import aiosmtplib
- from ..config import settings
- import os
- import string
-
- class EmailService:
- def __init__(self):
- self.smtp_host = settings.SMTP_HOST
- self.smtp_port = settings.SMTP_PORT
- self.smtp_user = settings.SMTP_USER
- self.smtp_password = settings.SMTP_PASSWORD
- self.from_email = settings.EMAILS_FROM_EMAIL
- self.from_name = settings.EMAILS_FROM_NAME
-
- async def send_email_async(
- self,
- email_to: str,
- subject: str,
- html_content: str,
- text_content: Optional[str] = None
- ) -> bool:
- """异步发送邮件"""
- message = MIMEMultipart("alternative")
- message["Subject"] = subject
- message["From"] = f"{self.from_name} <{self.from_email}>"
- message["To"] = email_to
-
- # 添加纯文本版本
- if text_content:
- part1 = MIMEText(text_content, "plain")
- message.attach(part1)
-
- # 添加HTML版本
- part2 = MIMEText(html_content, "html")
- message.attach(part2)
-
- try:
- await aiosmtplib.send(
- message,
- hostname=self.smtp_host,
- port=self.smtp_port,
- username=self.smtp_user,
- password=self.smtp_password,
- use_tls=True,
- )
- return True
- except Exception as e:
- print(f"Error sending email: {e}")
- return False
-
- def send_email_sync(
- self,
- email_to: str,
- subject: str,
- html_content: str,
- text_content: Optional[str] = None
- ) -> bool:
- """同步发送邮件"""
- message = MIMEMultipart("alternative")
- message["Subject"] = subject
- message["From"] = f"{self.from_name} <{self.from_email}>"
- message["To"] = email_to
-
- if text_content:
- part1 = MIMEText(text_content, "plain")
- message.attach(part1)
-
- part2 = MIMEText(html_content, "html")
- message.attach(part2)
-
- try:
- with smtplib.SMTP(self.smtp_host, self.smtp_port) as server:
- server.starttls()
- server.login(self.smtp_user, self.smtp_password)
- server.send_message(message)
- return True
- except Exception as e:
- print(f"Error sending email: {e}")
- return False
-
- async def send_verification_email(
- self,
- email_to: str,
- username: str,
- verification_url: str,
- verification_code: Optional[str] = None
- ) -> bool:
- """发送验证邮件"""
- # 加载模板
- template_path = os.path.join("templates", "email", "verify_email.html")
- with open(template_path, "r") as f:
- template_str = f.read()
-
- template = Template(template_str)
- html_content = template.render(
- username=username,
- verification_url=verification_url,
- verification_code=verification_code,
- frontend_url=settings.FRONTEND_URL
- )
-
- subject = "Verify Your Email Address"
- text_content = f"Hello {username},\n\nPlease verify your email by clicking: {verification_url}"
-
- if verification_code:
- text_content += f"\n\nOr use this verification code: {verification_code}"
-
- return await self.send_email_async(email_to, subject, html_content, text_content)
-
- async def send_password_reset_email(
- self,
- email_to: str,
- username: str,
- reset_url: str
- ) -> bool:
- """发送密码重置邮件"""
- template_path = os.path.join("templates", "email", "reset_password.html")
- with open(template_path, "r") as f:
- template_str = f.read()
-
- template = Template(template_str)
- html_content = template.render(
- username=username,
- reset_url=reset_url,
- frontend_url=settings.FRONTEND_URL
- )
-
- subject = "Password Reset Request"
- text_content = f"Hello {username},\n\nClick here to reset your password: {reset_url}"
-
- return await self.send_email_async(email_to, subject, html_content, text_content)
-
- async def send_welcome_email(
- self,
- email_to: str,
- username: str
- ) -> bool:
- """发送欢迎邮件"""
- template_path = os.path.join("templates", "email", "welcome.html")
- with open(template_path, "r") as f:
- template_str = f.read()
-
- template = Template(template_str)
- html_content = template.render(
- username=username,
- frontend_url=settings.FRONTEND_URL
- )
-
- subject = "Welcome to Our Platform!"
- text_content = f"Welcome {username}! We're glad to have you on board."
-
- return await self.send_email_async(email_to, subject, html_content, text_content)
-
- def generate_random_string(self, length: int = 6) -> str:
- """生成随机数字加字母字符串"""
- name = string.digits + string.ascii_letters
- return ''.join(secrets.choice(name) for _ in range(length))
-
- email_service = EmailService()
|