NexaPG/backend/app/services/overview_collector.py
nessi 528a720329 Add support for new checkpointer statistics for PostgreSQL.
Introduced logic that checks for the existence of `pg_stat_checkpointer` and fetches its statistics when the view is available. This keeps the collector compatible with PostgreSQL 17 and later, where the checkpoint counters moved into the new view, while retaining backward compatibility with older servers via `pg_stat_bgwriter`.
2026-02-12 14:15:14 +01:00
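
Stripped of the collector plumbing, the probe-and-fallback described above amounts to the sketch below (hypothetical helper name, assuming an open asyncpg connection):

import asyncpg

async def fetch_checkpoint_counters(conn: asyncpg.Connection):
    # to_regclass() returns NULL when the relation does not exist, so the
    # probe is harmless on any supported server version.
    if await conn.fetchval(
        "SELECT to_regclass('pg_catalog.pg_stat_checkpointer') IS NOT NULL"
    ):
        # PostgreSQL 17+: counters live in the dedicated checkpointer view.
        return await conn.fetchrow(
            "SELECT num_timed AS checkpoints_timed,"
            " num_requested AS checkpoints_req"
            " FROM pg_stat_checkpointer"
        )
    # PostgreSQL 16 and older: counters are still part of pg_stat_bgwriter.
    return await conn.fetchrow(
        "SELECT checkpoints_timed, checkpoints_req FROM pg_stat_bgwriter"
    )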

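"""Collect a point-in-time overview of a PostgreSQL instance for NexaPG.

All probes go through the _safe_* wrappers below, so partial failures are
reported alongside the collected data instead of failing the whole request.
"""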

from datetime import datetime, timezone

import asyncpg

from app.schemas.overview import (
DatabaseOverviewOut,
DbSizeItem,
DiskSpaceInfo,
InstanceOverview,
PerformanceOverview,
ReplicationClientItem,
ReplicationOverview,
StorageOverview,
TableSizeItem,
)
from app.services.infra_probe import DiskSpaceProvider


def _interval_seconds(value) -> float | None:
    # Best-effort conversion of an interval/timedelta value to float seconds.
if value is None:
return None
try:
return float(value.total_seconds())
except Exception:
return None
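

# The _safe_* helpers record query failures in the caller's `errors` list and
# return a harmless default instead of raising, so a single failing probe only
# degrades the overview rather than aborting the whole collection.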
async def _safe_fetchval(conn: asyncpg.Connection, query: str, errors: list[str], label: str):
try:
return await conn.fetchval(query)
except Exception as exc:
errors.append(f"{label}: {exc}")
return None


async def _safe_fetchrow(conn: asyncpg.Connection, query: str, errors: list[str], label: str):
try:
return await conn.fetchrow(query)
except Exception as exc:
errors.append(f"{label}: {exc}")
return None


async def _safe_fetch(conn: asyncpg.Connection, query: str, errors: list[str], label: str):
try:
return await conn.fetch(query)
except Exception as exc:
errors.append(f"{label}: {exc}")
return []


async def collect_overview(
conn: asyncpg.Connection, disk_provider: DiskSpaceProvider, target_host: str, cache_ttl_seconds: int
) -> DatabaseOverviewOut:
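    """Collect instance, storage, replication, and performance metrics in one pass.

    Individual query failures are appended to ``partial_failures`` instead of
    raising, so callers always receive a (possibly partial) overview.
    """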
errors: list[str] = []
version = await _safe_fetchval(conn, "SHOW server_version", errors, "show_server_version")
in_recovery = await _safe_fetchval(conn, "SELECT pg_is_in_recovery()", errors, "pg_is_in_recovery")
postmaster_start = await _safe_fetchval(conn, "SELECT pg_postmaster_start_time()", errors, "postmaster_start")
current_db = await _safe_fetchval(conn, "SELECT current_database()", errors, "current_database")
port = await _safe_fetchval(conn, "SELECT inet_server_port()", errors, "inet_server_port")
uptime_seconds = None
if postmaster_start:
uptime_seconds = (datetime.now(timezone.utc) - postmaster_start).total_seconds()
db_size = await _safe_fetchval(
conn, "SELECT pg_database_size(current_database())", errors, "pg_database_size_current"
)
all_db_rows = await _safe_fetch(
conn,
"""
SELECT datname, pg_database_size(datname) AS size_bytes
FROM pg_database
ORDER BY size_bytes DESC
""",
errors,
"pg_database_size_all",
)
largest_tables_rows = await _safe_fetch(
conn,
"""
        SELECT schemaname, relname, pg_total_relation_size(relid) AS size_bytes
FROM pg_stat_user_tables
ORDER BY size_bytes DESC
LIMIT 5
""",
errors,
"largest_tables",
)
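
    # pg_ls_waldir() typically requires superuser or pg_monitor membership; if
    # the call is rejected, the error is recorded and WAL size stays unknown.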
wal_dir_size = await _safe_fetchval(
conn,
"""
SELECT COALESCE(sum(size), 0)
FROM pg_ls_waldir()
""",
errors,
"wal_directory_size",
)
stats_db = await _safe_fetchrow(
conn,
"""
SELECT xact_commit, xact_rollback, deadlocks, temp_files, temp_bytes, blk_read_time, blk_write_time
FROM pg_stat_database
WHERE datname = current_database()
""",
errors,
"pg_stat_database_perf",
)
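
    # PostgreSQL 17 moved the checkpoint counters out of pg_stat_bgwriter into
    # the new pg_stat_checkpointer view (checkpoints_timed -> num_timed,
    # checkpoints_req -> num_requested), so probe for the view and fall back
    # on older servers.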
checkpointer_view_exists = await _safe_fetchval(
conn,
"SELECT to_regclass('pg_catalog.pg_stat_checkpointer') IS NOT NULL",
errors,
"checkpointer_view_exists",
)
if checkpointer_view_exists:
bgwriter = await _safe_fetchrow(
conn,
"""
SELECT
num_timed AS checkpoints_timed,
num_requested AS checkpoints_req
FROM pg_stat_checkpointer
""",
errors,
"pg_stat_checkpointer",
)
else:
bgwriter = await _safe_fetchrow(
conn,
"SELECT checkpoints_timed, checkpoints_req FROM pg_stat_bgwriter",
errors,
"pg_stat_bgwriter",
)
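
    # Counts autovacuum workers by their reported query text ("autovacuum: ...");
    # on PostgreSQL 10+ filtering on backend_type = 'autovacuum worker' would be
    # an equivalent alternative.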
autovac_workers = await _safe_fetchval(
conn,
"""
SELECT count(*)
FROM pg_stat_activity
WHERE query ILIKE 'autovacuum:%'
""",
errors,
"autovacuum_activity",
)
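
    # The replication picture depends on the role: a standby reports how far
    # behind it is replaying WAL, a primary reports its slots and the standbys
    # connected to it.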
clients: list[ReplicationClientItem] = []
replay_lag_seconds = None
slots_count = None
active_clients = None
role = "standby" if in_recovery else "primary"
if in_recovery:
replay_lag_seconds = await _safe_fetchval(
conn,
"""
SELECT EXTRACT(EPOCH FROM (now() - pg_last_xact_replay_timestamp()))
""",
errors,
"standby_replay_lag",
)
else:
slots_count = await _safe_fetchval(conn, "SELECT count(*) FROM pg_replication_slots", errors, "replication_slots")
repl_rows = await _safe_fetch(
conn,
"""
SELECT application_name, client_addr::text, state, sync_state,
EXTRACT(EPOCH FROM write_lag) AS write_lag_seconds,
EXTRACT(EPOCH FROM flush_lag) AS flush_lag_seconds,
EXTRACT(EPOCH FROM replay_lag) AS replay_lag_seconds,
pg_wal_lsn_diff(pg_current_wal_lsn(), replay_lsn) AS replay_lag_bytes
FROM pg_stat_replication
""",
errors,
"pg_stat_replication",
)
active_clients = len(repl_rows)
clients = [
ReplicationClientItem(
application_name=row.get("application_name"),
client_addr=row.get("client_addr"),
state=row.get("state"),
sync_state=row.get("sync_state"),
write_lag_seconds=row.get("write_lag_seconds"),
flush_lag_seconds=row.get("flush_lag_seconds"),
replay_lag_seconds=row.get("replay_lag_seconds"),
replay_lag_bytes=float(row.get("replay_lag_bytes")) if row.get("replay_lag_bytes") is not None else None,
)
for row in repl_rows
]
disk = await disk_provider.get_free_bytes(target_host)
return DatabaseOverviewOut(
collected_at=datetime.now(timezone.utc),
cache_ttl_seconds=cache_ttl_seconds,
instance=InstanceOverview(
server_version=version,
role=role,
postmaster_start_time=postmaster_start,
uptime_seconds=uptime_seconds,
current_database=current_db,
port=int(port) if port is not None else None,
),
storage=StorageOverview(
current_database_size_bytes=int(db_size) if db_size is not None else None,
all_databases=[DbSizeItem(name=r["datname"], size_bytes=int(r["size_bytes"])) for r in all_db_rows],
largest_tables=[
TableSizeItem(schema=r["schemaname"], table=r["relname"], size_bytes=int(r["size_bytes"]))
for r in largest_tables_rows
],
wal_directory_size_bytes=int(wal_dir_size) if wal_dir_size is not None else None,
disk_space=DiskSpaceInfo(
source=disk.source,
status=disk.status,
free_bytes=disk.free_bytes,
message=disk.message,
),
),
replication=ReplicationOverview(
mode=role,
replay_lag_seconds=float(replay_lag_seconds) if replay_lag_seconds is not None else None,
replication_slots_count=int(slots_count) if slots_count is not None else None,
active_replication_clients=active_clients,
clients=clients,
),
performance=PerformanceOverview(
xact_commit=stats_db["xact_commit"] if stats_db else None,
xact_rollback=stats_db["xact_rollback"] if stats_db else None,
deadlocks=stats_db["deadlocks"] if stats_db else None,
temp_files=stats_db["temp_files"] if stats_db else None,
temp_bytes=stats_db["temp_bytes"] if stats_db else None,
blk_read_time=stats_db["blk_read_time"] if stats_db else None,
blk_write_time=stats_db["blk_write_time"] if stats_db else None,
checkpoints_timed=bgwriter["checkpoints_timed"] if bgwriter else None,
checkpoints_req=bgwriter["checkpoints_req"] if bgwriter else None,
autovacuum_workers=int(autovac_workers) if autovac_workers is not None else None,
),
partial_failures=errors,
)