Files
NexaPG/backend/app/api/routes/admin_settings.py
nessi 117710cc0a [NX-101 Issue] Refactor error handling to use consistent API error format
Replaced all inline error messages with the standardized `api_error` helper for consistent error response formatting. This improves clarity, maintainability, and ensures uniform error structures across the application. Updated logging for collector failures to include error class and switched to warning level for target unreachable scenarios.
2026-02-14 11:30:56 +01:00

137 lines
5.7 KiB
Python

import asyncio
import smtplib
import ssl
from email.message import EmailMessage
from email.utils import formataddr

from fastapi import APIRouter, Depends, HTTPException
from sqlalchemy import select
from sqlalchemy.ext.asyncio import AsyncSession

from app.core.db import get_db
from app.core.deps import require_roles
from app.core.errors import api_error
from app.models.models import EmailNotificationSettings, User
from app.schemas.admin_settings import EmailSettingsOut, EmailSettingsTestRequest, EmailSettingsUpdate
from app.services.audit import write_audit_log
from app.services.crypto import decrypt_secret, encrypt_secret
# Router collecting the admin email-settings endpoints declared in this module
# (GET/PUT "/email" and POST "/email/test"). The URL prefix it is mounted under
# is decided wherever this router is included, which is not visible here.
router = APIRouter()
async def _get_or_create_settings(db: AsyncSession) -> EmailNotificationSettings:
    """Fetch the email notification settings row, creating it on first use.

    The first row returned by the query is used. When the table yields no
    row, a new ``EmailNotificationSettings`` built with its model defaults is
    added, committed and refreshed before being returned.
    """
    first_row_query = select(EmailNotificationSettings).limit(1)
    record = await db.scalar(first_row_query)
    if not record:
        # Nothing stored yet: persist a default row so callers always get one.
        record = EmailNotificationSettings()
        db.add(record)
        await db.commit()
        await db.refresh(record)
    return record
def _to_out(settings: EmailNotificationSettings) -> EmailSettingsOut:
    """Convert a settings row into the API response schema.

    Every field is copied verbatim except the SMTP password, which is never
    returned: the response only carries ``has_password`` to say whether an
    encrypted password is stored.
    """
    copied_fields = (
        "enabled",
        "smtp_host",
        "smtp_port",
        "smtp_username",
        "from_name",
        "from_email",
        "use_starttls",
        "use_ssl",
        "warning_subject_template",
        "alert_subject_template",
        "warning_body_template",
        "alert_body_template",
    )
    values = {name: getattr(settings, name) for name in copied_fields}
    # Expose only the presence of the secret, not the secret itself.
    values["has_password"] = bool(settings.encrypted_smtp_password)
    values["updated_at"] = settings.updated_at
    return EmailSettingsOut(**values)
@router.get("/email", response_model=EmailSettingsOut)
async def get_email_settings(
    admin: User = Depends(require_roles("admin")),
    db: AsyncSession = Depends(get_db),
) -> EmailSettingsOut:
    """Return the current email notification settings.

    The ``admin`` dependency is declared only so that ``require_roles("admin")``
    runs for this route; its value is not used by the handler.
    """
    del admin  # resolved for the role check only
    return _to_out(await _get_or_create_settings(db))
def _strip_or_none(value):
    """Return ``value`` without surrounding whitespace, or ``None`` if nothing is left.

    ``None``, ``""`` and whitespace-only strings all normalise to ``None`` so
    that "not set" has exactly one stored representation.
    """
    if not value:
        return None
    return value.strip() or None


@router.put("/email", response_model=EmailSettingsOut)
async def update_email_settings(
    payload: EmailSettingsUpdate,
    admin: User = Depends(require_roles("admin")),
    db: AsyncSession = Depends(get_db),
) -> EmailSettingsOut:
    """Overwrite the email notification settings with the submitted payload.

    Text fields are stripped; blank or whitespace-only values are stored as
    ``None``. The stored SMTP password is cleared when
    ``payload.clear_smtp_password`` is set, replaced (encrypted) when a new
    ``payload.smtp_password`` is supplied, and left untouched otherwise.
    After the commit an ``admin.email_settings.update`` audit entry is written
    for the acting admin.

    Returns:
        The refreshed settings in response-schema form (password never included).
    """
    settings = await _get_or_create_settings(db)

    settings.enabled = payload.enabled
    settings.smtp_port = payload.smtp_port
    settings.use_starttls = payload.use_starttls
    settings.use_ssl = payload.use_ssl
    settings.from_email = str(payload.from_email) if payload.from_email else None

    # Free-text fields share one normalisation so "   " can no longer be
    # persisted as an empty string while "" is persisted as None.
    text_fields = (
        "smtp_host",
        "smtp_username",
        "from_name",
        "warning_subject_template",
        "alert_subject_template",
        "warning_body_template",
        "alert_body_template",
    )
    for field_name in text_fields:
        setattr(settings, field_name, _strip_or_none(getattr(payload, field_name)))

    # Clearing wins over setting; with neither flag the stored secret is kept.
    if payload.clear_smtp_password:
        settings.encrypted_smtp_password = None
    elif payload.smtp_password:
        settings.encrypted_smtp_password = encrypt_secret(payload.smtp_password)

    await db.commit()
    await db.refresh(settings)
    await write_audit_log(db, "admin.email_settings.update", admin.id, {"enabled": settings.enabled})
    return _to_out(settings)
def _send_via_smtp(
    message: EmailMessage,
    *,
    host: str,
    port: int,
    username: str,
    password: str,
    use_ssl: bool,
    use_starttls: bool,
    timeout: float = 10,
) -> None:
    """Deliver ``message`` over a new SMTP connection (blocking call).

    Args:
        message: Fully built message to send.
        host: SMTP server host name.
        port: SMTP server port; ``0`` lets ``smtplib`` pick its default.
        username: Login name; authentication is skipped when empty.
        password: Login password (may be empty).
        use_ssl: Connect with implicit TLS (``SMTP_SSL``). Takes precedence
            over ``use_starttls``.
        use_starttls: Upgrade a plain connection with STARTTLS.
        timeout: Socket timeout in seconds.

    Raises:
        Whatever ``smtplib`` / ``ssl`` / the socket layer raises on failure.
    """
    if use_ssl:
        client = smtplib.SMTP_SSL(host, port, timeout=timeout, context=ssl.create_default_context())
    else:
        client = smtplib.SMTP(host, port, timeout=timeout)
    with client as smtp:
        if use_starttls and not use_ssl:
            smtp.starttls(context=ssl.create_default_context())
        if username:
            smtp.login(username, password)
        smtp.send_message(message)


@router.post("/email/test")
async def test_email_settings(
    payload: EmailSettingsTestRequest,
    admin: User = Depends(require_roles("admin")),
    db: AsyncSession = Depends(get_db),
) -> dict:
    """Send a test email using the stored SMTP settings.

    The blocking SMTP conversation runs in a worker thread so that a slow or
    unreachable mail server does not stall the event loop for other requests.

    Raises:
        HTTPException: 400 ``smtp_host_missing`` / ``smtp_from_email_missing``
            when the configuration is incomplete, and 400 ``smtp_test_failed``
            (with the underlying error text) when delivery fails.

    Returns:
        ``{"status": "sent", "recipient": <address>}`` on success; an
        ``admin.email_settings.test`` audit entry is written first.
    """
    settings = await _get_or_create_settings(db)
    if not settings.smtp_host:
        raise HTTPException(status_code=400, detail=api_error("smtp_host_missing", "SMTP host is not configured"))
    if not settings.from_email:
        raise HTTPException(status_code=400, detail=api_error("smtp_from_email_missing", "From email is not configured"))

    password = decrypt_secret(settings.encrypted_smtp_password) if settings.encrypted_smtp_password else None
    recipient = str(payload.recipient)

    message = EmailMessage()
    message["From"] = formataddr((settings.from_name, settings.from_email)) if settings.from_name else settings.from_email
    message["To"] = recipient
    message["Subject"] = payload.subject
    message.set_content(payload.message)

    # Snapshot plain values here, on the event-loop thread, so the worker
    # thread never touches the ORM instance tied to the async session.
    smtp_options = {
        "host": settings.smtp_host,
        "port": settings.smtp_port or 0,
        "username": settings.smtp_username or "",
        "password": password or "",
        "use_ssl": bool(settings.use_ssl),
        "use_starttls": bool(settings.use_starttls),
    }

    try:
        await asyncio.to_thread(_send_via_smtp, message, **smtp_options)
    except Exception as exc:
        # Deliberately broad: this endpoint exists to report *any* delivery
        # problem (DNS, TLS, auth, protocol) back to the admin.
        raise HTTPException(
            status_code=400,
            detail=api_error("smtp_test_failed", "SMTP test failed", {"error": str(exc)}),
        ) from exc

    await write_audit_log(db, "admin.email_settings.test", admin.id, {"recipient": recipient})
    return {"status": "sent", "recipient": recipient}