Files
NexaPG/backend/app/services/service_info.py
nessi 0445a72764
All checks were successful
PostgreSQL Compatibility Matrix / PG14 smoke (push) Successful in 8s
PostgreSQL Compatibility Matrix / PG15 smoke (push) Successful in 8s
PostgreSQL Compatibility Matrix / PG16 smoke (push) Successful in 8s
PostgreSQL Compatibility Matrix / PG17 smoke (push) Successful in 8s
PostgreSQL Compatibility Matrix / PG18 smoke (push) Successful in 8s
Add service information feature with version checks
This commit introduces a new "Service Information" section displaying runtime details, installed version, and update status for the NexaPG application. It includes backend API endpoints, database schema changes, and a corresponding frontend page that allows users to check for updates against the official repository. The `.env` example now includes an `APP_VERSION` variable, and related documentation has been updated.
2026-02-13 08:54:13 +01:00

112 lines
3.7 KiB
Python

import asyncio
import json
import re
from datetime import datetime, timezone
from urllib.error import URLError
from urllib.request import Request, urlopen
# Web URL of the upstream NexaPG repository (presumably used for links shown
# to users; it is not referenced elsewhere in this part of the file).
UPSTREAM_REPO_WEB = "https://git.nesterovic.cc/nessi/NexaPG"
# Base URL of the same repository under the host's /api/v1 REST API. The
# release, tag and commit lookup URLs used by the version check are built by
# appending a path to this value.
UPSTREAM_REPO_API = "https://git.nesterovic.cc/api/v1/repos/nessi/NexaPG"
def _extract_version(payload: str) -> str | None:
txt = payload.strip()
if not txt:
return None
try:
data = json.loads(txt)
if isinstance(data, dict):
for key in ("latest_version", "version", "tag_name", "name"):
value = data.get(key)
if isinstance(value, str) and value.strip():
return value.strip()
if isinstance(data, list) and data and isinstance(data[0], dict):
for key in ("latest_version", "version", "tag_name", "name"):
value = data[0].get(key)
if isinstance(value, str) and value.strip():
return value.strip()
except json.JSONDecodeError:
pass
first_line = txt.splitlines()[0].strip()
if first_line:
return first_line[:64]
return None
def _parse_semver(value: str) -> tuple[int, ...] | None:
normalized = value.strip().lower()
if normalized.startswith("v"):
normalized = normalized[1:]
parts = re.findall(r"\d+", normalized)
if not parts:
return None
return tuple(int(p) for p in parts[:4])
def is_update_available(current_version: str, latest_version: str) -> bool:
    """Report whether ``latest_version`` is newer than ``current_version``.

    Both labels are run through ``_parse_semver``. When both yield numeric
    components, the shorter tuple is right-padded with zeros and the tuples
    are compared, so ``1.2`` and ``1.2.0`` count as equal. When either label
    cannot be parsed, any textual difference between the trimmed labels is
    reported as an available update.

    Args:
        current_version: Label of the installed version.
        latest_version: Label of the newest upstream version.

    Returns:
        ``True`` if an update should be offered, ``False`` otherwise.
    """
    installed = _parse_semver(current_version)
    upstream = _parse_semver(latest_version)

    # Guard clause: without two parsed versions only a text comparison is possible.
    if not (installed and upstream):
        return latest_version.strip() != current_version.strip()

    width = max(len(installed), len(upstream))
    installed_padded = installed + (0,) * (width - len(installed))
    upstream_padded = upstream + (0,) * (width - len(upstream))
    return installed_padded < upstream_padded
def _get_json(url: str, *, timeout: float = 8, max_bytes: int = 64_000):
    """Fetch ``url`` and return its body parsed as JSON.

    The request is sent with a ``NexaPG/1.0`` User-Agent. The body is decoded
    as UTF-8 with undecodable bytes replaced.

    Args:
        url: Address to fetch.
        timeout: Timeout in seconds passed to ``urlopen``.
        max_bytes: Largest response body that is accepted.

    Returns:
        The decoded JSON value (dict, list, str, number, bool or ``None``).

    Raises:
        ValueError: If the body is larger than ``max_bytes`` or is not valid
            JSON (``json.JSONDecodeError`` is a ``ValueError`` subclass).
        URLError: Propagated from ``urlopen`` when the request fails.
    """
    req = Request(url, headers={"User-Agent": "NexaPG/1.0"})
    with urlopen(req, timeout=timeout) as response:
        # Read one byte past the cap: an oversized body is then detected and
        # reported clearly instead of being cut off and failing later as
        # seemingly malformed JSON.
        body = response.read(max_bytes + 1)
    if len(body) > max_bytes:
        raise ValueError(f"Response from {url} exceeds {max_bytes} bytes")
    return json.loads(body.decode("utf-8", errors="replace"))
def _fetch_latest_from_upstream_sync() -> tuple[str, str]:
    """Look up the newest upstream version label (blocking).

    Three sources under ``UPSTREAM_REPO_API`` are tried in order, and the
    first one that yields a non-blank label wins:

    1. ``/releases/latest`` -- its ``tag_name`` (or ``name``).
    2. ``/tags`` (first page, one entry) -- the ``name`` of the first tag.
    3. ``/commits`` on ``main`` (first page, one entry) -- the first seven
       characters of the ``sha``, formatted as ``commit-<sha7>``.

    Any failure in steps 1 and 2 is swallowed so the next source is tried;
    failures in step 3 propagate to the caller.

    Returns:
        ``(label, source)`` where ``source`` is ``"release"``, ``"tag"`` or
        ``"commit"``. Release and tag labels are capped at 64 characters.

    Raises:
        ValueError: If none of the sources produced a label.
    """
    # 1) Latest release.
    try:
        release_info = _get_json(f"{UPSTREAM_REPO_API}/releases/latest")
        if isinstance(release_info, dict):
            label = (release_info.get("tag_name") or release_info.get("name") or "").strip()
            if label:
                return label[:64], "release"
    except Exception:
        # Best effort: fall through to the tag lookup.
        pass

    # 2) First tag in the listing.
    try:
        tag_list = _get_json(f"{UPSTREAM_REPO_API}/tags?page=1&limit=1")
        if isinstance(tag_list, list) and tag_list and isinstance(tag_list[0], dict):
            label = (tag_list[0].get("name") or "").strip()
            if label:
                return label[:64], "tag"
    except Exception:
        # Best effort: fall through to the commit lookup.
        pass

    # 3) First commit on main; errors here are not caught.
    commit_list = _get_json(f"{UPSTREAM_REPO_API}/commits?sha=main&page=1&limit=1")
    if isinstance(commit_list, list) and commit_list and isinstance(commit_list[0], dict):
        full_sha = (commit_list[0].get("sha") or "").strip()
        if full_sha:
            return f"commit-{full_sha[:7]}", "commit"

    raise ValueError("Could not fetch release/tag/commit from upstream repository")
async def fetch_latest_from_upstream() -> tuple[str | None, str | None, str | None]:
    """Run the blocking upstream version lookup without blocking the event loop.

    The synchronous lookup is executed via ``asyncio.to_thread``. This
    coroutine does not raise for lookup failures; they are reported through
    the third element of the result instead.

    Returns:
        ``(latest, ref, error)``. On success ``latest`` and ``ref`` are the
        values returned by the lookup and ``error`` is ``None``. On failure
        ``latest`` and ``ref`` are ``None`` and ``error`` is a message starting
        with ``"Version check failed: "``.
    """
    version: str | None = None
    ref_kind: str | None = None
    failure: str | None = None

    try:
        version, ref_kind = await asyncio.to_thread(_fetch_latest_from_upstream_sync)
    except URLError as net_err:
        # URLError carries the underlying cause in ``reason``.
        failure = f"Version check failed: {net_err.reason}"
    except Exception as err:  # noqa: BLE001
        failure = f"Version check failed: {err}"

    return version, ref_kind, failure
def utcnow() -> datetime:
    """Return the current time as a timezone-aware ``datetime`` in UTC."""
    moment = datetime.now(tz=timezone.utc)
    return moment