Refactor PostgreSQL compatibility smoke test runner
All checks were successful
PostgreSQL Compatibility Matrix / PG14 smoke (push) Successful in 7s
PostgreSQL Compatibility Matrix / PG15 smoke (push) Successful in 7s
PostgreSQL Compatibility Matrix / PG16 smoke (push) Successful in 7s
PostgreSQL Compatibility Matrix / PG17 smoke (push) Successful in 6s
PostgreSQL Compatibility Matrix / PG18 smoke (push) Successful in 7s
Refactored `_fetchrow_required` and introduced multiple helper functions (e.g., `_run_required_fetch`, `_run_optional`) to streamline query execution and improve readability. Organized the script into distinct sections for better maintainability: connectivity, collector, target endpoints, overview, and optional paths. This enhances clarity and ensures consistency in the smoke testing process.
This commit is contained in:
@@ -31,6 +31,33 @@ async def _fetchrow_required(conn: asyncpg.Connection, query: str, label: str) -
|
|||||||
return dict(row)
|
return dict(row)
|
||||||
|
|
||||||
|
|
||||||
|
async def _run_required_fetchrow(conn: asyncpg.Connection, label: str, query: str) -> None:
    """Run a mandatory row-returning query and log a PASS line.

    Delegates to _fetchrow_required, which is responsible for failing
    the smoke run when the query yields no row.
    """
    await _fetchrow_required(conn, query, label)
    print(f"[compat] PASS required: {label}")
|
async def _run_required_fetch(conn: asyncpg.Connection, label: str, query: str) -> None:
    """Execute a mandatory multi-row query and log a PASS line.

    Any exception from conn.fetch propagates and fails the smoke run.
    """
    await conn.fetch(query)
    print(f"[compat] PASS required: {label}")
|
async def _run_required_fetchval(conn: asyncpg.Connection, label: str, query: str) -> None:
    """Execute a mandatory single-value query and log a PASS line.

    Any exception from conn.fetchval propagates and fails the smoke run.
    """
    await conn.fetchval(query)
    print(f"[compat] PASS required: {label}")
|
async def _run_optional(conn: asyncpg.Connection, label: str, query: str) -> None:
    """Execute an optional query: log PASS on success, SKIP on failure.

    Optional paths may depend on server role or extension availability,
    so every exception is deliberately swallowed (best-effort by design)
    and reported as a SKIP instead of failing the smoke run.
    """
    try:
        await conn.fetch(query)
    except Exception as exc:
        print(f"[compat] SKIP optional: {label} ({exc})")
    else:
        print(f"[compat] PASS optional: {label}")
|
def _section(title: str) -> None:
|
||||||
|
print(f"[compat] --- {title} ---")
|
||||||
|
|
||||||
|
|
||||||
def _dsn_candidates() -> list[str]:
|
def _dsn_candidates() -> list[str]:
|
||||||
# Preferred: explicit candidate list for CI portability (Gitea/GitHub runners).
|
# Preferred: explicit candidate list for CI portability (Gitea/GitHub runners).
|
||||||
raw_candidates = os.getenv("PG_DSN_CANDIDATES", "").strip()
|
raw_candidates = os.getenv("PG_DSN_CANDIDATES", "").strip()
|
||||||
@@ -65,9 +92,14 @@ async def run() -> None:
|
|||||||
current_db = await conn.fetchval("SELECT current_database()")
|
current_db = await conn.fetchval("SELECT current_database()")
|
||||||
print(f"[compat] Connected: version={version} db={current_db} dsn={used_dsn}")
|
print(f"[compat] Connected: version={version} db={current_db} dsn={used_dsn}")
|
||||||
|
|
||||||
# Core collector queries
|
_section("connectivity")
|
||||||
await _fetchrow_required(
|
await _run_required_fetchval(conn, "target_connection.select_1", "SELECT 1")
|
||||||
|
|
||||||
|
_section("collector")
|
||||||
|
# Core collector queries used in app/services/collector.py
|
||||||
|
await _run_required_fetchrow(
|
||||||
conn,
|
conn,
|
||||||
|
"collector.pg_stat_database",
|
||||||
"""
|
"""
|
||||||
SELECT
|
SELECT
|
||||||
numbackends,
|
numbackends,
|
||||||
@@ -85,11 +117,11 @@ async def run() -> None:
|
|||||||
FROM pg_stat_database
|
FROM pg_stat_database
|
||||||
WHERE datname = current_database()
|
WHERE datname = current_database()
|
||||||
""",
|
""",
|
||||||
"pg_stat_database",
|
|
||||||
)
|
)
|
||||||
|
|
||||||
await _fetchrow_required(
|
await _run_required_fetchrow(
|
||||||
conn,
|
conn,
|
||||||
|
"collector.pg_stat_activity",
|
||||||
"""
|
"""
|
||||||
SELECT
|
SELECT
|
||||||
count(*) FILTER (WHERE state = 'active') AS active_connections,
|
count(*) FILTER (WHERE state = 'active') AS active_connections,
|
||||||
@@ -97,57 +129,149 @@ async def run() -> None:
|
|||||||
FROM pg_stat_activity
|
FROM pg_stat_activity
|
||||||
WHERE datname = current_database()
|
WHERE datname = current_database()
|
||||||
""",
|
""",
|
||||||
"pg_stat_activity",
|
|
||||||
)
|
)
|
||||||
|
|
||||||
await conn.fetchval("SELECT count(*) FROM pg_locks")
|
await _run_required_fetchval(conn, "collector.pg_locks_count", "SELECT count(*) FROM pg_locks")
|
||||||
|
|
||||||
# Checkpoint stats fallback (PG14/15 vs newer changes)
|
# Checkpoint stats fallback (PG14/15 vs newer changes)
|
||||||
has_checkpointer = await conn.fetchval("SELECT to_regclass('pg_catalog.pg_stat_checkpointer') IS NOT NULL")
|
has_checkpointer = await conn.fetchval("SELECT to_regclass('pg_catalog.pg_stat_checkpointer') IS NOT NULL")
|
||||||
if has_checkpointer:
|
if has_checkpointer:
|
||||||
await _fetchrow_required(
|
await _run_required_fetchrow(
|
||||||
conn,
|
conn,
|
||||||
"SELECT num_timed AS checkpoints_timed, num_requested AS checkpoints_req FROM pg_stat_checkpointer",
|
"collector.checkpointer_view",
|
||||||
"pg_stat_checkpointer",
|
"""
|
||||||
|
SELECT
|
||||||
|
num_timed AS checkpoints_timed,
|
||||||
|
num_requested AS checkpoints_req,
|
||||||
|
0::bigint AS buffers_checkpoint,
|
||||||
|
0::bigint AS buffers_clean,
|
||||||
|
0::bigint AS maxwritten_clean
|
||||||
|
FROM pg_stat_checkpointer
|
||||||
|
""",
|
||||||
)
|
)
|
||||||
print("[compat] Using pg_stat_checkpointer")
|
print("[compat] Using pg_stat_checkpointer")
|
||||||
else:
|
else:
|
||||||
await _fetchrow_required(
|
await _run_required_fetchrow(
|
||||||
conn,
|
conn,
|
||||||
"SELECT checkpoints_timed, checkpoints_req FROM pg_stat_bgwriter",
|
"collector.bgwriter_view",
|
||||||
"pg_stat_bgwriter",
|
"SELECT checkpoints_timed, checkpoints_req, buffers_checkpoint, buffers_clean, maxwritten_clean FROM pg_stat_bgwriter",
|
||||||
)
|
)
|
||||||
print("[compat] Using pg_stat_bgwriter")
|
print("[compat] Using pg_stat_bgwriter")
|
||||||
|
|
||||||
# Overview queries
|
_section("target endpoints")
|
||||||
await conn.fetchval("SELECT pg_is_in_recovery()")
|
# Target endpoint queries used in app/api/routes/targets.py
|
||||||
await conn.fetchval("SELECT pg_postmaster_start_time()")
|
await _run_required_fetch(
|
||||||
await conn.fetchval("SELECT inet_server_port()")
|
conn,
|
||||||
await conn.fetchval("SELECT pg_database_size(current_database())")
|
"target_endpoint.locks_table",
|
||||||
await conn.fetch("SELECT datname, pg_database_size(datname) AS size_bytes FROM pg_database ORDER BY size_bytes DESC")
|
"""
|
||||||
await conn.fetch(
|
SELECT locktype, mode, granted, relation::regclass::text AS relation, pid
|
||||||
|
FROM pg_locks
|
||||||
|
ORDER BY granted ASC, mode
|
||||||
|
LIMIT 500
|
||||||
|
""",
|
||||||
|
)
|
||||||
|
await _run_required_fetch(
|
||||||
|
conn,
|
||||||
|
"target_endpoint.activity_table",
|
||||||
|
"""
|
||||||
|
SELECT pid, usename, application_name, client_addr::text, state, wait_event_type, wait_event, now() - query_start AS running_for, left(query, 300) AS query
|
||||||
|
FROM pg_stat_activity
|
||||||
|
WHERE datname = current_database()
|
||||||
|
ORDER BY query_start NULLS LAST
|
||||||
|
LIMIT 200
|
||||||
|
""",
|
||||||
|
)
|
||||||
|
|
||||||
|
_section("overview")
|
||||||
|
# Overview queries used in app/services/overview_collector.py
|
||||||
|
await _run_required_fetchval(conn, "overview.show_server_version", "SHOW server_version")
|
||||||
|
await _run_required_fetchval(conn, "overview.pg_is_in_recovery", "SELECT pg_is_in_recovery()")
|
||||||
|
await _run_required_fetchval(conn, "overview.pg_postmaster_start_time", "SELECT pg_postmaster_start_time()")
|
||||||
|
await _run_required_fetchval(conn, "overview.current_database", "SELECT current_database()")
|
||||||
|
await _run_required_fetchval(conn, "overview.inet_server_port", "SELECT inet_server_port()")
|
||||||
|
await _run_required_fetchval(conn, "overview.pg_database_size_current", "SELECT pg_database_size(current_database())")
|
||||||
|
await _run_required_fetch(
|
||||||
|
conn,
|
||||||
|
"overview.pg_database_size_all",
|
||||||
|
"SELECT datname, pg_database_size(datname) AS size_bytes FROM pg_database ORDER BY size_bytes DESC",
|
||||||
|
)
|
||||||
|
await _run_required_fetch(
|
||||||
|
conn,
|
||||||
|
"overview.largest_tables",
|
||||||
"""
|
"""
|
||||||
SELECT schemaname, relname, pg_total_relation_size(schemaname || '.' || relname) AS size_bytes
|
SELECT schemaname, relname, pg_total_relation_size(schemaname || '.' || relname) AS size_bytes
|
||||||
FROM pg_stat_user_tables
|
FROM pg_stat_user_tables
|
||||||
ORDER BY size_bytes DESC
|
ORDER BY size_bytes DESC
|
||||||
LIMIT 5
|
LIMIT 5
|
||||||
"""
|
""",
|
||||||
)
|
)
|
||||||
await conn.fetchval("SELECT COALESCE(sum(size), 0) FROM pg_ls_waldir()")
|
await _run_required_fetchval(conn, "overview.pg_ls_waldir", "SELECT COALESCE(sum(size), 0) FROM pg_ls_waldir()")
|
||||||
|
await _run_required_fetchrow(
|
||||||
# Optional extension check should not fail if unavailable.
|
conn,
|
||||||
try:
|
"overview.performance_pg_stat_database",
|
||||||
await conn.fetch(
|
"""
|
||||||
"""
|
SELECT xact_commit, xact_rollback, deadlocks, temp_files, temp_bytes, blk_read_time, blk_write_time
|
||||||
SELECT queryid::text, calls, total_exec_time, mean_exec_time, rows, left(query, 2000) AS query_text
|
FROM pg_stat_database
|
||||||
FROM pg_stat_statements
|
WHERE datname = current_database()
|
||||||
ORDER BY total_exec_time DESC
|
""",
|
||||||
LIMIT 5
|
)
|
||||||
"""
|
await _run_required_fetchval(
|
||||||
|
conn,
|
||||||
|
"overview.autovacuum_activity",
|
||||||
|
"""
|
||||||
|
SELECT count(*)
|
||||||
|
FROM pg_stat_activity
|
||||||
|
WHERE query ILIKE 'autovacuum:%'
|
||||||
|
""",
|
||||||
|
)
|
||||||
|
await _run_required_fetchval(
|
||||||
|
conn,
|
||||||
|
"overview.checkpointer_view_exists",
|
||||||
|
"SELECT to_regclass('pg_catalog.pg_stat_checkpointer') IS NOT NULL",
|
||||||
|
)
|
||||||
|
if has_checkpointer:
|
||||||
|
await _run_required_fetchrow(
|
||||||
|
conn,
|
||||||
|
"overview.pg_stat_checkpointer",
|
||||||
|
"SELECT num_timed AS checkpoints_timed, num_requested AS checkpoints_req FROM pg_stat_checkpointer",
|
||||||
)
|
)
|
||||||
print("[compat] pg_stat_statements query succeeded")
|
else:
|
||||||
except Exception as exc:
|
await _run_required_fetchrow(
|
||||||
print(f"[compat] pg_stat_statements unavailable/disabled (expected in some setups): {exc}")
|
conn,
|
||||||
|
"overview.pg_stat_bgwriter",
|
||||||
|
"SELECT checkpoints_timed, checkpoints_req FROM pg_stat_bgwriter",
|
||||||
|
)
|
||||||
|
|
||||||
|
_section("optional paths")
|
||||||
|
# Optional paths that may depend on role/extension config.
|
||||||
|
await _run_optional(conn, "overview.replication_slots", "SELECT count(*) FROM pg_replication_slots")
|
||||||
|
await _run_optional(
|
||||||
|
conn,
|
||||||
|
"overview.pg_stat_replication",
|
||||||
|
"""
|
||||||
|
SELECT application_name, client_addr::text, state, sync_state,
|
||||||
|
EXTRACT(EPOCH FROM write_lag) AS write_lag_seconds,
|
||||||
|
EXTRACT(EPOCH FROM flush_lag) AS flush_lag_seconds,
|
||||||
|
EXTRACT(EPOCH FROM replay_lag) AS replay_lag_seconds,
|
||||||
|
pg_wal_lsn_diff(pg_current_wal_lsn(), replay_lsn) AS replay_lag_bytes
|
||||||
|
FROM pg_stat_replication
|
||||||
|
""",
|
||||||
|
)
|
||||||
|
await _run_optional(
|
||||||
|
conn,
|
||||||
|
"overview.standby_replay_lag",
|
||||||
|
"SELECT EXTRACT(EPOCH FROM (now() - pg_last_xact_replay_timestamp()))",
|
||||||
|
)
|
||||||
|
await _run_optional(
|
||||||
|
conn,
|
||||||
|
"collector.pg_stat_statements",
|
||||||
|
"""
|
||||||
|
SELECT queryid::text, calls, total_exec_time, mean_exec_time, rows, left(query, 2000) AS query_text
|
||||||
|
FROM pg_stat_statements
|
||||||
|
ORDER BY total_exec_time DESC
|
||||||
|
LIMIT 20
|
||||||
|
""",
|
||||||
|
)
|
||||||
|
|
||||||
print("[compat] Smoke checks passed")
|
print("[compat] Smoke checks passed")
|
||||||
finally:
|
finally:
|
||||||
|
|||||||
Reference in New Issue
Block a user