Files
NexaPG/backend/app/api/routes/alerts.py
nessi 117710cc0a [NX-101 Issue] Refactor error handling to use consistent API error format
Replaced all inline error messages with the standardized `api_error` helper for consistent error response formatting. This improves clarity, maintainability, and ensures uniform error structures across the application. Updated logging for collector failures to include error class and switched to warning level for target unreachable scenarios.
2026-02-14 11:30:56 +01:00

158 lines
6.3 KiB
Python

from fastapi import APIRouter, Depends, HTTPException, status
from sqlalchemy import select
from sqlalchemy.ext.asyncio import AsyncSession
from app.core.db import get_db
from app.core.deps import get_current_user, require_roles
from app.core.errors import api_error
from app.models.models import AlertDefinition, Target, User
from app.schemas.alert import (
AlertDefinitionCreate,
AlertDefinitionOut,
AlertDefinitionTestRequest,
AlertDefinitionTestResponse,
AlertDefinitionUpdate,
AlertStatusResponse,
StandardAlertReferenceItem,
)
from app.services.alerts import (
get_standard_alert_reference,
get_alert_status,
invalidate_alert_cache,
run_scalar_sql_for_target,
validate_alert_sql,
validate_alert_thresholds,
)
from app.services.alert_notifications import process_target_owner_notifications
from app.services.audit import write_audit_log
# Router instance that the alert endpoints below register themselves on.
router = APIRouter()
async def _validate_target_exists(db: AsyncSession, target_id: int | None) -> None:
    """Ensure ``target_id`` refers to an existing ``Target`` row.

    A ``target_id`` of ``None`` is accepted without touching the database.

    Raises:
        HTTPException: 404 carrying the ``target_not_found`` error payload
            when no target with that id is found.
    """
    if target_id is not None:
        id_lookup = select(Target.id).where(Target.id == target_id)
        found_id = await db.scalar(id_lookup)
        if found_id is None:
            raise HTTPException(status_code=404, detail=api_error("target_not_found", "Target not found"))
@router.get("/status", response_model=AlertStatusResponse)
async def list_alert_status(
    user: User = Depends(get_current_user),
    db: AsyncSession = Depends(get_db),
) -> AlertStatusResponse:
    """Return the alert status and forward it to owner notifications.

    The status is read with ``use_cache=True``; the same payload is handed to
    ``process_target_owner_notifications`` before being returned. ``user`` is
    resolved through ``get_current_user`` and is otherwise unused.
    """
    del user  # needed only so the dependency runs
    status_payload = await get_alert_status(db, use_cache=True)
    await process_target_owner_notifications(db, status_payload)
    return status_payload
@router.get("/standard-reference", response_model=list[StandardAlertReferenceItem])
async def list_standard_alert_reference(
    user: User = Depends(get_current_user),
) -> list[StandardAlertReferenceItem]:
    """Return the standard alert reference as ``StandardAlertReferenceItem`` objects.

    Each entry yielded by ``get_standard_alert_reference`` is expanded as
    keyword arguments into the schema. ``user`` is resolved through
    ``get_current_user`` and is otherwise unused.
    """
    del user  # needed only so the dependency runs
    reference_items: list[StandardAlertReferenceItem] = []
    for entry in get_standard_alert_reference():
        reference_items.append(StandardAlertReferenceItem(**entry))
    return reference_items
@router.get("/definitions", response_model=list[AlertDefinitionOut])
async def list_alert_definitions(
    user: User = Depends(get_current_user),
    db: AsyncSession = Depends(get_db),
) -> list[AlertDefinitionOut]:
    """List every alert definition, ordered by id descending.

    ``user`` is resolved through ``get_current_user`` and is otherwise unused.
    """
    del user  # needed only so the dependency runs
    by_id_desc = select(AlertDefinition).order_by(AlertDefinition.id.desc())
    rows = await db.scalars(by_id_desc)
    return list(map(AlertDefinitionOut.model_validate, rows.all()))
@router.post("/definitions", response_model=AlertDefinitionOut, status_code=status.HTTP_201_CREATED)
async def create_alert_definition(
    payload: AlertDefinitionCreate,
    user: User = Depends(require_roles("admin", "operator")),
    db: AsyncSession = Depends(get_db),
) -> AlertDefinitionOut:
    """Create an alert definition from ``payload`` and return it.

    Before anything is stored, the referenced target (if any), the SQL text
    and the threshold combination are checked. After the commit the alert
    cache is invalidated and an ``alert.definition.create`` audit entry is
    written. Access is guarded by ``require_roles("admin", "operator")``.

    Raises:
        HTTPException: 404 (``target_not_found``) when ``payload.target_id``
            does not match an existing target. The SQL and threshold
            validators may raise as well; their behavior is defined elsewhere.
    """
    await _validate_target_exists(db, payload.target_id)
    checked_sql = validate_alert_sql(payload.sql_text)
    validate_alert_thresholds(payload.comparison, payload.warning_threshold, payload.alert_threshold)

    column_values = {
        "name": payload.name,
        "description": payload.description,
        "target_id": payload.target_id,
        # Store whatever the validator returned, not the raw request text.
        "sql_text": checked_sql,
        "comparison": payload.comparison,
        "warning_threshold": payload.warning_threshold,
        "alert_threshold": payload.alert_threshold,
        "enabled": payload.enabled,
        "created_by_user_id": user.id,
    }
    record = AlertDefinition(**column_values)
    db.add(record)
    await db.commit()
    # Reload so database-populated fields such as the id are available below.
    await db.refresh(record)
    invalidate_alert_cache()

    audit_details = {"alert_definition_id": record.id, "name": record.name}
    await write_audit_log(db, "alert.definition.create", user.id, audit_details)
    return AlertDefinitionOut.model_validate(record)
@router.put("/definitions/{definition_id}", response_model=AlertDefinitionOut)
async def update_alert_definition(
    definition_id: int,
    payload: AlertDefinitionUpdate,
    user: User = Depends(require_roles("admin", "operator")),
    db: AsyncSession = Depends(get_db),
) -> AlertDefinitionOut:
    """Apply a partial update to an existing alert definition.

    Only fields the client actually sent (``exclude_unset=True``) are applied.
    A supplied ``target_id`` is checked for existence, a supplied ``sql_text``
    is validated, and the thresholds are validated against the combination of
    new and currently stored values. After the commit the alert cache is
    invalidated and an ``alert.definition.update`` audit entry is written.
    Access is guarded by ``require_roles("admin", "operator")``.

    An explicit ``"sql_text": null`` is treated as "leave the SQL unchanged".

    Raises:
        HTTPException: 404 (``alert_definition_not_found``) when no definition
            has ``definition_id``; 404 (``target_not_found``) when a supplied
            ``target_id`` does not exist. The SQL and threshold validators may
            raise as well; their behavior is defined elsewhere.
    """
    definition = await db.scalar(select(AlertDefinition).where(AlertDefinition.id == definition_id))
    if definition is None:
        raise HTTPException(status_code=404, detail=api_error("alert_definition_not_found", "Alert definition not found"))

    updates = payload.model_dump(exclude_unset=True)
    if "target_id" in updates:
        await _validate_target_exists(db, updates["target_id"])

    if "sql_text" in updates:
        if updates["sql_text"] is None:
            # exclude_unset keeps an explicitly sent null. Without this branch
            # the None would skip validation yet still be assigned to the
            # definition below, so drop it and keep the stored SQL.
            del updates["sql_text"]
        else:
            updates["sql_text"] = validate_alert_sql(updates["sql_text"])

    # Validate the thresholds as they will look after the update: fields that
    # were not sent fall back to the currently stored values.
    comparison = updates.get("comparison", definition.comparison)
    warning_threshold = updates.get("warning_threshold", definition.warning_threshold)
    alert_threshold = updates.get("alert_threshold", definition.alert_threshold)
    validate_alert_thresholds(comparison, warning_threshold, alert_threshold)

    for key, value in updates.items():
        setattr(definition, key, value)
    await db.commit()
    await db.refresh(definition)
    invalidate_alert_cache()
    await write_audit_log(db, "alert.definition.update", user.id, {"alert_definition_id": definition.id})
    return AlertDefinitionOut.model_validate(definition)
@router.delete("/definitions/{definition_id}")
async def delete_alert_definition(
    definition_id: int,
    user: User = Depends(require_roles("admin", "operator")),
    db: AsyncSession = Depends(get_db),
) -> dict:
    """Delete the alert definition identified by ``definition_id``.

    After the commit the alert cache is invalidated and an
    ``alert.definition.delete`` audit entry is written. Access is guarded by
    ``require_roles("admin", "operator")``.

    Returns:
        ``{"status": "deleted"}`` on success.

    Raises:
        HTTPException: 404 (``alert_definition_not_found``) when no definition
            has that id.
    """
    by_id = select(AlertDefinition).where(AlertDefinition.id == definition_id)
    existing = await db.scalar(by_id)
    if existing is None:
        raise HTTPException(status_code=404, detail=api_error("alert_definition_not_found", "Alert definition not found"))

    await db.delete(existing)
    await db.commit()
    invalidate_alert_cache()

    # The audit entry uses the path parameter, not an attribute of the deleted row.
    audit_details = {"alert_definition_id": definition_id}
    await write_audit_log(db, "alert.definition.delete", user.id, audit_details)
    return {"status": "deleted"}
@router.post("/definitions/test", response_model=AlertDefinitionTestResponse)
async def test_alert_definition(
    payload: AlertDefinitionTestRequest,
    user: User = Depends(require_roles("admin", "operator")),
    db: AsyncSession = Depends(get_db),
) -> AlertDefinitionTestResponse:
    """Run ``payload.sql_text`` against a target and report the outcome.

    Nothing is stored. Any exception raised while running the SQL or building
    the success response is reported as ``ok=False`` with the exception text
    instead of propagating. Access is guarded by
    ``require_roles("admin", "operator")``; ``user`` is otherwise unused.

    Raises:
        HTTPException: 404 (``target_not_found``) when ``payload.target_id``
            does not match an existing target.
    """
    del user  # needed only so the dependency runs
    target_query = select(Target).where(Target.id == payload.target_id)
    target = await db.scalar(target_query)
    if target is None:
        raise HTTPException(status_code=404, detail=api_error("target_not_found", "Target not found"))

    try:
        measured = await run_scalar_sql_for_target(target, payload.sql_text)
        outcome = AlertDefinitionTestResponse(ok=True, value=measured)
    except Exception as exc:
        # Failures are the expected result of a bad test query, so they are
        # returned in the response body.
        outcome = AlertDefinitionTestResponse(ok=False, error=str(exc))
    return outcome