Files
NexaPG/backend/app/services/service_info.py
nessi e24681332d
All checks were successful
PostgreSQL Compatibility Matrix / PG14 smoke (push) Successful in 10s
PostgreSQL Compatibility Matrix / PG15 smoke (push) Successful in 8s
PostgreSQL Compatibility Matrix / PG16 smoke (push) Successful in 8s
PostgreSQL Compatibility Matrix / PG17 smoke (push) Successful in 8s
PostgreSQL Compatibility Matrix / PG18 smoke (push) Successful in 8s
Simplify upstream version check mechanism
Updated the "Check for Updates" functionality to rely solely on the latest published release in the upstream repository. Removed redundant code for fetching tags and commits, enhancing maintainability and reducing complexity.
2026-02-13 09:01:42 +01:00

92 lines
3.1 KiB
Python

import asyncio
import json
import re
from datetime import datetime, timezone
from urllib.error import URLError
from urllib.request import Request, urlopen
UPSTREAM_REPO_WEB = "https://git.nesterovic.cc/nessi/NexaPG"
UPSTREAM_REPO_API = "https://git.nesterovic.cc/api/v1/repos/nessi/NexaPG"
def _extract_version(payload: str) -> str | None:
txt = payload.strip()
if not txt:
return None
try:
data = json.loads(txt)
if isinstance(data, dict):
for key in ("latest_version", "version", "tag_name", "name"):
value = data.get(key)
if isinstance(value, str) and value.strip():
return value.strip()
if isinstance(data, list) and data and isinstance(data[0], dict):
for key in ("latest_version", "version", "tag_name", "name"):
value = data[0].get(key)
if isinstance(value, str) and value.strip():
return value.strip()
except json.JSONDecodeError:
pass
first_line = txt.splitlines()[0].strip()
if first_line:
return first_line[:64]
return None
def _parse_semver(value: str) -> tuple[int, ...] | None:
normalized = value.strip().lower()
if normalized.startswith("v"):
normalized = normalized[1:]
parts = re.findall(r"\d+", normalized)
if not parts:
return None
return tuple(int(p) for p in parts[:4])
def is_update_available(current_version: str, latest_version: str) -> bool:
current = _parse_semver(current_version)
latest = _parse_semver(latest_version)
if current and latest:
max_len = max(len(current), len(latest))
current = current + (0,) * (max_len - len(current))
latest = latest + (0,) * (max_len - len(latest))
return latest > current
return latest_version.strip() != current_version.strip()
def _get_json(url: str):
req = Request(url, headers={"User-Agent": "NexaPG/1.0"})
with urlopen(req, timeout=8) as response:
raw = response.read(64_000).decode("utf-8", errors="replace")
return json.loads(raw)
def _fetch_latest_from_upstream_sync() -> tuple[str, str]:
latest_release_url = f"{UPSTREAM_REPO_API}/releases/latest"
try:
release = _get_json(latest_release_url)
if isinstance(release, dict):
tag = (release.get("tag_name") or release.get("name") or "").strip()
if tag:
return tag[:64], "release"
except Exception as exc:
raise ValueError(f"Could not fetch latest release from upstream repository: {exc}") from exc
raise ValueError("No published release found in upstream repository")
async def fetch_latest_from_upstream() -> tuple[str | None, str | None, str | None]:
try:
latest, ref = await asyncio.to_thread(_fetch_latest_from_upstream_sync)
return latest, ref, None
except URLError as exc:
return None, None, f"Version check failed: {exc.reason}"
except Exception as exc: # noqa: BLE001
return None, None, f"Version check failed: {exc}"
def utcnow() -> datetime:
return datetime.now(timezone.utc)