All checks were successful
PostgreSQL Compatibility Matrix / PG14 smoke (push) Successful in 8s
PostgreSQL Compatibility Matrix / PG15 smoke (push) Successful in 7s
PostgreSQL Compatibility Matrix / PG16 smoke (push) Successful in 8s
PostgreSQL Compatibility Matrix / PG17 smoke (push) Successful in 7s
PostgreSQL Compatibility Matrix / PG18 smoke (push) Successful in 7s
Replaced hardcoded standard alert metadata with API-driven data. This change ensures the standard alert information is dynamically loaded from the backend, improving maintainability and scalability. Also adjusted the frontend to handle cases where no data is available.
157 lines
6.1 KiB
Python
157 lines
6.1 KiB
Python
from fastapi import APIRouter, Depends, HTTPException, status
|
|
from sqlalchemy import select
|
|
from sqlalchemy.ext.asyncio import AsyncSession
|
|
|
|
from app.core.db import get_db
|
|
from app.core.deps import get_current_user, require_roles
|
|
from app.models.models import AlertDefinition, Target, User
|
|
from app.schemas.alert import (
|
|
AlertDefinitionCreate,
|
|
AlertDefinitionOut,
|
|
AlertDefinitionTestRequest,
|
|
AlertDefinitionTestResponse,
|
|
AlertDefinitionUpdate,
|
|
AlertStatusResponse,
|
|
StandardAlertReferenceItem,
|
|
)
|
|
from app.services.alerts import (
|
|
get_standard_alert_reference,
|
|
get_alert_status,
|
|
invalidate_alert_cache,
|
|
run_scalar_sql_for_target,
|
|
validate_alert_sql,
|
|
validate_alert_thresholds,
|
|
)
|
|
from app.services.alert_notifications import process_target_owner_notifications
|
|
from app.services.audit import write_audit_log
|
|
|
|
router = APIRouter()
|
|
|
|
|
|
async def _validate_target_exists(db: AsyncSession, target_id: int | None) -> None:
|
|
if target_id is None:
|
|
return
|
|
target_exists = await db.scalar(select(Target.id).where(Target.id == target_id))
|
|
if target_exists is None:
|
|
raise HTTPException(status_code=404, detail="Target not found")
|
|
|
|
|
|
@router.get("/status", response_model=AlertStatusResponse)
|
|
async def list_alert_status(
|
|
user: User = Depends(get_current_user), db: AsyncSession = Depends(get_db)
|
|
) -> AlertStatusResponse:
|
|
_ = user
|
|
payload = await get_alert_status(db, use_cache=True)
|
|
await process_target_owner_notifications(db, payload)
|
|
return payload
|
|
|
|
|
|
@router.get("/standard-reference", response_model=list[StandardAlertReferenceItem])
|
|
async def list_standard_alert_reference(
|
|
user: User = Depends(get_current_user),
|
|
) -> list[StandardAlertReferenceItem]:
|
|
_ = user
|
|
return [StandardAlertReferenceItem(**item) for item in get_standard_alert_reference()]
|
|
|
|
|
|
@router.get("/definitions", response_model=list[AlertDefinitionOut])
|
|
async def list_alert_definitions(
|
|
user: User = Depends(get_current_user), db: AsyncSession = Depends(get_db)
|
|
) -> list[AlertDefinitionOut]:
|
|
_ = user
|
|
defs = (await db.scalars(select(AlertDefinition).order_by(AlertDefinition.id.desc()))).all()
|
|
return [AlertDefinitionOut.model_validate(item) for item in defs]
|
|
|
|
|
|
@router.post("/definitions", response_model=AlertDefinitionOut, status_code=status.HTTP_201_CREATED)
|
|
async def create_alert_definition(
|
|
payload: AlertDefinitionCreate,
|
|
user: User = Depends(require_roles("admin", "operator")),
|
|
db: AsyncSession = Depends(get_db),
|
|
) -> AlertDefinitionOut:
|
|
await _validate_target_exists(db, payload.target_id)
|
|
sql_text = validate_alert_sql(payload.sql_text)
|
|
validate_alert_thresholds(payload.comparison, payload.warning_threshold, payload.alert_threshold)
|
|
|
|
definition = AlertDefinition(
|
|
name=payload.name,
|
|
description=payload.description,
|
|
target_id=payload.target_id,
|
|
sql_text=sql_text,
|
|
comparison=payload.comparison,
|
|
warning_threshold=payload.warning_threshold,
|
|
alert_threshold=payload.alert_threshold,
|
|
enabled=payload.enabled,
|
|
created_by_user_id=user.id,
|
|
)
|
|
db.add(definition)
|
|
await db.commit()
|
|
await db.refresh(definition)
|
|
invalidate_alert_cache()
|
|
await write_audit_log(db, "alert.definition.create", user.id, {"alert_definition_id": definition.id, "name": definition.name})
|
|
return AlertDefinitionOut.model_validate(definition)
|
|
|
|
|
|
@router.put("/definitions/{definition_id}", response_model=AlertDefinitionOut)
|
|
async def update_alert_definition(
|
|
definition_id: int,
|
|
payload: AlertDefinitionUpdate,
|
|
user: User = Depends(require_roles("admin", "operator")),
|
|
db: AsyncSession = Depends(get_db),
|
|
) -> AlertDefinitionOut:
|
|
definition = await db.scalar(select(AlertDefinition).where(AlertDefinition.id == definition_id))
|
|
if definition is None:
|
|
raise HTTPException(status_code=404, detail="Alert definition not found")
|
|
|
|
updates = payload.model_dump(exclude_unset=True)
|
|
if "target_id" in updates:
|
|
await _validate_target_exists(db, updates["target_id"])
|
|
if "sql_text" in updates and updates["sql_text"] is not None:
|
|
updates["sql_text"] = validate_alert_sql(updates["sql_text"])
|
|
|
|
comparison = updates.get("comparison", definition.comparison)
|
|
warning_threshold = updates.get("warning_threshold", definition.warning_threshold)
|
|
alert_threshold = updates.get("alert_threshold", definition.alert_threshold)
|
|
validate_alert_thresholds(comparison, warning_threshold, alert_threshold)
|
|
|
|
for key, value in updates.items():
|
|
setattr(definition, key, value)
|
|
await db.commit()
|
|
await db.refresh(definition)
|
|
invalidate_alert_cache()
|
|
await write_audit_log(db, "alert.definition.update", user.id, {"alert_definition_id": definition.id})
|
|
return AlertDefinitionOut.model_validate(definition)
|
|
|
|
|
|
@router.delete("/definitions/{definition_id}")
|
|
async def delete_alert_definition(
|
|
definition_id: int,
|
|
user: User = Depends(require_roles("admin", "operator")),
|
|
db: AsyncSession = Depends(get_db),
|
|
) -> dict:
|
|
definition = await db.scalar(select(AlertDefinition).where(AlertDefinition.id == definition_id))
|
|
if definition is None:
|
|
raise HTTPException(status_code=404, detail="Alert definition not found")
|
|
await db.delete(definition)
|
|
await db.commit()
|
|
invalidate_alert_cache()
|
|
await write_audit_log(db, "alert.definition.delete", user.id, {"alert_definition_id": definition_id})
|
|
return {"status": "deleted"}
|
|
|
|
|
|
@router.post("/definitions/test", response_model=AlertDefinitionTestResponse)
|
|
async def test_alert_definition(
|
|
payload: AlertDefinitionTestRequest,
|
|
user: User = Depends(require_roles("admin", "operator")),
|
|
db: AsyncSession = Depends(get_db),
|
|
) -> AlertDefinitionTestResponse:
|
|
_ = user
|
|
target = await db.scalar(select(Target).where(Target.id == payload.target_id))
|
|
if target is None:
|
|
raise HTTPException(status_code=404, detail="Target not found")
|
|
try:
|
|
value = await run_scalar_sql_for_target(target, payload.sql_text)
|
|
return AlertDefinitionTestResponse(ok=True, value=value)
|
|
except Exception as exc:
|
|
return AlertDefinitionTestResponse(ok=False, error=str(exc))
|