Files
NexaPG/backend/app/core/deps.py
2026-02-12 09:09:13 +01:00

43 lines
1.6 KiB
Python

from fastapi import Depends, HTTPException, status
from fastapi.security import HTTPAuthorizationCredentials, HTTPBearer
from jose import JWTError, jwt
from sqlalchemy import select
from sqlalchemy.ext.asyncio import AsyncSession
from app.core.config import get_settings
from app.core.db import get_db
from app.models.models import User
# Module-level settings object, evaluated when this module is imported.
# get_current_user reads jwt_secret_key and jwt_algorithm from it.
settings = get_settings()
# Bearer-credentials extractor used as a dependency by get_current_user.
# auto_error=False lets a request without credentials reach get_current_user
# as None, so that function can raise its own 401 "Missing token" response.
bearer = HTTPBearer(auto_error=False)
async def get_current_user(
    credentials: HTTPAuthorizationCredentials | None = Depends(bearer),
    db: AsyncSession = Depends(get_db),
) -> User:
    """Resolve the request's bearer token to the ``User`` it identifies.

    The token is decoded with ``settings.jwt_secret_key`` and
    ``settings.jwt_algorithm``, must carry ``type == "access"``, and its
    ``sub`` claim must be an integer user id that exists in the database.

    Args:
        credentials: Bearer credentials extracted by ``bearer``; ``None``
            when the request supplied none.
        db: Async database session used to look up the user.

    Returns:
        The ``User`` whose id matches the token's ``sub`` claim.

    Raises:
        HTTPException: 401 when the token is missing, cannot be decoded,
            is not an access token, has a missing or non-integer ``sub``
            claim, or refers to a user that does not exist.
    """
    if not credentials:
        raise HTTPException(status_code=status.HTTP_401_UNAUTHORIZED, detail="Missing token")

    token = credentials.credentials
    try:
        payload = jwt.decode(token, settings.jwt_secret_key, algorithms=[settings.jwt_algorithm])
    except JWTError as exc:
        raise HTTPException(status_code=status.HTTP_401_UNAUTHORIZED, detail="Invalid token") from exc

    if payload.get("type") != "access":
        raise HTTPException(status_code=status.HTTP_401_UNAUTHORIZED, detail="Invalid token type")

    # A correctly signed token may still lack "sub" (int(None) -> TypeError)
    # or carry a non-numeric one (ValueError). Treat both as an invalid token
    # rather than letting them escape as an unhandled 500.
    try:
        user_id = int(payload.get("sub"))
    except (TypeError, ValueError) as exc:
        raise HTTPException(status_code=status.HTTP_401_UNAUTHORIZED, detail="Invalid token") from exc

    user = await db.scalar(select(User).where(User.id == user_id))
    if not user:
        raise HTTPException(status_code=status.HTTP_401_UNAUTHORIZED, detail="User not found")
    return user
def require_roles(*roles: str):
    """Build a dependency that admits only users holding one of ``roles``.

    Args:
        *roles: Role names that are allowed through.

    Returns:
        An async dependency callable. It resolves the current user through
        ``get_current_user`` and returns that user when ``user.role`` is
        one of ``roles``; otherwise it raises a 403 ``HTTPException``.
    """

    async def role_dependency(user: User = Depends(get_current_user)) -> User:
        """Return ``user`` if its role is permitted, else raise 403."""
        if user.role in roles:
            return user
        raise HTTPException(status_code=status.HTTP_403_FORBIDDEN, detail="Forbidden")

    return role_dependency