Switched the dependency from `python-jose` to `PyJWT` to handle JWT encoding and decoding. Updated related code to use `PyJWT`'s `InvalidTokenError` instead of `JWTError`. Also bumped the application version from `0.1.7` to `0.1.8`.
43 lines
1.6 KiB
Python
43 lines
1.6 KiB
Python
from fastapi import Depends, HTTPException, status
|
|
from fastapi.security import HTTPAuthorizationCredentials, HTTPBearer
|
|
import jwt
|
|
from sqlalchemy import select
|
|
from sqlalchemy.ext.asyncio import AsyncSession
|
|
from app.core.config import get_settings
|
|
from app.core.db import get_db
|
|
from app.models.models import User
|
|
|
|
settings = get_settings()
|
|
bearer = HTTPBearer(auto_error=False)
|
|
|
|
|
|
async def get_current_user(
|
|
credentials: HTTPAuthorizationCredentials | None = Depends(bearer),
|
|
db: AsyncSession = Depends(get_db),
|
|
) -> User:
|
|
if not credentials:
|
|
raise HTTPException(status_code=status.HTTP_401_UNAUTHORIZED, detail="Missing token")
|
|
token = credentials.credentials
|
|
try:
|
|
payload = jwt.decode(token, settings.jwt_secret_key, algorithms=[settings.jwt_algorithm])
|
|
except jwt.InvalidTokenError as exc:
|
|
raise HTTPException(status_code=status.HTTP_401_UNAUTHORIZED, detail="Invalid token") from exc
|
|
|
|
if payload.get("type") != "access":
|
|
raise HTTPException(status_code=status.HTTP_401_UNAUTHORIZED, detail="Invalid token type")
|
|
|
|
user_id = payload.get("sub")
|
|
user = await db.scalar(select(User).where(User.id == int(user_id)))
|
|
if not user:
|
|
raise HTTPException(status_code=status.HTTP_401_UNAUTHORIZED, detail="User not found")
|
|
return user
|
|
|
|
|
|
def require_roles(*roles: str):
|
|
async def role_dependency(user: User = Depends(get_current_user)) -> User:
|
|
if user.role not in roles:
|
|
raise HTTPException(status_code=status.HTTP_403_FORBIDDEN, detail="Forbidden")
|
|
return user
|
|
|
|
return role_dependency
|