diff --git a/.github/workflows/migration-safety.yml b/.github/workflows/migration-safety.yml new file mode 100644 index 0000000..e34bba8 --- /dev/null +++ b/.github/workflows/migration-safety.yml @@ -0,0 +1,86 @@ +name: Migration Safety + +on: + push: + branches: ["main", "master"] + pull_request: + +jobs: + migration-safety: + name: Alembic upgrade/downgrade safety + runs-on: ubuntu-latest + + services: + postgres: + image: postgres:16 + env: + POSTGRES_DB: nexapg + POSTGRES_USER: nexapg + POSTGRES_PASSWORD: nexapg + ports: + - 5432:5432 + options: >- + --health-cmd "pg_isready -U nexapg -d nexapg" + --health-interval 5s + --health-timeout 5s + --health-retries 30 + + env: + DB_HOST: localhost + DB_PORT: 5432 + DB_NAME: nexapg + DB_USER: nexapg + DB_PASSWORD: nexapg + JWT_SECRET_KEY: ci-jwt-secret-key + ENCRYPTION_KEY: MDAwMDAwMDAwMDAwMDAwMDAwMDAwMDAwMDAwMDAwMDA= + + steps: + - name: Checkout + uses: actions/checkout@v4 + + - name: Set up Python + uses: actions/setup-python@v5 + with: + python-version: "3.12" + + - name: Install backend dependencies + run: pip install -r backend/requirements.txt + + - name: Install PostgreSQL client tools + run: sudo apt-get update && sudo apt-get install -y postgresql-client + + - name: Wait for PostgreSQL + env: + PGPASSWORD: nexapg + run: | + for i in $(seq 1 60); do + if pg_isready -h localhost -p 5432 -U nexapg -d nexapg; then + exit 0 + fi + sleep 2 + done + echo "PostgreSQL did not become ready in time." 
+ exit 1 + + - name: Alembic upgrade -> downgrade -1 -> upgrade + working-directory: backend + run: | + alembic upgrade head + alembic downgrade -1 + alembic upgrade head + + - name: Validate schema consistency after roundtrip + env: + PGPASSWORD: nexapg + run: | + cd backend + alembic upgrade head + pg_dump -h localhost -p 5432 -U nexapg -d nexapg --schema-only --no-owner --no-privileges \ + | sed '/^\\restrict /d; /^\\unrestrict /d' > /tmp/schema_head_before.sql + + alembic downgrade -1 + alembic upgrade head + pg_dump -h localhost -p 5432 -U nexapg -d nexapg --schema-only --no-owner --no-privileges \ + | sed '/^\\restrict /d; /^\\unrestrict /d' > /tmp/schema_head_after.sql + + diff -u /tmp/schema_head_before.sql /tmp/schema_head_after.sql diff --git a/README.md b/README.md index 6b2d6e5..39f502c 100644 --- a/README.md +++ b/README.md @@ -17,6 +17,7 @@ It combines FastAPI, React, and PostgreSQL in a Docker Compose stack with RBAC, - [Service Information](#service-information) - [Target Owner Notifications](#target-owner-notifications) - [API Overview](#api-overview) +- [API Error Format](#api-error-format) - [`pg_stat_statements` Requirement](#pg_stat_statements-requirement) - [Reverse Proxy / SSL Guidance](#reverse-proxy--ssl-guidance) - [PostgreSQL Compatibility Smoke Test](#postgresql-compatibility-smoke-test) @@ -319,6 +320,37 @@ Email alert routing is target-specific: - `GET /api/v1/service/info` - `POST /api/v1/service/info/check` +## API Error Format + +All 4xx/5xx responses use a consistent JSON payload: + +```json +{ + "code": "validation_error", + "message": "Request validation failed", + "details": [], + "request_id": "c8f0f888-2365-4b86-a5de-b3f0e9df4a4b" +} +``` + +Common fields: + +- `code`: stable machine-readable error code +- `message`: human-readable summary +- `details`: optional extra context (validation list, debug context, etc.) 
+- `request_id`: request correlation ID (also returned in `X-Request-ID` header) + +Common error codes: + +- `bad_request` (`400`) +- `unauthorized` (`401`) +- `forbidden` (`403`) +- `not_found` (`404`) +- `conflict` (`409`) +- `validation_error` (`422`) +- `target_unreachable` (`503`) +- `internal_error` (`500`) + ## `pg_stat_statements` Requirement Query Insights requires `pg_stat_statements` on the monitored target: diff --git a/backend/app/api/routes/admin_settings.py b/backend/app/api/routes/admin_settings.py index 5c227c5..6c8a4bc 100644 --- a/backend/app/api/routes/admin_settings.py +++ b/backend/app/api/routes/admin_settings.py @@ -9,6 +9,7 @@ from sqlalchemy.ext.asyncio import AsyncSession from app.core.db import get_db from app.core.deps import require_roles +from app.core.errors import api_error from app.models.models import EmailNotificationSettings, User from app.schemas.admin_settings import EmailSettingsOut, EmailSettingsTestRequest, EmailSettingsUpdate from app.services.audit import write_audit_log @@ -96,9 +97,9 @@ async def test_email_settings( ) -> dict: settings = await _get_or_create_settings(db) if not settings.smtp_host: - raise HTTPException(status_code=400, detail="SMTP host is not configured") + raise HTTPException(status_code=400, detail=api_error("smtp_host_missing", "SMTP host is not configured")) if not settings.from_email: - raise HTTPException(status_code=400, detail="From email is not configured") + raise HTTPException(status_code=400, detail=api_error("smtp_from_email_missing", "From email is not configured")) password = decrypt_secret(settings.encrypted_smtp_password) if settings.encrypted_smtp_password else None message = EmailMessage() @@ -126,7 +127,10 @@ async def test_email_settings( smtp.login(settings.smtp_username, password or "") smtp.send_message(message) except Exception as exc: - raise HTTPException(status_code=400, detail=f"SMTP test failed: {exc}") + raise HTTPException( + status_code=400, + 
detail=api_error("smtp_test_failed", "SMTP test failed", {"error": str(exc)}), + ) from exc await write_audit_log(db, "admin.email_settings.test", admin.id, {"recipient": str(payload.recipient)}) return {"status": "sent", "recipient": str(payload.recipient)} diff --git a/backend/app/api/routes/admin_users.py b/backend/app/api/routes/admin_users.py index 22ff253..362b65e 100644 --- a/backend/app/api/routes/admin_users.py +++ b/backend/app/api/routes/admin_users.py @@ -3,6 +3,7 @@ from sqlalchemy import select from sqlalchemy.ext.asyncio import AsyncSession from app.core.db import get_db from app.core.deps import require_roles +from app.core.errors import api_error from app.core.security import hash_password from app.models.models import User from app.schemas.user import UserCreate, UserOut, UserUpdate @@ -22,7 +23,7 @@ async def list_users(admin: User = Depends(require_roles("admin")), db: AsyncSes async def create_user(payload: UserCreate, admin: User = Depends(require_roles("admin")), db: AsyncSession = Depends(get_db)) -> UserOut: exists = await db.scalar(select(User).where(User.email == payload.email)) if exists: - raise HTTPException(status_code=409, detail="Email already exists") + raise HTTPException(status_code=409, detail=api_error("email_exists", "Email already exists")) user = User( email=payload.email, first_name=payload.first_name, @@ -46,13 +47,13 @@ async def update_user( ) -> UserOut: user = await db.scalar(select(User).where(User.id == user_id)) if not user: - raise HTTPException(status_code=404, detail="User not found") + raise HTTPException(status_code=404, detail=api_error("user_not_found", "User not found")) update_data = payload.model_dump(exclude_unset=True) next_email = update_data.get("email") if next_email and next_email != user.email: existing = await db.scalar(select(User).where(User.email == next_email)) if existing and existing.id != user.id: - raise HTTPException(status_code=409, detail="Email already exists") + raise 
HTTPException(status_code=409, detail=api_error("email_exists", "Email already exists")) if "password" in update_data: raw_password = update_data.pop("password") if raw_password: @@ -68,10 +69,10 @@ async def update_user( @router.delete("/{user_id}") async def delete_user(user_id: int, admin: User = Depends(require_roles("admin")), db: AsyncSession = Depends(get_db)) -> dict: if user_id == admin.id: - raise HTTPException(status_code=400, detail="Cannot delete yourself") + raise HTTPException(status_code=400, detail=api_error("cannot_delete_self", "Cannot delete yourself")) user = await db.scalar(select(User).where(User.id == user_id)) if not user: - raise HTTPException(status_code=404, detail="User not found") + raise HTTPException(status_code=404, detail=api_error("user_not_found", "User not found")) await db.delete(user) await db.commit() await write_audit_log(db, "admin.user.delete", admin.id, {"deleted_user_id": user_id}) diff --git a/backend/app/api/routes/alerts.py b/backend/app/api/routes/alerts.py index bba8624..2faa592 100644 --- a/backend/app/api/routes/alerts.py +++ b/backend/app/api/routes/alerts.py @@ -4,6 +4,7 @@ from sqlalchemy.ext.asyncio import AsyncSession from app.core.db import get_db from app.core.deps import get_current_user, require_roles +from app.core.errors import api_error from app.models.models import AlertDefinition, Target, User from app.schemas.alert import ( AlertDefinitionCreate, @@ -33,7 +34,7 @@ async def _validate_target_exists(db: AsyncSession, target_id: int | None) -> No return target_exists = await db.scalar(select(Target.id).where(Target.id == target_id)) if target_exists is None: - raise HTTPException(status_code=404, detail="Target not found") + raise HTTPException(status_code=404, detail=api_error("target_not_found", "Target not found")) @router.get("/status", response_model=AlertStatusResponse) @@ -101,7 +102,7 @@ async def update_alert_definition( ) -> AlertDefinitionOut: definition = await 
db.scalar(select(AlertDefinition).where(AlertDefinition.id == definition_id)) if definition is None: - raise HTTPException(status_code=404, detail="Alert definition not found") + raise HTTPException(status_code=404, detail=api_error("alert_definition_not_found", "Alert definition not found")) updates = payload.model_dump(exclude_unset=True) if "target_id" in updates: @@ -131,7 +132,7 @@ async def delete_alert_definition( ) -> dict: definition = await db.scalar(select(AlertDefinition).where(AlertDefinition.id == definition_id)) if definition is None: - raise HTTPException(status_code=404, detail="Alert definition not found") + raise HTTPException(status_code=404, detail=api_error("alert_definition_not_found", "Alert definition not found")) await db.delete(definition) await db.commit() invalidate_alert_cache() @@ -148,7 +149,7 @@ async def test_alert_definition( _ = user target = await db.scalar(select(Target).where(Target.id == payload.target_id)) if target is None: - raise HTTPException(status_code=404, detail="Target not found") + raise HTTPException(status_code=404, detail=api_error("target_not_found", "Target not found")) try: value = await run_scalar_sql_for_target(target, payload.sql_text) return AlertDefinitionTestResponse(ok=True, value=value) diff --git a/backend/app/api/routes/auth.py b/backend/app/api/routes/auth.py index 32d3d98..ed97cea 100644 --- a/backend/app/api/routes/auth.py +++ b/backend/app/api/routes/auth.py @@ -5,6 +5,7 @@ from sqlalchemy.ext.asyncio import AsyncSession from app.core.config import get_settings from app.core.db import get_db from app.core.deps import get_current_user +from app.core.errors import api_error from app.core.security import create_access_token, create_refresh_token, verify_password from app.models.models import User from app.schemas.auth import LoginRequest, RefreshRequest, TokenResponse @@ -19,7 +20,10 @@ settings = get_settings() async def login(payload: LoginRequest, db: AsyncSession = Depends(get_db)) -> 
TokenResponse: user = await db.scalar(select(User).where(User.email == payload.email)) if not user or not verify_password(payload.password, user.password_hash): - raise HTTPException(status_code=status.HTTP_401_UNAUTHORIZED, detail="Invalid credentials") + raise HTTPException( + status_code=status.HTTP_401_UNAUTHORIZED, + detail=api_error("invalid_credentials", "Invalid credentials"), + ) await write_audit_log(db, action="auth.login", user_id=user.id, payload={"email": user.email}) return TokenResponse(access_token=create_access_token(str(user.id)), refresh_token=create_refresh_token(str(user.id))) @@ -30,14 +34,23 @@ async def refresh(payload: RefreshRequest, db: AsyncSession = Depends(get_db)) - try: token_payload = jwt.decode(payload.refresh_token, settings.jwt_secret_key, algorithms=[settings.jwt_algorithm]) except jwt.InvalidTokenError as exc: - raise HTTPException(status_code=status.HTTP_401_UNAUTHORIZED, detail="Invalid refresh token") from exc + raise HTTPException( + status_code=status.HTTP_401_UNAUTHORIZED, + detail=api_error("invalid_refresh_token", "Invalid refresh token"), + ) from exc if token_payload.get("type") != "refresh": - raise HTTPException(status_code=status.HTTP_401_UNAUTHORIZED, detail="Invalid refresh token type") + raise HTTPException( + status_code=status.HTTP_401_UNAUTHORIZED, + detail=api_error("invalid_refresh_token_type", "Invalid refresh token type"), + ) user_id = token_payload.get("sub") user = await db.scalar(select(User).where(User.id == int(user_id))) if not user: - raise HTTPException(status_code=status.HTTP_401_UNAUTHORIZED, detail="User not found") + raise HTTPException( + status_code=status.HTTP_401_UNAUTHORIZED, + detail=api_error("user_not_found", "User not found"), + ) await write_audit_log(db, action="auth.refresh", user_id=user.id, payload={}) return TokenResponse(access_token=create_access_token(str(user.id)), refresh_token=create_refresh_token(str(user.id))) diff --git a/backend/app/api/routes/me.py 
b/backend/app/api/routes/me.py index 0b43821..14cd815 100644 --- a/backend/app/api/routes/me.py +++ b/backend/app/api/routes/me.py @@ -2,6 +2,7 @@ from fastapi import APIRouter, Depends, HTTPException, status from sqlalchemy.ext.asyncio import AsyncSession from app.core.db import get_db from app.core.deps import get_current_user +from app.core.errors import api_error from app.core.security import hash_password, verify_password from app.models.models import User from app.schemas.user import UserOut, UserPasswordChange @@ -22,10 +23,16 @@ async def change_password( db: AsyncSession = Depends(get_db), ) -> dict: if not verify_password(payload.current_password, user.password_hash): - raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail="Current password is incorrect") + raise HTTPException( + status_code=status.HTTP_400_BAD_REQUEST, + detail=api_error("invalid_current_password", "Current password is incorrect"), + ) if verify_password(payload.new_password, user.password_hash): - raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail="New password must be different") + raise HTTPException( + status_code=status.HTTP_400_BAD_REQUEST, + detail=api_error("password_reuse_not_allowed", "New password must be different"), + ) user.password_hash = hash_password(payload.new_password) await db.commit() diff --git a/backend/app/api/routes/targets.py b/backend/app/api/routes/targets.py index 6ab0322..ea437a7 100644 --- a/backend/app/api/routes/targets.py +++ b/backend/app/api/routes/targets.py @@ -8,6 +8,7 @@ from sqlalchemy.ext.asyncio import AsyncSession from app.core.db import get_db from app.core.deps import get_current_user, require_roles +from app.core.errors import api_error from app.models.models import Metric, QueryStat, Target, TargetOwner, User from app.schemas.metric import MetricOut, QueryStatOut from app.schemas.overview import DatabaseOverviewOut @@ -85,7 +86,10 @@ async def _discover_databases(payload: TargetCreate) -> list[str]: ) return 
[row["datname"] for row in rows if row["datname"]] except Exception as exc: - raise HTTPException(status_code=400, detail=f"Database discovery failed: {exc}") + raise HTTPException( + status_code=400, + detail=api_error("database_discovery_failed", "Database discovery failed", {"error": str(exc)}), + ) from exc finally: if conn: await conn.close() @@ -131,7 +135,10 @@ async def test_target_connection( version = await conn.fetchval("SHOW server_version") return {"ok": True, "message": "Connection successful", "server_version": version} except Exception as exc: - raise HTTPException(status_code=400, detail=f"Connection failed: {exc}") + raise HTTPException( + status_code=400, + detail=api_error("connection_test_failed", "Connection test failed", {"error": str(exc)}), + ) from exc finally: if conn: await conn.close() @@ -147,7 +154,10 @@ async def create_target( if owner_ids: owners_exist = (await db.scalars(select(User.id).where(User.id.in_(owner_ids)))).all() if len(set(owners_exist)) != len(owner_ids): - raise HTTPException(status_code=400, detail="One or more owner users were not found") + raise HTTPException( + status_code=400, + detail=api_error("owner_users_not_found", "One or more owner users were not found"), + ) encrypted_password = encrypt_secret(payload.password) created_targets: list[Target] = [] @@ -155,7 +165,10 @@ async def create_target( if payload.discover_all_databases: databases = await _discover_databases(payload) if not databases: - raise HTTPException(status_code=400, detail="No databases discovered on target") + raise HTTPException( + status_code=400, + detail=api_error("no_databases_discovered", "No databases discovered on target"), + ) group_id = str(uuid4()) base_tags = payload.tags or {} for dbname in databases: @@ -194,7 +207,10 @@ async def create_target( await _set_target_owners(db, target.id, owner_ids, user.id) if not created_targets: - raise HTTPException(status_code=400, detail="All discovered databases already exist as targets") + raise 
HTTPException( + status_code=400, + detail=api_error("all_discovered_databases_exist", "All discovered databases already exist as targets"), + ) await db.commit() for item in created_targets: await db.refresh(item) @@ -247,7 +263,7 @@ async def get_target(target_id: int, user: User = Depends(get_current_user), db: _ = user target = await db.scalar(select(Target).where(Target.id == target_id)) if not target: - raise HTTPException(status_code=404, detail="Target not found") + raise HTTPException(status_code=404, detail=api_error("target_not_found", "Target not found")) owner_map = await _owners_by_target_ids(db, [target.id]) return _target_out_with_owners(target, owner_map.get(target.id, [])) @@ -261,7 +277,7 @@ async def update_target( ) -> TargetOut: target = await db.scalar(select(Target).where(Target.id == target_id)) if not target: - raise HTTPException(status_code=404, detail="Target not found") + raise HTTPException(status_code=404, detail=api_error("target_not_found", "Target not found")) updates = payload.model_dump(exclude_unset=True) owner_user_ids = updates.pop("owner_user_ids", None) @@ -273,7 +289,10 @@ async def update_target( if owner_user_ids is not None: owners_exist = (await db.scalars(select(User.id).where(User.id.in_(owner_user_ids)))).all() if len(set(owners_exist)) != len(set(owner_user_ids)): - raise HTTPException(status_code=400, detail="One or more owner users were not found") + raise HTTPException( + status_code=400, + detail=api_error("owner_users_not_found", "One or more owner users were not found"), + ) await _set_target_owners(db, target.id, owner_user_ids, user.id) await db.commit() @@ -292,12 +311,15 @@ async def set_target_owners( ) -> list[TargetOwnerOut]: target = await db.scalar(select(Target).where(Target.id == target_id)) if not target: - raise HTTPException(status_code=404, detail="Target not found") + raise HTTPException(status_code=404, detail=api_error("target_not_found", "Target not found")) owner_user_ids = 
sorted(set(payload.user_ids)) if owner_user_ids: owners_exist = (await db.scalars(select(User.id).where(User.id.in_(owner_user_ids)))).all() if len(set(owners_exist)) != len(set(owner_user_ids)): - raise HTTPException(status_code=400, detail="One or more owner users were not found") + raise HTTPException( + status_code=400, + detail=api_error("owner_users_not_found", "One or more owner users were not found"), + ) await _set_target_owners(db, target_id, owner_user_ids, user.id) await db.commit() await write_audit_log(db, "target.owners.update", user.id, {"target_id": target_id, "owner_user_ids": owner_user_ids}) @@ -321,7 +343,7 @@ async def get_target_owners( _ = user target = await db.scalar(select(Target).where(Target.id == target_id)) if not target: - raise HTTPException(status_code=404, detail="Target not found") + raise HTTPException(status_code=404, detail=api_error("target_not_found", "Target not found")) rows = ( await db.execute( select(User.id, User.email, User.role) @@ -341,7 +363,7 @@ async def delete_target( ) -> dict: target = await db.scalar(select(Target).where(Target.id == target_id)) if not target: - raise HTTPException(status_code=404, detail="Target not found") + raise HTTPException(status_code=404, detail=api_error("target_not_found", "Target not found")) await db.delete(target) await db.commit() await write_audit_log(db, "target.delete", user.id, {"target_id": target_id}) @@ -369,7 +391,22 @@ async def get_metrics( async def _live_conn(target: Target) -> asyncpg.Connection: - return await asyncpg.connect(dsn=build_target_dsn(target)) + try: + return await asyncpg.connect(dsn=build_target_dsn(target)) + except (OSError, asyncpg.PostgresError) as exc: + raise HTTPException( + status_code=503, + detail=api_error( + "target_unreachable", + "Target database is not reachable", + { + "target_id": target.id, + "host": target.host, + "port": target.port, + "error": str(exc), + }, + ), + ) from exc @router.get("/{target_id}/locks") @@ -377,7 +414,7 @@ 
async def get_locks(target_id: int, user: User = Depends(get_current_user), db: _ = user target = await db.scalar(select(Target).where(Target.id == target_id)) if not target: - raise HTTPException(status_code=404, detail="Target not found") + raise HTTPException(status_code=404, detail=api_error("target_not_found", "Target not found")) conn = await _live_conn(target) try: rows = await conn.fetch( @@ -398,7 +435,7 @@ async def get_activity(target_id: int, user: User = Depends(get_current_user), d _ = user target = await db.scalar(select(Target).where(Target.id == target_id)) if not target: - raise HTTPException(status_code=404, detail="Target not found") + raise HTTPException(status_code=404, detail=api_error("target_not_found", "Target not found")) conn = await _live_conn(target) try: rows = await conn.fetch( @@ -420,7 +457,7 @@ async def get_top_queries(target_id: int, user: User = Depends(get_current_user) _ = user target = await db.scalar(select(Target).where(Target.id == target_id)) if not target: - raise HTTPException(status_code=404, detail="Target not found") + raise HTTPException(status_code=404, detail=api_error("target_not_found", "Target not found")) if not target.use_pg_stat_statements: return [] rows = ( @@ -450,5 +487,20 @@ async def get_overview(target_id: int, user: User = Depends(get_current_user), d _ = user target = await db.scalar(select(Target).where(Target.id == target_id)) if not target: - raise HTTPException(status_code=404, detail="Target not found") - return await get_target_overview(target) + raise HTTPException(status_code=404, detail=api_error("target_not_found", "Target not found")) + try: + return await get_target_overview(target) + except (OSError, asyncpg.PostgresError) as exc: + raise HTTPException( + status_code=503, + detail=api_error( + "target_unreachable", + "Target database is not reachable", + { + "target_id": target.id, + "host": target.host, + "port": target.port, + "error": str(exc), + }, + ), + ) from exc diff --git 
a/backend/app/core/config.py b/backend/app/core/config.py index e54f34a..899dba8 100644 --- a/backend/app/core/config.py +++ b/backend/app/core/config.py @@ -2,7 +2,7 @@ from functools import lru_cache from pydantic import field_validator from pydantic_settings import BaseSettings, SettingsConfigDict -NEXAPG_VERSION = "0.1.8" +NEXAPG_VERSION = "0.2.0" class Settings(BaseSettings): diff --git a/backend/app/core/deps.py b/backend/app/core/deps.py index 0260c1b..b91a92a 100644 --- a/backend/app/core/deps.py +++ b/backend/app/core/deps.py @@ -5,6 +5,7 @@ from sqlalchemy import select from sqlalchemy.ext.asyncio import AsyncSession from app.core.config import get_settings from app.core.db import get_db +from app.core.errors import api_error from app.models.models import User settings = get_settings() @@ -16,27 +17,42 @@ async def get_current_user( db: AsyncSession = Depends(get_db), ) -> User: if not credentials: - raise HTTPException(status_code=status.HTTP_401_UNAUTHORIZED, detail="Missing token") + raise HTTPException( + status_code=status.HTTP_401_UNAUTHORIZED, + detail=api_error("missing_token", "Missing token"), + ) token = credentials.credentials try: payload = jwt.decode(token, settings.jwt_secret_key, algorithms=[settings.jwt_algorithm]) except jwt.InvalidTokenError as exc: - raise HTTPException(status_code=status.HTTP_401_UNAUTHORIZED, detail="Invalid token") from exc + raise HTTPException( + status_code=status.HTTP_401_UNAUTHORIZED, + detail=api_error("invalid_token", "Invalid token"), + ) from exc if payload.get("type") != "access": - raise HTTPException(status_code=status.HTTP_401_UNAUTHORIZED, detail="Invalid token type") + raise HTTPException( + status_code=status.HTTP_401_UNAUTHORIZED, + detail=api_error("invalid_token_type", "Invalid token type"), + ) user_id = payload.get("sub") user = await db.scalar(select(User).where(User.id == int(user_id))) if not user: - raise HTTPException(status_code=status.HTTP_401_UNAUTHORIZED, detail="User not found") + 
raise HTTPException( + status_code=status.HTTP_401_UNAUTHORIZED, + detail=api_error("user_not_found", "User not found"), + ) return user def require_roles(*roles: str): async def role_dependency(user: User = Depends(get_current_user)) -> User: if user.role not in roles: - raise HTTPException(status_code=status.HTTP_403_FORBIDDEN, detail="Forbidden") + raise HTTPException( + status_code=status.HTTP_403_FORBIDDEN, + detail=api_error("forbidden", "Forbidden"), + ) return user return role_dependency diff --git a/backend/app/core/errors.py b/backend/app/core/errors.py new file mode 100644 index 0000000..60c2bbc --- /dev/null +++ b/backend/app/core/errors.py @@ -0,0 +1,38 @@ +from __future__ import annotations + +from typing import Any + + +def error_payload(code: str, message: str, details: Any, request_id: str) -> dict[str, Any]: + return { + "code": code, + "message": message, + "details": details, + "request_id": request_id, + } + + +def api_error(code: str, message: str, details: Any = None) -> dict[str, Any]: + return { + "code": code, + "message": message, + "details": details, + } + + +def http_status_to_code(status_code: int) -> str: + mapping = { + 400: "bad_request", + 401: "unauthorized", + 403: "forbidden", + 404: "not_found", + 405: "method_not_allowed", + 409: "conflict", + 422: "validation_error", + 429: "rate_limited", + 500: "internal_error", + 502: "bad_gateway", + 503: "service_unavailable", + 504: "gateway_timeout", + } + return mapping.get(status_code, f"http_{status_code}") diff --git a/backend/app/main.py b/backend/app/main.py index d1420ed..a70b02c 100644 --- a/backend/app/main.py +++ b/backend/app/main.py @@ -1,12 +1,17 @@ import asyncio import logging +from uuid import uuid4 from contextlib import asynccontextmanager -from fastapi import FastAPI +from fastapi import FastAPI, HTTPException, Request +from fastapi.exceptions import RequestValidationError from fastapi.middleware.cors import CORSMiddleware +from fastapi.responses import JSONResponse 
+from starlette.exceptions import HTTPException as StarletteHTTPException from sqlalchemy import select from app.api.router import api_router from app.core.config import get_settings from app.core.db import SessionLocal +from app.core.errors import error_payload, http_status_to_code from app.core.logging import configure_logging from app.core.security import hash_password from app.models.models import User @@ -57,4 +62,67 @@ app.add_middleware( allow_methods=["*"], allow_headers=["*"], ) + + +@app.middleware("http") +async def request_id_middleware(request: Request, call_next): + request_id = request.headers.get("x-request-id") or str(uuid4()) + request.state.request_id = request_id + response = await call_next(request) + response.headers["X-Request-ID"] = request_id + return response + + +@app.exception_handler(HTTPException) +@app.exception_handler(StarletteHTTPException) +async def http_exception_handler(request: Request, exc: HTTPException | StarletteHTTPException): + request_id = getattr(request.state, "request_id", str(uuid4())) + code = http_status_to_code(exc.status_code) + message = "Request failed" + details = None + + if isinstance(exc.detail, str): + message = exc.detail + elif isinstance(exc.detail, dict): + code = str(exc.detail.get("code", code)) + message = str(exc.detail.get("message", message)) + details = exc.detail.get("details") + elif isinstance(exc.detail, list): + message = "Request validation failed" + details = exc.detail + + return JSONResponse( + status_code=exc.status_code, + content=error_payload(code=code, message=message, details=details, request_id=request_id), + ) + + +@app.exception_handler(RequestValidationError) +async def request_validation_exception_handler(request: Request, exc: RequestValidationError): + request_id = getattr(request.state, "request_id", str(uuid4())) + return JSONResponse( + status_code=422, + content=error_payload( + code="validation_error", + message="Request validation failed", + details=exc.errors(), + 
request_id=request_id, + ), + ) + + +@app.exception_handler(Exception) +async def unhandled_exception_handler(request: Request, exc: Exception): + request_id = getattr(request.state, "request_id", str(uuid4())) + logger.exception("unhandled_exception request_id=%s", request_id, exc_info=exc) + return JSONResponse( + status_code=500, headers={"X-Request-ID": request_id}, + content=error_payload( + code="internal_error", + message="Internal server error", + details=None, + request_id=request_id, + ), + ) + app.include_router(api_router, prefix=settings.api_v1_prefix) diff --git a/backend/app/services/alerts.py b/backend/app/services/alerts.py index 199205d..0180961 100644 --- a/backend/app/services/alerts.py +++ b/backend/app/services/alerts.py @@ -11,6 +11,7 @@ from sqlalchemy import desc, func, select from sqlalchemy.ext.asyncio import AsyncSession from app.core.config import get_settings +from app.core.errors import api_error from app.models.models import AlertDefinition, Metric, QueryStat, Target from app.schemas.alert import AlertStatusItem, AlertStatusResponse from app.services.collector import build_target_dsn @@ -144,25 +145,40 @@ def get_standard_alert_reference() -> list[dict[str, str]]: def validate_alert_thresholds(comparison: str, warning_threshold: float | None, alert_threshold: float) -> None: if comparison not in _ALLOWED_COMPARISONS: - raise HTTPException(status_code=400, detail=f"Invalid comparison. Use one of {sorted(_ALLOWED_COMPARISONS)}") + raise HTTPException( + status_code=400, + detail=api_error( + "invalid_comparison", + f"Invalid comparison. 
Use one of {sorted(_ALLOWED_COMPARISONS)}", + ), + ) if warning_threshold is None: return if comparison in {"gte", "gt"} and warning_threshold > alert_threshold: - raise HTTPException(status_code=400, detail="For gte/gt, warning_threshold must be <= alert_threshold") + raise HTTPException( + status_code=400, + detail=api_error("invalid_thresholds", "For gte/gt, warning_threshold must be <= alert_threshold"), + ) if comparison in {"lte", "lt"} and warning_threshold < alert_threshold: - raise HTTPException(status_code=400, detail="For lte/lt, warning_threshold must be >= alert_threshold") + raise HTTPException( + status_code=400, + detail=api_error("invalid_thresholds", "For lte/lt, warning_threshold must be >= alert_threshold"), + ) def validate_alert_sql(sql_text: str) -> str: sql = sql_text.strip().rstrip(";") lowered = sql.lower().strip() if not lowered.startswith("select"): - raise HTTPException(status_code=400, detail="Alert SQL must start with SELECT") + raise HTTPException(status_code=400, detail=api_error("invalid_alert_sql", "Alert SQL must start with SELECT")) if _FORBIDDEN_SQL_WORDS.search(lowered): - raise HTTPException(status_code=400, detail="Only read-only SELECT statements are allowed") + raise HTTPException( + status_code=400, + detail=api_error("invalid_alert_sql", "Only read-only SELECT statements are allowed"), + ) if ";" in sql: - raise HTTPException(status_code=400, detail="Only a single SQL statement is allowed") + raise HTTPException(status_code=400, detail=api_error("invalid_alert_sql", "Only a single SQL statement is allowed")) return sql diff --git a/backend/app/services/collector.py b/backend/app/services/collector.py index 51bbe3c..60cd4d2 100644 --- a/backend/app/services/collector.py +++ b/backend/app/services/collector.py @@ -1,6 +1,7 @@ import asyncio import logging from datetime import datetime, timezone +from random import uniform from sqlalchemy import select from sqlalchemy.ext.asyncio import AsyncSession from sqlalchemy.exc 
import SQLAlchemyError @@ -15,6 +16,9 @@ logger = logging.getLogger(__name__) settings = get_settings() _failure_state: dict[int, dict[str, object]] = {} _failure_log_interval_seconds = 300 +_backoff_base_seconds = max(3, int(settings.poll_interval_seconds)) +_backoff_max_seconds = 300 +_backoff_jitter_factor = 0.15 def build_target_dsn(target: Target) -> str: @@ -181,31 +185,66 @@ async def collect_once() -> None: async with SessionLocal() as db: targets = (await db.scalars(select(Target))).all() + active_target_ids = {target.id for target in targets} + stale_target_ids = [target_id for target_id in _failure_state.keys() if target_id not in active_target_ids] + for stale_target_id in stale_target_ids: + _failure_state.pop(stale_target_id, None) + for target in targets: + now = datetime.now(timezone.utc) + state = _failure_state.get(target.id) + if state: + next_attempt_at = state.get("next_attempt_at") + if isinstance(next_attempt_at, datetime) and now < next_attempt_at: + continue + try: await collect_target(target) prev = _failure_state.pop(target.id, None) if prev: + first_failure_at = prev.get("first_failure_at") + downtime_seconds = None + if isinstance(first_failure_at, datetime): + downtime_seconds = max(0, int((now - first_failure_at).total_seconds())) logger.info( - "collector_target_recovered target=%s after_failures=%s last_error=%s", + "collector_target_recovered target=%s after_failures=%s downtime_seconds=%s last_error=%s", target.id, prev.get("count", 0), + downtime_seconds, prev.get("error"), ) except (OSError, SQLAlchemyError, asyncpg.PostgresError, Exception) as exc: - now = datetime.now(timezone.utc) current_error = str(exc) + error_class = exc.__class__.__name__ state = _failure_state.get(target.id) if state is None: + next_delay = min(_backoff_max_seconds, _backoff_base_seconds) + jitter = next_delay * _backoff_jitter_factor + next_delay = max(1, int(next_delay + uniform(-jitter, jitter))) + next_attempt_at = now.timestamp() + next_delay 
_failure_state[target.id] = { "count": 1, + "first_failure_at": now, "last_log_at": now, "error": current_error, + "next_attempt_at": datetime.fromtimestamp(next_attempt_at, tz=timezone.utc), } - logger.exception("collector_error target=%s err=%s", target.id, exc) + logger.warning( + "collector_target_unreachable target=%s error_class=%s err=%s consecutive_failures=%s retry_in_seconds=%s", + target.id, + error_class, + current_error, + 1, + next_delay, + ) continue count = int(state.get("count", 0)) + 1 + raw_backoff = min(_backoff_max_seconds, _backoff_base_seconds * (2 ** min(count - 1, 10))) + jitter = raw_backoff * _backoff_jitter_factor + next_delay = max(1, int(raw_backoff + uniform(-jitter, jitter))) + state["next_attempt_at"] = datetime.fromtimestamp(now.timestamp() + next_delay, tz=timezone.utc) + last_log_at = state.get("last_log_at") last_logged_error = str(state.get("error", "")) should_log = False @@ -220,18 +259,23 @@ async def collect_once() -> None: if should_log: state["last_log_at"] = now state["error"] = current_error - logger.error( - "collector_error_throttled target=%s err=%s consecutive_failures=%s", + logger.warning( + "collector_target_unreachable target=%s error_class=%s err=%s consecutive_failures=%s retry_in_seconds=%s", target.id, + error_class, current_error, count, + next_delay, ) async def collector_loop(stop_event: asyncio.Event) -> None: while not stop_event.is_set(): + cycle_started = asyncio.get_running_loop().time() await collect_once() + elapsed = asyncio.get_running_loop().time() - cycle_started + sleep_for = max(0.0, settings.poll_interval_seconds - elapsed) try: - await asyncio.wait_for(stop_event.wait(), timeout=settings.poll_interval_seconds) + await asyncio.wait_for(stop_event.wait(), timeout=sleep_for) except asyncio.TimeoutError: pass diff --git a/frontend/src/api.js b/frontend/src/api.js index 5ee9bfb..6a1ca4d 100644 --- a/frontend/src/api.js +++ b/frontend/src/api.js @@ -35,8 +35,21 @@ export async function 
apiFetch(path, options = {}, tokens, onUnauthorized) { } } if (!res.ok) { - const txt = await res.text(); - throw new Error(txt || `HTTP ${res.status}`); + const raw = await res.text(); + let parsed = null; + try { + parsed = raw ? JSON.parse(raw) : null; + } catch { + parsed = null; + } + + const message = parsed?.message || raw || `HTTP ${res.status}`; + const err = new Error(message); + err.status = res.status; + err.code = parsed?.code || null; + err.details = parsed?.details || null; + err.requestId = parsed?.request_id || res.headers.get("x-request-id") || null; + throw err; } if (res.status === 204) return null; return res.json(); diff --git a/frontend/src/pages/TargetDetailPage.jsx b/frontend/src/pages/TargetDetailPage.jsx index 21c26e1..8337190 100644 --- a/frontend/src/pages/TargetDetailPage.jsx +++ b/frontend/src/pages/TargetDetailPage.jsx @@ -76,6 +76,10 @@ function didMetricSeriesChange(prev = [], next = []) { return prevLast?.ts !== nextLast?.ts || Number(prevLast?.value) !== Number(nextLast?.value); } +function isTargetUnreachableError(err) { + return err?.code === "target_unreachable" || err?.status === 503; +} + async function loadMetric(targetId, metric, range, tokens, refresh) { const { from, to } = toQueryRange(range); return apiFetch( @@ -99,6 +103,7 @@ export function TargetDetailPage() { const [targetMeta, setTargetMeta] = useState(null); const [owners, setOwners] = useState([]); const [groupTargets, setGroupTargets] = useState([]); + const [offlineState, setOfflineState] = useState(null); const [error, setError] = useState(""); const [loading, setLoading] = useState(true); const refreshRef = useRef(refresh); @@ -114,22 +119,16 @@ export function TargetDetailPage() { setLoading(true); } try { - const [connections, xacts, cache, locksTable, activityTable, overviewData, targetInfo, ownerRows, allTargets] = await Promise.all([ + const [connections, xacts, cache, targetInfo, ownerRows, allTargets] = await Promise.all([ loadMetric(id, 
"connections_total", range, tokens, refreshRef.current), loadMetric(id, "xacts_total", range, tokens, refreshRef.current), loadMetric(id, "cache_hit_ratio", range, tokens, refreshRef.current), - apiFetch(`/targets/${id}/locks`, {}, tokens, refreshRef.current), - apiFetch(`/targets/${id}/activity`, {}, tokens, refreshRef.current), - apiFetch(`/targets/${id}/overview`, {}, tokens, refreshRef.current), apiFetch(`/targets/${id}`, {}, tokens, refreshRef.current), apiFetch(`/targets/${id}/owners`, {}, tokens, refreshRef.current), apiFetch("/targets", {}, tokens, refreshRef.current), ]); if (!active) return; setSeries({ connections, xacts, cache }); - setLocks(locksTable); - setActivity(activityTable); - setOverview(overviewData); setTargetMeta(targetInfo); setOwners(ownerRows); const groupId = targetInfo?.tags?.monitor_group_id; @@ -141,6 +140,34 @@ export function TargetDetailPage() { } else { setGroupTargets([]); } + try { + const [locksTable, activityTable, overviewData] = await Promise.all([ + apiFetch(`/targets/${id}/locks`, {}, tokens, refreshRef.current), + apiFetch(`/targets/${id}/activity`, {}, tokens, refreshRef.current), + apiFetch(`/targets/${id}/overview`, {}, tokens, refreshRef.current), + ]); + if (!active) return; + setLocks(locksTable); + setActivity(activityTable); + setOverview(overviewData); + setOfflineState(null); + } catch (liveErr) { + if (!active) return; + if (isTargetUnreachableError(liveErr)) { + setLocks([]); + setActivity([]); + setOverview(null); + setOfflineState({ + message: + "Target is currently unreachable. 
Check host/port, network route, SSL mode, and database availability.", + host: liveErr?.details?.host || targetInfo?.host || "-", + port: liveErr?.details?.port || targetInfo?.port || "-", + requestId: liveErr?.requestId || null, + }); + } else { + throw liveErr; + } + } setError(""); } catch (e) { if (active) setError(String(e.message || e)); @@ -281,6 +308,17 @@ export function TargetDetailPage() { Responsible users: {owners.length > 0 ? owners.map((item) => {item.email}) : none assigned} + {offlineState && ( +
+

Target Offline

+

{offlineState.message}

+
+ Host: {offlineState.host} + Port: {offlineState.port} + {offlineState.requestId ? Request ID: {offlineState.requestId} : null} +
+
+ )} {uiMode === "easy" && overview && easySummary && ( <>
diff --git a/frontend/src/styles.css b/frontend/src/styles.css index 25d10d0..9c1b75f 100644 --- a/frontend/src/styles.css +++ b/frontend/src/styles.css @@ -2022,6 +2022,29 @@ select:-webkit-autofill { font-size: 12px; } +.target-offline-card { + border-color: #a85757; + background: linear-gradient(130deg, #2c1724 0%, #1f1f38 100%); +} + +.target-offline-card h3 { + margin: 0 0 8px; + color: #fecaca; +} + +.target-offline-card p { + margin: 0 0 10px; + color: #fde2e2; +} + +.target-offline-meta { + display: flex; + flex-wrap: wrap; + gap: 16px; + font-size: 12px; + color: #d4d4f5; +} + .chart-tooltip { background: #0f1934ee; border: 1px solid #2f4a8b;