NexaPG/backend/app/services/collector.py
nessi 712bec3fea Add support for pg_stat_statements configuration in Targets
This commit introduces a `use_pg_stat_statements` flag for targets, allowing users to enable or disable the use of `pg_stat_statements` for query insights. It includes database schema changes, backend logic, and UI updates to manage this setting in both creation and editing workflows.
2026-02-12 13:39:57 +01:00
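
The collector keys off a boolean `use_pg_stat_statements` attribute on each `Target`. As a rough sketch of the schema change the commit describes (the column name comes from the code below; the model layout, base class, and server default are assumptions, not the repository's actual model):

# Hypothetical excerpt of app/models/models.py; only the new column is shown.
from sqlalchemy import Boolean
from sqlalchemy.orm import Mapped, mapped_column

class Target(Base):
    __tablename__ = "targets"
    # ... existing columns (host, port, dbname, username, ...) ...
    # Per-target toggle read by collect_target() in collector.py.
    use_pg_stat_statements: Mapped[bool] = mapped_column(
        Boolean, nullable=False, server_default="false"
    )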


import asyncio
import logging
from datetime import datetime, timezone
from urllib.parse import quote

import asyncpg
from sqlalchemy import select
from sqlalchemy.ext.asyncio import AsyncSession

from app.core.config import get_settings
from app.core.db import SessionLocal
from app.models.models import Metric, QueryStat, Target
from app.services.crypto import decrypt_secret

logger = logging.getLogger(__name__)
settings = get_settings()


def build_target_dsn(target: Target) -> str:
    """Render a libpq-style DSN for a monitored target, decrypting its password."""
    # URL-escape the decrypted password so characters such as '@' or '/'
    # cannot break the DSN.
    password = quote(decrypt_secret(target.encrypted_password), safe="")
    return (
        f"postgresql://{target.username}:{password}"
        f"@{target.host}:{target.port}/{target.dbname}?sslmode={target.sslmode}"
    )
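# Illustration with hypothetical values (user "monitor", host "db1.internal",
# database "appdb"): the function above yields a DSN of the form
#   postgresql://monitor:s3cr3t@db1.internal:5432/appdb?sslmode=require
# asyncpg accepts libpq-style DSNs, including the sslmode query parameter.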


async def _store_metric(
    db: AsyncSession,
    target_id: int,
    name: str,
    value: float,
    labels: dict | None = None,
) -> None:
    # Stage the row on the session; the caller commits once per collection cycle.
    db.add(
        Metric(
            target_id=target_id,
            ts=datetime.now(timezone.utc),
            metric_name=name,
            value=float(value),
            labels=labels or {},
        )
    )


async def collect_target(target: Target) -> None:
    dsn = build_target_dsn(target)
    conn = await asyncpg.connect(dsn=dsn)
    try:
        stat_db = await conn.fetchrow(
            """
            SELECT
                numbackends,
                xact_commit,
                xact_rollback,
                deadlocks,
                temp_files,
                temp_bytes,
                blk_read_time,
                blk_write_time,
                blks_hit,
                blks_read,
                tup_returned,
                tup_fetched
            FROM pg_stat_database
            WHERE datname = current_database()
            """
        )
        activity = await conn.fetchrow(
            """
            SELECT
                count(*) FILTER (WHERE state = 'active') AS active_connections,
                count(*) AS total_connections
            FROM pg_stat_activity
            WHERE datname = current_database()
            """
        )
        # These columns exist in pg_stat_bgwriter up to PostgreSQL 16; in
        # PostgreSQL 17 the checkpoint counters moved to pg_stat_checkpointer.
        bgwriter = await conn.fetchrow(
            """
            SELECT checkpoints_timed, checkpoints_req, buffers_checkpoint,
                   buffers_clean, maxwritten_clean
            FROM pg_stat_bgwriter
            """
        )
        # fetchrow() returns None when no row matches; fall back to zeros so
        # the metric writes below never raise KeyError.
        if stat_db is None:
            stat_db = {
                "numbackends": 0,
                "xact_commit": 0,
                "xact_rollback": 0,
                "deadlocks": 0,
                "temp_files": 0,
                "temp_bytes": 0,
                "blk_read_time": 0,
                "blk_write_time": 0,
                "blks_hit": 0,
                "blks_read": 0,
                "tup_returned": 0,
                "tup_fetched": 0,
            }
        if activity is None:
            activity = {"active_connections": 0, "total_connections": 0}
        if bgwriter is None:
            bgwriter = {
                "checkpoints_timed": 0,
                "checkpoints_req": 0,
                "buffers_checkpoint": 0,
                "buffers_clean": 0,
                "maxwritten_clean": 0,
            }
        lock_count = await conn.fetchval("SELECT count(*) FROM pg_locks")

        # Cache hit ratio: share of block requests served from shared buffers.
        # stat_db is guaranteed non-None here by the fallback above.
        cache_hit_ratio = 0.0
        if (stat_db["blks_hit"] + stat_db["blks_read"]) > 0:
            cache_hit_ratio = stat_db["blks_hit"] / (stat_db["blks_hit"] + stat_db["blks_read"])
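        # Worked example with made-up numbers: blks_hit=9_900 and blks_read=100
        # give 9_900 / 10_000 = 0.99, i.e. 99% of reads avoided disk.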
        query_rows = []
        if target.use_pg_stat_statements:
            try:
                query_rows = await conn.fetch(
                    """
                    SELECT queryid::text, calls, total_exec_time, mean_exec_time,
                           rows, left(query, 2000) AS query_text
                    FROM pg_stat_statements
                    ORDER BY total_exec_time DESC
                    LIMIT 20
                    """
                )
            except asyncpg.PostgresError:
                # The extension may not be installed on the monitored instance;
                # skip query insights rather than fail the whole collection.
                query_rows = []
        async with SessionLocal() as db:
            await _store_metric(db, target.id, "connections_total", activity["total_connections"], {})
            await _store_metric(db, target.id, "connections_active", activity["active_connections"], {})
            await _store_metric(db, target.id, "xacts_total", stat_db["xact_commit"] + stat_db["xact_rollback"], {})
            await _store_metric(db, target.id, "xact_commit", stat_db["xact_commit"], {})
            await _store_metric(db, target.id, "xact_rollback", stat_db["xact_rollback"], {})
            await _store_metric(db, target.id, "deadlocks", stat_db["deadlocks"], {})
            await _store_metric(db, target.id, "temp_files", stat_db["temp_files"], {})
            await _store_metric(db, target.id, "temp_bytes", stat_db["temp_bytes"], {})
            await _store_metric(db, target.id, "blk_read_time", stat_db["blk_read_time"], {})
            await _store_metric(db, target.id, "blk_write_time", stat_db["blk_write_time"], {})
            await _store_metric(db, target.id, "cache_hit_ratio", cache_hit_ratio, {})
            await _store_metric(db, target.id, "locks_total", lock_count, {})
            await _store_metric(db, target.id, "checkpoints_timed", bgwriter["checkpoints_timed"], {})
            await _store_metric(db, target.id, "checkpoints_req", bgwriter["checkpoints_req"], {})
            for row in query_rows:
                db.add(
                    QueryStat(
                        target_id=target.id,
                        ts=datetime.now(timezone.utc),
                        queryid=row["queryid"] or "0",
                        calls=row["calls"] or 0,
                        total_time=row["total_exec_time"] or 0.0,
                        mean_time=row["mean_exec_time"] or 0.0,
                        rows=row["rows"] or 0,
                        query_text=row["query_text"],
                    )
                )
            await db.commit()
    finally:
        await conn.close()


async def collect_once() -> None:
    async with SessionLocal() as db:
        targets = (await db.scalars(select(Target))).all()
    # Collect outside the session so a slow target does not hold it open.
    for target in targets:
        try:
            await collect_target(target)
        except Exception as exc:
            # Broad by design: one unreachable target must not stop the others.
            logger.exception("collector_error target=%s err=%s", target.id, exc)


async def collector_loop(stop_event: asyncio.Event) -> None:
    while not stop_event.is_set():
        await collect_once()
        try:
            # Sleep until the next poll interval, waking early on shutdown.
            await asyncio.wait_for(stop_event.wait(), timeout=settings.poll_interval_seconds)
        except asyncio.TimeoutError:
            pass
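

# Usage sketch (not part of this module): one plausible way to run the loop for
# the lifetime of the service. The FastAPI lifespan wiring below is an
# assumption about the surrounding app, not code confirmed by this repository.
#
#     import asyncio
#     from contextlib import asynccontextmanager
#
#     from fastapi import FastAPI
#
#     from app.services.collector import collector_loop
#
#     @asynccontextmanager
#     async def lifespan(app: FastAPI):
#         stop_event = asyncio.Event()
#         task = asyncio.create_task(collector_loop(stop_event))
#         yield
#         stop_event.set()   # wake the loop immediately
#         await task         # let the in-flight cycle finish
#
#     app = FastAPI(lifespan=lifespan)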