import asyncio import os import sys from typing import Any import asyncpg def _required_env(name: str) -> str: value = os.getenv(name, "").strip() if not value: raise RuntimeError(f"Missing required env var: {name}") return value async def _connect_with_retry(dsn: str, attempts: int = 40, delay_seconds: float = 1.5) -> asyncpg.Connection: last_exc: Exception | None = None for _ in range(attempts): try: return await asyncpg.connect(dsn=dsn, timeout=5) except Exception as exc: # pragma: no cover - smoke utility last_exc = exc await asyncio.sleep(delay_seconds) raise RuntimeError(f"Could not connect to PostgreSQL after retries: {last_exc}") async def _fetchrow_required(conn: asyncpg.Connection, query: str, label: str) -> dict[str, Any]: row = await conn.fetchrow(query) if row is None: raise RuntimeError(f"{label} returned no rows") return dict(row) async def _run_required_fetchrow(conn: asyncpg.Connection, label: str, query: str) -> None: await _fetchrow_required(conn, query, label) print(f"[compat] PASS required: {label}") async def _run_required_fetch(conn: asyncpg.Connection, label: str, query: str) -> None: await conn.fetch(query) print(f"[compat] PASS required: {label}") async def _run_required_fetchval(conn: asyncpg.Connection, label: str, query: str) -> None: await conn.fetchval(query) print(f"[compat] PASS required: {label}") async def _run_optional(conn: asyncpg.Connection, label: str, query: str) -> None: try: await conn.fetch(query) print(f"[compat] PASS optional: {label}") except Exception as exc: print(f"[compat] SKIP optional: {label} ({exc})") async def _run_expect_failure( conn: asyncpg.Connection, label: str, query: str, accepted_sqlstates: set[str], ) -> None: try: await conn.fetch(query) except asyncpg.PostgresError as exc: if exc.sqlstate in accepted_sqlstates: print(f"[compat] PASS expected-failure: {label} (sqlstate={exc.sqlstate})") return raise RuntimeError(f"{label} failed with unexpected sqlstate={exc.sqlstate}: {exc}") from exc except Exception as exc: raise RuntimeError(f"{label} failed with unexpected non-Postgres error: {exc}") from exc raise RuntimeError(f"{label} unexpectedly succeeded, but failure was expected") def _section(title: str) -> None: print(f"[compat] --- {title} ---") def _dsn_candidates() -> list[str]: # Preferred: explicit candidate list for CI portability (Gitea/GitHub runners). raw_candidates = os.getenv("PG_DSN_CANDIDATES", "").strip() if raw_candidates: values = [item.strip() for item in raw_candidates.split(",") if item.strip()] if values: return values # Backward compatible single DSN. raw_single = os.getenv("PG_DSN", "").strip() if raw_single: return [raw_single] raise RuntimeError("Missing PG_DSN or PG_DSN_CANDIDATES") async def run() -> None: candidates = _dsn_candidates() last_error: Exception | None = None conn: asyncpg.Connection | None = None used_dsn = "" for dsn in candidates: try: conn = await _connect_with_retry(dsn) used_dsn = dsn break except Exception as exc: last_error = exc if conn is None: raise RuntimeError(f"Could not connect to PostgreSQL using candidates: {last_error}") try: version = await conn.fetchval("SHOW server_version") current_db = await conn.fetchval("SELECT current_database()") print(f"[compat] Connected: version={version} db={current_db} dsn={used_dsn}") _section("connectivity") await _run_required_fetchval(conn, "target_connection.select_1", "SELECT 1") await _run_required_fetchval(conn, "connectivity.server_encoding", "SHOW server_encoding") await _run_required_fetchval(conn, "connectivity.timezone", "SHOW TimeZone") _section("collector") # Core collector queries used in app/services/collector.py await _run_required_fetchrow( conn, "collector.pg_stat_database", """ SELECT numbackends, xact_commit, xact_rollback, deadlocks, temp_files, temp_bytes, blk_read_time, blk_write_time, blks_hit, blks_read, tup_returned, tup_fetched FROM pg_stat_database WHERE datname = current_database() """, ) await _run_required_fetchrow( conn, "collector.pg_stat_activity", """ SELECT count(*) FILTER (WHERE state = 'active') AS active_connections, count(*) AS total_connections FROM pg_stat_activity WHERE datname = current_database() """, ) await _run_required_fetchval(conn, "collector.pg_locks_count", "SELECT count(*) FROM pg_locks") # Checkpoint stats fallback (PG14/15 vs newer changes) has_checkpointer = await conn.fetchval("SELECT to_regclass('pg_catalog.pg_stat_checkpointer') IS NOT NULL") if has_checkpointer: await _run_required_fetchrow( conn, "collector.checkpointer_view", """ SELECT num_timed AS checkpoints_timed, num_requested AS checkpoints_req, 0::bigint AS buffers_checkpoint, 0::bigint AS buffers_clean, 0::bigint AS maxwritten_clean FROM pg_stat_checkpointer """, ) print("[compat] Using pg_stat_checkpointer") else: await _run_required_fetchrow( conn, "collector.bgwriter_view", "SELECT checkpoints_timed, checkpoints_req, buffers_checkpoint, buffers_clean, maxwritten_clean FROM pg_stat_bgwriter", ) print("[compat] Using pg_stat_bgwriter") _section("target endpoints") # Target endpoint queries used in app/api/routes/targets.py await _run_required_fetch( conn, "target_endpoint.locks_table", """ SELECT locktype, mode, granted, relation::regclass::text AS relation, pid FROM pg_locks ORDER BY granted ASC, mode LIMIT 500 """, ) await _run_required_fetch( conn, "target_endpoint.activity_table", """ SELECT pid, usename, application_name, client_addr::text, state, wait_event_type, wait_event, now() - query_start AS running_for, left(query, 300) AS query FROM pg_stat_activity WHERE datname = current_database() ORDER BY query_start NULLS LAST LIMIT 200 """, ) await _run_required_fetch( conn, "target_endpoint.discover_databases", """ SELECT datname FROM pg_database WHERE datallowconn AND NOT datistemplate ORDER BY datname """, ) _section("overview") # Overview queries used in app/services/overview_collector.py await _run_required_fetchval(conn, "overview.show_server_version", "SHOW server_version") await _run_required_fetchval(conn, "overview.pg_is_in_recovery", "SELECT pg_is_in_recovery()") await _run_required_fetchval(conn, "overview.pg_postmaster_start_time", "SELECT pg_postmaster_start_time()") await _run_required_fetchval(conn, "overview.current_database", "SELECT current_database()") await _run_required_fetchval(conn, "overview.inet_server_port", "SELECT inet_server_port()") await _run_required_fetchval(conn, "overview.pg_database_size_current", "SELECT pg_database_size(current_database())") await _run_required_fetch( conn, "overview.pg_database_size_all", "SELECT datname, pg_database_size(datname) AS size_bytes FROM pg_database ORDER BY size_bytes DESC", ) await _run_required_fetch( conn, "overview.largest_tables", """ SELECT schemaname, relname, pg_total_relation_size(schemaname || '.' || relname) AS size_bytes FROM pg_stat_user_tables ORDER BY size_bytes DESC LIMIT 5 """, ) await _run_required_fetchval(conn, "overview.pg_ls_waldir", "SELECT COALESCE(sum(size), 0) FROM pg_ls_waldir()") await _run_required_fetchrow( conn, "overview.performance_pg_stat_database", """ SELECT xact_commit, xact_rollback, deadlocks, temp_files, temp_bytes, blk_read_time, blk_write_time FROM pg_stat_database WHERE datname = current_database() """, ) await _run_required_fetchval( conn, "overview.autovacuum_activity", """ SELECT count(*) FROM pg_stat_activity WHERE query ILIKE 'autovacuum:%' """, ) await _run_required_fetchval( conn, "overview.checkpointer_view_exists", "SELECT to_regclass('pg_catalog.pg_stat_checkpointer') IS NOT NULL", ) if has_checkpointer: await _run_required_fetchrow( conn, "overview.pg_stat_checkpointer", "SELECT num_timed AS checkpoints_timed, num_requested AS checkpoints_req FROM pg_stat_checkpointer", ) else: await _run_required_fetchrow( conn, "overview.pg_stat_bgwriter", "SELECT checkpoints_timed, checkpoints_req FROM pg_stat_bgwriter", ) _section("optional paths") # Optional paths that may depend on role/extension config. await _run_optional(conn, "overview.replication_slots", "SELECT count(*) FROM pg_replication_slots") await _run_optional( conn, "overview.pg_stat_replication", """ SELECT application_name, client_addr::text, state, sync_state, EXTRACT(EPOCH FROM write_lag) AS write_lag_seconds, EXTRACT(EPOCH FROM flush_lag) AS flush_lag_seconds, EXTRACT(EPOCH FROM replay_lag) AS replay_lag_seconds, pg_wal_lsn_diff(pg_current_wal_lsn(), replay_lsn) AS replay_lag_bytes FROM pg_stat_replication """, ) await _run_optional( conn, "overview.standby_replay_lag", "SELECT EXTRACT(EPOCH FROM (now() - pg_last_xact_replay_timestamp()))", ) await _run_optional( conn, "collector.pg_stat_statements", """ SELECT queryid::text, calls, total_exec_time, mean_exec_time, rows, left(query, 2000) AS query_text FROM pg_stat_statements ORDER BY total_exec_time DESC LIMIT 20 """, ) _section("pg_stat_statements modes") # Validate both runtime modes NexaPG must support: # 1) extension unavailable/not preloaded -> query fails with known sqlstate # 2) extension available + loaded -> query succeeds await conn.execute("DROP EXTENSION IF EXISTS pg_stat_statements") await _run_expect_failure( conn, "pg_stat_statements.absent_or_not_loaded", """ SELECT queryid::text, calls, total_exec_time, mean_exec_time, rows, left(query, 2000) AS query_text FROM pg_stat_statements ORDER BY total_exec_time DESC LIMIT 20 """, accepted_sqlstates={"42P01", "55000"}, ) available = await conn.fetchval( """ SELECT EXISTS( SELECT 1 FROM pg_available_extensions WHERE name = 'pg_stat_statements' ) """ ) if available: try: await conn.execute("CREATE EXTENSION IF NOT EXISTS pg_stat_statements") except Exception as exc: print(f"[compat] SKIP optional: pg_stat_statements.create_extension ({exc})") else: try: await conn.fetch( """ SELECT queryid::text, calls, total_exec_time, mean_exec_time, rows, left(query, 2000) AS query_text FROM pg_stat_statements ORDER BY total_exec_time DESC LIMIT 20 """ ) print("[compat] PASS optional: pg_stat_statements.enabled_query") except asyncpg.PostgresError as exc: # Typical when shared_preload_libraries does not include pg_stat_statements. if exc.sqlstate == "55000": print(f"[compat] SKIP optional: pg_stat_statements.enabled_query ({exc})") else: raise RuntimeError(f"pg_stat_statements.enabled_query unexpected sqlstate={exc.sqlstate}: {exc}") from exc else: print("[compat] SKIP optional: pg_stat_statements.extension_unavailable") print("[compat] Smoke checks passed") finally: await conn.close() if __name__ == "__main__": try: asyncio.run(run()) except Exception as exc: # pragma: no cover - smoke utility print(f"[compat] FAILED: {exc}", file=sys.stderr) raise