Files
NexaPG/backend/app/services/alert_notifications.py
nessi c437e72c2b
All checks were successful
PostgreSQL Compatibility Matrix / PG14 smoke (push) Successful in 7s
PostgreSQL Compatibility Matrix / PG15 smoke (push) Successful in 8s
PostgreSQL Compatibility Matrix / PG16 smoke (push) Successful in 7s
PostgreSQL Compatibility Matrix / PG17 smoke (push) Successful in 7s
PostgreSQL Compatibility Matrix / PG18 smoke (push) Successful in 7s
Add customizable email templates and remove alert recipients
Replaced the fixed `alert_recipients` list with customizable subject and body templates for alerts and warnings. This allows for more flexible and dynamic email notifications using placeholder variables. Updated relevant backend and frontend components to support this feature.
2026-02-12 16:32:53 +01:00

197 lines
6.9 KiB
Python

from __future__ import annotations
import asyncio
from datetime import datetime, timedelta, timezone
from email.message import EmailMessage
from email.utils import formataddr
import smtplib
import ssl
from sqlalchemy import select
from sqlalchemy.ext.asyncio import AsyncSession
from app.models.models import AlertNotificationEvent, EmailNotificationSettings, TargetOwner, User
from app.schemas.alert import AlertStatusResponse
from app.services.crypto import decrypt_secret
_NOTIFICATION_COOLDOWN = timedelta(minutes=30)
async def _smtp_send(
host: str,
port: int,
username: str | None,
password: str | None,
from_name: str | None,
from_email: str,
recipient: str,
subject: str,
body: str,
use_starttls: bool,
use_ssl: bool,
) -> None:
def _send() -> None:
message = EmailMessage()
message["From"] = formataddr((from_name, from_email)) if from_name else from_email
message["To"] = recipient
message["Subject"] = subject
message.set_content(body)
if use_ssl:
with smtplib.SMTP_SSL(host, port, timeout=10, context=ssl.create_default_context()) as smtp:
if username:
smtp.login(username, password or "")
smtp.send_message(message)
return
with smtplib.SMTP(host, port, timeout=10) as smtp:
if use_starttls:
smtp.starttls(context=ssl.create_default_context())
if username:
smtp.login(username, password or "")
smtp.send_message(message)
await asyncio.to_thread(_send)
def _render_subject(item) -> str:
sev = item.severity.upper()
return f"[NexaPG][{sev}] {item.target_name} - {item.name}"
def _render_body(item) -> str:
lines = [
f"Severity: {item.severity}",
f"Target: {item.target_name} (id={item.target_id})",
f"Alert: {item.name}",
f"Category: {item.category}",
f"Checked At: {item.checked_at.isoformat()}",
"",
f"Description: {item.description}",
f"Message: {item.message}",
]
if item.value is not None:
lines.append(f"Current Value: {item.value}")
if item.warning_threshold is not None:
lines.append(f"Warning Threshold: {item.warning_threshold}")
if item.alert_threshold is not None:
lines.append(f"Alert Threshold: {item.alert_threshold}")
lines.append("")
lines.append(f"Alert Key: {item.alert_key}")
lines.append("Sent by NexaPG notification service.")
return "\n".join(lines)
def _template_context(item) -> dict[str, str]:
return {
"severity": str(item.severity),
"target_name": str(item.target_name),
"target_id": str(item.target_id),
"alert_name": str(item.name),
"category": str(item.category),
"description": str(item.description),
"message": str(item.message),
"value": "" if item.value is None else str(item.value),
"warning_threshold": "" if item.warning_threshold is None else str(item.warning_threshold),
"alert_threshold": "" if item.alert_threshold is None else str(item.alert_threshold),
"checked_at": item.checked_at.isoformat() if item.checked_at else "",
"alert_key": str(item.alert_key),
}
def _safe_format(template: str | None, context: dict[str, str], fallback: str) -> str:
if not template:
return fallback
rendered = template
for key, value in context.items():
rendered = rendered.replace("{" + key + "}", value)
return rendered
async def process_target_owner_notifications(db: AsyncSession, status: AlertStatusResponse) -> None:
settings = await db.scalar(select(EmailNotificationSettings).limit(1))
if not settings or not settings.enabled:
return
if not settings.smtp_host or not settings.from_email:
return
password = decrypt_secret(settings.encrypted_smtp_password) if settings.encrypted_smtp_password else None
now = datetime.now(timezone.utc)
active_items = status.alerts + status.warnings
if not active_items:
return
target_ids = sorted({item.target_id for item in active_items})
owner_rows = (
await db.execute(
select(TargetOwner.target_id, User.email)
.join(User, User.id == TargetOwner.user_id)
.where(TargetOwner.target_id.in_(target_ids))
)
).all()
owners_map: dict[int, set[str]] = {}
for target_id, email in owner_rows:
owners_map.setdefault(target_id, set()).add(email)
existing_rows = (
await db.scalars(
select(AlertNotificationEvent).where(
AlertNotificationEvent.target_id.in_(target_ids)
)
)
).all()
event_map = {(row.alert_key, row.target_id, row.severity): row for row in existing_rows}
for item in active_items:
recipients = sorted(owners_map.get(item.target_id, set()))
if not recipients:
continue
key = (item.alert_key, item.target_id, item.severity)
existing = event_map.get(key)
should_send = existing is None or (now - existing.last_sent_at) >= _NOTIFICATION_COOLDOWN
if should_send:
fallback_subject = _render_subject(item)
fallback_body = _render_body(item)
context = _template_context(item)
if item.severity == "alert":
subject = _safe_format(settings.alert_subject_template, context, fallback_subject)
body = _safe_format(settings.alert_body_template, context, fallback_body)
else:
subject = _safe_format(settings.warning_subject_template, context, fallback_subject)
body = _safe_format(settings.warning_body_template, context, fallback_body)
for recipient in recipients:
try:
await _smtp_send(
host=settings.smtp_host,
port=settings.smtp_port,
username=settings.smtp_username,
password=password,
from_name=settings.from_name,
from_email=settings.from_email,
recipient=recipient,
subject=subject,
body=body,
use_starttls=settings.use_starttls,
use_ssl=settings.use_ssl,
)
except Exception:
continue
if existing:
existing.last_seen_at = now
if should_send:
existing.last_sent_at = now
else:
db.add(
AlertNotificationEvent(
alert_key=item.alert_key,
target_id=item.target_id,
severity=item.severity,
last_seen_at=now,
last_sent_at=now if should_send else now,
)
)
await db.commit()