"""Collect a point-in-time overview of a PostgreSQL instance.

Every probe goes through a ``_safe_*`` wrapper so that a single failing query
(e.g. a permission error or a missing catalog column) degrades into an entry
in ``partial_failures`` instead of failing the whole overview.
"""

from datetime import datetime, timezone

import asyncpg

from app.schemas.overview import (
    DatabaseOverviewOut,
    DbSizeItem,
    DiskSpaceInfo,
    InstanceOverview,
    PerformanceOverview,
    ReplicationClientItem,
    ReplicationOverview,
    StorageOverview,
    TableSizeItem,
)
from app.services.infra_probe import DiskSpaceProvider


def _interval_seconds(value) -> float | None:
    """Convert a timedelta-like value to seconds, tolerating None and bad input."""
    if value is None:
        return None
    try:
        return float(value.total_seconds())
    except Exception:
        return None


async def _safe_fetchval(conn: asyncpg.Connection, query: str, errors: list[str], label: str):
    try:
        return await conn.fetchval(query)
    except Exception as exc:
        errors.append(f"{label}: {exc}")
        return None


async def _safe_fetchrow(conn: asyncpg.Connection, query: str, errors: list[str], label: str):
    try:
        return await conn.fetchrow(query)
    except Exception as exc:
        errors.append(f"{label}: {exc}")
        return None


async def _safe_fetch(conn: asyncpg.Connection, query: str, errors: list[str], label: str):
    try:
        return await conn.fetch(query)
    except Exception as exc:
        errors.append(f"{label}: {exc}")
        return []


async def collect_overview(
    conn: asyncpg.Connection,
    disk_provider: DiskSpaceProvider,
    target_host: str,
    cache_ttl_seconds: int,
) -> DatabaseOverviewOut:
    errors: list[str] = []

    # Instance-level facts.
    version = await _safe_fetchval(conn, "SHOW server_version", errors, "show_server_version")
    in_recovery = await _safe_fetchval(conn, "SELECT pg_is_in_recovery()", errors, "pg_is_in_recovery")
    postmaster_start = await _safe_fetchval(conn, "SELECT pg_postmaster_start_time()", errors, "postmaster_start")
    current_db = await _safe_fetchval(conn, "SELECT current_database()", errors, "current_database")
    port = await _safe_fetchval(conn, "SELECT inet_server_port()", errors, "inet_server_port")

    uptime_seconds = None
    if postmaster_start:
        uptime_seconds = (datetime.now(timezone.utc) - postmaster_start).total_seconds()

    # Storage: per-database sizes, the five largest user tables, and WAL directory size.
    db_size = await _safe_fetchval(
        conn, "SELECT pg_database_size(current_database())", errors, "pg_database_size_current"
    )
    all_db_rows = await _safe_fetch(
        conn,
        """
        SELECT datname, pg_database_size(datname) AS size_bytes
        FROM pg_database
        ORDER BY size_bytes DESC
        """,
        errors,
        "pg_database_size_all",
    )
    largest_tables_rows = await _safe_fetch(
        conn,
        # quote_ident() keeps mixed-case or otherwise quoted identifiers resolvable
        # when the concatenated name is cast to regclass.
        """
        SELECT schemaname, relname,
               pg_total_relation_size(quote_ident(schemaname) || '.' || quote_ident(relname)) AS size_bytes
        FROM pg_stat_user_tables
        ORDER BY size_bytes DESC
        LIMIT 5
        """,
        errors,
        "largest_tables",
    )
    wal_dir_size = await _safe_fetchval(
        conn,
        "SELECT COALESCE(sum(size), 0) FROM pg_ls_waldir()",
        errors,
        "wal_directory_size",
    )

    # Performance counters for the current database plus checkpoint activity.
    stats_db = await _safe_fetchrow(
        conn,
        """
        SELECT xact_commit, xact_rollback, deadlocks, temp_files, temp_bytes,
               blk_read_time, blk_write_time
        FROM pg_stat_database
        WHERE datname = current_database()
        """,
        errors,
        "pg_stat_database_perf",
    )
    # Checkpoint counters live in pg_stat_bgwriter up to PostgreSQL 16; on 17+
    # they moved to pg_stat_checkpointer, so this probe lands in partial_failures there.
    bgwriter = await _safe_fetchrow(
        conn,
        "SELECT checkpoints_timed, checkpoints_req FROM pg_stat_bgwriter",
        errors,
        "pg_stat_bgwriter",
    )
    autovac_workers = await _safe_fetchval(
        conn,
        "SELECT count(*) FROM pg_stat_activity WHERE query ILIKE 'autovacuum:%'",
        errors,
        "autovacuum_activity",
    )

    # Replication: a standby reports its replay lag; a primary reports its
    # slots and the downstream clients in pg_stat_replication.
    clients: list[ReplicationClientItem] = []
    replay_lag_seconds = None
    slots_count = None
    active_clients = None
    role = "standby" if in_recovery else "primary"

    if in_recovery:
        replay_lag_seconds = await _safe_fetchval(
            conn,
            "SELECT EXTRACT(EPOCH FROM (now() - pg_last_xact_replay_timestamp()))",
            errors,
            "standby_replay_lag",
        )
    else:
        slots_count = await _safe_fetchval(
            conn, "SELECT count(*) FROM pg_replication_slots", errors, "replication_slots"
        )
        repl_rows = await _safe_fetch(
            conn,
            """
            SELECT application_name, client_addr::text, state, sync_state,
                   EXTRACT(EPOCH FROM write_lag) AS write_lag_seconds,
                   EXTRACT(EPOCH FROM flush_lag) AS flush_lag_seconds,
                   EXTRACT(EPOCH FROM replay_lag) AS replay_lag_seconds,
                   pg_wal_lsn_diff(pg_current_wal_lsn(), replay_lsn) AS replay_lag_bytes
            FROM pg_stat_replication
            """,
            errors,
            "pg_stat_replication",
        )
        active_clients = len(repl_rows)
        clients = [
            ReplicationClientItem(
                application_name=row.get("application_name"),
                client_addr=row.get("client_addr"),
                state=row.get("state"),
                sync_state=row.get("sync_state"),
                write_lag_seconds=row.get("write_lag_seconds"),
                flush_lag_seconds=row.get("flush_lag_seconds"),
                replay_lag_seconds=row.get("replay_lag_seconds"),
                replay_lag_bytes=(
                    float(row.get("replay_lag_bytes")) if row.get("replay_lag_bytes") is not None else None
                ),
            )
            for row in repl_rows
        ]

    disk = await disk_provider.get_free_bytes(target_host)

    return DatabaseOverviewOut(
        collected_at=datetime.now(timezone.utc),
        cache_ttl_seconds=cache_ttl_seconds,
        instance=InstanceOverview(
            server_version=version,
            role=role,
            postmaster_start_time=postmaster_start,
            uptime_seconds=uptime_seconds,
            current_database=current_db,
            port=int(port) if port is not None else None,
        ),
        storage=StorageOverview(
            current_database_size_bytes=int(db_size) if db_size is not None else None,
            all_databases=[DbSizeItem(name=r["datname"], size_bytes=int(r["size_bytes"])) for r in all_db_rows],
            largest_tables=[
                TableSizeItem(schema=r["schemaname"], table=r["relname"], size_bytes=int(r["size_bytes"]))
                for r in largest_tables_rows
            ],
            wal_directory_size_bytes=int(wal_dir_size) if wal_dir_size is not None else None,
            disk_space=DiskSpaceInfo(
                source=disk.source,
                status=disk.status,
                free_bytes=disk.free_bytes,
                message=disk.message,
            ),
        ),
        replication=ReplicationOverview(
            mode=role,
            replay_lag_seconds=float(replay_lag_seconds) if replay_lag_seconds is not None else None,
            replication_slots_count=int(slots_count) if slots_count is not None else None,
            active_replication_clients=active_clients,
            clients=clients,
        ),
        performance=PerformanceOverview(
            xact_commit=stats_db["xact_commit"] if stats_db else None,
            xact_rollback=stats_db["xact_rollback"] if stats_db else None,
            deadlocks=stats_db["deadlocks"] if stats_db else None,
            temp_files=stats_db["temp_files"] if stats_db else None,
            temp_bytes=stats_db["temp_bytes"] if stats_db else None,
            blk_read_time=stats_db["blk_read_time"] if stats_db else None,
            blk_write_time=stats_db["blk_write_time"] if stats_db else None,
            checkpoints_timed=bgwriter["checkpoints_timed"] if bgwriter else None,
            checkpoints_req=bgwriter["checkpoints_req"] if bgwriter else None,
            autovacuum_workers=int(autovac_workers) if autovac_workers is not None else None,
        ),
        partial_failures=errors,
    )
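

# ---------------------------------------------------------------------------
# Minimal usage sketch (not wired into the app): run the collector once against
# a local server with a stand-in disk provider. The DSN and the stub below are
# illustrative assumptions only; the real app injects its own DiskSpaceProvider,
# and the stub merely mimics the attributes collect_overview reads from
# get_free_bytes() (source, status, free_bytes, message) -- the placeholder
# values may need adjusting to whatever the DiskSpaceInfo schema accepts.
if __name__ == "__main__":
    import asyncio
    from types import SimpleNamespace

    class _StubDiskProvider:
        async def get_free_bytes(self, host: str) -> SimpleNamespace:
            # Pretend the probe ran; a real provider would inspect the target host.
            return SimpleNamespace(source="stub", status="ok", free_bytes=None, message=None)

    async def _main() -> None:
        conn = await asyncpg.connect("postgresql://postgres@localhost/postgres")  # illustrative DSN
        try:
            overview = await collect_overview(conn, _StubDiskProvider(), "localhost", cache_ttl_seconds=60)
        finally:
            await conn.close()
        print(overview.instance)
        print("partial failures:", overview.partial_failures)

    asyncio.run(_main())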