"""Authentication routes: login, logout, own-profile access, and invitation acceptance."""

from datetime import UTC, datetime, timedelta

from fastapi import APIRouter, Depends, HTTPException, Response, status
from sqlalchemy import select
from sqlalchemy.orm import Session

from app.api.deps import current_user
from app.core.config import get_settings
from app.core.security import (
    create_session_token,
    hash_password,
    hash_token,
    make_csrf_token,
    new_token,
    verify_password,
)
from app.db.session import get_db
from app.models.entities import (
    Home,
    HomeMembership,
    HomeRole,
    InvitationToken,
    PasswordResetToken,
    User,
)
from app.schemas.common import InviteAccept, LoginRequest, Message, UserOut
from app.services.audit import audit
from app.services.mail import invite_body, send_mail

router = APIRouter()

# Lifetime shared by both auth cookies: 14 days, expressed in seconds.
_COOKIE_MAX_AGE = 60 * 60 * 24 * 14

# Profile fields a user may change on their own account, mapped to the Python
# type the submitted JSON value must have. Order matches the order of assignment.
_EDITABLE_PROFILE_FIELDS: dict[str, type] = {
    "name": str,
    "language": str,
    "theme": str,
    "timezone": str,
    "onboarding_completed": bool,
}


def _as_utc(moment: datetime) -> datetime:
    """Return *moment* as a timezone-aware datetime, reading naive values as UTC.

    Expiry timestamps in this module are written from ``datetime.now(UTC)``, so
    a naive value coming back from the database is interpreted as UTC. Without
    this, comparing a naive stored value with an aware "now" raises TypeError.
    """
    if moment.tzinfo is None:
        return moment.replace(tzinfo=UTC)
    return moment


def set_auth_cookies(response: Response, user_id: str) -> None:
    """Attach the session cookie and the CSRF cookie to *response*.

    ``np_session`` is HTTP-only; ``np_csrf`` is readable by scripts
    (``httponly=False``). Both use ``samesite="lax"``, the configured
    ``cookie_secure`` flag, and the same 14-day max age.
    """
    settings = get_settings()
    csrf = make_csrf_token()
    response.set_cookie(
        "np_session",
        create_session_token(user_id),
        httponly=True,
        secure=settings.cookie_secure,
        samesite="lax",
        max_age=_COOKIE_MAX_AGE,
    )
    response.set_cookie(
        "np_csrf",
        csrf,
        httponly=False,
        secure=settings.cookie_secure,
        samesite="lax",
        max_age=_COOKIE_MAX_AGE,
    )


@router.post("/login", response_model=UserOut)
def login(payload: LoginRequest, response: Response, db: Session = Depends(get_db)) -> User:
    """Sign in with email and password and receive session cookies."""
    # Emails are matched in lower case.
    email = str(payload.email).lower()
    user = db.scalar(select(User).where(User.email == email))

    # Unknown email, wrong password and inactive account all produce the same 401.
    # NOTE(review): the password check is skipped when no user matches, so the
    # response time may differ between known and unknown emails — confirm
    # whether that matters for this deployment.
    if not user or not verify_password(payload.password, user.password_hash) or not user.is_active:
        raise HTTPException(status_code=status.HTTP_401_UNAUTHORIZED, detail="Invalid credentials")

    set_auth_cookies(response, user.id)
    audit(db, user, "auth.login", "user", user.id)
    db.commit()
    return user


@router.post("/logout", response_model=Message)
def logout(response: Response) -> Message:
    """Clear the session and CSRF cookies."""
    for cookie_name in ("np_session", "np_csrf"):
        response.delete_cookie(cookie_name)
    return Message(message="Logged out")


@router.get("/me", response_model=UserOut)
def me(user: User = Depends(current_user)) -> User:
    """Return the signed-in user."""
    return user


@router.patch("/me", response_model=UserOut)
def update_me(payload: dict, user: User = Depends(current_user), db: Session = Depends(get_db)) -> User:
    """Update the editable profile fields of the signed-in user."""
    # Keys outside _EDITABLE_PROFILE_FIELDS are ignored. Every submitted value
    # is checked before any is applied, so a bad value cannot leave the user
    # half-updated and a malformed body yields a 422 rather than an ORM error.
    changes = {}
    for field, expected_type in _EDITABLE_PROFILE_FIELDS.items():
        if field not in payload:
            continue
        value = payload[field]
        # None is passed through unchanged; whether a column accepts NULL is
        # left to the model, exactly as before.
        if value is not None and not isinstance(value, expected_type):
            raise HTTPException(status_code=422, detail=f"Invalid value for {field}")
        changes[field] = value

    for field, value in changes.items():
        setattr(user, field, value)

    db.commit()
    db.refresh(user)
    return user


@router.post("/accept-invite", response_model=UserOut)
def accept_invite(payload: InviteAccept, response: Response, db: Session = Depends(get_db)) -> User:
    """Accept an invitation, set up the account, and sign the user in."""
    now = datetime.now(UTC)

    # Only the hash of a token is stored, so look the invitation up by hash.
    invite = db.scalar(
        select(InvitationToken).where(InvitationToken.token_hash == hash_token(payload.token))
    )
    if not invite or invite.consumed_at or _as_utc(invite.expires_at) < now:
        raise HTTPException(status_code=400, detail="Invalid or expired invitation")

    user = db.get(User, invite.user_id)
    if not user:
        raise HTTPException(status_code=400, detail="Invalid invitation")

    user.name = payload.name
    user.password_hash = hash_password(payload.password)
    user.language = payload.language
    user.theme = payload.theme
    # Marking the invitation consumed makes the same token fail the check above next time.
    invite.consumed_at = now

    if payload.home_name:
        # Create a new home and make the user its owner; flush so home.id is populated.
        home = Home(name=payload.home_name)
        db.add(home)
        db.flush()
        db.add(HomeMembership(home_id=home.id, user_id=user.id, role=HomeRole.OWNER))
    elif payload.join_code:
        # Imported locally — presumably to avoid a circular import with the
        # homes routes module; TODO confirm.
        from app.api.routes.homes import join_home_by_code

        join_home_by_code(payload.join_code, user, db)

    audit(db, user, "auth.accept_invite", "user", user.id)
    db.commit()
    set_auth_cookies(response, user.id)
    return user


def create_invitation(db: Session, user: User) -> str:
    """Queue a 7-day invitation token for *user* and return the raw token.

    Only the token's hash is added to the session; nothing is committed here.
    """
    token = new_token()
    expires_at = datetime.now(UTC) + timedelta(days=7)
    db.add(InvitationToken(user_id=user.id, token_hash=hash_token(token), expires_at=expires_at))
    return token


def create_reset_token(db: Session, user: User) -> str:
    """Queue a 2-hour password-reset token for *user* and return the raw token.

    Only the token's hash is added to the session; nothing is committed here.
    """
    token = new_token()
    expires_at = datetime.now(UTC) + timedelta(hours=2)
    db.add(PasswordResetToken(user_id=user.id, token_hash=hash_token(token), expires_at=expires_at))
    return token


def send_invitation(db: Session, user: User) -> None:
    """Create an invitation for *user* and hand the invitation mail to ``send_mail``."""
    token = create_invitation(db, user)
    send_mail(db, user.email, "Your NexaPantry invitation", invite_body(token))