From 35a76aaca652245bb49f012dcb784404bcd1d227 Mon Sep 17 00:00:00 2001
From: nessi
Date: Thu, 12 Feb 2026 14:43:06 +0100
Subject: [PATCH] Refactor PostgreSQL compatibility smoke test runner

Wrapped `_fetchrow_required` and the bare fetch/fetchval calls in small
labelled helpers (`_run_required_fetchrow`, `_run_required_fetch`,
`_run_required_fetchval`, `_run_optional`) so that every check prints a
uniform PASS or SKIP line, and grouped the run into named sections:
connectivity, collector, target endpoints, overview, and optional paths.
The labels mirror the call sites in app/services/collector.py,
app/api/routes/targets.py, and app/services/overview_collector.py.
Required checks still fail the run on error; optional checks
(replication views, pg_stat_statements) are skipped with the error
printed.
---
 backend/scripts/pg_compat_smoke.py | 192 ++++++++++++++++++++++++-----
 1 file changed, 158 insertions(+), 34 deletions(-)

diff --git a/backend/scripts/pg_compat_smoke.py b/backend/scripts/pg_compat_smoke.py
index 8d953d3..50df087 100644
--- a/backend/scripts/pg_compat_smoke.py
+++ b/backend/scripts/pg_compat_smoke.py
@@ -31,6 +31,33 @@ async def _fetchrow_required(conn: asyncpg.Connection, query: str, label: str) -
     return dict(row)
 
 
+async def _run_required_fetchrow(conn: asyncpg.Connection, label: str, query: str) -> None:
+    await _fetchrow_required(conn, query, label)
+    print(f"[compat] PASS required: {label}")
+
+
+async def _run_required_fetch(conn: asyncpg.Connection, label: str, query: str) -> None:
+    await conn.fetch(query)
+    print(f"[compat] PASS required: {label}")
+
+
+async def _run_required_fetchval(conn: asyncpg.Connection, label: str, query: str) -> None:
+    await conn.fetchval(query)
+    print(f"[compat] PASS required: {label}")
+
+
+async def _run_optional(conn: asyncpg.Connection, label: str, query: str) -> None:
+    try:
+        await conn.fetch(query)
+        print(f"[compat] PASS optional: {label}")
+    except Exception as exc:
+        print(f"[compat] SKIP optional: {label} ({exc})")
+
+
+def _section(title: str) -> None:
+    print(f"[compat] --- {title} ---")
+
+
 def _dsn_candidates() -> list[str]:
     # Preferred: explicit candidate list for CI portability (Gitea/GitHub runners).
     raw_candidates = os.getenv("PG_DSN_CANDIDATES", "").strip()
@@ -65,9 +92,14 @@ async def run() -> None:
         current_db = await conn.fetchval("SELECT current_database()")
         print(f"[compat] Connected: version={version} db={current_db} dsn={used_dsn}")
 
-        # Core collector queries
-        await _fetchrow_required(
+        _section("connectivity")
+        await _run_required_fetchval(conn, "target_connection.select_1", "SELECT 1")
+
+        _section("collector")
+        # Core collector queries used in app/services/collector.py
+        await _run_required_fetchrow(
             conn,
+            "collector.pg_stat_database",
             """
             SELECT
                 numbackends,
@@ -85,11 +117,11 @@ async def run() -> None:
             FROM pg_stat_database
             WHERE datname = current_database()
             """,
-            "pg_stat_database",
         )
 
-        await _fetchrow_required(
+        await _run_required_fetchrow(
             conn,
+            "collector.pg_stat_activity",
             """
             SELECT
                 count(*) FILTER (WHERE state = 'active') AS active_connections,
@@ -97,57 +129,149 @@ async def run() -> None:
             FROM pg_stat_activity
             WHERE datname = current_database()
             """,
-            "pg_stat_activity",
         )
 
-        await conn.fetchval("SELECT count(*) FROM pg_locks")
+        await _run_required_fetchval(conn, "collector.pg_locks_count", "SELECT count(*) FROM pg_locks")
 
         # Checkpoint stats fallback (PG14/15 vs newer changes)
         has_checkpointer = await conn.fetchval("SELECT to_regclass('pg_catalog.pg_stat_checkpointer') IS NOT NULL")
         if has_checkpointer:
-            await _fetchrow_required(
+            await _run_required_fetchrow(
                 conn,
-                "SELECT num_timed AS checkpoints_timed, num_requested AS checkpoints_req FROM pg_stat_checkpointer",
-                "pg_stat_checkpointer",
+                "collector.checkpointer_view",
+                """
+                SELECT
+                    num_timed AS checkpoints_timed,
+                    num_requested AS checkpoints_req,
+                    0::bigint AS buffers_checkpoint,
+                    0::bigint AS buffers_clean,
+                    0::bigint AS maxwritten_clean
+                FROM pg_stat_checkpointer
+                """,
             )
             print("[compat] Using pg_stat_checkpointer")
         else:
-            await _fetchrow_required(
+            await _run_required_fetchrow(
                 conn,
-                "SELECT checkpoints_timed, checkpoints_req FROM pg_stat_bgwriter",
-                "pg_stat_bgwriter",
+                "collector.bgwriter_view",
+                "SELECT checkpoints_timed, checkpoints_req, buffers_checkpoint, buffers_clean, maxwritten_clean FROM pg_stat_bgwriter",
             )
             print("[compat] Using pg_stat_bgwriter")
 
-        # Overview queries
-        await conn.fetchval("SELECT pg_is_in_recovery()")
-        await conn.fetchval("SELECT pg_postmaster_start_time()")
-        await conn.fetchval("SELECT inet_server_port()")
-        await conn.fetchval("SELECT pg_database_size(current_database())")
-        await conn.fetch("SELECT datname, pg_database_size(datname) AS size_bytes FROM pg_database ORDER BY size_bytes DESC")
-        await conn.fetch(
+        _section("target endpoints")
+        # Target endpoint queries used in app/api/routes/targets.py
+        await _run_required_fetch(
+            conn,
+            "target_endpoint.locks_table",
+            """
+            SELECT locktype, mode, granted, relation::regclass::text AS relation, pid
+            FROM pg_locks
+            ORDER BY granted ASC, mode
+            LIMIT 500
+            """,
+        )
+        await _run_required_fetch(
+            conn,
+            "target_endpoint.activity_table",
+            """
+            SELECT pid, usename, application_name, client_addr::text, state, wait_event_type, wait_event, now() - query_start AS running_for, left(query, 300) AS query
+            FROM pg_stat_activity
+            WHERE datname = current_database()
+            ORDER BY query_start NULLS LAST
+            LIMIT 200
+            """,
+        )
+
+        _section("overview")
+        # Overview queries used in app/services/overview_collector.py
+        await _run_required_fetchval(conn, "overview.show_server_version", "SHOW server_version")
+        await _run_required_fetchval(conn, "overview.pg_is_in_recovery", "SELECT pg_is_in_recovery()")
+        await _run_required_fetchval(conn, "overview.pg_postmaster_start_time", "SELECT pg_postmaster_start_time()")
+        await _run_required_fetchval(conn, "overview.current_database", "SELECT current_database()")
+        await _run_required_fetchval(conn, "overview.inet_server_port", "SELECT inet_server_port()")
+        await _run_required_fetchval(conn, "overview.pg_database_size_current", "SELECT pg_database_size(current_database())")
+        await _run_required_fetch(
+            conn,
+            "overview.pg_database_size_all",
+            "SELECT datname, pg_database_size(datname) AS size_bytes FROM pg_database ORDER BY size_bytes DESC",
+        )
+        await _run_required_fetch(
+            conn,
+            "overview.largest_tables",
             """
             SELECT schemaname, relname, pg_total_relation_size(schemaname || '.' || relname) AS size_bytes
             FROM pg_stat_user_tables
             ORDER BY size_bytes DESC
             LIMIT 5
-            """
+            """,
         )
-        await conn.fetchval("SELECT COALESCE(sum(size), 0) FROM pg_ls_waldir()")
-
-        # Optional extension check should not fail if unavailable.
-        try:
-            await conn.fetch(
-                """
-                SELECT queryid::text, calls, total_exec_time, mean_exec_time, rows, left(query, 2000) AS query_text
-                FROM pg_stat_statements
-                ORDER BY total_exec_time DESC
-                LIMIT 5
-                """
+        await _run_required_fetchval(conn, "overview.pg_ls_waldir", "SELECT COALESCE(sum(size), 0) FROM pg_ls_waldir()")
+        await _run_required_fetchrow(
+            conn,
+            "overview.performance_pg_stat_database",
+            """
+            SELECT xact_commit, xact_rollback, deadlocks, temp_files, temp_bytes, blk_read_time, blk_write_time
+            FROM pg_stat_database
+            WHERE datname = current_database()
+            """,
+        )
+        await _run_required_fetchval(
+            conn,
+            "overview.autovacuum_activity",
+            """
+            SELECT count(*)
+            FROM pg_stat_activity
+            WHERE query ILIKE 'autovacuum:%'
+            """,
+        )
+        await _run_required_fetchval(
+            conn,
+            "overview.checkpointer_view_exists",
+            "SELECT to_regclass('pg_catalog.pg_stat_checkpointer') IS NOT NULL",
+        )
+        if has_checkpointer:
+            await _run_required_fetchrow(
+                conn,
+                "overview.pg_stat_checkpointer",
+                "SELECT num_timed AS checkpoints_timed, num_requested AS checkpoints_req FROM pg_stat_checkpointer",
             )
-            print("[compat] pg_stat_statements query succeeded")
-        except Exception as exc:
-            print(f"[compat] pg_stat_statements unavailable/disabled (expected in some setups): {exc}")
+        else:
+            await _run_required_fetchrow(
+                conn,
+                "overview.pg_stat_bgwriter",
+                "SELECT checkpoints_timed, checkpoints_req FROM pg_stat_bgwriter",
+            )
+
+        _section("optional paths")
+        # Optional paths that may depend on role/extension config.
+        await _run_optional(conn, "overview.replication_slots", "SELECT count(*) FROM pg_replication_slots")
+        await _run_optional(
+            conn,
+            "overview.pg_stat_replication",
+            """
+            SELECT application_name, client_addr::text, state, sync_state,
+                EXTRACT(EPOCH FROM write_lag) AS write_lag_seconds,
+                EXTRACT(EPOCH FROM flush_lag) AS flush_lag_seconds,
+                EXTRACT(EPOCH FROM replay_lag) AS replay_lag_seconds,
+                pg_wal_lsn_diff(pg_current_wal_lsn(), replay_lsn) AS replay_lag_bytes
+            FROM pg_stat_replication
+            """,
+        )
+        await _run_optional(
+            conn,
+            "overview.standby_replay_lag",
+            "SELECT EXTRACT(EPOCH FROM (now() - pg_last_xact_replay_timestamp()))",
+        )
+        await _run_optional(
+            conn,
+            "collector.pg_stat_statements",
+            """
+            SELECT queryid::text, calls, total_exec_time, mean_exec_time, rows, left(query, 2000) AS query_text
+            FROM pg_stat_statements
+            ORDER BY total_exec_time DESC
+            LIMIT 20
+            """,
+        )
 
         print("[compat] Smoke checks passed")
     finally: