Files
NexaPG/backend/app/api/routes/alerts.py
nessi 4035335901 Add alert management functionality in backend and frontend
This commit introduces alert management capabilities, including creating, updating, listing, and removing custom SQL-based alerts in the backend. It adds the necessary database migrations, API endpoints, and frontend pages to manage alerts, enabling users to define thresholds and monitor system health effectively.
2026-02-12 12:50:11 +01:00

144 lines
5.6 KiB
Python

from fastapi import APIRouter, Depends, HTTPException, status
from sqlalchemy import select
from sqlalchemy.ext.asyncio import AsyncSession
from app.core.db import get_db
from app.core.deps import get_current_user, require_roles
from app.models.models import AlertDefinition, Target, User
from app.schemas.alert import (
AlertDefinitionCreate,
AlertDefinitionOut,
AlertDefinitionTestRequest,
AlertDefinitionTestResponse,
AlertDefinitionUpdate,
AlertStatusResponse,
)
from app.services.alerts import (
get_alert_status,
invalidate_alert_cache,
run_scalar_sql_for_target,
validate_alert_sql,
validate_alert_thresholds,
)
from app.services.audit import write_audit_log
# Router on which the alert endpoints in this module register themselves
# through the @router.<method>(...) decorators.
router = APIRouter()
async def _validate_target_exists(db: AsyncSession, target_id: int | None) -> None:
    """Ensure ``target_id`` refers to an existing target, when one is given.

    A ``target_id`` of ``None`` is accepted without querying the database.

    Raises:
        HTTPException: 404 when no ``Target`` row has the given id.
    """
    if target_id is not None:
        lookup = select(Target.id).where(Target.id == target_id)
        found_id = await db.scalar(lookup)
        if found_id is None:
            raise HTTPException(status_code=404, detail="Target not found")
@router.get("/status", response_model=AlertStatusResponse)
async def list_alert_status(
    user: User = Depends(get_current_user),
    db: AsyncSession = Depends(get_db),
) -> AlertStatusResponse:
    """Return the alert status produced by ``get_alert_status``.

    The status is requested with ``use_cache=True``. ``user`` is resolved
    through ``get_current_user``; its value is not used in the body.
    """
    _ = user  # value intentionally unused
    alert_status = await get_alert_status(db, use_cache=True)
    return alert_status
@router.get("/definitions", response_model=list[AlertDefinitionOut])
async def list_alert_definitions(
    user: User = Depends(get_current_user),
    db: AsyncSession = Depends(get_db),
) -> list[AlertDefinitionOut]:
    """Return every alert definition, ordered by descending id.

    ``user`` is resolved through ``get_current_user``; its value is not used
    in the body.
    """
    _ = user  # value intentionally unused
    by_id_desc = select(AlertDefinition).order_by(AlertDefinition.id.desc())
    rows = await db.scalars(by_id_desc)
    return [AlertDefinitionOut.model_validate(row) for row in rows.all()]
@router.post("/definitions", response_model=AlertDefinitionOut, status_code=status.HTTP_201_CREATED)
async def create_alert_definition(
    payload: AlertDefinitionCreate,
    user: User = Depends(require_roles("admin", "operator")),
    db: AsyncSession = Depends(get_db),
) -> AlertDefinitionOut:
    """Create an alert definition from ``payload`` and return the stored row.

    Checks run in this order before anything is written: the referenced
    target (if any) must exist, the SQL must pass ``validate_alert_sql``, and
    the thresholds must pass ``validate_alert_thresholds``. After the commit
    the alert cache is invalidated and an audit entry is written.

    Raises:
        HTTPException: 404 when ``payload.target_id`` is set but matches no
            target. The validation helpers may raise as well.
    """
    await _validate_target_exists(db, payload.target_id)
    checked_sql = validate_alert_sql(payload.sql_text)
    validate_alert_thresholds(payload.comparison, payload.warning_threshold, payload.alert_threshold)

    column_values = {
        "name": payload.name,
        "description": payload.description,
        "target_id": payload.target_id,
        # Store the value returned by the validator, not the raw payload text.
        "sql_text": checked_sql,
        "comparison": payload.comparison,
        "warning_threshold": payload.warning_threshold,
        "alert_threshold": payload.alert_threshold,
        "enabled": payload.enabled,
        "created_by_user_id": user.id,
    }
    new_definition = AlertDefinition(**column_values)
    db.add(new_definition)
    await db.commit()
    # Re-read the row after the commit so the response reflects stored state.
    await db.refresh(new_definition)

    invalidate_alert_cache()
    audit_details = {"alert_definition_id": new_definition.id, "name": new_definition.name}
    await write_audit_log(db, "alert.definition.create", user.id, audit_details)
    return AlertDefinitionOut.model_validate(new_definition)
@router.put("/definitions/{definition_id}", response_model=AlertDefinitionOut)
async def update_alert_definition(
    definition_id: int,
    payload: AlertDefinitionUpdate,
    user: User = Depends(require_roles("admin", "operator")),
    db: AsyncSession = Depends(get_db),
) -> AlertDefinitionOut:
    """Apply a partial update to an alert definition and return the result.

    Only fields the client actually sent (``exclude_unset=True``) are
    considered. A supplied ``target_id`` must exist, a supplied ``sql_text``
    must pass ``validate_alert_sql``, and the thresholds that would result
    from the update (new values merged over the stored ones) must pass
    ``validate_alert_thresholds``. After the commit the alert cache is
    invalidated and an audit entry is written.

    An explicit ``"sql_text": null`` is treated as "leave unchanged".

    Raises:
        HTTPException: 404 when the definition does not exist, or when a
            supplied ``target_id`` matches no target. The validation helpers
            may raise as well.
    """
    definition = await db.scalar(select(AlertDefinition).where(AlertDefinition.id == definition_id))
    if definition is None:
        raise HTTPException(status_code=404, detail="Alert definition not found")

    updates = payload.model_dump(exclude_unset=True)
    if "target_id" in updates:
        await _validate_target_exists(db, updates["target_id"])

    if "sql_text" in updates:
        if updates["sql_text"] is None:
            # A null here skips validation, so it must not reach the setattr
            # loop below: it would overwrite the stored query with None.
            del updates["sql_text"]
        else:
            updates["sql_text"] = validate_alert_sql(updates["sql_text"])

    # Validate the thresholds as they will be after the update: values from
    # the request take precedence, stored values fill in the rest.
    # NOTE(review): an explicit null for comparison/alert_threshold is passed
    # through as None here and assigned below - confirm whether the schema or
    # validate_alert_thresholds is expected to reject that.
    comparison = updates.get("comparison", definition.comparison)
    warning_threshold = updates.get("warning_threshold", definition.warning_threshold)
    alert_threshold = updates.get("alert_threshold", definition.alert_threshold)
    validate_alert_thresholds(comparison, warning_threshold, alert_threshold)

    for key, value in updates.items():
        setattr(definition, key, value)
    await db.commit()
    await db.refresh(definition)

    invalidate_alert_cache()
    await write_audit_log(db, "alert.definition.update", user.id, {"alert_definition_id": definition.id})
    return AlertDefinitionOut.model_validate(definition)
@router.delete("/definitions/{definition_id}")
async def delete_alert_definition(
    definition_id: int,
    user: User = Depends(require_roles("admin", "operator")),
    db: AsyncSession = Depends(get_db),
) -> dict:
    """Delete the alert definition with the given id.

    After the commit the alert cache is invalidated and an audit entry is
    written.

    Returns:
        ``{"status": "deleted"}`` once the deletion has been committed.

    Raises:
        HTTPException: 404 when no definition has ``definition_id``.
    """
    by_id = select(AlertDefinition).where(AlertDefinition.id == definition_id)
    existing = await db.scalar(by_id)
    if existing is None:
        raise HTTPException(status_code=404, detail="Alert definition not found")

    await db.delete(existing)
    await db.commit()

    invalidate_alert_cache()
    audit_details = {"alert_definition_id": definition_id}
    await write_audit_log(db, "alert.definition.delete", user.id, audit_details)
    return {"status": "deleted"}
@router.post("/definitions/test", response_model=AlertDefinitionTestResponse)
async def test_alert_definition(
    payload: AlertDefinitionTestRequest,
    user: User = Depends(require_roles("admin", "operator")),
    db: AsyncSession = Depends(get_db),
) -> AlertDefinitionTestResponse:
    """Run a candidate alert query against a target and report the outcome.

    The query is first passed through ``validate_alert_sql`` - the same check
    applied when a definition is created or updated - and the validated text
    is then executed with ``run_scalar_sql_for_target``.

    Returns:
        ``ok=True`` with the resulting ``value`` on success; ``ok=False``
        with the exception text in ``error`` when validation or execution
        fails.

    Raises:
        HTTPException: 404 when ``payload.target_id`` matches no target.
    """
    _ = user  # value intentionally unused
    target = await db.scalar(select(Target).where(Target.id == payload.target_id))
    if target is None:
        raise HTTPException(status_code=404, detail="Target not found")
    try:
        # Validate here so this route cannot execute SQL that the create and
        # update routes would reject. Kept inside the try so a rejection is
        # reported as ok=False like any other test failure.
        # NOTE(review): run_scalar_sql_for_target may already validate
        # internally; if so this check is redundant but harmless.
        sql_text = validate_alert_sql(payload.sql_text)
        value = await run_scalar_sql_for_target(target, sql_text)
        return AlertDefinitionTestResponse(ok=True, value=value)
    except Exception as exc:
        # Deliberately broad: any failure of the trial run is reported to the
        # caller in the response body rather than as an HTTP error.
        return AlertDefinitionTestResponse(ok=False, error=str(exc))