"""add alert definitions

Revision ID: 0002_alert_definitions
Revises: 0001_init
Create Date: 2026-02-12
"""

from alembic import op
import sqlalchemy as sa


revision = "0002_alert_definitions"
down_revision = "0001_init"
branch_labels = None
depends_on = None

# Columns that carry a secondary index; shared by upgrade() and downgrade()
# so the two directions can never fall out of sync.
_INDEXED_COLUMNS = ("target_id", "created_by_user_id", "created_at")


def upgrade() -> None:
    """Create the alert_definitions table and its secondary indexes."""
    op.create_table(
        "alert_definitions",
        sa.Column("id", sa.Integer(), primary_key=True),
        sa.Column("name", sa.String(length=160), nullable=False),
        sa.Column("description", sa.Text(), nullable=True),
        # NULL target_id means the definition applies to every target.
        sa.Column("target_id", sa.Integer(), sa.ForeignKey("targets.id", ondelete="CASCADE"), nullable=True),
        sa.Column("sql_text", sa.Text(), nullable=False),
        sa.Column("comparison", sa.String(length=10), nullable=False, server_default="gte"),
        sa.Column("warning_threshold", sa.Float(), nullable=True),
        sa.Column("alert_threshold", sa.Float(), nullable=False),
        sa.Column("enabled", sa.Boolean(), nullable=False, server_default=sa.text("true")),
        sa.Column("created_by_user_id", sa.Integer(), sa.ForeignKey("users.id"), nullable=True),
        sa.Column("created_at", sa.DateTime(timezone=True), nullable=False, server_default=sa.func.now()),
        sa.Column("updated_at", sa.DateTime(timezone=True), nullable=False, server_default=sa.func.now()),
    )
    for column in _INDEXED_COLUMNS:
        op.create_index(f"ix_alert_definitions_{column}", "alert_definitions", [column])


def downgrade() -> None:
    """Drop the indexes (reverse creation order), then the table itself."""
    for column in reversed(_INDEXED_COLUMNS):
        op.drop_index(f"ix_alert_definitions_{column}", table_name="alert_definitions")
    op.drop_table("alert_definitions")
# Fields declared NOT NULL on AlertDefinition. An explicit JSON null for any
# of these would previously slip through model_dump(exclude_unset=True) and
# surface as a 500 (TypeError / IntegrityError) instead of a client error.
_NON_NULLABLE_UPDATE_FIELDS = {"name", "sql_text", "comparison", "alert_threshold", "enabled"}


async def _validate_target_exists(db: AsyncSession, target_id: int | None) -> None:
    """Raise HTTP 404 when target_id is provided but no Target row matches."""
    if target_id is None:
        return
    target_exists = await db.scalar(select(Target.id).where(Target.id == target_id))
    if target_exists is None:
        raise HTTPException(status_code=404, detail="Target not found")


@router.get("/status", response_model=AlertStatusResponse)
async def list_alert_status(
    user: User = Depends(get_current_user), db: AsyncSession = Depends(get_db)
) -> AlertStatusResponse:
    """Current warning/alert state across all targets (served from a short-lived cache)."""
    _ = user
    return await get_alert_status(db, use_cache=True)


@router.get("/definitions", response_model=list[AlertDefinitionOut])
async def list_alert_definitions(
    user: User = Depends(get_current_user), db: AsyncSession = Depends(get_db)
) -> list[AlertDefinitionOut]:
    """All custom alert definitions, newest first."""
    _ = user
    defs = (await db.scalars(select(AlertDefinition).order_by(AlertDefinition.id.desc()))).all()
    return [AlertDefinitionOut.model_validate(item) for item in defs]


@router.post("/definitions", response_model=AlertDefinitionOut, status_code=status.HTTP_201_CREATED)
async def create_alert_definition(
    payload: AlertDefinitionCreate,
    user: User = Depends(require_roles("admin", "operator")),
    db: AsyncSession = Depends(get_db),
) -> AlertDefinitionOut:
    """Create a custom SQL alert definition (admin/operator only).

    Validates the target reference, the SQL (read-only single SELECT), and the
    threshold/comparison combination before persisting.
    """
    await _validate_target_exists(db, payload.target_id)
    sql_text = validate_alert_sql(payload.sql_text)
    validate_alert_thresholds(payload.comparison, payload.warning_threshold, payload.alert_threshold)

    definition = AlertDefinition(
        name=payload.name,
        description=payload.description,
        target_id=payload.target_id,
        sql_text=sql_text,
        comparison=payload.comparison,
        warning_threshold=payload.warning_threshold,
        alert_threshold=payload.alert_threshold,
        enabled=payload.enabled,
        created_by_user_id=user.id,
    )
    db.add(definition)
    await db.commit()
    await db.refresh(definition)
    # Status payloads are cached; force the next read to re-evaluate.
    invalidate_alert_cache()
    await write_audit_log(db, "alert.definition.create", user.id, {"alert_definition_id": definition.id, "name": definition.name})
    return AlertDefinitionOut.model_validate(definition)


@router.put("/definitions/{definition_id}", response_model=AlertDefinitionOut)
async def update_alert_definition(
    definition_id: int,
    payload: AlertDefinitionUpdate,
    user: User = Depends(require_roles("admin", "operator")),
    db: AsyncSession = Depends(get_db),
) -> AlertDefinitionOut:
    """Partially update a definition; only fields present in the request change."""
    definition = await db.scalar(select(AlertDefinition).where(AlertDefinition.id == definition_id))
    if definition is None:
        raise HTTPException(status_code=404, detail="Alert definition not found")

    updates = payload.model_dump(exclude_unset=True)
    # Reject explicit nulls for NOT NULL columns before touching the database.
    null_violations = sorted(
        field for field in _NON_NULLABLE_UPDATE_FIELDS if field in updates and updates[field] is None
    )
    if null_violations:
        raise HTTPException(status_code=400, detail=f"Fields cannot be null: {', '.join(null_violations)}")
    if "target_id" in updates:
        await _validate_target_exists(db, updates["target_id"])
    if "sql_text" in updates and updates["sql_text"] is not None:
        updates["sql_text"] = validate_alert_sql(updates["sql_text"])

    # Validate the merged comparison/threshold combination, not just the delta.
    comparison = updates.get("comparison", definition.comparison)
    warning_threshold = updates.get("warning_threshold", definition.warning_threshold)
    alert_threshold = updates.get("alert_threshold", definition.alert_threshold)
    validate_alert_thresholds(comparison, warning_threshold, alert_threshold)

    for key, value in updates.items():
        setattr(definition, key, value)
    await db.commit()
    await db.refresh(definition)
    invalidate_alert_cache()
    await write_audit_log(db, "alert.definition.update", user.id, {"alert_definition_id": definition.id})
    return AlertDefinitionOut.model_validate(definition)


@router.delete("/definitions/{definition_id}")
async def delete_alert_definition(
    definition_id: int,
    user: User = Depends(require_roles("admin", "operator")),
    db: AsyncSession = Depends(get_db),
) -> dict:
    """Delete a definition and invalidate the cached status payload."""
    definition = await db.scalar(select(AlertDefinition).where(AlertDefinition.id == definition_id))
    if definition is None:
        raise HTTPException(status_code=404, detail="Alert definition not found")
    await db.delete(definition)
    await db.commit()
    invalidate_alert_cache()
    await write_audit_log(db, "alert.definition.delete", user.id, {"alert_definition_id": definition_id})
    return {"status": "deleted"}


@router.post("/definitions/test", response_model=AlertDefinitionTestResponse)
async def test_alert_definition(
    payload: AlertDefinitionTestRequest,
    user: User = Depends(require_roles("admin", "operator")),
    db: AsyncSession = Depends(get_db),
) -> AlertDefinitionTestResponse:
    """Dry-run a candidate alert SQL against one target; never raises to the client."""
    _ = user
    target = await db.scalar(select(Target).where(Target.id == payload.target_id))
    if target is None:
        raise HTTPException(status_code=404, detail="Target not found")
    try:
        value = await run_scalar_sql_for_target(target, payload.sql_text)
        return AlertDefinitionTestResponse(ok=True, value=value)
    except Exception as exc:
        # Deliberate best-effort: validation/connection/SQL errors become an
        # ok=False result so the UI can show them inline.
        return AlertDefinitionTestResponse(ok=False, error=str(exc))
class AlertDefinition(Base):
    """A user-defined SQL alert with warning/alert thresholds.

    A NULL ``target_id`` means the definition is evaluated against every
    registered target; otherwise it is scoped to one target and is removed
    when that target is deleted (ON DELETE CASCADE).
    """

    __tablename__ = "alert_definitions"

    id: Mapped[int] = mapped_column(Integer, primary_key=True)
    name: Mapped[str] = mapped_column(String(160), nullable=False)
    description: Mapped[str | None] = mapped_column(Text, nullable=True)
    # NULL target_id = applies to all targets; cascade removes scoped alerts
    # together with their target.
    target_id: Mapped[int | None] = mapped_column(ForeignKey("targets.id", ondelete="CASCADE"), nullable=True, index=True)
    # Read-only scalar SELECT, validated by app.services.alerts.validate_alert_sql.
    sql_text: Mapped[str] = mapped_column(Text, nullable=False)
    # One of "gte"/"gt"/"lte"/"lt" (enforced in the service layer, not the DB).
    comparison: Mapped[str] = mapped_column(String(10), nullable=False, default="gte")
    warning_threshold: Mapped[float | None] = mapped_column(Float, nullable=True)
    alert_threshold: Mapped[float] = mapped_column(Float, nullable=False)
    enabled: Mapped[bool] = mapped_column(Boolean, nullable=False, default=True)
    # Kept nullable so definitions survive deletion of their author.
    created_by_user_id: Mapped[int | None] = mapped_column(ForeignKey("users.id"), nullable=True, index=True)
    created_at: Mapped[datetime] = mapped_column(DateTime(timezone=True), nullable=False, server_default=func.now(), index=True)
    updated_at: Mapped[datetime] = mapped_column(
        DateTime(timezone=True),
        nullable=False,
        server_default=func.now(),
        onupdate=func.now(),
    )

    target: Mapped[Target | None] = relationship(back_populates="alert_definitions")
"gte" + warning_threshold: float | None = None + alert_threshold: float + enabled: bool = True + + +class AlertDefinitionCreate(AlertDefinitionBase): + pass + + +class AlertDefinitionUpdate(BaseModel): + name: str | None = Field(default=None, min_length=2, max_length=160) + description: str | None = None + target_id: int | None = None + sql_text: str | None = Field(default=None, min_length=8, max_length=4000) + comparison: str | None = None + warning_threshold: float | None = None + alert_threshold: float | None = None + enabled: bool | None = None + + +class AlertDefinitionOut(AlertDefinitionBase): + id: int + created_by_user_id: int | None = None + created_at: datetime + updated_at: datetime + + model_config = {"from_attributes": True} + + +class AlertDefinitionTestRequest(BaseModel): + target_id: int + sql_text: str = Field(min_length=8, max_length=4000) + + +class AlertDefinitionTestResponse(BaseModel): + ok: bool + value: float | None = None + error: str | None = None + + +class AlertStatusItem(BaseModel): + alert_key: str + source: str + severity: str + category: str + name: str + description: str + target_id: int + target_name: str + value: float | None = None + warning_threshold: float | None = None + alert_threshold: float | None = None + comparison: str = "gte" + message: str + checked_at: datetime + sql_text: str | None = None + + +class AlertStatusResponse(BaseModel): + generated_at: datetime + warnings: list[AlertStatusItem] + alerts: list[AlertStatusItem] + warning_count: int + alert_count: int diff --git a/backend/app/services/alerts.py b/backend/app/services/alerts.py new file mode 100644 index 0000000..4cf9588 --- /dev/null +++ b/backend/app/services/alerts.py @@ -0,0 +1,457 @@ +from __future__ import annotations + +from dataclasses import dataclass +from datetime import datetime, timedelta, timezone +import re +import time + +import asyncpg +from fastapi import HTTPException +from sqlalchemy import desc, func, select +from sqlalchemy.ext.asyncio 
from __future__ import annotations

from dataclasses import dataclass
from datetime import datetime, timedelta, timezone
import re
import time

import asyncpg
from fastapi import HTTPException
from sqlalchemy import desc, func, select
from sqlalchemy.ext.asyncio import AsyncSession

from app.core.config import get_settings
from app.models.models import AlertDefinition, Metric, QueryStat, Target
from app.schemas.alert import AlertStatusItem, AlertStatusResponse
from app.services.collector import build_target_dsn

settings = get_settings()

# Comparison operators a definition may apply against its thresholds.
_ALLOWED_COMPARISONS = {"gte", "gt", "lte", "lt"}
# Coarse deny-list for custom SQL: any write/DDL keyword anywhere in the
# statement rejects it (this over-matches words inside strings/comments,
# which over-rejects but never under-rejects).
_FORBIDDEN_SQL_WORDS = re.compile(
    r"\b(insert|update|delete|alter|drop|truncate|create|grant|revoke|vacuum|analyze|copy|call|do)\b",
    re.IGNORECASE,
)
# Evaluated status is cached briefly so the UI poller does not hammer targets.
_STATUS_CACHE_TTL_SECONDS = 15
_status_cache: dict = {"expires": 0.0, "data": None}


@dataclass
class _RuleInput:
    """Inputs for evaluating one standard (built-in) alert rule."""

    key: str
    name: str
    description: str
    category: str
    value: float | None
    warning_threshold: float | None
    alert_threshold: float | None
    comparison: str = "gte"


def invalidate_alert_cache() -> None:
    """Drop the cached status payload so the next read re-evaluates everything."""
    _status_cache["expires"] = 0.0
    _status_cache["data"] = None


def validate_alert_thresholds(
    comparison: str, warning_threshold: float | None, alert_threshold: float | None
) -> None:
    """Validate the comparison/threshold combination; raise HTTP 400 when invalid.

    ``alert_threshold`` may arrive as None (an explicit JSON null in an update
    payload); previously that crashed with ``warning_threshold > None`` — a
    TypeError surfacing as a 500. It is now rejected with a clear 400.
    """
    if comparison not in _ALLOWED_COMPARISONS:
        raise HTTPException(status_code=400, detail=f"Invalid comparison. Use one of {sorted(_ALLOWED_COMPARISONS)}")
    if alert_threshold is None:
        raise HTTPException(status_code=400, detail="alert_threshold is required and cannot be null")
    if warning_threshold is None:
        return

    # The warning band must sit on the "safe" side of the alert band.
    if comparison in {"gte", "gt"} and warning_threshold > alert_threshold:
        raise HTTPException(status_code=400, detail="For gte/gt, warning_threshold must be <= alert_threshold")
    if comparison in {"lte", "lt"} and warning_threshold < alert_threshold:
        raise HTTPException(status_code=400, detail="For lte/lt, warning_threshold must be >= alert_threshold")
Use one of {sorted(_ALLOWED_COMPARISONS)}") + if warning_threshold is None: + return + + if comparison in {"gte", "gt"} and warning_threshold > alert_threshold: + raise HTTPException(status_code=400, detail="For gte/gt, warning_threshold must be <= alert_threshold") + if comparison in {"lte", "lt"} and warning_threshold < alert_threshold: + raise HTTPException(status_code=400, detail="For lte/lt, warning_threshold must be >= alert_threshold") + + +def validate_alert_sql(sql_text: str) -> str: + sql = sql_text.strip().rstrip(";") + lowered = sql.lower().strip() + if not lowered.startswith("select"): + raise HTTPException(status_code=400, detail="Alert SQL must start with SELECT") + if _FORBIDDEN_SQL_WORDS.search(lowered): + raise HTTPException(status_code=400, detail="Only read-only SELECT statements are allowed") + if ";" in sql: + raise HTTPException(status_code=400, detail="Only a single SQL statement is allowed") + return sql + + +async def _connect_target(target: Target, timeout_seconds: int = 5) -> asyncpg.Connection: + return await asyncpg.connect(dsn=build_target_dsn(target), timeout=timeout_seconds) + + +async def run_scalar_sql_for_target(target: Target, sql_text: str) -> float: + sql = validate_alert_sql(sql_text) + conn: asyncpg.Connection | None = None + try: + conn = await _connect_target(target, timeout_seconds=6) + row = await conn.fetchrow(sql) + if not row: + raise ValueError("Query returned no rows") + value = row[0] + return float(value) + finally: + if conn: + await conn.close() + + +def _compare(value: float, threshold: float, comparison: str) -> bool: + if comparison == "gte": + return value >= threshold + if comparison == "gt": + return value > threshold + if comparison == "lte": + return value <= threshold + if comparison == "lt": + return value < threshold + return False + + +def _severity_from_thresholds( + value: float | None, comparison: str, warning_threshold: float | None, alert_threshold: float | None +) -> str: + if value is None or 
alert_threshold is None: + return "unknown" + if _compare(value, alert_threshold, comparison): + return "alert" + if warning_threshold is not None and _compare(value, warning_threshold, comparison): + return "warning" + return "ok" + + +def _status_message(value: float | None, comparison: str, warning_threshold: float | None, alert_threshold: float | None) -> str: + if value is None: + return "No numeric value available" + if alert_threshold is None: + return f"Current value: {value:.2f}" + if warning_threshold is None: + return f"Current value: {value:.2f} (alert when value {comparison} {alert_threshold:.2f})" + return ( + f"Current value: {value:.2f} " + f"(warning when value {comparison} {warning_threshold:.2f}, alert when value {comparison} {alert_threshold:.2f})" + ) + + +async def _latest_metric_value(db: AsyncSession, target_id: int, metric_name: str) -> float | None: + row = await db.scalar( + select(Metric.value) + .where(Metric.target_id == target_id, Metric.metric_name == metric_name) + .order_by(desc(Metric.ts)) + .limit(1) + ) + return float(row) if row is not None else None + + +async def _metric_delta(db: AsyncSession, target_id: int, metric_name: str, minutes: int) -> float | None: + cutoff = datetime.now(timezone.utc) - timedelta(minutes=minutes) + latest = await db.scalar( + select(Metric.value) + .where(Metric.target_id == target_id, Metric.metric_name == metric_name) + .order_by(desc(Metric.ts)) + .limit(1) + ) + oldest = await db.scalar( + select(Metric.value) + .where(Metric.target_id == target_id, Metric.metric_name == metric_name, Metric.ts >= cutoff) + .order_by(Metric.ts.asc()) + .limit(1) + ) + if latest is None or oldest is None: + return None + return max(0.0, float(latest) - float(oldest)) + + +async def _latest_query_snapshot_max(db: AsyncSession, target_id: int, column_name: str) -> float | None: + latest_ts = await db.scalar(select(func.max(QueryStat.ts)).where(QueryStat.target_id == target_id)) + if latest_ts is None: + return None 
async def _build_standard_rules(db: AsyncSession, target: Target) -> tuple[list[_RuleInput], list[AlertStatusItem]]:
    """Assemble built-in rule inputs for one target.

    Returns (rules, forced_items): ``rules`` are threshold rules to be scored
    later; ``forced_items`` are already-final items (e.g. "target unreachable")
    that bypass threshold evaluation entirely.
    """
    rules: list[_RuleInput] = []
    forced_items: list[AlertStatusItem] = []
    checked_at = datetime.now(timezone.utc)

    # 1) Connectivity with RTT threshold.
    start = time.perf_counter()
    conn: asyncpg.Connection | None = None
    try:
        conn = await _connect_target(target, timeout_seconds=4)
        await conn.fetchval("SELECT 1")
        connect_ms = (time.perf_counter() - start) * 1000
        rules.append(
            _RuleInput(
                key="connectivity_rtt_ms",
                name="Connectivity Latency",
                description="Checks whether the target is reachable and how long the connection handshake takes.",
                category="availability",
                value=connect_ms,
                warning_threshold=1000,
                alert_threshold=2500,
                comparison="gte",
            )
        )
    except Exception as exc:
        # Unreachable target: emit a final alert item rather than a rule.
        forced_items.append(
            AlertStatusItem(
                alert_key=f"std-connectivity-down-{target.id}",
                source="standard",
                severity="alert",
                category="availability",
                name="Target Reachability",
                description="Verifies that the monitored database can be reached by the collector.",
                target_id=target.id,
                target_name=target.name,
                value=None,
                warning_threshold=None,
                alert_threshold=None,
                comparison="gte",
                message=f"Connection failed: {exc}",
                checked_at=checked_at,
            )
        )
    finally:
        if conn:
            await conn.close()

    # 2) Collector freshness based on latest stored metric.
    latest_ts = await db.scalar(select(func.max(Metric.ts)).where(Metric.target_id == target.id))
    if latest_ts is None:
        # Never collected: warn immediately, there is no age to score.
        forced_items.append(
            AlertStatusItem(
                alert_key=f"std-metric-freshness-{target.id}",
                source="standard",
                severity="warning",
                category="availability",
                name="Collector Freshness",
                description="Ensures fresh metrics are arriving for the target.",
                target_id=target.id,
                target_name=target.name,
                value=None,
                warning_threshold=None,
                alert_threshold=None,
                comparison="gte",
                message="No metrics collected yet for this target.",
                checked_at=checked_at,
            )
        )
    else:
        age_seconds = max(0.0, (checked_at - latest_ts).total_seconds())
        rules.append(
            _RuleInput(
                key="collector_freshness_seconds",
                name="Collector Freshness",
                description="Age of the most recent metric sample.",
                category="availability",
                value=age_seconds,
                # Thresholds scale with the configured polling cadence.
                warning_threshold=float(settings.poll_interval_seconds * 2),
                alert_threshold=float(settings.poll_interval_seconds * 4),
            )
        )

    # 3) Metric-derived rules; each value may be None when data is missing,
    # in which case the rule later scores as "unknown" and is skipped.
    active_connections = await _latest_metric_value(db, target.id, "connections_active")
    total_connections = await _latest_metric_value(db, target.id, "connections_total")
    cache_hit_ratio = await _latest_metric_value(db, target.id, "cache_hit_ratio")
    lock_count = await _latest_metric_value(db, target.id, "locks_total")
    xact_commit = await _latest_metric_value(db, target.id, "xact_commit")
    xact_rollback = await _latest_metric_value(db, target.id, "xact_rollback")
    checkpoints_req_delta = await _metric_delta(db, target.id, "checkpoints_req", minutes=15)
    deadlocks_delta = await _metric_delta(db, target.id, "deadlocks", minutes=60)
    slowest_query_mean = await _latest_query_snapshot_max(db, target.id, "mean_time")
    slowest_query_total = await _latest_query_snapshot_max(db, target.id, "total_time")

    active_ratio = None
    if active_connections is not None and total_connections and total_connections > 0:
        active_ratio = active_connections / total_connections

    rollback_ratio = None
    if xact_commit is not None and xact_rollback is not None:
        tx_total = xact_commit + xact_rollback
        if tx_total > 0:
            rollback_ratio = xact_rollback / tx_total

    rules.extend(
        [
            _RuleInput(
                key="active_connections_ratio",
                name="Active Connection Ratio",
                description="Share of active sessions over total sessions.",
                category="capacity",
                value=active_ratio,
                warning_threshold=0.70,
                alert_threshold=0.90,
            ),
            _RuleInput(
                key="cache_hit_ratio_low",
                name="Cache Hit Ratio",
                description="Low cache hit ratio means increased disk reads and slower queries.",
                category="performance",
                value=cache_hit_ratio,
                warning_threshold=0.95,
                alert_threshold=0.90,
                # "lte": lower is worse for a hit ratio.
                comparison="lte",
            ),
            _RuleInput(
                key="locks_total",
                name="Lock Pressure",
                description="Number of locks currently held on the target.",
                category="contention",
                value=lock_count,
                warning_threshold=50,
                alert_threshold=100,
            ),
            _RuleInput(
                key="checkpoints_req_15m",
                name="Checkpoint Pressure (15m)",
                description="Increase of requested checkpoints in the last 15 minutes.",
                category="io",
                value=checkpoints_req_delta,
                warning_threshold=5,
                alert_threshold=15,
            ),
            _RuleInput(
                key="rollback_ratio",
                name="Rollback Ratio",
                description="Fraction of rolled back transactions over all transactions.",
                category="transactions",
                value=rollback_ratio,
                warning_threshold=0.05,
                alert_threshold=0.15,
            ),
            _RuleInput(
                key="deadlocks_60m",
                name="Deadlocks (60m)",
                description="Increase in deadlocks during the last hour.",
                category="contention",
                value=deadlocks_delta,
                warning_threshold=1,
                alert_threshold=5,
            ),
            _RuleInput(
                key="slowest_query_mean_ms",
                name="Slowest Query Mean Time",
                description="Highest mean execution time in the latest query snapshot.",
                category="query",
                value=slowest_query_mean,
                warning_threshold=300,
                alert_threshold=1000,
            ),
            _RuleInput(
                key="slowest_query_total_ms",
                name="Slowest Query Total Time",
                description="Highest total execution time in the latest query snapshot.",
                category="query",
                value=slowest_query_total,
                warning_threshold=3000,
                alert_threshold=10000,
            ),
        ]
    )
    return rules, forced_items


async def _evaluate_standard_alerts(db: AsyncSession, targets: list[Target]) -> list[AlertStatusItem]:
    """Score the built-in rules for every target; only warning/alert items are kept."""
    checked_at = datetime.now(timezone.utc)
    items: list[AlertStatusItem] = []

    for target in targets:
        rules, forced_items = await _build_standard_rules(db, target)
        items.extend(forced_items)
        for rule in rules:
            severity = _severity_from_thresholds(rule.value, rule.comparison, rule.warning_threshold, rule.alert_threshold)
            # "ok" and "unknown" (missing data) rules produce no item.
            if severity not in {"warning", "alert"}:
                continue
            items.append(
                AlertStatusItem(
                    alert_key=f"std-{rule.key}-{target.id}",
                    source="standard",
                    severity=severity,
                    category=rule.category,
                    name=rule.name,
                    description=rule.description,
                    target_id=target.id,
                    target_name=target.name,
                    value=rule.value,
                    warning_threshold=rule.warning_threshold,
                    alert_threshold=rule.alert_threshold,
                    comparison=rule.comparison,
                    message=_status_message(rule.value, rule.comparison, rule.warning_threshold, rule.alert_threshold),
                    checked_at=checked_at,
                )
            )
    return items


async def _evaluate_custom_alerts(db: AsyncSession, targets: list[Target]) -> list[AlertStatusItem]:
    """Run each enabled custom SQL definition against its target(s).

    A definition with NULL target_id fans out to every target. A failing
    query is reported as an alert item rather than raising.
    """
    checked_at = datetime.now(timezone.utc)
    defs = (
        await db.scalars(select(AlertDefinition).where(AlertDefinition.enabled.is_(True)).order_by(desc(AlertDefinition.id)))
    ).all()
    target_by_id = {t.id: t for t in targets}
    items: list[AlertStatusItem] = []

    for definition in defs:
        # NULL target_id = evaluate against all targets; otherwise just one.
        target_candidates = targets if definition.target_id is None else [target_by_id.get(definition.target_id)]
        for target in [t for t in target_candidates if t is not None]:
            value: float | None = None
            severity = "unknown"
            message = "No data"
            try:
                value = await run_scalar_sql_for_target(target, definition.sql_text)
                severity = _severity_from_thresholds(
                    value=value,
                    comparison=definition.comparison,
                    warning_threshold=definition.warning_threshold,
                    alert_threshold=definition.alert_threshold,
                )
                message = _status_message(value, definition.comparison, definition.warning_threshold, definition.alert_threshold)
            except Exception as exc:
                # Execution failure is itself an alert condition.
                severity = "alert"
                message = f"Execution failed: {exc}"

            if severity not in {"warning", "alert"}:
                continue
            items.append(
                AlertStatusItem(
                    alert_key=f"custom-{definition.id}-{target.id}",
                    source="custom",
                    severity=severity,
                    category="custom",
                    name=definition.name,
                    description=definition.description or "Custom SQL alert",
                    target_id=target.id,
                    target_name=target.name,
                    value=value,
                    warning_threshold=definition.warning_threshold,
                    alert_threshold=definition.alert_threshold,
                    comparison=definition.comparison,
                    message=message,
                    checked_at=checked_at,
                    sql_text=definition.sql_text,
                )
            )
    return items
async def get_alert_status(db: AsyncSession, use_cache: bool = True) -> AlertStatusResponse:
    """Evaluate all standard and custom alerts across every target.

    Results are cached for ``_STATUS_CACHE_TTL_SECONDS`` so the UI poller does
    not re-run custom SQL against targets on every request; ``use_cache=False``
    forces a fresh evaluation (which still refreshes the cache).
    """
    now = time.time()
    if use_cache:
        cached_payload = _status_cache.get("data")
        if cached_payload is not None and float(_status_cache.get("expires", 0.0)) > now:
            return cached_payload

    targets = (await db.scalars(select(Target).order_by(Target.name.asc()))).all()
    evaluated = await _evaluate_standard_alerts(db, targets)
    evaluated += await _evaluate_custom_alerts(db, targets)

    def _sort_key(item: AlertStatusItem) -> tuple[str, str]:
        # Stable, case-insensitive ordering by target then rule name.
        return (item.target_name.lower(), item.name.lower())

    warnings = sorted((item for item in evaluated if item.severity == "warning"), key=_sort_key)
    alerts = sorted((item for item in evaluated if item.severity == "alert"), key=_sort_key)

    payload = AlertStatusResponse(
        generated_at=datetime.now(timezone.utc),
        warnings=warnings,
        alerts=alerts,
        warning_count=len(warnings),
        alert_count=len(alerts),
    )
    _status_cache["data"] = payload
    _status_cache["expires"] = now + _STATUS_CACHE_TTL_SECONDS
    return payload
cache_hit_ratio, {}) await _store_metric(db, target.id, "locks_total", lock_count, {}) await _store_metric(db, target.id, "checkpoints_timed", bgwriter["checkpoints_timed"], {}) diff --git a/frontend/src/App.jsx b/frontend/src/App.jsx index c3048ea..1487b01 100644 --- a/frontend/src/App.jsx +++ b/frontend/src/App.jsx @@ -6,6 +6,7 @@ import { DashboardPage } from "./pages/DashboardPage"; import { TargetsPage } from "./pages/TargetsPage"; import { TargetDetailPage } from "./pages/TargetDetailPage"; import { QueryInsightsPage } from "./pages/QueryInsightsPage"; +import { AlertsPage } from "./pages/AlertsPage"; import { AdminUsersPage } from "./pages/AdminUsersPage"; function Protected({ children }) { @@ -51,6 +52,14 @@ function Layout({ children }) { Query Insights + + + + + + + Alerts + {me?.role === "admin" && ( @@ -99,6 +108,7 @@ export function App() { } /> } /> } /> + } /> } /> diff --git a/frontend/src/pages/AlertsPage.jsx b/frontend/src/pages/AlertsPage.jsx new file mode 100644 index 0000000..cb6ceaa --- /dev/null +++ b/frontend/src/pages/AlertsPage.jsx @@ -0,0 +1,333 @@ +import React, { useEffect, useMemo, useState } from "react"; +import { apiFetch } from "../api"; +import { useAuth } from "../state"; + +const initialForm = { + name: "", + description: "", + target_id: "", + sql_text: "SELECT count(*)::float FROM pg_stat_activity WHERE state = 'active'", + comparison: "gte", + warning_threshold: "", + alert_threshold: "", + enabled: true, +}; + +function formatAlertValue(value) { + if (value === null || value === undefined) return "-"; + if (Number.isInteger(value)) return String(value); + return Number(value).toFixed(2); +} + +export function AlertsPage() { + const { tokens, refresh, me } = useAuth(); + const [targets, setTargets] = useState([]); + const [status, setStatus] = useState({ warnings: [], alerts: [], warning_count: 0, alert_count: 0 }); + const [definitions, setDefinitions] = useState([]); + const [form, setForm] = useState(initialForm); + const 
[error, setError] = useState(""); + const [loading, setLoading] = useState(true); + const [testing, setTesting] = useState(false); + const [testResult, setTestResult] = useState(""); + const [saving, setSaving] = useState(false); + + const canManageAlerts = me?.role === "admin" || me?.role === "operator"; + + const loadAll = async () => { + try { + setError(""); + const [targetRows, statusPayload] = await Promise.all([ + apiFetch("/targets", {}, tokens, refresh), + apiFetch("/alerts/status", {}, tokens, refresh), + ]); + setTargets(targetRows); + setStatus(statusPayload); + + if (canManageAlerts) { + const defs = await apiFetch("/alerts/definitions", {}, tokens, refresh); + setDefinitions(defs); + } + } catch (e) { + setError(String(e.message || e)); + } finally { + setLoading(false); + } + }; + + useEffect(() => { + loadAll(); + }, [canManageAlerts]); + + useEffect(() => { + const timer = setInterval(() => { + apiFetch("/alerts/status", {}, tokens, refresh) + .then(setStatus) + .catch(() => {}); + }, 20000); + return () => clearInterval(timer); + }, [tokens, refresh]); + + const targetOptions = useMemo( + () => [{ id: "", name: "All targets" }, ...targets.map((t) => ({ id: String(t.id), name: `${t.name} (${t.host}:${t.port})` }))], + [targets] + ); + + const createDefinition = async (e) => { + e.preventDefault(); + setSaving(true); + setTestResult(""); + try { + await apiFetch( + "/alerts/definitions", + { + method: "POST", + body: JSON.stringify({ + name: form.name, + description: form.description || null, + target_id: form.target_id ? Number(form.target_id) : null, + sql_text: form.sql_text, + comparison: form.comparison, + warning_threshold: form.warning_threshold === "" ? 
null : Number(form.warning_threshold), + alert_threshold: Number(form.alert_threshold), + enabled: !!form.enabled, + }), + }, + tokens, + refresh + ); + setForm(initialForm); + await loadAll(); + } catch (e) { + setError(String(e.message || e)); + } finally { + setSaving(false); + } + }; + + const testDefinition = async () => { + if (!form.target_id) { + setTestResult("Select a specific target to test this SQL query."); + return; + } + setTesting(true); + setTestResult(""); + try { + const res = await apiFetch( + "/alerts/definitions/test", + { + method: "POST", + body: JSON.stringify({ + target_id: Number(form.target_id), + sql_text: form.sql_text, + }), + }, + tokens, + refresh + ); + if (res.ok) { + setTestResult(`Query test succeeded. Returned value: ${formatAlertValue(res.value)}`); + } else { + setTestResult(`Query test failed: ${res.error}`); + } + } catch (e) { + setTestResult(String(e.message || e)); + } finally { + setTesting(false); + } + }; + + const removeDefinition = async (definitionId) => { + if (!confirm("Delete this custom alert definition?")) return; + try { + await apiFetch(`/alerts/definitions/${definitionId}`, { method: "DELETE" }, tokens, refresh); + await loadAll(); + } catch (e) { + setError(String(e.message || e)); + } + }; + + const toggleDefinition = async (definition) => { + try { + await apiFetch( + `/alerts/definitions/${definition.id}`, + { method: "PUT", body: JSON.stringify({ enabled: !definition.enabled }) }, + tokens, + refresh + ); + await loadAll(); + } catch (e) { + setError(String(e.message || e)); + } + }; + + if (loading) return Loading alerts...; + + return ( + + Alerts + Warnings are early signals. Alerts are critical thresholds reached or exceeded. + {error && {error}} + + + + {status.warning_count || 0} + Warnings + + + {status.alert_count || 0} + Alerts + + + + + + Warnings + {status.warnings?.length ? 
( + + {status.warnings.map((item) => ( + + + Warning + {item.name} + {item.target_name} + + {item.description} + {item.message} + + ))} + + ) : ( + No warning-level alerts right now. + )} + + + + Alerts + {status.alerts?.length ? ( + + {status.alerts.map((item) => ( + + + Alert + {item.name} + {item.target_name} + + {item.description} + {item.message} + + ))} + + ) : ( + No critical alerts right now. + )} + + + + {canManageAlerts && ( + <> + + Create Custom Alert + Admins and operators can add SQL-based checks with warning and alert thresholds. + + + Name + setForm({ ...form, name: e.target.value })} placeholder="e.g. High Active Sessions" required /> + + + Target Scope + setForm({ ...form, target_id: e.target.value })}> + {targetOptions.map((opt) => ( + + {opt.name} + + ))} + + + + Comparison + setForm({ ...form, comparison: e.target.value })}> + greater than or equal (>=) + greater than (>) + less than or equal (<=) + less than (<) + + + + Description + setForm({ ...form, description: e.target.value })} placeholder="What does this check validate?" /> + + + Warning Threshold (optional) + setForm({ ...form, warning_threshold: e.target.value })} placeholder="e.g. 20" /> + + + Alert Threshold + setForm({ ...form, alert_threshold: e.target.value })} placeholder="e.g. 50" required /> + + + SQL Query (must return one numeric value) + setForm({ ...form, sql_text: e.target.value })} + placeholder="SELECT count(*)::float FROM pg_stat_activity WHERE state = 'active'" + required + /> + + + + {testing ? "Testing..." : "Test query output"} + + + {saving ? "Creating..." : "Create custom alert"} + + + {testResult && {testResult}} + + + + + Custom Alert Definitions + {definitions.length ? ( + + + + Name + Scope + Comparison + Warn + Alert + Status + Actions + + + + {definitions.map((d) => ( + + {d.name} + {d.target_id ? targets.find((t) => t.id === d.target_id)?.name || `Target #${d.target_id}` : "All targets"} + {d.comparison} + {d.warning_threshold ?? 
"-"} + {d.alert_threshold} + {d.enabled ? "Enabled" : "Disabled"} + + toggleDefinition(d)}> + {d.enabled ? "Disable" : "Enable"} + {" "} + removeDefinition(d.id)}> + Delete + + + + ))} + + + ) : ( + No custom alerts created yet. + )} + + > + )} + + ); +} diff --git a/frontend/src/styles.css b/frontend/src/styles.css index d2778be..cc4954a 100644 --- a/frontend/src/styles.css +++ b/frontend/src/styles.css @@ -225,6 +225,7 @@ a { input, select, +textarea, button { background: #0f2750; color: var(--text); @@ -730,6 +731,11 @@ details[open] .collapse-chevron { border-color: #be3f63; } +.small-btn { + min-height: 30px; + padding: 4px 10px; +} + button { cursor: pointer; } @@ -763,6 +769,121 @@ td { border-color: #7f1d1d; } +.muted { + color: #9eb8d6; +} + +.alerts-subtitle { + margin-top: 2px; + color: #a6c0df; +} + +.alerts-kpis .alerts-kpi { + display: grid; + gap: 2px; +} + +.alerts-kpi strong { + font-size: 32px; + line-height: 1; +} + +.alerts-kpi.warning { + border-color: #9a6426; + background: linear-gradient(180deg, #342713, #251b0f); +} + +.alerts-kpi.alert { + border-color: #a53a46; + background: linear-gradient(180deg, #381520, #2b1018); +} + +.alerts-list { + display: grid; + gap: 10px; + max-height: 520px; + overflow: auto; + padding-right: 3px; +} + +.alert-item { + border-radius: 12px; + border: 1px solid #375d8f; + background: #10294f; + padding: 12px; +} + +.alert-item.warning { + border-color: #8a6d34; + background: linear-gradient(180deg, #2f2516, #261f15); +} + +.alert-item.alert { + border-color: #9f3e4a; + background: linear-gradient(180deg, #361822, #2d131b); +} + +.alert-item-head { + display: flex; + align-items: center; + gap: 8px; + margin-bottom: 6px; +} + +.alert-item-head small { + color: #c3d5ef; + margin-left: auto; +} + +.alert-item p { + margin: 4px 0; + color: #d3e5fb; +} + +.alert-message { + font-size: 13px; + color: #b6cae8; +} + +.alert-badge { + display: inline-block; + border-radius: 999px; + padding: 3px 8px; + font-size: 11px; 
+ font-weight: 700; + border: 1px solid transparent; +} + +.alert-badge.warning { + color: #f9d8a8; + background: #3a2c16; + border-color: #a1742f; +} + +.alert-badge.alert { + color: #fecaca; + background: #3a1620; + border-color: #ad4552; +} + +.alert-form .field-full { + grid-column: 1 / -1; +} + +.alert-form textarea { + width: 100%; + min-height: 90px; + resize: vertical; + font-family: "JetBrains Mono", "Consolas", monospace; + font-size: 13px; +} + +.alert-form-actions { + display: flex; + justify-content: flex-end; + gap: 8px; +} + .range-picker { display: flex; gap: 8px;
Warnings are early signals. Alerts are critical thresholds reached or exceeded.
{item.description}
{item.message}
No warning-level alerts right now.
No critical alerts right now.
Admins and operators can add SQL-based checks with warning and alert thresholds.
No custom alerts created yet.