From 4035335901d1115550940f774051b5b94e62921c Mon Sep 17 00:00:00 2001
From: nessi
Date: Thu, 12 Feb 2026 12:50:11 +0100
Subject: [PATCH] Add alert management functionality in backend and frontend

This commit introduces alert management capabilities: creating, updating,
listing, and removing custom SQL-based alerts in the backend. It adds the
necessary database migration, API endpoints, and frontend pages to manage
alerts, enabling users to define thresholds and monitor system health.
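
For example, assuming the API router is mounted under /api and the caller
holds an operator or admin token, a custom alert can be created like this
(host, target id, and thresholds are placeholders):

    curl -s -X POST http://localhost:8000/api/alerts/definitions \
      -H "Authorization: Bearer $TOKEN" \
      -H "Content-Type: application/json" \
      -d '{"name": "Total backends", "target_id": 1,
           "sql_text": "SELECT count(*) FROM pg_stat_activity",
           "comparison": "gte", "warning_threshold": 40,
           "alert_threshold": 80, "enabled": true}'

GET /api/alerts/status then includes the definition in its evaluated
warnings and alerts.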
---
 .../versions/0002_alert_definitions.py |  43 ++
 backend/app/api/router.py              |   3 +-
 backend/app/api/routes/alerts.py       | 143 ++++++
 backend/app/models/__init__.py         |   4 +-
 backend/app/models/models.py           |  27 +-
 backend/app/schemas/alert.py           |  74 +++
 backend/app/services/alerts.py         | 457 ++++++++++++++++++
 backend/app/services/collector.py      |  26 +-
 frontend/src/App.jsx                   |  10 +
 frontend/src/pages/AlertsPage.jsx      | 333 +++++++++++++
 frontend/src/styles.css                | 121 +++++
 11 files changed, 1236 insertions(+), 5 deletions(-)
 create mode 100644 backend/alembic/versions/0002_alert_definitions.py
 create mode 100644 backend/app/api/routes/alerts.py
 create mode 100644 backend/app/schemas/alert.py
 create mode 100644 backend/app/services/alerts.py
 create mode 100644 frontend/src/pages/AlertsPage.jsx

diff --git a/backend/alembic/versions/0002_alert_definitions.py b/backend/alembic/versions/0002_alert_definitions.py
new file mode 100644
index 0000000..fd8a07c
--- /dev/null
+++ b/backend/alembic/versions/0002_alert_definitions.py
@@ -0,0 +1,43 @@
+"""add alert definitions
+
+Revision ID: 0002_alert_definitions
+Revises: 0001_init
+Create Date: 2026-02-12
+"""
+
+from alembic import op
+import sqlalchemy as sa
+
+
+revision = "0002_alert_definitions"
+down_revision = "0001_init"
+branch_labels = None
+depends_on = None
+
+
+def upgrade() -> None:
+    op.create_table(
+        "alert_definitions",
+        sa.Column("id", sa.Integer(), primary_key=True),
+        sa.Column("name", sa.String(length=160), nullable=False),
+        sa.Column("description", sa.Text(), nullable=True),
+        sa.Column("target_id", sa.Integer(), sa.ForeignKey("targets.id", ondelete="CASCADE"), nullable=True),
+        sa.Column("sql_text", sa.Text(), nullable=False),
+        sa.Column("comparison", sa.String(length=10), nullable=False, server_default="gte"),
+        sa.Column("warning_threshold", sa.Float(), nullable=True),
+        sa.Column("alert_threshold", sa.Float(), nullable=False),
+        sa.Column("enabled", sa.Boolean(), nullable=False, server_default=sa.text("true")),
+        sa.Column("created_by_user_id", sa.Integer(), sa.ForeignKey("users.id"), nullable=True),
+        sa.Column("created_at", sa.DateTime(timezone=True), nullable=False, server_default=sa.func.now()),
+        sa.Column("updated_at", sa.DateTime(timezone=True), nullable=False, server_default=sa.func.now()),
+    )
+    op.create_index("ix_alert_definitions_target_id", "alert_definitions", ["target_id"])
+    op.create_index("ix_alert_definitions_created_by_user_id", "alert_definitions", ["created_by_user_id"])
+    op.create_index("ix_alert_definitions_created_at", "alert_definitions", ["created_at"])
+
+
+def downgrade() -> None:
+    op.drop_index("ix_alert_definitions_created_at", table_name="alert_definitions")
+    op.drop_index("ix_alert_definitions_created_by_user_id", table_name="alert_definitions")
+    op.drop_index("ix_alert_definitions_target_id", table_name="alert_definitions")
+    op.drop_table("alert_definitions")
diff --git a/backend/app/api/router.py b/backend/app/api/router.py
index e30229e..25d2953 100644
--- a/backend/app/api/router.py
+++ b/backend/app/api/router.py
@@ -1,9 +1,10 @@
 from fastapi import APIRouter
-from app.api.routes import admin_users, auth, health, me, targets
+from app.api.routes import admin_users, alerts, auth, health, me, targets
 
 api_router = APIRouter()
 api_router.include_router(health.router, tags=["health"])
 api_router.include_router(auth.router, prefix="/auth", tags=["auth"])
 api_router.include_router(me.router, tags=["auth"])
 api_router.include_router(targets.router, prefix="/targets", tags=["targets"])
+api_router.include_router(alerts.router, prefix="/alerts", tags=["alerts"])
 api_router.include_router(admin_users.router, prefix="/admin/users", tags=["admin"])
diff --git a/backend/app/api/routes/alerts.py b/backend/app/api/routes/alerts.py
new file mode 100644
index 0000000..75f45c7
--- /dev/null
+++ b/backend/app/api/routes/alerts.py
@@ -0,0 +1,143 @@
+from fastapi import APIRouter, Depends, HTTPException, status
+from sqlalchemy import select
+from sqlalchemy.ext.asyncio import AsyncSession
+
+from app.core.db import get_db
+from app.core.deps import get_current_user, require_roles
+from app.models.models import AlertDefinition, Target, User
+from app.schemas.alert import (
+    AlertDefinitionCreate,
+    AlertDefinitionOut,
+    AlertDefinitionTestRequest,
+    AlertDefinitionTestResponse,
+    AlertDefinitionUpdate,
+    AlertStatusResponse,
+)
+from app.services.alerts import (
+    get_alert_status,
+    invalidate_alert_cache,
+    run_scalar_sql_for_target,
+    validate_alert_sql,
+    validate_alert_thresholds,
+)
+from app.services.audit import write_audit_log
+
+router = APIRouter()
+
+
+async def _validate_target_exists(db: AsyncSession, target_id: int | None) -> None:
+    if target_id is None:
+        return
+    target_exists = await db.scalar(select(Target.id).where(Target.id == target_id))
+    if target_exists is None:
+        raise HTTPException(status_code=404, detail="Target not found")
+
+
+@router.get("/status", response_model=AlertStatusResponse)
+async def list_alert_status(
+    user: User = Depends(get_current_user), db: AsyncSession = Depends(get_db)
+) -> AlertStatusResponse:
+    _ = user
+    return await get_alert_status(db, use_cache=True)
+
+
+@router.get("/definitions", response_model=list[AlertDefinitionOut])
+async def list_alert_definitions(
+    user: User = Depends(get_current_user), db: AsyncSession = Depends(get_db)
+) -> list[AlertDefinitionOut]:
+    _ = user
+    defs = (await db.scalars(select(AlertDefinition).order_by(AlertDefinition.id.desc()))).all()
+    return [AlertDefinitionOut.model_validate(item) for item in defs]
+
+
+@router.post("/definitions", response_model=AlertDefinitionOut, status_code=status.HTTP_201_CREATED)
+async def create_alert_definition(
+    payload: AlertDefinitionCreate,
+    user: User = Depends(require_roles("admin", "operator")),
+    db: AsyncSession = Depends(get_db),
+) -> AlertDefinitionOut:
+    await _validate_target_exists(db, payload.target_id)
+    sql_text = validate_alert_sql(payload.sql_text)
+    validate_alert_thresholds(payload.comparison, payload.warning_threshold, payload.alert_threshold)
+
+    definition = AlertDefinition(
+        name=payload.name,
+        description=payload.description,
+        target_id=payload.target_id,
+        sql_text=sql_text,
+        comparison=payload.comparison,
+        warning_threshold=payload.warning_threshold,
+        alert_threshold=payload.alert_threshold,
+        enabled=payload.enabled,
+        created_by_user_id=user.id,
+    )
+    db.add(definition)
+    await db.commit()
+    await db.refresh(definition)
+    invalidate_alert_cache()
+    await write_audit_log(db, "alert.definition.create", user.id, {"alert_definition_id": definition.id, "name": definition.name})
+    return AlertDefinitionOut.model_validate(definition)
+
+
+@router.put("/definitions/{definition_id}", response_model=AlertDefinitionOut)
+async def update_alert_definition(
+    definition_id: int,
+    payload: AlertDefinitionUpdate,
+    user: User = Depends(require_roles("admin", "operator")),
+    db: AsyncSession = Depends(get_db),
+) -> AlertDefinitionOut:
+    definition = await db.scalar(select(AlertDefinition).where(AlertDefinition.id == definition_id))
+    if definition is None:
+        raise HTTPException(status_code=404, detail="Alert definition not found")
+
+    updates = payload.model_dump(exclude_unset=True)
+    if "target_id" in updates:
+        await _validate_target_exists(db, updates["target_id"])
+    if "sql_text" in updates and updates["sql_text"] is not None:
+        updates["sql_text"] = validate_alert_sql(updates["sql_text"])
+
+    comparison = updates.get("comparison", definition.comparison)
+    warning_threshold = updates.get("warning_threshold", definition.warning_threshold)
+    alert_threshold = updates.get("alert_threshold", definition.alert_threshold)
+    validate_alert_thresholds(comparison, warning_threshold, alert_threshold)
+
+    for key, value in updates.items():
+        setattr(definition, key, value)
+    await db.commit()
+    await db.refresh(definition)
+    invalidate_alert_cache()
+    await write_audit_log(db, "alert.definition.update", user.id, {"alert_definition_id": definition.id})
+    return AlertDefinitionOut.model_validate(definition)
+
+
+@router.delete("/definitions/{definition_id}")
+async def delete_alert_definition(
+    definition_id: int,
+    user: User = Depends(require_roles("admin", "operator")),
+    db: AsyncSession = Depends(get_db),
+) -> dict:
+    definition = await db.scalar(select(AlertDefinition).where(AlertDefinition.id == definition_id))
+    if definition is None:
+        raise HTTPException(status_code=404, detail="Alert definition not found")
+    await db.delete(definition)
+    await db.commit()
+    invalidate_alert_cache()
+    await write_audit_log(db, "alert.definition.delete", user.id, {"alert_definition_id": definition_id})
+    return {"status": "deleted"}
+
+
+@router.post("/definitions/test", response_model=AlertDefinitionTestResponse)
+async def test_alert_definition(
+    payload: AlertDefinitionTestRequest,
+    user: User = Depends(require_roles("admin", "operator")),
+    db: AsyncSession = Depends(get_db),
+) -> AlertDefinitionTestResponse:
+    _ = user
+    target = await db.scalar(select(Target).where(Target.id == payload.target_id))
+    if target is None:
+        raise HTTPException(status_code=404, detail="Target not found")
+    try:
+        value = await run_scalar_sql_for_target(target, payload.sql_text)
+        return AlertDefinitionTestResponse(ok=True, value=value)
+    except Exception as exc:
+        return AlertDefinitionTestResponse(ok=False, error=str(exc))
diff --git a/backend/app/models/__init__.py b/backend/app/models/__init__.py
index 6ea25b6..9f1dd1a 100644
--- a/backend/app/models/__init__.py
+++ b/backend/app/models/__init__.py
@@ -1,3 +1,3 @@
-from app.models.models import AuditLog, Metric, QueryStat, Target, User
+from app.models.models import AlertDefinition, AuditLog, Metric, QueryStat, Target, User
 
-__all__ = ["User", "Target", "Metric", "QueryStat", "AuditLog"]
+__all__ = ["User", "Target", "Metric", "QueryStat", "AuditLog", "AlertDefinition"]
diff --git a/backend/app/models/models.py b/backend/app/models/models.py
index ce90997..3542939 100644
--- a/backend/app/models/models.py
+++ b/backend/app/models/models.py
@@ -1,5 +1,5 @@
 from datetime import datetime
-from sqlalchemy import JSON, DateTime, Float, ForeignKey, Integer, String, Text, func
+from sqlalchemy import JSON, Boolean, DateTime, Float, ForeignKey, Integer, String, Text, func
 from sqlalchemy.orm import Mapped, mapped_column, relationship
 
 from app.core.db import Base
@@ -32,6 +32,7 @@ class Target(Base):
 
     metrics: Mapped[list["Metric"]] = relationship(back_populates="target", cascade="all, delete-orphan")
     query_stats: Mapped[list["QueryStat"]] = relationship(back_populates="target", cascade="all, delete-orphan")
+    alert_definitions: Mapped[list["AlertDefinition"]] = relationship(back_populates="target", cascade="all, delete-orphan")
 
 
 class Metric(Base):
@@ -73,3 +74,27 @@ class AuditLog(Base):
     payload: Mapped[dict] = mapped_column(JSON, nullable=False, default=dict)
 
     user: Mapped[User | None] = relationship(back_populates="audit_logs")
+
+
+class AlertDefinition(Base):
+    __tablename__ = "alert_definitions"
+
+    id: Mapped[int] = mapped_column(Integer, primary_key=True)
+    name: Mapped[str] = mapped_column(String(160), nullable=False)
+    description: Mapped[str | None] = mapped_column(Text, nullable=True)
+    target_id: Mapped[int | None] = mapped_column(ForeignKey("targets.id", ondelete="CASCADE"), nullable=True, index=True)
+    sql_text: Mapped[str] = mapped_column(Text, nullable=False)
+    comparison: Mapped[str] = mapped_column(String(10), nullable=False, default="gte")
+    warning_threshold: Mapped[float | None] = mapped_column(Float, nullable=True)
+    alert_threshold: Mapped[float] = mapped_column(Float, nullable=False)
+    enabled: Mapped[bool] = mapped_column(Boolean, nullable=False, default=True)
+    created_by_user_id: Mapped[int | None] = mapped_column(ForeignKey("users.id"), nullable=True, index=True)
+    created_at: Mapped[datetime] = mapped_column(DateTime(timezone=True), nullable=False, server_default=func.now(), index=True)
+    updated_at: Mapped[datetime] = mapped_column(
+        DateTime(timezone=True),
+        nullable=False,
+        server_default=func.now(),
+        onupdate=func.now(),
+    )
+
+    target: Mapped[Target | None] = relationship(back_populates="alert_definitions")
diff --git a/backend/app/schemas/alert.py b/backend/app/schemas/alert.py
new file mode 100644
index 0000000..46f2a4d
--- /dev/null
+++ b/backend/app/schemas/alert.py
@@ -0,0 +1,74 @@
+from datetime import datetime
+from pydantic import BaseModel, Field
+
+
+class AlertDefinitionBase(BaseModel):
+    name: str = Field(min_length=2, max_length=160)
+    description: str | None = None
+    target_id: int | None = None
+    sql_text: str = Field(min_length=8, max_length=4000)
+    comparison: str = "gte"
+    warning_threshold: float | None = None
+    alert_threshold: float
+    enabled: bool = True
+
+
+class AlertDefinitionCreate(AlertDefinitionBase):
+    pass
+
+
+class AlertDefinitionUpdate(BaseModel):
+    name: str | None = Field(default=None, min_length=2, max_length=160)
+    description: str | None = None
+    target_id: int | None = None
+    sql_text: str | None = Field(default=None, min_length=8, max_length=4000)
+    comparison: str | None = None
+    warning_threshold: float | None = None
+    alert_threshold: float | None = None
+    enabled: bool | None = None
+
+
+class AlertDefinitionOut(AlertDefinitionBase):
+    id: int
+    created_by_user_id: int | None = None
+    created_at: datetime
+    updated_at: datetime
+
+    model_config = {"from_attributes": True}
+
+
+class AlertDefinitionTestRequest(BaseModel):
+    target_id: int
+    sql_text: str = Field(min_length=8, max_length=4000)
+
+
+class AlertDefinitionTestResponse(BaseModel):
+    ok: bool
+    value: float | None = None
+    error: str | None = None
+
+
+class AlertStatusItem(BaseModel):
+    alert_key: str
+    source: str
+    severity: str
+    category: str
+    name: str
+    description: str
+    target_id: int
+    target_name: str
+    value: float | None = None
+    warning_threshold: float | None = None
+    alert_threshold: float | None = None
+    comparison: str = "gte"
+    message: str
+    checked_at: datetime
+    sql_text: str | None = None
+
+
+class AlertStatusResponse(BaseModel):
+    generated_at: datetime
+    warnings: list[AlertStatusItem]
+    alerts: list[AlertStatusItem]
+    warning_count: int
+    alert_count: int
diff --git a/backend/app/services/alerts.py b/backend/app/services/alerts.py
new file mode 100644
index 0000000..4cf9588
--- /dev/null
+++ b/backend/app/services/alerts.py
@@ -0,0 +1,457 @@
+from __future__ import annotations
+
+from dataclasses import dataclass
+from datetime import datetime, timedelta, timezone
+import re
+import time
+
+import asyncpg
+from fastapi import HTTPException
+from sqlalchemy import desc, func, select
+from sqlalchemy.ext.asyncio import AsyncSession
+
+from app.core.config import get_settings
+from app.models.models import AlertDefinition, Metric, QueryStat, Target
+from app.schemas.alert import AlertStatusItem, AlertStatusResponse
+from app.services.collector import build_target_dsn
+
+settings = get_settings()
+_ALLOWED_COMPARISONS = {"gte", "gt", "lte", "lt"}
+_FORBIDDEN_SQL_WORDS = re.compile(
+    r"\b(insert|update|delete|alter|drop|truncate|create|grant|revoke|vacuum|analyze|copy|call|do)\b",
+    re.IGNORECASE,
+)
+_STATUS_CACHE_TTL_SECONDS = 15
+_status_cache: dict = {"expires": 0.0, "data": None}
+
+
+@dataclass
+class _RuleInput:
+    key: str
+    name: str
+    description: str
+    category: str
+    value: float | None
+    warning_threshold: float | None
+    alert_threshold: float | None
+    comparison: str = "gte"
+
+
+def invalidate_alert_cache() -> None:
+    _status_cache["expires"] = 0.0
+    _status_cache["data"] = None
+
+
+def validate_alert_thresholds(comparison: str, warning_threshold: float | None, alert_threshold: float) -> None:
+    if comparison not in _ALLOWED_COMPARISONS:
+        raise HTTPException(status_code=400, detail=f"Invalid comparison. Use one of {sorted(_ALLOWED_COMPARISONS)}")
Use one of {sorted(_ALLOWED_COMPARISONS)}") + if warning_threshold is None: + return + + if comparison in {"gte", "gt"} and warning_threshold > alert_threshold: + raise HTTPException(status_code=400, detail="For gte/gt, warning_threshold must be <= alert_threshold") + if comparison in {"lte", "lt"} and warning_threshold < alert_threshold: + raise HTTPException(status_code=400, detail="For lte/lt, warning_threshold must be >= alert_threshold") + + +def validate_alert_sql(sql_text: str) -> str: + sql = sql_text.strip().rstrip(";") + lowered = sql.lower().strip() + if not lowered.startswith("select"): + raise HTTPException(status_code=400, detail="Alert SQL must start with SELECT") + if _FORBIDDEN_SQL_WORDS.search(lowered): + raise HTTPException(status_code=400, detail="Only read-only SELECT statements are allowed") + if ";" in sql: + raise HTTPException(status_code=400, detail="Only a single SQL statement is allowed") + return sql + + +async def _connect_target(target: Target, timeout_seconds: int = 5) -> asyncpg.Connection: + return await asyncpg.connect(dsn=build_target_dsn(target), timeout=timeout_seconds) + + +async def run_scalar_sql_for_target(target: Target, sql_text: str) -> float: + sql = validate_alert_sql(sql_text) + conn: asyncpg.Connection | None = None + try: + conn = await _connect_target(target, timeout_seconds=6) + row = await conn.fetchrow(sql) + if not row: + raise ValueError("Query returned no rows") + value = row[0] + return float(value) + finally: + if conn: + await conn.close() + + +def _compare(value: float, threshold: float, comparison: str) -> bool: + if comparison == "gte": + return value >= threshold + if comparison == "gt": + return value > threshold + if comparison == "lte": + return value <= threshold + if comparison == "lt": + return value < threshold + return False + + +def _severity_from_thresholds( + value: float | None, comparison: str, warning_threshold: float | None, alert_threshold: float | None +) -> str: + if value is None or alert_threshold is None: + return "unknown" + if _compare(value, alert_threshold, comparison): + return "alert" + if warning_threshold is not None and _compare(value, warning_threshold, comparison): + return "warning" + return "ok" + + +def _status_message(value: float | None, comparison: str, warning_threshold: float | None, alert_threshold: float | None) -> str: + if value is None: + return "No numeric value available" + if alert_threshold is None: + return f"Current value: {value:.2f}" + if warning_threshold is None: + return f"Current value: {value:.2f} (alert when value {comparison} {alert_threshold:.2f})" + return ( + f"Current value: {value:.2f} " + f"(warning when value {comparison} {warning_threshold:.2f}, alert when value {comparison} {alert_threshold:.2f})" + ) + + +async def _latest_metric_value(db: AsyncSession, target_id: int, metric_name: str) -> float | None: + row = await db.scalar( + select(Metric.value) + .where(Metric.target_id == target_id, Metric.metric_name == metric_name) + .order_by(desc(Metric.ts)) + .limit(1) + ) + return float(row) if row is not None else None + + +async def _metric_delta(db: AsyncSession, target_id: int, metric_name: str, minutes: int) -> float | None: + cutoff = datetime.now(timezone.utc) - timedelta(minutes=minutes) + latest = await db.scalar( + select(Metric.value) + .where(Metric.target_id == target_id, Metric.metric_name == metric_name) + .order_by(desc(Metric.ts)) + .limit(1) + ) + oldest = await db.scalar( + select(Metric.value) + .where(Metric.target_id == target_id, 
+        .order_by(Metric.ts.asc())
+        .limit(1)
+    )
+    if latest is None or oldest is None:
+        return None
+    return max(0.0, float(latest) - float(oldest))
+
+
+async def _latest_query_snapshot_max(db: AsyncSession, target_id: int, column_name: str) -> float | None:
+    latest_ts = await db.scalar(select(func.max(QueryStat.ts)).where(QueryStat.target_id == target_id))
+    if latest_ts is None:
+        return None
+    column = QueryStat.mean_time if column_name == "mean_time" else QueryStat.total_time
+    value = await db.scalar(
+        select(func.max(column)).where(QueryStat.target_id == target_id, QueryStat.ts == latest_ts)
+    )
+    return float(value) if value is not None else None
+
+
+async def _build_standard_rules(db: AsyncSession, target: Target) -> tuple[list[_RuleInput], list[AlertStatusItem]]:
+    rules: list[_RuleInput] = []
+    forced_items: list[AlertStatusItem] = []
+    checked_at = datetime.now(timezone.utc)
+
+    # 1) Connectivity with RTT threshold.
+    start = time.perf_counter()
+    conn: asyncpg.Connection | None = None
+    try:
+        conn = await _connect_target(target, timeout_seconds=4)
+        await conn.fetchval("SELECT 1")
+        connect_ms = (time.perf_counter() - start) * 1000
+        rules.append(
+            _RuleInput(
+                key="connectivity_rtt_ms",
+                name="Connectivity Latency",
+                description="Checks whether the target is reachable and how long the connection handshake takes.",
+                category="availability",
+                value=connect_ms,
+                warning_threshold=1000,
+                alert_threshold=2500,
+                comparison="gte",
+            )
+        )
+    except Exception as exc:
+        forced_items.append(
+            AlertStatusItem(
+                alert_key=f"std-connectivity-down-{target.id}",
+                source="standard",
+                severity="alert",
+                category="availability",
+                name="Target Reachability",
+                description="Verifies that the monitored database can be reached by the collector.",
+                target_id=target.id,
+                target_name=target.name,
+                value=None,
+                warning_threshold=None,
+                alert_threshold=None,
+                comparison="gte",
+                message=f"Connection failed: {exc}",
+                checked_at=checked_at,
+            )
+        )
+    finally:
+        if conn:
+            await conn.close()
+
+    # 2) Collector freshness based on latest stored metric.
+    latest_ts = await db.scalar(select(func.max(Metric.ts)).where(Metric.target_id == target.id))
+    if latest_ts is None:
+        forced_items.append(
+            AlertStatusItem(
+                alert_key=f"std-metric-freshness-{target.id}",
+                source="standard",
+                severity="warning",
+                category="availability",
+                name="Collector Freshness",
+                description="Ensures fresh metrics are arriving for the target.",
+                target_id=target.id,
+                target_name=target.name,
+                value=None,
+                warning_threshold=None,
+                alert_threshold=None,
+                comparison="gte",
+                message="No metrics collected yet for this target.",
+                checked_at=checked_at,
+            )
+        )
+    else:
+        age_seconds = max(0.0, (checked_at - latest_ts).total_seconds())
+        rules.append(
+            _RuleInput(
+                key="collector_freshness_seconds",
+                name="Collector Freshness",
+                description="Age of the most recent metric sample.",
+                category="availability",
+                value=age_seconds,
+                warning_threshold=float(settings.poll_interval_seconds * 2),
+                alert_threshold=float(settings.poll_interval_seconds * 4),
+            )
+        )
+
+    active_connections = await _latest_metric_value(db, target.id, "connections_active")
+    total_connections = await _latest_metric_value(db, target.id, "connections_total")
+    cache_hit_ratio = await _latest_metric_value(db, target.id, "cache_hit_ratio")
+    lock_count = await _latest_metric_value(db, target.id, "locks_total")
+    xact_commit = await _latest_metric_value(db, target.id, "xact_commit")
+    xact_rollback = await _latest_metric_value(db, target.id, "xact_rollback")
+    checkpoints_req_delta = await _metric_delta(db, target.id, "checkpoints_req", minutes=15)
+    deadlocks_delta = await _metric_delta(db, target.id, "deadlocks", minutes=60)
+    slowest_query_mean = await _latest_query_snapshot_max(db, target.id, "mean_time")
+    slowest_query_total = await _latest_query_snapshot_max(db, target.id, "total_time")
+
+    active_ratio = None
+    if active_connections is not None and total_connections and total_connections > 0:
+        active_ratio = active_connections / total_connections
+
+    rollback_ratio = None
+    if xact_commit is not None and xact_rollback is not None:
+        tx_total = xact_commit + xact_rollback
+        if tx_total > 0:
+            rollback_ratio = xact_rollback / tx_total
+
+    rules.extend(
+        [
+            _RuleInput(
+                key="active_connections_ratio",
+                name="Active Connection Ratio",
+                description="Share of active sessions over total sessions.",
+                category="capacity",
+                value=active_ratio,
+                warning_threshold=0.70,
+                alert_threshold=0.90,
+            ),
+            _RuleInput(
+                key="cache_hit_ratio_low",
+                name="Cache Hit Ratio",
+                description="Low cache hit ratio means increased disk reads and slower queries.",
+                category="performance",
+                value=cache_hit_ratio,
+                warning_threshold=0.95,
+                alert_threshold=0.90,
+                comparison="lte",
+            ),
+            _RuleInput(
+                key="locks_total",
+                name="Lock Pressure",
+                description="Number of locks currently held on the target.",
+                category="contention",
+                value=lock_count,
+                warning_threshold=50,
+                alert_threshold=100,
+            ),
+            _RuleInput(
+                key="checkpoints_req_15m",
+                name="Checkpoint Pressure (15m)",
+                description="Increase of requested checkpoints in the last 15 minutes.",
+                category="io",
+                value=checkpoints_req_delta,
+                warning_threshold=5,
+                alert_threshold=15,
+            ),
+            _RuleInput(
+                key="rollback_ratio",
+                name="Rollback Ratio",
+                description="Fraction of rolled back transactions over all transactions.",
+                category="transactions",
+                value=rollback_ratio,
+                warning_threshold=0.05,
+                alert_threshold=0.15,
+            ),
+            _RuleInput(
+                key="deadlocks_60m",
+                name="Deadlocks (60m)",
+                description="Increase in deadlocks during the last hour.",
the last hour.", + category="contention", + value=deadlocks_delta, + warning_threshold=1, + alert_threshold=5, + ), + _RuleInput( + key="slowest_query_mean_ms", + name="Slowest Query Mean Time", + description="Highest mean execution time in the latest query snapshot.", + category="query", + value=slowest_query_mean, + warning_threshold=300, + alert_threshold=1000, + ), + _RuleInput( + key="slowest_query_total_ms", + name="Slowest Query Total Time", + description="Highest total execution time in the latest query snapshot.", + category="query", + value=slowest_query_total, + warning_threshold=3000, + alert_threshold=10000, + ), + ] + ) + return rules, forced_items + + +async def _evaluate_standard_alerts(db: AsyncSession, targets: list[Target]) -> list[AlertStatusItem]: + checked_at = datetime.now(timezone.utc) + items: list[AlertStatusItem] = [] + + for target in targets: + rules, forced_items = await _build_standard_rules(db, target) + items.extend(forced_items) + for rule in rules: + severity = _severity_from_thresholds(rule.value, rule.comparison, rule.warning_threshold, rule.alert_threshold) + if severity not in {"warning", "alert"}: + continue + items.append( + AlertStatusItem( + alert_key=f"std-{rule.key}-{target.id}", + source="standard", + severity=severity, + category=rule.category, + name=rule.name, + description=rule.description, + target_id=target.id, + target_name=target.name, + value=rule.value, + warning_threshold=rule.warning_threshold, + alert_threshold=rule.alert_threshold, + comparison=rule.comparison, + message=_status_message(rule.value, rule.comparison, rule.warning_threshold, rule.alert_threshold), + checked_at=checked_at, + ) + ) + return items + + +async def _evaluate_custom_alerts(db: AsyncSession, targets: list[Target]) -> list[AlertStatusItem]: + checked_at = datetime.now(timezone.utc) + defs = ( + await db.scalars(select(AlertDefinition).where(AlertDefinition.enabled.is_(True)).order_by(desc(AlertDefinition.id))) + ).all() + target_by_id = {t.id: t for t in targets} + items: list[AlertStatusItem] = [] + + for definition in defs: + target_candidates = targets if definition.target_id is None else [target_by_id.get(definition.target_id)] + for target in [t for t in target_candidates if t is not None]: + value: float | None = None + severity = "unknown" + message = "No data" + try: + value = await run_scalar_sql_for_target(target, definition.sql_text) + severity = _severity_from_thresholds( + value=value, + comparison=definition.comparison, + warning_threshold=definition.warning_threshold, + alert_threshold=definition.alert_threshold, + ) + message = _status_message(value, definition.comparison, definition.warning_threshold, definition.alert_threshold) + except Exception as exc: + severity = "alert" + message = f"Execution failed: {exc}" + + if severity not in {"warning", "alert"}: + continue + items.append( + AlertStatusItem( + alert_key=f"custom-{definition.id}-{target.id}", + source="custom", + severity=severity, + category="custom", + name=definition.name, + description=definition.description or "Custom SQL alert", + target_id=target.id, + target_name=target.name, + value=value, + warning_threshold=definition.warning_threshold, + alert_threshold=definition.alert_threshold, + comparison=definition.comparison, + message=message, + checked_at=checked_at, + sql_text=definition.sql_text, + ) + ) + return items + + +async def get_alert_status(db: AsyncSession, use_cache: bool = True) -> AlertStatusResponse: + now_seconds = time.time() + cached = 
_status_cache.get("data") + expires = float(_status_cache.get("expires", 0.0)) + if use_cache and cached and expires > now_seconds: + return cached + + targets = (await db.scalars(select(Target).order_by(Target.name.asc()))).all() + standard_items = await _evaluate_standard_alerts(db, targets) + custom_items = await _evaluate_custom_alerts(db, targets) + all_items = standard_items + custom_items + + warnings = [item for item in all_items if item.severity == "warning"] + alerts = [item for item in all_items if item.severity == "alert"] + warnings.sort(key=lambda i: (i.target_name.lower(), i.name.lower())) + alerts.sort(key=lambda i: (i.target_name.lower(), i.name.lower())) + + payload = AlertStatusResponse( + generated_at=datetime.now(timezone.utc), + warnings=warnings, + alerts=alerts, + warning_count=len(warnings), + alert_count=len(alerts), + ) + _status_cache["data"] = payload + _status_cache["expires"] = now_seconds + _STATUS_CACHE_TTL_SECONDS + return payload diff --git a/backend/app/services/collector.py b/backend/app/services/collector.py index 88b81e2..e65442c 100644 --- a/backend/app/services/collector.py +++ b/backend/app/services/collector.py @@ -41,7 +41,19 @@ async def collect_target(target: Target) -> None: try: stat_db = await conn.fetchrow( """ - SELECT numbackends, xact_commit, xact_rollback, blks_hit, blks_read, tup_returned, tup_fetched + SELECT + numbackends, + xact_commit, + xact_rollback, + deadlocks, + temp_files, + temp_bytes, + blk_read_time, + blk_write_time, + blks_hit, + blks_read, + tup_returned, + tup_fetched FROM pg_stat_database WHERE datname = current_database() """ @@ -67,6 +79,11 @@ async def collect_target(target: Target) -> None: "numbackends": 0, "xact_commit": 0, "xact_rollback": 0, + "deadlocks": 0, + "temp_files": 0, + "temp_bytes": 0, + "blk_read_time": 0, + "blk_write_time": 0, "blks_hit": 0, "blks_read": 0, "tup_returned": 0, @@ -106,6 +123,13 @@ async def collect_target(target: Target) -> None: await _store_metric(db, target.id, "connections_total", activity["total_connections"], {}) await _store_metric(db, target.id, "connections_active", activity["active_connections"], {}) await _store_metric(db, target.id, "xacts_total", stat_db["xact_commit"] + stat_db["xact_rollback"], {}) + await _store_metric(db, target.id, "xact_commit", stat_db["xact_commit"], {}) + await _store_metric(db, target.id, "xact_rollback", stat_db["xact_rollback"], {}) + await _store_metric(db, target.id, "deadlocks", stat_db["deadlocks"], {}) + await _store_metric(db, target.id, "temp_files", stat_db["temp_files"], {}) + await _store_metric(db, target.id, "temp_bytes", stat_db["temp_bytes"], {}) + await _store_metric(db, target.id, "blk_read_time", stat_db["blk_read_time"], {}) + await _store_metric(db, target.id, "blk_write_time", stat_db["blk_write_time"], {}) await _store_metric(db, target.id, "cache_hit_ratio", cache_hit_ratio, {}) await _store_metric(db, target.id, "locks_total", lock_count, {}) await _store_metric(db, target.id, "checkpoints_timed", bgwriter["checkpoints_timed"], {}) diff --git a/frontend/src/App.jsx b/frontend/src/App.jsx index c3048ea..1487b01 100644 --- a/frontend/src/App.jsx +++ b/frontend/src/App.jsx @@ -6,6 +6,7 @@ import { DashboardPage } from "./pages/DashboardPage"; import { TargetsPage } from "./pages/TargetsPage"; import { TargetDetailPage } from "./pages/TargetDetailPage"; import { QueryInsightsPage } from "./pages/QueryInsightsPage"; +import { AlertsPage } from "./pages/AlertsPage"; import { AdminUsersPage } from "./pages/AdminUsersPage"; 
 function Protected({ children }) {
@@ -51,6 +52,14 @@ function Layout({ children }) {
           Query Insights
         </NavLink>
+        <NavLink
+          to="/alerts">
+          Alerts
+        </NavLink>
         {me?.role === "admin" && (
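
A definition can be dry-run against a target before it is saved, using the
test endpoint added above (same placeholder host, token, and target id as
the example in the commit message):

    curl -s -X POST http://localhost:8000/api/alerts/definitions/test \
      -H "Authorization: Bearer $TOKEN" \
      -H "Content-Type: application/json" \
      -d '{"target_id": 1, "sql_text": "SELECT count(*) FROM pg_locks"}'

A response of {"ok": true, "value": ...} means the statement passed
validate_alert_sql and returned a scalar; a failing statement comes back
as {"ok": false, "error": "..."} rather than an HTTP error.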