"""PostgreSQL compatibility smoke check.

Connects to the server named by the ``PG_DSN`` environment variable and runs a
fixed set of statistics/overview queries, printing ``[compat]`` progress lines.
Any failing mandatory query propagates as an exception.
"""

import asyncio
import os
import sys
from typing import Any

import asyncpg


def _required_env(name: str) -> str:
    """Return the stripped value of environment variable *name*.

    Raises:
        RuntimeError: if the variable is unset or blank after stripping.
    """
    value = os.getenv(name, "").strip()
    if not value:
        raise RuntimeError(f"Missing required env var: {name}")
    return value


async def _connect_with_retry(
    dsn: str, attempts: int = 40, delay_seconds: float = 1.5
) -> asyncpg.Connection:
    """Open a connection to *dsn*, retrying on any failure.

    Args:
        dsn: Connection string passed to ``asyncpg.connect``.
        attempts: Maximum number of connection attempts.
        delay_seconds: Pause between consecutive attempts.

    Returns:
        The first successfully opened connection.

    Raises:
        RuntimeError: if every attempt fails; the last underlying error is
            included in the message and chained as ``__cause__``.
    """
    last_exc: Exception | None = None
    for attempt in range(1, attempts + 1):
        try:
            return await asyncpg.connect(dsn=dsn, timeout=5)
        except Exception as exc:  # pragma: no cover - smoke utility
            last_exc = exc
            # Only wait when another attempt follows; sleeping after the final
            # failure would just delay the error report.
            if attempt < attempts:
                await asyncio.sleep(delay_seconds)
    raise RuntimeError(
        f"Could not connect to PostgreSQL after retries: {last_exc}"
    ) from last_exc


async def _fetchrow_required(
    conn: asyncpg.Connection, query: str, label: str
) -> dict[str, Any]:
    """Run *query* and return its first row as a dict.

    Args:
        conn: Open connection to run the query on.
        query: SQL text, executed without parameters.
        label: Name used in the error message when no row comes back.

    Raises:
        RuntimeError: if the query returns no rows.
    """
    row = await conn.fetchrow(query)
    if row is None:
        raise RuntimeError(f"{label} returned no rows")
    return dict(row)


async def run() -> None:
    """Execute all smoke checks against the database given by ``PG_DSN``.

    Every query except the ``pg_stat_statements`` one is mandatory: an error
    from it propagates to the caller. The connection is always closed.

    Raises:
        RuntimeError: if ``PG_DSN`` is missing, the connection cannot be
            established, or a required single-row query returns nothing.
    """
    dsn = _required_env("PG_DSN")
    conn = await _connect_with_retry(dsn)
    try:
        version = await conn.fetchval("SHOW server_version")
        current_db = await conn.fetchval("SELECT current_database()")
        print(f"[compat] Connected: version={version} db={current_db}")

        # Core collector queries
        await _fetchrow_required(
            conn,
            """
            SELECT
                numbackends,
                xact_commit,
                xact_rollback,
                deadlocks,
                temp_files,
                temp_bytes,
                blk_read_time,
                blk_write_time,
                blks_hit,
                blks_read,
                tup_returned,
                tup_fetched
            FROM pg_stat_database
            WHERE datname = current_database()
            """,
            "pg_stat_database",
        )
        await _fetchrow_required(
            conn,
            """
            SELECT
                count(*) FILTER (WHERE state = 'active') AS active_connections,
                count(*) AS total_connections
            FROM pg_stat_activity
            WHERE datname = current_database()
            """,
            "pg_stat_activity",
        )
        await conn.fetchval("SELECT count(*) FROM pg_locks")

        # Checkpoint stats fallback (PG14/15 vs newer changes): use the
        # pg_stat_checkpointer view when the server has it, otherwise read the
        # same counters from pg_stat_bgwriter.
        has_checkpointer = await conn.fetchval(
            "SELECT to_regclass('pg_catalog.pg_stat_checkpointer') IS NOT NULL"
        )
        if has_checkpointer:
            await _fetchrow_required(
                conn,
                "SELECT num_timed AS checkpoints_timed, num_requested AS checkpoints_req FROM pg_stat_checkpointer",
                "pg_stat_checkpointer",
            )
            print("[compat] Using pg_stat_checkpointer")
        else:
            await _fetchrow_required(
                conn,
                "SELECT checkpoints_timed, checkpoints_req FROM pg_stat_bgwriter",
                "pg_stat_bgwriter",
            )
            print("[compat] Using pg_stat_bgwriter")

        # Overview queries
        await conn.fetchval("SELECT pg_is_in_recovery()")
        await conn.fetchval("SELECT pg_postmaster_start_time()")
        await conn.fetchval("SELECT inet_server_port()")
        await conn.fetchval("SELECT pg_database_size(current_database())")
        await conn.fetch(
            "SELECT datname, pg_database_size(datname) AS size_bytes FROM pg_database ORDER BY size_bytes DESC"
        )
        # Size the table via its OID (relid) rather than a concatenated
        # "schema.table" string: the string form is parsed as an unquoted
        # identifier and fails for mixed-case or special-character names.
        await conn.fetch(
            """
            SELECT
                schemaname,
                relname,
                pg_total_relation_size(relid) AS size_bytes
            FROM pg_stat_user_tables
            ORDER BY size_bytes DESC
            LIMIT 5
            """
        )
        await conn.fetchval("SELECT COALESCE(sum(size), 0) FROM pg_ls_waldir()")

        # Optional extension check should not fail if unavailable.
        try:
            await conn.fetch(
                """
                SELECT
                    queryid::text,
                    calls,
                    total_exec_time,
                    mean_exec_time,
                    rows,
                    left(query, 2000) AS query_text
                FROM pg_stat_statements
                ORDER BY total_exec_time DESC
                LIMIT 5
                """
            )
            print("[compat] pg_stat_statements query succeeded")
        except Exception as exc:
            print(f"[compat] pg_stat_statements unavailable/disabled (expected in some setups): {exc}")

        print("[compat] Smoke checks passed")
    finally:
        await conn.close()


if __name__ == "__main__":
    try:
        asyncio.run(run())
    except Exception as exc:  # pragma: no cover - smoke utility
        # Emit a one-line summary on stderr, then re-raise so the traceback
        # is shown and the process exits non-zero.
        print(f"[compat] FAILED: {exc}", file=sys.stderr)
        raise