Files
NexaPG/backend/app/api/routes/targets.py
nessi ea26ef4d33
All checks were successful
PostgreSQL Compatibility Matrix / PG14 smoke (push) Successful in 8s
PostgreSQL Compatibility Matrix / PG15 smoke (push) Successful in 7s
PostgreSQL Compatibility Matrix / PG16 smoke (push) Successful in 7s
PostgreSQL Compatibility Matrix / PG17 smoke (push) Successful in 6s
PostgreSQL Compatibility Matrix / PG18 smoke (push) Successful in 6s
Add target owners and alert notification management.
This commit implements the addition of `target_owners` and `alert_notification_events` tables, enabling management of responsible users for targets. Backend and frontend components are updated to allow viewing, assigning, and notifying target owners about critical alerts via email.
2026-02-12 15:22:32 +01:00

351 lines
13 KiB
Python

from datetime import datetime
import asyncpg
from fastapi import APIRouter, Depends, HTTPException, Query, status
from sqlalchemy import and_, delete, desc, select
from sqlalchemy.ext.asyncio import AsyncSession
from app.core.db import get_db
from app.core.deps import get_current_user, require_roles
from app.models.models import Metric, QueryStat, Target, TargetOwner, User
from app.schemas.metric import MetricOut, QueryStatOut
from app.schemas.overview import DatabaseOverviewOut
from app.schemas.target import (
TargetConnectionTestRequest,
TargetCreate,
TargetOut,
TargetOwnerOut,
TargetOwnersUpdate,
TargetUpdate,
)
from app.services.audit import write_audit_log
from app.services.collector import build_target_dsn
from app.services.crypto import encrypt_secret
from app.services.overview_service import get_target_overview
router = APIRouter()
async def _owners_by_target_ids(db: AsyncSession, target_ids: list[int]) -> dict[int, list[int]]:
if not target_ids:
return {}
rows = (
await db.execute(select(TargetOwner.target_id, TargetOwner.user_id).where(TargetOwner.target_id.in_(target_ids)))
).all()
mapping: dict[int, list[int]] = {target_id: [] for target_id in target_ids}
for target_id, user_id in rows:
mapping.setdefault(target_id, []).append(user_id)
return mapping
def _target_out_with_owners(target: Target, owner_user_ids: list[int]) -> TargetOut:
return TargetOut(
id=target.id,
name=target.name,
host=target.host,
port=target.port,
dbname=target.dbname,
username=target.username,
sslmode=target.sslmode,
use_pg_stat_statements=target.use_pg_stat_statements,
owner_user_ids=owner_user_ids,
tags=target.tags or {},
created_at=target.created_at,
)
async def _set_target_owners(db: AsyncSession, target_id: int, user_ids: list[int], assigned_by_user_id: int | None) -> None:
await db.execute(delete(TargetOwner).where(TargetOwner.target_id == target_id))
for user_id in sorted(set(user_ids)):
db.add(TargetOwner(target_id=target_id, user_id=user_id, assigned_by_user_id=assigned_by_user_id))
@router.get("", response_model=list[TargetOut])
async def list_targets(user: User = Depends(get_current_user), db: AsyncSession = Depends(get_db)) -> list[TargetOut]:
_ = user
targets = (await db.scalars(select(Target).order_by(Target.id.desc()))).all()
owner_map = await _owners_by_target_ids(db, [item.id for item in targets])
return [_target_out_with_owners(item, owner_map.get(item.id, [])) for item in targets]
@router.post("/test-connection")
async def test_target_connection(
payload: TargetConnectionTestRequest,
user: User = Depends(require_roles("admin", "operator")),
) -> dict:
_ = user
ssl = False if payload.sslmode == "disable" else True
conn = None
try:
conn = await asyncpg.connect(
host=payload.host,
port=payload.port,
database=payload.dbname,
user=payload.username,
password=payload.password,
ssl=ssl,
timeout=8,
)
version = await conn.fetchval("SHOW server_version")
return {"ok": True, "message": "Connection successful", "server_version": version}
except Exception as exc:
raise HTTPException(status_code=400, detail=f"Connection failed: {exc}")
finally:
if conn:
await conn.close()
@router.post("", response_model=TargetOut, status_code=status.HTTP_201_CREATED)
async def create_target(
payload: TargetCreate,
user: User = Depends(require_roles("admin", "operator")),
db: AsyncSession = Depends(get_db),
) -> TargetOut:
target = Target(
name=payload.name,
host=payload.host,
port=payload.port,
dbname=payload.dbname,
username=payload.username,
encrypted_password=encrypt_secret(payload.password),
sslmode=payload.sslmode,
use_pg_stat_statements=payload.use_pg_stat_statements,
tags=payload.tags,
)
db.add(target)
await db.commit()
await db.refresh(target)
if payload.owner_user_ids:
owners_exist = (await db.scalars(select(User.id).where(User.id.in_(payload.owner_user_ids)))).all()
if len(set(owners_exist)) != len(set(payload.owner_user_ids)):
raise HTTPException(status_code=400, detail="One or more owner users were not found")
await _set_target_owners(db, target.id, payload.owner_user_ids, user.id)
await db.commit()
await write_audit_log(db, "target.create", user.id, {"target_id": target.id, "name": target.name})
owner_map = await _owners_by_target_ids(db, [target.id])
return _target_out_with_owners(target, owner_map.get(target.id, []))
@router.get("/owner-candidates", response_model=list[TargetOwnerOut])
async def list_owner_candidates(
user: User = Depends(require_roles("admin", "operator")),
db: AsyncSession = Depends(get_db),
) -> list[TargetOwnerOut]:
_ = user
users = (await db.scalars(select(User).order_by(User.email.asc()))).all()
return [TargetOwnerOut(user_id=item.id, email=item.email, role=item.role) for item in users]
@router.get("/{target_id}", response_model=TargetOut)
async def get_target(target_id: int, user: User = Depends(get_current_user), db: AsyncSession = Depends(get_db)) -> TargetOut:
_ = user
target = await db.scalar(select(Target).where(Target.id == target_id))
if not target:
raise HTTPException(status_code=404, detail="Target not found")
owner_map = await _owners_by_target_ids(db, [target.id])
return _target_out_with_owners(target, owner_map.get(target.id, []))
@router.put("/{target_id}", response_model=TargetOut)
async def update_target(
target_id: int,
payload: TargetUpdate,
user: User = Depends(require_roles("admin", "operator")),
db: AsyncSession = Depends(get_db),
) -> TargetOut:
target = await db.scalar(select(Target).where(Target.id == target_id))
if not target:
raise HTTPException(status_code=404, detail="Target not found")
updates = payload.model_dump(exclude_unset=True)
owner_user_ids = updates.pop("owner_user_ids", None)
if "password" in updates:
target.encrypted_password = encrypt_secret(updates.pop("password"))
for key, value in updates.items():
setattr(target, key, value)
if owner_user_ids is not None:
owners_exist = (await db.scalars(select(User.id).where(User.id.in_(owner_user_ids)))).all()
if len(set(owners_exist)) != len(set(owner_user_ids)):
raise HTTPException(status_code=400, detail="One or more owner users were not found")
await _set_target_owners(db, target.id, owner_user_ids, user.id)
await db.commit()
await db.refresh(target)
await write_audit_log(db, "target.update", user.id, {"target_id": target.id})
owner_map = await _owners_by_target_ids(db, [target.id])
return _target_out_with_owners(target, owner_map.get(target.id, []))
@router.put("/{target_id}/owners", response_model=list[TargetOwnerOut])
async def set_target_owners(
target_id: int,
payload: TargetOwnersUpdate,
user: User = Depends(require_roles("admin", "operator")),
db: AsyncSession = Depends(get_db),
) -> list[TargetOwnerOut]:
target = await db.scalar(select(Target).where(Target.id == target_id))
if not target:
raise HTTPException(status_code=404, detail="Target not found")
owner_user_ids = sorted(set(payload.user_ids))
if owner_user_ids:
owners_exist = (await db.scalars(select(User.id).where(User.id.in_(owner_user_ids)))).all()
if len(set(owners_exist)) != len(set(owner_user_ids)):
raise HTTPException(status_code=400, detail="One or more owner users were not found")
await _set_target_owners(db, target_id, owner_user_ids, user.id)
await db.commit()
await write_audit_log(db, "target.owners.update", user.id, {"target_id": target_id, "owner_user_ids": owner_user_ids})
rows = (
await db.execute(
select(User.id, User.email, User.role)
.join(TargetOwner, TargetOwner.user_id == User.id)
.where(TargetOwner.target_id == target_id)
.order_by(User.email.asc())
)
).all()
return [TargetOwnerOut(user_id=row.id, email=row.email, role=row.role) for row in rows]
@router.get("/{target_id}/owners", response_model=list[TargetOwnerOut])
async def get_target_owners(
target_id: int,
user: User = Depends(get_current_user),
db: AsyncSession = Depends(get_db),
) -> list[TargetOwnerOut]:
_ = user
target = await db.scalar(select(Target).where(Target.id == target_id))
if not target:
raise HTTPException(status_code=404, detail="Target not found")
rows = (
await db.execute(
select(User.id, User.email, User.role)
.join(TargetOwner, TargetOwner.user_id == User.id)
.where(TargetOwner.target_id == target_id)
.order_by(User.email.asc())
)
).all()
return [TargetOwnerOut(user_id=row.id, email=row.email, role=row.role) for row in rows]
@router.delete("/{target_id}")
async def delete_target(
target_id: int,
user: User = Depends(require_roles("admin", "operator")),
db: AsyncSession = Depends(get_db),
) -> dict:
target = await db.scalar(select(Target).where(Target.id == target_id))
if not target:
raise HTTPException(status_code=404, detail="Target not found")
await db.delete(target)
await db.commit()
await write_audit_log(db, "target.delete", user.id, {"target_id": target_id})
return {"status": "deleted"}
@router.get("/{target_id}/metrics", response_model=list[MetricOut])
async def get_metrics(
target_id: int,
metric: str = Query(...),
from_ts: datetime = Query(alias="from"),
to_ts: datetime = Query(alias="to"),
user: User = Depends(get_current_user),
db: AsyncSession = Depends(get_db),
) -> list[MetricOut]:
_ = user
rows = (
await db.scalars(
select(Metric).where(
and_(Metric.target_id == target_id, Metric.metric_name == metric, Metric.ts >= from_ts, Metric.ts <= to_ts)
).order_by(Metric.ts.asc())
)
).all()
return [MetricOut(ts=r.ts, metric_name=r.metric_name, value=r.value, labels=r.labels) for r in rows]
async def _live_conn(target: Target) -> asyncpg.Connection:
return await asyncpg.connect(dsn=build_target_dsn(target))
@router.get("/{target_id}/locks")
async def get_locks(target_id: int, user: User = Depends(get_current_user), db: AsyncSession = Depends(get_db)) -> list[dict]:
_ = user
target = await db.scalar(select(Target).where(Target.id == target_id))
if not target:
raise HTTPException(status_code=404, detail="Target not found")
conn = await _live_conn(target)
try:
rows = await conn.fetch(
"""
SELECT locktype, mode, granted, relation::regclass::text AS relation, pid
FROM pg_locks
ORDER BY granted ASC, mode
LIMIT 500
"""
)
return [dict(r) for r in rows]
finally:
await conn.close()
@router.get("/{target_id}/activity")
async def get_activity(target_id: int, user: User = Depends(get_current_user), db: AsyncSession = Depends(get_db)) -> list[dict]:
_ = user
target = await db.scalar(select(Target).where(Target.id == target_id))
if not target:
raise HTTPException(status_code=404, detail="Target not found")
conn = await _live_conn(target)
try:
rows = await conn.fetch(
"""
SELECT pid, usename, application_name, client_addr::text, state, wait_event_type, wait_event, now() - query_start AS running_for, left(query, 300) AS query
FROM pg_stat_activity
WHERE datname = current_database()
ORDER BY query_start NULLS LAST
LIMIT 200
"""
)
return [dict(r) for r in rows]
finally:
await conn.close()
@router.get("/{target_id}/top-queries", response_model=list[QueryStatOut])
async def get_top_queries(target_id: int, user: User = Depends(get_current_user), db: AsyncSession = Depends(get_db)) -> list[QueryStatOut]:
_ = user
target = await db.scalar(select(Target).where(Target.id == target_id))
if not target:
raise HTTPException(status_code=404, detail="Target not found")
if not target.use_pg_stat_statements:
return []
rows = (
await db.scalars(
select(QueryStat)
.where(QueryStat.target_id == target_id)
.order_by(desc(QueryStat.ts))
.limit(100)
)
).all()
return [
QueryStatOut(
ts=r.ts,
queryid=r.queryid,
calls=r.calls,
total_time=r.total_time,
mean_time=r.mean_time,
rows=r.rows,
query_text=r.query_text,
)
for r in rows
]
@router.get("/{target_id}/overview", response_model=DatabaseOverviewOut)
async def get_overview(target_id: int, user: User = Depends(get_current_user), db: AsyncSession = Depends(get_db)) -> DatabaseOverviewOut:
_ = user
target = await db.scalar(select(Target).where(Target.id == target_id))
if not target:
raise HTTPException(status_code=404, detail="Target not found")
return await get_target_overview(target)