Files
NexaPG/backend/app/api/routes/targets.py
nessi 117710cc0a [NX-101 Issue] Refactor error handling to use consistent API error format
Replaced all inline error messages with the standardized `api_error` helper for consistent error response formatting. This improves clarity, maintainability, and ensures uniform error structures across the application. Updated logging for collector failures to include error class and switched to warning level for target unreachable scenarios.
2026-02-14 11:30:56 +01:00

507 lines
19 KiB
Python

from datetime import datetime
from uuid import uuid4
import asyncpg
from fastapi import APIRouter, Depends, HTTPException, Query, status
from sqlalchemy import and_, delete, desc, select
from sqlalchemy.ext.asyncio import AsyncSession
from app.core.db import get_db
from app.core.deps import get_current_user, require_roles
from app.core.errors import api_error
from app.models.models import Metric, QueryStat, Target, TargetOwner, User
from app.schemas.metric import MetricOut, QueryStatOut
from app.schemas.overview import DatabaseOverviewOut
from app.schemas.target import (
TargetConnectionTestRequest,
TargetCreate,
TargetOut,
TargetOwnerOut,
TargetOwnersUpdate,
TargetUpdate,
)
from app.services.audit import write_audit_log
from app.services.collector import build_target_dsn
from app.services.crypto import encrypt_secret
from app.services.overview_service import get_target_overview
router = APIRouter()
async def _owners_by_target_ids(db: AsyncSession, target_ids: list[int]) -> dict[int, list[int]]:
    """Map each requested target id to the user ids recorded as its owners.

    Args:
        db: Session used to read ``TargetOwner`` rows.
        target_ids: Target ids to look up.

    Returns:
        A dict with one entry per id in ``target_ids``; targets without any
        ``TargetOwner`` row map to an empty list. An empty ``target_ids``
        returns ``{}`` without running a query.
    """
    if not target_ids:
        return {}

    owner_query = select(TargetOwner.target_id, TargetOwner.user_id).where(
        TargetOwner.target_id.in_(target_ids)
    )
    result = await db.execute(owner_query)
    pairs = result.all()

    # Pre-seed the mapping so targets without owners still get an entry.
    owners: dict[int, list[int]] = {tid: [] for tid in target_ids}
    for tid, uid in pairs:
        owners.setdefault(tid, []).append(uid)
    return owners
def _target_out_with_owners(target: Target, owner_user_ids: list[int]) -> TargetOut:
    """Build the ``TargetOut`` response model for ``target``.

    Args:
        target: The stored target whose fields are copied.
        owner_user_ids: Owner ids to expose on the response.

    Returns:
        A ``TargetOut`` carrying the fields listed below; ``tags`` falls back
        to an empty dict when the stored value is falsy.
    """
    fields = {
        "id": target.id,
        "name": target.name,
        "host": target.host,
        "port": target.port,
        "dbname": target.dbname,
        "username": target.username,
        "sslmode": target.sslmode,
        "use_pg_stat_statements": target.use_pg_stat_statements,
        "owner_user_ids": owner_user_ids,
        "tags": target.tags or {},
        "created_at": target.created_at,
    }
    return TargetOut(**fields)
async def _set_target_owners(db: AsyncSession, target_id: int, user_ids: list[int], assigned_by_user_id: int | None) -> None:
    """Replace the owner rows of one target.

    All existing ``TargetOwner`` rows for ``target_id`` are deleted, then one
    row per distinct id in ``user_ids`` is added in ascending id order. This
    helper neither flushes nor commits; the caller does that.

    Args:
        db: Session the delete and the new rows are issued on.
        target_id: Target whose owners are replaced.
        user_ids: New owner ids; duplicates are collapsed.
        assigned_by_user_id: Stored on every new row as the assigning user.
    """
    clear_existing = delete(TargetOwner).where(TargetOwner.target_id == target_id)
    await db.execute(clear_existing)

    unique_owner_ids = sorted(set(user_ids))
    for owner_id in unique_owner_ids:
        owner_row = TargetOwner(
            target_id=target_id,
            user_id=owner_id,
            assigned_by_user_id=assigned_by_user_id,
        )
        db.add(owner_row)
async def _discover_databases(payload: TargetCreate) -> list[str]:
    """List the connectable, non-template databases on the server in ``payload``.

    Connects with the credentials from ``payload`` (8 second connect timeout)
    and reads ``pg_database``.

    Args:
        payload: Target creation request supplying host, port, dbname,
            username, password and sslmode.

    Returns:
        Database names ordered by name, skipping falsy names.

    Raises:
        HTTPException: 400 with code ``database_discovery_failed`` when the
            connection or the query fails for any reason. The original
            exception is chained as the cause.
    """
    # NOTE(review): sslmode is collapsed to a bool here; asyncpg also accepts
    # sslmode strings for ``ssl``, so modes such as "require" may behave
    # differently from connections built through a DSN - confirm.
    use_ssl = payload.sslmode != "disable"
    conn = None
    try:
        conn = await asyncpg.connect(
            host=payload.host,
            port=payload.port,
            database=payload.dbname,
            user=payload.username,
            password=payload.password,
            ssl=use_ssl,
            timeout=8,
        )
        rows = await conn.fetch(
            """
SELECT datname
FROM pg_database
WHERE datallowconn
AND NOT datistemplate
ORDER BY datname
"""
        )
        return [row["datname"] for row in rows if row["datname"]]
    except Exception as exc:
        # Chain the cause so the underlying failure stays visible in
        # tracebacks, as the other connection helpers in this module do.
        raise HTTPException(
            status_code=400,
            detail=api_error("database_discovery_failed", "Database discovery failed", {"error": str(exc)}),
        ) from exc
    finally:
        if conn is not None:
            await conn.close()
async def _next_unique_target_name(db: AsyncSession, base_name: str) -> str:
    """Return a target name derived from ``base_name`` that is not yet in use.

    The stripped ``base_name`` is tried first; if taken, ``"<name> (2)"``,
    ``"<name> (3)"``, ... are tried until no ``Target`` row has that name.

    Args:
        db: Session used for the existence checks.
        base_name: Requested name; surrounding whitespace is ignored.

    Returns:
        The first candidate for which no target exists.
    """
    # Use the stripped stem for every candidate, so suffixed names do not
    # reintroduce whitespace that the first candidate dropped.
    stem = base_name.strip()
    candidate = stem
    suffix = 2
    while True:
        exists = await db.scalar(select(Target.id).where(Target.name == candidate))
        if exists is None:
            return candidate
        candidate = f"{stem} ({suffix})"
        suffix += 1
@router.get("", response_model=list[TargetOut])
async def list_targets(user: User = Depends(get_current_user), db: AsyncSession = Depends(get_db)) -> list[TargetOut]:
    """Return every target, highest id first, each with its owner user ids."""
    # The dependency result is not used here; it is requested only so it runs.
    _ = user
    newest_first = select(Target).order_by(Target.id.desc())
    targets = (await db.scalars(newest_first)).all()
    target_ids = [entry.id for entry in targets]
    owner_map = await _owners_by_target_ids(db, target_ids)

    response: list[TargetOut] = []
    for entry in targets:
        response.append(_target_out_with_owners(entry, owner_map.get(entry.id, [])))
    return response
@router.post("/test-connection")
async def test_target_connection(
    payload: TargetConnectionTestRequest,
    user: User = Depends(require_roles("admin", "operator")),
) -> dict:
    """Try to connect with the supplied settings and report the server version.

    Args:
        payload: Connection settings (host, port, dbname, username, password,
            sslmode) to try.
        user: Result of the role dependency; unused beyond running it.

    Returns:
        ``{"ok": True, "message": "Connection successful", "server_version": ...}``
        when the connection and ``SHOW server_version`` succeed.

    Raises:
        HTTPException: 400 with code ``connection_test_failed`` when connecting
            or querying fails for any reason. The original exception is
            chained as the cause.
    """
    _ = user
    # NOTE(review): sslmode is collapsed to a bool here; asyncpg also accepts
    # sslmode strings for ``ssl``, so a passing/failing test may not match how
    # the stored target is later connected via its DSN - confirm.
    use_ssl = payload.sslmode != "disable"
    conn = None
    try:
        conn = await asyncpg.connect(
            host=payload.host,
            port=payload.port,
            database=payload.dbname,
            user=payload.username,
            password=payload.password,
            ssl=use_ssl,
            timeout=8,
        )
        version = await conn.fetchval("SHOW server_version")
        return {"ok": True, "message": "Connection successful", "server_version": version}
    except Exception as exc:
        # Chain the cause so the underlying failure stays visible in
        # tracebacks, as the other connection helpers in this module do.
        raise HTTPException(
            status_code=400,
            detail=api_error("connection_test_failed", "Connection test failed", {"error": str(exc)}),
        ) from exc
    finally:
        if conn is not None:
            await conn.close()
@router.post("", response_model=TargetOut, status_code=status.HTTP_201_CREATED)
async def create_target(
    payload: TargetCreate,
    user: User = Depends(require_roles("admin", "operator")),
    db: AsyncSession = Depends(get_db),
) -> TargetOut:
    """Create a single target, or one target per database found on the server.

    When ``payload.discover_all_databases`` is set, the server is queried for
    its databases and a target is created for each one that does not already
    exist with the same host, port, dbname and username. Those targets share
    ``monitor_*`` tags, including a freshly generated group id. Otherwise one
    target is created for ``payload.dbname``. Names are made unique via
    ``_next_unique_target_name`` and the password is stored encrypted.

    Args:
        payload: Target definition, optional owner ids and discovery flag.
        user: Acting user; recorded as owner assigner and in the audit log.
        db: Session used for all reads and writes.

    Returns:
        The created target. In discovery mode, the first created target.

    Raises:
        HTTPException: 400 when an owner id does not match a user, when
            discovery fails or finds no database, or when every discovered
            database already exists as a target.
    """
    owner_ids = sorted(set(payload.owner_user_ids or []))
    if owner_ids:
        owners_exist = (await db.scalars(select(User.id).where(User.id.in_(owner_ids)))).all()
        if len(set(owners_exist)) != len(owner_ids):
            raise HTTPException(
                status_code=400,
                detail=api_error("owner_users_not_found", "One or more owner users were not found"),
            )

    encrypted_password = encrypt_secret(payload.password)

    if payload.discover_all_databases:
        created_targets: list[Target] = []
        databases = await _discover_databases(payload)
        if not databases:
            raise HTTPException(
                status_code=400,
                detail=api_error("no_databases_discovered", "No databases discovered on target"),
            )
        group_id = str(uuid4())
        base_tags = payload.tags or {}
        for dbname in databases:
            duplicate = await db.scalar(
                select(Target.id).where(
                    Target.host == payload.host,
                    Target.port == payload.port,
                    Target.dbname == dbname,
                    Target.username == payload.username,
                )
            )
            if duplicate is not None:
                # Already monitored with the same connection identity; skip it.
                continue
            target_name = await _next_unique_target_name(db, f"{payload.name} / {dbname}")
            tags = {
                **base_tags,
                "monitor_mode": "all_databases",
                "monitor_group_id": group_id,
                "monitor_group_name": payload.name,
            }
            target = Target(
                name=target_name,
                host=payload.host,
                port=payload.port,
                dbname=dbname,
                username=payload.username,
                encrypted_password=encrypted_password,
                sslmode=payload.sslmode,
                use_pg_stat_statements=payload.use_pg_stat_statements,
                tags=tags,
            )
            db.add(target)
            # Flush so the new row gets its id (needed for the owner rows) and
            # is visible to the name-uniqueness check of the next iteration.
            await db.flush()
            created_targets.append(target)
            if owner_ids:
                await _set_target_owners(db, target.id, owner_ids, user.id)
        if not created_targets:
            raise HTTPException(
                status_code=400,
                detail=api_error("all_discovered_databases_exist", "All discovered databases already exist as targets"),
            )
        await db.commit()
        for item in created_targets:
            await db.refresh(item)
        await write_audit_log(
            db,
            "target.create.all_databases",
            user.id,
            {"base_name": payload.name, "created_count": len(created_targets), "host": payload.host, "port": payload.port},
        )
        owner_map = await _owners_by_target_ids(db, [created_targets[0].id])
        return _target_out_with_owners(created_targets[0], owner_map.get(created_targets[0].id, []))

    target_name = await _next_unique_target_name(db, payload.name)
    target = Target(
        name=target_name,
        host=payload.host,
        port=payload.port,
        dbname=payload.dbname,
        username=payload.username,
        encrypted_password=encrypted_password,
        sslmode=payload.sslmode,
        use_pg_stat_statements=payload.use_pg_stat_statements,
        tags=payload.tags,
    )
    db.add(target)
    # Flush (not commit) to obtain the id, so the target and its owner rows
    # are persisted in one transaction, as in the discovery branch above.
    await db.flush()
    if owner_ids:
        await _set_target_owners(db, target.id, owner_ids, user.id)
    await db.commit()
    # Refresh after the final commit so the instance is loaded when read below.
    await db.refresh(target)
    await write_audit_log(db, "target.create", user.id, {"target_id": target.id, "name": target.name})
    owner_map = await _owners_by_target_ids(db, [target.id])
    return _target_out_with_owners(target, owner_map.get(target.id, []))
@router.get("/owner-candidates", response_model=list[TargetOwnerOut])
async def list_owner_candidates(
    user: User = Depends(require_roles("admin", "operator")),
    db: AsyncSession = Depends(get_db),
) -> list[TargetOwnerOut]:
    """Return all users, ordered by email ascending, as owner candidates."""
    # The dependency result is not used here; it is requested only so it runs.
    _ = user
    by_email = select(User).order_by(User.email.asc())
    candidates = (await db.scalars(by_email)).all()

    response: list[TargetOwnerOut] = []
    for account in candidates:
        response.append(TargetOwnerOut(user_id=account.id, email=account.email, role=account.role))
    return response
@router.get("/{target_id}", response_model=TargetOut)
async def get_target(target_id: int, user: User = Depends(get_current_user), db: AsyncSession = Depends(get_db)) -> TargetOut:
    """Return one target with its owner user ids.

    Raises:
        HTTPException: 404 with code ``target_not_found`` when no target has
            ``target_id``.
    """
    # The dependency result is not used here; it is requested only so it runs.
    _ = user
    lookup = select(Target).where(Target.id == target_id)
    target = await db.scalar(lookup)
    if not target:
        not_found = api_error("target_not_found", "Target not found")
        raise HTTPException(status_code=404, detail=not_found)

    owner_map = await _owners_by_target_ids(db, [target.id])
    owner_ids = owner_map.get(target.id, [])
    return _target_out_with_owners(target, owner_ids)
@router.put("/{target_id}", response_model=TargetOut)
async def update_target(
    target_id: int,
    payload: TargetUpdate,
    user: User = Depends(require_roles("admin", "operator")),
    db: AsyncSession = Depends(get_db),
) -> TargetOut:
    """Apply a partial update to a target and optionally replace its owners.

    Only fields explicitly set in the request are applied. A supplied
    ``password`` is stored encrypted; a supplied ``owner_user_ids`` replaces
    the whole owner set after checking that every id matches a user.

    Raises:
        HTTPException: 404 when the target does not exist; 400 with code
            ``owner_users_not_found`` when an owner id matches no user.
    """
    target = await db.scalar(select(Target).where(Target.id == target_id))
    if not target:
        raise HTTPException(status_code=404, detail=api_error("target_not_found", "Target not found"))

    changes = payload.model_dump(exclude_unset=True)
    # Owners live in their own table, so take them out of the column updates.
    new_owner_ids = changes.pop("owner_user_ids", None)
    if "password" in changes:
        plain_password = changes.pop("password")
        target.encrypted_password = encrypt_secret(plain_password)
    for field_name, field_value in changes.items():
        setattr(target, field_name, field_value)

    if new_owner_ids is not None:
        known_ids = (await db.scalars(select(User.id).where(User.id.in_(new_owner_ids)))).all()
        if len(set(known_ids)) != len(set(new_owner_ids)):
            raise HTTPException(
                status_code=400,
                detail=api_error("owner_users_not_found", "One or more owner users were not found"),
            )
        await _set_target_owners(db, target.id, new_owner_ids, user.id)

    await db.commit()
    await db.refresh(target)
    await write_audit_log(db, "target.update", user.id, {"target_id": target.id})

    owner_map = await _owners_by_target_ids(db, [target.id])
    current_owner_ids = owner_map.get(target.id, [])
    return _target_out_with_owners(target, current_owner_ids)
@router.put("/{target_id}/owners", response_model=list[TargetOwnerOut])
async def set_target_owners(
    target_id: int,
    payload: TargetOwnersUpdate,
    user: User = Depends(require_roles("admin", "operator")),
    db: AsyncSession = Depends(get_db),
) -> list[TargetOwnerOut]:
    """Replace the owners of a target and return the resulting owner list.

    Returns:
        The target's owners after the change, ordered by email ascending.

    Raises:
        HTTPException: 404 when the target does not exist; 400 with code
            ``owner_users_not_found`` when an id in the payload matches no user.
    """
    target = await db.scalar(select(Target).where(Target.id == target_id))
    if not target:
        raise HTTPException(status_code=404, detail=api_error("target_not_found", "Target not found"))

    owner_user_ids = sorted(set(payload.user_ids))
    if owner_user_ids:
        known_ids = (await db.scalars(select(User.id).where(User.id.in_(owner_user_ids)))).all()
        if len(set(known_ids)) != len(set(owner_user_ids)):
            raise HTTPException(
                status_code=400,
                detail=api_error("owner_users_not_found", "One or more owner users were not found"),
            )

    await _set_target_owners(db, target_id, owner_user_ids, user.id)
    await db.commit()
    audit_details = {"target_id": target_id, "owner_user_ids": owner_user_ids}
    await write_audit_log(db, "target.owners.update", user.id, audit_details)

    # Read the owners back so the response reflects what is stored.
    owners_query = (
        select(User.id, User.email, User.role)
        .join(TargetOwner, TargetOwner.user_id == User.id)
        .where(TargetOwner.target_id == target_id)
        .order_by(User.email.asc())
    )
    owner_rows = (await db.execute(owners_query)).all()
    return [TargetOwnerOut(user_id=owner.id, email=owner.email, role=owner.role) for owner in owner_rows]
@router.get("/{target_id}/owners", response_model=list[TargetOwnerOut])
async def get_target_owners(
    target_id: int,
    user: User = Depends(get_current_user),
    db: AsyncSession = Depends(get_db),
) -> list[TargetOwnerOut]:
    """Return the owners of a target, ordered by email ascending.

    Raises:
        HTTPException: 404 with code ``target_not_found`` when no target has
            ``target_id``.
    """
    # The dependency result is not used here; it is requested only so it runs.
    _ = user
    target = await db.scalar(select(Target).where(Target.id == target_id))
    if not target:
        raise HTTPException(status_code=404, detail=api_error("target_not_found", "Target not found"))

    owners_query = (
        select(User.id, User.email, User.role)
        .join(TargetOwner, TargetOwner.user_id == User.id)
        .where(TargetOwner.target_id == target_id)
        .order_by(User.email.asc())
    )
    owner_rows = (await db.execute(owners_query)).all()

    response: list[TargetOwnerOut] = []
    for owner in owner_rows:
        response.append(TargetOwnerOut(user_id=owner.id, email=owner.email, role=owner.role))
    return response
@router.delete("/{target_id}")
async def delete_target(
    target_id: int,
    user: User = Depends(require_roles("admin", "operator")),
    db: AsyncSession = Depends(get_db),
) -> dict:
    """Delete a target, commit, and record the deletion in the audit log.

    Returns:
        ``{"status": "deleted"}`` on success.

    Raises:
        HTTPException: 404 with code ``target_not_found`` when no target has
            ``target_id``.
    """
    lookup = select(Target).where(Target.id == target_id)
    target = await db.scalar(lookup)
    if not target:
        not_found = api_error("target_not_found", "Target not found")
        raise HTTPException(status_code=404, detail=not_found)

    await db.delete(target)
    await db.commit()
    # Use the path parameter rather than the deleted instance for the log entry.
    audit_details = {"target_id": target_id}
    await write_audit_log(db, "target.delete", user.id, audit_details)
    return {"status": "deleted"}
@router.get("/{target_id}/metrics", response_model=list[MetricOut])
async def get_metrics(
    target_id: int,
    metric: str = Query(...),
    from_ts: datetime = Query(alias="from"),
    to_ts: datetime = Query(alias="to"),
    user: User = Depends(get_current_user),
    db: AsyncSession = Depends(get_db),
) -> list[MetricOut]:
    """Return stored samples of one metric for a target within a time range.

    Args:
        target_id: Target whose samples are read.
        metric: Metric name to match exactly.
        from_ts: Inclusive lower bound, read from the ``from`` query parameter.
        to_ts: Inclusive upper bound, read from the ``to`` query parameter.

    Returns:
        Matching samples ordered by timestamp ascending. No existence check is
        made on the target, so an unknown ``target_id`` yields an empty list.
    """
    # The dependency result is not used here; it is requested only so it runs.
    _ = user
    in_window = and_(
        Metric.target_id == target_id,
        Metric.metric_name == metric,
        Metric.ts >= from_ts,
        Metric.ts <= to_ts,
    )
    samples_query = select(Metric).where(in_window).order_by(Metric.ts.asc())
    samples = (await db.scalars(samples_query)).all()

    response: list[MetricOut] = []
    for sample in samples:
        response.append(
            MetricOut(ts=sample.ts, metric_name=sample.metric_name, value=sample.value, labels=sample.labels)
        )
    return response
async def _live_conn(target: Target) -> asyncpg.Connection:
    """Open an asyncpg connection to ``target`` using its built DSN.

    The caller is responsible for closing the returned connection.

    Raises:
        HTTPException: 503 with code ``target_unreachable`` when building the
            DSN or connecting raises ``OSError`` or ``asyncpg.PostgresError``;
            the original exception is chained as the cause.
    """
    try:
        connection = await asyncpg.connect(dsn=build_target_dsn(target))
    except (OSError, asyncpg.PostgresError) as exc:
        failure_context = {
            "target_id": target.id,
            "host": target.host,
            "port": target.port,
            "error": str(exc),
        }
        unreachable = api_error("target_unreachable", "Target database is not reachable", failure_context)
        raise HTTPException(status_code=503, detail=unreachable) from exc
    return connection
@router.get("/{target_id}/locks")
async def get_locks(target_id: int, user: User = Depends(get_current_user), db: AsyncSession = Depends(get_db)) -> list[dict]:
    """Return up to 500 rows of ``pg_locks`` read live from the target.

    Rows are ordered with non-granted locks first, then by mode.

    Raises:
        HTTPException: 404 when the target does not exist; 503 (from
            ``_live_conn``) when the target cannot be connected to.
    """
    # The dependency result is not used here; it is requested only so it runs.
    _ = user
    target = await db.scalar(select(Target).where(Target.id == target_id))
    if not target:
        raise HTTPException(status_code=404, detail=api_error("target_not_found", "Target not found"))

    conn = await _live_conn(target)
    try:
        lock_rows = await conn.fetch(
            """
SELECT locktype, mode, granted, relation::regclass::text AS relation, pid
FROM pg_locks
ORDER BY granted ASC, mode
LIMIT 500
"""
        )
        locks: list[dict] = []
        for record in lock_rows:
            locks.append(dict(record))
        return locks
    finally:
        # Always release the live connection, even when the query fails.
        await conn.close()
@router.get("/{target_id}/activity")
async def get_activity(target_id: int, user: User = Depends(get_current_user), db: AsyncSession = Depends(get_db)) -> list[dict]:
    """Return up to 200 ``pg_stat_activity`` rows for the target's database.

    The query runs live on the target, is restricted to the connected
    database, and truncates each query text to 300 characters.

    Raises:
        HTTPException: 404 when the target does not exist; 503 (from
            ``_live_conn``) when the target cannot be connected to.
    """
    # The dependency result is not used here; it is requested only so it runs.
    _ = user
    target = await db.scalar(select(Target).where(Target.id == target_id))
    if not target:
        raise HTTPException(status_code=404, detail=api_error("target_not_found", "Target not found"))

    conn = await _live_conn(target)
    try:
        activity_rows = await conn.fetch(
            """
SELECT pid, usename, application_name, client_addr::text, state, wait_event_type, wait_event, now() - query_start AS running_for, left(query, 300) AS query
FROM pg_stat_activity
WHERE datname = current_database()
ORDER BY query_start NULLS LAST
LIMIT 200
"""
        )
        sessions: list[dict] = []
        for record in activity_rows:
            sessions.append(dict(record))
        return sessions
    finally:
        # Always release the live connection, even when the query fails.
        await conn.close()
@router.get("/{target_id}/top-queries", response_model=list[QueryStatOut])
async def get_top_queries(target_id: int, user: User = Depends(get_current_user), db: AsyncSession = Depends(get_db)) -> list[QueryStatOut]:
    """Return the 100 most recent stored query statistics for a target.

    Returns:
        Stored ``QueryStat`` rows for the target, newest timestamp first, or
        an empty list when the target has ``use_pg_stat_statements`` disabled.

    Raises:
        HTTPException: 404 with code ``target_not_found`` when no target has
            ``target_id``.
    """
    # The dependency result is not used here; it is requested only so it runs.
    _ = user
    target = await db.scalar(select(Target).where(Target.id == target_id))
    if not target:
        raise HTTPException(status_code=404, detail=api_error("target_not_found", "Target not found"))
    if not target.use_pg_stat_statements:
        return []

    latest_stats = (
        select(QueryStat)
        .where(QueryStat.target_id == target_id)
        .order_by(desc(QueryStat.ts))
        .limit(100)
    )
    stats = (await db.scalars(latest_stats)).all()

    response: list[QueryStatOut] = []
    for stat in stats:
        response.append(
            QueryStatOut(
                ts=stat.ts,
                queryid=stat.queryid,
                calls=stat.calls,
                total_time=stat.total_time,
                mean_time=stat.mean_time,
                rows=stat.rows,
                query_text=stat.query_text,
            )
        )
    return response
@router.get("/{target_id}/overview", response_model=DatabaseOverviewOut)
async def get_overview(target_id: int, user: User = Depends(get_current_user), db: AsyncSession = Depends(get_db)) -> DatabaseOverviewOut:
    """Return the overview produced by ``get_target_overview`` for one target.

    Raises:
        HTTPException: 404 when no target has ``target_id``; 503 with code
            ``target_unreachable`` when building the overview raises
            ``OSError`` or ``asyncpg.PostgresError`` (cause is chained).
    """
    # The dependency result is not used here; it is requested only so it runs.
    _ = user
    target = await db.scalar(select(Target).where(Target.id == target_id))
    if not target:
        raise HTTPException(status_code=404, detail=api_error("target_not_found", "Target not found"))

    try:
        overview = await get_target_overview(target)
    except (OSError, asyncpg.PostgresError) as exc:
        failure_context = {
            "target_id": target.id,
            "host": target.host,
            "port": target.port,
            "error": str(exc),
        }
        unreachable = api_error("target_unreachable", "Target database is not reachable", failure_context)
        raise HTTPException(status_code=503, detail=unreachable) from exc
    return overview