Files
NexaPantry/backend/app/services/notifications.py
nessi 3792ca55e7
Some checks failed
CI / backend (push) Failing after 31s
CI / frontend (push) Successful in 40s
CI / docker (push) Has been skipped
chore: initial project setup with backend, frontend, and infrastructure
Add complete NexaPantry application structure including:
- Docker Compose configuration with PostgreSQL, Redis, FastAPI backend, worker, frontend and Caddy
- Environment configuration template with database, auth, and service settings
- GitHub Actions CI workflow for backend/frontend linting, testing, auditing and Docker builds
- AGPL-3.0 license and comprehensive README with setup, development, and security documentation
- Backend
2026-06-04 10:26:38 +02:00

51 lines
1.9 KiB
Python

import logging
from datetime import UTC, date, datetime, timedelta
from sqlalchemy import select
from sqlalchemy.orm import Session
from app.models.entities import Home, HomeMembership, Notification, Product
from app.services.mail import send_mail
# Module-level logger, named after this module's import path.
logger = logging.getLogger(__name__)
def create_expiry_notifications(db: Session) -> int:
    """Create expiry warnings for every home that has products expiring soon.

    For each home, products whose ``expires_at`` is on or before today plus
    the home's ``expiry_warning_days`` are selected; this also matches
    products whose date has already passed. If a home has at least one such
    product, each of its members receives:

    * an in-app ``Notification`` unless their preferences set ``in_app`` to a
      falsy value (default: enabled), and
    * an e-mail if their preferences set ``email`` to a truthy value
      (default: disabled).

    E-mail delivery is best-effort: a failure is logged with its traceback
    and processing continues with the next member.

    Args:
        db: Session used for all queries and inserts. It is committed once,
            after all homes have been processed.

    Returns:
        The number of in-app notifications that were added.
    """
    title = "NexaPantry expiry warning"
    created = 0
    today = date.today()

    for home in db.scalars(select(Home)).all():
        deadline = today + timedelta(days=home.expiry_warning_days)
        products = db.scalars(
            select(Product).where(
                Product.home_id == home.id,
                Product.expires_at <= deadline,
            )
        ).all()
        if not products:
            continue

        # The text is identical for every member and for both channels,
        # so build it once per home instead of once per notification.
        body = f"{len(products)} products expire soon in {home.name}."

        memberships = db.scalars(
            select(HomeMembership).where(HomeMembership.home_id == home.id)
        ).all()
        for membership in memberships:
            # Missing preferences fall back to: in-app on, e-mail off.
            prefs = membership.notification_preferences or {}

            if prefs.get("in_app", True):
                db.add(
                    Notification(
                        user_id=membership.user_id,
                        home_id=home.id,
                        title=title,
                        body=body,
                        kind="expiry",
                    )
                )
                created += 1

            if prefs.get("email", False):
                try:
                    send_mail(db, membership.user.email, title, body)
                except Exception:
                    # Deliberately broad: one failed delivery must not
                    # prevent the remaining members from being notified.
                    logger.exception(
                        "Expiry e-mail delivery failed for user %s",
                        membership.user_id,
                    )

    db.commit()
    return created
def mark_read(db: Session, notification: Notification) -> Notification:
notification.read_at = datetime.now(UTC)
return notification