Some checks failed
PostgreSQL Compatibility Matrix / PG14 smoke (push) Failing after 3m17s
PostgreSQL Compatibility Matrix / PG15 smoke (push) Failing after 1m17s
PostgreSQL Compatibility Matrix / PG17 smoke (push) Has been cancelled
PostgreSQL Compatibility Matrix / PG18 smoke (push) Has been cancelled
PostgreSQL Compatibility Matrix / PG16 smoke (push) Has been cancelled
Introduced a new helper function `_rollback_ratio_recent` to calculate the rollback ratio over the last 15 minutes, ensuring meaningful evaluation only when a minimum transaction threshold is met. Adjusted warning and alert thresholds for rollback ratio and added a contextual metric for transaction volume in the past 15 minutes.
480 lines
19 KiB
Python
480 lines
19 KiB
Python
from __future__ import annotations
|
|
|
|
from dataclasses import dataclass
|
|
from datetime import datetime, timedelta, timezone
|
|
import re
|
|
import time
|
|
|
|
import asyncpg
|
|
from fastapi import HTTPException
|
|
from sqlalchemy import desc, func, select
|
|
from sqlalchemy.ext.asyncio import AsyncSession
|
|
|
|
from app.core.config import get_settings
|
|
from app.models.models import AlertDefinition, Metric, QueryStat, Target
|
|
from app.schemas.alert import AlertStatusItem, AlertStatusResponse
|
|
from app.services.collector import build_target_dsn
|
|
|
|
# Application settings resolved once at import time.
settings = get_settings()

# Comparison keywords accepted by alert definitions and standard rules.
_ALLOWED_COMPARISONS = {"gte", "gt", "lte", "lt"}

# Keywords indicating a mutating or maintenance statement; used to reject
# anything other than a read-only SELECT in user-supplied alert SQL.
_FORBIDDEN_SQL_WORDS = re.compile(
    r"\b(insert|update|delete|alter|drop|truncate|create|grant|revoke|vacuum|analyze|copy|call|do)\b",
    re.IGNORECASE,
)

# How long a computed alert-status payload is served from cache.
_STATUS_CACHE_TTL_SECONDS = 15

# Module-level single-entry cache: "expires" is a UNIX timestamp,
# "data" holds the last AlertStatusResponse payload (or None).
_status_cache: dict = {"expires": 0.0, "data": None}
|
|
|
|
|
|
@dataclass
class _RuleInput:
    """One standard alert rule: a measured value plus the thresholds used to
    derive its severity. A ``None`` value or ``None`` alert_threshold makes
    the rule resolve to "unknown" downstream (contextual/informational)."""

    key: str  # stable identifier, used to build the item's alert_key
    name: str  # human-readable rule name
    description: str  # explanation surfaced in the UI
    category: str  # grouping bucket, e.g. "availability", "performance"
    value: float | None  # measured value; None when no data is available
    warning_threshold: float | None  # warning level; None disables the warning tier
    alert_threshold: float | None  # alert level; None makes the rule informational
    comparison: str = "gte"  # one of _ALLOWED_COMPARISONS
|
|
|
|
|
|
def invalidate_alert_cache() -> None:
    """Drop the cached alert-status payload so the next request recomputes it."""
    _status_cache.update(expires=0.0, data=None)
|
|
|
|
|
|
def validate_alert_thresholds(comparison: str, warning_threshold: float | None, alert_threshold: float) -> None:
    """Validate the comparison keyword and the warning/alert threshold ordering.

    Raises HTTPException(400) on an unknown comparison or when the warning
    threshold would trigger after the alert threshold; a missing warning
    threshold is always acceptable.
    """
    if comparison not in _ALLOWED_COMPARISONS:
        raise HTTPException(status_code=400, detail=f"Invalid comparison. Use one of {sorted(_ALLOWED_COMPARISONS)}")
    if warning_threshold is None:
        return

    # For ascending comparisons the warning level must sit below the alert
    # level; for descending comparisons the opposite holds.
    ascending = comparison in {"gte", "gt"}
    if ascending and warning_threshold > alert_threshold:
        raise HTTPException(status_code=400, detail="For gte/gt, warning_threshold must be <= alert_threshold")
    if not ascending and warning_threshold < alert_threshold:
        raise HTTPException(status_code=400, detail="For lte/lt, warning_threshold must be >= alert_threshold")
|
|
|
|
|
|
def validate_alert_sql(sql_text: str) -> str:
    """Sanitize user-supplied alert SQL and enforce the read-only rules.

    Returns the trimmed statement (trailing semicolons removed). Raises
    HTTPException(400) unless the text is a single SELECT statement free of
    mutating/maintenance keywords.
    """
    cleaned = sql_text.strip().rstrip(";")
    normalized = cleaned.lower().strip()
    if not normalized.startswith("select"):
        raise HTTPException(status_code=400, detail="Alert SQL must start with SELECT")
    if _FORBIDDEN_SQL_WORDS.search(normalized):
        raise HTTPException(status_code=400, detail="Only read-only SELECT statements are allowed")
    # Any semicolon left after stripping the trailing ones implies a second statement.
    if ";" in cleaned:
        raise HTTPException(status_code=400, detail="Only a single SQL statement is allowed")
    return cleaned
|
|
|
|
|
|
async def _connect_target(target: Target, timeout_seconds: int = 5) -> asyncpg.Connection:
    """Open an asyncpg connection to *target* using its collector DSN."""
    dsn = build_target_dsn(target)
    return await asyncpg.connect(dsn=dsn, timeout=timeout_seconds)
|
|
|
|
|
|
async def run_scalar_sql_for_target(target: Target, sql_text: str) -> float:
    """Execute a validated scalar SELECT against *target* and return the
    first column of the first row as a float.

    Raises:
        HTTPException: via validate_alert_sql when the SQL is not allowed.
        ValueError: when the query yields no rows or a NULL scalar.

    The connection is always closed, even when the query fails.
    """
    sql = validate_alert_sql(sql_text)
    # Connect before entering the try-block: if the connection itself fails
    # there is nothing to close, and the error propagates unchanged.
    conn = await _connect_target(target, timeout_seconds=6)
    try:
        row = await conn.fetchrow(sql)
        if not row:
            raise ValueError("Query returned no rows")
        raw = row[0]
        # Fix: a SQL NULL previously reached float(None) and surfaced as an
        # opaque TypeError; report it as a clear ValueError instead.
        if raw is None:
            raise ValueError("Query returned NULL")
        return float(raw)
    finally:
        await conn.close()
|
|
|
|
|
|
def _compare(value: float, threshold: float, comparison: str) -> bool:
|
|
if comparison == "gte":
|
|
return value >= threshold
|
|
if comparison == "gt":
|
|
return value > threshold
|
|
if comparison == "lte":
|
|
return value <= threshold
|
|
if comparison == "lt":
|
|
return value < threshold
|
|
return False
|
|
|
|
|
|
def _severity_from_thresholds(
    value: float | None, comparison: str, warning_threshold: float | None, alert_threshold: float | None
) -> str:
    """Map a measured value onto "unknown"/"alert"/"warning"/"ok".

    Missing value or missing alert threshold yields "unknown"; the alert
    tier is checked before the warning tier.
    """
    if value is None or alert_threshold is None:
        return "unknown"
    if _compare(value, alert_threshold, comparison):
        return "alert"
    breaches_warning = warning_threshold is not None and _compare(value, warning_threshold, comparison)
    return "warning" if breaches_warning else "ok"
|
|
|
|
|
|
def _status_message(value: float | None, comparison: str, warning_threshold: float | None, alert_threshold: float | None) -> str:
|
|
if value is None:
|
|
return "No numeric value available"
|
|
if alert_threshold is None:
|
|
return f"Current value: {value:.2f}"
|
|
if warning_threshold is None:
|
|
return f"Current value: {value:.2f} (alert when value {comparison} {alert_threshold:.2f})"
|
|
return (
|
|
f"Current value: {value:.2f} "
|
|
f"(warning when value {comparison} {warning_threshold:.2f}, alert when value {comparison} {alert_threshold:.2f})"
|
|
)
|
|
|
|
|
|
async def _latest_metric_value(db: AsyncSession, target_id: int, metric_name: str) -> float | None:
    """Return the most recent stored sample of *metric_name* for the target, or None."""
    stmt = (
        select(Metric.value)
        .where(Metric.target_id == target_id, Metric.metric_name == metric_name)
        .order_by(desc(Metric.ts))
        .limit(1)
    )
    raw = await db.scalar(stmt)
    return None if raw is None else float(raw)
|
|
|
|
|
|
async def _metric_delta(db: AsyncSession, target_id: int, metric_name: str, minutes: int) -> float | None:
    """Growth of a cumulative counter metric over the last *minutes* minutes.

    Compares the newest sample overall with the oldest sample inside the
    window; returns None when either is missing. Negative differences are
    clamped to 0.0 (presumably guarding against counter resets — behavior
    preserved from the original).
    """
    window_start = datetime.now(timezone.utc) - timedelta(minutes=minutes)
    scoped = select(Metric.value).where(Metric.target_id == target_id, Metric.metric_name == metric_name)

    newest = await db.scalar(scoped.order_by(desc(Metric.ts)).limit(1))
    earliest = await db.scalar(
        scoped.where(Metric.ts >= window_start).order_by(Metric.ts.asc()).limit(1)
    )

    if newest is None or earliest is None:
        return None
    return max(0.0, float(newest) - float(earliest))
|
|
|
|
|
|
async def _rollback_ratio_recent(
    db: AsyncSession, target_id: int, minutes: int, min_total_transactions: int
) -> tuple[float | None, float]:
    """Rollback fraction over the last *minutes*, plus the transaction volume seen.

    Returns ``(ratio, total)``. The ratio is None when either counter delta is
    unavailable, when fewer than *min_total_transactions* transactions occurred
    in the window, or when the total is zero.
    """
    commits = await _metric_delta(db, target_id, "xact_commit", minutes=minutes)
    rollbacks = await _metric_delta(db, target_id, "xact_rollback", minutes=minutes)
    if commits is None or rollbacks is None:
        return None, 0.0

    total = commits + rollbacks
    if total < float(min_total_transactions):
        # Too little traffic in window, ratio would be noisy and misleading.
        return None, total
    if total > 0:
        return rollbacks / total, total
    return None, total
|
|
|
|
|
|
async def _latest_query_snapshot_max(db: AsyncSession, target_id: int, column_name: str) -> float | None:
    """Maximum mean_time/total_time across the newest query-stats snapshot, or None."""
    newest_ts = await db.scalar(select(func.max(QueryStat.ts)).where(QueryStat.target_id == target_id))
    if newest_ts is None:
        return None
    # Any column name other than "mean_time" falls back to total_time.
    target_column = QueryStat.mean_time if column_name == "mean_time" else QueryStat.total_time
    raw = await db.scalar(
        select(func.max(target_column)).where(QueryStat.target_id == target_id, QueryStat.ts == newest_ts)
    )
    return None if raw is None else float(raw)
|
|
|
|
|
|
async def _build_standard_rules(db: AsyncSession, target: Target) -> tuple[list[_RuleInput], list[AlertStatusItem]]:
    """Assemble the built-in alert rules for a single target.

    Returns ``(rules, forced_items)``:
      * ``rules`` — threshold-based rules the caller evaluates later;
      * ``forced_items`` — fully formed alert items emitted directly for
        conditions that cannot be expressed as value-vs-threshold
        (unreachable target, no metrics collected yet).
    """
    rules: list[_RuleInput] = []
    forced_items: list[AlertStatusItem] = []
    checked_at = datetime.now(timezone.utc)

    # 1) Connectivity with RTT threshold.
    start = time.perf_counter()
    conn: asyncpg.Connection | None = None
    try:
        conn = await _connect_target(target, timeout_seconds=4)
        await conn.fetchval("SELECT 1")
        # Measured round-trip includes the handshake plus one trivial query.
        connect_ms = (time.perf_counter() - start) * 1000
        rules.append(
            _RuleInput(
                key="connectivity_rtt_ms",
                name="Connectivity Latency",
                description="Checks whether the target is reachable and how long the connection handshake takes.",
                category="availability",
                value=connect_ms,
                warning_threshold=1000,
                alert_threshold=2500,
                comparison="gte",
            )
        )
    except Exception as exc:
        # Any failure (timeout, auth, DNS, ...) becomes a hard alert item.
        forced_items.append(
            AlertStatusItem(
                alert_key=f"std-connectivity-down-{target.id}",
                source="standard",
                severity="alert",
                category="availability",
                name="Target Reachability",
                description="Verifies that the monitored database can be reached by the collector.",
                target_id=target.id,
                target_name=target.name,
                value=None,
                warning_threshold=None,
                alert_threshold=None,
                comparison="gte",
                message=f"Connection failed: {exc}",
                checked_at=checked_at,
            )
        )
    finally:
        if conn:
            await conn.close()

    # 2) Collector freshness based on latest stored metric.
    latest_ts = await db.scalar(select(func.max(Metric.ts)).where(Metric.target_id == target.id))
    if latest_ts is None:
        # No samples at all yet -> fixed warning; thresholds cannot apply.
        forced_items.append(
            AlertStatusItem(
                alert_key=f"std-metric-freshness-{target.id}",
                source="standard",
                severity="warning",
                category="availability",
                name="Collector Freshness",
                description="Ensures fresh metrics are arriving for the target.",
                target_id=target.id,
                target_name=target.name,
                value=None,
                warning_threshold=None,
                alert_threshold=None,
                comparison="gte",
                message="No metrics collected yet for this target.",
                checked_at=checked_at,
            )
        )
    else:
        # NOTE(review): assumes Metric.ts is timezone-aware; subtracting a
        # naive timestamp from the aware checked_at would raise — confirm
        # against the model definition.
        age_seconds = max(0.0, (checked_at - latest_ts).total_seconds())
        rules.append(
            _RuleInput(
                key="collector_freshness_seconds",
                name="Collector Freshness",
                description="Age of the most recent metric sample.",
                category="availability",
                value=age_seconds,
                # Thresholds scale with the polling cadence: warn after two
                # missed polls, alert after four.
                warning_threshold=float(settings.poll_interval_seconds * 2),
                alert_threshold=float(settings.poll_interval_seconds * 4),
            )
        )

    # Point-in-time samples and counter deltas feeding the rules below.
    active_connections = await _latest_metric_value(db, target.id, "connections_active")
    total_connections = await _latest_metric_value(db, target.id, "connections_total")
    cache_hit_ratio = await _latest_metric_value(db, target.id, "cache_hit_ratio")
    lock_count = await _latest_metric_value(db, target.id, "locks_total")
    checkpoints_req_delta = await _metric_delta(db, target.id, "checkpoints_req", minutes=15)
    deadlocks_delta = await _metric_delta(db, target.id, "deadlocks", minutes=60)
    slowest_query_mean = await _latest_query_snapshot_max(db, target.id, "mean_time")
    slowest_query_total = await _latest_query_snapshot_max(db, target.id, "total_time")

    # Guard against missing samples and division by zero.
    active_ratio = None
    if active_connections is not None and total_connections and total_connections > 0:
        active_ratio = active_connections / total_connections

    # Ratio comes back as None (-> "unknown", filtered out) when fewer than
    # 20 transactions occurred in the window; see _rollback_ratio_recent.
    rollback_ratio_15m, tx_total_15m = await _rollback_ratio_recent(
        db, target.id, minutes=15, min_total_transactions=20
    )

    rules.extend(
        [
            _RuleInput(
                key="active_connections_ratio",
                name="Active Connection Ratio",
                description="Share of active sessions over total sessions.",
                category="capacity",
                value=active_ratio,
                warning_threshold=0.70,
                alert_threshold=0.90,
            ),
            # Descending rule: lower cache hit ratio is worse, hence "lte"
            # with warning_threshold above alert_threshold.
            _RuleInput(
                key="cache_hit_ratio_low",
                name="Cache Hit Ratio",
                description="Low cache hit ratio means increased disk reads and slower queries.",
                category="performance",
                value=cache_hit_ratio,
                warning_threshold=0.95,
                alert_threshold=0.90,
                comparison="lte",
            ),
            _RuleInput(
                key="locks_total",
                name="Lock Pressure",
                description="Number of locks currently held on the target.",
                category="contention",
                value=lock_count,
                warning_threshold=50,
                alert_threshold=100,
            ),
            _RuleInput(
                key="checkpoints_req_15m",
                name="Checkpoint Pressure (15m)",
                description="Increase of requested checkpoints in the last 15 minutes.",
                category="io",
                value=checkpoints_req_delta,
                warning_threshold=5,
                alert_threshold=15,
            ),
            _RuleInput(
                key="rollback_ratio",
                name="Rollback Ratio",
                description="Fraction of rolled back transactions in the last 15 minutes (evaluated only when at least 20 transactions occurred).",
                category="transactions",
                value=rollback_ratio_15m,
                warning_threshold=0.10,
                alert_threshold=0.25,
            ),
            _RuleInput(
                key="deadlocks_60m",
                name="Deadlocks (60m)",
                description="Increase in deadlocks during the last hour.",
                category="contention",
                value=deadlocks_delta,
                warning_threshold=1,
                alert_threshold=5,
            ),
            _RuleInput(
                key="slowest_query_mean_ms",
                name="Slowest Query Mean Time",
                description="Highest mean execution time in the latest query snapshot.",
                category="query",
                value=slowest_query_mean,
                warning_threshold=300,
                alert_threshold=1000,
            ),
            _RuleInput(
                key="slowest_query_total_ms",
                name="Slowest Query Total Time",
                description="Highest total execution time in the latest query snapshot.",
                category="query",
                value=slowest_query_total,
                warning_threshold=3000,
                alert_threshold=10000,
            ),
        ]
    )
    # Expose transaction volume as contextual metric for UI/debugging.
    # With both thresholds None its severity resolves to "unknown", so the
    # standard evaluator never surfaces it as a warning/alert item.
    rules.append(
        _RuleInput(
            key="rollback_tx_volume_15m",
            name="Rollback Ratio Evaluation Volume",
            description="Total transactions in the last 15 minutes used for rollback-ratio evaluation.",
            category="transactions",
            value=tx_total_15m,
            warning_threshold=None,
            alert_threshold=None,
        )
    )
    return rules, forced_items
|
|
|
|
|
|
async def _evaluate_standard_alerts(db: AsyncSession, targets: list[Target]) -> list[AlertStatusItem]:
    """Evaluate the built-in rule set for every target and return breaching items.

    Forced items from rule construction (e.g. an unreachable target) are
    passed through as-is; threshold rules are kept only when they resolve
    to "warning" or "alert".
    """
    checked_at = datetime.now(timezone.utc)
    items: list[AlertStatusItem] = []

    for target in targets:
        rules, forced_items = await _build_standard_rules(db, target)
        items.extend(forced_items)
        for rule in rules:
            severity = _severity_from_thresholds(
                rule.value, rule.comparison, rule.warning_threshold, rule.alert_threshold
            )
            if severity in {"warning", "alert"}:
                message = _status_message(rule.value, rule.comparison, rule.warning_threshold, rule.alert_threshold)
                items.append(
                    AlertStatusItem(
                        alert_key=f"std-{rule.key}-{target.id}",
                        source="standard",
                        severity=severity,
                        category=rule.category,
                        name=rule.name,
                        description=rule.description,
                        target_id=target.id,
                        target_name=target.name,
                        value=rule.value,
                        warning_threshold=rule.warning_threshold,
                        alert_threshold=rule.alert_threshold,
                        comparison=rule.comparison,
                        message=message,
                        checked_at=checked_at,
                    )
                )
    return items
|
|
|
|
|
|
async def _evaluate_custom_alerts(db: AsyncSession, targets: list[Target]) -> list[AlertStatusItem]:
    """Run every enabled custom SQL alert definition and collect breaches.

    A definition with ``target_id is None`` fans out to all targets; one
    bound to a target missing from *targets* is skipped. SQL execution
    failures surface as "alert" items instead of being swallowed.
    """
    checked_at = datetime.now(timezone.utc)
    # Enabled definitions only, newest first.
    defs = (
        await db.scalars(select(AlertDefinition).where(AlertDefinition.enabled.is_(True)).order_by(desc(AlertDefinition.id)))
    ).all()
    target_by_id = {t.id: t for t in targets}
    items: list[AlertStatusItem] = []

    for definition in defs:
        # None target_id means "apply to all"; otherwise resolve the single
        # bound target (a stale id resolves to None and is filtered below).
        target_candidates = targets if definition.target_id is None else [target_by_id.get(definition.target_id)]
        for target in [t for t in target_candidates if t is not None]:
            value: float | None = None
            severity = "unknown"
            message = "No data"
            try:
                value = await run_scalar_sql_for_target(target, definition.sql_text)
                severity = _severity_from_thresholds(
                    value=value,
                    comparison=definition.comparison,
                    warning_threshold=definition.warning_threshold,
                    alert_threshold=definition.alert_threshold,
                )
                message = _status_message(value, definition.comparison, definition.warning_threshold, definition.alert_threshold)
            except Exception as exc:
                # Broken SQL or unreachable target: raise visibility by
                # reporting the failing definition itself as an alert.
                severity = "alert"
                message = f"Execution failed: {exc}"

            # Only breaches are reported; "ok"/"unknown" outcomes are dropped.
            if severity not in {"warning", "alert"}:
                continue
            items.append(
                AlertStatusItem(
                    alert_key=f"custom-{definition.id}-{target.id}",
                    source="custom",
                    severity=severity,
                    category="custom",
                    name=definition.name,
                    description=definition.description or "Custom SQL alert",
                    target_id=target.id,
                    target_name=target.name,
                    value=value,
                    warning_threshold=definition.warning_threshold,
                    alert_threshold=definition.alert_threshold,
                    comparison=definition.comparison,
                    message=message,
                    checked_at=checked_at,
                    sql_text=definition.sql_text,
                )
            )
    return items
|
|
|
|
|
|
async def get_alert_status(db: AsyncSession, use_cache: bool = True) -> AlertStatusResponse:
    """Compute the aggregated alert status, serving a short-lived cached copy.

    With *use_cache* true and a non-expired cached payload, that payload is
    returned immediately; otherwise standard and custom alerts are evaluated
    and the fresh payload is cached for _STATUS_CACHE_TTL_SECONDS.
    """
    now_seconds = time.time()
    if use_cache:
        cached = _status_cache.get("data")
        if cached and float(_status_cache.get("expires", 0.0)) > now_seconds:
            return cached

    targets = (await db.scalars(select(Target).order_by(Target.name.asc()))).all()
    standard_items = await _evaluate_standard_alerts(db, targets)
    custom_items = await _evaluate_custom_alerts(db, targets)
    combined = standard_items + custom_items

    def _sort_key(item: AlertStatusItem) -> tuple[str, str]:
        return item.target_name.lower(), item.name.lower()

    warnings = sorted((item for item in combined if item.severity == "warning"), key=_sort_key)
    alerts = sorted((item for item in combined if item.severity == "alert"), key=_sort_key)

    payload = AlertStatusResponse(
        generated_at=datetime.now(timezone.utc),
        warnings=warnings,
        alerts=alerts,
        warning_count=len(warnings),
        alert_count=len(alerts),
    )
    # Refresh the module-level cache regardless of how this call was served.
    _status_cache["data"] = payload
    _status_cache["expires"] = now_seconds + _STATUS_CACHE_TTL_SECONDS
    return payload
|