"""Routes for target CRUD plus per-target metrics, query stats and live views.

Every handler depends on either ``get_current_user`` or ``require_roles``;
where ``user`` is not referenced in a body, the parameter exists only so the
dependency runs.
"""

from datetime import datetime

import asyncpg
from fastapi import APIRouter, Depends, HTTPException, Query, status
from sqlalchemy import and_, desc, select
from sqlalchemy.ext.asyncio import AsyncSession

from app.core.db import get_db
from app.core.deps import get_current_user, require_roles
from app.models.models import Metric, QueryStat, Target, User
from app.schemas.metric import MetricOut, QueryStatOut
from app.schemas.overview import DatabaseOverviewOut
from app.schemas.target import TargetCreate, TargetOut, TargetUpdate
from app.services.audit import write_audit_log
from app.services.collector import build_target_dsn
from app.services.crypto import encrypt_secret
from app.services.overview_service import get_target_overview

router = APIRouter()

# SQL executed on the target database by the live endpoints.
_LOCKS_SQL = (
    " SELECT locktype, mode, granted, relation::regclass::text AS relation, pid"
    " FROM pg_locks"
    " ORDER BY granted ASC, mode"
    " LIMIT 500 "
)
_ACTIVITY_SQL = (
    " SELECT pid, usename, application_name, client_addr::text, state,"
    " wait_event_type, wait_event, now() - query_start AS running_for,"
    " left(query, 300) AS query"
    " FROM pg_stat_activity"
    " WHERE datname = current_database()"
    " ORDER BY query_start NULLS LAST"
    " LIMIT 200 "
)


async def _get_target_or_404(db: AsyncSession, target_id: int) -> Target:
    """Load the Target with ``target_id`` or raise a 404 ``HTTPException``."""
    target = await db.scalar(select(Target).where(Target.id == target_id))
    if not target:
        raise HTTPException(
            status_code=status.HTTP_404_NOT_FOUND, detail="Target not found"
        )
    return target


@router.get("", response_model=list[TargetOut])
async def list_targets(
    user: User = Depends(get_current_user),
    db: AsyncSession = Depends(get_db),
) -> list[TargetOut]:
    """Return all targets, highest id first."""
    stmt = select(Target).order_by(Target.id.desc())
    targets = (await db.scalars(stmt)).all()
    return [TargetOut.model_validate(item) for item in targets]


@router.post("", response_model=TargetOut, status_code=status.HTTP_201_CREATED)
async def create_target(
    payload: TargetCreate,
    user: User = Depends(require_roles("admin", "operator")),
    db: AsyncSession = Depends(get_db),
) -> TargetOut:
    """Create a target from the payload and record a ``target.create`` audit entry."""
    target = Target(
        name=payload.name,
        host=payload.host,
        port=payload.port,
        dbname=payload.dbname,
        username=payload.username,
        # The password is passed through encrypt_secret before being stored.
        encrypted_password=encrypt_secret(payload.password),
        sslmode=payload.sslmode,
        tags=payload.tags,
    )
    db.add(target)
    await db.commit()
    # Refresh so generated attributes (e.g. the id used below) are loaded.
    await db.refresh(target)
    await write_audit_log(
        db, "target.create", user.id, {"target_id": target.id, "name": target.name}
    )
    return TargetOut.model_validate(target)


@router.get("/{target_id}", response_model=TargetOut)
async def get_target(
    target_id: int,
    user: User = Depends(get_current_user),
    db: AsyncSession = Depends(get_db),
) -> TargetOut:
    """Return a single target; responds 404 when it does not exist."""
    target = await _get_target_or_404(db, target_id)
    return TargetOut.model_validate(target)


@router.put("/{target_id}", response_model=TargetOut)
async def update_target(
    target_id: int,
    payload: TargetUpdate,
    user: User = Depends(require_roles("admin", "operator")),
    db: AsyncSession = Depends(get_db),
) -> TargetOut:
    """Apply the fields set in the payload to a target and audit the change.

    Only fields explicitly present in the request are applied. A supplied
    password is encrypted into ``encrypted_password``. Responds 404 when the
    target does not exist.
    """
    target = await _get_target_or_404(db, target_id)

    updates = payload.model_dump(exclude_unset=True)
    if "password" in updates:
        password = updates.pop("password")
        # An explicit null password is ignored rather than handed to
        # encrypt_secret. NOTE(review): assumes TargetUpdate.password is
        # optional; confirm against the schema.
        if password is not None:
            target.encrypted_password = encrypt_secret(password)
    for key, value in updates.items():
        setattr(target, key, value)

    await db.commit()
    await db.refresh(target)
    await write_audit_log(db, "target.update", user.id, {"target_id": target.id})
    return TargetOut.model_validate(target)


@router.delete("/{target_id}")
async def delete_target(
    target_id: int,
    user: User = Depends(require_roles("admin", "operator")),
    db: AsyncSession = Depends(get_db),
) -> dict:
    """Delete a target and record a ``target.delete`` audit entry.

    Responds 404 when the target does not exist.
    """
    target = await _get_target_or_404(db, target_id)
    await db.delete(target)
    await db.commit()
    await write_audit_log(db, "target.delete", user.id, {"target_id": target_id})
    return {"status": "deleted"}


@router.get("/{target_id}/metrics", response_model=list[MetricOut])
async def get_metrics(
    target_id: int,
    metric: str = Query(...),
    from_ts: datetime = Query(alias="from"),
    to_ts: datetime = Query(alias="to"),
    user: User = Depends(get_current_user),
    db: AsyncSession = Depends(get_db),
) -> list[MetricOut]:
    """Return stored samples of one metric within ``[from, to]``, oldest first.

    The target's existence is not checked; an unknown ``target_id`` simply
    matches no rows.
    """
    criteria = and_(
        Metric.target_id == target_id,
        Metric.metric_name == metric,
        Metric.ts >= from_ts,
        Metric.ts <= to_ts,
    )
    stmt = select(Metric).where(criteria).order_by(Metric.ts.asc())
    rows = (await db.scalars(stmt)).all()
    return [
        MetricOut(ts=r.ts, metric_name=r.metric_name, value=r.value, labels=r.labels)
        for r in rows
    ]


async def _live_conn(target: Target) -> asyncpg.Connection:
    """Open a new asyncpg connection using the DSN built for ``target``."""
    return await asyncpg.connect(dsn=build_target_dsn(target))


async def _fetch_live_rows(target: Target, sql: str) -> list[dict]:
    """Run ``sql`` on a fresh connection to ``target`` and return rows as dicts.

    The connection is closed whether or not the query succeeds.
    """
    conn = await _live_conn(target)
    try:
        rows = await conn.fetch(sql)
        return [dict(r) for r in rows]
    finally:
        await conn.close()


@router.get("/{target_id}/locks")
async def get_locks(
    target_id: int,
    user: User = Depends(get_current_user),
    db: AsyncSession = Depends(get_db),
) -> list[dict]:
    """Return up to 500 rows from ``pg_locks`` on the target, queried live.

    Responds 404 when the target does not exist.
    """
    target = await _get_target_or_404(db, target_id)
    return await _fetch_live_rows(target, _LOCKS_SQL)


@router.get("/{target_id}/activity")
async def get_activity(
    target_id: int,
    user: User = Depends(get_current_user),
    db: AsyncSession = Depends(get_db),
) -> list[dict]:
    """Return up to 200 ``pg_stat_activity`` rows for the connected database.

    Query text is truncated to 300 characters by the SQL itself. Responds 404
    when the target does not exist.
    """
    target = await _get_target_or_404(db, target_id)
    return await _fetch_live_rows(target, _ACTIVITY_SQL)


@router.get("/{target_id}/top-queries", response_model=list[QueryStatOut])
async def get_top_queries(
    target_id: int,
    user: User = Depends(get_current_user),
    db: AsyncSession = Depends(get_db),
) -> list[QueryStatOut]:
    """Return the 100 most recent stored QueryStat rows for a target.

    Rows are ordered by ``ts`` descending. The target's existence is not
    checked; an unknown ``target_id`` simply matches no rows.
    """
    stmt = (
        select(QueryStat)
        .where(QueryStat.target_id == target_id)
        .order_by(desc(QueryStat.ts))
        .limit(100)
    )
    rows = (await db.scalars(stmt)).all()
    return [
        QueryStatOut(
            ts=r.ts,
            queryid=r.queryid,
            calls=r.calls,
            total_time=r.total_time,
            mean_time=r.mean_time,
            rows=r.rows,
            query_text=r.query_text,
        )
        for r in rows
    ]


@router.get("/{target_id}/overview", response_model=DatabaseOverviewOut)
async def get_overview(
    target_id: int,
    user: User = Depends(get_current_user),
    db: AsyncSession = Depends(get_db),
) -> DatabaseOverviewOut:
    """Return the overview produced by ``get_target_overview`` for a target.

    Responds 404 when the target does not exist.
    """
    target = await _get_target_or_404(db, target_id)
    return await get_target_overview(target)