All checks were successful
PostgreSQL Compatibility Matrix / PG14 smoke (push) Successful in 7s
PostgreSQL Compatibility Matrix / PG15 smoke (push) Successful in 7s
PostgreSQL Compatibility Matrix / PG16 smoke (push) Successful in 7s
PostgreSQL Compatibility Matrix / PG17 smoke (push) Successful in 6s
PostgreSQL Compatibility Matrix / PG18 smoke (push) Successful in 6s
Implemented backend and frontend support for managing SMTP settings for email notifications. Includes API endpoints, database migration, and UI integration for configuring and testing email alerts.
125 lines
4.7 KiB
Python
125 lines
4.7 KiB
Python
from email.message import EmailMessage
|
|
import smtplib
|
|
import ssl
|
|
|
|
from fastapi import APIRouter, Depends, HTTPException
|
|
from sqlalchemy import select
|
|
from sqlalchemy.ext.asyncio import AsyncSession
|
|
|
|
from app.core.db import get_db
|
|
from app.core.deps import require_roles
|
|
from app.models.models import EmailNotificationSettings, User
|
|
from app.schemas.admin_settings import EmailSettingsOut, EmailSettingsTestRequest, EmailSettingsUpdate
|
|
from app.services.audit import write_audit_log
|
|
from app.services.crypto import decrypt_secret, encrypt_secret
|
|
|
|
router = APIRouter()
|
|
|
|
|
|
async def _get_or_create_settings(db: AsyncSession) -> EmailNotificationSettings:
|
|
settings = await db.scalar(select(EmailNotificationSettings).limit(1))
|
|
if settings:
|
|
return settings
|
|
settings = EmailNotificationSettings()
|
|
db.add(settings)
|
|
await db.commit()
|
|
await db.refresh(settings)
|
|
return settings
|
|
|
|
|
|
def _to_out(settings: EmailNotificationSettings) -> EmailSettingsOut:
|
|
recipients = settings.alert_recipients if isinstance(settings.alert_recipients, list) else []
|
|
return EmailSettingsOut(
|
|
enabled=settings.enabled,
|
|
smtp_host=settings.smtp_host,
|
|
smtp_port=settings.smtp_port,
|
|
smtp_username=settings.smtp_username,
|
|
from_email=settings.from_email,
|
|
use_starttls=settings.use_starttls,
|
|
use_ssl=settings.use_ssl,
|
|
alert_recipients=recipients,
|
|
has_password=bool(settings.encrypted_smtp_password),
|
|
updated_at=settings.updated_at,
|
|
)
|
|
|
|
|
|
@router.get("/email", response_model=EmailSettingsOut)
|
|
async def get_email_settings(
|
|
admin: User = Depends(require_roles("admin")),
|
|
db: AsyncSession = Depends(get_db),
|
|
) -> EmailSettingsOut:
|
|
_ = admin
|
|
settings = await _get_or_create_settings(db)
|
|
return _to_out(settings)
|
|
|
|
|
|
@router.put("/email", response_model=EmailSettingsOut)
|
|
async def update_email_settings(
|
|
payload: EmailSettingsUpdate,
|
|
admin: User = Depends(require_roles("admin")),
|
|
db: AsyncSession = Depends(get_db),
|
|
) -> EmailSettingsOut:
|
|
settings = await _get_or_create_settings(db)
|
|
settings.enabled = payload.enabled
|
|
settings.smtp_host = payload.smtp_host.strip() if payload.smtp_host else None
|
|
settings.smtp_port = payload.smtp_port
|
|
settings.smtp_username = payload.smtp_username.strip() if payload.smtp_username else None
|
|
settings.from_email = str(payload.from_email) if payload.from_email else None
|
|
settings.use_starttls = payload.use_starttls
|
|
settings.use_ssl = payload.use_ssl
|
|
settings.alert_recipients = [str(item) for item in payload.alert_recipients]
|
|
|
|
if payload.clear_smtp_password:
|
|
settings.encrypted_smtp_password = None
|
|
elif payload.smtp_password:
|
|
settings.encrypted_smtp_password = encrypt_secret(payload.smtp_password)
|
|
|
|
await db.commit()
|
|
await db.refresh(settings)
|
|
await write_audit_log(db, "admin.email_settings.update", admin.id, {"enabled": settings.enabled})
|
|
return _to_out(settings)
|
|
|
|
|
|
@router.post("/email/test")
|
|
async def test_email_settings(
|
|
payload: EmailSettingsTestRequest,
|
|
admin: User = Depends(require_roles("admin")),
|
|
db: AsyncSession = Depends(get_db),
|
|
) -> dict:
|
|
settings = await _get_or_create_settings(db)
|
|
if not settings.smtp_host:
|
|
raise HTTPException(status_code=400, detail="SMTP host is not configured")
|
|
if not settings.from_email:
|
|
raise HTTPException(status_code=400, detail="From email is not configured")
|
|
|
|
password = decrypt_secret(settings.encrypted_smtp_password) if settings.encrypted_smtp_password else None
|
|
message = EmailMessage()
|
|
message["From"] = settings.from_email
|
|
message["To"] = str(payload.recipient)
|
|
message["Subject"] = payload.subject
|
|
message.set_content(payload.message)
|
|
|
|
try:
|
|
if settings.use_ssl:
|
|
with smtplib.SMTP_SSL(
|
|
settings.smtp_host,
|
|
settings.smtp_port,
|
|
timeout=10,
|
|
context=ssl.create_default_context(),
|
|
) as smtp:
|
|
if settings.smtp_username:
|
|
smtp.login(settings.smtp_username, password or "")
|
|
smtp.send_message(message)
|
|
else:
|
|
with smtplib.SMTP(settings.smtp_host, settings.smtp_port, timeout=10) as smtp:
|
|
if settings.use_starttls:
|
|
smtp.starttls(context=ssl.create_default_context())
|
|
if settings.smtp_username:
|
|
smtp.login(settings.smtp_username, password or "")
|
|
smtp.send_message(message)
|
|
except Exception as exc:
|
|
raise HTTPException(status_code=400, detail=f"SMTP test failed: {exc}")
|
|
|
|
await write_audit_log(db, "admin.email_settings.test", admin.id, {"recipient": str(payload.recipient)})
|
|
return {"status": "sent", "recipient": str(payload.recipient)}
|