NexaPG/backend/scripts/pg_compat_smoke.py
nessi ff6d7998c3
All checks were successful
PostgreSQL Compatibility Matrix / PG14 smoke (push) Successful in 8s
PostgreSQL Compatibility Matrix / PG15 smoke (push) Successful in 7s
PostgreSQL Compatibility Matrix / PG16 smoke (push) Successful in 17s
PostgreSQL Compatibility Matrix / PG17 smoke (push) Successful in 19s
PostgreSQL Compatibility Matrix / PG18 smoke (push) Successful in 18s
Add support for multiple PostgreSQL DSN candidates
This update introduces `PG_DSN_CANDIDATES` for supplying multiple DSN options, improving compatibility-matrix coverage and CI portability. The script now tries each candidate in order and connects to the first one that responds; when the variable is unset it falls back to the legacy single `PG_DSN`, and it raises an error if neither is provided. The documentation and workflow configuration have been updated accordingly.
2026-02-12 14:36:07 +01:00
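
As a sketch of how the two variables resolve (the DSNs here are hypothetical, not values from the repo):

    import os

    # Candidates are comma-separated and tried left to right; the first DSN
    # that accepts a connection is used.
    os.environ["PG_DSN_CANDIDATES"] = (
        "postgresql://postgres:postgres@postgres:5432/postgres,"
        "postgresql://postgres:postgres@127.0.0.1:5432/postgres"
    )
    # With PG_DSN_CANDIDATES unset the script falls back to PG_DSN, and
    # _dsn_candidates() raises RuntimeError when neither variable is set.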


"""Smoke checks for NexaPG's PostgreSQL compatibility matrix (PG14-PG18)."""

import asyncio
import os
import sys
from typing import Any

import asyncpg


def _required_env(name: str) -> str:
    value = os.getenv(name, "").strip()
    if not value:
        raise RuntimeError(f"Missing required env var: {name}")
    return value


async def _connect_with_retry(dsn: str, attempts: int = 40, delay_seconds: float = 1.5) -> asyncpg.Connection:
    # Poll until the server accepts connections; CI service containers can
    # take a while to become ready.
    last_exc: Exception | None = None
    for _ in range(attempts):
        try:
            return await asyncpg.connect(dsn=dsn, timeout=5)
        except Exception as exc:  # pragma: no cover - smoke utility
            last_exc = exc
            await asyncio.sleep(delay_seconds)
    raise RuntimeError(f"Could not connect to PostgreSQL after retries: {last_exc}")


async def _fetchrow_required(conn: asyncpg.Connection, query: str, label: str) -> dict[str, Any]:
    row = await conn.fetchrow(query)
    if row is None:
        raise RuntimeError(f"{label} returned no rows")
    return dict(row)


def _dsn_candidates() -> list[str]:
    # Preferred: explicit candidate list for CI portability (Gitea/GitHub runners).
    raw_candidates = os.getenv("PG_DSN_CANDIDATES", "").strip()
    if raw_candidates:
        values = [item.strip() for item in raw_candidates.split(",") if item.strip()]
        if values:
            return values
    # Backward compatible single DSN.
    raw_single = os.getenv("PG_DSN", "").strip()
    if raw_single:
        return [raw_single]
    raise RuntimeError("Missing PG_DSN or PG_DSN_CANDIDATES")
async def run() -> None:
    candidates = _dsn_candidates()
    last_error: Exception | None = None
    conn: asyncpg.Connection | None = None
    used_dsn = ""
    for dsn in candidates:
        try:
            conn = await _connect_with_retry(dsn)
            used_dsn = dsn
            break
        except Exception as exc:
            last_error = exc
    if conn is None:
        raise RuntimeError(f"Could not connect to PostgreSQL using candidates: {last_error}")
    try:
        version = await conn.fetchval("SHOW server_version")
        current_db = await conn.fetchval("SELECT current_database()")
        print(f"[compat] Connected: version={version} db={current_db} dsn={used_dsn}")
        # Core collector queries
        await _fetchrow_required(
            conn,
            """
            SELECT
                numbackends,
                xact_commit,
                xact_rollback,
                deadlocks,
                temp_files,
                temp_bytes,
                blk_read_time,
                blk_write_time,
                blks_hit,
                blks_read,
                tup_returned,
                tup_fetched
            FROM pg_stat_database
            WHERE datname = current_database()
            """,
            "pg_stat_database",
        )
        await _fetchrow_required(
            conn,
            """
            SELECT
                count(*) FILTER (WHERE state = 'active') AS active_connections,
                count(*) AS total_connections
            FROM pg_stat_activity
            WHERE datname = current_database()
            """,
            "pg_stat_activity",
        )
        await conn.fetchval("SELECT count(*) FROM pg_locks")
        # Checkpoint stats fallback: PG17 moved the checkpoint counters from
        # pg_stat_bgwriter to the new pg_stat_checkpointer view.
        has_checkpointer = await conn.fetchval("SELECT to_regclass('pg_catalog.pg_stat_checkpointer') IS NOT NULL")
        if has_checkpointer:
            await _fetchrow_required(
                conn,
                "SELECT num_timed AS checkpoints_timed, num_requested AS checkpoints_req FROM pg_stat_checkpointer",
                "pg_stat_checkpointer",
            )
            print("[compat] Using pg_stat_checkpointer")
        else:
            await _fetchrow_required(
                conn,
                "SELECT checkpoints_timed, checkpoints_req FROM pg_stat_bgwriter",
                "pg_stat_bgwriter",
            )
            print("[compat] Using pg_stat_bgwriter")
        # Overview queries
        await conn.fetchval("SELECT pg_is_in_recovery()")
        await conn.fetchval("SELECT pg_postmaster_start_time()")
        await conn.fetchval("SELECT inet_server_port()")
        await conn.fetchval("SELECT pg_database_size(current_database())")
        await conn.fetch("SELECT datname, pg_database_size(datname) AS size_bytes FROM pg_database ORDER BY size_bytes DESC")
        await conn.fetch(
            """
            SELECT schemaname, relname, pg_total_relation_size(schemaname || '.' || relname) AS size_bytes
            FROM pg_stat_user_tables
            ORDER BY size_bytes DESC
            LIMIT 5
            """
        )
        await conn.fetchval("SELECT COALESCE(sum(size), 0) FROM pg_ls_waldir()")
        # Optional extension check should not fail if unavailable.
        try:
            await conn.fetch(
                """
                SELECT queryid::text, calls, total_exec_time, mean_exec_time, rows, left(query, 2000) AS query_text
                FROM pg_stat_statements
                ORDER BY total_exec_time DESC
                LIMIT 5
                """
            )
            print("[compat] pg_stat_statements query succeeded")
        except Exception as exc:
            print(f"[compat] pg_stat_statements unavailable/disabled (expected in some setups): {exc}")
        print("[compat] Smoke checks passed")
    finally:
        await conn.close()


if __name__ == "__main__":
    try:
        asyncio.run(run())
    except Exception as exc:  # pragma: no cover - smoke utility
        print(f"[compat] FAILED: {exc}", file=sys.stderr)
        raise
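
For a quick local check the entry point can also be driven directly; a minimal sketch, assuming the script is importable and the DSN below (made up for illustration) points at a reachable server:

    import asyncio
    import os

    from pg_compat_smoke import run

    # Hypothetical DSN; substitute a database of your own.
    os.environ["PG_DSN"] = "postgresql://postgres:postgres@localhost:5432/postgres"
    asyncio.run(run())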