diff --git a/backend/alembic/versions/0005_target_owners_notifications.py b/backend/alembic/versions/0005_target_owners_notifications.py new file mode 100644 index 0000000..8563784 --- /dev/null +++ b/backend/alembic/versions/0005_target_owners_notifications.py @@ -0,0 +1,58 @@ +"""add target owners and alert notification events + +Revision ID: 0005_target_owners +Revises: 0004_email_settings +Create Date: 2026-02-12 +""" + +from alembic import op +import sqlalchemy as sa + + +revision = "0005_target_owners" +down_revision = "0004_email_settings" +branch_labels = None +depends_on = None + + +def upgrade() -> None: + op.create_table( + "target_owners", + sa.Column("id", sa.Integer(), nullable=False), + sa.Column("target_id", sa.Integer(), nullable=False), + sa.Column("user_id", sa.Integer(), nullable=False), + sa.Column("assigned_by_user_id", sa.Integer(), nullable=True), + sa.Column("created_at", sa.DateTime(timezone=True), server_default=sa.func.now(), nullable=False), + sa.ForeignKeyConstraint(["assigned_by_user_id"], ["users.id"], ondelete="SET NULL"), + sa.ForeignKeyConstraint(["target_id"], ["targets.id"], ondelete="CASCADE"), + sa.ForeignKeyConstraint(["user_id"], ["users.id"], ondelete="CASCADE"), + sa.PrimaryKeyConstraint("id"), + sa.UniqueConstraint("target_id", "user_id", name="uq_target_owner_target_user"), + ) + op.create_index(op.f("ix_target_owners_target_id"), "target_owners", ["target_id"], unique=False) + op.create_index(op.f("ix_target_owners_user_id"), "target_owners", ["user_id"], unique=False) + + op.create_table( + "alert_notification_events", + sa.Column("id", sa.Integer(), nullable=False), + sa.Column("alert_key", sa.String(length=200), nullable=False), + sa.Column("target_id", sa.Integer(), nullable=False), + sa.Column("severity", sa.String(length=16), nullable=False), + sa.Column("last_seen_at", sa.DateTime(timezone=True), server_default=sa.func.now(), nullable=False), + sa.Column("last_sent_at", sa.DateTime(timezone=True), server_default=sa.func.now(), nullable=False), + sa.ForeignKeyConstraint(["target_id"], ["targets.id"], ondelete="CASCADE"), + sa.PrimaryKeyConstraint("id"), + sa.UniqueConstraint("alert_key", "target_id", "severity", name="uq_alert_notif_event_key_target_sev"), + ) + op.create_index(op.f("ix_alert_notification_events_alert_key"), "alert_notification_events", ["alert_key"], unique=False) + op.create_index(op.f("ix_alert_notification_events_target_id"), "alert_notification_events", ["target_id"], unique=False) + + +def downgrade() -> None: + op.drop_index(op.f("ix_alert_notification_events_target_id"), table_name="alert_notification_events") + op.drop_index(op.f("ix_alert_notification_events_alert_key"), table_name="alert_notification_events") + op.drop_table("alert_notification_events") + + op.drop_index(op.f("ix_target_owners_user_id"), table_name="target_owners") + op.drop_index(op.f("ix_target_owners_target_id"), table_name="target_owners") + op.drop_table("target_owners") diff --git a/backend/app/api/routes/alerts.py b/backend/app/api/routes/alerts.py index 75f45c7..ada309f 100644 --- a/backend/app/api/routes/alerts.py +++ b/backend/app/api/routes/alerts.py @@ -20,6 +20,7 @@ from app.services.alerts import ( validate_alert_sql, validate_alert_thresholds, ) +from app.services.alert_notifications import process_target_owner_notifications from app.services.audit import write_audit_log router = APIRouter() @@ -38,7 +39,9 @@ async def list_alert_status( user: User = Depends(get_current_user), db: AsyncSession = Depends(get_db) ) -> AlertStatusResponse: _ = user - return await get_alert_status(db, use_cache=True) + payload = await get_alert_status(db, use_cache=True) + await process_target_owner_notifications(db, payload) + return payload @router.get("/definitions", response_model=list[AlertDefinitionOut]) diff --git a/backend/app/api/routes/targets.py b/backend/app/api/routes/targets.py index 25c3a0c..6e3f94b 100644 --- a/backend/app/api/routes/targets.py +++ b/backend/app/api/routes/targets.py @@ -1,14 +1,23 @@ from datetime import datetime + import asyncpg from fastapi import APIRouter, Depends, HTTPException, Query, status -from sqlalchemy import and_, desc, select +from sqlalchemy import and_, delete, desc, select from sqlalchemy.ext.asyncio import AsyncSession + from app.core.db import get_db from app.core.deps import get_current_user, require_roles -from app.models.models import Metric, QueryStat, Target, User +from app.models.models import Metric, QueryStat, Target, TargetOwner, User from app.schemas.metric import MetricOut, QueryStatOut from app.schemas.overview import DatabaseOverviewOut -from app.schemas.target import TargetConnectionTestRequest, TargetCreate, TargetOut, TargetUpdate +from app.schemas.target import ( + TargetConnectionTestRequest, + TargetCreate, + TargetOut, + TargetOwnerOut, + TargetOwnersUpdate, + TargetUpdate, +) from app.services.audit import write_audit_log from app.services.collector import build_target_dsn from app.services.crypto import encrypt_secret @@ -17,10 +26,46 @@ from app.services.overview_service import get_target_overview router = APIRouter() +async def _owners_by_target_ids(db: AsyncSession, target_ids: list[int]) -> dict[int, list[int]]: + if not target_ids: + return {} + rows = ( + await db.execute(select(TargetOwner.target_id, TargetOwner.user_id).where(TargetOwner.target_id.in_(target_ids))) + ).all() + mapping: dict[int, list[int]] = {target_id: [] for target_id in target_ids} + for target_id, user_id in rows: + mapping.setdefault(target_id, []).append(user_id) + return mapping + + +def _target_out_with_owners(target: Target, owner_user_ids: list[int]) -> TargetOut: + return TargetOut( + id=target.id, + name=target.name, + host=target.host, + port=target.port, + dbname=target.dbname, + username=target.username, + sslmode=target.sslmode, + use_pg_stat_statements=target.use_pg_stat_statements, + owner_user_ids=owner_user_ids, + tags=target.tags or {}, + created_at=target.created_at, + ) + + +async def _set_target_owners(db: AsyncSession, target_id: int, user_ids: list[int], assigned_by_user_id: int | None) -> None: + await db.execute(delete(TargetOwner).where(TargetOwner.target_id == target_id)) + for user_id in sorted(set(user_ids)): + db.add(TargetOwner(target_id=target_id, user_id=user_id, assigned_by_user_id=assigned_by_user_id)) + + @router.get("", response_model=list[TargetOut]) async def list_targets(user: User = Depends(get_current_user), db: AsyncSession = Depends(get_db)) -> list[TargetOut]: + _ = user targets = (await db.scalars(select(Target).order_by(Target.id.desc()))).all() - return [TargetOut.model_validate(item) for item in targets] + owner_map = await _owners_by_target_ids(db, [item.id for item in targets]) + return [_target_out_with_owners(item, owner_map.get(item.id, [])) for item in targets] @router.post("/test-connection") @@ -70,16 +115,37 @@ async def create_target( db.add(target) await db.commit() await db.refresh(target) + + if payload.owner_user_ids: + owners_exist = (await db.scalars(select(User.id).where(User.id.in_(payload.owner_user_ids)))).all() + if len(set(owners_exist)) != len(set(payload.owner_user_ids)): + raise HTTPException(status_code=400, detail="One or more owner users were not found") + await _set_target_owners(db, target.id, payload.owner_user_ids, user.id) + await db.commit() + await write_audit_log(db, "target.create", user.id, {"target_id": target.id, "name": target.name}) - return TargetOut.model_validate(target) + owner_map = await _owners_by_target_ids(db, [target.id]) + return _target_out_with_owners(target, owner_map.get(target.id, [])) + + +@router.get("/owner-candidates", response_model=list[TargetOwnerOut]) +async def list_owner_candidates( + user: User = Depends(require_roles("admin", "operator")), + db: AsyncSession = Depends(get_db), +) -> list[TargetOwnerOut]: + _ = user + users = (await db.scalars(select(User).order_by(User.email.asc()))).all() + return [TargetOwnerOut(user_id=item.id, email=item.email, role=item.role) for item in users] @router.get("/{target_id}", response_model=TargetOut) async def get_target(target_id: int, user: User = Depends(get_current_user), db: AsyncSession = Depends(get_db)) -> TargetOut: + _ = user target = await db.scalar(select(Target).where(Target.id == target_id)) if not target: raise HTTPException(status_code=404, detail="Target not found") - return TargetOut.model_validate(target) + owner_map = await _owners_by_target_ids(db, [target.id]) + return _target_out_with_owners(target, owner_map.get(target.id, [])) @router.put("/{target_id}", response_model=TargetOut) @@ -94,14 +160,73 @@ async def update_target( raise HTTPException(status_code=404, detail="Target not found") updates = payload.model_dump(exclude_unset=True) + owner_user_ids = updates.pop("owner_user_ids", None) if "password" in updates: target.encrypted_password = encrypt_secret(updates.pop("password")) for key, value in updates.items(): setattr(target, key, value) + + if owner_user_ids is not None: + owners_exist = (await db.scalars(select(User.id).where(User.id.in_(owner_user_ids)))).all() + if len(set(owners_exist)) != len(set(owner_user_ids)): + raise HTTPException(status_code=400, detail="One or more owner users were not found") + await _set_target_owners(db, target.id, owner_user_ids, user.id) + await db.commit() await db.refresh(target) await write_audit_log(db, "target.update", user.id, {"target_id": target.id}) - return TargetOut.model_validate(target) + owner_map = await _owners_by_target_ids(db, [target.id]) + return _target_out_with_owners(target, owner_map.get(target.id, [])) + + +@router.put("/{target_id}/owners", response_model=list[TargetOwnerOut]) +async def set_target_owners( + target_id: int, + payload: TargetOwnersUpdate, + user: User = Depends(require_roles("admin", "operator")), + db: AsyncSession = Depends(get_db), +) -> list[TargetOwnerOut]: + target = await db.scalar(select(Target).where(Target.id == target_id)) + if not target: + raise HTTPException(status_code=404, detail="Target not found") + owner_user_ids = sorted(set(payload.user_ids)) + if owner_user_ids: + owners_exist = (await db.scalars(select(User.id).where(User.id.in_(owner_user_ids)))).all() + if len(set(owners_exist)) != len(set(owner_user_ids)): + raise HTTPException(status_code=400, detail="One or more owner users were not found") + await _set_target_owners(db, target_id, owner_user_ids, user.id) + await db.commit() + await write_audit_log(db, "target.owners.update", user.id, {"target_id": target_id, "owner_user_ids": owner_user_ids}) + rows = ( + await db.execute( + select(User.id, User.email, User.role) + .join(TargetOwner, TargetOwner.user_id == User.id) + .where(TargetOwner.target_id == target_id) + .order_by(User.email.asc()) + ) + ).all() + return [TargetOwnerOut(user_id=row.id, email=row.email, role=row.role) for row in rows] + + +@router.get("/{target_id}/owners", response_model=list[TargetOwnerOut]) +async def get_target_owners( + target_id: int, + user: User = Depends(get_current_user), + db: AsyncSession = Depends(get_db), +) -> list[TargetOwnerOut]: + _ = user + target = await db.scalar(select(Target).where(Target.id == target_id)) + if not target: + raise HTTPException(status_code=404, detail="Target not found") + rows = ( + await db.execute( + select(User.id, User.email, User.role) + .join(TargetOwner, TargetOwner.user_id == User.id) + .where(TargetOwner.target_id == target_id) + .order_by(User.email.asc()) + ) + ).all() + return [TargetOwnerOut(user_id=row.id, email=row.email, role=row.role) for row in rows] @router.delete("/{target_id}") diff --git a/backend/app/models/__init__.py b/backend/app/models/__init__.py index 8b1cf77..d9ac4cd 100644 --- a/backend/app/models/__init__.py +++ b/backend/app/models/__init__.py @@ -1,3 +1,23 @@ -from app.models.models import AlertDefinition, AuditLog, EmailNotificationSettings, Metric, QueryStat, Target, User +from app.models.models import ( + AlertDefinition, + AlertNotificationEvent, + AuditLog, + EmailNotificationSettings, + Metric, + QueryStat, + Target, + TargetOwner, + User, +) -__all__ = ["User", "Target", "Metric", "QueryStat", "AuditLog", "AlertDefinition", "EmailNotificationSettings"] +__all__ = [ + "User", + "Target", + "Metric", + "QueryStat", + "AuditLog", + "AlertDefinition", + "EmailNotificationSettings", + "TargetOwner", + "AlertNotificationEvent", +] diff --git a/backend/app/models/models.py b/backend/app/models/models.py index 885af99..962925e 100644 --- a/backend/app/models/models.py +++ b/backend/app/models/models.py @@ -1,5 +1,5 @@ from datetime import datetime -from sqlalchemy import JSON, Boolean, DateTime, Float, ForeignKey, Integer, String, Text, func +from sqlalchemy import JSON, Boolean, DateTime, Float, ForeignKey, Integer, String, Text, UniqueConstraint, func from sqlalchemy.orm import Mapped, mapped_column, relationship from app.core.db import Base @@ -14,6 +14,11 @@ class User(Base): created_at: Mapped[datetime] = mapped_column(DateTime(timezone=True), server_default=func.now(), nullable=False) audit_logs: Mapped[list["AuditLog"]] = relationship(back_populates="user") + owned_targets: Mapped[list["TargetOwner"]] = relationship( + back_populates="user", + cascade="all, delete-orphan", + foreign_keys="TargetOwner.user_id", + ) class Target(Base): @@ -34,6 +39,21 @@ class Target(Base): metrics: Mapped[list["Metric"]] = relationship(back_populates="target", cascade="all, delete-orphan") query_stats: Mapped[list["QueryStat"]] = relationship(back_populates="target", cascade="all, delete-orphan") alert_definitions: Mapped[list["AlertDefinition"]] = relationship(back_populates="target", cascade="all, delete-orphan") + owners: Mapped[list["TargetOwner"]] = relationship(back_populates="target", cascade="all, delete-orphan") + + +class TargetOwner(Base): + __tablename__ = "target_owners" + __table_args__ = (UniqueConstraint("target_id", "user_id", name="uq_target_owner_target_user"),) + + id: Mapped[int] = mapped_column(Integer, primary_key=True) + target_id: Mapped[int] = mapped_column(ForeignKey("targets.id", ondelete="CASCADE"), nullable=False, index=True) + user_id: Mapped[int] = mapped_column(ForeignKey("users.id", ondelete="CASCADE"), nullable=False, index=True) + assigned_by_user_id: Mapped[int | None] = mapped_column(ForeignKey("users.id", ondelete="SET NULL"), nullable=True) + created_at: Mapped[datetime] = mapped_column(DateTime(timezone=True), nullable=False, server_default=func.now()) + + target: Mapped[Target] = relationship(back_populates="owners") + user: Mapped[User] = relationship(foreign_keys=[user_id], back_populates="owned_targets") class Metric(Base): @@ -121,3 +141,15 @@ class EmailNotificationSettings(Base): server_default=func.now(), onupdate=func.now(), ) + + +class AlertNotificationEvent(Base): + __tablename__ = "alert_notification_events" + __table_args__ = (UniqueConstraint("alert_key", "target_id", "severity", name="uq_alert_notif_event_key_target_sev"),) + + id: Mapped[int] = mapped_column(Integer, primary_key=True) + alert_key: Mapped[str] = mapped_column(String(200), nullable=False, index=True) + target_id: Mapped[int] = mapped_column(ForeignKey("targets.id", ondelete="CASCADE"), nullable=False, index=True) + severity: Mapped[str] = mapped_column(String(16), nullable=False) + last_seen_at: Mapped[datetime] = mapped_column(DateTime(timezone=True), nullable=False, server_default=func.now()) + last_sent_at: Mapped[datetime] = mapped_column(DateTime(timezone=True), nullable=False, server_default=func.now()) diff --git a/backend/app/schemas/target.py b/backend/app/schemas/target.py index e29369f..f26acfd 100644 --- a/backend/app/schemas/target.py +++ b/backend/app/schemas/target.py @@ -10,6 +10,7 @@ class TargetBase(BaseModel): username: str sslmode: str = "prefer" use_pg_stat_statements: bool = True + owner_user_ids: list[int] = Field(default_factory=list) tags: dict = Field(default_factory=dict) @@ -35,6 +36,7 @@ class TargetUpdate(BaseModel): password: str | None = None sslmode: str | None = None use_pg_stat_statements: bool | None = None + owner_user_ids: list[int] | None = None tags: dict | None = None @@ -43,3 +45,13 @@ class TargetOut(TargetBase): created_at: datetime model_config = {"from_attributes": True} + + +class TargetOwnerOut(BaseModel): + user_id: int + email: str + role: str + + +class TargetOwnersUpdate(BaseModel): + user_ids: list[int] = Field(default_factory=list) diff --git a/backend/app/services/alert_notifications.py b/backend/app/services/alert_notifications.py new file mode 100644 index 0000000..e0a0071 --- /dev/null +++ b/backend/app/services/alert_notifications.py @@ -0,0 +1,160 @@ +from __future__ import annotations + +import asyncio +from datetime import datetime, timedelta, timezone +from email.message import EmailMessage +import smtplib +import ssl + +from sqlalchemy import select +from sqlalchemy.ext.asyncio import AsyncSession + +from app.models.models import AlertNotificationEvent, EmailNotificationSettings, TargetOwner, User +from app.schemas.alert import AlertStatusResponse +from app.services.crypto import decrypt_secret + +_NOTIFICATION_COOLDOWN = timedelta(minutes=30) + + +async def _smtp_send( + host: str, + port: int, + username: str | None, + password: str | None, + from_email: str, + recipient: str, + subject: str, + body: str, + use_starttls: bool, + use_ssl: bool, +) -> None: + def _send() -> None: + message = EmailMessage() + message["From"] = from_email + message["To"] = recipient + message["Subject"] = subject + message.set_content(body) + + if use_ssl: + with smtplib.SMTP_SSL(host, port, timeout=10, context=ssl.create_default_context()) as smtp: + if username: + smtp.login(username, password or "") + smtp.send_message(message) + return + with smtplib.SMTP(host, port, timeout=10) as smtp: + if use_starttls: + smtp.starttls(context=ssl.create_default_context()) + if username: + smtp.login(username, password or "") + smtp.send_message(message) + + await asyncio.to_thread(_send) + + +def _render_subject(item) -> str: + sev = item.severity.upper() + return f"[NexaPG][{sev}] {item.target_name} - {item.name}" + + +def _render_body(item) -> str: + lines = [ + f"Severity: {item.severity}", + f"Target: {item.target_name} (id={item.target_id})", + f"Alert: {item.name}", + f"Category: {item.category}", + f"Checked At: {item.checked_at.isoformat()}", + "", + f"Description: {item.description}", + f"Message: {item.message}", + ] + if item.value is not None: + lines.append(f"Current Value: {item.value}") + if item.warning_threshold is not None: + lines.append(f"Warning Threshold: {item.warning_threshold}") + if item.alert_threshold is not None: + lines.append(f"Alert Threshold: {item.alert_threshold}") + lines.append("") + lines.append(f"Alert Key: {item.alert_key}") + lines.append("Sent by NexaPG notification service.") + return "\n".join(lines) + + +async def process_target_owner_notifications(db: AsyncSession, status: AlertStatusResponse) -> None: + settings = await db.scalar(select(EmailNotificationSettings).limit(1)) + if not settings or not settings.enabled: + return + if not settings.smtp_host or not settings.from_email: + return + + password = decrypt_secret(settings.encrypted_smtp_password) if settings.encrypted_smtp_password else None + now = datetime.now(timezone.utc) + active_items = status.alerts + status.warnings + if not active_items: + return + + target_ids = sorted({item.target_id for item in active_items}) + owner_rows = ( + await db.execute( + select(TargetOwner.target_id, User.email) + .join(User, User.id == TargetOwner.user_id) + .where(TargetOwner.target_id.in_(target_ids)) + ) + ).all() + owners_map: dict[int, set[str]] = {} + for target_id, email in owner_rows: + owners_map.setdefault(target_id, set()).add(email) + + existing_rows = ( + await db.scalars( + select(AlertNotificationEvent).where( + AlertNotificationEvent.target_id.in_(target_ids) + ) + ) + ).all() + event_map = {(row.alert_key, row.target_id, row.severity): row for row in existing_rows} + + for item in active_items: + recipients = sorted(owners_map.get(item.target_id, set())) + if not recipients: + continue + + key = (item.alert_key, item.target_id, item.severity) + existing = event_map.get(key) + should_send = existing is None or (now - existing.last_sent_at) >= _NOTIFICATION_COOLDOWN + + if should_send: + subject = _render_subject(item) + body = _render_body(item) + for recipient in recipients: + try: + await _smtp_send( + host=settings.smtp_host, + port=settings.smtp_port, + username=settings.smtp_username, + password=password, + from_email=settings.from_email, + recipient=recipient, + subject=subject, + body=body, + use_starttls=settings.use_starttls, + use_ssl=settings.use_ssl, + ) + except Exception: + continue + + if existing: + existing.last_seen_at = now + if should_send: + existing.last_sent_at = now + else: + db.add( + AlertNotificationEvent( + alert_key=item.alert_key, + target_id=item.target_id, + severity=item.severity, + last_seen_at=now, + last_sent_at=now if should_send else now, + ) + ) + + await db.commit() diff --git a/frontend/src/pages/TargetDetailPage.jsx b/frontend/src/pages/TargetDetailPage.jsx index 76031cb..a222ce2 100644 --- a/frontend/src/pages/TargetDetailPage.jsx +++ b/frontend/src/pages/TargetDetailPage.jsx @@ -83,6 +83,7 @@ export function TargetDetailPage() { const [activity, setActivity] = useState([]); const [overview, setOverview] = useState(null); const [targetMeta, setTargetMeta] = useState(null); + const [owners, setOwners] = useState([]); const [error, setError] = useState(""); const [loading, setLoading] = useState(true); const refreshRef = useRef(refresh); @@ -98,7 +99,7 @@ export function TargetDetailPage() { setLoading(true); } try { - const [connections, xacts, cache, locksTable, activityTable, overviewData, targetInfo] = await Promise.all([ + const [connections, xacts, cache, locksTable, activityTable, overviewData, targetInfo, ownerRows] = await Promise.all([ loadMetric(id, "connections_total", range, tokens, refreshRef.current), loadMetric(id, "xacts_total", range, tokens, refreshRef.current), loadMetric(id, "cache_hit_ratio", range, tokens, refreshRef.current), @@ -106,6 +107,7 @@ export function TargetDetailPage() { apiFetch(`/targets/${id}/activity`, {}, tokens, refreshRef.current), apiFetch(`/targets/${id}/overview`, {}, tokens, refreshRef.current), apiFetch(`/targets/${id}`, {}, tokens, refreshRef.current), + apiFetch(`/targets/${id}/owners`, {}, tokens, refreshRef.current), ]); if (!active) return; setSeries({ connections, xacts, cache }); @@ -113,6 +115,7 @@ export function TargetDetailPage() { setActivity(activityTable); setOverview(overviewData); setTargetMeta(targetInfo); + setOwners(ownerRows); setError(""); } catch (e) { if (active) setError(String(e.message || e)); @@ -229,6 +232,10 @@ export function TargetDetailPage() { Target Detail {targetMeta?.name || `#${id}`} {targetMeta?.dbname ? ` (${targetMeta.dbname})` : ""} +