Add database overview feature with metrics and UI enhancements
This commit introduces a detailed database overview endpoint and service, providing key metrics such as replication status, database sizes, and performance indicators. On the frontend, a new UI section displays these metrics along with improved forms and troubleshooting tips. Enhancements improve user experience by adding informative tooltips and formatting for byte and time values.
This commit is contained in:
@@ -7,10 +7,12 @@ from app.core.db import get_db
|
||||
from app.core.deps import get_current_user, require_roles
|
||||
from app.models.models import Metric, QueryStat, Target, User
|
||||
from app.schemas.metric import MetricOut, QueryStatOut
|
||||
from app.schemas.overview import DatabaseOverviewOut
|
||||
from app.schemas.target import TargetCreate, TargetOut, TargetUpdate
|
||||
from app.services.audit import write_audit_log
|
||||
from app.services.collector import build_target_dsn
|
||||
from app.services.crypto import encrypt_secret
|
||||
from app.services.overview_service import get_target_overview
|
||||
|
||||
router = APIRouter()
|
||||
|
||||
@@ -179,3 +181,12 @@ async def get_top_queries(target_id: int, user: User = Depends(get_current_user)
|
||||
)
|
||||
for r in rows
|
||||
]
|
||||
|
||||
|
||||
@router.get("/{target_id}/overview", response_model=DatabaseOverviewOut)
|
||||
async def get_overview(target_id: int, user: User = Depends(get_current_user), db: AsyncSession = Depends(get_db)) -> DatabaseOverviewOut:
|
||||
_ = user
|
||||
target = await db.scalar(select(Target).where(Target.id == target_id))
|
||||
if not target:
|
||||
raise HTTPException(status_code=404, detail="Target not found")
|
||||
return await get_target_overview(target)
|
||||
|
||||
@@ -49,10 +49,11 @@ async def lifespan(_: FastAPI):
|
||||
|
||||
|
||||
app = FastAPI(title=settings.app_name, lifespan=lifespan)
|
||||
use_wildcard_cors = settings.cors_origins.strip() == "*"
|
||||
app.add_middleware(
|
||||
CORSMiddleware,
|
||||
allow_origins=settings.cors_origins_list,
|
||||
allow_credentials=True,
|
||||
allow_origins=["*"] if use_wildcard_cors else settings.cors_origins_list,
|
||||
allow_credentials=False if use_wildcard_cors else True,
|
||||
allow_methods=["*"],
|
||||
allow_headers=["*"],
|
||||
)
|
||||
|
||||
79
backend/app/schemas/overview.py
Normal file
79
backend/app/schemas/overview.py
Normal file
@@ -0,0 +1,79 @@
|
||||
from datetime import datetime
|
||||
from pydantic import BaseModel
|
||||
|
||||
|
||||
class DbSizeItem(BaseModel):
|
||||
name: str
|
||||
size_bytes: int
|
||||
|
||||
|
||||
class TableSizeItem(BaseModel):
|
||||
schema: str
|
||||
table: str
|
||||
size_bytes: int
|
||||
|
||||
|
||||
class ReplicationClientItem(BaseModel):
|
||||
application_name: str | None = None
|
||||
client_addr: str | None = None
|
||||
state: str | None = None
|
||||
sync_state: str | None = None
|
||||
write_lag_seconds: float | None = None
|
||||
flush_lag_seconds: float | None = None
|
||||
replay_lag_seconds: float | None = None
|
||||
replay_lag_bytes: float | None = None
|
||||
|
||||
|
||||
class DiskSpaceInfo(BaseModel):
|
||||
source: str
|
||||
status: str
|
||||
free_bytes: int | None = None
|
||||
message: str | None = None
|
||||
|
||||
|
||||
class InstanceOverview(BaseModel):
|
||||
server_version: str | None = None
|
||||
role: str
|
||||
postmaster_start_time: datetime | None = None
|
||||
uptime_seconds: float | None = None
|
||||
current_database: str | None = None
|
||||
port: int | None = None
|
||||
|
||||
|
||||
class StorageOverview(BaseModel):
|
||||
current_database_size_bytes: int | None = None
|
||||
all_databases: list[DbSizeItem]
|
||||
largest_tables: list[TableSizeItem]
|
||||
wal_directory_size_bytes: int | None = None
|
||||
disk_space: DiskSpaceInfo
|
||||
|
||||
|
||||
class ReplicationOverview(BaseModel):
|
||||
mode: str
|
||||
replay_lag_seconds: float | None = None
|
||||
replication_slots_count: int | None = None
|
||||
active_replication_clients: int | None = None
|
||||
clients: list[ReplicationClientItem]
|
||||
|
||||
|
||||
class PerformanceOverview(BaseModel):
|
||||
xact_commit: int | None = None
|
||||
xact_rollback: int | None = None
|
||||
deadlocks: int | None = None
|
||||
temp_files: int | None = None
|
||||
temp_bytes: int | None = None
|
||||
blk_read_time: float | None = None
|
||||
blk_write_time: float | None = None
|
||||
checkpoints_timed: int | None = None
|
||||
checkpoints_req: int | None = None
|
||||
autovacuum_workers: int | None = None
|
||||
|
||||
|
||||
class DatabaseOverviewOut(BaseModel):
|
||||
collected_at: datetime
|
||||
cache_ttl_seconds: int
|
||||
instance: InstanceOverview
|
||||
storage: StorageOverview
|
||||
replication: ReplicationOverview
|
||||
performance: PerformanceOverview
|
||||
partial_failures: list[str]
|
||||
29
backend/app/services/infra_probe.py
Normal file
29
backend/app/services/infra_probe.py
Normal file
@@ -0,0 +1,29 @@
|
||||
from dataclasses import dataclass
|
||||
|
||||
|
||||
@dataclass
|
||||
class DiskSpaceProbeResult:
|
||||
source: str
|
||||
status: str
|
||||
free_bytes: int | None = None
|
||||
message: str | None = None
|
||||
|
||||
|
||||
class DiskSpaceProvider:
|
||||
async def get_free_bytes(self, target_host: str) -> DiskSpaceProbeResult:
|
||||
raise NotImplementedError
|
||||
|
||||
|
||||
class NullDiskSpaceProvider(DiskSpaceProvider):
|
||||
async def get_free_bytes(self, target_host: str) -> DiskSpaceProbeResult:
|
||||
return DiskSpaceProbeResult(
|
||||
source="none",
|
||||
status="unavailable",
|
||||
free_bytes=None,
|
||||
message=f"No infra probe configured for host {target_host}. Add SSH/Agent provider later.",
|
||||
)
|
||||
|
||||
|
||||
def get_disk_space_provider() -> DiskSpaceProvider:
|
||||
# Extension point: replace by SSH/Agent backed implementation.
|
||||
return NullDiskSpaceProvider()
|
||||
218
backend/app/services/overview_collector.py
Normal file
218
backend/app/services/overview_collector.py
Normal file
@@ -0,0 +1,218 @@
|
||||
from datetime import datetime, timezone
|
||||
import asyncpg
|
||||
from app.schemas.overview import (
|
||||
DatabaseOverviewOut,
|
||||
DbSizeItem,
|
||||
DiskSpaceInfo,
|
||||
InstanceOverview,
|
||||
PerformanceOverview,
|
||||
ReplicationClientItem,
|
||||
ReplicationOverview,
|
||||
StorageOverview,
|
||||
TableSizeItem,
|
||||
)
|
||||
from app.services.infra_probe import DiskSpaceProvider
|
||||
|
||||
|
||||
def _interval_seconds(value) -> float | None:
|
||||
if value is None:
|
||||
return None
|
||||
try:
|
||||
return float(value.total_seconds())
|
||||
except Exception:
|
||||
return None
|
||||
|
||||
|
||||
async def _safe_fetchval(conn: asyncpg.Connection, query: str, errors: list[str], label: str):
|
||||
try:
|
||||
return await conn.fetchval(query)
|
||||
except Exception as exc:
|
||||
errors.append(f"{label}: {exc}")
|
||||
return None
|
||||
|
||||
|
||||
async def _safe_fetchrow(conn: asyncpg.Connection, query: str, errors: list[str], label: str):
|
||||
try:
|
||||
return await conn.fetchrow(query)
|
||||
except Exception as exc:
|
||||
errors.append(f"{label}: {exc}")
|
||||
return None
|
||||
|
||||
|
||||
async def _safe_fetch(conn: asyncpg.Connection, query: str, errors: list[str], label: str):
|
||||
try:
|
||||
return await conn.fetch(query)
|
||||
except Exception as exc:
|
||||
errors.append(f"{label}: {exc}")
|
||||
return []
|
||||
|
||||
|
||||
async def collect_overview(
|
||||
conn: asyncpg.Connection, disk_provider: DiskSpaceProvider, target_host: str, cache_ttl_seconds: int
|
||||
) -> DatabaseOverviewOut:
|
||||
errors: list[str] = []
|
||||
|
||||
version = await _safe_fetchval(conn, "SHOW server_version", errors, "show_server_version")
|
||||
in_recovery = await _safe_fetchval(conn, "SELECT pg_is_in_recovery()", errors, "pg_is_in_recovery")
|
||||
postmaster_start = await _safe_fetchval(conn, "SELECT pg_postmaster_start_time()", errors, "postmaster_start")
|
||||
current_db = await _safe_fetchval(conn, "SELECT current_database()", errors, "current_database")
|
||||
port = await _safe_fetchval(conn, "SELECT inet_server_port()", errors, "inet_server_port")
|
||||
|
||||
uptime_seconds = None
|
||||
if postmaster_start:
|
||||
uptime_seconds = (datetime.now(timezone.utc) - postmaster_start).total_seconds()
|
||||
|
||||
db_size = await _safe_fetchval(
|
||||
conn, "SELECT pg_database_size(current_database())", errors, "pg_database_size_current"
|
||||
)
|
||||
all_db_rows = await _safe_fetch(
|
||||
conn,
|
||||
"""
|
||||
SELECT datname, pg_database_size(datname) AS size_bytes
|
||||
FROM pg_database
|
||||
ORDER BY size_bytes DESC
|
||||
""",
|
||||
errors,
|
||||
"pg_database_size_all",
|
||||
)
|
||||
largest_tables_rows = await _safe_fetch(
|
||||
conn,
|
||||
"""
|
||||
SELECT schemaname, relname, pg_total_relation_size(schemaname || '.' || relname) AS size_bytes
|
||||
FROM pg_stat_user_tables
|
||||
ORDER BY size_bytes DESC
|
||||
LIMIT 5
|
||||
""",
|
||||
errors,
|
||||
"largest_tables",
|
||||
)
|
||||
wal_dir_size = await _safe_fetchval(
|
||||
conn,
|
||||
"""
|
||||
SELECT COALESCE(sum(size), 0)
|
||||
FROM pg_ls_waldir()
|
||||
""",
|
||||
errors,
|
||||
"wal_directory_size",
|
||||
)
|
||||
|
||||
stats_db = await _safe_fetchrow(
|
||||
conn,
|
||||
"""
|
||||
SELECT xact_commit, xact_rollback, deadlocks, temp_files, temp_bytes, blk_read_time, blk_write_time
|
||||
FROM pg_stat_database
|
||||
WHERE datname = current_database()
|
||||
""",
|
||||
errors,
|
||||
"pg_stat_database_perf",
|
||||
)
|
||||
bgwriter = await _safe_fetchrow(
|
||||
conn,
|
||||
"SELECT checkpoints_timed, checkpoints_req FROM pg_stat_bgwriter",
|
||||
errors,
|
||||
"pg_stat_bgwriter",
|
||||
)
|
||||
autovac_workers = await _safe_fetchval(
|
||||
conn,
|
||||
"""
|
||||
SELECT count(*)
|
||||
FROM pg_stat_activity
|
||||
WHERE query ILIKE 'autovacuum:%'
|
||||
""",
|
||||
errors,
|
||||
"autovacuum_activity",
|
||||
)
|
||||
|
||||
clients: list[ReplicationClientItem] = []
|
||||
replay_lag_seconds = None
|
||||
slots_count = None
|
||||
active_clients = None
|
||||
role = "standby" if in_recovery else "primary"
|
||||
if in_recovery:
|
||||
replay_lag_seconds = await _safe_fetchval(
|
||||
conn,
|
||||
"""
|
||||
SELECT EXTRACT(EPOCH FROM (now() - pg_last_xact_replay_timestamp()))
|
||||
""",
|
||||
errors,
|
||||
"standby_replay_lag",
|
||||
)
|
||||
else:
|
||||
slots_count = await _safe_fetchval(conn, "SELECT count(*) FROM pg_replication_slots", errors, "replication_slots")
|
||||
repl_rows = await _safe_fetch(
|
||||
conn,
|
||||
"""
|
||||
SELECT application_name, client_addr::text, state, sync_state,
|
||||
EXTRACT(EPOCH FROM write_lag) AS write_lag_seconds,
|
||||
EXTRACT(EPOCH FROM flush_lag) AS flush_lag_seconds,
|
||||
EXTRACT(EPOCH FROM replay_lag) AS replay_lag_seconds,
|
||||
pg_wal_lsn_diff(pg_current_wal_lsn(), replay_lsn) AS replay_lag_bytes
|
||||
FROM pg_stat_replication
|
||||
""",
|
||||
errors,
|
||||
"pg_stat_replication",
|
||||
)
|
||||
active_clients = len(repl_rows)
|
||||
clients = [
|
||||
ReplicationClientItem(
|
||||
application_name=row.get("application_name"),
|
||||
client_addr=row.get("client_addr"),
|
||||
state=row.get("state"),
|
||||
sync_state=row.get("sync_state"),
|
||||
write_lag_seconds=row.get("write_lag_seconds"),
|
||||
flush_lag_seconds=row.get("flush_lag_seconds"),
|
||||
replay_lag_seconds=row.get("replay_lag_seconds"),
|
||||
replay_lag_bytes=float(row.get("replay_lag_bytes")) if row.get("replay_lag_bytes") is not None else None,
|
||||
)
|
||||
for row in repl_rows
|
||||
]
|
||||
|
||||
disk = await disk_provider.get_free_bytes(target_host)
|
||||
|
||||
return DatabaseOverviewOut(
|
||||
collected_at=datetime.now(timezone.utc),
|
||||
cache_ttl_seconds=cache_ttl_seconds,
|
||||
instance=InstanceOverview(
|
||||
server_version=version,
|
||||
role=role,
|
||||
postmaster_start_time=postmaster_start,
|
||||
uptime_seconds=uptime_seconds,
|
||||
current_database=current_db,
|
||||
port=int(port) if port is not None else None,
|
||||
),
|
||||
storage=StorageOverview(
|
||||
current_database_size_bytes=int(db_size) if db_size is not None else None,
|
||||
all_databases=[DbSizeItem(name=r["datname"], size_bytes=int(r["size_bytes"])) for r in all_db_rows],
|
||||
largest_tables=[
|
||||
TableSizeItem(schema=r["schemaname"], table=r["relname"], size_bytes=int(r["size_bytes"]))
|
||||
for r in largest_tables_rows
|
||||
],
|
||||
wal_directory_size_bytes=int(wal_dir_size) if wal_dir_size is not None else None,
|
||||
disk_space=DiskSpaceInfo(
|
||||
source=disk.source,
|
||||
status=disk.status,
|
||||
free_bytes=disk.free_bytes,
|
||||
message=disk.message,
|
||||
),
|
||||
),
|
||||
replication=ReplicationOverview(
|
||||
mode=role,
|
||||
replay_lag_seconds=float(replay_lag_seconds) if replay_lag_seconds is not None else None,
|
||||
replication_slots_count=int(slots_count) if slots_count is not None else None,
|
||||
active_replication_clients=active_clients,
|
||||
clients=clients,
|
||||
),
|
||||
performance=PerformanceOverview(
|
||||
xact_commit=stats_db["xact_commit"] if stats_db else None,
|
||||
xact_rollback=stats_db["xact_rollback"] if stats_db else None,
|
||||
deadlocks=stats_db["deadlocks"] if stats_db else None,
|
||||
temp_files=stats_db["temp_files"] if stats_db else None,
|
||||
temp_bytes=stats_db["temp_bytes"] if stats_db else None,
|
||||
blk_read_time=stats_db["blk_read_time"] if stats_db else None,
|
||||
blk_write_time=stats_db["blk_write_time"] if stats_db else None,
|
||||
checkpoints_timed=bgwriter["checkpoints_timed"] if bgwriter else None,
|
||||
checkpoints_req=bgwriter["checkpoints_req"] if bgwriter else None,
|
||||
autovacuum_workers=int(autovac_workers) if autovac_workers is not None else None,
|
||||
),
|
||||
partial_failures=errors,
|
||||
)
|
||||
46
backend/app/services/overview_service.py
Normal file
46
backend/app/services/overview_service.py
Normal file
@@ -0,0 +1,46 @@
|
||||
import asyncio
|
||||
import time
|
||||
import asyncpg
|
||||
from app.models.models import Target
|
||||
from app.schemas.overview import DatabaseOverviewOut
|
||||
from app.services.collector import build_target_dsn
|
||||
from app.services.infra_probe import get_disk_space_provider
|
||||
from app.services.overview_collector import collect_overview
|
||||
|
||||
_CACHE_TTL_SECONDS = 20
|
||||
_cache: dict[int, tuple[float, DatabaseOverviewOut]] = {}
|
||||
_locks: dict[int, asyncio.Lock] = {}
|
||||
|
||||
|
||||
def _get_lock(target_id: int) -> asyncio.Lock:
|
||||
if target_id not in _locks:
|
||||
_locks[target_id] = asyncio.Lock()
|
||||
return _locks[target_id]
|
||||
|
||||
|
||||
async def get_target_overview(target: Target) -> DatabaseOverviewOut:
|
||||
now = time.time()
|
||||
cached = _cache.get(target.id)
|
||||
if cached and (now - cached[0]) < _CACHE_TTL_SECONDS:
|
||||
return cached[1]
|
||||
|
||||
lock = _get_lock(target.id)
|
||||
async with lock:
|
||||
cached = _cache.get(target.id)
|
||||
now = time.time()
|
||||
if cached and (now - cached[0]) < _CACHE_TTL_SECONDS:
|
||||
return cached[1]
|
||||
|
||||
conn = await asyncpg.connect(dsn=build_target_dsn(target))
|
||||
try:
|
||||
payload = await collect_overview(
|
||||
conn=conn,
|
||||
disk_provider=get_disk_space_provider(),
|
||||
target_host=target.host,
|
||||
cache_ttl_seconds=_CACHE_TTL_SECONDS,
|
||||
)
|
||||
finally:
|
||||
await conn.close()
|
||||
|
||||
_cache[target.id] = (time.time(), payload)
|
||||
return payload
|
||||
Reference in New Issue
Block a user